Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deploy copy-on-write for more efficient messaging #38

Merged
merged 4 commits into from
Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/python/_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ PYBIND11_MODULE(_broker, m) {
.def("publish", (void (broker::endpoint::*)(broker::topic t, broker::data d)) &broker::endpoint::publish)
.def("publish", (void (broker::endpoint::*)(const broker::endpoint_info& dst, broker::topic t, broker::data d)) &broker::endpoint::publish)
.def("publish_batch",
[](broker::endpoint& ep, std::vector<broker::endpoint::value_type> xs) { ep.publish(xs); })
[](broker::endpoint& ep, std::vector<broker::data_message> xs) { ep.publish(xs); })
.def("make_publisher", &broker::endpoint::make_publisher)
.def("make_subscriber", &broker::endpoint::make_subscriber, py::arg("topics"), py::arg("max_qsize") = 20)
.def("make_status_subscriber", &broker::endpoint::make_status_subscriber, py::arg("receive_statuses") = false)
Expand Down
2 changes: 2 additions & 0 deletions broker/detail/clone_actor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public:
forward(make_internal_command<T>(std::move(x)));
}

void command(internal_command::variant_type& cmd);

void command(internal_command& cmd);

void operator()(none);
Expand Down
27 changes: 14 additions & 13 deletions broker/detail/core_policy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
#include <caf/actor.hpp>
#include <caf/actor_addr.hpp>
#include <caf/broadcast_downstream_manager.hpp>
#include <caf/cow_tuple.hpp>
#include <caf/detail/stream_distribution_tree.hpp>
#include <caf/fused_downstream_manager.hpp>
#include <caf/fwd.hpp>
#include <caf/message.hpp>
#include <caf/stream_slot.hpp>

#include <caf/detail/stream_distribution_tree.hpp>

#include "broker/data.hh"
#include "broker/filter_type.hh"
#include "broker/internal_command.hh"
#include "broker/message.hh"
#include "broker/peer_filter.hh"
#include "broker/topic.hh"

