Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement the put, replace, and extend operators as documented #3089

Merged
merged 20 commits into from
Apr 25, 2023

Conversation

dominiklohmann
Copy link
Member

@dominiklohmann dominiklohmann commented Apr 19, 2023

This new version of put the operator projects, re-orders, and also supports extractors and selectors besides fixed values.

Here's an example:

❯ gunzip -c vast/integration/data/json/sip.log.json.gz \
  | vast exec 'read json | head 1 | put user_agent, schema=#type, time=#import_time, first_ip=:ip | write json' \
  | jq
{
  "user_agent": "friendly-scanner",
  "schema": "b751f83ded9bfc9d",
  "time": "1970-01-01T00:00:00.000000",
  "first_ip": "119.57.72.26"
}

The logic has also been implemented for extend and replace accordingly.

Fixes tenzir/issues#286

@dominiklohmann dominiklohmann added the feature New functionality label Apr 19, 2023
@dominiklohmann dominiklohmann requested a review from a team April 19, 2023 15:36
This new version of the operator projects, re-orders, and also supports
extractors and selectors besides fixed values.

Here's an example:

```
❯ gunzip -c vast/integration/data/json/sip.log.json.gz \
  | vast exec 'read json | head 1 | put user_agent, schema=#type, time=#import_time, first_ip=:ip | write json' \
  | jq
{
  "user_agent": "friendly-scanner",
  "schema": "b751f83ded9bfc9d",
  "time": "1970-01-01T00:00:00.000000",
  "first_ip": "119.57.72.26"
}
```
@tobim
Copy link
Member

tobim commented Apr 19, 2023

The #type and #import_time extractors don't seem to work yet.

@dominiklohmann
Copy link
Member Author

dominiklohmann commented Apr 19, 2023

The #type and #import_time extractors don't seem to work yet.

They do work correctly, but the example has an inferred type so #type is the type fingerprint b751f83ded9bfc9d and the data was never imported, so #import_time is the UNIX epoch.

Arguably for the import time, the function should add nulls if the import time is not available. Edit: I made this change.

@mavam
Copy link
Member

mavam commented Apr 20, 2023

While you're touching this code, would it make sense to add the #node_id selector as well?

@dominiklohmann
Copy link
Member Author

While you're touching this code, would it make sense to add the #node_id selector as well?

I'd say no, but we can do so pretty soon in a follow-up PR. We have this tracked on the roadmap, and this PR is neither the time nor the place for it.

This hasn't been needed for a while now, the last VAST version to write
this did not even have a partition version, and we nowadays only support
partitions with a partition version of at least 1.
@dominiklohmann dominiklohmann changed the title Implement the put operator as documented Re-implement the put, replace, and extend operators as documented Apr 20, 2023
@jachris jachris requested review from jachris and removed request for a team April 20, 2023 10:59
@jachris
Copy link
Contributor

jachris commented Apr 21, 2023

When I tested this, replace with type-extractors (as in replace :ip=123) did not work, presumably because your PR still uses resolve_key_suffix to resolve the extractor.

@dominiklohmann
Copy link
Member Author

dominiklohmann commented Apr 21, 2023

@jachris When I tested this, replace with type-extractors (as in replace :ip=123) did not work, presumably because your PR still uses resolve_key_suffix to resolve the extractor.

Right, that is just missing. I have a fix locally but that is rather complicated. Would you prefer that to be in another PR or still pushed on top of this one?

Edit: We agreed in a quick call to just push this now, which I've just done.

Copy link
Contributor

@jachris jachris left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice 👍

@tobim
Copy link
Member

tobim commented Apr 25, 2023

I investigated the web plugin crash a bit more. When building the plugin in standalone debug mode I get an ASAN error when accessing query/new:

