Skip to content

Commit

Permalink
Make sources produce events rather than chunks
Browse files Browse the repository at this point in the history
This change further streamlines and simplifies the ingestion process.
A SOURCE now simply produces sequences of events. In a cluster-in-box
setup where all actors run in the same NODE, this is more efficient,
because it avoids the extra decompression step at INDEX. Even in the
distributed case, it disburdens the CPU-intensive INDEX (no more chunk
decompression) and moves the computational load to ARCHIVE (compress
events into chunks).
  • Loading branch information
mavam committed Jun 2, 2015
1 parent 92068e6 commit 05a137a
Show file tree
Hide file tree
Showing 21 changed files with 330 additions and 424 deletions.
10 changes: 6 additions & 4 deletions doc/vast.1
@@ -1,4 +1,4 @@
.TH VAST 1 "May 27, 2015" 0.1 "Visibility Across Space and Time"
.TH VAST 1 "May 30, 2015" 0.1 "Visibility Across Space and Time"
.SH NAME
.PP
\fB\fCvast\fR \-\- interface to a VAST node
Expand Down Expand Up @@ -157,10 +157,12 @@ spawned on the connected node.
Available \fIactor\fP values with corresponding \fIparameters\fP:
.PP
\fIarchive\fP [\fIparameters\fP]
\fB\fC\-s\fR \fIsize\fP [\fI128\fP]
Maximum segment size in MB
\fB\fC\-c\fR \fIsegments\fP [\fI10\fP]
\fB\fC\-c\fR \fIcompression\fP [\fIlz4\fP]
Compression algorithm for chunks
\fB\fC\-s\fR \fIsegments\fP [\fI10\fP]
Number of cached segments
\fB\fC\-m\fR \fIsize\fP [\fI128\fP]
Maximum segment size in MB
.PP
\fIindex\fP [\fIparameters\fP]
\fB\fC\-a\fR \fIpartitions\fP [\fI5\fP]
Expand Down
8 changes: 5 additions & 3 deletions doc/vast.1.md
Expand Up @@ -173,10 +173,12 @@ Available *arguments*:
Available *actor* values with corresponding *parameters*:

*archive* [*parameters*]
`-s` *size* [*128*]
Maximum segment size in MB
`-c` *segments* [*10*]
`-c` *compression* [*lz4*]
Compression algorithm for chunks
`-s` *segments* [*10*]
Number of cached segments
`-m` *size* [*128*]
Maximum segment size in MB

*index* [*parameters*]
`-a` *partitions* [*5*]
Expand Down
4 changes: 2 additions & 2 deletions doc/vastd.1
@@ -1,7 +1,7 @@
.TH VASTD 1 "May 27, 2015" 0.1 "Visibility Across Space and Time"
.TH VASTD 1 "May 30, 2015" 0.1 "Visibility Across Space and Time"
.SH NAME
.PP
\fB\fCvastd\fR \-\- VAST daemon
\fB\fCvastd\fR \-\- VAST daemon running a node actor
.SH SYNOPSIS
.PP
\fB\fCvastd\fR [\fIoptions\fP]
Expand Down
19 changes: 12 additions & 7 deletions src/vast/actor/archive.cc
@@ -1,5 +1,7 @@
#include <caf/all.hpp>

