Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write homogenous partitions from the partition transformer #2277

Merged
merged 15 commits into from Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/changes/2277--homogenous-partitions.md
@@ -0,0 +1,3 @@
Partition transforms now always emit homogenous partitions,
i.e., partitions that contain events of exactly one type, which
should make compaction and aging more efficient.
102 changes: 102 additions & 0 deletions libvast/include/vast/detail/fanout_counter.hpp
@@ -0,0 +1,102 @@
// _ _____ __________
// | | / / _ | / __/_ __/ Visibility
// | |/ / __ |_\ \ / / Across
// |___/_/ |_/___/ /_/ Space and Time
//
// SPDX-FileCopyrightText: (c) 2021 The VAST Contributors
// SPDX-License-Identifier: BSD-3-Clause

#pragma once

#include <caf/response_promise.hpp>

#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>

namespace vast::detail {

/// Tag type used as the `State` of a `fanout_counter` that carries no shared
/// state. When `State` is this type, the counter invokes its continuations
/// without a state argument.
struct fanout_empty_state {};

/// A counter that keeps track of N fan-out requests and triggers a
/// continuation after all of them have returned.
///
/// If at least one of the requests returned an error, the error continuation
/// is triggered with the most recently received error; otherwise the success
/// continuation is triggered.
///
/// Can optionally take a state that is shared between all of the individual
/// requests. The state is default-constructed, accessible via `state()`, and
/// moved into the continuation that ends up being invoked.
///
/// The success continuation has the signature `void()` and the error
/// continuation has the signature `void(caf::error&&)`, or `void(State&&)`
/// and `void(State&&, caf::error&&)` if a state is used.
///
/// It is assumed that all calls to `fanout_counter` will come from the same
/// actor context, so no attempt at synchronization is made.
///
/// @note Completion is only checked when a response is received. A counter
/// constructed with `expected == 0` therefore never invokes a continuation,
/// and responses received after the `expected`-th one are counted but do not
/// trigger a continuation again.
template <typename State, typename SuccessContinuation,
          typename ErrorContinuation>
  requires(std::is_default_constructible_v<State>)
struct fanout_counter {
  /// Constructs a counter that waits for `expected` responses.
  /// @param expected The number of responses to wait for.
  /// @param then The continuation to invoke if no request failed.
  /// @param error The continuation to invoke if at least one request failed.
  fanout_counter(size_t expected, SuccessContinuation then,
                 ErrorContinuation error)
    : success_count(0),
      error_count(0),
      expected(expected),
      last_error(caf::none),
      state_(State{}),
      // The continuations are taken by value, so hand them over to the
      // members instead of copying them a second time. `std::forward` is
      // used rather than `std::move` so that an instantiation with a
      // reference type keeps binding to the original callable.
      then(std::forward<SuccessContinuation>(then)),
      error(std::forward<ErrorContinuation>(error)) {
  }

  /// Records one successful response and finishes if it was the last one.
  void receive_success() {
    ++success_count;
    finish_if_complete();
  }

  /// Records one failed response and finishes if it was the last one.
  /// Only the most recent error is retained; earlier ones are overwritten.
  /// @param error The error returned by the failed request.
  void receive_error(caf::error error) {
    ++error_count;
    // `error` names the parameter here, not the error continuation member.
    last_error = std::move(error);
    finish_if_complete();
  }

  /// Returns the state shared between all requests.
  State& state() {
    return state_;
  }

private:
  /// Invokes `finish()` once exactly `expected` responses have arrived.
  void finish_if_complete() {
    if (success_count + error_count == expected)
      finish();
  }

  /// Dispatches to the error continuation if any request failed and to the
  /// success continuation otherwise. The state, if any, and the last error
  /// are moved out of the counter.
  void finish() {
    const bool failed = error_count > 0;
    if constexpr (std::is_same_v<State, fanout_empty_state>) {
      if (failed)
        error(std::move(last_error));
      else
        then();
    } else {
      if (failed)
        error(std::move(state_), std::move(last_error));
      else
        then(std::move(state_));
    }
  }

