Skip to content

Commit

Permalink
Merge pull request #37 from oxen-io/dev
Browse files Browse the repository at this point in the history
v1.2.5
  • Loading branch information
jagerman committed Apr 20, 2021
2 parents d570093 + 4ef1060 commit 5175403
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 97 deletions.
10 changes: 4 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ cmake_minimum_required(VERSION 3.7)
# Has to be set before `project()`, and ignored on non-macos:
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")

project(liboxenmq CXX C)
project(liboxenmq
VERSION 1.2.5
LANGUAGES CXX C)

include(GNUInstallDirs)

set(OXENMQ_VERSION_MAJOR 1)
set(OXENMQ_VERSION_MINOR 2)
set(OXENMQ_VERSION_PATCH 4)
set(OXENMQ_VERSION "${OXENMQ_VERSION_MAJOR}.${OXENMQ_VERSION_MINOR}.${OXENMQ_VERSION_PATCH}")
message(STATUS "oxenmq v${OXENMQ_VERSION}")
message(STATUS "oxenmq v${PROJECT_VERSION}")

set(OXENMQ_LIBVERSION 0)

Expand Down
2 changes: 1 addition & 1 deletion liblokimq.pc.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@

Name: liblokimq
Description: ZeroMQ-based communication library (compatibility package for liboxenmq)
Version: @OXENMQ_VERSION@
Version: @PROJECT_VERSION@

Libs: -L${libdir} -loxenmq
Libs.private: @PRIVATE_LIBS@
Expand Down
2 changes: 1 addition & 1 deletion liboxenmq.pc.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@

Name: liboxenmq
Description: ZeroMQ-based communication library
Version: @OXENMQ_VERSION@
Version: @PROJECT_VERSION@

Libs: -L${libdir} -loxenmq
Libs.private: @PRIVATE_LIBS@
Expand Down
66 changes: 24 additions & 42 deletions oxenmq/connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
}
}

void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey) {
void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey, bool use_ephemeral_routing_id) {

setup_external_socket(socket);

Expand All @@ -57,7 +57,7 @@ void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
socket.set(zmq::sockopt::curve_secretkey, privkey);
}

if (PUBKEY_BASED_ROUTING_ID) {
if (!use_ephemeral_routing_id) {
std::string routing_id;
routing_id.reserve(33);
routing_id += 'L'; // Prefix because routing id's starting with \0 are reserved by zmq (and our pubkey might start with \0)
Expand All @@ -67,39 +67,18 @@ void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
// else let ZMQ pick a random one
}

ConnectionID OxenMQ::connect_sn(std::string_view pubkey, std::chrono::milliseconds keep_alive, std::string_view hint) {
if (!proxy_thread.joinable())
throw std::logic_error("Cannot call connect_sn() before calling `start()`");

detail::send_control(get_control_socket(), "CONNECT_SN", bt_serialize<bt_dict>({{"pubkey",pubkey}, {"keep_alive",keep_alive.count()}, {"hint",hint}}));

return pubkey;
}

ConnectionID OxenMQ::connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
AuthLevel auth_level, std::chrono::milliseconds timeout) {
if (!proxy_thread.joinable())
throw std::logic_error("Cannot call connect_remote() before calling `start()`");

auto id = next_conn_id++;
LMQ_TRACE("telling proxy to connect to ", remote, ", id ", id);
detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize<bt_dict>({
{"auth_level", static_cast<std::underlying_type_t<AuthLevel>>(auth_level)},
{"conn_id", id},
{"connect", detail::serialize_object(std::move(on_connect))},
{"failure", detail::serialize_object(std::move(on_failure))},
{"pubkey", remote.curve() ? remote.pubkey : ""},
{"remote", remote.zmq_address()},
{"timeout", timeout.count()},
}));

return id;
// Deprecated versions:
ConnectionID OxenMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect,
ConnectFailure on_failure, AuthLevel auth_level, std::chrono::milliseconds timeout) {
return connect_remote(address{remote}, std::move(on_connect), std::move(on_failure),
auth_level, connect_option::timeout{timeout});
}

