Skip to content

Commit

Permalink
Feature/reentrancy (#111)
Browse files Browse the repository at this point in the history
* gossip confirmance fixes #1

* gossip: bugfixes

* gossip: more fixes

* gossip: build related minor changes

* gossip: one more fix

* gossip: flush hello message asap

* experiment with noise protocol

* made injector creation fns inline to prevent multiple definitions

* gossip restructured, interop issues fixed

* Noise::write now reports the correct amount of bytes

Signed-off-by: Igor Egorov <igor@soramitsu.co.jp>

* refactoring: move prev implementation of Kademlia to other namespace

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: kademlia dependency injection

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: improve random generator

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: getting listened multiaddress in tcp listener

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: reduce logging in secio

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* draft: new implementation of kademlia

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* draft: processing with kademlia

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* draft: continue processing with kademlia

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* draft: continue

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* draft: kademlia

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: validation

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: DSA

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: CIDv1 encoding
refactoring: replace deprecated sha256 function by hasher

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: multiaddress operations

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: make PeerId comparable to using in std::set

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactoring: addr repo

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: Go-implementation compatibility

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: rendezvous chat as example of Kademlia using

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: unit-test for kademlia parts

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: StorageBackend interface
fix: ContentValue

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* wipe: remove previous implementation of Kademlia

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: cmake files

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

cmake

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: some warnings

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: resolve TODOes, comments, format

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: clang-tidy issues

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: clang-tidy issues

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: clang-tidy issues

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: using timeout for make new stream
feature: smart using of peer routing

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: kademlia message parsing

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactoring: optimize executors' working

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactoring: optimize start of rendezvous chat

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactoring: headers including

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* refactoring: remaining request executors; naming

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* feature: add handle and timeout as argument of Host::connect

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* fix: some log messages

Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>

* injectors temp fix

* gossip: uncommented writing bytes checking

* removed redundant std::hash definition

* Hack yamux to allow weighty messages processing

Signed-off-by: Igor Egorov <igor@soramitsu.co.jp>

* fix vtable

* fixes in DI injectors

* yamux test corrected according to curent window update policy

* added buffering primitives

* defer* functions in Reader/Writer interfaces and Yamux redesign pt.1

* scheduler fix regarding move assignment + cancel

* write queue interface change

* read buffer fix regarding subspan

* some diagnostic logging

* tcp connection fixes related to closing behavior

* yamux bugfixes

* yamux tests regression WIP

* build fix

* .

* .

* bugfixes

* fixes

* .

* .

* suppressed most verbose logging in yamux and multiselect

* yamux stream adjustWindowSize adjusted

* fixes regarding std::move of r/w callbacks (against possible ptrs invalidation)

* test cases with jumbo messages transfer added for yamux on noise/tls/plaintext

* bugfixes related to yamux window sizes, overflow, and acknowledgements

* echo protocol and examples support very large msgs

* changes in tests

* all muxers acceptance test recovered

* all muxers acceptance test: fixed issue with mock lifetime

* CI fixes

* yamux refactorings helped to avoid memory issues caused by reentrant callbacks

* build fix for mac

* another yamux fix to avoid memory issues caused by reentrant callbacks

* gossip: hotfix related to unbanning peers

* minor fixes reflecting review feedback

* temp loggers workaround

* Feature/multiselect upd (#121)

* multiselect revised, WIP

* multiselect: simple outbound stream negotiate

* multiselect numerous fixes

* multiselect: instances and reuse

* multiselect: fixes

* multiselect: removed old implementation

* multiselect: interop with go impl fixes

* multiselect: bugfixes

* multiselect: ProtocolMuxer interface abstracts simple outbound stream negotiation

* multiselect: cleanups and logging

* trigger CI

* temporarily disabled tests that required synchronous reaction of multiselect

* just removed unused lines

* reverted back ci.yml

Co-authored-by: Igor Egorov <igor@soramitsu.co.jp>
Co-authored-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>
Co-authored-by: turuslan <turuslan.devbox@gmail.com>
  • Loading branch information
4 people committed Apr 2, 2021
1 parent f610419 commit 8d1b7ef
Show file tree
Hide file tree
Showing 165 changed files with 5,646 additions and 4,983 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "^(AppleClang|Clang|GNU)$")
add_flag(-Wsign-compare)
add_flag(-Wtype-limits) # size_t - size_t >= 0 -> always true

# suppress warnings if a certain compiler version doesn't know some of the warnings above
add_flag(-Wno-unknown-warning-option)

# disable those flags
add_flag(-Wno-unused-command-line-argument) # clang: warning: argument unused during compilation: '--coverage' [-Wunused-command-line-argument]
add_flag(-Wno-unused-parameter) # prints too many useless warnings
Expand Down
46 changes: 37 additions & 9 deletions example/01-echo/libp2p_echo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,26 @@ int main(int argc, char *argv[]) {
using libp2p::crypto::PublicKey;
using libp2p::common::operator""_unhex;

if (argc != 2) {
auto run_duration = std::chrono::seconds(5);

std::string message("Hello from C++");

if (argc > 2) {
auto n = atoi(argv[2]); // NOLINT
if (n > (int)message.size()) { // NOLINT
std::string jumbo_message;
auto sz = static_cast<size_t>(n);
jumbo_message.reserve(sz + message.size());
for (size_t i = 0, count = sz / message.size(); i < count; ++i) {
jumbo_message.append(message);
}
jumbo_message.resize(sz);
message.swap(jumbo_message);
run_duration = std::chrono::seconds(150);
}
}

if (argc < 2) {
std::cerr << "please, provide an address of the server\n";
std::exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -73,7 +92,7 @@ int main(int argc, char *argv[]) {
// create io_context - in fact, thing, which allows us to execute async
// operations
auto context = injector.create<std::shared_ptr<boost::asio::io_context>>();
context->post([host{std::move(host)}, &echo, argv] { // NOLINT
context->post([host{std::move(host)}, &echo, &message, argv] { // NOLINT
auto server_ma_res =
libp2p::multi::Multiaddress::create(argv[1]); // NOLINT
if (!server_ma_res) {
Expand Down Expand Up @@ -103,26 +122,35 @@ int main(int argc, char *argv[]) {

// create Host object and open a stream through it
host->newStream(
peer_info, echo.getProtocolId(), [&echo](auto &&stream_res) {
peer_info, echo.getProtocolId(), [&echo, &message](auto &&stream_res) {
if (!stream_res) {
std::cerr << "Cannot connect to server: "
<< stream_res.error().message() << std::endl;
std::exit(EXIT_FAILURE);
}

auto stream_p = std::move(stream_res.value());

auto echo_client = echo.createClient(stream_p);
std::cout << "SENDING 'Hello from C++!'\n";

if (message.size() < 120) {
std::cout << "SENDING " << message << "\n";
} else {
std::cout << "SENDING " << message.size() << " bytes" << std::endl;
}
echo_client->sendAnd(
"Hello from C++!\n",
[stream = std::move(stream_p)](auto &&response_result) {
std::cout << "RESPONSE " << response_result.value()
<< std::endl;
message, [stream = std::move(stream_p)](auto &&response_result) {
auto &resp = response_result.value();
if (resp.size() < 120) {
std::cout << "RESPONSE " << resp << std::endl;
} else {
std::cout << "RESPONSE size=" << resp.size() << std::endl;
}
stream->close([](auto &&) { std::exit(EXIT_SUCCESS); });
});
});
});

// run the IO context
context->run_for(std::chrono::seconds(5));
context->run_for(run_duration);
}
7 changes: 6 additions & 1 deletion example/01-echo/libp2p_echo_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <libp2p/common/literals.hpp>
#include <libp2p/host/basic_host.hpp>
#include <libp2p/injector/host_injector.hpp>
#include <libp2p/muxer/muxed_connection_config.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/logger.hpp>
#include <libp2p/protocol/echo.hpp>
Expand Down Expand Up @@ -115,7 +116,11 @@ int main(int argc, char **argv) {
insecure_mode ? initInsecureServer(keypair) : initSecureServer(keypair);

// set a handler for Echo protocol
libp2p::protocol::Echo echo{libp2p::protocol::EchoConfig{1}};
libp2p::protocol::Echo echo{libp2p::protocol::EchoConfig{
.max_server_repeats =
libp2p::protocol::EchoConfig::kInfiniteNumberOfRepeats,
.max_recv_size =
libp2p::muxer::MuxedConnectionConfig::kDefaultMaxWindowSize}};
server.host->setProtocolHandler(
echo.getProtocolId(),
[&echo](std::shared_ptr<libp2p::connection::Stream> received_stream) {
Expand Down
2 changes: 1 addition & 1 deletion example/02-kademlia/rendezvous_chat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ int main(int argc, char *argv[]) {
libp2p::injector::useKademliaConfig(kademlia_config)));

try {
if (argc < 1) {
if (argc < 2) {
std::cerr << "Needs one argument - address" << std::endl;
exit(EXIT_FAILURE);
}
Expand Down
11 changes: 6 additions & 5 deletions example/03-gossip/gossip_chat_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#include <fmt/format.h>
#include <boost/program_options.hpp>

#include <libp2p/injector/gossip_injector.hpp>
#include <libp2p/injector/host_injector.hpp>
#include <libp2p/protocol/gossip/gossip.hpp>
#include <libp2p/log/configurator.hpp>

#include "console_async_reader.hpp"
Expand Down Expand Up @@ -80,8 +81,7 @@ int main(int argc, char *argv[]) {
config.echo_forward_mode = true;

// injector creates and ties dependent objects
auto injector = libp2p::injector::makeGossipInjector(
libp2p::injector::useGossipConfig(config));
auto injector = libp2p::injector::makeHostInjector();

utility::setupLoggers(options->log_level);

Expand All @@ -107,8 +107,9 @@ int main(int argc, char *argv[]) {
std::cerr << "I am " << local_address_str << "\n";

// create gossip node
auto gossip =
injector.create<std::shared_ptr<libp2p::protocol::gossip::Gossip>>();
auto gossip = libp2p::protocol::gossip::create(
injector.create<std::shared_ptr<libp2p::protocol::Scheduler>>(), host,
std::move(config));

using Message = libp2p::protocol::gossip::Gossip::Message;

Expand Down
119 changes: 119 additions & 0 deletions include/libp2p/basic/read_buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef LIBP2P_BASIC_READ_BUFFER_HPP
#define LIBP2P_BASIC_READ_BUFFER_HPP

#include <deque>
#include <vector>

#include <boost/optional.hpp>
#include <gsl/span>

namespace libp2p::basic {

class ReadBuffer {
public:
using BytesRef = gsl::span<uint8_t>;

static constexpr size_t kDefaultAllocGranularity = 65536;

ReadBuffer(const ReadBuffer &) = delete;
ReadBuffer &operator=(const ReadBuffer &) = delete;

~ReadBuffer() = default;
ReadBuffer(ReadBuffer &&) = default;
ReadBuffer &operator=(ReadBuffer &&) = default;

explicit ReadBuffer(size_t alloc_granularity = kDefaultAllocGranularity);

size_t size() const {
return total_size_;
}

bool empty() const {
return total_size_ == 0;
}

/// Adds new data to the buffer
void add(BytesRef bytes);

/// Returns # of bytes actually copied into out
size_t consume(BytesRef &out);

/// Returns # of bytes actually copied into out
size_t addAndConsume(BytesRef in, BytesRef &out);

/// Clears and deallocates
void clear();

private:
using Fragment = std::vector<uint8_t>;

/// Consumes all data into out
size_t consumeAll(BytesRef &out);

/// Consumes the 1st fragment or part of it
size_t consumePart(uint8_t *out, size_t n);

/// Granularity for coarse allocation
size_t alloc_granularity_;

/// Total size of unconsumed bytes
size_t total_size_;

/// The 1st fragment may advance
size_t first_byte_offset_;

/// Available allocated bytes remains in the last fragment
size_t capacity_remains_;

/// Fragments allocated
std::deque<Fragment> fragments_;
};

/// Temporary buffer for incoming messages, filled from incoming (network)
/// data up to expected size
class FixedBufferCollector {
public:
using CBytesRef = gsl::span<const uint8_t>;
using BytesRef = gsl::span<uint8_t>;
using Buffer = std::vector<uint8_t>;

static constexpr size_t kDefaultMemoryThreshold = 65536;

explicit FixedBufferCollector(
size_t expected_size = 0,
size_t memory_threshold = kDefaultMemoryThreshold);

/// Expects the next message of a given size, if the current one is
/// not read to the end, it will be discarded
void expect(size_t size);

/// Fills the buffer (if read partially) with head bytes of data,
/// returns data if filled up to expected size or empty option if not,
/// modifies data (cuts head)
/// Data returned is valid until next expect() call && data is live
boost::optional<CBytesRef> add(CBytesRef &data);
boost::optional<BytesRef> add(BytesRef &data);

/// Resets to initial state
void reset();

private:
/// If buffer memory allocated is above this threshold,
/// it will be freed on the next expect() call
size_t memory_threshold_;

/// Size expected
size_t expected_size_;

/// The buffer
Buffer buffer_;
};

} // namespace libp2p::basic

#endif // LIBP2P_BASIC_READ_BUFFER_HPP
9 changes: 9 additions & 0 deletions include/libp2p/basic/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ namespace libp2p::basic {
*/
virtual void readSome(gsl::span<uint8_t> out, size_t bytes,
ReadCallbackFunc cb) = 0;

/**
* @brief Defers reporting result or error to callback to avoid reentrancy
* (i.e. callback will not be called before initiator function returns)
* @param res read result
* @param cb callback
*/
virtual void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) = 0;
};

} // namespace libp2p::basic
Expand Down
67 changes: 67 additions & 0 deletions include/libp2p/basic/varint_prefix_reader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef LIBP2P_VARINT_PREFIX_READER_HPP
#define LIBP2P_VARINT_PREFIX_READER_HPP

#include <gsl/span>

namespace libp2p::basic {

/// Collects and interprets varint from incoming data,
/// Reader is stateful, see varint_prefix_reader_test.cpp for usage examples
class VarintPrefixReader {
public:
/// Current state
enum State {
/// Needs more bytes
kUnderflow,

/// Varint is ready, value() is ultimate
kReady,

/// Overflow of uint64_t, too many bytes with high bit set
kOverflow,

/// consume() called when state is kReady
kError
};

/// Returns state
State state() const {
return state_;
}

/// Returns current value, called when state() == kReady
uint64_t value() const {
return value_;
}

/// Resets reader's state
void reset();

/// Consumes one byte from wire, returns reader's state
/// (or kError if called when state() == kReady)
State consume(uint8_t byte);

/// Consumes bytes from buffer.
/// On success, modifies buffer (cuts off first bytes which were consumed),
/// returns reader's state
/// (or kError if called when state() == kReady)
State consume(gsl::span<const uint8_t> &buffer);

private:
/// Current value accumulated
uint64_t value_ = 0;

/// Current reader's state
State state_ = kUnderflow;

/// Bytes got at the moment, this controls overflow of value_
uint8_t got_bytes_ = 0;
};
} // namespace libp2p::basic

#endif // LIBP2P_VARINT_PREFIX_READER_HPP

0 comments on commit 8d1b7ef

Please sign in to comment.