  /// Number of successful responses received so far.
  size_t success_count;
  /// Number of failed responses received so far.
  size_t error_count;
  /// Number of responses after which a continuation is invoked.
  size_t expected;
  /// The most recently received error.
  caf::error last_error;
  /// State shared between the individual requests.
  State state_;
  /// Continuation for the case that no request failed.
  SuccessContinuation then;
  /// Continuation for the case that at least one request failed.
  ErrorContinuation error;
};

/// Creates a shared `fanout_counter` without state.
/// @param expected The number of responses to wait for.
/// @param then The continuation with signature `void()` that is invoked if
/// no request failed.
/// @param error The continuation with signature `void(caf::error&&)` that is
/// invoked if at least one request failed.
/// @returns A `std::shared_ptr` to the newly created counter.
template <typename Continuation, typename ErrorContinuation>
auto make_fanout_counter(size_t expected, Continuation&& then,
                         ErrorContinuation&& error) {
  // `Continuation` is deduced as an lvalue reference when an lvalue is passed
  // in. Decay the deduced types so that the counter always owns its
  // continuations instead of referring to callables that may be gone by the
  // time the responses arrive.
  using counter = fanout_counter<fanout_empty_state, std::decay_t<Continuation>,
                                 std::decay_t<ErrorContinuation>>;
  return std::make_shared<counter>(expected, std::forward<Continuation>(then),
                                   std::forward<ErrorContinuation>(error));
}

/// Creates a shared `fanout_counter` that carries a default-constructed
/// `State` shared between all requests.
/// @tparam State The type of the shared state; must be given explicitly.
/// @param expected The number of responses to wait for.
/// @param then The continuation with signature `void(State&&)` that is
/// invoked if no request failed.
/// @param error The continuation with signature
/// `void(State&&, caf::error&&)` that is invoked if at least one request
/// failed.
/// @returns A `std::shared_ptr` to the newly created counter.
template <typename State, typename Continuation, typename ErrorContinuation>
auto make_fanout_counter(size_t expected, Continuation&& then,
                         ErrorContinuation&& error) {
  // `Continuation` is deduced as an lvalue reference when an lvalue is passed
  // in. Decay the deduced types so that the counter always owns its
  // continuations instead of referring to callables that may be gone by the
  // time the responses arrive.
  using counter = fanout_counter<State, std::decay_t<Continuation>,
                                 std::decay_t<ErrorContinuation>>;
  return std::make_shared<counter>(expected, std::forward<Continuation>(then),
                                   std::forward<ErrorContinuation>(error));
}

} // namespace vast::detail
1 change: 1 addition & 0 deletions libvast/include/vast/fwd.hpp
Expand Up @@ -419,6 +419,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(vast_types, first_vast_type_id)
VAST_ADD_TYPE_ID((std::vector<vast::table_slice>))
VAST_ADD_TYPE_ID((std::vector<vast::table_slice_column>))
VAST_ADD_TYPE_ID((std::vector<vast::uuid>))
VAST_ADD_TYPE_ID((std::vector<vast::partition_info>))