ConnectionID OxenMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
std::string_view pubkey, AuthLevel auth_level, std::chrono::milliseconds timeout) {
return connect_remote(address{remote}.set_pubkey(pubkey),
std::move(on_connect), std::move(on_failure), auth_level, timeout);
ConnectionID OxenMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect,
ConnectFailure on_failure, std::string_view pubkey, AuthLevel auth_level,
std::chrono::milliseconds timeout) {
return connect_remote(address{remote}.set_pubkey(pubkey), std::move(on_connect),
std::move(on_failure), auth_level, connect_option::timeout{timeout});
}

void OxenMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
Expand All @@ -111,7 +90,7 @@ void OxenMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
}

std::pair<zmq::socket_t *, std::string>
OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, std::chrono::milliseconds keep_alive) {
OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, bool use_ephemeral_routing_id, std::chrono::milliseconds keep_alive) {
ConnectionID remote_cid{remote};
auto its = peers.equal_range(remote_cid);
peer_info* peer = nullptr;
Expand Down Expand Up @@ -163,7 +142,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,

LMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote));
zmq::socket_t socket{context, zmq::socket_type::dealer};
setup_outgoing_socket(socket, remote);
setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
try {
socket.connect(addr);
} catch (const zmq::error_t& e) {
Expand All @@ -189,9 +168,11 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consumer data) {
std::string_view hint, remote_pk;
std::chrono::milliseconds keep_alive;
bool optional = false, incoming_only = false, outgoing_only = false;
bool optional = false, incoming_only = false, outgoing_only = false, ephemeral_rid = EPHEMERAL_ROUTING_ID;

// Alphabetical order
if (data.skip_until("ephemeral_rid"))
ephemeral_rid = data.consume_integer<bool>();
if (data.skip_until("hint"))
hint = data.consume_string_view();
if (data.skip_until("incoming"))
Expand All @@ -206,7 +187,7 @@ std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consume
throw std::runtime_error("Internal error: Invalid proxy_connect_sn command; pubkey missing");
remote_pk = data.consume_string_view();

return proxy_connect_sn(remote_pk, hint, optional, incoming_only, outgoing_only, keep_alive);
return proxy_connect_sn(remote_pk, hint, optional, incoming_only, outgoing_only, ephemeral_rid, keep_alive);
}

template <typename Container, typename AccessIndex>
Expand Down Expand Up @@ -315,17 +296,18 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
std::string remote;
std::string remote_pubkey;
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT;
bool ephemeral_rid = EPHEMERAL_ROUTING_ID;

if (data.skip_until("auth_level"))
auth_level = static_cast<AuthLevel>(data.consume_integer<std::underlying_type_t<AuthLevel>>());
if (data.skip_until("conn_id"))
conn_id = data.consume_integer<long long>();
if (data.skip_until("connect")) {
if (data.skip_until("connect"))
on_connect = detail::deserialize_object<ConnectSuccess>(data.consume_integer<uintptr_t>());
}
if (data.skip_until("failure")) {
if (data.skip_until("ephemeral_rid"))
ephemeral_rid = data.consume_integer<bool>();
if (data.skip_until("failure"))
on_failure = detail::deserialize_object<ConnectFailure>(data.consume_integer<uintptr_t>());
}
if (data.skip_until("pubkey")) {
remote_pubkey = data.consume_string();
assert(remote_pubkey.size() == 32 || remote_pubkey.empty());
Expand All @@ -344,7 +326,7 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {

zmq::socket_t sock{context, zmq::socket_type::dealer};
try {
setup_outgoing_socket(sock, remote_pubkey);
setup_outgoing_socket(sock, remote_pubkey, ephemeral_rid);
sock.connect(remote);
} catch (const zmq::error_t &e) {
proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
Expand Down
Loading

0 comments on commit 5175403

Please sign in to comment.