From 2147f4e7599cd2e3e5d440f98b27a698a7c928d5 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Fri, 30 Nov 2018 21:41:32 +0100 Subject: [PATCH 1/3] Deploy copy-on-write for more efficient messaging Benchmarking Broker revealed high copying overhead, because Broker internally copies topics and data frequently the streams between actors. Using a copy-on-write tuple over topic and data instead of a pair avoids copies as long as messages are accessed as read-only. --- bindings/python/_broker.cpp | 2 +- broker/detail/clone_actor.hh | 2 + broker/detail/core_policy.hh | 27 +++-- broker/detail/master_actor.hh | 2 + broker/detail/prefix_matcher.hh | 10 +- broker/detail/shared_publisher_queue.hh | 8 +- broker/detail/shared_queue.hh | 3 +- broker/detail/shared_subscriber_queue.hh | 11 +- broker/endpoint.hh | 11 +- broker/message.hh | 131 +++++++++++++++++++++ broker/publisher.hh | 9 +- broker/store.hh | 5 +- broker/subscriber.hh | 7 +- broker/topic.hh | 2 +- src/broker-node.cc | 24 ++-- src/broker-pipe.cc | 11 +- src/configuration.cc | 3 + src/core_actor.cc | 40 +++---- src/detail/clone_actor.cc | 20 +++- src/detail/core_policy.cc | 74 ++++++------ src/detail/master_actor.cc | 14 ++- src/endpoint.cc | 11 +- src/publisher.cc | 5 +- src/subscriber.cc | 8 +- tests/benchmark/broker-benchmark.cc | 13 +- tests/benchmark/broker-stream-benchmark.cc | 7 +- tests/cpp/core.cc | 35 +++--- tests/cpp/integration.cc | 62 +++++----- tests/cpp/master.cc | 9 +- tests/cpp/publisher.cc | 25 ++-- tests/cpp/ssl.cc | 5 +- tests/cpp/status_subscriber.cc | 2 - tests/cpp/subscriber.cc | 29 +++-- tests/test.hpp | 10 ++ 34 files changed, 403 insertions(+), 234 deletions(-) create mode 100644 broker/message.hh diff --git a/bindings/python/_broker.cpp b/bindings/python/_broker.cpp index c851122b..b37b3d61 100644 --- a/bindings/python/_broker.cpp +++ b/bindings/python/_broker.cpp @@ -271,7 +271,7 @@ PYBIND11_MODULE(_broker, m) { .def("publish", (void (broker::endpoint::*)(broker::topic t, broker::data d)) &broker::endpoint::publish) .def("publish", (void (broker::endpoint::*)(const broker::endpoint_info& dst, broker::topic t, broker::data d)) &broker::endpoint::publish) .def("publish_batch", - [](broker::endpoint& ep, std::vector xs) { ep.publish(xs); }) + [](broker::endpoint& ep, std::vector xs) { ep.publish(xs); }) .def("make_publisher", &broker::endpoint::make_publisher) .def("make_subscriber", &broker::endpoint::make_subscriber, py::arg("topics"), py::arg("max_qsize") = 20) .def("make_status_subscriber", &broker::endpoint::make_status_subscriber, py::arg("receive_statuses") = false) diff --git a/broker/detail/clone_actor.hh b/broker/detail/clone_actor.hh index 7a336a8b..59fbfc81 100644 --- a/broker/detail/clone_actor.hh +++ b/broker/detail/clone_actor.hh @@ -38,6 +38,8 @@ public: forward(make_internal_command(std::move(x))); } + void command(internal_command::variant_type& cmd); + void command(internal_command& cmd); void operator()(none); diff --git a/broker/detail/core_policy.hh b/broker/detail/core_policy.hh index d0f98d3f..878c5e7b 100644 --- a/broker/detail/core_policy.hh +++ b/broker/detail/core_policy.hh @@ -9,16 +9,17 @@ #include #include #include +#include +#include #include #include #include #include -#include - #include "broker/data.hh" #include "broker/filter_type.hh" #include "broker/internal_command.hh" +#include "broker/message.hh" #include "broker/peer_filter.hh" #include "broker/topic.hh" @@ -42,7 +43,7 @@ public: template struct local_trait { /// Type of a single element in the stream. - using element = std::pair; + using element = caf::cow_tuple; /// Type of a full batch in the stream. using batch = std::vector; @@ -61,9 +62,8 @@ public: /// Streaming-related types for peers. struct peer_trait { /// Type of a single element in the stream. - using element = caf::message; + using element = node_message; - /// Type of a full batch in the stream. using batch = std::vector; /// Type of the downstream_manager that broadcasts data to local actors. @@ -84,12 +84,13 @@ public: /// Stream handshake in step 1 that includes our own filter. The receiver /// replies with a step2 handshake. - using step1_handshake = caf::outbound_stream_slot; /// Stream handshake in step 2. The receiver already has our filter /// installed. - using step2_handshake = caf::outbound_stream_slot; @@ -193,7 +194,7 @@ public: /// @param peer_hdl Handle to the peering (remote) core actor. /// @returns `false` if the peer is already connected, `true` otherwise. /// @pre Current message is an `open_stream_msg`. - void ack_peering(const caf::stream& in, + void ack_peering(const caf::stream& in, const caf::actor& peer_hdl); /// Queries whether we have an outbound path to `hdl`. @@ -220,19 +221,19 @@ public: // -- selectively pushing data into the streams ------------------------------ /// Pushes data to workers without forwarding it to peers. - void local_push(topic x, data y); + void local_push(data_message x); /// Pushes data to stores without forwarding it to peers. - void local_push(topic x, internal_command y); + void local_push(command_message x); /// Pushes data to peers only without forwarding it to local substreams. - void remote_push(caf::message msg); + void remote_push(node_message x); /// Pushes data to peers and workers. - void push(topic x, data y); + void push(data_message msg); /// Pushes data to peers and stores. - void push(topic x, internal_command y); + void push(command_message msg); // -- properties ------------------------------------------------------------- diff --git a/broker/detail/master_actor.hh b/broker/detail/master_actor.hh index 2f8eef53..74f1a560 100644 --- a/broker/detail/master_actor.hh +++ b/broker/detail/master_actor.hh @@ -49,6 +49,8 @@ public: void command(internal_command& cmd); + void command(internal_command::variant_type& cmd); + void operator()(none); void operator()(put_command&); diff --git a/broker/detail/prefix_matcher.hh b/broker/detail/prefix_matcher.hh index 0d337b47..d64e3870 100644 --- a/broker/detail/prefix_matcher.hh +++ b/broker/detail/prefix_matcher.hh @@ -4,6 +4,7 @@ #include #include +#include #include #include "broker/topic.hh" @@ -17,13 +18,8 @@ struct prefix_matcher { bool operator()(const filter_type& filter, const topic& t) const; template - bool operator()(const filter_type& filter, - const std::pair& x) const { - return (*this)(filter, x.first); - } - - bool operator()(const filter_type& filter, const caf::message& msg) const { - return msg.match_element(0) && (*this)(filter, msg.get_as(0)); + bool operator()(const filter_type& filter, const T& x) const { + return (*this)(filter, get_topic(x)); } }; diff --git a/broker/detail/shared_publisher_queue.hh b/broker/detail/shared_publisher_queue.hh index 5f4c0bdf..e0607792 100644 --- a/broker/detail/shared_publisher_queue.hh +++ b/broker/detail/shared_publisher_queue.hh @@ -6,6 +6,7 @@ #include "broker/detail/assert.hh" #include "broker/detail/shared_queue.hh" +#include "broker/message.hh" namespace broker { namespace detail { @@ -21,7 +22,7 @@ namespace detail { /// - consume() fires the flare when it removes items from xs_ and less than 20 /// items remain /// - produce() extinguishes the flare it adds items to xs_, exceeding 20 -template > +template class shared_publisher_queue : public shared_queue { public: using value_type = ValueType; @@ -124,17 +125,16 @@ private: const size_t capacity_; }; -template > +template using shared_publisher_queue_ptr = caf::intrusive_ptr>; -template > +template shared_publisher_queue_ptr make_shared_publisher_queue(size_t buffer_size) { return caf::make_counted>(buffer_size); } - } // namespace detail } // namespace broker diff --git a/broker/detail/shared_queue.hh b/broker/detail/shared_queue.hh index 08497c86..dd8bb579 100644 --- a/broker/detail/shared_queue.hh +++ b/broker/detail/shared_queue.hh @@ -11,6 +11,7 @@ #include #include "broker/data.hh" +#include "broker/message.hh" #include "broker/topic.hh" #include "broker/detail/flare.hh" @@ -19,7 +20,7 @@ namespace broker { namespace detail { /// Base class for `shared_publisher_queue` and `shared_subscriber_queue`. -template > +template class shared_queue : public caf::ref_counted { public: using value_type = ValueType; diff --git a/broker/detail/shared_subscriber_queue.hh b/broker/detail/shared_subscriber_queue.hh index ecf3655e..93939490 100644 --- a/broker/detail/shared_subscriber_queue.hh +++ b/broker/detail/shared_subscriber_queue.hh @@ -5,13 +5,14 @@ #include #include "broker/detail/shared_queue.hh" +#include "broker/message.hh" namespace broker { namespace detail { /// Synchronizes a publisher with a background worker. Uses the `pending` flag /// and the `flare` to signalize demand to the user. Users can write as long as -/// the flare remains active. The user consumes items, while the worker +/// the flare remains active. The user consumes items, while the worker /// produces them. /// /// The protocol on the flare is as follows: @@ -19,12 +20,12 @@ namespace detail { /// - the flare is active as long as xs_ has more than one item /// - produce() fires the flare when it adds items to xs_ and xs_ was empty /// - consume() extinguishes the flare when it removes the last item from xs_ -template > +template class shared_subscriber_queue : public shared_queue { public: using value_type = ValueType; - using super = shared_queue; + using super = shared_queue; using guard_type = typename super::guard_type; @@ -75,11 +76,11 @@ public: } }; -template > +template using shared_subscriber_queue_ptr = caf::intrusive_ptr>; -template > +template shared_subscriber_queue_ptr make_shared_subscriber_queue() { return caf::make_counted>(); } diff --git a/broker/endpoint.hh b/broker/endpoint.hh index cf3cee9c..89f0f2a7 100644 --- a/broker/endpoint.hh +++ b/broker/endpoint.hh @@ -22,16 +22,17 @@ #include "broker/backend_options.hh" #include "broker/configuration.hh" #include "broker/endpoint_info.hh" -#include "broker/status_subscriber.hh" #include "broker/expected.hh" #include "broker/frontend.hh" #include "broker/fwd.hh" +#include "broker/message.hh" #include "broker/network_info.hh" #include "broker/peer_info.hh" #include "broker/status.hh" +#include "broker/status_subscriber.hh" #include "broker/store.hh" -#include "broker/topic.hh" #include "broker/time.hh" +#include "broker/topic.hh" namespace broker { @@ -42,9 +43,7 @@ class endpoint { public: // --- member types ---------------------------------------------------------- - using value_type = std::pair; - - using stream_type = caf::stream; + using stream_type = caf::stream; using actor_init_fun = std::function; @@ -201,7 +200,7 @@ public: void publish(topic t, std::initializer_list xs); // Publishes all messages in `xs`. - void publish(std::vector xs); + void publish(std::vector xs); publisher make_publisher(topic ts); diff --git a/broker/message.hh b/broker/message.hh new file mode 100644 index 00000000..fc71839a --- /dev/null +++ b/broker/message.hh @@ -0,0 +1,131 @@ +#ifndef BROKER_EVENT_HH +#define BROKER_EVENT_HH + +#include + +#include +#include + +#include "broker/data.hh" +#include "broker/internal_command.hh" +#include "broker/topic.hh" + +namespace broker { + +/// A user-defined message with topic and data. +using data_message = caf::cow_tuple; + +/// A broker-internal message with topic and command. +using command_message = caf::cow_tuple; + +/// A message for node-to-node communication with either a user-defined data +/// message or a broker-internal command messages. +struct node_message { + /// Content of the message. + caf::variant content; + + /// Time-to-life counter. + uint16_t ttl; +}; + +/// @relates node_message +template +typename Inspector::result_type inspect(Inspector& f, node_message& x) { + return f(x.content, x.ttl); +} + +/// Generates a broker ::data_message. +template +data_message make_data_message(Topic&& t, Data&& d) { + return data_message(std::forward(t), std::forward(d)); +} + +/// Generates a broker ::command_message. +template +command_message make_command_message(Topic&& t, Command&& d) { + return command_message(std::forward(t), std::forward(d)); +} + +/// Generates a broker ::node_message. +inline node_message make_node_message(data_message msg, uint16_t ttl) { + return {std::move(msg), ttl}; +} + +/// Generates a broker ::node_message. +inline node_message make_node_message(command_message msg, uint16_t ttl) { + return {std::move(msg), ttl}; +} + +/// Retrieves the topic from a ::data_message. +inline const topic& get_topic(const data_message& x) { + return get<0>(x); +} + +/// Retrieves the topic from a ::command_message. +inline const topic& get_topic(const command_message& x) { + return get<0>(x); +} + +/// Retrieves the topic from a ::generic_message. +inline const topic& get_topic(const node_message& x) { + if(caf::holds_alternative(x.content)) + return get_topic(caf::get(x.content)); + return get_topic(caf::get(x.content)); +} + +/// Moves the topic out of a ::data_message, copying it if more than one +/// reference exists.. +inline topic&& move_topic(data_message& x) { + return std::move(get<0>(x.unshared())); +} + +/// Moves the topic out of a ::command_message, copying it if more than one +/// reference exists.. +inline topic&& move_topic(command_message& x) { + return std::move(get<0>(x.unshared())); +} + +/// Moves the topic out of a ::generic_message, copying it if more than one +/// reference exists.. +inline topic&& move_topic(node_message& x) { + if( caf::holds_alternative(x.content)) + return move_topic(caf::get(x.content)); + return move_topic(caf::get(x.content)); +} + +/// Retrieves the data from a ::data_message. +inline const data& get_data(const data_message& x) { + return get<1>(x); +} + +/// Moves the data out of a ::data_message, copying it if more than one +/// reference exists. +inline const data&& move_data(data_message& x) { + return std::move(get<1>(x.unshared())); +} +/// Retrieves the command from a ::command_message. +inline const internal_command::variant_type& +get_command(const command_message& x) { + return get<1>(x).content; +} + +/// Moves the command out of a ::command_message, copying it if more than one +/// reference exists. +inline internal_command::variant_type +move_command(command_message& x) { + return std::move(get<1>(x.unshared()).content); +} + +/// Returns whether `x` contains a ::node_message. +inline bool is_data_message(const node_message& x) { + return caf::holds_alternative(x.content); +} + +/// Returns whether `x` contains a ::command_message. +inline bool is_command_message(const node_message& x) { + return caf::holds_alternative(x.content); +} + +} // namespace broker + +#endif // BROKER_EVENT_HH diff --git a/broker/publisher.hh b/broker/publisher.hh index b849b9bb..b625653e 100644 --- a/broker/publisher.hh +++ b/broker/publisher.hh @@ -9,6 +9,7 @@ #include "broker/atoms.hh" #include "broker/fwd.hh" +#include "broker/message.hh" #include "broker/detail/shared_publisher_queue.hh" @@ -23,7 +24,7 @@ public: // --- nested types ---------------------------------------------------------- - using value_type = std::pair; + using value_type = data_message; using guard_type = std::unique_lock; @@ -72,13 +73,13 @@ public: } // --- mutators -------------------------------------------------------------- - + /// Forces the publisher to drop all remaining items from the queue when the /// destructor gets called. void drop_all_on_destruction(); - + // --- messaging ------------------------------------------------------------- - + /// Sends `x` to all subscribers. void publish(data x); diff --git a/broker/store.hh b/broker/store.hh index 60a4c1aa..4d033f39 100644 --- a/broker/store.hh +++ b/broker/store.hh @@ -4,10 +4,11 @@ #include #include -#include +#include #include #include #include +#include #include "broker/api_flags.hh" #include "broker/atoms.hh" @@ -30,7 +31,7 @@ class store { public: friend class endpoint; - using stream_type = caf::stream>; + using stream_type = caf::stream>; /// A response to a lookup request issued by a ::proxy. struct response { diff --git a/broker/subscriber.hh b/broker/subscriber.hh index 311186d3..a242732c 100644 --- a/broker/subscriber.hh +++ b/broker/subscriber.hh @@ -7,15 +7,16 @@ #include "broker/data.hh" #include "broker/fwd.hh" -#include "broker/topic.hh" +#include "broker/message.hh" #include "broker/subscriber_base.hh" +#include "broker/topic.hh" #include "broker/detail/shared_subscriber_queue.hh" namespace broker { /// Provides blocking access to a stream of data. -class subscriber : public subscriber_base> { +class subscriber : public subscriber_base { public: // --- friend declarations --------------------------------------------------- @@ -23,7 +24,7 @@ public: // --- nested types ---------------------------------------------------------- - using super = subscriber_base>; + using super = subscriber_base; // --- constructors and destructors ------------------------------------------ diff --git a/broker/topic.hh b/broker/topic.hh index afabc307..a076b70d 100644 --- a/broker/topic.hh +++ b/broker/topic.hh @@ -1,10 +1,10 @@ #ifndef BROKER_TOPIC_HH #define BROKER_TOPIC_HH -#include #include #include #include +#include #include #include "broker/detail/operators.hh" diff --git a/src/broker-node.cc b/src/broker-node.cc index 44388f00..1b84672c 100644 --- a/src/broker-node.cc +++ b/src/broker-node.cc @@ -261,11 +261,12 @@ void relay_mode(broker::endpoint& ep, broker::topic topic) { auto in = ep.make_subscriber({topic}); for (;;) { auto x = in.get(); - if (is_ping_msg(x.second)) { - verbose::println("received ping ", msg_id(x.second)); - } else if (is_pong_msg(x.second)) { - verbose::println("received pong ", msg_id(x.second)); - } else if (is_stop_msg(x.second)) { + auto& val = get<1>(x); + if (is_ping_msg(val)) { + verbose::println("received ping ", msg_id(val)); + } else if (is_pong_msg(val)) { + verbose::println("received pong ", msg_id(val)); + } else if (is_stop_msg(val)) { verbose::println("received stop"); return; } @@ -290,7 +291,7 @@ void ping_mode(broker::endpoint& ep, broker::topic topic) { ep.publish(topic, make_ping_msg(0, 0)); while (!connected) { auto x = in.get(caf::duration{retry_timeout}); - if (x && is_pong_msg(x->second, 0)) + if (x && is_pong_msg(get<1>(*x), 0)) connected = true; else ep.publish(topic, make_ping_msg(0, 0)); @@ -303,7 +304,7 @@ void ping_mode(broker::endpoint& ep, broker::topic topic) { ep.publish(topic, make_ping_msg(i, s)); do { auto x = in.get(); - done = is_pong_msg(x.second, i); + done = is_pong_msg(get<1>(x), i); } while (!done); auto t1 = std::chrono::system_clock::now(); auto roundtrip = std::chrono::duration_cast(t1 - t0); @@ -318,10 +319,11 @@ void pong_mode(broker::endpoint& ep, broker::topic topic) { auto in = ep.make_subscriber({topic}); for (;;) { auto x = in.get(); - if (is_ping_msg(x.second)) { - verbose::println("received ping ", msg_id(x.second)); - ep.publish(topic, make_pong_msg(msg_id(x.second))); - } else if (is_stop_msg(x.second)) { + auto& val = get<1>(x); + if (is_ping_msg(val)) { + verbose::println("received ping ", msg_id(val)); + ep.publish(topic, make_pong_msg(msg_id(val))); + } else if (is_stop_msg(val)) { verbose::println("received stop"); return; } diff --git a/src/broker-pipe.cc b/src/broker-pipe.cc index f948685d..de64f1a1 100644 --- a/src/broker-pipe.cc +++ b/src/broker-pipe.cc @@ -19,12 +19,12 @@ #pragma GCC diagnostic ignored "-Wdeprecated-declarations" #include #include +#include #include #include #include #include #include -#include #pragma GCC diagnostic pop #include "broker/atoms.hh" @@ -38,6 +38,8 @@ #include "broker/topic.hh" using broker::data; +using broker::data_message; +using broker::make_data_message; using broker::topic; using namespace caf; @@ -125,7 +127,8 @@ void publish_mode_stream(broker::endpoint& ep, const std::string& topic_str, [](size_t& msgs) { msgs = 0; }, - [=](size_t& msgs, downstream>& out, size_t hint) { + [=](size_t& msgs, downstream& out, + size_t hint) { auto num = std::min(cap - msgs, hint); std::string line; for (size_t i = 0; i < num; ++i) @@ -134,7 +137,7 @@ void publish_mode_stream(broker::endpoint& ep, const std::string& topic_str, msgs = cap; return; } else { - out.push(std::make_pair(topic_str, std::move(line))); + out.push(make_data_message(topic_str, std::move(line))); } msgs += num; }, @@ -181,7 +184,7 @@ void subscribe_mode_stream(broker::endpoint& ep, const std::string& topic_str, [](size_t& msgs) { msgs = 0; }, - [=](size_t& msgs, std::pair x) { + [=](size_t& msgs, data_message x) { print_line(std::cout, deep_to_string(x)); if (++msgs >= cap) throw std::runtime_error("Reached cap"); diff --git a/src/configuration.cc b/src/configuration.cc index 33d63e56..261ba0f0 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -42,6 +42,9 @@ configuration::configuration(broker_options opts) : options_(std::move(opts)) { add_message_type>("broker::optional"); add_message_type("broker::snapshot"); add_message_type("broker::internal_command"); + add_message_type("broker::command_message"); + add_message_type("broker::data_message"); + add_message_type("broker::node_message"); add_message_type("broker::set_command"); add_message_type( "broker::store::stream_type::value_type"); diff --git a/src/core_actor.cc b/src/core_actor.cc index a4d0fb7e..b1266da9 100644 --- a/src/core_actor.cc +++ b/src/core_actor.cc @@ -303,7 +303,8 @@ caf::behavior core_actor(caf::stateful_actor* self, return st.policy().start_peering(peer_hdl, std::move(peer_ts)); }, // Step #2: B establishes a stream to A and sends its own filter - [=](const stream& in, filter_type& filter, caf::actor& peer_hdl) { + [=](const stream& in, filter_type& filter, + caf::actor& peer_hdl) { CAF_LOG_TRACE(CAF_ARG(in) << CAF_ARG(filter) << peer_hdl); auto& st = self->state; CAF_LOG_DEBUG("received handshake step #2 from" << peer_hdl @@ -328,7 +329,7 @@ caf::behavior core_actor(caf::stateful_actor* self, }, // Step #3: - A establishes a stream to B // - B has a stream to A and vice versa now - [=](const stream& in, ok_atom, caf::actor& peer_hdl) { + [=](const stream& in, ok_atom, caf::actor& peer_hdl) { CAF_LOG_TRACE(CAF_ARG(in) << CAF_ARG(peer_hdl)); auto& st = self->state; if (!st.policy().has_outbound_path_to(peer_hdl)) { @@ -382,26 +383,26 @@ caf::behavior core_actor(caf::stateful_actor* self, auto& st = self->state; st.governor->add_unchecked_inbound_path(in); }, - [=](atom::publish, topic& t, data& x) { - CAF_LOG_TRACE(CAF_ARG(t) << CAF_ARG(x)); - self->state.policy().push(std::move(t), std::move(x)); + [=](atom::publish, data_message& x) { + CAF_LOG_TRACE(CAF_ARG(x)); + self->state.policy().push(std::move(x)); }, - [=](atom::publish, topic& t, internal_command& x) { - CAF_LOG_TRACE(CAF_ARG(t) << CAF_ARG(x)); - self->state.policy().push(std::move(t), std::move(x)); + [=](atom::publish, command_message& x) { + CAF_LOG_TRACE(CAF_ARG(x)); + self->state.policy().push(std::move(x)); }, // --- communication to local actors only, i.e., never forward to peers ---- - [=](atom::publish, atom::local, topic& t, data& x) { - CAF_LOG_TRACE(CAF_ARG(t) << CAF_ARG(x)); - self->state.policy().local_push(std::move(t), std::move(x)); + [=](atom::publish, atom::local, data_message& x) { + CAF_LOG_TRACE(CAF_ARG(x)); + self->state.policy().local_push(std::move(x)); }, - [=](atom::publish, atom::local, topic& t, internal_command& x) { - CAF_LOG_TRACE(CAF_ARG(t) << CAF_ARG(x)); - self->state.policy().local_push(std::move(t), std::move(x)); + [=](atom::publish, atom::local, command_message& x) { + CAF_LOG_TRACE(CAF_ARG(x)); + self->state.policy().local_push(std::move(x)); }, // --- "one-to-one" communication that bypasses streaming entirely --------- - [=](atom::publish, endpoint_info& e, topic& t, data& x) { - CAF_LOG_TRACE(CAF_ARG(t) << CAF_ARG(x)); + [=](atom::publish, endpoint_info& e, data_message& x) { + CAF_LOG_TRACE(CAF_ARG(e) << CAF_ARG(x)); auto& st = self->state; actor hdl; if (e.network) { @@ -417,8 +418,7 @@ caf::behavior core_actor(caf::stateful_actor* self, return; } } - self->send(hdl, atom::publish::value, atom::local::value, - std::move(t), std::move(x)); + self->send(hdl, atom::publish::value, atom::local::value, std::move(x)); }, // --- data store management ----------------------------------------------- [=](atom::store, atom::master, atom::attach, const std::string& name, @@ -567,9 +567,9 @@ caf::behavior core_actor(caf::stateful_actor* self, [=](atom::store, atom::master, atom::snapshot, const std::string& name, caf::actor& clone) { // Instruct master to generate a snapshot. - self->state.policy().push( + self->state.policy().push(make_command_message( name / topics::master_suffix, - make_internal_command(self, std::move(clone))); + make_internal_command(self, std::move(clone)))); }, [=](atom::store, atom::master, atom::get, const std::string& name) -> result { diff --git a/src/detail/clone_actor.cc b/src/detail/clone_actor.cc index 63eeceeb..84c35940 100644 --- a/src/detail/clone_actor.cc +++ b/src/detail/clone_actor.cc @@ -56,11 +56,16 @@ void clone_state::init(caf::event_based_actor* ptr, std::string&& nm, } void clone_state::forward(internal_command&& x) { - self->send(core, atom::publish::value, master_topic, std::move(x)); + self->send(core, atom::publish::value, + make_command_message(master_topic, std::move(x))); +} + +void clone_state::command(internal_command::variant_type& cmd) { + caf::visit(*this, cmd); } void clone_state::command(internal_command& cmd) { - caf::visit(*this, cmd.content); + command(cmd.content); } void clone_state::operator()(none) { @@ -381,8 +386,11 @@ caf::behavior clone_actor(caf::stateful_actor* self, }, // processing step [=](caf::unit_t&, store::stream_type::value_type y) { - if ( y.second.content.is() ) { - self->state.command(y.second); + // TODO: our operator() overloads require mutable references, but + // only a fraction actually benefit from it. + auto cmd = move_command(y); + if (caf::holds_alternative(cmd)) { + self->state.command(cmd); return; } @@ -390,11 +398,11 @@ caf::behavior clone_actor(caf::stateful_actor* self, return; if ( self->state.awaiting_snapshot ) { - self->state.pending_remote_updates.emplace_back(std::move(y.second)); + self->state.pending_remote_updates.emplace_back(std::move(cmd)); return; } - self->state.command(y.second); + self->state.command(cmd); } ); } diff --git a/src/detail/core_policy.cc b/src/detail/core_policy.cc index 67d8ced2..b12dac4b 100644 --- a/src/detail/core_policy.cc +++ b/src/detail/core_policy.cc @@ -104,49 +104,46 @@ void core_policy::handle_batch(stream_slot, const strong_actor_ptr& peer, // Only received from other peers. Extract content for to local workers // or stores and then forward to other peers. for (auto& msg : xs.get_mutable_as(0)) { - if (msg.size() < 2 || !msg.match_element(0)) { - CAF_LOG_DEBUG("dropped unexpected message type"); - continue; + const topic* t; + // Dispatch to local workers or stores messages. + if (is_data_message(msg)) { + auto& dm = get(msg.content); + t = &get_topic(dm); + if (num_workers > 0) + workers().push(dm); + } else { + auto& cm = get(msg.content); + t = &get_topic(cm); + if (num_stores > 0) + stores().push(cm); } - // Extract worker messages. - if (num_workers > 0 && msg.match_element(1)) - workers().push(msg.get_as(0), msg.get_as(1)); - // Extract store messages. - if (num_stores > 0 && msg.match_element(1)) - stores().push(msg.get_as(0), msg.get_as(1)); // Check if forwarding is on. if (!state_->options.forward) continue; // Somewhat hacky, but don't forward data store clone messages. - if (ends_with(msg.get_as(0).string(), - topics::clone_suffix.string())) + if (ends_with(t->string(), topics::clone_suffix.string())) continue; // Either decrease TTL if message has one already, or add one. - if (msg.size() < 3) { - // Does not have a TTL yet, set a TTL of 5. - msg += make_message(state_->options.ttl - 1); // We're hop 1 already. - } else { - auto& ttl = msg.get_mutable_as(2); - if (--ttl <= 0) { - CAF_LOG_WARNING("dropped a message with expired TTL"); - continue; - } + if (--msg.ttl == 0) { + CAF_LOG_WARNING("dropped a message with expired TTL"); + continue; } // Forward to other peers. peers().push(std::move(msg)); } return; } + auto ttl = state_->options.ttl; if (xs.match_elements()) { CAF_LOG_DEBUG("forward batch from local workers to peers"); for (auto& x : xs.get_mutable_as(0)) - peers().push(make_message(std::move(x.first), std::move(x.second))); + peers().push(make_node_message(std::move(x), ttl)); return; } if (xs.match_elements()) { CAF_LOG_DEBUG("forward batch from local stores to peers"); for (auto& x : xs.get_mutable_as(0)) - peers().push(make_message(std::move(x.first), std::move(x.second))); + peers().push(make_node_message(std::move(x), ttl)); return; } CAF_LOG_ERROR("unexpected batch:" << deep_to_string(xs)); @@ -274,7 +271,7 @@ bool core_policy::has_peer(const actor& hdl) const { return peer_to_opath_.count(hdl) != 0 || peer_to_ipath_.count(hdl) != 0; } -void core_policy::ack_peering(const stream& in, +void core_policy::ack_peering(const stream& in, const actor& peer_hdl) { CAF_LOG_TRACE(CAF_ARG(peer_hdl)); // Check whether we already receive inbound traffic from the peer. Could use @@ -370,42 +367,41 @@ auto core_policy::add_worker(filter_type filter) // -- selectively pushing data into the streams ------------------------------ /// Pushes data to workers without forwarding it to peers. -void core_policy::local_push(topic x, data y) { - CAF_LOG_TRACE(CAF_ARG(x) << CAF_ARG(y)); +void core_policy::local_push(data_message x) { + CAF_LOG_TRACE(CAF_ARG(x) << CAF_ARG2("num_paths", workers().num_paths())); if (workers().num_paths() > 0) { - workers().push(std::move(x), std::move(y)); + workers().push(std::move(x)); workers().emit_batches(); } } /// Pushes data to stores without forwarding it to peers. -void core_policy::local_push(topic x, internal_command y) { - CAF_LOG_TRACE(CAF_ARG(x) << CAF_ARG(y) << - CAF_ARG2("num_paths", stores().num_paths())); +void core_policy::local_push(command_message x) { + CAF_LOG_TRACE(CAF_ARG(x) << CAF_ARG2("num_paths", stores().num_paths())); if (stores().num_paths() > 0) { - stores().push(std::move(x), std::move(y)); + stores().push(std::move(x)); stores().emit_batches(); } } /// Pushes data to peers only without forwarding it to local substreams. -void core_policy::remote_push(message msg) { +void core_policy::remote_push(node_message msg) { CAF_LOG_TRACE(CAF_ARG(msg)); peers().push(std::move(msg)); peers().emit_batches(); } /// Pushes data to peers and workers. -void core_policy::push(topic x, data y) { - CAF_LOG_TRACE(CAF_ARG(x) << CAF_ARG(y)); - remote_push(make_message(std::move(x), std::move(y))); +void core_policy::push(data_message msg) { + CAF_LOG_TRACE(CAF_ARG(msg)); + remote_push(make_node_message(std::move(msg), state_->options.ttl)); //local_push(std::move(x), std::move(y)); } /// Pushes data to peers and stores. -void core_policy::push(topic x, internal_command y) { - CAF_LOG_TRACE(CAF_ARG(x) << CAF_ARG(y)); - remote_push(make_message(std::move(x), std::move(y))); +void core_policy::push(command_message msg) { + CAF_LOG_TRACE(CAF_ARG(msg)); + remote_push(make_node_message(std::move(msg), state_->options.ttl)); //local_push(std::move(x), std::move(y)); } @@ -498,13 +494,13 @@ void core_policy::add_opath(stream_slot slot, const actor& peer_hdl) { auto core_policy::add(std::true_type, const actor& hdl) -> step1_handshake { auto xs = std::make_tuple(state_->filter, actor_cast(self())); - return parent_->add_unchecked_outbound_path(hdl, std::move(xs)); + return parent_->add_unchecked_outbound_path(hdl, std::move(xs)); } auto core_policy::add(std::false_type, const actor& hdl) -> step2_handshake { atom_value ok = ok_atom::value; auto xs = std::make_tuple(ok, actor_cast(self())); - return parent_->add_unchecked_outbound_path(hdl, std::move(xs)); + return parent_->add_unchecked_outbound_path(hdl, std::move(xs)); } } // namespace detail diff --git a/src/detail/master_actor.cc b/src/detail/master_actor.cc index 1efa69d9..0826852b 100644 --- a/src/detail/master_actor.cc +++ b/src/detail/master_actor.cc @@ -59,7 +59,8 @@ void master_state::init(caf::event_based_actor* ptr, std::string&& nm, } void master_state::broadcast(internal_command&& x) { - self->send(core, atom::publish::value, clones_topic, std::move(x)); + self->send(core, atom::publish::value, + make_command_message(clones_topic, std::move(x))); } void master_state::remind(timespan expiry, const data& key) { @@ -80,7 +81,11 @@ void master_state::expire(data& key) { } void master_state::command(internal_command& cmd) { - caf::visit(*this, cmd.content); + command(cmd.content); +} + +void master_state::command(internal_command::variant_type& cmd) { + caf::visit(*this, cmd); } void master_state::operator()(none) { @@ -310,7 +315,10 @@ caf::behavior master_actor(caf::stateful_actor* self, }, // processing step [=](caf::unit_t&, store::stream_type::value_type y) { - self->state.command(y.second); + // TODO: our operator() overloads require mutable references, but + // only a fraction actually benefit from it. + auto cmd = move_command(y); + self->state.command(cmd); }, // cleanup [](caf::unit_t&, const caf::error&) { diff --git a/src/endpoint.cc b/src/endpoint.cc index beb2a30d..13042af0 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -264,18 +264,21 @@ void endpoint::forward(std::vector ts) void endpoint::publish(topic t, data d) { BROKER_INFO("publishing" << std::make_pair(t, d)); - caf::anon_send(core(), atom::publish::value, std::move(t), std::move(d)); + caf::anon_send(core(), atom::publish::value, + make_data_message(std::move(t), std::move(d))); } void endpoint::publish(const endpoint_info& dst, topic t, data d) { BROKER_INFO("publishing" << std::make_pair(t, d) << "to" << dst.node); - caf::anon_send(core(), atom::publish::value, dst, std::move(t), std::move(d)); + caf::anon_send(core(), atom::publish::value, dst, + make_data_message(std::move(t), std::move(d))); } -void endpoint::publish(std::vector xs) { +void endpoint::publish(std::vector xs) { for ( auto& x : xs ) { BROKER_INFO("publishing" << x); - caf::anon_send(core(), atom::publish::value, std::move(x.first), std::move(x.second)); + auto& tup = x.unshared(); + caf::anon_send(core(), atom::publish::value, std::move(x)); } } diff --git a/src/publisher.cc b/src/publisher.cc index 6a8ce49f..fa53554e 100644 --- a/src/publisher.cc +++ b/src/publisher.cc @@ -5,6 +5,7 @@ #include "broker/data.hh" #include "broker/endpoint.hh" +#include "broker/message.hh" #include "broker/topic.hh" using namespace caf; @@ -55,9 +56,9 @@ behavior publisher_worker(stateful_actor* self, [](unit_t&) { // nop }, - [=](unit_t&, downstream& out, size_t num) { + [=](unit_t&, downstream& out, size_t num) { auto& st = self->state; - auto consumed = qptr->consume(num, [&](std::pair&& x) { + auto consumed = qptr->consume(num, [&](data_message&& x) { out.push(std::move(x)); }); if (consumed > 0) { diff --git a/src/subscriber.cc b/src/subscriber.cc index c8460611..70301730 100644 --- a/src/subscriber.cc +++ b/src/subscriber.cc @@ -51,11 +51,9 @@ struct subscriber_worker_state { const char* subscriber_worker_state::name = "subscriber_worker"; -using input_type = std::pair; - -class subscriber_sink : public stream_sink { +class subscriber_sink : public stream_sink { public: - using super = stream_sink; + using super = stream_sink; using queue_ptr = detail::shared_subscriber_queue_ptr<>; @@ -76,7 +74,7 @@ class subscriber_sink : public stream_sink { protected: void handle(inbound_path*, downstream_msg::batch& x) override { CAF_LOG_TRACE(CAF_ARG(x)); - using vec_type = std::vector; + using vec_type = std::vector; if (x.xs.match_elements()) { auto& xs = x.xs.get_mutable_as(0); auto xs_size = xs.size(); diff --git a/tests/benchmark/broker-benchmark.cc b/tests/benchmark/broker-benchmark.cc index 49e4473f..b0145cc2 100644 --- a/tests/benchmark/broker-benchmark.cc +++ b/tests/benchmark/broker-benchmark.cc @@ -298,20 +298,19 @@ void clientMode(const char* host, int port) { ep.subscribe_nosync({"/benchmark/stats"}, [](caf::unit_t&) { }, - [&](caf::unit_t&, std::pair x) { - receivedStats(ep, std::move(x.second)); + [&](caf::unit_t&, broker::data_message x) { + receivedStats(ep, move_data(x)); }, [](caf::unit_t&, const caf::error&) { // nop }); if ( use_non_blocking ) { - using value_type = std::pair; ep.publish_all( [](caf::unit_t&) { // nop }, - [&](caf::unit_t&, caf::downstream& out, size_t num) { + [&](caf::unit_t&, caf::downstream& out, size_t num) { const broker::bro::Event* ev; std::cerr << "can publish " << num << ", have " << q.size_approx() << std::endl; while ( num-- && (ev = q.peek()) ) { @@ -382,8 +381,8 @@ void serverMode(const char* iface, int port) { ep.subscribe_nosync({"/benchmark/events"}, [](caf::unit_t&) { }, - [&](caf::unit_t&, std::pair x) { - auto& msg = x.second; + [&](caf::unit_t&, broker::data_message x) { + auto msg = move_data(x); if ( broker::bro::Message::type(msg) == broker::bro::Message::Type::Event ) { broker::bro::Event ev(std::move(msg)); @@ -411,7 +410,7 @@ void serverMode(const char* iface, int port) { ep.subscribe_nosync({"/benchmark/terminate"}, [](caf::unit_t&) { }, - [&](caf::unit_t&, std::pair x) { + [&](caf::unit_t&, broker::data_message x) { terminate = true; }, [](caf::unit_t&, const caf::error&) { diff --git a/tests/benchmark/broker-stream-benchmark.cc b/tests/benchmark/broker-stream-benchmark.cc index 4b3175c2..3b1ae204 100644 --- a/tests/benchmark/broker-stream-benchmark.cc +++ b/tests/benchmark/broker-stream-benchmark.cc @@ -52,7 +52,7 @@ void sink_mode(broker::endpoint& ep, topic t) { [](caf::unit_t&) { // nop }, - [=](caf::unit_t&, std::vector>& xs) { + [=](caf::unit_t&, std::vector& xs) { global_count += xs.size(); }, [=](caf::unit_t&, const caf::error&) { @@ -65,14 +65,15 @@ void sink_mode(broker::endpoint& ep, topic t) { void source_mode(broker::endpoint& ep, topic t) { using namespace caf; - auto msg = std::make_pair(std::move(t), data{"Lorem ipsum dolor sit amet."}); + auto msg = make_data_message(t, "Lorem ipsum dolor sit amet."); auto worker = ep.publish_all( [](caf::unit_t&) { // nop }, - [=](caf::unit_t&, downstream>& out, size_t num) { + [=](caf::unit_t&, downstream& out, size_t num) { for (size_t i = 0; i < num; ++i) out.push(msg); + global_count += num; }, [=](const caf::unit_t&) { return false; diff --git a/tests/cpp/core.cc b/tests/cpp/core.cc index 5b15ee1a..ef3cbe72 100644 --- a/tests/cpp/core.cc +++ b/tests/cpp/core.cc @@ -25,9 +25,8 @@ struct driver_state { buf_type xs; static const char* name; void reset() { - xs - = buf_type{{"a", 0}, {"b", true}, {"a", 1}, {"a", 2}, {"b", false}, - {"b", true}, {"a", 3}, {"b", false}, {"a", 4}, {"a", 5}}; + xs = data_msgs({{"a", 0}, {"b", true}, {"a", 1}, {"a", 2}, {"b", false}, + {"b", true}, {"a", 3}, {"b", false}, {"a", 4}, {"a", 5}}); } driver_state() { reset(); @@ -190,17 +189,19 @@ CAF_TEST(local_peers) { consume_message(); self->receive( [](const buf& xs) { - buf expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); CAF_REQUIRE_EQUAL(xs, expected); } ); CAF_MESSAGE("send message 'directly' from core1 to core2 (bypass streaming)"); anon_send(core1, atom::publish::value, endpoint_info{core2.node(), caf::none}, - topic("b"), data{true}); - expect((atom::publish, endpoint_info, topic, data), - from(_).to(core1).with(_, _, _, _)); - expect((atom::publish, atom::local, topic, data), - from(core1).to(core2).with(_, _, topic("b"), data{true})); + make_data_message(topic("b"), data{true})); + expect((atom::publish, endpoint_info, data_message), + from(_).to(core1).with(_, _, _)); + expect((atom::publish, atom::local, data_message), + from(core1).to(core2).with(_, _, + make_data_message(topic("b"), data{true}))); run(); CAF_MESSAGE("check log of the consumer again"); self->send(leaf, atom::get::value); @@ -208,8 +209,8 @@ CAF_TEST(local_peers) { consume_message(); self->receive( [](const buf& xs) { - buf expected{{"b", true}, {"b", false}, {"b", true}, - {"b", false}, {"b", true}}; + auto expected = data_msgs({{"b", true}, {"b", false}, {"b", true}, + {"b", false}, {"b", true}}); CAF_REQUIRE_EQUAL(xs, expected); } ); @@ -341,7 +342,8 @@ CAF_TEST(triangle_peering) { run(); // Check log of the consumers. using buf = std::vector; - buf expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); for (auto& leaf : {leaf2, leaf3}) { self->send(leaf, atom::get::value); sched.prioritize(leaf); @@ -418,7 +420,8 @@ CAF_TEST(sequenced_peering) { run(); // Check log of the consumer on core2. using buf = std::vector; - buf expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); self->send(leaf1, atom::get::value); sched.prioritize(leaf1); consume_message(); @@ -687,7 +690,8 @@ CAF_TEST(remote_peers_setup1) { exec_all(); earth.self->receive( [](const buf& xs) { - buf expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); CAF_REQUIRE_EQUAL(xs, expected); } ); @@ -747,7 +751,8 @@ CAF_TEST(remote_peers_setup2) { mars.consume_message(); mars.self->receive( [](const buf& xs) { - buf expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); CAF_REQUIRE_EQUAL(xs, expected); } ); diff --git a/tests/cpp/integration.cc b/tests/cpp/integration.cc index 94dc4563..2c82facb 100644 --- a/tests/cpp/integration.cc +++ b/tests/cpp/integration.cc @@ -52,9 +52,6 @@ using caf::unit_t; using caf::io::accept_handle; using caf::io::connection_handle; -// Useful type aliases. -using data_vector = std::vector; - namespace { configuration make_config() { @@ -123,7 +120,7 @@ struct peer_fixture { std::vector acceptors; // Stores all received items for subscribed topics. - data_vector data; + std::vector data; // Stores the interval between two credit rounds. caf::timespan credit_round_interval; @@ -178,7 +175,7 @@ struct peer_fixture { [](unit_t&) { // nop }, - [=](unit_t&, endpoint::value_type x) { + [=](unit_t&, data_message x) { data.emplace_back(std::move(x)); }, [](unit_t&, const caf::error&) { @@ -191,13 +188,14 @@ struct peer_fixture { // Publishes all `(t, xs)...` tuples. template void publish(topic t, Ts... xs) { - using buf_t = std::deque; - auto buf = std::make_shared(buf_t{std::make_pair(t, std::move(xs))...}); + using buf_t = std::deque; + auto buf + = std::make_shared(buf_t{make_data_message(t, std::move(xs))...}); ep.publish_all_nosync( [](unit_t&) { // nop }, - [=](unit_t&, caf::downstream& out, size_t num) { + [=](unit_t&, caf::downstream& out, size_t num) { auto n = std::min(num, buf->size()); CAF_MESSAGE("push" << n << "values downstream"); for (size_t i = 0u; i < n; ++i) @@ -342,16 +340,13 @@ CAF_TEST(topic_prefix_matching_async_subscribe) { mercury.publish("bro/events/failures", "oops", "sorry!"); mercury.publish("bro/events/logging", 123, 456); MESSAGE("verify published data"); - auto data = [](std::initializer_list xs) -> data_vector { - return xs; - }; - CAF_CHECK_EQUAL(mercury.data, data({})); - CAF_CHECK_EQUAL(venus.data, data({{"bro/events/failures", "oops"}, - {"bro/events/failures", "sorry!"}, - {"bro/events/logging", 123}, - {"bro/events/logging", 456}})); - CAF_CHECK_EQUAL(earth.data, data({{"bro/events/failures", "oops"}, - {"bro/events/failures", "sorry!"}})); + CAF_CHECK_EQUAL(mercury.data, data_msgs({})); + CAF_CHECK_EQUAL(venus.data, data_msgs({{"bro/events/failures", "oops"}, + {"bro/events/failures", "sorry!"}, + {"bro/events/logging", 123}, + {"bro/events/logging", 456}})); + CAF_CHECK_EQUAL(earth.data, data_msgs({{"bro/events/failures", "oops"}, + {"bro/events/failures", "sorry!"}})); venus.loop_after_next_enqueue(); venus.ep.unpeer("mercury", 4040); earth.loop_after_next_enqueue(); @@ -405,21 +400,22 @@ CAF_TEST(topic_prefix_matching_make_subscriber) { mercury.publish("bro/events/failures", "oops", "sorry!"); mercury.publish("bro/events/logging", 123, 456); MESSAGE("verify published data"); - auto data = [](std::initializer_list xs) -> data_vector { - return xs; - }; - CAF_CHECK_EQUAL(venus_s1.poll(), data({{"bro/events/failures", "oops"}, - {"bro/events/failures", "sorry!"}, - {"bro/events/logging", 123}, - {"bro/events/logging", 456}})); - CAF_CHECK_EQUAL(venus_s2.poll(), data({{"bro/events/failures", "oops"}, - {"bro/events/failures", "sorry!"}, - {"bro/events/logging", 123}, - {"bro/events/logging", 456}})); - CAF_CHECK_EQUAL(earth_s1.poll(), data({{"bro/events/failures", "oops"}, - {"bro/events/failures", "sorry!"}})); - CAF_CHECK_EQUAL(earth_s2.poll(), data({{"bro/events/failures", "oops"}, - {"bro/events/failures", "sorry!"}})); + CAF_CHECK_EQUAL(venus_s1.poll(), + data_msgs({{"bro/events/failures", "oops"}, + {"bro/events/failures", "sorry!"}, + {"bro/events/logging", 123}, + {"bro/events/logging", 456}})); + CAF_CHECK_EQUAL(venus_s2.poll(), + data_msgs({{"bro/events/failures", "oops"}, + {"bro/events/failures", "sorry!"}, + {"bro/events/logging", 123}, + {"bro/events/logging", 456}})); + CAF_CHECK_EQUAL(earth_s1.poll(), + data_msgs({{"bro/events/failures", "oops"}, + {"bro/events/failures", "sorry!"}})); + CAF_CHECK_EQUAL(earth_s2.poll(), + data_msgs({{"bro/events/failures", "oops"}, + {"bro/events/failures", "sorry!"}})); exec_loop(); venus.loop_after_next_enqueue(); venus.ep.unpeer("mercury", 4040); diff --git a/tests/cpp/master.cc b/tests/cpp/master.cc index facdb7c8..19df7699 100644 --- a/tests/cpp/master.cc +++ b/tests/cpp/master.cc @@ -47,8 +47,9 @@ CAF_TEST(local_master) { CAF_CHECK_EQUAL(n, "foo"); // send put command to the master's topic anon_send(core, atom::publish::value, atom::local::value, - n / topics::master_suffix, - make_internal_command("hello", "universe")); + make_command_message( + n / topics::master_suffix, + make_internal_command("hello", "universe"))); run(); // read back what we have written sched.inline_next_enqueue(); // ds.get talks to the master_actor (blocking) @@ -161,8 +162,8 @@ CAF_TEST(master_with_clone) { ds_mars.put("user", "neverlord"); expect_on(mars, (atom_value, internal_command), from(_).to(ds_mars.frontend()).with(atom::local::value, _)); - expect_on(mars, (atom_value, topic, internal_command), - from(_).to(mars.ep.core()).with(atom::publish::value, _, _)); + expect_on(mars, (atom_value, command_message), + from(_).to(mars.ep.core()).with(atom::publish::value, _)); exec_all(); earth.sched.inline_next_enqueue(); // .get talks to the master CAF_CHECK_EQUAL(value_of(ds_earth.get("user")), data{"neverlord"}); diff --git a/tests/cpp/publisher.cc b/tests/cpp/publisher.cc index a466069f..25e9866b 100644 --- a/tests/cpp/publisher.cc +++ b/tests/cpp/publisher.cc @@ -21,6 +21,7 @@ #include "broker/data.hh" #include "broker/endpoint.hh" #include "broker/filter_type.hh" +#include "broker/message.hh" #include "broker/publisher.hh" #include "broker/topic.hh" @@ -33,13 +34,12 @@ using namespace broker::detail; using namespace caf; -using value_type = std::pair; -using stream_type = stream>; +using stream_type = stream; namespace { struct consumer_state { - std::vector xs; + std::vector xs; }; behavior consumer(stateful_actor* self, @@ -55,7 +55,7 @@ behavior consumer(stateful_actor* self, // nop }, // Process single element. - [=](unit_t&, value_type x) { + [=](unit_t&, data_message x) { self->state.xs.emplace_back(std::move(x)); }, // Cleanup. @@ -97,7 +97,7 @@ CAF_TEST(blocking_publishers) { auto d2 = pub2.worker(); run(); // Data flows from our publishers to core1 to core2 and finally to leaf. - using buf = std::vector; + using buf = std::vector; // First, set of published messages gets filtered out at core2. pub1.publish(0); run(); @@ -116,7 +116,8 @@ CAF_TEST(blocking_publishers) { consume_message(); self->receive( [](const buf& xs) { - buf expected{{"a/b", true}, {"a/b", false}, {"a/b", true}}; + auto expected = data_msgs({{"a/b", true}, {"a/b", false}, + {"a/b", true}}); CAF_REQUIRE_EQUAL(xs, expected); } ); @@ -144,15 +145,16 @@ CAF_TEST(nonblocking_publishers) { // publish_all uses thread communication which would deadlock when using our // test_scheduler. We avoid this by pushing the call to publish_all to its // own thread. - using buf_type = std::vector; + using buf_type = std::vector; ep.publish_all_nosync( // Initialize send buffer with 10 elements. [](buf_type& xs) { - xs = buf_type{{"a", 0}, {"b", true}, {"a", 1}, {"a", 2}, {"b", false}, - {"b", true}, {"a", 3}, {"b", false}, {"a", 4}, {"a", 5}}; + xs = data_msgs({{"a", 0}, {"b", true}, {"a", 1}, {"a", 2}, + {"b", false}, {"b", true}, {"a", 3}, + {"b", false}, {"a", 4}, {"a", 5}}); }, // Get next element. - [](buf_type& xs, downstream& out, size_t num) { + [](buf_type& xs, downstream& out, size_t num) { auto n = std::min(num, xs.size()); for (size_t i = 0u; i < n; ++i) out.push(xs[i]); @@ -171,7 +173,8 @@ CAF_TEST(nonblocking_publishers) { consume_message(); self->receive( [](const buf_type& xs) { - buf_type expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); CAF_REQUIRE_EQUAL(xs, expected); } ); diff --git a/tests/cpp/ssl.cc b/tests/cpp/ssl.cc index b1d408a3..f06ee19b 100644 --- a/tests/cpp/ssl.cc +++ b/tests/cpp/ssl.cc @@ -81,9 +81,8 @@ MESSAGE("prepare authenticated connection"); auto b = venus_auth.ep.peer("127.0.0.1", p); CAF_REQUIRE(b); - using value_type = std::pair; - value_type ping{"/broker/test", "ping"}; - value_type pong{"/broker/test", "pong"}; + data_message ping{"/broker/test", "ping"}; + data_message pong{"/broker/test", "pong"}; MESSAGE("mercury_auth sending ping"); mercury_auth.ep.publish({ping}); diff --git a/tests/cpp/status_subscriber.cc b/tests/cpp/status_subscriber.cc index 4b95f44f..16ea50c2 100644 --- a/tests/cpp/status_subscriber.cc +++ b/tests/cpp/status_subscriber.cc @@ -23,8 +23,6 @@ using namespace caf; using namespace broker; using namespace broker::detail; -using value_type = std::pair; - namespace { struct fixture : base_fixture { diff --git a/tests/cpp/subscriber.cc b/tests/cpp/subscriber.cc index b0902722..44b94bd0 100644 --- a/tests/cpp/subscriber.cc +++ b/tests/cpp/subscriber.cc @@ -17,6 +17,7 @@ #include "broker/data.hh" #include "broker/endpoint.hh" #include "broker/filter_type.hh" +#include "broker/message.hh" #include "broker/subscriber.hh" #include "broker/topic.hh" @@ -28,32 +29,28 @@ using namespace caf; using namespace broker; using namespace broker::detail; -using value_type = std::pair; - namespace { void driver(event_based_actor* self, const actor& sink) { - using buf_type = std::vector; + using buf_type = std::vector; self->make_source( // Destination. sink, // Initialize send buffer with 10 elements. [](buf_type& xs) { - xs = buf_type{{"a", 0}, {"b", true}, {"a", 1}, {"a", 2}, {"b", false}, - {"b", true}, {"a", 3}, {"b", false}, {"a", 4}, {"a", 5}}; + xs = data_msgs({{"a", 0}, {"b", true}, {"a", 1}, {"a", 2}, + {"b", false}, {"b", true}, {"a", 3}, {"b", false}, + {"a", 4}, {"a", 5}}); }, // Get next element. - [](buf_type& xs, downstream& out, size_t num) { + [](buf_type& xs, downstream& out, size_t num) { auto n = std::min(num, xs.size()); for (size_t i = 0u; i < n; ++i) out.push(xs[i]); xs.erase(xs.begin(), xs.begin() + static_cast(n)); }, // Did we reach the end?. - [](const buf_type& xs) { - return xs.empty(); - } - ); + [](const buf_type& xs) { return xs.empty(); }); } } // namespace @@ -86,8 +83,9 @@ CAF_TEST(blocking_subscriber) { CAF_MESSAGE("driver: " << to_string(d1)); run(); CAF_MESSAGE("check content of the subscriber's buffer"); - using buf = std::vector; - buf expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + using buf = std::vector; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); CAF_CHECK_EQUAL(sub.poll(), expected); // Shutdown. CAF_MESSAGE("Shutdown core actors."); @@ -109,14 +107,14 @@ CAF_TEST(nonblocking_subscriber) { self->send(core1, atom::peer::value, core2); run(); // Connect a subscriber (leaf) to core2. - using buf = std::vector; + using buf = std::vector; buf result; ep.subscribe_nosync( {"b"}, [](unit_t&) { // nop }, - [&](unit_t&, value_type x) { + [&](unit_t&, data_message x) { result.emplace_back(std::move(x)); }, [](unit_t&, const error&) { @@ -127,7 +125,8 @@ CAF_TEST(nonblocking_subscriber) { auto d1 = sys.spawn(driver, core1); // Communication is identical to the consumer-centric test in test/cpp/core.cc run(); - buf expected{{"b", true}, {"b", false}, {"b", true}, {"b", false}}; + auto expected = data_msgs({{"b", true}, {"b", false}, + {"b", true}, {"b", false}}); CAF_REQUIRE_EQUAL(result, expected); // Shutdown. CAF_MESSAGE("Shutdown core actors."); diff --git a/tests/test.hpp b/tests/test.hpp index 7ac760f1..422fd28c 100644 --- a/tests/test.hpp +++ b/tests/test.hpp @@ -95,4 +95,14 @@ inline caf::error error_of(caf::expected x) { return std::move(x.error()); } +/// Convenience function for creating a vector of events from topic and data +/// pairs. +inline std::vector +data_msgs(std::initializer_list> xs) { + std::vector result; + for (auto& x : xs) + result.emplace_back(x.first, x.second); + return result; +} + #endif From b4234aad6097e05a0afb0e3bee759de164a2e36f Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Thu, 13 Dec 2018 14:56:00 +0100 Subject: [PATCH 2/3] Port examples to new API --- doc/_examples/comm.cc | 6 +++--- doc/_examples/ping.cc | 4 ++-- doc/_examples/synopsis.cc | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/doc/_examples/comm.cc b/doc/_examples/comm.cc index de425096..6e74f02e 100644 --- a/doc/_examples/comm.cc +++ b/doc/_examples/comm.cc @@ -12,8 +12,8 @@ void f1() { endpoint ep; auto sub = ep.make_subscriber({"/topic/test"}); auto msg = sub.get(); -auto topic = msg.first; -auto data_ = msg.second; +auto topic = get_topic(msg); +auto data_ = get_data(msg); std::cout << "topic: " << topic << " data: " << data_ << std::endl; // --get-end @@ -24,7 +24,7 @@ if ( sub.available() ) msg = sub.get(); // Won't block now. for ( auto m : sub.poll() ) // Iterate over all available messages - std::cout << "topic: " << m.first << " data: " << m.second << std::endl; + std::cout << "topic: " << get_topic(m) << " data: " << get_data(m) << std::endl; // --poll-end /// diff --git a/doc/_examples/ping.cc b/doc/_examples/ping.cc index 588476ed..5a216b02 100644 --- a/doc/_examples/ping.cc +++ b/doc/_examples/ping.cc @@ -19,7 +19,7 @@ int main() { auto st = caf::get_if(&ss_res); if ( ! (st && st->code() == sc::peer_added) ) { std::cerr << "could not connect" << std::endl; - return 1; + return 1; } for ( int n = 0; n < 5; n++ ) { @@ -29,7 +29,7 @@ int main() { // Wait for "pong" reply event. auto msg = sub.get(); - bro::Event pong(std::move(msg.second)); + bro::Event pong(move_data(msg)); std::cout << "received " << pong.name() << pong.args() << std::endl; } diff --git a/doc/_examples/synopsis.cc b/doc/_examples/synopsis.cc index 6c57e475..74017c8b 100644 --- a/doc/_examples/synopsis.cc +++ b/doc/_examples/synopsis.cc @@ -14,7 +14,7 @@ int main() auto sub = ep.make_subscriber({"/test/t2"}); // Subscribe to incoming messages for topic. auto msg = sub.get(); // Wait for one incoming message. - std::cout << "got data for topic " << msg.first << ": " << msg.second << std::endl; + std::cout << "got data for topic " << get_topic(msg) << ": " << get_data(msg) << std::endl; // Data stores From a7e61a7288c4c93a9ffac96480a8e9af80c10287 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Sat, 2 Mar 2019 14:53:15 +0100 Subject: [PATCH 3/3] Improve documentation and simplify getters --- broker/message.hh | 59 ++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/broker/message.hh b/broker/message.hh index fc71839a..9590e4d6 100644 --- a/broker/message.hh +++ b/broker/message.hh @@ -1,5 +1,5 @@ -#ifndef BROKER_EVENT_HH -#define BROKER_EVENT_HH +#ifndef BROKER_MESSAGE_HH +#define BROKER_MESSAGE_HH #include @@ -28,6 +28,16 @@ struct node_message { uint16_t ttl; }; +/// Returns whether `x` contains a ::node_message. +inline bool is_data_message(const node_message& x) { + return caf::holds_alternative(x.content); +} + +/// Returns whether `x` contains a ::command_message. +inline bool is_command_message(const node_message& x) { + return caf::holds_alternative(x.content); +} + /// @relates node_message template typename Inspector::result_type inspect(Inspector& f, node_message& x) { @@ -68,27 +78,27 @@ inline const topic& get_topic(const command_message& x) { /// Retrieves the topic from a ::generic_message. inline const topic& get_topic(const node_message& x) { - if(caf::holds_alternative(x.content)) + if (is_data_message(x)) return get_topic(caf::get(x.content)); return get_topic(caf::get(x.content)); } -/// Moves the topic out of a ::data_message, copying it if more than one -/// reference exists.. +/// Moves the topic out of a ::data_message. Causes `x` to make a lazy copy of +/// its content if other ::data_message objects hold references to it. inline topic&& move_topic(data_message& x) { return std::move(get<0>(x.unshared())); } -/// Moves the topic out of a ::command_message, copying it if more than one -/// reference exists.. +/// Moves the topic out of a ::command_message. Causes `x` to make a lazy copy +/// of its content if other ::command_message objects hold references to it. inline topic&& move_topic(command_message& x) { return std::move(get<0>(x.unshared())); } -/// Moves the topic out of a ::generic_message, copying it if more than one -/// reference exists.. +/// Moves the topic out of a ::node_message. Causes `x` to make a lazy copy of +/// its content if other ::node_message objects hold references to it. inline topic&& move_topic(node_message& x) { - if( caf::holds_alternative(x.content)) + if (is_data_message(x)) return move_topic(caf::get(x.content)); return move_topic(caf::get(x.content)); } @@ -98,34 +108,25 @@ inline const data& get_data(const data_message& x) { return get<1>(x); } -/// Moves the data out of a ::data_message, copying it if more than one -/// reference exists. -inline const data&& move_data(data_message& x) { +/// Moves the data out of a ::data_message. Causes `x` to make a lazy copy of +/// its content if other ::data_message objects hold references to it. +inline data&& move_data(data_message& x) { return std::move(get<1>(x.unshared())); } -/// Retrieves the command from a ::command_message. + +/// Retrieves the command content from a ::command_message. inline const internal_command::variant_type& get_command(const command_message& x) { return get<1>(x).content; } -/// Moves the command out of a ::command_message, copying it if more than one -/// reference exists. -inline internal_command::variant_type -move_command(command_message& x) { +/// Moves the command content out of a ::command_message. Causes `x` to make a +/// lazy copy of its content if other ::command_message objects hold references +/// to it. +inline internal_command::variant_type&& move_command(command_message& x) { return std::move(get<1>(x.unshared()).content); } -/// Returns whether `x` contains a ::node_message. -inline bool is_data_message(const node_message& x) { - return caf::holds_alternative(x.content); -} - -/// Returns whether `x` contains a ::command_message. -inline bool is_command_message(const node_message& x) { - return caf::holds_alternative(x.content); -} - } // namespace broker -#endif // BROKER_EVENT_HH +#endif // BROKER_MESSAGE_HH