vast -e 127.0.0.1:42001 -d it/query-endpoint/node/vast.db/ start --commands="web server --mode=dev --port=42025"
[09:29:08.588] loaded plugin: "/home/tobim/t/vast/put-operator/install/gcc/debug/lib64/vast/plugins/libvast-plugin-parquet.so"
[09:29:08.588] loaded plugin: "/home/tobim/t/vast/put-operator/install/gcc/debug/lib64/vast/plugins/libvast-plugin-pcap.so"
[09:29:08.588] loaded plugin: "/home/tobim/t/vast/put-operator/install/gcc/debug/lib64/vast/plugins/libvast-plugin-sigma.so"
[09:29:08.588] loaded plugin: "/home/tobim/t/vast/put-operator/install/gcc/debug/lib64/vast/plugins/libvast-plugin-web.so"
[09:29:08.830] VAST (v3.0.3-285-g04da29f9c5-dirty) is listening on 127.0.0.1:42001
[09:29:08.832] running post-start command web server --mode=dev --port=42025
[09:29:08.836] client connected to VAST node at 127.0.0.1:42001
[09:29:08.855] server listening on on http://127.0.0.1:42025
[09:29:08.919] index-11 finished initializing and is ready to accept queries
=================================================================
==1145194==ERROR: AddressSanitizer: stack-buffer-underflow on address 0x7f52b88d57d0 at pc 0x000001fddc40 bp 0x7f52b88d57b0 sp 0x7f52b88d57a8
READ of size 8 at 0x7f52b88d57d0 thread T8 (caf.worker)
    #0 0x1fddc3f in operator() libvast/include/vast/pipeline.hpp:347
    #1 0x193f171 in std::__n4861::coroutine_handle<vast::internal::generator_promise<vast::table_slice> >::resume() const /nix/store/wx4nsqbssywgx9zkabwgl9sxm5l61ni3-gcc-12.2.0/include/c++/12.2.0/coroutine:244
    #2 0x192fd24 in vast::generator<vast::table_slice>::begin() libvast/include/vast/generator.hpp:230
    #3 0x1fdf13a in operator() libvast/include/vast/pipeline.hpp:350
    #4 0x193f171 in std::__n4861::coroutine_handle<vast::internal::generator_promise<vast::table_slice> >::resume() const /nix/store/wx4nsqbssywgx9zkabwgl9sxm5l61ni3-gcc-12.2.0/include/c++/12.2.0/coroutine:244
    #5 0x192fd24 in vast::generator<vast::table_slice>::begin() libvast/include/vast/generator.hpp:230
    #6 0x1fdc5da in operator() libvast/include/vast/pipeline.hpp:350
    #7 0x193f171 in std::__n4861::coroutine_handle<vast::internal::generator_promise<vast::table_slice> >::resume() const /nix/store/wx4nsqbssywgx9zkabwgl9sxm5l61ni3-gcc-12.2.0/include/c++/12.2.0/coroutine:244
    #8 0x192fd24 in vast::generator<vast::table_slice>::begin() libvast/include/vast/generator.hpp:230
    #9 0x1ac487c in operator() libvast/builtins/endpoints/query.cpp:431
    #10 0x215fc39 in std::__n4861::coroutine_handle<vast::internal::generator_promise<std::monostate> >::resume() const /nix/store/wx4nsqbssywgx9zkabwgl9sxm5l61ni3-gcc-12.2.0/include/c++/12.2.0/coroutine:244
    #11 0x215f98c in vast::generator<std::monostate>::begin() libvast/include/vast/generator.hpp:230
    #12 0x7f52e44acd34 in make_local_executor libvast/src/pipeline.cpp:263
    #13 0x189ce19 in std::__n4861::coroutine_handle<vast::internal::generator_promise<caf::expected<void> > >::resume() const /nix/store/wx4nsqbssywgx9zkabwgl9sxm5l61ni3-gcc-12.2.0/include/c++/12.2.0/coroutine:244
    #14 0x189ad54 in vast::generator<caf::expected<void> >::begin() libvast/include/vast/generator.hpp:230
    #15 0x1ac730d in query_manager libvast/builtins/endpoints/query.cpp:476
    #16 0x1adbbaa in apply_moved_args_prefixed<caf::typed_behavior<caf::result<void>(vast::atom::provision, vast::system::query_cursor), caf::result<vast::atom::done>(vast::atom::next, vast::http_request, long unsigned int), caf::result<void>(vast::atom::done), caf::result<void>(vast::table_slice)> (*)(caf::stateful_actor<vast::plugins::rest_api::query::query_manager_state, caf::typed_event_based_actor<caf::result<void>(vast::atom::provision, vast::system::query_cursor), caf::result<vast::atom::done>(vast::atom::next, vast::http_request, long unsigned int), caf::result<void>(vast::atom::done), caf::result<void>(vast::table_slice)> >*, caf::typed_actor<caf::result<void>(vast::atom::done, vast::uuid), caf::result<void>(caf::subscribe_atom, caf::flush_atom, caf::typed_actor<caf::result<void>(caf::flush_atom)>), caf::result<void>(caf::subscribe_atom, vast::atom::create, caf::typed_actor<caf::result<void>(vast::atom::update, vast::partition_synopsis_pair), caf::result<void>(vast::atom::update, std::vector<vast::partition_synopsis_pair, std::allocator<vast::partition_synopsis_pair> >)>, vast::system::send_initial_dbstate), caf::result<vast::system::query_cursor>(vast::atom::evaluate, vast::query_context), caf::result<vast::system::catalog_lookup_result>(vast::atom::resolve, vast::expression), caf::result<void>(vast::atom::query, vast::uuid, unsigned int), caf::result<vast::atom::done>(vast::atom::erase, vast::uuid), caf::result<vast::atom::done>(vast::atom::erase, std::vector<vast::uuid, std::allocator<vast::uuid> >), caf::result<std::vector<vast::partition_info, std::allocator<vast::partition_info> > >(vast::atom::apply, vast::pipeline, std::vector<vast::partition_info, std::allocator<vast::partition_info> >, vast::system::keep_original_partition), caf::result<void>(caf::flush_atom), caf::result<caf::inbound_stream_slot<vast::table_slice> >(caf::stream<vast::table_slice>), caf::result<vast::detail::vector_map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, vast::data, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, vast::data> >, vast::detail::stable_map_policy> >(vast::atom::status, vast::system::status_verbosity)>, vast::pipeline, bool, std::chrono::duration<long int, std::ratio<1, 1000000000> >, vast::plugins::rest_api::query::(anonymous namespace)::query_format_options), 0, 1, 2, 3, 4, std::tuple<caf::typed_actor<caf::result<void>(vast::atom::done, vast::uuid), caf::result<void>(caf::subscribe_atom, caf::flush_atom, caf::typed_actor<caf::result<void>(caf::flush_atom)>), caf::result<void>(caf::subscribe_atom, vast::atom::create, caf::typed_actor<caf::result<void>(vast::atom::update, vast::partition_synopsis_pair), caf::result<void>(vast::atom::update, std::vector<vast::partition_synopsis_pair, std::allocator<vast::partition_synopsis_pair> >)>, vast::system::send_initial_dbstate), caf::result<vast::system::query_cursor>(vast::atom::evaluate, vast::query_context), caf::result<vast::system::catalog_lookup_result>(vast::atom::resolve, vast::expression), caf::result<void>(vast::atom::query, vast::uuid, unsigned int), caf::result<vast::atom::done>(vast::atom::erase, vast::uuid), caf::result<vast::atom::done>(vast::atom::erase, std::vector<vast::uuid, std::allocator<vast::uuid> >), caf::result<std::vector<vast::partition_info, std::allocator<vast::partition_info> > >(vast::atom::apply, vast::pipeline, std::vector<vast::partition_info, std::allocator<vast::partition_info> >, vast::system::keep_original_partition), caf::result<void>(caf::flush_atom), caf::result<caf::inbound_stream_slot<vast::table_slice> >(caf::stream<vast::table_slice>), caf::result<vast::detail::vector_map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, vast::data, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, vast::data> >, vast::detail::stable_map_policy> >(vast::atom::status, vast::system::status_verbosity)>, vast::pipeline, bool, std::chrono::duration<long int, std::ratio<1, 1000000000> >, vast::plugins::rest_api::query::(anonymous namespace)::query_format_options>, caf::stateful_actor<vast::plugins::rest_api::query::query_manager_state, caf::typed_event_based_actor<caf::result<void>(vast::atom::provision, vast::system::query_cursor), caf::result<vast::atom::done>(vast::atom::next, vast::http_request, long unsigned int), caf::result<void>(vast::atom::done), caf::result<void>(vast::table_slice)> >*&> /nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/include/caf/detail/apply_args.hpp:64
    #17 0x1ad997a in operator() /nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/include/caf/detail/init_fun_factory.hpp:78
    #18 0x1a26061 in caf::detail::unique_function<caf::behavior (caf::local_actor*)>::operator()(caf::local_actor*) /nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/include/caf/detail/unique_function.hpp:145
    #19 0x1b51c24 in caf::typed_event_based_actor<caf::result<void> (vast::atom::provision, vast::system::query_cursor), caf::result<vast::atom::done> (vast::atom::next, vast::http_request, unsigned long), caf::result<void> (vast::atom::done), caf::result<void> (vast::table_slice)>::make_behavior() /nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/include/caf/typed_event_based_actor.hpp:73
    #20 0x1b519e8 in caf::typed_event_based_actor<caf::result<void> (vast::atom::provision, vast::system::query_cursor), caf::result<vast::atom::done> (vast::atom::next, vast::http_request, unsigned long), caf::result<void> (vast::atom::done), caf::result<void> (vast::table_slice)>::initialize() /nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/include/caf/typed_event_based_actor.hpp:60
    #21 0x7f52dd17fcec in caf::scheduled_actor::activate(caf::execution_unit*) (/nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/lib/libcaf_core.so.0.18.7+0x17fcec)
    #22 0x7f52dd1835b8 in caf::scheduled_actor::resume(caf::execution_unit*, unsigned long) (/nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/lib/libcaf_core.so.0.18.7+0x1835b8)
    #23 0x7f52dd078e3d in caf::scheduler::worker<caf::policy::work_stealing>::run() (/nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/lib/libcaf_core.so.0.18.7+0x78e3d)
    #24 0x7f52dd07920e in std::thread::_State_impl<std::thread::_Invoker<std::tuple<caf::actor_system::launch_thread<caf::scheduler::worker<caf::policy::work_stealing>::start()::{lambda()#1}>(char const*, caf::scheduler::worker<caf::policy::work_stealing>::start()::{lambda()#1})::{lambda(auto:1)#1}, caf::intrusive_ptr<caf::ref_counted> > > >::_M_run() (/nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/lib/libcaf_core.so.0.18.7+0x7920e)
    #25 0x7f52daae0532 in execute_native_thread_routine (/nix/store/2w4k8nvdyiggz717ygbbxchpnxrqc6y9-gcc-12.2.0-lib/lib/libstdc++.so.6+0xe0532)
    #26 0x7f52da688e85 in start_thread (/nix/store/76l4v99sk83ylfwkz8wmwrm4s8h73rhd-glibc-2.35-224/lib/libc.so.6+0x88e85)
    #27 0x7f52da70fd2f in clone3 (/nix/store/76l4v99sk83ylfwkz8wmwrm4s8h73rhd-glibc-2.35-224/lib/libc.so.6+0x10fd2f)