VAST_ADD_TYPE_ID((vast::detail::framed<vast::table_slice>))
VAST_ADD_TYPE_ID((std::vector<vast::detail::framed<vast::table_slice>>))
Expand Down
5 changes: 5 additions & 0 deletions libvast/include/vast/index_statistics.hpp
Expand Up @@ -29,6 +29,11 @@ struct index_statistics {
/// The number of events for a given layout.
detail::heterogenous_string_hashmap<layout_statistics> layouts;

/// Adds the per-layout event counts of `other` onto the counts stored in
/// this object. Entries are looked up with `operator[]`, so a layout that is
/// only present in `other` is presumably created here first (assuming the
/// usual map semantics of `layouts`).
/// @param other The statistics to add to this object.
void merge_inplace(const index_statistics& other) {
  for (const auto& [name, other_stats] : other.layouts) {
    auto& own_stats = layouts[name];
    own_stats.count += other_stats.count;
  }
}

template <class Inspector>
friend auto inspect(Inspector& f, index_statistics& x) {
return f(caf::meta::type_name("index_statistics"), x.layouts);
Expand Down
12 changes: 9 additions & 3 deletions libvast/include/vast/partition_synopsis.hpp
Expand Up @@ -108,19 +108,25 @@ struct partition_info {
time max_import_time;

/// How many events of each type the partition contains.
index_statistics stats;
// A `partition_info` is only created for new partitions, so
// it can not be a heterogenous legacy partition but must have
// exactly one type.
vast::type type;

template <class Inspector>
friend auto inspect(Inspector& f, partition_info& x) {
return f(caf::meta::type_name("partition_info"), x.uuid, x.events,
x.max_import_time, x.stats);
x.max_import_time, x.type);
}
};

/// A partition synopsis with some additional information.
// An `augmented_partition_synopsis` is only created for new
// partitions, so it can not be a heterogenous legacy partition
// but must have exactly one type.
struct augmented_partition_synopsis {
vast::uuid uuid;
index_statistics stats;
vast::type type;
partition_synopsis_ptr synopsis;
};

Expand Down
3 changes: 3 additions & 0 deletions libvast/include/vast/plugin.hpp
Expand Up @@ -250,6 +250,9 @@ class store_plugin : public virtual plugin {
using builder_and_header = std::pair<system::store_builder_actor, chunk_ptr>;

/// Create a store builder actor that accepts incoming table slices.
/// The store builder is required to keep a reference to itself alive
/// as long as its input stream is live, and persist itself and exit as
/// soon as the input stream terminates.
/// @param accountant The actor handle of the accountant.
/// @param fs The actor handle of a filesystem.
/// @param id The partition id for which we want to create a store. Can
Expand Down
27 changes: 17 additions & 10 deletions libvast/include/vast/system/actors.hpp
Expand Up @@ -192,20 +192,24 @@ using partition_creation_listener_actor = typed_actor_fwd<

/// The CATALOG actor interface.
using catalog_actor = typed_actor_fwd<
// Bulk import a set of partition synopses.
// Reinitialize the catalog from a set of partition synopses. Used at
// startup, so the map is expected to be huge and we use a shared_ptr
// to be sure it's not accidentally copied.
caf::replies_to<
atom::merge,
std::shared_ptr<std::map<uuid, partition_synopsis_ptr>>>::with<atom::ok>,
// Merge a single partition synopsis.
caf::replies_to<atom::merge, uuid, partition_synopsis_ptr>::with< //
atom::ok>,
caf::replies_to<atom::merge, uuid, partition_synopsis_ptr>::with<atom::ok>,
// Merge a set of partition synopses.
caf::replies_to<atom::merge,
std::vector<augmented_partition_synopsis>>::with<atom::ok>,
// Get *ALL* partition synopses stored in the catalog.
caf::replies_to<atom::get>::with<std::vector<partition_synopsis_pair>>,
// Erase a single partition synopsis.
caf::replies_to<atom::erase, uuid>::with<atom::ok>,
// Atomically remove one and merge another partition synopsis
caf::replies_to<atom::replace, uuid, uuid,
partition_synopsis_ptr>::with<atom::ok>,
// Atomically replace a set of partition synopses with another.
caf::replies_to<atom::replace, std::vector<uuid>,
std::vector<augmented_partition_synopsis>>::with<atom::ok>,
lava marked this conversation as resolved.
Show resolved Hide resolved
// Return the candidate partitions for an expression.
caf::replies_to<atom::candidates, vast::uuid,
vast::expression>::with<catalog_result>,
Expand Down Expand Up @@ -261,14 +265,16 @@ using index_actor = typed_actor_fwd<
caf::reacts_to<uuid, uint32_t>,
// Erases the given partition from the INDEX.
caf::replies_to<atom::erase, uuid>::with<atom::done>,
// Erases the given set of partitions from the INDEX.
caf::replies_to<atom::erase, std::vector<uuid>>::with<atom::done>,
// Applies the given transformation to the partition.
// When keep_original_partition is yes: erases the existing partition and
// returns the synopsis of the new partition. If the partition is completely
// erased, returns the nil uuid. When keep_original_partition is no: does an
// in-place transform keeping the old ids, and makes a new partition
// preserving the old one(s).
caf::replies_to<atom::apply, transform_ptr, std::vector<uuid>,
keep_original_partition>::with<partition_info>,
keep_original_partition>::with<std::vector<partition_info>>,
// Makes the identity of the importer known to the index.
caf::reacts_to<atom::importer, idspace_distributor_actor>>
// Conform to the protocol of the STREAM SINK actor for table slices.
Expand Down Expand Up @@ -345,9 +351,9 @@ using filesystem_actor = typed_actor_fwd<

/// The interface of a BULK PARTITION actor.
using partition_transformer_actor = typed_actor_fwd<
// Persist transformed partition to given path.
caf::replies_to<atom::persist, std::filesystem::path,
std::filesystem::path>::with<augmented_partition_synopsis>,
// Persist the transformed partitions and return the generated
// partition synopses.
caf::replies_to<atom::persist>::with<std::vector<augmented_partition_synopsis>>,
// INTERNAL: Continuation handler for `atom::done`.
caf::reacts_to<atom::internal, atom::resume, atom::done, vast::id>>
// query::extract API
Expand Down Expand Up @@ -519,6 +525,7 @@ CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::partition_synopsis_ptr)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::partition_synopsis_pair)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(std::vector<vast::partition_synopsis_pair>)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::augmented_partition_synopsis)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(std::vector<vast::augmented_partition_synopsis>)
CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::transform_ptr)
#undef vast_uuid_synopsis_map

Expand Down
8 changes: 8 additions & 0 deletions libvast/include/vast/system/index.hpp
Expand Up @@ -161,10 +161,18 @@ struct index_state {
// Maps partitions to their expected location on the file system.
[[nodiscard]] std::filesystem::path partition_path(const uuid& id) const;

/// Returns a format string that can be formatted with a partition id to
/// get the location of the corresponding partition.
[[nodiscard]] std::string partition_path_template() const;

// Maps partition synopses to their expected location on the file system.
[[nodiscard]] std::filesystem::path
partition_synopsis_path(const uuid& id) const;

/// Returns a format string that can be formatted with a partition id to
/// get the location of the corresponding partition synopsis.
[[nodiscard]] std::string partition_synopsis_path_template() const;

caf::error load_from_disk();

void flush_to_disk();
Expand Down