Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
65b2dc9
Asyncio: Unified send/recv syntax
roshanrags Sep 29, 2020
5b4af16
Beacon: Unified send/recv syntax
roshanrags Sep 29, 2020
fd08b81
stream: Unified send/recv syntax
roshanrags Sep 29, 2020
9e79696
lpf: Unified send/recv syntax
roshanrags Sep 29, 2020
f22274c
pubsub: Unified send/recv syntax
roshanrags Sep 29, 2020
489eee9
sdk: Unified send/recv syntax
roshanrags Sep 29, 2020
52c607e
simulator: Unified send/recv syntax
roshanrags Sep 29, 2020
6d7ecce
rlpx: Unified send/recv syntax
roshanrags Sep 29, 2020
2ce62bc
misc: Unified send/recv syntax
roshanrags Sep 29, 2020
e0f9b3e
Add Transport scaffold for easy creation of new transports
roshanrags Oct 4, 2020
983ffca
Use the scaffold in Udp
roshanrags Oct 4, 2020
2fc137d
Put utility class inside the scaffold
roshanrags Oct 4, 2020
ab82275
Add factory scaffold
roshanrags Oct 10, 2020
37cc31d
Rename f
roshanrags Oct 10, 2020
3e04f21
Use the factory scaffold in udp
roshanrags Oct 10, 2020
02ef19d
Support arbitrary dial parameters
roshanrags Oct 10, 2020
385f20f
Rewrite dial and combine all variants using variadic templates
roshanrags Oct 10, 2020
a0cec61
Clean up unused dial_impl
roshanrags Oct 10, 2020
856d2a0
Upstream message field macros from stream
roshanrags Oct 10, 2020
84a6a2e
Don't pass dial args to did_create
roshanrags Oct 10, 2020
2cfbd8a
Convert BaseMessage to Buffer automatically
roshanrags Oct 10, 2020
f549e9c
Make the endian suffix controllable
roshanrags Oct 10, 2020
1ff18ad
Add a const qualified payload_buffer
roshanrags Oct 10, 2020
9f47633
Add truncate_unsafe functions
roshanrags Oct 10, 2020
abeb27c
Bugfix: Wrong name in undef
roshanrags Oct 10, 2020
1e158c4
Use derived class properly following CRTP
roshanrags Oct 10, 2020
74111c5
Message type overhaul, cannot access using declarations is derived cl…
roshanrags Oct 10, 2020
992ca93
Bugfix: Pointer instead of reference
roshanrags Oct 10, 2020
a8ebc6a
Support additional args in dial
roshanrags Oct 10, 2020
97ab6d9
Minor fixes
roshanrags Oct 10, 2020
34ffffb
Try out the transport scaffold, add a VersionedTransport
roshanrags Oct 10, 2020
e9f992c
Try out the transport factory scaffold, add a VersionedTransportFactory
roshanrags Oct 10, 2020
05bb396
Use the versioned transport in DiscoveryClient
roshanrags Oct 10, 2020
96fa7bf
Use it in discovery server as well
roshanrags Oct 10, 2020
1bf31e1
Merge branch 'master' of https://github.com/marlinprotocol/OpenWeaver
roshanrags Oct 13, 2020
7d10f44
Fix issues after merging
roshanrags Oct 13, 2020
2977ced
Bugfix: Validate size before accessing version
roshanrags Oct 13, 2020
6c0c728
Use versioned transport in cluster discoverer
roshanrags Oct 13, 2020
94d019f
Remove old version byte
roshanrags Oct 13, 2020
980f0cb
Discover peers in same cycle instead of next
roshanrags Oct 15, 2020
d20814d
Merge branch 'master' of https://github.com/marlinprotocol/OpenWeaver…
roshanrags Oct 15, 2020
9a6d2f4
Log level change
roshanrags Oct 15, 2020
b0ffa91
Use types instead of values to support variadics
roshanrags Oct 15, 2020
794ff07
Sugar to make transport scaffolds simpler
roshanrags Oct 15, 2020
31eade8
Sugar to make transport factory scaffolds simpler
roshanrags Oct 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions asyncio/examples/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ using namespace marlin::core;
using namespace marlin::asyncio;

