diff --git a/libvast/src/system/index.cpp b/libvast/src/system/index.cpp index 4fffc00f404..671cd3b85b5 100644 --- a/libvast/src/system/index.cpp +++ b/libvast/src/system/index.cpp @@ -1184,10 +1184,9 @@ index(index_actor::stateful_pointer self, [self](atom::importer, idspace_distributor_actor idspace_distributor) { self->state.importer = std::move(idspace_distributor); }, - [self]( - atom::apply, transform_ptr transform, - std::vector old_partition_ids, - keep_original_partition keep) -> caf::result { + [self](atom::apply, transform_ptr transform, + std::vector old_partition_ids, + keep_original_partition keep) -> caf::result { VAST_DEBUG("{} applies a transform to partitions {}", *self, old_partition_ids); if (!self->state.store_plugin) @@ -1227,7 +1226,7 @@ index(index_actor::stateful_pointer self, self->send(lookup.worker, atom::supervise_v, query_id, lookup.query, std::move(actors), caf::actor_cast>(sink)); - auto rp = self->make_response_promise(); + auto rp = self->make_response_promise(); // TODO: Implement some kind of monadic composition instead of these // nested requests. self @@ -1236,15 +1235,22 @@ index(index_actor::stateful_pointer self, self->state.partition_synopsis_path(new_partition_id)) .then( [self, rp, old_partition_ids, new_partition_id, - keep](partition_synopsis_ptr& synopsis) mutable { - auto result = partition_synopsis_pair{new_partition_id, synopsis}; - // TODO: We eventually want to allow transforms that delete - // whole events, at that point we also need to update the index - // statistics here. + keep](augmented_partition_synopsis& aps) mutable { + auto result = partition_info{ + .uuid = aps.uuid, + .events = aps.synopsis->events, + .max_import_time = aps.synopsis->max_import_time, + .stats = std::move(aps.stats), + }; + // Update the index statistics. We only need to add the events of + // the new partition here, the subtraction of the old events is + // done in `erase`. + for (const auto& [name, stats] : result.stats.layouts) + self->state.stats.layouts[name].count += stats.count; if (keep == keep_original_partition::yes) { self ->request(self->state.meta_index, caf::infinite, atom::merge_v, - new_partition_id, synopsis) + new_partition_id, aps.synopsis) .then( [self, rp, new_partition_id, result](atom::ok) mutable { self->state.persisted_partitions.insert(new_partition_id); @@ -1260,7 +1266,7 @@ index(index_actor::stateful_pointer self, self ->request(self->state.meta_index, caf::infinite, atom::replace_v, old_partition_id, new_partition_id, - synopsis) + aps.synopsis) .then( [self, rp, old_partition_id, new_partition_id, result](atom::ok) mutable { diff --git a/libvast/src/system/partition_transformer.cpp b/libvast/src/system/partition_transformer.cpp index 113f7884c0f..a2b6d55771d 100644 --- a/libvast/src/system/partition_transformer.cpp +++ b/libvast/src/system/partition_transformer.cpp @@ -109,7 +109,11 @@ void partition_transformer_state::fulfill( *stream_data.partition_chunk) .then( [self, promise](atom::ok) mutable { - promise.deliver(self->state.data.synopsis); + promise.deliver(augmented_partition_synopsis{ + .uuid = self->state.data.id, + .stats = std::move(self->state.stats), + .synopsis = std::move(self->state.data.synopsis), + }); self->quit(); }, [self, promise](caf::error& e) mutable { @@ -134,7 +138,7 @@ partition_transformer_actor::behavior_type partition_transformer( return partition_transformer_actor::behavior_type::make_empty_behavior(); } auto builder_and_header - = store_plugin->make_store_builder(accountant, fs, id); + = store_plugin->make_store_builder(std::move(accountant), fs, id); if (!builder_and_header) { self->quit(caf::make_error(ec::invalid_argument, "could not create store builder for backend {}", @@ -200,6 +204,13 @@ partition_transformer_actor::behavior_type partition_transformer( slice.import_time(self->state.data.synopsis->max_import_time); self->state.original_import_times.clear(); self->state.events += slice.rows(); + auto layout_name = slice.layout().name(); + auto& layouts = self->state.stats.layouts; + auto it = layouts.find(layout_name); + if (it == layouts.end()) + it = layouts.emplace(std::string{layout_name}, layout_statistics{}) + .first; + it.value().count += slice.rows(); self->state.slices.push_back(std::move(slice)); } VAST_DEBUG("partition-transformer received all table slices"); @@ -275,13 +286,14 @@ partition_transformer_actor::behavior_type partition_transformer( }, [self](atom::persist, std::filesystem::path partition_path, std::filesystem::path synopsis_path) - -> caf::result { + -> caf::result { VAST_DEBUG("partition-transformer will persist itself to {}", partition_path); auto path_data = partition_transformer_state::path_data{}; path_data.partition_path = std::move(partition_path); path_data.synopsis_path = std::move(synopsis_path); - auto promise = self->make_response_promise(); + auto promise + = self->make_response_promise(); path_data.promise = promise; // Immediately fulfill the promise if we are already done // with the serialization. diff --git a/libvast/test/partition_roundtrip.cpp b/libvast/test/partition_roundtrip.cpp index 327f594d87f..036fd93d482 100644 --- a/libvast/test/partition_roundtrip.cpp +++ b/libvast/test/partition_roundtrip.cpp @@ -80,7 +80,7 @@ TEST(index roundtrip) { for (auto& uuid : state.persisted_partitions) expected_uuids.insert(uuid); // Add some fake statistics - state.stats.layouts["zeek.conn"] = vast::system::layout_statistics{54931u}; + state.stats.layouts["zeek.conn"] = vast::layout_statistics{54931u}; // Serialize the index. flatbuffers::FlatBufferBuilder builder; auto index = pack(builder, state); diff --git a/libvast/test/system/eraser.cpp b/libvast/test/system/eraser.cpp index ef2f39bd340..819d05b659f 100644 --- a/libvast/test/system/eraser.cpp +++ b/libvast/test/system/eraser.cpp @@ -89,7 +89,7 @@ mock_index(system::index_actor::stateful_pointer self) { FAIL("no mock implementation available"); }, [=](atom::apply, transform_ptr, std::vector, - system::keep_original_partition) -> partition_synopsis_pair { + system::keep_original_partition) -> partition_info { FAIL("no mock implementation available"); }, [=](atom::importer, system::idspace_distributor_actor) { diff --git a/libvast/test/system/partition_transformer.cpp b/libvast/test/system/partition_transformer.cpp index 13c6ea5f9d1..9e1c7737639 100644 --- a/libvast/test/system/partition_transformer.cpp +++ b/libvast/test/system/partition_transformer.cpp @@ -111,8 +111,10 @@ TEST(identity transform / done before persist) { run(); vast::partition_synopsis_ptr synopsis = nullptr; rp.receive( - [&](vast::partition_synopsis_ptr& ps) { - synopsis = ps; + [&](vast::augmented_partition_synopsis& aps) { + CHECK_EQUAL(aps.uuid, uuid); + CHECK_EQUAL(aps.stats.layouts["zeek.conn"].count, 20ull); + synopsis = aps.synopsis; }, [&](caf::error& err) { FAIL("failed to persist: " << err); @@ -184,9 +186,9 @@ TEST(delete transform / persist before done) { run(); vast::partition_synopsis_ptr synopsis = nullptr; rp.receive( - [&](vast::partition_synopsis_ptr& ps) { - REQUIRE(ps); - synopsis = ps; + [&](vast::augmented_partition_synopsis& aps) { + REQUIRE(aps.synopsis); + synopsis = aps.synopsis; }, [&](const caf::error& e) { REQUIRE_EQUAL(e, caf::no_error); @@ -307,8 +309,8 @@ TEST(partition transform via the index) { vast::system::keep_original_partition::yes); run(); rp3.receive( - [=](const vast::partition_synopsis_pair& pair) { - CHECK_EQUAL(pair.synopsis->events, events); + [=](const vast::partition_info& info) { + CHECK_EQUAL(info.events, events); }, [](const caf::error& e) { REQUIRE_EQUAL(e, caf::no_error); @@ -325,8 +327,8 @@ TEST(partition transform via the index) { vast::system::keep_original_partition::no); run(); rp5.receive( - [=](const vast::partition_synopsis_pair& pair) { - CHECK_EQUAL(pair.synopsis->events, events); + [=](const vast::partition_info& info) { + CHECK_EQUAL(info.events, events); }, [](const caf::error& e) { REQUIRE_EQUAL(e, caf::no_error); diff --git a/libvast/test/system/query_processor.cpp b/libvast/test/system/query_processor.cpp index c19d29ec0db..9f44d97eac4 100644 --- a/libvast/test/system/query_processor.cpp +++ b/libvast/test/system/query_processor.cpp @@ -73,7 +73,7 @@ mock_index(system::index_actor::stateful_pointer self) { FAIL("no mock implementation available"); }, [=](atom::apply, transform_ptr, std::vector, - system::keep_original_partition) -> partition_synopsis_pair { + system::keep_original_partition) -> partition_info { FAIL("no mock implementation available"); }, [=](atom::importer, system::idspace_distributor_actor) { diff --git a/libvast/vast/detail/heterogenous_string_hash.hpp b/libvast/vast/detail/heterogenous_string_hash.hpp index 47067ed50cd..86d6a23da36 100644 --- a/libvast/vast/detail/heterogenous_string_hash.hpp +++ b/libvast/vast/detail/heterogenous_string_hash.hpp @@ -7,11 +7,30 @@ #pragma once +#include + #include +#include #include namespace vast::detail { +struct heterogenous_string_equal { + using is_transparent = void; + + bool operator()(const std::string& lhs, const std::string& rhs) const { + return lhs == rhs; + } + + bool operator()(const std::string& lhs, std::string_view sv) const { + return lhs == sv; + } + + bool operator()(const std::string& lhs, const char* sv) const { + return lhs == sv; + } +}; + struct heterogenous_string_hash { using is_transparent = void; @@ -28,11 +47,13 @@ struct heterogenous_string_hash { } }; -/// A map from std::string to `Value` allowing for C++20 heterogenous -/// lookups. +/// A map from std::string to `Value` allowing for heterogenous lookups. +// Note that we can't use the C++20 heterogenous unordered_map yet, +// because we still want to support GCC 10 on Debian. (and of course, +// there's a good chance that robin_map would be faster anyways) template using heterogenous_string_hashmap - = std::unordered_map>; + = tsl::robin_map; } // namespace vast::detail diff --git a/libvast/vast/fwd.hpp b/libvast/vast/fwd.hpp index 434c32d63a8..d17c0496dc9 100644 --- a/libvast/vast/fwd.hpp +++ b/libvast/vast/fwd.hpp @@ -121,6 +121,7 @@ class uuid; class value_index; struct attribute; +struct augmented_partition_synopsis; struct legacy_address_type; struct legacy_alias_type; struct meta_extractor; @@ -138,6 +139,7 @@ struct flow; struct integer; struct legacy_integer_type; struct invocation; +struct index_statistics; struct legacy_list_type; struct legacy_map_type; struct model; @@ -146,10 +148,12 @@ struct legacy_none_type; struct offset; struct partition_synopsis; struct partition_synopsis_pair; +struct partition_info; struct legacy_pattern_type; struct predicate; struct qualified_record_field; struct query; +struct layout_statistics; struct legacy_real_type; struct legacy_record_type; struct status; @@ -323,8 +327,6 @@ struct component_state; struct component_state_map; struct data_point; struct index_state; -struct index_statistics; -struct layout_statistics; struct measurement; struct meta_index_result; struct metrics_metadata; @@ -364,6 +366,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(vast_types, first_vast_type_id) VAST_ADD_TYPE_ID((vast::integer)) VAST_ADD_TYPE_ID((vast::invocation)) VAST_ADD_TYPE_ID((vast::negation)) + VAST_ADD_TYPE_ID((vast::partition_info)) VAST_ADD_TYPE_ID((vast::pattern)) VAST_ADD_TYPE_ID((vast::port)) VAST_ADD_TYPE_ID((vast::port_type)) diff --git a/libvast/vast/index_statistics.hpp b/libvast/vast/index_statistics.hpp new file mode 100644 index 00000000000..6aea35ed8cf --- /dev/null +++ b/libvast/vast/index_statistics.hpp @@ -0,0 +1,38 @@ +// _ _____ __________ +// | | / / _ | / __/_ __/ Visibility +// | |/ / __ |_\ \ / / Across +// |___/_/ |_/___/ /_/ Space and Time +// +// SPDX-FileCopyrightText: (c) 2016 The VAST Contributors +// SPDX-License-Identifier: BSD-3-Clause + +#pragma once + +#include "vast/fwd.hpp" + +#include "vast/detail/heterogenous_string_hash.hpp" + +namespace vast { + +/// Accumulates statistics for a given layout. +struct layout_statistics { + uint64_t count = 0ull; ///< Number of events indexed. + + template + friend auto inspect(Inspector& f, layout_statistics& x) { + return f(caf::meta::type_name("layout_statistics"), x.count); + } +}; + +/// Accumulates statistics about indexed data. +struct index_statistics { + /// The number of events for a given layout. + detail::heterogenous_string_hashmap layouts; + + template + friend auto inspect(Inspector& f, index_statistics& x) { + return f(caf::meta::type_name("index_statistics"), x.layouts); + } +}; + +} // namespace vast diff --git a/libvast/vast/partition_synopsis.hpp b/libvast/vast/partition_synopsis.hpp index a909a1b387e..e3fead5dcfc 100644 --- a/libvast/vast/partition_synopsis.hpp +++ b/libvast/vast/partition_synopsis.hpp @@ -10,6 +10,7 @@ #include "vast/detail/friend_attribute.hpp" #include "vast/fbs/partition_synopsis.hpp" +#include "vast/index_statistics.hpp" #include "vast/qualified_record_field.hpp" #include "vast/synopsis.hpp" #include "vast/table_slice.hpp" @@ -28,8 +29,11 @@ namespace vast { /// Contains one synopsis per partition column. struct partition_synopsis final : public caf::ref_counted { partition_synopsis() = default; + ~partition_synopsis() override = default; + partition_synopsis(const partition_synopsis&) = delete; partition_synopsis(partition_synopsis&&) = default; + partition_synopsis& operator=(const partition_synopsis&) = delete; partition_synopsis& operator=(partition_synopsis&&) = default; /// Add data to the synopsis. @@ -84,6 +88,36 @@ struct partition_synopsis final : public caf::ref_counted { partition_synopsis* copy() const; }; +/// Some quantitative information about a partition. +struct partition_info { + /// The partition id. + vast::uuid uuid = vast::uuid::nil(); + + /// Total number of events in the partition. The sum of all + /// values in `stats`. + size_t events = 0ull; + + /// The newest import timestamp of the table slices in this partition. + time max_import_time; + + /// How many events of each type the partition contains. + index_statistics stats; + + template + friend auto inspect(Inspector& f, partition_info& x) { + return f(caf::meta::type_name("partition_info"), x.uuid, x.events, + x.max_import_time, x.stats); + } +}; + +/// A partition synopsis with some additional information. +struct augmented_partition_synopsis { + vast::uuid uuid; + index_statistics stats; + partition_synopsis_ptr synopsis; +}; + +/// A partition synopsis and a uuid. struct partition_synopsis_pair { vast::uuid uuid; partition_synopsis_ptr synopsis; diff --git a/libvast/vast/system/actors.hpp b/libvast/vast/system/actors.hpp index fc43fef270f..b1754d2acc6 100644 --- a/libvast/vast/system/actors.hpp +++ b/libvast/vast/system/actors.hpp @@ -279,7 +279,7 @@ using index_actor = typed_actor_fwd< // in-place transform keeping the old ids, and makes a new partition // preserving the old one(s). caf::replies_to, - keep_original_partition>::with, + keep_original_partition>::with, // Makes the identity of the importer known to the index. caf::reacts_to> // Conform to the protocol of the STREAM SINK actor for table slices. @@ -360,7 +360,7 @@ using filesystem_actor = typed_actor_fwd< using partition_transformer_actor = typed_actor_fwd< // Persist transformed partition to given path. caf::replies_to::with, + std::filesystem::path>::with, // INTERNAL: Continuation handler for `atom::done`. caf::reacts_to> // query::extract API @@ -532,6 +532,7 @@ CAF_END_TYPE_ID_BLOCK(vast_actors) CAF_ALLOW_UNSAFE_MESSAGE_TYPE(std::shared_ptr) CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::partition_synopsis_ptr) CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::partition_synopsis_pair) +CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::augmented_partition_synopsis) CAF_ALLOW_UNSAFE_MESSAGE_TYPE(vast::transform_ptr) #undef vast_uuid_synopsis_map diff --git a/libvast/vast/system/index.hpp b/libvast/vast/system/index.hpp index 1edec5ec2da..c50ce1d3f2d 100644 --- a/libvast/vast/system/index.hpp +++ b/libvast/vast/system/index.hpp @@ -13,6 +13,7 @@ #include "vast/detail/lru_cache.hpp" #include "vast/detail/stable_set.hpp" #include "vast/fbs/index.hpp" +#include "vast/index_statistics.hpp" #include "vast/plugin.hpp" #include "vast/query.hpp" #include "vast/system/active_partition.hpp" @@ -89,27 +90,6 @@ struct active_partition_info { } }; -/// Accumulates statistics for a given layout. -struct layout_statistics { - uint64_t count; ///< Number of events indexed. - - template - friend auto inspect(Inspector& f, layout_statistics& x) { - return f(caf::meta::type_name("layout_statistics"), x.count); - } -}; - -/// Accumulates statistics about indexed data. -struct index_statistics { - /// The number of events for a given layout. - std::unordered_map layouts; - - template - friend auto inspect(Inspector& f, index_statistics& x) { - return f(caf::meta::type_name("index_statistics"), x.layouts); - } -}; - /// Loads partitions from disk by UUID. class partition_factory { public: diff --git a/libvast/vast/system/partition_transformer.hpp b/libvast/vast/system/partition_transformer.hpp index bc222bd0dee..5633652775e 100644 --- a/libvast/vast/system/partition_transformer.hpp +++ b/libvast/vast/system/partition_transformer.hpp @@ -10,6 +10,7 @@ #include "vast/fwd.hpp" +#include "vast/index_statistics.hpp" #include "vast/segment_builder.hpp" #include "vast/system/active_partition.hpp" #include "vast/system/actors.hpp" @@ -34,7 +35,7 @@ struct partition_transformer_state { struct path_data { std::filesystem::path partition_path = {}; std::filesystem::path synopsis_path = {}; - caf::typed_response_promise promise = {}; + caf::typed_response_promise promise = {}; }; partition_transformer_state() = default; @@ -76,6 +77,9 @@ struct partition_transformer_state { /// Total number of rows in `slices`. size_t events = 0ull; + /// Number of rows per event type. + index_statistics stats; + /// The data of the newly created partition. active_partition_state::serialization_data data = {};