Skip to content

Commit

Permalink
Write homogenous partitions from the partition transformer
Browse files Browse the repository at this point in the history
This changes the partition transformer to write not just
one output partition but rather a separate partition for
every output type.

In general the logic is "all-or-nothing", ie. the partition
transformer returns an error if a single output partition
fails to be persisted to disk.

This has a number of downstream implications, in particular
the index now also returns a vector of new partition infos
and the catalog has been expanded with a few bulk handlers.
  • Loading branch information
lava committed May 25, 2022
1 parent 22dcefa commit 38a4729
Show file tree
Hide file tree
Showing 13 changed files with 569 additions and 279 deletions.
1 change: 1 addition & 0 deletions libvast/include/vast/fwd.hpp
Expand Up @@ -420,6 +420,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(vast_types, first_vast_type_id)
VAST_ADD_TYPE_ID((std::vector<vast::table_slice>))
VAST_ADD_TYPE_ID((std::vector<vast::table_slice_column>))
VAST_ADD_TYPE_ID((std::vector<vast::uuid>))
VAST_ADD_TYPE_ID((std::vector<vast::partition_info>))

VAST_ADD_TYPE_ID((vast::detail::framed<vast::table_slice>))
VAST_ADD_TYPE_ID((std::vector<vast::detail::framed<vast::table_slice>>))
Expand Down
5 changes: 5 additions & 0 deletions libvast/include/vast/index_statistics.hpp
Expand Up @@ -29,6 +29,11 @@ struct index_statistics {
/// The number of events for a given layout.
detail::heterogenous_string_hashmap<layout_statistics> layouts;

void merge_inplace(const index_statistics& other) {
for (auto const& [field, layout] : other.layouts)
layouts[field].count += layout.count;
}

template <class Inspector>
friend auto inspect(Inspector& f, index_statistics& x) {
return f(caf::meta::type_name("index_statistics"), x.layouts);
Expand Down
12 changes: 9 additions & 3 deletions libvast/include/vast/partition_synopsis.hpp
Expand Up @@ -108,19 +108,25 @@ struct partition_info {
time max_import_time;

/// How many events of each type the partition contains.
index_statistics stats;
// A `partition_info` is only created for new partitions, so
// it can not be a heterogenous legacy partition but must have
// exactly one type.
vast::type type;

template <class Inspector>
friend auto inspect(Inspector& f, partition_info& x) {
return f(caf::meta::type_name("partition_info"), x.uuid, x.events,
x.max_import_time, x.stats);
x.max_import_time, x.type);
}
};

/// A partition synopsis with some additional information.
// A `augmented_partition_synopsis` is only created for new
// partitions, so it can not be a heterogenous legacy partition
// but must have exactly one type.
struct augmented_partition_synopsis {
vast::uuid uuid;
index_statistics stats;
vast::type type;
partition_synopsis_ptr synopsis;
};

Expand Down
24 changes: 17 additions & 7 deletions libvast/include/vast/system/actors.hpp
Expand Up @@ -212,20 +212,27 @@ using partition_creation_listener_actor = typed_actor_fwd<

/// The CATALOG actor interface.
using catalog_actor = typed_actor_fwd<
// Bulk import a set of partition synopses.
// Reinitialize the catalog from a set of partition synopses. Used at
// startup, so the map is expected to be huge and we use a shared_ptr
// to be sure it's not accidentally copied.
caf::replies_to<
atom::merge,
std::shared_ptr<std::map<uuid, partition_synopsis_ptr>>>::with<atom::ok>,
// Merge a single partition synopsis.
caf::replies_to<atom::merge, uuid, partition_synopsis_ptr>::with< //
atom::ok>,
caf::replies_to<atom::merge, uuid, partition_synopsis_ptr>::with<atom::ok>,
// Merge a set of partition synopsis.
caf::replies_to<atom::merge,
std::vector<augmented_partition_synopsis>>::with<atom::ok>,
// Get *ALL* partition synopses stored in the catalog.
caf::replies_to<atom::get>::with<std::vector<partition_synopsis_pair>>,
// Erase a single partition synopsis.
caf::replies_to<atom::erase, uuid>::with<atom::ok>,
// Atomically remove one and merge another partition synopsis
caf::replies_to<atom::replace, uuid, uuid,
partition_synopsis_ptr>::with<atom::ok>,
// Atomatically replace a set of partititon synopses with another.
caf::replies_to<atom::replace, std::vector<uuid>,
std::vector<augmented_partition_synopsis>>::with<atom::ok>,
// Return the candidate partitions for an expression.
caf::replies_to<atom::candidates, vast::uuid,
vast::expression>::with<catalog_result>,
Expand Down Expand Up @@ -287,14 +294,16 @@ using index_actor = typed_actor_fwd<
query_cursor>,
// Erases the given partition from the INDEX.
caf::replies_to<atom::erase, uuid>::with<atom::done>,
// Erases the given set of partitions from the INDEX.
caf::replies_to<atom::erase, std::vector<uuid>>::with<atom::done>,
// Applies the given transformation to the partition.
// When keep_original_partition is yes: erases the existing partition and
// returns the synopsis of the new partition. If the partition is completely
// erased, returns the nil uuid. When keep_original_partition is no: does an
// in-place transform keeping the old ids, and makes a new partition
// preserving the old one(s).
caf::replies_to<atom::apply, transform_ptr, std::vector<uuid>,
keep_original_partition>::with<partition_info>,
keep_original_partition>::with<std::vector<partition_info>>,
// Makes the identity of the importer known to the index.
caf::reacts_to<atom::importer, idspace_distributor_actor>>
// Conform to the protocol of the STREAM SINK actor for table slices.
Expand Down Expand Up @@ -373,9 +382,9 @@ using filesystem_actor = typed_actor_fwd<

/// The interface of an BULK PARTITION actor.
using partition_transformer_actor = typed_actor_fwd<
// Persist transformed partition to given path.
caf::replies_to<atom::persist, std::filesystem::path,
std::filesystem::path>::with<augmented_partition_synopsis>,
// Persist the transformed partitions and return the generated
// partition synopses.
caf::replies_to<atom::persist>::with<std::vector<augmented_partition_synopsis>>,
// INTERNAL: Continuation handler for `atom::done`.
caf::reacts_to<atom::internal, atom::resume, atom::done, vast::id>>
// query::extract API
Expand Down Expand Up @@ -550,6 +559,7 @@ CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::partition_synopsis_ptr)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::partition_synopsis_pair)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(std::vector<vast::partition_synopsis_pair>)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::augmented_partition_synopsis)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(std::vector<vast::augmented_partition_synopsis>)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::transform_ptr)
#undef vast_uuid_synopsis_map

