Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore types unrelated to the configuration in the summarize plugin #2258

Merged
merged 5 commits into from May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,2 @@
Transform steps removing all nested fields from a record leaving only empty
nested records no longer cause VAST to crash.
3 changes: 3 additions & 0 deletions libvast/include/vast/type.hpp
Expand Up @@ -1234,6 +1234,9 @@ class record_type final : public stateful_type_base {

offset index; ///< The index of the field to transform.
function_type fun; /// The transformation function to apply.

friend std::strong_ordering
operator<=>(const transformation& lhs, const transformation& rhs) noexcept;
};

/// The behavior of the merge function in case of conflicts.
Expand Down
26 changes: 14 additions & 12 deletions libvast/src/arrow_table_slice.cpp
Expand Up @@ -1046,18 +1046,20 @@ std::pair<type, std::shared_ptr<arrow::RecordBatch>> transform_columns(
nested_index.push_back(0);
nested_layer = impl(impl, std::move(nested_layer),
std::move(nested_index), current, sentinel);
auto nested_layout = type{record_type{nested_layer.fields}};
nested_layout.assign_metadata(layer.fields[index.back()].type);
result.fields.emplace_back(layer.fields[index.back()].name,
nested_layout);
auto nested_arrow_fields = arrow::FieldVector{};
nested_arrow_fields.reserve(nested_layer.fields.size());
for (const auto& nested_field : nested_layer.fields)
nested_arrow_fields.push_back(
nested_field.type.to_arrow_field(nested_field.name));
result.arrays.push_back(
arrow::StructArray::Make(nested_layer.arrays, nested_arrow_fields)
.ValueOrDie());
if (!nested_layer.fields.empty()) {
auto nested_layout = type{record_type{nested_layer.fields}};
nested_layout.assign_metadata(layer.fields[index.back()].type);
result.fields.emplace_back(layer.fields[index.back()].name,
nested_layout);
auto nested_arrow_fields = arrow::FieldVector{};
nested_arrow_fields.reserve(nested_layer.fields.size());
for (const auto& nested_field : nested_layer.fields)
nested_arrow_fields.push_back(
nested_field.type.to_arrow_field(nested_field.name));
result.arrays.push_back(
arrow::StructArray::Make(nested_layer.arrays, nested_arrow_fields)
.ValueOrDie());
}
} else {
result.fields.push_back(std::move(layer.fields[index.back()]));
result.arrays.push_back(std::move(layer.arrays[index.back()]));
Expand Down
195 changes: 93 additions & 102 deletions libvast/src/type.cpp
Expand Up @@ -2408,6 +2408,12 @@ type map_type::value_type() const noexcept {

// -- record_type -------------------------------------------------------------

std::strong_ordering
operator<=>(const record_type::transformation& lhs,
const record_type::transformation& rhs) noexcept {
return lhs.index <=> rhs.index;
}

record_type::record_type(const record_type& other) noexcept = default;

record_type& record_type::operator=(const record_type& rhs) noexcept = default;
Expand Down Expand Up @@ -3013,112 +3019,97 @@ record_type::insert_after(std::vector<struct field> fields) noexcept {

std::optional<record_type> record_type::transform(
std::vector<transformation> transformations) const noexcept {
// This function recursively calls do_transform while iterating over all the
// leaves of the record type backwards.
//
// Given this record type:
//
// type x = record {
// a: record {
// b: integer,
// c: integer,
// },
// d: record {
// e: record {
// f: integer,
// },
// },
// }
//
// A transformation of `d.e` will cause `f` and `a` to be untouched and
// simply copied to the new record type, while all the other fields need to
// be re-created. This is essentially an optimization over the naive approach
// that recursively converts the record type into a list of record fields,
// and then modifies that. As an additional benefit this function allows for
// applying multiple transformations at the same time.
//
// This algorithm works by walking over the transformations in reverse order
// (by offset), and unwrapping the record type into fields for all fields
// whose offset is a prefix of the transformations target offset.
// Transformations are applied when unwrapping if the target offset matches
// the field offset exactly. After having walked over a record type, the
// fields are joined back together at the end of the recursive lambda call.
const auto do_transform
= [](const auto& do_transform, const record_type& self, offset index,
auto& current, const auto end) noexcept -> std::optional<record_type> {
if (current == end)
return self;
auto new_fields = std::vector<struct field>{};
new_fields.reserve(self.num_fields());
index.emplace_back(self.num_fields());
while (index.back() > 0 && current != end) {
const auto& old_field = self.field(--index.back());
// Compare the offsets of the next target with our current offset.
const auto [index_mismatch, current_index_mismatch]
= std::mismatch(index.begin(), index.end(), current->index.begin(),
current->index.end());
if (index_mismatch == index.end()
&& current_index_mismatch == current->index.end()) {
// The offset matches exactly, so we apply the transformation.
do {
auto replacements = std::invoke(std::move(current->fun), old_field);
std::move(replacements.rbegin(), replacements.rend(),
std::back_inserter(new_fields));
++current;
} while (current != end && current->index == index);
} else if (index_mismatch == index.end()) {
// The index is a prefix of the target offset for the next
// transformation, so we recurse one level deeper.
VAST_ASSERT(caf::holds_alternative<record_type>(old_field.type));
if (auto sub_result
= do_transform(do_transform, caf::get<record_type>(old_field.type),
index, current, end))
new_fields.push_back({
std::string{old_field.name},
std::move(*sub_result),
});
// Check for invalid arguments on the way in.
VAST_ASSERT(current == end || index != current->index,
"cannot apply transformations to both a nested record type "
"and its children at the same time.");
VAST_ASSERT(std::is_sorted(transformations.begin(), transformations.end()),
"transformations must be sorted by index");
VAST_ASSERT(transformations.end()
== std::adjacent_find(
transformations.begin(), transformations.end(),
[](const auto& lhs, const auto& rhs) noexcept {
const auto [lhs_mismatch, rhs_mismatch]
= std::mismatch(lhs.index.begin(), lhs.index.end(),
rhs.index.begin(), rhs.index.end());
return lhs_mismatch == lhs.index.end();
}),
"transformation indices must not be a subset of the following "
"transformation's index");
lava marked this conversation as resolved.
Show resolved Hide resolved
// The current unpacked layer of the transformation, i.e., the pieces required
// to re-assemble the current layer of both the record type and the record
// batch.
struct unpacked_layer : std::vector<struct record_type::field> {
using vector::vector;
};
const auto impl
= [](const auto& impl, unpacked_layer layer, offset index, auto& current,
const auto sentinel) noexcept -> unpacked_layer {
VAST_ASSERT(!index.empty());
auto result = unpacked_layer{};
// Iterate over the current layer. For every entry in the current layer, we
// need to do one of three things:
// 1. Apply the transformation if the index matches the transformation
// index.
// 2. Recurse to the next layer if the index is a prefix of the
// transformation index.
// 3. Leave the elements untouched.
for (; index.back() < layer.size(); ++index.back()) {
const auto [is_prefix_match, is_exact_match]
= [&]() noexcept -> std::pair<bool, bool> {
if (current == sentinel)
return {false, false};
const auto [index_mismatch, current_index_mismatch]
= std::mismatch(index.begin(), index.end(), current->index.begin(),
current->index.end());
const auto is_prefix_match = index_mismatch == index.end();
const auto is_exact_match
= is_prefix_match && current_index_mismatch == current->index.end();
return {is_prefix_match, is_exact_match};
}();
if (is_exact_match) {
VAST_ASSERT(current != sentinel);
auto new_fields
= std::invoke(std::move(current->fun), record_type::field_view{
layer[index.back()].name,
layer[index.back()].type,
});
for (auto&& field : std::move(new_fields))
result.push_back(std::move(field));
++current;
} else if (is_prefix_match) {
auto nested_layer = unpacked_layer{};
nested_layer.reserve(
caf::get<record_type>(layer[index.back()].type).num_fields());
for (auto&& [name, type] :
caf::get<record_type>(layer[index.back()].type).fields())
nested_layer.push_back({std::string{name}, type});
auto nested_index = index;
nested_index.push_back(0);
nested_layer = impl(impl, std::move(nested_layer),
std::move(nested_index), current, sentinel);
if (!nested_layer.empty()) {
auto nested_layout = type{record_type{nested_layer}};
nested_layout.assign_metadata(layer[index.back()].type);
result.emplace_back(layer[index.back()].name, nested_layout);
}
} else {
// Check for invalid arguments on the way out.
VAST_ASSERT(current_index_mismatch != current->index.end(),
"cannot apply transformations to both a nested record type "
"and its children at the same time.");
// We don't have a match and we also don't have a transformation, so
// we just leave the field untouched.
new_fields.push_back({
std::string{old_field.name},
old_field.type,
});
result.push_back(std::move(layer[index.back()]));
}
}
// In case fbs::type::Type::we exited the loop earlier, we still have to add
// all the remaining fields back to the modified record (untouched).
while (index.back() > 0) {
const auto& old_field = self.field(--index.back());
new_fields.push_back({
std::string{old_field.name},
old_field.type,
});
}
if (new_fields.empty())
return std::nullopt;
type result{};
construct_record_type(result, new_fields.rbegin(), new_fields.rend());
return caf::get<record_type>(result);
return result;
};
// Verify that transformations are sorted in order.
VAST_ASSERT(std::is_sorted(transformations.begin(), transformations.end(),
[](const auto& lhs, const auto& rhs) noexcept {
return lhs.index <= rhs.index;
}));
auto current = transformations.rbegin();
auto result
= do_transform(do_transform, *this, {}, current, transformations.rend());
VAST_ASSERT(current == transformations.rend(), "index out of bounds");
return result;
if (transformations.empty())
return *this;
auto current = transformations.begin();
const auto sentinel = transformations.end();
auto layer = unpacked_layer{};
layer.reserve(num_fields());
for (auto&& [name, type] : fields())
layer.push_back({std::string{name}, type});
// Run the possibly recursive implementation.
layer = impl(impl, std::move(layer), {0}, current, sentinel);
VAST_ASSERT(current == sentinel, "index out of bounds");
// Re-assemble the record type after the transformation.
if (layer.empty())
return {};
return record_type{layer};
}

caf::expected<record_type>
Expand Down
8 changes: 8 additions & 0 deletions plugins/summarize/CHANGELOG.md
Expand Up @@ -6,6 +6,14 @@ This changelog documents all notable changes to the summarize plugin for VAST.
### Breaking Changes

- The `aggregate` plugin is now called `summarize`.
[#2228](https://github.com/tenzir/vast/pull/2228)

### Bug Fixes

- The `summarize` no longer fails when its configuration does not match the
events it's operating on at all, i.e., when all columns are unrelated, and
instead ignores such events.
[#2258](https://github.com/tenzir/vast/pull/2258)

## v1.0.0

Expand Down
29 changes: 24 additions & 5 deletions plugins/summarize/summarize.cpp
Expand Up @@ -202,6 +202,8 @@ struct summary {
group_by_key_hash, group_by_key_equal>;

/// Creates a new summary given a configuration and a layout.
/// @note Returns `caf::no_error` in case the layout and the configuration do
/// not match.
static caf::expected<summary>
make(const configuration& config, const type& layout) {
auto result = summary{};
Expand Down Expand Up @@ -251,7 +253,10 @@ struct summary {
++flat_index;
}
auto adjusted_rt = rt.transform(std::move(drop_transformations));
VAST_ASSERT(adjusted_rt);
// If the schema cannot be adjusted the summary does not apply to the given
// layout. This is not an error.
if (!adjusted_rt)
return caf::no_error;
VAST_ASSERT(!layout.has_attributes());
result.adjusted_layout_ = type{layout.name(), *adjusted_rt};
result.flattened_adjusted_layout_ = flatten(result.adjusted_layout_);
Expand Down Expand Up @@ -605,11 +610,21 @@ class summarize_step : public transform_step {
/// lazily.
[[nodiscard]] caf::error
add(type layout, std::shared_ptr<arrow::RecordBatch> batch) override {
if (ignored_layouts_.contains(layout)) {
ignored_batches_.emplace_back(std::move(layout), std::move(batch));
return caf::none;
}
auto summary = summaries_.find(layout);
if (summary == summaries_.end()) {
auto make_summary_result = summary::make(config_, layout);
if (!make_summary_result)
return make_summary_result.error();
if (!make_summary_result) {
if (make_summary_result.error() != caf::no_error)
return make_summary_result.error();
VAST_ASSERT(!ignored_layouts_.contains(layout));
ignored_layouts_.insert(layout);
ignored_batches_.emplace_back(std::move(layout), std::move(batch));
return caf::none;
}
auto [new_summary, ok]
= summaries_.try_emplace(layout, std::move(*make_summary_result));
VAST_ASSERT(ok);
Expand All @@ -621,8 +636,8 @@ class summarize_step : public transform_step {

/// Retrieves the result of the transformation.
[[nodiscard]] caf::expected<std::vector<transform_batch>> finish() override {
auto result = std::vector<transform_batch>{};
result.reserve(summaries_.size());
auto result = std::exchange(ignored_batches_, {});
result.reserve(result.size() + summaries_.size());
for (auto&& [layout, summary] : summaries_) {
auto summary_result = summary.finish();
if (!summary_result)
Expand All @@ -640,6 +655,10 @@ class summarize_step : public transform_step {

/// A mapping of layout to the configured summary.
std::unordered_map<type, summary> summaries_ = {};

/// A list of layouts that we ignore.
std::unordered_set<type> ignored_layouts_ = {};
std::vector<transform_batch> ignored_batches_ = {};
};

/// The plugin entrypoint for the summarize transform plugin.
Expand Down