Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly handle partition transforms without output #2123

Merged
merged 4 commits into from Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/unreleased/changes/2123--partition-transforms.md
@@ -0,0 +1,2 @@
Fixed an issue where partition transforms that erase complete
partitions would trigger an internal assertion failure.
107 changes: 60 additions & 47 deletions libvast/src/system/index.cpp
Expand Up @@ -1253,10 +1253,14 @@ index(index_actor::stateful_pointer<index_state> self,
.then(
[self, rp, old_partition_ids, new_partition_id,
keep](augmented_partition_synopsis& aps) mutable {
// If the partition was completely deleted, `synopsis` may be null.
auto events = aps.synopsis ? aps.synopsis->events : 0ull;
auto time = aps.synopsis ? aps.synopsis->max_import_time
: vast::time::clock::time_point{};
auto result = partition_info{
.uuid = aps.uuid,
.events = aps.synopsis->events,
.max_import_time = aps.synopsis->max_import_time,
.events = events,
.max_import_time = time,
.stats = std::move(aps.stats),
};
// Update the index statistics. We only need to add the events of
Expand All @@ -1265,54 +1269,63 @@ index(index_actor::stateful_pointer<index_state> self,
for (const auto& [name, stats] : result.stats.layouts)
self->state.stats.layouts[name].count += stats.count;
if (keep == keep_original_partition::yes) {
self
->request(self->state.meta_index, caf::infinite, atom::merge_v,
new_partition_id, aps.synopsis)
.then(
[self, rp, new_partition_id, result](atom::ok) mutable {
self->state.persisted_partitions.insert(new_partition_id);
rp.deliver(result);
},
[rp](const caf::error& e) mutable {
rp.deliver(e);
});
if (aps.synopsis)
self
->request(self->state.meta_index, caf::infinite,
atom::merge_v, new_partition_id, aps.synopsis)
.then(
[self, rp, new_partition_id, result](atom::ok) mutable {
self->state.persisted_partitions.insert(new_partition_id);
rp.deliver(result);
},
[rp](const caf::error& e) mutable {
rp.deliver(e);
});
else
rp.deliver(result);
} else {
// Pick one partition id at random to be "transformed", all the
// other ones are "deleted" from the meta index.
auto old_partition_id = old_partition_ids.at(0);
self
->request(self->state.meta_index, caf::infinite,
atom::replace_v, old_partition_id, new_partition_id,
aps.synopsis)
.then(
[self, rp, old_partition_id, new_partition_id,
result](atom::ok) mutable {
self->state.persisted_partitions.insert(new_partition_id);
self
->request(static_cast<index_actor>(self), caf::infinite,
atom::erase_v, old_partition_id)
.then(
[=](atom::done) mutable {
rp.deliver(result);
},
[=](const caf::error& e) mutable {
rp.deliver(e);
// other ones are "deleted" from the meta index. If the new
// partition is empty, all partitions are deleted.
if (aps.synopsis) {
VAST_ASSERT(!old_partition_ids.empty());
auto old_partition_id = old_partition_ids.back();
old_partition_ids.pop_back();
self
->request(self->state.meta_index, caf::infinite,
atom::replace_v, old_partition_id, new_partition_id,
aps.synopsis)
.then(
[self, rp, old_partition_id, new_partition_id,
result](atom::ok) mutable {
self->state.persisted_partitions.insert(new_partition_id);
self
->request(static_cast<index_actor>(self), caf::infinite,
atom::erase_v, old_partition_id)
.then(
[=](atom::done) mutable {
rp.deliver(result);
},
[=](const caf::error& e) mutable {
rp.deliver(e);
});
},
[rp](const caf::error& e) mutable {
rp.deliver(e);
});
} else {
rp.deliver(result);
}
for (auto partition_id : old_partition_ids) {
self
->request(static_cast<index_actor>(self), caf::infinite,
atom::erase_v, partition_id)
.then([](atom::done) { /* nop */ },
[](const caf::error& e) {
VAST_WARN("index failed to erase {} from meta index",
e);
});
},
[rp](const caf::error& e) mutable {
rp.deliver(e);
});
}
for (size_t i = 1; i < old_partition_ids.size(); ++i) {
lava marked this conversation as resolved.
Show resolved Hide resolved
auto partition_id = old_partition_ids[i];
self
->request(self->state.meta_index, caf::infinite, atom::erase_v,
partition_id)
.then([](atom::ok) { /* nop */ },
[](const caf::error& e) {
VAST_WARN("index failed to erase {} from meta index",
e);
});
}
}
},
[rp](const caf::error& e) mutable {
Expand Down
139 changes: 88 additions & 51 deletions libvast/src/system/partition_transformer.cpp
Expand Up @@ -23,6 +23,38 @@

namespace vast::system {

namespace {

void store_or_fulfill(
partition_transformer_actor::stateful_pointer<partition_transformer_state>
self,
partition_transformer_state::stream_data&& stream_data) {
if (std::holds_alternative<std::monostate>(self->state.persist)) {
self->state.persist = std::move(stream_data);
} else {
auto* path_data = std::get_if<partition_transformer_state::path_data>(
&self->state.persist);
VAST_ASSERT(path_data != nullptr, "unexpected variant content");
self->state.fulfill(self, std::move(stream_data), std::move(*path_data));
}
}

void store_or_fulfill(
partition_transformer_actor::stateful_pointer<partition_transformer_state>
self,
partition_transformer_state::path_data&& path_data) {
if (std::holds_alternative<std::monostate>(self->state.persist)) {
self->state.persist = std::move(path_data);
} else {
auto* stream_data = std::get_if<partition_transformer_state::stream_data>(
&self->state.persist);
VAST_ASSERT(stream_data != nullptr, "unexpected variant content");
self->state.fulfill(self, std::move(*stream_data), std::move(path_data));
}
}

} // namespace

// Since we don't have to answer queries while this partition is being
// constructed, we don't have to spawn separate indexer actors and
// stream data but can just compute everything inline here.
Expand Down Expand Up @@ -83,13 +115,22 @@ void partition_transformer_state::fulfill(
self->quit();
return;
}
auto promise = path_data.promise;
if (self->state.events == 0) {
promise.deliver(augmented_partition_synopsis{
.uuid = vast::uuid::nil(),
.stats = std::move(self->state.stats),
.synopsis = nullptr,
});
self->quit();
}
if (!stream_data.partition_chunk) {
path_data.promise.deliver(stream_data.partition_chunk.error());
promise.deliver(stream_data.partition_chunk.error());
self->quit();
return;
}
if (!stream_data.partition_chunk) {
path_data.promise.deliver(stream_data.synopsis_chunk.error());
promise.deliver(stream_data.synopsis_chunk.error());
self->quit();
return;
}
Expand All @@ -103,7 +144,6 @@ void partition_transformer_state::fulfill(
VAST_WARN("could not write transformed synopsis to {}: {}", path,
e);
});
auto promise = path_data.promise;
self
->request(fs, caf::infinite, atom::write_v, path_data.partition_path,
*stream_data.partition_chunk)
Expand All @@ -130,39 +170,14 @@ partition_transformer_actor::behavior_type partition_transformer(
idspace_distributor_actor idspace_distributor,
type_registry_actor type_registry, filesystem_actor fs,
transform_ptr transform) {
const auto* store_plugin = plugins::find<vast::store_plugin>(store_id);
if (!store_plugin) {
self->quit(caf::make_error(ec::invalid_argument,
"could not find a store plugin named {}",
store_id));
return partition_transformer_actor::behavior_type::make_empty_behavior();
}
auto builder_and_header
= store_plugin->make_store_builder(std::move(accountant), fs, id);
if (!builder_and_header) {
self->quit(caf::make_error(ec::invalid_argument,
"could not create store builder for backend {}",
store_id));
return partition_transformer_actor::behavior_type::make_empty_behavior();
}
self->state.data.id = id;
self->state.data.store_id = store_id;
self->state.data.offset = invalid_id;
self->state.data.synopsis = caf::make_copy_on_write<partition_synopsis>();
self->state.data.store_header = builder_and_header->second;
self->state.synopsis_opts = synopsis_opts;
self->state.index_opts = index_opts;
self->state.accountant = std::move(accountant);
self->state.idspace_distributor = std::move(idspace_distributor);
self->state.store_builder = builder_and_header->first;
VAST_ASSERT(self->state.store_builder);
self->state.stage = caf::attach_continuous_stream_stage(
self, [](caf::unit_t&) {},
[](caf::unit_t&, caf::downstream<vast::table_slice>& out,
vast::table_slice x) {
out.push(x);
},
[](caf::unit_t&, const caf::error&) { /* nop */ });
self->state.stage->add_outbound_path(self->state.store_builder);
self->state.fs = std::move(fs);
self->state.type_registry = std::move(type_registry);
// transform can be an aggregate transform here
Expand Down Expand Up @@ -203,16 +218,57 @@ partition_transformer_actor::behavior_type partition_transformer(
if (!self->state.original_import_times.contains(slice_identity))
slice.import_time(self->state.data.synopsis->max_import_time);
self->state.original_import_times.clear();
self->state.events += slice.rows();
auto layout_name = slice.layout().name();
auto& layouts = self->state.stats.layouts;
auto it = layouts.find(layout_name);
if (it == layouts.end())
it = layouts.emplace(std::string{layout_name}, layout_statistics{})
.first;
it.value().count += slice.rows();
self->state.events += slice.rows();
self->state.slices.push_back(std::move(slice));
}
auto stream_data = partition_transformer_state::stream_data{
.partition_chunk = nullptr,
.synopsis_chunk = nullptr,
};
// We're already done if the whole partition got deleted
if (self->state.events == 0) {
store_or_fulfill(self, std::move(stream_data));
return;
}
// ...otherwise, prepare for writing out the transformed data by creating
// a new store, sending it the slices and requesting new idspace.
auto store_id = self->state.data.store_id;
const auto* store_plugin = plugins::find<vast::store_plugin>(store_id);
if (!store_plugin) {
self->state.stream_error
= caf::make_error(ec::invalid_argument,
"could not find a store plugin named {}", store_id);
store_or_fulfill(self, std::move(stream_data));
return;
}
auto builder_and_header = store_plugin->make_store_builder(
self->state.accountant, self->state.fs, self->state.data.id);
if (!builder_and_header) {
self->state.stream_error
= caf::make_error(ec::invalid_argument,
"could not create store builder for backend {}",
store_id);
store_or_fulfill(self, std::move(stream_data));
return;
}
self->state.data.store_header = builder_and_header->second;
self->state.store_builder = builder_and_header->first;
VAST_ASSERT(self->state.store_builder);
self->state.stage = caf::attach_continuous_stream_stage(
self, [](caf::unit_t&) {},
[](caf::unit_t&, caf::downstream<vast::table_slice>& out,
vast::table_slice x) {
out.push(x);
},
[](caf::unit_t&, const caf::error&) { /* nop */ });
self->state.stage->add_outbound_path(self->state.store_builder);
VAST_DEBUG("partition-transformer received all table slices");
self
->request(self->state.idspace_distributor, caf::infinite,
Expand Down Expand Up @@ -274,15 +330,7 @@ partition_transformer_actor::behavior_type partition_transformer(
stream_data.synopsis_chunk = fbs::release(builder);
}
}();
if (std::holds_alternative<std::monostate>(self->state.persist)) {
self->state.persist = std::move(stream_data);
} else {
auto* path_data = std::get_if<partition_transformer_state::path_data>(
&self->state.persist);
VAST_ASSERT(path_data != nullptr, "unexpected variant content");
self->state.fulfill(self, std::move(stream_data),
std::move(*path_data));
}
store_or_fulfill(self, std::move(stream_data));
},
[self](atom::persist, std::filesystem::path partition_path,
std::filesystem::path synopsis_path)
Expand All @@ -295,18 +343,7 @@ partition_transformer_actor::behavior_type partition_transformer(
auto promise
= self->make_response_promise<augmented_partition_synopsis>();
path_data.promise = promise;
// Immediately fulfill the promise if we are already done
// with the serialization.
if (std::holds_alternative<std::monostate>(self->state.persist)) {
self->state.persist = std::move(path_data);
} else {
auto* stream_data
= std::get_if<partition_transformer_state::stream_data>(
&self->state.persist);
VAST_ASSERT(stream_data != nullptr, "unexpected variant content");
self->state.fulfill(self, std::move(*stream_data),
std::move(path_data));
}
store_or_fulfill(self, std::move(path_data));
return promise;
}};
}
Expand Down