Expand All @@ -42,7 +43,7 @@ public:
template <class T>
struct local_trait {
/// Type of a single element in the stream.
using element = std::pair<topic, T>;
using element = caf::cow_tuple<topic, T>;

/// Type of a full batch in the stream.
using batch = std::vector<element>;
Expand All @@ -61,9 +62,8 @@ public:
/// Streaming-related types for peers.
struct peer_trait {
/// Type of a single element in the stream.
using element = caf::message;
using element = node_message;

/// Type of a full batch in the stream.
using batch = std::vector<element>;

/// Type of the downstream_manager that broadcasts data to local actors.
Expand All @@ -84,12 +84,13 @@ public:

/// Stream handshake in step 1 that includes our own filter. The receiver
/// replies with a step2 handshake.
using step1_handshake = caf::outbound_stream_slot<caf::message, filter_type,
using step1_handshake = caf::outbound_stream_slot<node_message,
filter_type,
caf::actor>;

/// Stream handshake in step 2. The receiver already has our filter
/// installed.
using step2_handshake = caf::outbound_stream_slot<caf::message,
using step2_handshake = caf::outbound_stream_slot<node_message,
caf::atom_value,
caf::actor>;

Expand Down Expand Up @@ -193,7 +194,7 @@ public:
/// @param peer_hdl Handle to the peering (remote) core actor.
/// @returns `false` if the peer is already connected, `true` otherwise.
/// @pre Current message is an `open_stream_msg`.
void ack_peering(const caf::stream<caf::message>& in,
void ack_peering(const caf::stream<node_message>& in,
const caf::actor& peer_hdl);

/// Queries whether we have an outbound path to `hdl`.
Expand All @@ -220,19 +221,19 @@ public:
// -- selectively pushing data into the streams ------------------------------

/// Pushes data to workers without forwarding it to peers.
void local_push(topic x, data y);
void local_push(data_message x);

/// Pushes data to stores without forwarding it to peers.
void local_push(topic x, internal_command y);
void local_push(command_message x);

/// Pushes data to peers only without forwarding it to local substreams.
void remote_push(caf::message msg);
void remote_push(node_message x);

/// Pushes data to peers and workers.
void push(topic x, data y);
void push(data_message msg);

/// Pushes data to peers and stores.
void push(topic x, internal_command y);
void push(command_message msg);

// -- properties -------------------------------------------------------------

Expand Down
2 changes: 2 additions & 0 deletions broker/detail/master_actor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public:

void command(internal_command& cmd);

void command(internal_command::variant_type& cmd);

void operator()(none);

void operator()(put_command&);
Expand Down
10 changes: 3 additions & 7 deletions broker/detail/prefix_matcher.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <utility>
#include <vector>

#include <caf/cow_tuple.hpp>
#include <caf/message.hpp>

#include "broker/topic.hh"
Expand All @@ -17,13 +18,8 @@ struct prefix_matcher {
bool operator()(const filter_type& filter, const topic& t) const;

template <class T>
bool operator()(const filter_type& filter,
const std::pair<topic, T>& x) const {
return (*this)(filter, x.first);
}

bool operator()(const filter_type& filter, const caf::message& msg) const {
return msg.match_element<topic>(0) && (*this)(filter, msg.get_as<topic>(0));
bool operator()(const filter_type& filter, const T& x) const {
return (*this)(filter, get_topic(x));
}
};

Expand Down
8 changes: 4 additions & 4 deletions broker/detail/shared_publisher_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "broker/detail/assert.hh"
#include "broker/detail/shared_queue.hh"
#include "broker/message.hh"

namespace broker {
namespace detail {
Expand All @@ -21,7 +22,7 @@ namespace detail {
/// - consume() fires the flare when it removes items from xs_ and less than 20
/// items remain
/// - produce() extinguishes the flare it adds items to xs_, exceeding 20
template <class ValueType = std::pair<topic, data>>
template <class ValueType = data_message>
class shared_publisher_queue : public shared_queue<ValueType> {
public:
using value_type = ValueType;
Expand Down Expand Up @@ -124,17 +125,16 @@ private:
const size_t capacity_;
};

template <class ValueType = std::pair<topic, data>>
template <class ValueType = data_message>
using shared_publisher_queue_ptr
= caf::intrusive_ptr<shared_publisher_queue<ValueType>>;

template <class ValueType = std::pair<topic, data>>
template <class ValueType = data_message>
shared_publisher_queue_ptr<ValueType>
make_shared_publisher_queue(size_t buffer_size) {
return caf::make_counted<shared_publisher_queue<ValueType>>(buffer_size);
}


} // namespace detail
} // namespace broker

Expand Down
3 changes: 2 additions & 1 deletion broker/detail/shared_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <caf/ref_counted.hpp>

#include "broker/data.hh"
#include "broker/message.hh"
#include "broker/topic.hh"

#include "broker/detail/flare.hh"
Expand All @@ -19,7 +20,7 @@ namespace broker {
namespace detail {

/// Base class for `shared_publisher_queue` and `shared_subscriber_queue`.
template <class ValueType = std::pair<topic, data>>
template <class ValueType = data_message>
class shared_queue : public caf::ref_counted {
public:
using value_type = ValueType;
Expand Down
11 changes: 6 additions & 5 deletions broker/detail/shared_subscriber_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@
#include <caf/make_counted.hpp>

#include "broker/detail/shared_queue.hh"
#include "broker/message.hh"

namespace broker {
namespace detail {

/// Synchronizes a publisher with a background worker. Uses the `pending` flag
/// and the `flare` to signalize demand to the user. Users can write as long as
/// the flare remains active. The user consumes items, while the worker
/// the flare remains active. The user consumes items, while the worker
/// produces them.
///
/// The protocol on the flare is as follows:
/// - the flare starts inactive
/// - the flare is active as long as xs_ has more than one item
/// - produce() fires the flare when it adds items to xs_ and xs_ was empty
/// - consume() extinguishes the flare when it removes the last item from xs_
template <class ValueType = std::pair<topic, data>>
template <class ValueType = data_message>
class shared_subscriber_queue : public shared_queue<ValueType> {
public:
using value_type = ValueType;

using super = shared_queue<ValueType>;
using super = shared_queue<value_type>;

using guard_type = typename super::guard_type;

Expand Down Expand Up @@ -75,11 +76,11 @@ public:
}
};

template <class ValueType = std::pair<topic, data>>
template <class ValueType = data_message>
using shared_subscriber_queue_ptr
= caf::intrusive_ptr<shared_subscriber_queue<ValueType>>;

template <class ValueType = std::pair<topic, data>>
template <class ValueType = data_message>
shared_subscriber_queue_ptr<ValueType> make_shared_subscriber_queue() {
return caf::make_counted<shared_subscriber_queue<ValueType>>();
}
Expand Down
11 changes: 5 additions & 6 deletions broker/endpoint.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
#include "broker/backend_options.hh"
#include "broker/configuration.hh"
#include "broker/endpoint_info.hh"
#include "broker/status_subscriber.hh"
#include "broker/expected.hh"
#include "broker/frontend.hh"
#include "broker/fwd.hh"
#include "broker/message.hh"
#include "broker/network_info.hh"
#include "broker/peer_info.hh"
#include "broker/status.hh"
#include "broker/status_subscriber.hh"
#include "broker/store.hh"
#include "broker/topic.hh"
#include "broker/time.hh"
#include "broker/topic.hh"

namespace broker {

Expand All @@ -42,9 +43,7 @@ class endpoint {
public:
// --- member types ----------------------------------------------------------

using value_type = std::pair<topic, data>;

using stream_type = caf::stream<value_type>;
using stream_type = caf::stream<data_message>;

using actor_init_fun = std::function<void (caf::event_based_actor*)>;

Expand Down Expand Up @@ -201,7 +200,7 @@ public:
void publish(topic t, std::initializer_list<data> xs);

// Publishes all messages in `xs`.
void publish(std::vector<value_type> xs);
void publish(std::vector<data_message> xs);

publisher make_publisher(topic ts);

Expand Down