#include "vast/chunk.h"
#include "vast/event.h"
#include "vast/actor/archive.h"
#include "vast/concept/serializable/chunk.h"
#include "vast/concept/serializable/io.h"
Expand All @@ -9,11 +11,13 @@ namespace vast {

using namespace caf;

archive::archive(path dir, size_t capacity, size_t max_segment_size)
archive::archive(path dir, size_t capacity, size_t max_segment_size,
io::compression compression)
: flow_controlled_actor{"archive"},
dir_{dir},
meta_data_filename_{dir_ / "meta.data"},
max_segment_size_{max_segment_size},
compression_{compression},
cache_{capacity}
{
VAST_ASSERT(max_segment_size_ > 0);
Expand Down Expand Up @@ -58,17 +62,18 @@ caf::behavior archive::make_behavior()
accountant_ = accountant;
send(accountant_, label() + "-events", time::now());
},
[=](chunk const& chk)
[=](std::vector<event> const& events)
{
VAST_DEBUG(this, "got chunk [" << chk.meta().ids.find_first() << ',' <<
(chk.meta().ids.find_last() + 1 ) << ')');
VAST_DEBUG(this, "got", events.size(), "events [" <<
events.front().id() << ',' << (events.back().id() + 1) << ')');
chunk chk{events, compression_};
auto too_large = current_size_ + chk.bytes() >= max_segment_size_;
if (! current_.empty() && too_large && ! flush())
return;
current_size_ += chk.bytes();
current_.insert(chk);
if (accountant_)
send(accountant_, chk.events(), time::snapshot());
send(accountant_, uint64_t{events.size()}, time::snapshot());
current_size_ += chk.bytes();
current_.insert(std::move(chk));
},
[=](flush_atom)
{
Expand Down
6 changes: 5 additions & 1 deletion src/vast/actor/archive.h
Expand Up @@ -7,6 +7,7 @@
#include "vast/filesystem.h"
#include "vast/uuid.h"
#include "vast/actor/actor.h"
#include "vast/io/compression.h"
#include "vast/util/cache.h"
#include "vast/util/flat_set.h"
#include "vast/util/range_map.h"
Expand All @@ -30,8 +31,10 @@ struct archive : flow_controlled_actor
/// @param dir The root directory of the archive.
/// @param capacity The number of segments to hold in memory.
/// @param max_segment_size The maximum size in MB of a segment.
/// @param compression The compression method to use for chunks.
/// @pre `max_segment_size > 0`
archive(path dir, size_t capacity, size_t max_segment_size);
archive(path dir, size_t capacity, size_t max_segment_size,
io::compression = io::lz4);

void on_exit();
caf::behavior make_behavior() override;
Expand All @@ -42,6 +45,7 @@ struct archive : flow_controlled_actor
path dir_;
path meta_data_filename_;
size_t max_segment_size_;
io::compression compression_;
util::range_map<event_id, uuid> segments_;
util::cache<uuid, segment> cache_;
segment current_;
Expand Down
20 changes: 9 additions & 11 deletions src/vast/actor/importer.cc
@@ -1,6 +1,6 @@
#include <caf/all.hpp>

#include "vast/chunk.h"
#include "vast/event.h"
#include "vast/actor/importer.h"

namespace vast {
Expand Down Expand Up @@ -64,7 +64,7 @@ behavior importer::make_behavior()
monitor(a);
index_ = a;
},
[=](chunk& chk)
[=](std::vector<event>& events)
{
if (identifier_ == invalid_actor)
{
Expand All @@ -84,23 +84,21 @@ behavior importer::make_behavior()
quit(exit::error);
return;
}
sync_send(identifier_, request_atom::value, chk.events()).then(
sync_send(identifier_, request_atom::value, uint64_t{events.size()}).then(
[=](id_atom, event_id from, event_id to) mutable
{
auto n = to - from;
VAST_DEBUG(this, "got", n,
"IDs for chunk [" << from << "," << to << ")");
if (n < chk.events())
if (n < events.size())
{
VAST_ERROR(this, "got", n, "IDs, needed", chk.events());
VAST_ERROR(this, "got", n, "IDs, needed", events.size());
quit(exit::error);
return;
}
default_bitstream ids;
ids.append(from, false);
ids.append(n, true);
chk.ids(std::move(ids));
auto t = make_message(std::move(chk));
for (auto& e : events)
e.id(from++);
auto t = make_message(std::move(events));
send(archive_, t);
send(index_, t);
},
Expand All @@ -110,7 +108,7 @@ behavior importer::make_behavior()
quit(exit::error);
},
catch_unexpected()
);
);
},
catch_unexpected()
};
Expand Down
34 changes: 18 additions & 16 deletions src/vast/actor/index.cc
Expand Up @@ -2,7 +2,7 @@