Address 0x7f52b88d57d0 is located in stack of thread T8 (caf.worker) at offset 0 in frame
    #0 0x1fdd811 in operator() libvast/include/vast/pipeline.hpp:347

  This frame has 1 object(s):
    [32, 40) '_Coro_actor_continue' (line 347)
HINT: this may be a false positive if your program uses some custom stack unwind mechanism, swapcontext or vfork
      (longjmp and C++ exceptions *are* supported)
Thread T8 (caf.worker) created by T0 here:
    #0 0x7f52e644d136 in __interceptor_pthread_create (/nix/store/2w4k8nvdyiggz717ygbbxchpnxrqc6y9-gcc-12.2.0-lib/lib/libasan.so.8+0x4d136)
    #1 0x7f52daae0608 in std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (/nix/store/2w4k8nvdyiggz717ygbbxchpnxrqc6y9-gcc-12.2.0-lib/lib/libstdc++.so.6+0xe0608)
    #2 0x7f52dd06e6ae in caf::actor_system::actor_system(caf::actor_system_config&) (/nix/store/1rndga2smcwb3ad762s76kh90c3frp3x-actor-framework-0.18.7/lib/libcaf_core.so.0.18.7+0x6e6ae)
    #3 0x225f096 in main vast/vast.cpp:164
    #4 0x7f52da62924d in __libc_start_call_main (/nix/store/76l4v99sk83ylfwkz8wmwrm4s8h73rhd-glibc-2.35-224/lib/libc.so.6+0x2924d)
    #5 0x7ffed871547e  ([stack]+0x2047e)

