Skip to content

Commit

Permalink
Update the index to return a vector of partition infos
Browse files Browse the repository at this point in the history
  • Loading branch information
lava committed May 25, 2022
1 parent b42dba9 commit 77ba623
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 119 deletions.
1 change: 1 addition & 0 deletions libvast/include/vast/fwd.hpp
Expand Up @@ -420,6 +420,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(vast_types, first_vast_type_id)
VAST_ADD_TYPE_ID((std::vector<vast::table_slice>))
VAST_ADD_TYPE_ID((std::vector<vast::table_slice_column>))
VAST_ADD_TYPE_ID((std::vector<vast::uuid>))
VAST_ADD_TYPE_ID((std::vector<vast::partition_info>))

VAST_ADD_TYPE_ID((vast::detail::framed<vast::table_slice>))
VAST_ADD_TYPE_ID((std::vector<vast::detail::framed<vast::table_slice>>))
Expand Down
12 changes: 9 additions & 3 deletions libvast/include/vast/partition_synopsis.hpp
Expand Up @@ -108,19 +108,25 @@ struct partition_info {
time max_import_time;

/// How many events of each type the partition contains.
index_statistics stats;
// A `partition_info` is only created for new partitions, so
// it cannot be a heterogeneous legacy partition but must have
// exactly one type.
vast::type type;

/// CAF inspection support: exposes all fields of `partition_info` to
/// serializers/deserializers under the type name "partition_info".
/// Note: the partition now carries a single `type` instead of the old
/// per-layout `stats` member (a new partition has exactly one type).
template <class Inspector>
friend auto inspect(Inspector& f, partition_info& x) {
  return f(caf::meta::type_name("partition_info"), x.uuid, x.events,
           x.max_import_time, x.type);
}
};

/// A partition synopsis with some additional information.
// An `augmented_partition_synopsis` is only created for new
// partitions, so it cannot be a heterogeneous legacy partition
// but must have exactly one type.
struct augmented_partition_synopsis {
  /// The id of the partition this synopsis describes.
  vast::uuid uuid;
  /// The single type of all events in this (new) partition.
  vast::type type;
  /// The synopsis itself. NOTE(review): downstream code suggests this
  /// may be null when the partition ended up empty — confirm with callers.
  partition_synopsis_ptr synopsis;
};

Expand Down
17 changes: 13 additions & 4 deletions libvast/include/vast/system/actors.hpp
Expand Up @@ -212,20 +212,27 @@ using partition_creation_listener_actor = typed_actor_fwd<