struct Delegate {
void did_recv_bytes(TcpTransport<Delegate> &transport, Buffer &&bytes) {
void did_recv(TcpTransport<Delegate> &transport, Buffer &&bytes) {
SPDLOG_INFO(
"Transport {{ Src: {}, Dst: {} }}: Did recv bytes: {} bytes",
transport.src_addr.to_string(),
Expand All @@ -15,7 +15,7 @@ struct Delegate {
);
}

void did_send_bytes(TcpTransport<Delegate> &transport, Buffer &&bytes) {
void did_send(TcpTransport<Delegate> &transport, Buffer &&bytes) {
SPDLOG_INFO(
"Transport {{ Src: {}, Dst: {} }}: Did send bytes: {} bytes",
transport.src_addr.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions asyncio/examples/udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ using namespace marlin::core;
using namespace marlin::asyncio;

struct Delegate {
void did_recv_packet(UdpTransport<Delegate> &transport, Buffer &&packet) {
void did_recv(UdpTransport<Delegate> &transport, Buffer &&packet) {
SPDLOG_INFO(
"Transport: {{Src: {}, Dst: {}}}, Did recv packet: {} bytes",
transport.src_addr.to_string(),
Expand All @@ -16,7 +16,7 @@ struct Delegate {
transport.close();
}

void did_send_packet(UdpTransport<Delegate> &transport, Buffer &&packet) {
void did_send(UdpTransport<Delegate> &transport, Buffer &&packet) {
SPDLOG_INFO(
"Transport: {{Src: {}, Dst: {}}}, Did send packet: {} bytes",
transport.src_addr.to_string(),
Expand Down
10 changes: 5 additions & 5 deletions asyncio/include/marlin/asyncio/tcp/TcpTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class TcpTransport {
TcpTransport(TcpTransport const&) = delete;

void setup(DelegateType *delegate);
void did_recv_bytes(core::Buffer &&bytes);
void did_recv(core::Buffer &&bytes);
int send(core::Buffer &&bytes);
uint16_t close_reason = 0;
void close(uint16_t reason = 0);
Expand Down Expand Up @@ -140,7 +140,7 @@ void TcpTransport<DelegateType>::recv_cb(
return;
}

transport->did_recv_bytes(
transport->did_recv(
core::Buffer((uint8_t*)buf->base, nread)
);
}
Expand All @@ -167,8 +167,8 @@ void TcpTransport<DelegateType>::setup(DelegateType *delegate) {

//! sends the incoming bytes to the application/HOT delegate
template<typename DelegateType>
void TcpTransport<DelegateType>::did_recv_bytes(core::Buffer &&bytes) {
delegate->did_recv_bytes(*this, std::move(bytes));
void TcpTransport<DelegateType>::did_recv(core::Buffer &&bytes) {
delegate->did_recv(*this, std::move(bytes));
}

template<typename DelegateType>
Expand All @@ -185,7 +185,7 @@ void TcpTransport<DelegateType>::send_cb(
status
);
} else {
data->transport.delegate->did_send_bytes(
data->transport.delegate->did_send(
data->transport,
std::move(data->bytes)
);
Expand Down
45 changes: 18 additions & 27 deletions asyncio/include/marlin/asyncio/udp/UdpTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@
#ifndef MARLIN_ASYNCIO_UDPTRANSPORT_HPP
#define MARLIN_ASYNCIO_UDPTRANSPORT_HPP

#include <marlin/core/Buffer.hpp>
#include <marlin/core/messages/BaseMessage.hpp>
#include <marlin/core/SocketAddress.hpp>
#include <marlin/core/CidrBlock.hpp>
#include <marlin/core/TransportManager.hpp>
#include <marlin/core/transports/TransportScaffold.hpp>
#include <uv.h>
#include <spdlog/spdlog.h>

Expand All @@ -24,10 +21,15 @@ namespace asyncio {

//! Wrapper transport class around libuv udp functionality
template<typename DelegateType>
class UdpTransport {
class UdpTransport : public core::TransportScaffold<UdpTransport<DelegateType>, DelegateType, uv_udp_t*> {
public:
using TransportScaffoldType = core::TransportScaffold<UdpTransport<DelegateType>, DelegateType, uv_udp_t*>;

using TransportScaffoldType::src_addr;
using TransportScaffoldType::dst_addr;
private:
uv_udp_t *socket = nullptr;
core::TransportManager<UdpTransport<DelegateType>> &transport_manager;
using TransportScaffoldType::base_transport;
using TransportScaffoldType::transport_manager;

static void send_cb(
uv_udp_send_t *req,
Expand All @@ -40,17 +42,13 @@ class UdpTransport {
};

std::list<uv_udp_send_t *> pending_req;

public:
using MessageType = core::BaseMessage;

core::SocketAddress src_addr;
core::SocketAddress dst_addr;
using MessageType = typename TransportScaffoldType::MessageType;
static_assert(std::is_same_v<MessageType, core::BaseMessage>);

using TransportScaffoldType::delegate;
bool internal = false;

DelegateType *delegate = nullptr;

UdpTransport(
core::SocketAddress const &src_addr,
core::SocketAddress const &dst_addr,
Expand All @@ -59,13 +57,13 @@ class UdpTransport {
);
UdpTransport(UdpTransport const&) = delete;

// Overrides
void setup(DelegateType *delegate);
void did_recv_packet(core::Buffer &&packet);
int send(core::Buffer &&packet);
int send(MessageType &&packet);
void close(uint16_t reason = 0);

bool is_internal();

int send(core::Buffer &&packet);
};


Expand All @@ -77,8 +75,7 @@ UdpTransport<DelegateType>::UdpTransport(
core::SocketAddress const &dst_addr,
uv_udp_t *socket,
core::TransportManager<UdpTransport<DelegateType>> &transport_manager
) : socket(socket), transport_manager(transport_manager),
src_addr(src_addr), dst_addr(dst_addr), delegate(nullptr) {
) : TransportScaffoldType(src_addr, dst_addr, socket, transport_manager) {
if(
core::CidrBlock::from_string("10.0.0.0/8").does_contain_address(dst_addr) ||
core::CidrBlock::from_string("172.16.0.0/12").does_contain_address(dst_addr) ||
Expand All @@ -99,12 +96,6 @@ void UdpTransport<DelegateType>::setup(DelegateType *delegate) {
this->delegate = delegate;
}

//! sends the incoming bytes to the application/HOT delegate
template<typename DelegateType>
void UdpTransport<DelegateType>::did_recv_packet(core::Buffer &&packet) {
delegate->did_recv_packet(*this, std::move(packet));
}

template<typename DelegateType>
void UdpTransport<DelegateType>::send_cb(
uv_udp_send_t *req,
Expand All @@ -126,7 +117,7 @@ void UdpTransport<DelegateType>::send_cb(
status
);
} else {
data->transport->delegate->did_send_packet(
data->transport->delegate->did_send(
*data->transport,
std::move(data->packet)
);
Expand All @@ -152,7 +143,7 @@ int UdpTransport<DelegateType>::send(core::Buffer &&packet) {
auto buf = uv_buf_init((char*)req_data->packet.data(), req_data->packet.size());
int res = uv_udp_send(
req,
socket,
base_transport,
&buf,
1,
reinterpret_cast<const sockaddr *>(&dst_addr),
Expand Down
Loading