SUMMARY: AddressSanitizer: stack-buffer-underflow libvast/include/vast/pipeline.hpp:347 in operator()
Shadow bytes around the buggy address:
  0x0fead7112aa0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0fead7112ab0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0fead7112ac0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0fead7112ad0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0fead7112ae0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
=>0x0fead7112af0: 00 00 00 00 00 00 00 00 00 00[f1]f1 f1 f1 00 f3
  0x0fead7112b00: f3 f3 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0fead7112b10: 00 00 00 00 f1 f1 f1 f1 00 f3 f3 f3 00 00 00 00
  0x0fead7112b20: 00 00 00 00 00 00 00 00 f1 f1 f1 f1 00 f3 f3 f3
  0x0fead7112b30: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
  0x0fead7112b40: 00 00 f1 f1 f1 f1 00 f3 f3 f3 00 00 00 00 00 00
Shadow byte legend (one shadow byte represents 8 application bytes):
  Addressable:           00
  Partially addressable: 01 02 03 04 05 06 07 
  Heap left redzone:       fa
  Freed heap region:       fd
  Stack left redzone:      f1
  Stack mid redzone:       f2
  Stack right redzone:     f3
  Stack after return:      f5
  Stack use after scope:   f8
  Global redzone:          f9
  Global init order:       f6
  Poisoned by user:        f7
  Container overflow:      fc
  Array cookie:            ac
  Intra object redzone:    bb
  ASan internal:           fe
  Left alloca redzone:     ca
  Right alloca redzone:    cb
==1145194==ABORTING

The exact command that triggers it is:

curl -XPOST -H"Content-Type: application/json" -d '{"query": "where 192.168.1.102 | extend foo=123 | replace orig_h=resp_h | put foo, source=orig_h, destination=id.resp_h, schema=#type"}' http://127.0.0.1:42025/api/v0/query/new

The same problem does not occur when the plugin is built as part of VAST.
It also does not occur if the query only contains the where.

@jachris
Copy link
Contributor

jachris commented Apr 25, 2023

This should be fixed with 59f1689.

@dominiklohmann dominiklohmann merged commit bb90407 into main Apr 25, 2023
@dominiklohmann dominiklohmann deleted the topic/put-operator branch April 25, 2023 12:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature New functionality
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants