Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust index statistics for partition transforms #2097

Merged
merged 5 commits into from Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 18 additions & 12 deletions libvast/src/system/index.cpp
Expand Up @@ -1184,10 +1184,9 @@ index(index_actor::stateful_pointer<index_state> self,
[self](atom::importer, idspace_distributor_actor idspace_distributor) {
self->state.importer = std::move(idspace_distributor);
},
[self](
atom::apply, transform_ptr transform,
std::vector<vast::uuid> old_partition_ids,
keep_original_partition keep) -> caf::result<partition_synopsis_pair> {
[self](atom::apply, transform_ptr transform,
std::vector<vast::uuid> old_partition_ids,
keep_original_partition keep) -> caf::result<partition_info> {
VAST_DEBUG("{} applies a transform to partitions {}", *self,
old_partition_ids);
if (!self->state.store_plugin)
Expand Down Expand Up @@ -1227,7 +1226,7 @@ index(index_actor::stateful_pointer<index_state> self,
self->send(lookup.worker, atom::supervise_v, query_id, lookup.query,
std::move(actors),
caf::actor_cast<receiver_actor<atom::done>>(sink));
auto rp = self->make_response_promise<partition_synopsis_pair>();
auto rp = self->make_response_promise<partition_info>();
// TODO: Implement some kind of monadic composition instead of these
// nested requests.
self
Expand All @@ -1236,15 +1235,22 @@ index(index_actor::stateful_pointer<index_state> self,
self->state.partition_synopsis_path(new_partition_id))
.then(
[self, rp, old_partition_ids, new_partition_id,
keep](partition_synopsis_ptr& synopsis) mutable {
auto result = partition_synopsis_pair{new_partition_id, synopsis};
// TODO: We eventually want to allow transforms that delete
// whole events, at that point we also need to update the index
// statistics here.
keep](augmented_partition_synopsis& aps) mutable {
auto result = partition_info{
.uuid = aps.uuid,
.events = aps.synopsis->events,
.max_import_time = aps.synopsis->max_import_time,
.stats = std::move(aps.stats),
};
// Update the index statistics. We only need to add the events of
// the new partition here, the subtraction of the old events is
// done in `erase`.
for (const auto& [name, stats] : result.stats.layouts)
self->state.stats.layouts[name].count += stats.count;
tobim marked this conversation as resolved.
Show resolved Hide resolved
if (keep == keep_original_partition::yes) {
self
->request(self->state.meta_index, caf::infinite, atom::merge_v,
new_partition_id, synopsis)
new_partition_id, aps.synopsis)
.then(
[self, rp, new_partition_id, result](atom::ok) mutable {
self->state.persisted_partitions.insert(new_partition_id);
Expand All @@ -1260,7 +1266,7 @@ index(index_actor::stateful_pointer<index_state> self,
self
->request(self->state.meta_index, caf::infinite,
atom::replace_v, old_partition_id, new_partition_id,
synopsis)
aps.synopsis)
.then(
[self, rp, old_partition_id, new_partition_id,
result](atom::ok) mutable {
Expand Down
20 changes: 16 additions & 4 deletions libvast/src/system/partition_transformer.cpp
Expand Up @@ -109,7 +109,11 @@ void partition_transformer_state::fulfill(
*stream_data.partition_chunk)
.then(
[self, promise](atom::ok) mutable {
promise.deliver(self->state.data.synopsis);
promise.deliver(augmented_partition_synopsis{
.uuid = self->state.data.id,
.stats = std::move(self->state.stats),
.synopsis = std::move(self->state.data.synopsis),
});
self->quit();
},
[self, promise](caf::error& e) mutable {
Expand All @@ -134,7 +138,7 @@ partition_transformer_actor::behavior_type partition_transformer(
return partition_transformer_actor::behavior_type::make_empty_behavior();
}
auto builder_and_header
= store_plugin->make_store_builder(accountant, fs, id);
= store_plugin->make_store_builder(std::move(accountant), fs, id);
if (!builder_and_header) {
self->quit(caf::make_error(ec::invalid_argument,
"could not create store builder for backend {}",
Expand Down Expand Up @@ -200,6 +204,13 @@ partition_transformer_actor::behavior_type partition_transformer(
slice.import_time(self->state.data.synopsis->max_import_time);
self->state.original_import_times.clear();
self->state.events += slice.rows();
auto layout_name = slice.layout().name();
auto& layouts = self->state.stats.layouts;
auto it = layouts.find(layout_name);
if (it == layouts.end())
it = layouts.emplace(std::string{layout_name}, layout_statistics{})
.first;
it.value().count += slice.rows();
self->state.slices.push_back(std::move(slice));
}
VAST_DEBUG("partition-transformer received all table slices");
Expand Down Expand Up @@ -275,13 +286,14 @@ partition_transformer_actor::behavior_type partition_transformer(
},
[self](atom::persist, std::filesystem::path partition_path,
std::filesystem::path synopsis_path)
-> caf::result<partition_synopsis_ptr> {
-> caf::result<augmented_partition_synopsis> {
VAST_DEBUG("partition-transformer will persist itself to {}",
partition_path);
auto path_data = partition_transformer_state::path_data{};
path_data.partition_path = std::move(partition_path);
path_data.synopsis_path = std::move(synopsis_path);
auto promise = self->make_response_promise<partition_synopsis_ptr>();
auto promise
= self->make_response_promise<augmented_partition_synopsis>();
path_data.promise = promise;
// Immediately fulfill the promise if we are already done
// with the serialization.
Expand Down
2 changes: 1 addition & 1 deletion libvast/test/partition_roundtrip.cpp
Expand Up @@ -80,7 +80,7 @@ TEST(index roundtrip) {
for (auto& uuid : state.persisted_partitions)
expected_uuids.insert(uuid);
// Add some fake statistics
state.stats.layouts["zeek.conn"] = vast::system::layout_statistics{54931u};
state.stats.layouts["zeek.conn"] = vast::layout_statistics{54931u};
// Serialize the index.
flatbuffers::FlatBufferBuilder builder;
auto index = pack(builder, state);
Expand Down
2 changes: 1 addition & 1 deletion libvast/test/system/eraser.cpp
Expand Up @@ -89,7 +89,7 @@ mock_index(system::index_actor::stateful_pointer<mock_index_state> self) {
FAIL("no mock implementation available");
},
[=](atom::apply, transform_ptr, std::vector<uuid>,
system::keep_original_partition) -> partition_synopsis_pair {
system::keep_original_partition) -> partition_info {
FAIL("no mock implementation available");
},
[=](atom::importer, system::idspace_distributor_actor) {
Expand Down
20 changes: 11 additions & 9 deletions libvast/test/system/partition_transformer.cpp
Expand Up @@ -111,8 +111,10 @@ TEST(identity transform / done before persist) {
run();
vast::partition_synopsis_ptr synopsis = nullptr;
rp.receive(
[&](vast::partition_synopsis_ptr& ps) {
synopsis = ps;
[&](vast::augmented_partition_synopsis& aps) {
CHECK_EQUAL(aps.uuid, uuid);
CHECK_EQUAL(aps.stats.layouts["zeek.conn"].count, 20ull);
synopsis = aps.synopsis;
},
[&](caf::error& err) {
FAIL("failed to persist: " << err);
Expand Down Expand Up @@ -184,9 +186,9 @@ TEST(delete transform / persist before done) {
run();
vast::partition_synopsis_ptr synopsis = nullptr;
rp.receive(
[&](vast::partition_synopsis_ptr& ps) {
REQUIRE(ps);
synopsis = ps;
[&](vast::augmented_partition_synopsis& aps) {
REQUIRE(aps.synopsis);
synopsis = aps.synopsis;
},
[&](const caf::error& e) {
REQUIRE_EQUAL(e, caf::no_error);
Expand Down Expand Up @@ -307,8 +309,8 @@ TEST(partition transform via the index) {
vast::system::keep_original_partition::yes);
run();
rp3.receive(
[=](const vast::partition_synopsis_pair& pair) {
CHECK_EQUAL(pair.synopsis->events, events);
[=](const vast::partition_info& info) {
CHECK_EQUAL(info.events, events);
},
[](const caf::error& e) {
REQUIRE_EQUAL(e, caf::no_error);
Expand All @@ -325,8 +327,8 @@ TEST(partition transform via the index) {
vast::system::keep_original_partition::no);
run();
rp5.receive(
[=](const vast::partition_synopsis_pair& pair) {
CHECK_EQUAL(pair.synopsis->events, events);
[=](const vast::partition_info& info) {
CHECK_EQUAL(info.events, events);
},
[](const caf::error& e) {
REQUIRE_EQUAL(e, caf::no_error);
Expand Down
2 changes: 1 addition & 1 deletion libvast/test/system/query_processor.cpp
Expand Up @@ -73,7 +73,7 @@ mock_index(system::index_actor::stateful_pointer<mock_index_state> self) {
FAIL("no mock implementation available");
},
[=](atom::apply, transform_ptr, std::vector<uuid>,
system::keep_original_partition) -> partition_synopsis_pair {
system::keep_original_partition) -> partition_info {
FAIL("no mock implementation available");
},
[=](atom::importer, system::idspace_distributor_actor) {
Expand Down
29 changes: 25 additions & 4 deletions libvast/vast/detail/heterogenous_string_hash.hpp
Expand Up @@ -7,11 +7,30 @@

#pragma once

#include <tsl/robin_map.h>

#include <string>
#include <string_view>
#include <unordered_map>

namespace vast::detail {

struct heterogenous_string_equal {
using is_transparent = void;

bool operator()(const std::string& lhs, const std::string& rhs) const {
return lhs == rhs;
}

bool operator()(const std::string& lhs, std::string_view sv) const {
return lhs == sv;
}

bool operator()(const std::string& lhs, const char* sv) const {
return lhs == sv;
}
};

struct heterogenous_string_hash {
using is_transparent = void;

Expand All @@ -28,11 +47,13 @@ struct heterogenous_string_hash {
}
};

/// A map from std::string to `Value` allowing for C++20 heterogenous
/// lookups.
/// A map from std::string to `Value` allowing for heterogenous lookups.
// Note that we can't use the C++20 heterogenous unordered_map yet,
// because we still want to support GCC 10 on Debian. (and of course,
// there's a good chance that robin_map would be faster anyways)
template <typename Value>
using heterogenous_string_hashmap
= std::unordered_map<std::string, Value, heterogenous_string_hash,
std::equal_to<>>;
= tsl::robin_map<std::string, Value, heterogenous_string_hash,
vast::detail::heterogenous_string_equal>;

} // namespace vast::detail
7 changes: 5 additions & 2 deletions libvast/vast/fwd.hpp
Expand Up @@ -121,6 +121,7 @@ class uuid;
class value_index;

struct attribute;
struct augmented_partition_synopsis;
struct legacy_address_type;
struct legacy_alias_type;
struct meta_extractor;
Expand All @@ -138,6 +139,7 @@ struct flow;
struct integer;
struct legacy_integer_type;
struct invocation;
struct index_statistics;
struct legacy_list_type;
struct legacy_map_type;
struct model;
Expand All @@ -146,10 +148,12 @@ struct legacy_none_type;
struct offset;
struct partition_synopsis;
struct partition_synopsis_pair;
struct partition_info;
struct legacy_pattern_type;
struct predicate;
struct qualified_record_field;
struct query;
struct layout_statistics;
struct legacy_real_type;
struct legacy_record_type;
struct status;
Expand Down Expand Up @@ -323,8 +327,6 @@ struct component_state;
struct component_state_map;
struct data_point;
struct index_state;
struct index_statistics;
struct layout_statistics;
struct measurement;
struct meta_index_result;
struct metrics_metadata;
Expand Down Expand Up @@ -364,6 +366,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(vast_types, first_vast_type_id)
VAST_ADD_TYPE_ID((vast::integer))
VAST_ADD_TYPE_ID((vast::invocation))
VAST_ADD_TYPE_ID((vast::negation))
VAST_ADD_TYPE_ID((vast::partition_info))
VAST_ADD_TYPE_ID((vast::pattern))
VAST_ADD_TYPE_ID((vast::port))
VAST_ADD_TYPE_ID((vast::port_type))
Expand Down
38 changes: 38 additions & 0 deletions libvast/vast/index_statistics.hpp
@@ -0,0 +1,38 @@
// _ _____ __________
// | | / / _ | / __/_ __/ Visibility
// | |/ / __ |_\ \ / / Across
// |___/_/ |_/___/ /_/ Space and Time
//
// SPDX-FileCopyrightText: (c) 2016 The VAST Contributors
// SPDX-License-Identifier: BSD-3-Clause

#pragma once

#include "vast/fwd.hpp"

#include "vast/detail/heterogenous_string_hash.hpp"

namespace vast {

/// Accumulates statistics for a given layout.
struct layout_statistics {
uint64_t count = 0ull; ///< Number of events indexed.

template <class Inspector>
friend auto inspect(Inspector& f, layout_statistics& x) {
return f(caf::meta::type_name("layout_statistics"), x.count);
}
};

/// Accumulates statistics about indexed data.
struct index_statistics {
/// The number of events for a given layout.
detail::heterogenous_string_hashmap<layout_statistics> layouts;

template <class Inspector>
friend auto inspect(Inspector& f, index_statistics& x) {
return f(caf::meta::type_name("index_statistics"), x.layouts);
}
};

} // namespace vast
lava marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 34 additions & 0 deletions libvast/vast/partition_synopsis.hpp
Expand Up @@ -10,6 +10,7 @@

#include "vast/detail/friend_attribute.hpp"
#include "vast/fbs/partition_synopsis.hpp"
#include "vast/index_statistics.hpp"
#include "vast/qualified_record_field.hpp"
#include "vast/synopsis.hpp"
#include "vast/table_slice.hpp"
Expand All @@ -28,8 +29,11 @@ namespace vast {
/// Contains one synopsis per partition column.
struct partition_synopsis final : public caf::ref_counted {
partition_synopsis() = default;
~partition_synopsis() override = default;
partition_synopsis(const partition_synopsis&) = delete;
partition_synopsis(partition_synopsis&&) = default;

partition_synopsis& operator=(const partition_synopsis&) = delete;
partition_synopsis& operator=(partition_synopsis&&) = default;

/// Add data to the synopsis.
Expand Down Expand Up @@ -84,6 +88,36 @@ struct partition_synopsis final : public caf::ref_counted {
partition_synopsis* copy() const;
};

/// Some quantitative information about a partition.
struct partition_info {
/// The partition id.
vast::uuid uuid = vast::uuid::nil();

/// Total number of events in the partition. The sum of all
/// values in `stats`.
size_t events = 0ull;

/// The newest import timestamp of the table slices in this partition.
time max_import_time;
tobim marked this conversation as resolved.
Show resolved Hide resolved

/// How many events of each type the partition contains.
index_statistics stats;

template <class Inspector>
friend auto inspect(Inspector& f, partition_info& x) {
return f(caf::meta::type_name("partition_info"), x.uuid, x.events,
x.max_import_time, x.stats);
}
};

/// A partition synopsis with some additional information.
struct augmented_partition_synopsis {
vast::uuid uuid;
index_statistics stats;
partition_synopsis_ptr synopsis;
};

/// A partition synopsis and a uuid.
struct partition_synopsis_pair {
vast::uuid uuid;
partition_synopsis_ptr synopsis;
Expand Down