From 05a137a147f63a6072bbe98ad2a4296d528a9518 Mon Sep 17 00:00:00 2001 From: Matthias Vallentin Date: Mon, 1 Jun 2015 15:39:19 -0700 Subject: [PATCH] Make sources produce events rather than chunks This change further streamlines and simplifies the ingestion process. A SOURCE now simply produces sequences of events. In a cluster-in-a-box setup where all actors run in the same NODE, this is more efficient because it avoids the extra decompression step at INDEX. Even in the distributed case, it disburdens the CPU-intensive INDEX (no more chunk decompression) and moves the computational load to ARCHIVE (compressing events into chunks). --- doc/vast.1 | 10 +- doc/vast.1.md | 8 +- doc/vastd.1 | 4 +- src/vast/actor/archive.cc | 19 ++- src/vast/actor/archive.h | 6 +- src/vast/actor/importer.cc | 20 +-- src/vast/actor/index.cc | 34 ++-- src/vast/actor/node.cc | 22 ++- src/vast/actor/node_spawn.cc | 14 -- src/vast/actor/partition.cc | 165 ++++++------------- src/vast/actor/partition.h | 10 +- src/vast/actor/source/base.h | 116 +------------ src/vast/announce.cc | 33 +--- src/vast/concept/serializable/vector_event.h | 67 ++++++++ test/unit/fixtures/chunks.h | 51 ------ test/unit/fixtures/events.h | 55 +++++++ test/unit/tests/actor/index.cc | 28 ++-- test/unit/tests/actor/partition.cc | 42 ++--- test/unit/tests/actor/source_bgpdump.cc | 17 +- test/unit/tests/actor/source_pcap.cc | 8 +- test/unit/tests/serialization.cc | 25 +++ 21 files changed, 330 insertions(+), 424 deletions(-) create mode 100644 src/vast/concept/serializable/vector_event.h delete mode 100644 test/unit/fixtures/chunks.h create mode 100644 test/unit/fixtures/events.h diff --git a/doc/vast.1 b/doc/vast.1 index 189c4d85d86..2abc6a8cbc4 100644 --- a/doc/vast.1 +++ b/doc/vast.1 @@ -1,4 +1,4 @@ -.TH VAST 1 "May 27, 2015" 0.1 "Visibility Across Space and Time" +.TH VAST 1 "May 30, 2015" 0.1 "Visibility Across Space and Time" .SH NAME .PP \fB\fCvast\fR \-\- interface to a VAST node @@ -157,10 +157,12 @@ spawned on the connected node.
Available \fIactor\fP values with corresponding \fIparameters\fP: .PP \fIarchive\fP [\fIparameters\fP] - \fB\fC\-s\fR \fIsize\fP [\fI128\fP] - Maximum segment size in MB - \fB\fC\-c\fR \fIsegments\fP [\fI10\fP] + \fB\fC\-c\fR \fIcompression\fP [\fIlz4\fP] + Compression algorithm for chunks + \fB\fC\-s\fR \fIsegments\fP [\fI10\fP] Number of cached segments + \fB\fC\-m\fR \fIsize\fP [\fI128\fP] + Maximum segment size in MB .PP \fIindex\fP [\fIparameters\fP] \fB\fC\-a\fR \fIpartitions\fP [\fI5\fP] diff --git a/doc/vast.1.md b/doc/vast.1.md index ee63ffa627d..fb81784b09d 100644 --- a/doc/vast.1.md +++ b/doc/vast.1.md @@ -173,10 +173,12 @@ Available *arguments*: Available *actor* values with corresponding *parameters*: *archive* [*parameters*] - `-s` *size* [*128*] - Maximum segment size in MB - `-c` *segments* [*10*] + `-c` *compression* [*lz4*] + Compression algorithm for chunks + `-s` *segments* [*10*] Number of cached segments + `-m` *size* [*128*] + Maximum segment size in MB *index* [*parameters*] `-a` *partitions* [*5*] diff --git a/doc/vastd.1 b/doc/vastd.1 index 5db5c97bd36..2a44cf6ff02 100644 --- a/doc/vastd.1 +++ b/doc/vastd.1 @@ -1,7 +1,7 @@ -.TH VASTD 1 "May 27, 2015" 0.1 "Visibility Across Space and Time" +.TH VASTD 1 "May 30, 2015" 0.1 "Visibility Across Space and Time" .SH NAME .PP -\fB\fCvastd\fR \-\- VAST daemon +\fB\fCvastd\fR \-\- VAST daemon running a node actor .SH SYNOPSIS .PP \fB\fCvastd\fR [\fIoptions\fP] diff --git a/src/vast/actor/archive.cc b/src/vast/actor/archive.cc index baee9d54ecc..f62643a5262 100644 --- a/src/vast/actor/archive.cc +++ b/src/vast/actor/archive.cc @@ -1,5 +1,7 @@ #include <caf/all.hpp> +#include "vast/chunk.h" +#include "vast/event.h" #include "vast/actor/archive.h" #include "vast/concept/serializable/chunk.h" #include "vast/concept/serializable/io.h" @@ -9,11 +11,13 @@ namespace vast { using namespace caf; -archive::archive(path dir, size_t capacity, size_t max_segment_size) +archive::archive(path dir, size_t capacity, size_t max_segment_size, + io::compression compression) : flow_controlled_actor{"archive"}, dir_{dir}, meta_data_filename_{dir_ / "meta.data"}, max_segment_size_{max_segment_size}, + compression_{compression}, cache_{capacity} { VAST_ASSERT(max_segment_size_ > 0); @@ -58,17 +62,18 @@ caf::behavior archive::make_behavior() accountant_ = accountant; send(accountant_, label() + "-events", time::now()); }, - [=](chunk const& chk) + [=](std::vector<event> const& events) { - VAST_DEBUG(this, "got chunk [" << chk.meta().ids.find_first() << ',' << - (chk.meta().ids.find_last() + 1 ) << ')'); + VAST_DEBUG(this, "got", events.size(), "events [" << + events.front().id() << ',' << (events.back().id() + 1) << ')'); + chunk chk{events, compression_}; auto too_large = current_size_ + chk.bytes() >= max_segment_size_; if (! current_.empty() && too_large && !
flush()) return; - current_size_ += chk.bytes(); - current_.insert(chk); if (accountant_) - send(accountant_, chk.events(), time::snapshot()); + send(accountant_, uint64_t{events.size()}, time::snapshot()); + current_size_ += chk.bytes(); + current_.insert(std::move(chk)); }, [=](flush_atom) { diff --git a/src/vast/actor/archive.h b/src/vast/actor/archive.h index 47d5ffe29c9..042cc762f52 100644 --- a/src/vast/actor/archive.h +++ b/src/vast/actor/archive.h @@ -7,6 +7,7 @@ #include "vast/filesystem.h" #include "vast/uuid.h" #include "vast/actor/actor.h" +#include "vast/io/compression.h" #include "vast/util/cache.h" #include "vast/util/flat_set.h" #include "vast/util/range_map.h" @@ -30,8 +31,10 @@ struct archive : flow_controlled_actor /// @param dir The root directory of the archive. /// @param capacity The number of segments to hold in memory. /// @param max_segment_size The maximum size in MB of a segment. + /// @param compression The compression method to use for chunks. /// @pre `max_segment_size > 0` - archive(path dir, size_t capacity, size_t max_segment_size); + archive(path dir, size_t capacity, size_t max_segment_size, + io::compression = io::lz4); void on_exit(); caf::behavior make_behavior() override; @@ -42,6 +45,7 @@ struct archive : flow_controlled_actor path dir_; path meta_data_filename_; size_t max_segment_size_; + io::compression compression_; util::range_map segments_; util::cache cache_; segment current_; diff --git a/src/vast/actor/importer.cc b/src/vast/actor/importer.cc index 39a2d81c3e3..1b3c5a2557d 100644 --- a/src/vast/actor/importer.cc +++ b/src/vast/actor/importer.cc @@ -1,6 +1,6 @@ #include <caf/all.hpp> -#include "vast/chunk.h" +#include "vast/event.h" #include "vast/actor/importer.h" namespace vast { @@ -64,7 +64,7 @@ behavior importer::make_behavior() monitor(a); index_ = a; }, - [=](chunk& chk) + [=](std::vector<event>& events) { if (identifier_ == invalid_actor) { @@ -84,23 +84,21 @@ behavior importer::make_behavior() quit(exit::error); return; } - sync_send(identifier_, request_atom::value, chk.events()).then( + sync_send(identifier_, request_atom::value, uint64_t{events.size()}).then( [=](id_atom, event_id from, event_id to) mutable { auto n = to - from; VAST_DEBUG(this, "got", n, "IDs for chunk [" << from << "," << to << ")"); - if (n < chk.events()) + if (n < events.size()) { - VAST_ERROR(this, "got", n, "IDs, needed", chk.events()); + VAST_ERROR(this, "got", n, "IDs, needed", events.size()); quit(exit::error); return; } - default_bitstream ids; - ids.append(from, false); - ids.append(n, true); - chk.ids(std::move(ids)); - auto t = make_message(std::move(chk)); + for (auto& e : events) + e.id(from++); + auto t = make_message(std::move(events)); send(archive_, t); send(index_, t); }, @@ -110,7 +108,7 @@ behavior importer::make_behavior() quit(exit::error); }, catch_unexpected() - ); + ); }, catch_unexpected() }; diff --git a/src/vast/actor/index.cc b/src/vast/actor/index.cc index ecb62850c78..00e193c3913 100644 --- a/src/vast/actor/index.cc +++ b/src/vast/actor/index.cc @@ -2,7 +2,7 @@ #include #include "vast/bitmap_index.h" -#include "vast/chunk.h" +#include "vast/event.h" #include "vast/print.h" #include "vast/query_options.h" #include "vast/actor/partition.h" @@ -190,17 +190,17 @@ behavior index::make_behavior() send(t, done_atom::value); return t; }, - [=](chunk const& chk) + [=](std::vector<event> const& events) { auto& a = active_[next_active_++ % active_.size()]; auto i = partitions_.find(a.first); VAST_ASSERT(i != partitions_.end()); VAST_ASSERT(a.second != invalid_actor);
// Replace partition with a new one on overflow. If the max is too small - // that even the first chunk doesn't fit, then we just accept this and - // have a one-chunk partition. + // that even the first batch doesn't fit, then we just accept this and + // have a partition with a single batch. if (i->second.events > 0 - && i->second.events + chk.events() > max_events_per_partition_) + && i->second.events + events.size() > max_events_per_partition_) { VAST_VERBOSE(this, "replaces partition (" << a.first << ')'); send_exit(a.second, exit::stop); @@ -216,18 +216,20 @@ behavior index::make_behavior() } // Update partition meta data. auto& p = i->second; - p.events += chk.events(); + p.events += events.size(); p.last_modified = time::now(); - if (p.from == time::duration{} || chk.meta().first < p.from) - p.from = chk.meta().first; - if (p.to == time::duration{} || chk.meta().last > p.to) - p.to = chk.meta().last; - // Relay chunk. - VAST_DEBUG(this, "forwards chunk [" << chk.base() << ',' << chk.base() + - chk.events() << ')', "to", a.second, '(' << a.first << ')'); - auto t = spawn(time::snapshot(), chk.events()); + if (p.from == time::duration{} || events.front().timestamp() < p.from) + p.from = events.front().timestamp(); + if (p.to == time::duration{} || events.back().timestamp() > p.to) + p.to = events.back().timestamp(); + // Relay events. + VAST_DEBUG(this, "forwards", events.size(), "events [" << + events.front().id() << ',' << (events.back().id() + 1) << ')', + "to", a.second, '(' << a.first << ')'); + auto t = spawn(time::snapshot(), uint64_t{events.size()}); send(t, supervisor_atom::value, this); - send(a.second, chk, t); + send(a.second, message::concat(current_message(), + make_message(std::move(t)))); }, [=](expression const& expr, query_options opts, actor const& subscriber) { @@ -290,7 +292,7 @@ behavior index::make_behavior() qs.cont->task = spawn(time::snapshot()); send(qs.cont->task, this); // Relay the continuous query to all active partitions, as these may - // still receive chunks. + // still receive events. for (auto& a : active_) send(a.second, expr, continuous_atom::value); } diff --git a/src/vast/actor/node.cc b/src/vast/actor/node.cc index 2a1fa6f55b7..495958b515a 100644 --- a/src/vast/actor/node.cc +++ b/src/vast/actor/node.cc @@ -21,6 +21,7 @@ #include "vast/actor/exporter.h" #include "vast/actor/node.h" #include "vast/expr/normalize.h" +#include "vast/io/compression.h" #include "vast/io/file_stream.h" #include "vast/util/assert.h" #include "vast/util/endpoint.h" @@ -301,17 +302,32 @@ message node::spawn_actor(message const& msg) }, on("archive", any_vals) >> [&] { + io::compression method; + auto comp = "lz4"s; uint64_t segments = 10; uint64_t size = 128; r = params.extract_opts({ - {"segments,c", "maximum number of cached segments", segments}, - {"size,s", "maximum size of segment before flushing (MB)", size} + {"compression,c", "compression method for event batches", comp}, + {"segments,s", "maximum number of cached segments", segments}, + {"size,m", "maximum size of segment before flushing (MB)", size} }); if (! 
r.error.empty()) return make_message(error{std::move(r.error)}); + if (comp == "null") + method = io::null; + else if (comp == "lz4") + method = io::lz4; + else if (comp == "snappy") +#ifdef VAST_HAVE_SNAPPY + method = io::snappy; +#else + return make_message(error{"not compiled with snappy support"}); +#endif + else + return make_message(error{"unknown compression method: ", comp}); size <<= 20; // MB'ify auto dir = dir_ / "archive"; - auto a = spawn(dir, segments, size); + auto a = spawn(dir, segments, size, method); attach_functor([=](uint32_t ec) { anon_send_exit(a, ec); }); send(a, put_atom::value, accountant_atom::value, accountant_); return put({a, "archive", label}); diff --git a/src/vast/actor/node_spawn.cc b/src/vast/actor/node_spawn.cc index b6f44a681f1..490e25bc58d 100644 --- a/src/vast/actor/node_spawn.cc +++ b/src/vast/actor/node_spawn.cc @@ -29,12 +29,10 @@ namespace vast { message node::spawn_source(std::string const& label, message const& params) { auto batch_size = uint64_t{100000}; - auto comp = "lz4"s; auto schema_file = ""s; auto input = ""s; auto r = params.extract_opts({ {"batch,b", "number of events to ingest at once", batch_size}, - {"compression,c", "compression method for event batches", comp}, {"schema,s", "alternate schema file", schema_file}, {"dump-schema,d", "print schema and exit"}, {"read,r", "path to read events from", input}, @@ -161,18 +159,6 @@ message node::spawn_source(std::string const& label, message const& params) // Set parameters. send(src, batch_atom::value, batch_size); send(src, put_atom::value, accountant_atom::value, accountant_); - if (comp == "null") - send(src, io::null); - else if (comp == "lz4") - send(src, io::lz4); - else if (comp == "snappy") -#ifdef VAST_HAVE_SNAPPY - send(src, io::snappy); -#else - return make_message(error{"not compiled with snappy support"}); -#endif - else - return make_message(error{"unknown compression method: ", comp}); // Save it. terminate = false; return put({src, "source", label}); diff --git a/src/vast/actor/partition.cc b/src/vast/actor/partition.cc index e54c8145c7d..cbeaab79eae 100644 --- a/src/vast/actor/partition.cc +++ b/src/vast/actor/partition.cc @@ -15,8 +15,8 @@ namespace vast { namespace { -// Accumulates all hits from a chunk, evalutes a query, and sends the result of -// the evaluation to PARTITION. +// Accumulates all hits from an event batch, evaluates a query, and sends the +// result of the evaluation to PARTITION. template <typename Bitstream> struct continuous_query_proxy : default_actor { @@ -39,7 +39,7 @@ struct continuous_query_proxy : default_actor predicate_map const& map_; }; - // Accumulates hits from indexers for a single chunk. + // Accumulates hits from indexers for a single event batch. struct accumulator : default_actor { accumulator(std::vector<expression> exprs, actor sink) @@ -235,8 +235,6 @@ behavior partition::make_behavior() } if (remove_upstream_node(msg.source)) return; - if (dechunkifiers_.erase(msg.source) == 1) - return; auto i = std::find_if( indexers_.begin(), indexers_.end(), @@ -251,33 +249,6 @@ behavior partition::make_behavior() register_upstream_node(), [=](exit_msg const& msg) { - // Exit phase 2: after all dechunkifiers have exited we wait for all indexers - // and terminate afterwards. - auto exit_phase2 = - [reason=msg.reason, on_down, this](down_msg const& down) - { - on_down(down); - if (indexers_.empty()) - quit(reason); - }; - // Exit phase 1: wait for all dechunkifiers to exit. If we have indexers - // left, tell them to exit.
- auto exit_phase1 = - [reason=msg.reason, on_down, exit_phase2, this](down_msg const& down) - { - on_down(down); - if (! dechunkifiers_.empty()) - return; - if (indexers_.empty()) - { - quit(reason); - return; - } - VAST_DEBUG(this, "enters exit phase 2 after phase 1"); - become(exit_phase2); - for (auto& i : indexers_) - send_exit(i.second, reason); - }; if (msg.reason == exit::kill) { if (proxy_) @@ -297,38 +268,41 @@ behavior partition::make_behavior() send_exit(q.second.task, msg.reason); if (proxy_) send_exit(proxy_, msg.reason); - // Begin the two-phase shutdown. - if (! dechunkifiers_.empty()) + if (indexers_.empty()) { - VAST_DEBUG(this, "enters exit phase 1"); - become(exit_phase1); - flush(); - return; + quit(msg.reason); } - if (! indexers_.empty()) + else { - VAST_DEBUG(this, "enters exit phase 2"); - become(exit_phase2); + VAST_DEBUG(this, "brings down all indexers"); for (auto& i : indexers_) send_exit(i.second, msg.reason); - flush(); - return; + become( + [reason=msg.reason, on_down, this](down_msg const& down) + { + // Terminate as soon as all indexers have exited. + on_down(down); + if (indexers_.empty()) + quit(reason); + } + ); } flush(); - quit(msg.reason); }, on_down, - [=](chunk const& chk, actor const& task) + [=](std::vector<event> const& events, actor const& task) { - VAST_DEBUG(this, "got chunk with", chk.meta().schema.size(), "types and", - chk.events(), "events:", - '[' << chk.base() << ',' << (chk.base() + chk.events()) << ')'); - // Create event indexers according to chunk schema. - VAST_ASSERT(! chk.meta().schema.empty()); - auto base = chk.base(); - auto interval = to_string(base) + "-" + to_string(base + chk.events()); + VAST_DEBUG(this, "got", events.size(), "events [", + events.front().id() << ',' << (events.back().id() + 1) << ')'); + // Extract all unique types. + util::flat_set<type> types; + for (auto& e : events) + types.insert(e.type()); + // Create one event indexer per type. + auto base = events.front().id(); + auto interval = to_string(base) + "-" + to_string(base + events.size()); std::vector<actor> indexers; - for (auto& t : chk.meta().schema) + for (auto& t : types) if (! t.find_attribute(type::attribute::skip)) { if (! schema_.add(t)) @@ -344,38 +318,30 @@ behavior partition::make_behavior() } if (indexers.empty()) { + VAST_WARN(this, "didn't find any types to index"); send_exit(task, exit::done); return; } - // Spin up a dechunkifier for this chunk and tell it to send all events - // to the freshly created indexers. - auto dechunkifier = spawn( - [chk, indexers=std::move(indexers), task, proxy=proxy_] - (event_based_actor* self) - { - VAST_ASSERT(! indexers.empty()); - auto events = chk.uncompress(); - VAST_ASSERT(!
events.empty()); - auto msg = make_message(std::move(events), task); - for (auto& i : indexers) - { - self->send(task, i); - self->send(i, msg); - } - if (proxy != invalid_actor) - self->send(proxy, std::move(indexers)); - }); - dechunkifiers_.insert(dechunkifier->address()); - if (++chunks_indexed_concurrently_ > 10) // TODO: calibrate + for (auto& i : indexers) + { + send(task, i); + send(i, current_message()); + } + if (proxy_ != invalid_actor) + send(proxy_, std::move(indexers)); + events_indexed_concurrently_ += events.size(); + if (events_indexed_concurrently_ > 1 << 20) // TODO: calibrate overloaded(true); send(task, supervisor_atom::value, this); - VAST_DEBUG(this, "indexes", chunks_indexed_concurrently_, - "chunks in parallel"); + VAST_DEBUG(this, "indexes", events_indexed_concurrently_, + "events in parallel"); }, [=](done_atom, time::moment start, uint64_t events) { VAST_DEBUG(this, "indexed", events, "events in", time::snapshot() - start); - if (--chunks_indexed_concurrently_ < 10) + VAST_ASSERT(events_indexed_concurrently_ >= events); + events_indexed_concurrently_ -= events; + if (events_indexed_concurrently_ < 1 << 20) overloaded(false); }, [=](expression const& expr, continuous_atom) { @@ -401,7 +367,7 @@ behavior partition::make_behavior() if (! q->second.task) { // Even if we still have evaluated this query in the past, we still - // spin up a new task to ensure that we incorporate results from chunks + // spin up a new task to ensure that we incorporate results from events // that have arrived in the meantime. VAST_DEBUG(this, "spawns new query task"); q->second.task = spawn(time::snapshot(), q->first); @@ -441,7 +407,7 @@ behavior partition::make_behavior() }, [=](done_atom, time::moment start, predicate const& pred) { - // Once we've completed all tasks of a certain predicate for all chunks, + // Once we've completed all tasks of a certain predicate for all events, // we evaluate all queries in which the predicate participates. auto& ps = predicates_[pred]; VAST_DEBUG(this, "took", time::snapshot() - start, @@ -471,39 +437,16 @@ behavior partition::make_behavior() }, [=](flush_atom, actor const& task) { - auto do_flush = [=] - { - VAST_DEBUG(this, "peforms flush"); - send(task, this); - for (auto& i : indexers_) - if (i.second) - { - send(task, i.second); - send(i.second, flush_atom::value, task); - } - flush(); - send(task, done_atom::value); - }; - VAST_DEBUG(this, "got flush request"); - if (dechunkifiers_.empty()) - { - do_flush(); - } - else - { - VAST_DEBUG(this, "waits for dechunkifiers to exit"); - become( - keep_behavior, - [=](down_msg const& msg) - { - on_down(msg); - if (dechunkifiers_.empty()) - { - do_flush(); - unbecome(); - } - }); - } + VAST_DEBUG(this, "performs flush"); + send(task, this); + for (auto& i : indexers_) + if (i.second) + { + send(task, i.second); + send(i.second, flush_atom::value, task); + } + flush(); + send(task, done_atom::value); }, catch_unexpected() }; diff --git a/src/vast/actor/partition.h b/src/vast/actor/partition.h index df212b30154..0ebfe57e7aa 100644 --- a/src/vast/actor/partition.h +++ b/src/vast/actor/partition.h @@ -3,7 +3,6 @@ #include #include -#include "vast/chunk.h" #include "vast/expression.h" #include "vast/filesystem.h" #include "vast/schema.h" @@ -16,10 +15,8 @@ namespace vast { /// A horizontal partition of the index.
/// -/// For each chunk PARTITION receives, it spawns (i) a dedicated DECHUNKIFIER -/// which turns the chunk back into a sequence of events, (ii) a set of -/// EVENT_INDEXERs for all event types occurring in the chunk, and (iii) -/// registers all EVENT_INDEXERs as sink of the DECHUNKIFIER. +/// For each event batch PARTITION receives, it spawns one EVENT_INDEXER per +/// type occurring in the batch and forwards the events to them. struct partition : flow_controlled_actor { using bitstream_type = default_bitstream; @@ -61,11 +58,10 @@ struct partition : flow_controlled_actor caf::actor sink_; caf::actor proxy_; schema schema_; - size_t chunks_indexed_concurrently_ = 0; + size_t events_indexed_concurrently_ = 0; std::multimap indexers_; std::map queries_; std::map predicates_; - std::set dechunkifiers_; }; } // namespace vast diff --git a/src/vast/actor/source/base.h b/src/vast/actor/source/base.h index 4eb68a2547f..4687e793fbd 100644 --- a/src/vast/actor/source/base.h +++ b/src/vast/actor/source/base.h @@ -4,76 +4,12 @@ #include #include "vast/event.h" -#include "vast/chunk.h" #include "vast/actor/actor.h" #include "vast/util/assert.h" #include "vast/util/result.h" namespace vast { namespace source { -namespace detail { - -class chunkifier : public flow_controlled_actor -{ -public: - chunkifier(caf::actor source, io::compression method) - : flow_controlled_actor{"chunkifier"}, - source_{source}, - compression_{method} - { - trap_exit(true); - } - - void on_exit() - { - source_ = caf::invalid_actor; - accountant_ = caf::invalid_actor; - } - - caf::behavior make_behavior() override - { - using namespace caf; - return - { - register_upstream_node(), - [=](exit_msg const& msg) - { - if (downgrade_exit()) - return; - quit(msg.reason); - }, - [=](down_msg const& msg) - { - if (remove_upstream_node(msg.source)) - return; - }, - [=](accountant_atom, actor const& accountant) - { - VAST_DEBUG(this, "registers accountant", accountant); - accountant_ = accountant; - send(accountant_, label() + "-events", time::now()); - }, - [=](std::vector<event> const& events, caf::actor const& sink) - { - VAST_DEBUG(this, "forwards", events.size(), "events"); - send(sink, chunk{events, compression_}); - if (accountant_ != invalid_actor) - send(accountant_, uint64_t{events.size()}, time::snapshot()); - if (mailbox().count() > 50) - overloaded(true); - else if (overloaded()) - overloaded(false); - } - }; - } - -private: - caf::actor source_; - caf::actor accountant_; - io::compression compression_ = io::lz4; -}; - -} // namespace detail /// The base class for data sources which synchronously extract events /// one-by-one. @@ -90,7 +26,6 @@ class base : public flow_controlled_actor void on_exit() { accountant_ = caf::invalid_actor; - chunkifier_ = caf::invalid_actor; sinks_.clear(); } @@ -103,33 +38,16 @@ class base : public flow_controlled_actor { if (downgrade_exit()) return; - if (! chunkifier_) - { - this->quit(msg.reason); - return; - } if (! events_.empty()) - send_events(); - send_exit(chunkifier_, msg.reason); - become( - [=](down_msg const& down) - { - if (down.source == chunkifier_) - quit(msg.reason); - }); + send(sinks_[next_sink_++ % sinks_.size()], std::move(events_)); + quit(msg.reason); }, [=](down_msg const& msg) { - // Handle chunkifier termination. - if (msg.source == chunkifier_) - { - quit(msg.reason); - return; - } // Handle sink termination.
auto sink = std::find_if( sinks_.begin(), sinks_.end(), - [this](auto& x) { return x->address() == current_sender(); }); + [&](auto& x) { return x->address() == msg.source; }); if (sink != sinks_.end()) sinks_.erase(sink); if (sinks_.empty()) @@ -148,10 +66,6 @@ class base : public flow_controlled_actor if (! done()) send(this, run_atom::value); }, - [=](io::compression method) - { - compression_ = method; - }, [=](batch_atom, uint64_t batch_size) { VAST_DEBUG(this, "sets batch size to", batch_size); @@ -184,14 +98,6 @@ class base : public flow_controlled_actor }, [=](run_atom) { - if (! chunkifier_) - { - chunkifier_ = spawn<detail::chunkifier>( - this, compression_); - if (accountant_) - send(chunkifier_, accountant_atom::value, accountant_); - send(chunkifier_, upstream_atom::value, this); - } if (sinks_.empty()) { VAST_ERROR(this, "cannot run without sinks"); @@ -214,9 +120,11 @@ class base : public flow_controlled_actor } if (! events_.empty()) { + VAST_VERBOSE(this, "produced", events_.size(), "events"); if (accountant_ != invalid_actor) send(accountant_, uint64_t{events_.size()}, time::snapshot()); - send_events(); + send(sinks_[next_sink_++ % sinks_.size()], std::move(events_)); + events_ = {}; } if (done()) send_exit(*this, exit::done); @@ -239,20 +147,8 @@ class base : public flow_controlled_actor } private: - void send_events() - { - using namespace caf; - VAST_ASSERT(! events_.empty()); - VAST_VERBOSE(this, "produced", events_.size(), "events"); - auto& sink = sinks_[next_sink_++ % sinks_.size()]; - send(chunkifier_, std::move(events_), sink); - events_ = {}; - } - bool done_ = false; - io::compression compression_ = io::lz4; caf::actor accountant_; - caf::actor chunkifier_; std::vector<caf::actor> sinks_; size_t next_sink_ = 0; uint64_t batch_size_ = std::numeric_limits<uint64_t>::max(); diff --git a/src/vast/announce.cc b/src/vast/announce.cc index eb8e647dda5..dc7e7e2f00f 100644 --- a/src/vast/announce.cc +++ b/src/vast/announce.cc @@ -23,6 +23,7 @@ #include "vast/concept/serializable/schema.h" #include "vast/concept/serializable/state.h" #include "vast/concept/serializable/type.h" +#include "vast/concept/serializable/vector_event.h" #include "vast/concept/serializable/caf/message.h" #include "vast/concept/serializable/std/array.h" #include "vast/concept/serializable/std/chrono.h" @@ -113,38 +114,6 @@ void announce_bmi_hierarchy(std::string const& bs_name) ); } -//class vector_event_type_info -// : public caf::detail::abstract_uniform_type_info<std::vector<event>> -//{ -//public: -// vector_event_type_info(io::compression method) -// : caf::detail::abstract_uniform_type_info<std::vector<event>>( -// "std::vector<vast::event>"), -// compression_{method} -// { -// } -// -//private: -// void serialize(void const* ptr, caf::serializer* sink) const final -// { -// caf_to_vast_serializer s{*sink}; -// auto events = reinterpret_cast<std::vector<event> const*>(ptr); -// chunk chk{*events, compression_}; -// s << chk; -// } -// -// void deserialize(void* ptr, caf::deserializer* source) const final -// { -// caf_to_vast_deserializer d{*source}; -// auto x = reinterpret_cast<std::vector<event>*>(ptr); -// chunk chk; -// d >> chk; -// *x = chk.uncompress(); -// } -// -// io::compression compression_; -//}; - } // namespace void announce_types() diff --git a/src/vast/concept/serializable/vector_event.h b/src/vast/concept/serializable/vector_event.h new file mode 100644 index 00000000000..1972a0defd5 --- /dev/null +++ b/src/vast/concept/serializable/vector_event.h @@ -0,0 +1,67 @@ +#ifndef VAST_CONCEPT_SERIALIZABLE_VECTOR_EVENT_H +#define VAST_CONCEPT_SERIALIZABLE_VECTOR_EVENT_H + +#include <map> + +#include
"vast/concept/serializable/data.h" +#include "vast/concept/serializable/state.h" +#include "vast/concept/serializable/type.h" +#include "vast/concept/serializable/caf/adapters.h" +#include "vast/concept/state/event.h" +#include "vast/concept/state/value.h" +#include "vast/concept/state/time.h" +#include "vast/util/assert.h" +#include "vast/util/flat_set.h" + +namespace vast { + +template +void serialize(Serializer& sink, std::vector const& events) +{ + util::flat_set digests; + sink << uint64_t{events.size()}; + for (auto& e : events) + { + auto digest = e.type().digest(); + sink << digest; + if (digests.count(digest) == 0) + { + digests.insert(digest); + sink << e.type(); + } + sink << e.data() << e.id() << e.timestamp(); + } +} + +template +void deserialize(Deserializer& source, std::vector& events) +{ + std::map types; + uint64_t size; + source >> size; + events.resize(size); + for (auto& e : events) + { + type::hash_type::digest_type digest; + source >> digest; + auto i = types.find(digest); + if (i == types.end()) + { + type t; + source >> t; + VAST_ASSERT(digest == t.digest()); + i = types.emplace(digest, std::move(t)).first; + } + data d; + event_id id; + time::point ts; + source >> d >> id >> ts; + e = value{std::move(d), i->second}; + e.id(id); + e.timestamp(ts); + } +} + +} // namespace vast + +#endif diff --git a/test/unit/fixtures/chunks.h b/test/unit/fixtures/chunks.h deleted file mode 100644 index b4911447170..00000000000 --- a/test/unit/fixtures/chunks.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef FIXTURES_CHUNKS_H -#define FIXTURES_CHUNKS_H - -#include - -#include "vast/event.h" -#include "vast/chunk.h" - -using namespace vast; - -namespace fixtures { - -struct chunks -{ - chunks() - { - type0 = type::record{{"c", type::count{}}, {"s", type::string{}}}; - type0.name("test_record_event"); - chunk::writer w0{chunk0}; - for (auto i = 0u; i < 1024u; ++i) - { - auto e = event::make(record{i, to_string(i)}, type0); - e.id(i); - e.timestamp(time::now()); - REQUIRE(w0.write(e)); - } - w0.flush(); - REQUIRE(chunk0.events() == 1024u); - - type1 = type::real{}; - type1.name("test_double_event"); - chunk::writer w1{chunk1}; - for (auto i = 0u; i < 500u; ++i) - { - auto e = event::make(4.2 + i, type1); - e.id(i + chunk0.events()); - e.timestamp(time::now()); - REQUIRE(w1.write(e)); - } - w1.flush(); - } - - chunk chunk0; - type type0; - chunk chunk1; - type type1; -}; - -} // namespace fixtures - -#endif diff --git a/test/unit/fixtures/events.h b/test/unit/fixtures/events.h new file mode 100644 index 00000000000..82905432ac2 --- /dev/null +++ b/test/unit/fixtures/events.h @@ -0,0 +1,55 @@ +#ifndef FIXTURES_EVENTS_H +#define FIXTURES_EVENTS_H + +#include + +#include "vast/event.h" + +using namespace vast; + +namespace fixtures { + +struct simple_events +{ + simple_events() + : events0(512), + events1(2048), + events(1024) + { + type0 = type::record{{"c", type::count{}}, {"s", type::string{}}}; + type0.name("test_record_event"); + type1 = type::record{{"r", type::real{}}, {"s", type::boolean{}}}; + type1.name("test_record_event2"); + + for (auto i = 0u; i < events0.size(); ++i) + { + events0[i] = event::make(record{i, std::to_string(i)}, type0); + events0[i].id(i); + } + + for (auto i = 0u; i < events1.size(); ++i) + { + events1[i] = event::make(record{4.2 + i, i % 2 == 0}, type1); + events1[i].id(events0.size() + i); + } + + for (auto i = 0u; i < events.size(); ++i) + { + if (i % 2 == 0) + events[i] = event::make(record{i, std::to_string(i)}, type0); + else + events[i] = 
event::make(record{4.2 + i, true}, type1); + events[i].id(events0.size() + events1.size() + i); + } + } + + type type0; + type type1; + std::vector<event> events0; + std::vector<event> events1; + std::vector<event> events; +}; + +} // namespace fixtures + +#endif diff --git a/test/unit/tests/actor/index.cc b/test/unit/tests/actor/index.cc index d41763cb3ec..2da04ba7f60 100644 --- a/test/unit/tests/actor/index.cc +++ b/test/unit/tests/actor/index.cc @@ -1,29 +1,28 @@ #include <caf/all.hpp> -#include "vast/chunk.h" #include "vast/event.h" #include "vast/query_options.h" #include "vast/actor/index.h" #define SUITE actors #include "test.h" -#include "fixtures/chunks.h" +#include "fixtures/events.h" using namespace caf; using namespace vast; -FIXTURE_SCOPE(chunk_scope, fixtures::chunks) +FIXTURE_SCOPE(fixture_scope, fixtures::simple_events) TEST(index) { using bitstream_type = index::bitstream_type; - MESSAGE("sending chunks to index"); + MESSAGE("sending events to index"); path dir = "vast-test-index"; scoped_actor self; auto idx = self->spawn(dir, 500, 2, 3); - self->send(idx, chunk0); - self->send(idx, chunk1); + self->send(idx, events0); + self->send(idx, events1); MESSAGE("flushing index through termination"); self->send_exit(idx, exit::done); @@ -73,23 +72,16 @@ TEST(index) task = t; }); - MESSAGE("sending another chunk and getting continuous hits"); - std::vector<event> events(2048); - for (size_t i = 0; i < events.size(); ++i) - { - auto j = 1524 + i; - events[i] = event::make(record{j, to_string(j)}, type0); - events[i].id(j); - } - self->send(idx, chunk{std::move(events)}); - self->receive([&](bitstream_type const& bs) { CHECK(bs.count() == 549); }); + MESSAGE("sending another event batch and getting continuous hits"); + self->send(idx, events); + self->receive([&](bitstream_type const& bs) { CHECK(bs.count() == 95); }); - MESSAGE("disabling continuous query and sending another chunk"); + MESSAGE("disabling continuous query and sending another event"); self->send(idx, *expr, continuous_atom::value, disable_atom::value); self->receive([&](down_msg const& msg) { CHECK(msg.source == task); }); auto e = event::make(record{1337u, to_string(1337)}, type0); e.id(4711); - self->send(idx, chunk{{std::move(e)}}); + self->send(idx, std::vector<event>{std::move(e)}); // Make sure that we didn't get any new hits.
CHECK(self->mailbox().count() == 0); diff --git a/test/unit/tests/actor/partition.cc b/test/unit/tests/actor/partition.cc index e0802dd8fa4..02f85338e6d 100644 --- a/test/unit/tests/actor/partition.cc +++ b/test/unit/tests/actor/partition.cc @@ -1,30 +1,33 @@ #include <caf/all.hpp> + +#include "vast/bitstream.h" #include "vast/event.h" #include "vast/actor/partition.h" #include "vast/actor/task.h" #define SUITE actors #include "test.h" -#include "fixtures/chunks.h" +#include "fixtures/events.h" using namespace caf; using namespace vast; -FIXTURE_SCOPE(chunk_scope, fixtures::chunks) +FIXTURE_SCOPE(fixture_scope, fixtures::simple_events) TEST(partition) { using bitstream_type = partition::bitstream_type; - MESSAGE("sending chunks to partition"); + MESSAGE("sending events to partition"); path dir = "vast-test-partition"; scoped_actor self; auto p = self->spawn(dir, self); - auto t = self->spawn(time::snapshot(), chunk0.events()); - self->send(p, chunk0, t); + auto t = self->spawn(time::snapshot(), + uint64_t{events0.size()}); + self->send(p, events0, t); self->receive([&](down_msg const& msg) { CHECK(msg.source == t); }); - t = self->spawn(time::snapshot(), chunk1.events()); - self->send(p, chunk1, t); + t = self->spawn(time::snapshot(), uint64_t{events1.size()}); + self->send(p, events1, t); self->receive([&](down_msg const& msg) { CHECK(msg.source == t); }); MESSAGE("flushing partition through termination"); @@ -57,16 +60,9 @@ TEST(partition) REQUIRE(expr); self->send(p, *expr, continuous_atom::value); - MESSAGE("sending another chunk"); - std::vector<event> events(2048); - for (size_t i = 0; i < events.size(); ++i) - { - auto j = chunk0.events() + chunk1.events() + i; - events[i] = event::make(record{j, to_string(j)}, type0); - events[i].id(j); - } - t = self->spawn(time::snapshot(), 2048ull); - self->send(p, chunk{std::move(events)}, t); + MESSAGE("sending another event batch"); + t = self->spawn(time::snapshot(), uint64_t{events.size()}); + self->send(p, events, t); self->receive([&](down_msg const& msg) { CHECK(msg.source == t); }); MESSAGE("getting continuous hits"); self->receive( [&](expression const& e, bitstream_type const& hits, continuous_atom) { CHECK(*expr == e); - // (1524..3571).map { |x| x.to_s }.select { |x| x =~ /7/ }.length == 549 - CHECK(hits.count() == 549); + // (0..1024) + // .select{|x| x % 2 == 0} + // .map{|x| x.to_s} + // .select{|x| x =~ /7/} + // .length == 95 + CHECK(hits.count() == 95); }); - MESSAGE("disabling continuous query and sending another chunk"); + MESSAGE("disabling continuous query and sending another event"); self->send(p, *expr, continuous_atom::value, disable_atom::value); auto e = event::make(record{1337u, to_string(1337)}, type0); e.id(4711); t = self->spawn(time::snapshot(), 1ull); - self->send(p, chunk{{std::move(e)}}, t); + self->send(p, std::vector<event>{std::move(e)}, t); self->receive([&](down_msg const& msg) { CHECK(msg.source == t); }); // Make sure that we didn't get any new hits.
CHECK(self->mailbox().count() == 0); diff --git a/test/unit/tests/actor/source_bgpdump.cc b/test/unit/tests/actor/source_bgpdump.cc index a52a70af89f..e8908285c23 100644 --- a/test/unit/tests/actor/source_bgpdump.cc +++ b/test/unit/tests/actor/source_bgpdump.cc @@ -21,27 +21,26 @@ TEST(bgpdump_source) MESSAGE("running the source"); anon_send(bgpdump, run_atom::value); self->receive( - [&](chunk const& chk) + [&](std::vector<event> const& events) { - auto v = chk.uncompress(); - CHECK(v.size() == 11782); - CHECK(v[0].type().name() == "bgpdump::state_change"); - auto r = get<record>(v[0]); + REQUIRE(events.size() == 11782); + CHECK(events[0].type().name() == "bgpdump::state_change"); + auto r = get<record>(events[0]); REQUIRE(r); CHECK((*r)[1] == *to<address>
("2a02:20c8:1f:1::4")); CHECK((*r)[2] == 50304u); CHECK((*r)[3] == "3"); CHECK((*r)[4] == "2"); - CHECK(v[2].type().name() == "bgpdump::announcement"); - r = get(v[2]); + CHECK(events[2].type().name() == "bgpdump::announcement"); + r = get(events[2]); REQUIRE(r); CHECK((*r)[1] == *to
("2001:8e0:0:ffff::9")); auto as_path = get((*r)[4]); REQUIRE(as_path); CHECK(as_path->size() == 4); CHECK((*as_path)[3] == 15194u); - CHECK(v[13].type().name() == "bgpdump::withdrawn"); - r = get(v[13]); + CHECK(events[13].type().name() == "bgpdump::withdrawn"); + r = get(events[13]); REQUIRE(r); CHECK((*r)[1] == *to
("68.67.63.245")); CHECK((*r)[2] == 22652u); diff --git a/test/unit/tests/actor/source_pcap.cc b/test/unit/tests/actor/source_pcap.cc index b3d42dbfec3..2658f932449 100644 --- a/test/unit/tests/actor/source_pcap.cc +++ b/test/unit/tests/actor/source_pcap.cc @@ -22,10 +22,10 @@ TEST(pcap_source) MESSAGE("running the source"); anon_send(pcap, run_atom::value); self->receive( - [&](chunk const& chk) + [&](std::vector const& events) { - CHECK(chk.meta().schema.find_type("vast::packet") != nullptr); - CHECK(chk.events() == 44); + REQUIRE(events.size() == 44); + CHECK(events[0].type().name() == "vast::packet"); }, fail ); @@ -43,7 +43,7 @@ TEST(pcap_source) self->receive([&](upstream_atom, actor const& a) { CHECK(a == pcap); }); anon_send(pcap, run_atom::value); self->receive( - [&](chunk const& chk) { CHECK(chk.events() == 36); }, + [&](std::vector const& events) { CHECK(events.size() == 36); }, fail ); self->receive( diff --git a/test/unit/tests/serialization.cc b/test/unit/tests/serialization.cc index e7193fc5e75..55931ad1953 100644 --- a/test/unit/tests/serialization.cc +++ b/test/unit/tests/serialization.cc @@ -1,6 +1,7 @@ #include "vast/announce.h" #include "vast/concept/serializable/hierarchy.h" #include "vast/concept/serializable/state.h" +#include "vast/concept/serializable/vector_event.h" #include "vast/concept/serializable/std/list.h" #include "vast/concept/serializable/std/unordered_map.h" #include "vast/concept/serializable/std/vector.h" @@ -11,6 +12,7 @@ #define SUITE serialization #include "test.h" +#include "fixtures/events.h" using namespace vast; using namespace vast::util; @@ -251,3 +253,26 @@ TEST(polymorphic serialization) CHECK(d1->j == 1337); } } + +FIXTURE_SCOPE(events_scope, fixtures::simple_events) + +// The serialization of events goes through custom (de)serialization routines +// to avoid redudant type serialization. +TEST(vector serialization) +{ + std::string str; + { + auto out = io::make_container_output_stream(str); + binary_serializer bs{out}; + bs << events; + } + + std::vector deserialized; + auto in = io::make_container_input_stream(str); + binary_deserializer ds{in}; + ds >> deserialized; + + CHECK(events == deserialized); +} + +FIXTURE_SCOPE_END()