Expand Down
8 changes: 8 additions & 0 deletions libvast/include/vast/system/index.hpp
Expand Up @@ -161,10 +161,18 @@ struct index_state {
// Maps partitions to their expected location on the file system.
[[nodiscard]] std::filesystem::path partition_path(const uuid& id) const;

/// Returns a format string that can be formatted with a partition id to
/// get the location of the corresponding partition.
[[nodiscard]] std::string partition_path_template() const;

// Maps partition synopses to their expected location on the file system.
[[nodiscard]] std::filesystem::path
partition_synopsis_path(const uuid& id) const;

/// Returns a format string that can be formatted with a partition id to
/// get the location of the corresponding partition synopsis.
[[nodiscard]] std::string partition_synopsis_path_template() const;

caf::error load_from_disk();

void flush_to_disk();
Expand Down
73 changes: 53 additions & 20 deletions libvast/include/vast/system/partition_transformer.hpp
Expand Up @@ -10,6 +10,7 @@

#include "vast/fwd.hpp"

#include "vast/detail/flat_map.hpp"
#include "vast/index_statistics.hpp"
#include "vast/segment_builder.hpp"
#include "vast/system/active_partition.hpp"
Expand All @@ -23,27 +24,33 @@

namespace vast::system {

/// Helper class used to route table slices to the correct store.
struct partition_transformer_selector {
bool
operator()(const vast::type& filter, const vast::table_slice& column) const;
};

/// Similar to the active partition, but all contents come in a single
/// stream, a transform is applied and no queries need to be answered
/// while the partition is constructed.
struct partition_transformer_state {
static constexpr const char* name = "partition-transformer";

struct stream_data {
caf::expected<chunk_ptr> partition_chunk = caf::no_error;
caf::expected<chunk_ptr> synopsis_chunk = caf::no_error;
caf::expected<std::vector<std::tuple<vast::uuid, vast::type, chunk_ptr>>>
partition_chunks = caf::no_error;
caf::expected<std::vector<std::tuple<vast::uuid, chunk_ptr>>> synopsis_chunks
= caf::no_error;
};

struct path_data {
std::filesystem::path partition_path = {};
std::filesystem::path synopsis_path = {};
caf::typed_response_promise<augmented_partition_synopsis> promise = {};
caf::typed_response_promise<std::vector<augmented_partition_synopsis>> promise
= {};
};

partition_transformer_state() = default;

void add_slice(const table_slice& slice);
void finalize_data();
void fulfill(
partition_transformer_actor::stateful_pointer<partition_transformer_state>
self,
Expand All @@ -60,48 +67,72 @@ struct partition_transformer_state {
accountant_actor accountant = {};

/// Actor handle of the store builder for this partition.
store_builder_actor store_builder = {};
detail::flat_map<type, store_builder_actor> store_builders = {};

/// Actor handle of the filesystem actor.
filesystem_actor fs = {};

/// The transform to be applied to the data.
transform_ptr transform = {};

/// The stream stage to send table slices to the store.
caf::stream_stage_ptr<table_slice,
caf::broadcast_downstream_manager<table_slice>>
stage = {};
/// The stream stage to send table slices to the store(s).
// TODO: Use a specialized downstream manager that has
// a map from layout to store.
using partition_transformer_stream_stage_ptr = caf::stream_stage_ptr<
table_slice, caf::broadcast_downstream_manager<
table_slice, vast::type, partition_transformer_selector>>;

partition_transformer_stream_stage_ptr stage = {};

/// Cached stream error, if the stream terminated abnormally.
caf::error stream_error = {};

/// Cached transform error, if the transform returns one.
caf::error transform_error = {};

/// Cached table slices in this partition.
std::vector<table_slice> slices = {};

/// The maximum number of events in this partition. (not really necessary, but
/// The maximum number of events per partition. (not really necessary, but
/// required by the partition synopsis)
size_t partition_capacity = 0ull;

/// Total number of rows in `slices`.
/// Total number of rows in all transformed `slices`.
size_t events = 0ull;

/// Number of rows per event type.
index_statistics stats;
/// Number of rows per event type in the input and output.
index_statistics stats_in;
index_statistics stats_out;

/// The data of the newly created partition.
active_partition_state::serialization_data data = {};
/// Oldest import timestamp of the input data.
vast::time min_import_time = {};

/// Newest import timestamp of the input data.
vast::time max_import_time = {};

/// The data of the newly created partition(s).
detail::flat_map<type, active_partition_state::serialization_data> data = {};

/// Stores the value index for each field.
// Fields with a `#skip` attribute are stored as `nullptr`.
detail::stable_map<qualified_record_field, value_index_ptr> indexers = {};
using value_index_map
= detail::stable_map<qualified_record_field, value_index_ptr>;
detail::flat_map<vast::type, value_index_map> indexers = {};

/// Store id for partitions.
std::string store_id;

/// Options for creating new synopses.
index_config synopsis_opts = {};

/// Options for creating new value indices.
caf::settings index_opts = {};

// Two format strings that can be formatted with a `vast::uuid`
// as the single parameter. They give the
std::string partition_path_template;
std::string synopsis_path_template;

/// The actor waits until both the stream is finished and an `atom::persist`
/// has arrived. Depending on what happens first, a different set of
/// variables need to be stored in the meantime.
Expand All @@ -113,12 +144,14 @@ struct partition_transformer_state {
};

/// Spawns a PARTITION TRANSFORMER actor with the given parameters.
/// This actor
partition_transformer_actor::behavior_type partition_transformer(
partition_transformer_actor::stateful_pointer<partition_transformer_state>,
uuid id, std::string store_id, const index_config& synopsis_opts,
std::string store_id, const index_config& synopsis_opts,
const caf::settings& index_opts, accountant_actor accountant,
idspace_distributor_actor idspace_distributor,
type_registry_actor type_registry, filesystem_actor fs,
transform_ptr transform);
transform_ptr transform, std::string partition_path_template,
std::string synopsis_path_template);

} // namespace vast::system
13 changes: 13 additions & 0 deletions libvast/src/system/catalog.cpp
Expand Up @@ -379,6 +379,11 @@ catalog(catalog_actor::stateful_pointer<catalog_state> self,
self->state.merge(partition, std::move(synopsis));
return atom::ok_v;
},
[=](atom::merge, std::vector<augmented_partition_synopsis> v) -> atom::ok {
for (auto& aps : v)
self->state.merge(aps.uuid, aps.synopsis);
return atom::ok_v;
},
[=](atom::get) -> std::vector<partition_synopsis_pair> {
std::vector<partition_synopsis_pair> result;
result.reserve(self->state.synopses.size());
Expand All @@ -400,6 +405,14 @@ catalog(catalog_actor::stateful_pointer<catalog_state> self,
self->state.erase(old_partition);
return atom::ok_v;
},
[=](atom::replace, std::vector<uuid> old_uuids,
std::vector<augmented_partition_synopsis> new_synopses) -> atom::ok {
for (auto const& uuid : old_uuids)
self->state.erase(uuid);
for (auto& aps : new_synopses)
self->state.merge(aps.uuid, aps.synopsis);
return atom::ok_v;
},
[=](atom::candidates, vast::uuid lookup_id,
const vast::expression& expr) -> caf::result<catalog_result> {
auto start = std::chrono::steady_clock::now();
Expand Down
2 changes: 1 addition & 1 deletion libvast/src/system/eraser.cpp
Expand Up @@ -96,7 +96,7 @@ eraser(eraser_actor::stateful_pointer<eraser_state> self,
transform, result.partitions,
keep_original_partition::no)
.then(
[self, rp](const partition_info&) mutable {
[self, rp](const std::vector<partition_info>&) mutable {
VAST_DEBUG("{} applied filter transform with query {}", *self,
self->state.query_);
rp.deliver(atom::ok_v);
Expand Down

0 comments on commit 38a4729

Please sign in to comment.