#include <caf/all.hpp>
#include "vast/bitmap_index.h"
#include "vast/chunk.h"
#include "vast/event.h"
#include "vast/print.h"
#include "vast/query_options.h"
#include "vast/actor/partition.h"
Expand Down Expand Up @@ -190,17 +190,17 @@ behavior index::make_behavior()
send(t, done_atom::value);
return t;
},
[=](chunk const& chk)
[=](std::vector<event> const& events)
{
auto& a = active_[next_active_++ % active_.size()];
auto i = partitions_.find(a.first);
VAST_ASSERT(i != partitions_.end());
VAST_ASSERT(a.second != invalid_actor);
// Replace partition with a new one on overflow. If the max is too small
// that even the first chunk doesn't fit, then we just accept this and
// have a one-chunk partition.
// that even the first batch doesn't fit, then we just accept this and
// have a partition with a single batch.
if (i->second.events > 0
&& i->second.events + chk.events() > max_events_per_partition_)
&& i->second.events + events.size() > max_events_per_partition_)
{
VAST_VERBOSE(this, "replaces partition (" << a.first << ')');
send_exit(a.second, exit::stop);
Expand All @@ -216,18 +216,20 @@ behavior index::make_behavior()
}
// Update partition meta data.
auto& p = i->second;
p.events += chk.events();
p.events += events.size();
p.last_modified = time::now();
if (p.from == time::duration{} || chk.meta().first < p.from)
p.from = chk.meta().first;
if (p.to == time::duration{} || chk.meta().last > p.to)
p.to = chk.meta().last;
// Relay chunk.
VAST_DEBUG(this, "forwards chunk [" << chk.base() << ',' << chk.base() +
chk.events() << ')', "to", a.second, '(' << a.first << ')');
auto t = spawn<task>(time::snapshot(), chk.events());
if (p.from == time::duration{} || events.front().timestamp() < p.from)
p.from = events.front().timestamp();
if (p.to == time::duration{} || events.back().timestamp() > p.to)
p.to = events.back().timestamp();
// Relay events.
VAST_DEBUG(this, "forwards", events.size(), "events [" <<
events.front().id() << ',' << (events.back().id() + 1) << ')',
"to", a.second, '(' << a.first << ')');
auto t = spawn<task>(time::snapshot(), uint64_t{events.size()});
send(t, supervisor_atom::value, this);
send(a.second, chk, t);
send(a.second, message::concat(current_message(),
make_message(std::move(t))));
},
[=](expression const& expr, query_options opts, actor const& subscriber)
{
Expand Down Expand Up @@ -290,7 +292,7 @@ behavior index::make_behavior()
qs.cont->task = spawn<task>(time::snapshot());
send(qs.cont->task, this);
// Relay the continuous query to all active partitions, as these may
// still receive chunks.
// still receive events.
for (auto& a : active_)
send(a.second, expr, continuous_atom::value);
}
Expand Down
22 changes: 19 additions & 3 deletions src/vast/actor/node.cc
Expand Up @@ -21,6 +21,7 @@
#include "vast/actor/exporter.h"
#include "vast/actor/node.h"
#include "vast/expr/normalize.h"
#include "vast/io/compression.h"
#include "vast/io/file_stream.h"
#include "vast/util/assert.h"
#include "vast/util/endpoint.h"
Expand Down Expand Up @@ -301,17 +302,32 @@ message node::spawn_actor(message const& msg)
},
on("archive", any_vals) >> [&]
{
io::compression method;
auto comp = "lz4"s;
uint64_t segments = 10;
uint64_t size = 128;
r = params.extract_opts({
{"segments,c", "maximum number of cached segments", segments},
{"size,s", "maximum size of segment before flushing (MB)", size}
{"compression,c", "compression method for event batches", comp},
{"segments,s", "maximum number of cached segments", segments},
{"size,m", "maximum size of segment before flushing (MB)", size}
});
if (! r.error.empty())
return make_message(error{std::move(r.error)});
if (comp == "null")
method = io::null;
else if (comp == "lz4")
method = io::lz4;
else if (comp == "snappy")
#ifdef VAST_HAVE_SNAPPY
method = io::snappy;
#else
return make_message(error{"not compiled with snappy support"});
#endif
else
return make_message(error{"unknown compression method: ", comp});
size <<= 20; // MB'ify
auto dir = dir_ / "archive";
auto a = spawn<archive, priority_aware>(dir, segments, size);
auto a = spawn<archive, priority_aware>(dir, segments, size, method);
attach_functor([=](uint32_t ec) { anon_send_exit(a, ec); });
send(a, put_atom::value, accountant_atom::value, accountant_);
return put({a, "archive", label});
Expand Down
14 changes: 0 additions & 14 deletions src/vast/actor/node_spawn.cc
Expand Up @@ -29,12 +29,10 @@ namespace vast {
message node::spawn_source(std::string const& label, message const& params)
{
auto batch_size = uint64_t{100000};
auto comp = "lz4"s;
auto schema_file = ""s;
auto input = ""s;
auto r = params.extract_opts({
{"batch,b", "number of events to ingest at once", batch_size},
{"compression,c", "compression method for event batches", comp},
{"schema,s", "alternate schema file", schema_file},
{"dump-schema,d", "print schema and exit"},
{"read,r", "path to read events from", input},
Expand Down Expand Up @@ -161,18 +159,6 @@ message node::spawn_source(std::string const& label, message const& params)
// Set parameters.
send(src, batch_atom::value, batch_size);
send(src, put_atom::value, accountant_atom::value, accountant_);
if (comp == "null")
send(src, io::null);
else if (comp == "lz4")
send(src, io::lz4);
else if (comp == "snappy")
#ifdef VAST_HAVE_SNAPPY
send(src, io::snappy);
#else
return make_message(error{"not compiled with snappy support"});
#endif
else
return make_message(error{"unknown compression method: ", comp});
// Save it.
terminate = false;
return put({src, "source", label});
Expand Down

0 comments on commit 05a137a

Please sign in to comment.