Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eliaskosunen committed Feb 5, 2024
1 parent faefc84 commit d0d98b8
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 45 deletions.
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
21 changes: 9 additions & 12 deletions libtenzir/builtins/contexts/bloom_filter.cpp
Expand Up @@ -49,6 +49,10 @@ class bloom_filter_context final : public virtual context {
bloom_filter_context(uint64_t n, double p) : bloom_filter_{n, p} {
}

/// Returns the name of this context type: `bloom-filter`.
auto name() const -> std::string override {
return "bloom-filter";
}

/// Emits context information for every event in `slice` in order.
auto apply(table_slice slice, context::parameter_map parameters) const
-> caf::expected<std::vector<typed_array>> override {
Expand Down Expand Up @@ -163,18 +167,11 @@ class bloom_filter_context final : public virtual context {
.make_query = std::move(query_f)};
}

auto update(chunk_ptr, context::parameter_map)
-> caf::expected<update_result> override {
// TODO: We're getting chunks piece-meal currently and therefore cannot
// determine the boundary of the stream. In theory, we should accumulate the
// chunks until the upstream pipeline is complete. What we need in the
// context API is a signal that we're done.
return ec::unimplemented;
}

auto update(context::parameter_map) -> caf::expected<update_result> override {
return caf::make_error(ec::unimplemented,
"bloom-filter context can not be updated with void");
/// Resets the context by replacing the Bloom filter with a newly constructed
/// one that uses the same `n` and `p` parameters as the current filter.
/// The passed parameter map is ignored. Returns the result of `show()` for
/// the context after the replacement.
auto reset(context::parameter_map) -> caf::expected<record> override {
  // Read the parameters before overwriting the filter they belong to.
  auto current = bloom_filter_.parameters();
  // Both values are dereferenced below, so they must be present.
  TENZIR_ASSERT(current.n && current.p);
  bloom_filter_ = dcso_bloom_filter{*current.n, *current.p};
  return show();
}

auto save() const -> caf::expected<chunk_ptr> override {
Expand Down
18 changes: 8 additions & 10 deletions libtenzir/builtins/contexts/geoip.cpp
Expand Up @@ -63,7 +63,7 @@ class ctx final : public virtual context {
ctx() noexcept = default;

explicit ctx(context::parameter_map parameters) noexcept {
update(std::move(parameters));
reset(std::move(parameters));
}

~ctx() override {
Expand All @@ -72,6 +72,10 @@ class ctx final : public virtual context {
}
}

/// Returns the name of this context type: `geoip`.
auto name() const -> std::string override {
return "geoip";
}

auto entry_data_list_to_list(MMDB_entry_data_list_s* entry_data_list,
int* status, list& l) const
-> MMDB_entry_data_list_s* {
Expand Down Expand Up @@ -378,14 +382,8 @@ class ctx final : public virtual context {
"geoip context can not be updated with events");
}

auto update(chunk_ptr, context::parameter_map)
-> caf::expected<update_result> override {
return caf::make_error(ec::unimplemented, "geoip context can not be "
"updated with bytes");
}

auto update(context::parameter_map parameters)
-> caf::expected<update_result> override {
auto reset(context::parameter_map parameters)
-> caf::expected<record> override {
if (parameters.contains(path_key) and parameters.at(path_key)) {
db_path_ = *parameters[path_key];
}
Expand All @@ -401,7 +399,7 @@ class ctx final : public virtual context {
"'{}': {}",
db_path_, MMDB_strerror(status)));
}
return update_result{.update_info = show(), .make_query = {}};
return show();
}

auto snapshot(parameter_map) const -> caf::expected<expression> override {
Expand Down
16 changes: 7 additions & 9 deletions libtenzir/builtins/contexts/lookup_table.cpp
Expand Up @@ -48,6 +48,10 @@ class ctx final : public virtual context {
// nop
}

/// Returns the name of this context type: `lookup-table`.
auto name() const -> std::string override {
return "lookup-table";
}

/// Emits context information for every event in `slice` in order.
auto apply(table_slice slice, context::parameter_map parameters) const
-> caf::expected<std::vector<typed_array>> override {
Expand Down Expand Up @@ -196,15 +200,9 @@ class ctx final : public virtual context {
.make_query = std::move(query_f)};
}

auto update(chunk_ptr, context::parameter_map)
-> caf::expected<update_result> override {
return caf::make_error(ec::unimplemented, "lookup-table context can not be "
"updated with bytes");
}

auto update(context::parameter_map) -> caf::expected<update_result> override {
return caf::make_error(ec::unimplemented,
"lookup-table context can not be updated with void");
/// Resets the context by removing all entries from `context_entries`.
/// The passed parameter map is ignored. Returns the result of `show()` for
/// the context after clearing.
auto reset(context::parameter_map) -> caf::expected<record> override {
context_entries.clear();
return show();
}

auto save() const -> caf::expected<chunk_ptr> override {
Expand Down
1 change: 1 addition & 0 deletions libtenzir/include/tenzir/atoms.hpp
Expand Up @@ -74,6 +74,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(tenzir_atoms, caf::id_block::tenzir_types::end)
TENZIR_ADD_ATOM(replace, "replace")
TENZIR_ADD_ATOM(request, "request")
TENZIR_ADD_ATOM(reserve, "reserve")
TENZIR_ADD_ATOM(reset, "reset")
TENZIR_ADD_ATOM(resolve, "resolve")
TENZIR_ADD_ATOM(resume, "resume")
TENZIR_ADD_ATOM(run, "run")
Expand Down
11 changes: 3 additions & 8 deletions libtenzir/include/tenzir/plugin.hpp
Expand Up @@ -727,6 +727,8 @@ class context {

virtual ~context() noexcept = default;

/// Returns the name of the context type, e.g., `lookup-table`.
virtual auto name() const -> std::string = 0;

/// Emits context information for every event in `slice` in order.
virtual auto apply(table_slice slice, parameter_map parameters) const
-> caf::expected<std::vector<typed_array>>
Expand All @@ -740,14 +742,7 @@ class context {
-> caf::expected<update_result>
= 0;

/// Updates the context.
virtual auto update(chunk_ptr bytes, parameter_map parameters)
-> caf::expected<update_result>
= 0;

/// Updates the context.
virtual auto update(parameter_map parameters) -> caf::expected<update_result>
= 0;
/// Resets the context.
virtual auto reset(parameter_map parameters) -> caf::expected<record> = 0;

/// Create a snapshot of the initial expression.
virtual auto snapshot(parameter_map parameters) const
Expand Down
5 changes: 3 additions & 2 deletions nix/tenzir/plugins/source.json
Expand Up @@ -2,7 +2,8 @@
"name": "tenzir-plugins",
"url": "git@github.com:tenzir/tenzir-plugins",
"ref": "main",
"rev": "5bae28d2ed94431c927f0ee8683e87bf9212f7ce",
"rev": "a9d9fdd83ed9e0058fa8b0c06aa723ce4efc0ac3",
"submodules": true,
"shallow": true
"shallow": true,
"allRefs": true
}
3 changes: 3 additions & 0 deletions web/docs/contexts/bloom-filter.md
Expand Up @@ -9,6 +9,9 @@ context create <name> bloom-filter
--capacity <capacity> --fp-probability <probability>
context update <name> --key <field>
context delete <name>
context reset <name>
context load <name>
context save <name>
enrich <name> --field <field>
lookup <name> --field <field>
```
Expand Down
6 changes: 4 additions & 2 deletions web/docs/contexts/geoip.md
Expand Up @@ -6,8 +6,10 @@ A context for enriching IP addresses with geographical data.

```
context create <name> geoip [--db-path <mmdb>]
context update <name> [--db-path <mmdb>]
context delete <name>
context reset <name> [--db-path <mmdb>]
context load <name>
context save <name>
enrich <name> --field <field>
lookup <name> --field <field>
```
Expand All @@ -17,7 +19,7 @@ lookup <name> --field <field>
The `geoip` context uses a [MaxMind](https://www.maxmind.com/) database
to perform IP address lookups.

Run `context update <name> --db-path <mmdb>` to initialize the database at path
Run `context reset <name> --db-path <mmdb>` to initialize the database at path
`<mmdb>`. Omitting `--db-path` causes a reload of a previously initialized
database file.

Expand Down
3 changes: 3 additions & 0 deletions web/docs/contexts/lookup-table.md
Expand Up @@ -9,6 +9,9 @@ data.
context create <name> lookup-table
context update <name> --key <field> [--clear]
context delete <name>
context reset <name>
context load <name>
context save <name>
enrich <name> --field <field>
lookup <name> --field <field>
```
Expand Down
16 changes: 15 additions & 1 deletion web/docs/operators/context.md
Expand Up @@ -15,6 +15,9 @@ Manages a [context](../contexts.md).
context create <name> <context-type>
context delete <name>
context update <name> [<options>]
context reset <name>
context load <name>
context save <name>
```

## Description
Expand All @@ -25,11 +28,22 @@ The `context` operator manages [context](../contexts.md) instances.
returns information about the new context.

- The `delete` command destroys a given context. The pipeline returns
information abou the deleted context.
information about the deleted context.

- The `update` command adds new data to a given context. The pipeline returns
information about what the update performed.

- The `reset` command clears the state of a given context, as if it had just
been created. The pipeline returns information about the cleared context.

- The `load` command takes in bytes, likely previously created with
`context save`, and initializes the context with that data. The pipeline
returns information about the populated context.

- The `save` command outputs the state of the context, serialized into bytes.
The bytes can be stored somewhere with the `save` operator, and later be used
to initialize another context with `context load`.

### `<name>`

The name of the context to create, delete, update, reset, load, or save.
Expand Down

0 comments on commit d0d98b8

Please sign in to comment.