/// The CATALOG actor interface.
using catalog_actor = typed_actor_fwd<
// Bulk import a set of partition synopses.
// Reinitialize the catalog from a set of partition synopses. Used at
// startup, so the map is expected to be huge and we use a shared_ptr
// to be sure it's not accidentally copied.
caf::replies_to<
atom::merge,
std::shared_ptr<std::map<uuid, partition_synopsis_ptr>>>::with<atom::ok>,
// Merge a single partition synopsis.
caf::replies_to<atom::merge, uuid, partition_synopsis_ptr>::with< //
atom::ok>,
caf::replies_to<atom::merge, uuid, partition_synopsis_ptr>::with<atom::ok>,
// Merge a set of partition synopsis.
caf::replies_to<atom::merge,
std::vector<augmented_partition_synopsis>>::with<atom::ok>,
// Get *ALL* partition synopses stored in the catalog.
caf::replies_to<atom::get>::with<std::vector<partition_synopsis_pair>>,
// Erase a single partition synopsis.
caf::replies_to<atom::erase, uuid>::with<atom::ok>,
// Atomically remove one and merge another partition synopsis
caf::replies_to<atom::replace, uuid, uuid,
partition_synopsis_ptr>::with<atom::ok>,
// Atomically replace a set of partition synopses with another.
caf::replies_to<atom::replace, std::vector<uuid>,
std::vector<augmented_partition_synopsis>>::with<atom::ok>,
// Return the candidate partitions for an expression.
caf::replies_to<atom::candidates, vast::uuid,
vast::expression>::with<catalog_result>,
Expand Down Expand Up @@ -287,14 +294,16 @@ using index_actor = typed_actor_fwd<
query_cursor>,
// Erases the given partition from the INDEX.
caf::replies_to<atom::erase, uuid>::with<atom::done>,
// Erases the given set of partitions from the INDEX.
caf::replies_to<atom::erase, std::vector<uuid>>::with<atom::done>,
// Applies the given transformation to the partition.
// When keep_original_partition is yes: erases the existing partition and
// returns the synopsis of the new partition. If the partition is completely
// erased, returns the nil uuid. When keep_original_partition is no: does an
// in-place transform keeping the old ids, and makes a new partition
// preserving the old one(s).
caf::replies_to<atom::apply, transform_ptr, std::vector<uuid>,
keep_original_partition>::with<partition_info>,
keep_original_partition>::with<std::vector<partition_info>>,
// Makes the identity of the importer known to the index.
caf::reacts_to<atom::importer, idspace_distributor_actor>>
// Conform to the protocol of the STREAM SINK actor for table slices.
Expand Down
4 changes: 3 additions & 1 deletion libvast/include/vast/system/partition_transformer.hpp
Expand Up @@ -51,7 +51,6 @@ struct partition_transformer_state {
partition_transformer_state() = default;

void add_slice(const table_slice& slice);
void finalize_data();
void fulfill(
partition_transformer_actor::stateful_pointer<partition_transformer_state>
self,
Expand Down Expand Up @@ -88,6 +87,9 @@ struct partition_transformer_state {
/// Cached stream error, if the stream terminated abnormally.
caf::error stream_error = {};

/// Cached transform error, if the transform returns one.
caf::error transform_error = {};

/// Cached table slices in this partition.
std::vector<table_slice> slices = {};

Expand Down
13 changes: 13 additions & 0 deletions libvast/src/system/catalog.cpp
Expand Up @@ -379,6 +379,11 @@ catalog(catalog_actor::stateful_pointer<catalog_state> self,
self->state.merge(partition, std::move(synopsis));
return atom::ok_v;
},
// Merge a batch of freshly created partition synopses into the catalog
// state, one at a time. Replies with `ok` once all are merged.
[=](atom::merge, std::vector<augmented_partition_synopsis> v) -> atom::ok {
  for (auto& aps : v)
    self->state.merge(aps.uuid, aps.synopsis);
  return atom::ok_v;
},
[=](atom::get) -> std::vector<partition_synopsis_pair> {
std::vector<partition_synopsis_pair> result;
result.reserve(self->state.synopses.size());
Expand All @@ -400,6 +405,14 @@ catalog(catalog_actor::stateful_pointer<catalog_state> self,
self->state.erase(old_partition);
return atom::ok_v;
},
// Replace a set of partition synopses with another: first erase all
// old synopses, then merge the replacements. Both steps run inside a
// single message handler, so no other catalog message can observe the
// half-done state.
[=](atom::replace, std::vector<uuid> old_uuids,
    std::vector<augmented_partition_synopsis> new_synopses) -> atom::ok {
  for (auto const& uuid : old_uuids)
    self->state.erase(uuid);
  for (auto& aps : new_synopses)
    self->state.merge(aps.uuid, aps.synopsis);
  return atom::ok_v;
},
[=](atom::candidates, vast::uuid lookup_id,
const vast::expression& expr) -> caf::result<catalog_result> {
auto start = std::chrono::steady_clock::now();
Expand Down
2 changes: 1 addition & 1 deletion libvast/src/system/eraser.cpp
Expand Up @@ -96,7 +96,7 @@ eraser(eraser_actor::stateful_pointer<eraser_state> self,
transform, result.partitions,
keep_original_partition::no)
.then(
[self, rp](const partition_info&) mutable {
[self, rp](const std::vector<partition_info>&) mutable {
VAST_DEBUG("{} applied filter transform with query {}", *self,
self->state.query_);
rp.deliver(atom::ok_v);
Expand Down
164 changes: 92 additions & 72 deletions libvast/src/system/index.cpp
Expand Up @@ -21,6 +21,7 @@
#include "vast/data.hpp"
#include "vast/defaults.hpp"
#include "vast/detail/assert.hpp"
#include "vast/detail/fanout_counter.hpp"
#include "vast/detail/fill_status_map.hpp"
#include "vast/detail/narrow.hpp"
#include "vast/detail/notifying_stream_manager.hpp"
Expand Down Expand Up @@ -1254,14 +1255,43 @@ index(index_actor::stateful_pointer<index_state> self,
});
return rp;
},
// Erase a set of partitions by fanning out one single-id `erase`
// request per partition to ourselves, and collecting the results via a
// fan-out counter. Replies `done` once every erase succeeded, or with
// the first error otherwise.
[self](atom::erase,
       const std::vector<uuid>& partition_ids) -> caf::result<atom::done> {
  // TODO: It would probably be more efficient to implement the
  // handler for multiple ids directly as opposed to dispatching
  // onto the single-id erase handler.
  auto rp = self->make_response_promise<atom::done>();
  // The counter fires exactly one of the two callbacks after
  // `partition_ids.size()` results have arrived.
  auto fanout_counter = detail::make_fanout_counter(
    partition_ids.size(),
    [rp]() mutable {
      rp.deliver(atom::done_v);
    },
    [rp](caf::error&& e) mutable {
      rp.deliver(std::move(e));
    });
  for (auto const& id : partition_ids) {
    // Self-request so the existing single-id erase handler does the work.
    self
      ->request(static_cast<index_actor>(self), caf::infinite,
                atom::erase_v, id)
      .then(
        [=](atom::done) {
          fanout_counter->receive_success();
        },
        [=](caf::error& e) {
          fanout_counter->receive_error(std::move(e));
        });
  }
  return rp;
},
// Make the importer known to the index after both actors exist.
// We can't pass this as spawn argument since the importer already
// needs to know the index actor when spawning.
[self](atom::importer, idspace_distributor_actor idspace_distributor) {
  self->state.importer = std::move(idspace_distributor);
},
[self](atom::apply, transform_ptr transform,
std::vector<vast::uuid> old_partition_ids,
keep_original_partition keep) -> caf::result<partition_info> {
keep_original_partition keep)
-> caf::result<std::vector<partition_info>> {
VAST_DEBUG("{} applies a transform to partitions {}", *self,
old_partition_ids);
if (!self->state.store_plugin)
Expand All @@ -1285,8 +1315,8 @@ index(index_actor::stateful_pointer<index_state> self,
auto partition_path_template = self->state.partition_path_template();
auto partition_synopsis_path_template
= self->state.partition_synopsis_path_template();
partition_transformer_actor sink = self->spawn(
partition_transformer, store_id, self->state.synopsis_opts,
partition_transformer_actor partition_transfomer = self->spawn(
system::partition_transformer, store_id, self->state.synopsis_opts,
self->state.index_opts, self->state.accountant,
static_cast<idspace_distributor_actor>(self->state.importer),
self->state.type_registry, self->state.filesystem, transform,
Expand All @@ -1296,100 +1326,90 @@ index(index_actor::stateful_pointer<index_state> self,
static const auto match_everything
= vast::predicate{meta_extractor{meta_extractor::type},
relational_operator::ni, data{""}};
auto query
= query::make_extract(sink, query::extract::drop_ids, match_everything);
auto query = query::make_extract(
partition_transfomer, query::extract::drop_ids, match_everything);
query.id = self->state.pending_queries.create_query_id();
query.priority = 100;
auto input_size = detail::narrow_cast<uint32_t>(old_partition_ids.size());
auto err = self->state.pending_queries.insert(
query_state{.query = query,
.client = caf::actor_cast<receiver_actor<atom::done>>(sink),
.client = caf::actor_cast<receiver_actor<atom::done>>(
partition_transfomer),
.candidate_partitions = input_size,
.requested_partitions = input_size},
std::vector{old_partition_ids});
VAST_ASSERT(err == caf::none);
self->state.schedule_lookups();
auto rp = self->make_response_promise<partition_info>();
auto rp = self->make_response_promise<std::vector<partition_info>>();
// TODO: Implement some kind of monadic composition instead of these
// nested requests.
self->request(sink, caf::infinite, atom::persist_v)
self->request(partition_transfomer, caf::infinite, atom::persist_v)
.then(
[self, rp, old_partition_ids,
keep](std::vector<augmented_partition_synopsis>& apsv) mutable {
VAST_INFO("index with size {}", apsv.size());
auto& aps = apsv.front(); // FIXME!
// If the partition was completely deleted, `synopsis` may be null.
auto events = aps.synopsis ? aps.synopsis->events : 0ull;
auto time = aps.synopsis ? aps.synopsis->max_import_time
: vast::time::clock::time_point{};
auto new_partition_id = aps.uuid;
auto result = partition_info{
.uuid = new_partition_id,
.events = events,
.max_import_time = time,
.stats = std::move(aps.stats),
};
// Update the index statistics. We only need to add the events of
// the new partition here, the subtraction of the old events is
// done in `erase`.
for (const auto& [name, stats] : result.stats.layouts)
self->state.stats.layouts[name].count += stats.count;
std::vector<uuid> new_partition_ids;
for (auto const& aps : apsv)
new_partition_ids.push_back(aps.uuid);
auto result = std::vector<partition_info>{};
for (auto const& aps : apsv) {
// If synopsis was null (ie. all events were deleted),
// the partition transformer should not have included
// it in the result.
VAST_ASSERT(aps.synopsis);
auto info = partition_info{
.uuid = aps.uuid,
.events = aps.synopsis->events,
.max_import_time = aps.synopsis->max_import_time,
.type = aps.type,
};
// Update the index statistics. We only need to add the events of
// the new partition here, the subtraction of the old events is
// done in `erase`.
auto name = std::string{info.type.name()};
self->state.stats.layouts[name].count += info.events;
result.emplace_back(std::move(info));
}

if (keep == keep_original_partition::yes) {
if (aps.synopsis)
if (!apsv.empty())
self
->request(self->state.catalog, caf::infinite, atom::merge_v,
new_partition_id, aps.synopsis)
std::move(apsv))
.then(
[self, rp, new_partition_id, result](atom::ok) mutable {
self->state.persisted_partitions.insert(new_partition_id);
rp.deliver(result);
[self, rp, new_partition_ids,
result = std::move(result)](atom::ok) mutable {
self->state.persisted_partitions.insert(
new_partition_ids.begin(), new_partition_ids.end());
rp.deliver(std::move(result));
},
[rp](const caf::error& e) mutable {
rp.deliver(e);
rp.deliver(std::move(e));
});
else
rp.deliver(result);
} else {
// Pick one partition id at random to be "transformed", all the
// other ones are "deleted" from the catalog. If the new
// partition is empty, all partitions are deleted.
if (aps.synopsis) {
VAST_ASSERT(!old_partition_ids.empty());
auto old_partition_id = old_partition_ids.back();
old_partition_ids.pop_back();
self
->request(self->state.catalog, caf::infinite, atom::replace_v,
old_partition_id, new_partition_id, aps.synopsis)
.then(
[self, rp, old_partition_id, new_partition_id,
result](atom::ok) mutable {
self->state.persisted_partitions.insert(new_partition_id);
self
->request(static_cast<index_actor>(self), caf::infinite,
atom::erase_v, old_partition_id)
.then(
[=](atom::done) mutable {
rp.deliver(result);
},
[=](const caf::error& e) mutable {
rp.deliver(e);
});
},
[rp](const caf::error& e) mutable {
rp.deliver(e);
});
} else {
rp.deliver(result);
}
for (auto partition_id : old_partition_ids) {
self
->request(static_cast<index_actor>(self), caf::infinite,
atom::erase_v, partition_id)
.then([](atom::done) { /* nop */ },
[](const caf::error& e) {
VAST_WARN("index failed to erase {} from catalog", e);
} else { // keep == keep_original_partition::no
self
->request(self->state.catalog, caf::infinite, atom::replace_v,
old_partition_ids, std::move(apsv))
.then(
[self, rp, old_partition_ids, new_partition_ids,
result](atom::ok) mutable {
self->state.persisted_partitions.insert(
new_partition_ids.begin(), new_partition_ids.end());
self
->request(static_cast<index_actor>(self), caf::infinite,
atom::erase_v, old_partition_ids)
.then(
[=](atom::done) mutable {
rp.deliver(result);
},
[=](const caf::error& e) mutable {
rp.deliver(e);
});
}
},
[rp](const caf::error& e) mutable {
rp.deliver(e);
});
}
},
[rp](const caf::error& e) mutable {
Expand Down

0 comments on commit 77ba623

Please sign in to comment.