enhancement(observability): Initial API for tap #6363

Status: Closed · wants to merge 43 commits

Conversation

@leebenson (Member) commented Feb 5, 2021

Introduces the beginnings of tap - a way to observe event::LogEvents via the API.

The intended use-case is an (upcoming) vector tap CLI subcommand + UI dashboard.

Demo

14 second demo (no sound):

Note that this shows a Subscription.events field; this was changed to Subscription.logEvents:

https://www.loom.com/share/9bceb26e5f37447da3834bd346047487

4 min version (narrated):

This expands on the above demo with commentary, and additional fields that have since been added:

https://www.loom.com/share/b543370c88f84e86869f99286060e189

Current scope

This PR adds an initial Subscription.logEvents subscription, which returns all log events generated by a source or transform component, identified by componentName: String.

Later work will include:

  • Handling reloads. There's currently no mechanism to re-register components that may have changed.
  • Limiting velocity to X first/last/sampled events.
  • Filtering events based on matched content, similar to the filter interface introduced in enhancement(graphql api): Filtering in the API #6028.
  • Adding type-specific field getters to the graph, to return arbitrarily nested keys on LogEvent.
  • Adding a vector tap client to consume the API.
  • Adding UI dashboards that communicate over an active WebSocket to observe the logs graphically.
  • Multiplexing tails over a single subscription.

Example query

Assuming this configured source:

[sources.gen1]
  type = "generator"
  lines = ["Hello from Vector"]
  batch_interval = 1.0
  format = "shuffle"

The following subscription query:

subscription {
  logEvents(componentName:"gen1") {
    message
    timestamp
    json
    yaml
  }
}

Will return payloads like the following at 1-second intervals (controlled by batch_interval = 1.0):

{
  "data": {
    "logEvents": {
      "json": "{\"message\":\"Hello from Vector\",\"timestamp\":\"2021-02-07T13:16:49.602623Z\"}",
      "message": "Hello from Vector",
      "timestamp": "2021-02-07T13:16:49.602623+00:00",
      "yaml": "---\nmessage: Hello from Vector\ntimestamp: \"2021-02-07T13:16:49.602623Z\""
    }
  }
}

Later work will include the ability to override the natural velocity of events and to sample at user-defined intervals.

Implementation

As event::LogEvents are emitted via sources/transforms, a new event::EventInspector inspects the events and uses a tokio broadcast channel to send them to subscribers.

In the API, a listener is created ad-hoc based on an active subscription.

Based on the discussion in #3212, I initially attempted to model the emitter as a sink to use the existing fanout pattern. This seemed unfeasible - the StreamSink trait's run method receives an event without awareness of its origin. Since a key characteristic of tap is the ability to narrow the scope to particular components, this appeared to require wholesale changes to existing implementors, which felt redundant and potentially wasteful given the narrow use-case.

Instead, I adopted an existing pattern where an EventProcessed is emitted via an .inspect() call on the input rx channel itself, prior to being received by the downstream component. One side-effect of this is that clients technically receive the event before the connected downstream component has had a chance to receive it. I felt this is okay though, given that the component being inspected is the one that actually emits the event; there's no concept of downstream ack'ing that I felt we needed to cater for.
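As a rough illustration of that shape (a minimal sketch with hypothetical names - the EventInspector here is a stand-in, not the PR's exact type), the inspector boils down to a broadcast sender plus an .inspect() wrapper around a component's input stream:

use futures::StreamExt;
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct LogEvent(String); // stand-in for event::LogEvent

// Hypothetical inspector: one broadcast sender shared by all subscribers.
struct EventInspector {
    tx: broadcast::Sender<LogEvent>,
}

impl EventInspector {
    fn new(capacity: usize) -> Self {
        let (tx, _rx) = broadcast::channel(capacity);
        Self { tx }
    }

    // Each active API subscription gets its own receiver.
    fn subscribe(&self) -> broadcast::Receiver<LogEvent> {
        self.tx.subscribe()
    }

    // Wrap a component's input stream; every event passing through is cloned
    // out to listeners without altering the original stream.
    fn inspect<S>(&self, input: S) -> impl futures::Stream<Item = LogEvent>
    where
        S: futures::Stream<Item = LogEvent>,
    {
        let tx = self.tx.clone();
        input.inspect(move |event| {
            // send() only fails when there are no active receivers; ignore that.
            let _ = tx.send(event.clone());
        })
    }
}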

Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson leebenson requested review from a team, lukesteensen, binarylogic, pablosichert, jszwedko, MOZGIII and sghall and removed request for a team, lukesteensen, binarylogic and pablosichert February 5, 2021 11:29
@leebenson (Member, Author) commented Feb 5, 2021

Reviewers:

  • @jszwedko - For your general keen eyes, and the GraphQL patterns used here. I've initially left the LogEvent type fairly open-ended for future expansion, but you may have some extra thoughts.

  • @MOZGIII - sanity check on the approach for observing events, particularly the checks that ensure event cloning happens only when there are active listeners (here). We briefly discussed using atomic types for low-cost checks; that's not actually the case here (it currently uses a Mutex), but I'd appreciate your thoughts. (A rough sketch of the atomic-count idea follows this list.)

  • @sghall - any thoughts on working with the GraphQL in the UI would be appreciated. This is the first step of a wider upgrade path toward filtering, limiting velocity, etc. (described in the PR notes above), so treat this as early work. If there are any requirements I'm overlooking, please advise and I'll factor them into the schema design.
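To illustrate the atomic-count idea mentioned in the note to @MOZGIII above - this is not what the PR currently does (it uses a Mutex), just a minimal sketch with hypothetical names such as TapPoint:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct LogEvent(String); // stand-in, as in the sketch above

// Keep an explicit listener counter so the hot path can skip cloning the
// event entirely with a single atomic load when nobody is subscribed.
struct TapPoint {
    listeners: Arc<AtomicUsize>,
    tx: broadcast::Sender<LogEvent>,
}

impl TapPoint {
    fn maybe_forward(&self, event: &LogEvent) {
        if self.listeners.load(Ordering::Relaxed) > 0 {
            // Racing with the last receiver dropping is fine; the error is ignored.
            let _ = self.tx.send(event.clone());
        }
    }

    fn subscribe(&self) -> broadcast::Receiver<LogEvent> {
        self.listeners.fetch_add(1, Ordering::Relaxed);
        // A full implementation would also decrement the count when the
        // subscription is dropped (e.g. via a guard type); omitted for brevity.
        self.tx.subscribe()
    }
}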

@@ -116,8 +116,8 @@ goauth = { version = "0.8.1", optional = true }
smpl_jwt = { version = "0.5.0", optional = true }

# API
-async-graphql = { version = "2.4.10", optional = true }
-async-graphql-warp = { version = "2.4.10", optional = true }
+async-graphql = { version = "=2.4.11", optional = true }
leebenson (Member Author):
This one-liner took a surprisingly long time to figure out :-)

The 2.5 branch of async-graphql bumps to Warp 0.3, which changes the signature of Warp filters and how they interact with the GraphQL pieces. The bump was needed because of an issue with Context I was running into (the mechanism used to pass the event inspector to the active HTTP request without resorting to a global; I'm trying to move away from the latter).

Unfortunately, we have other code relying on 0.2.5, which broke a lot of things. After some trial and error, it seems this particular patch version is both stapled to the right Warp version for us and fixes the Context issue.

I'd like to follow-up with a general bump to Warp 0.3 later to avoid being stuck on this rung of async-graphql (and also because it's the latest anyway.)

Member:
👍 do you mind opening an issue for the warp upgrade?

leebenson (Member Author):
#6383 👍

@@ -216,6 +216,7 @@ tokio-postgres = { version = "0.5.5", optional = true, features = ["runtime", "w
thread_local = "= 1.0.1"
dashmap = "3"
encoding_rs = { version = "0.8", features = ["serde"] }
+parking_lot = "0.11.1"
leebenson (Member Author):
Using parking_lot here for its RwLock implementation, which according to the readme:

The numbers for RwLock vary depending on the number of reader and writer threads, but are almost always faster than the standard library RwLock, and even up to 50x faster in some cases.

Given this claim, I was surprised it wasn't being used in other places where we've reached for the stdlib version. There may be design decisions/considerations I'm not privy to that preclude its use - please advise if this sticks out as being undesirable.

If this holds up as a decent choice, there's further usage in API code I could swap it out for.
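For context, this is roughly what the ergonomics look like (a made-up example, not code from the PR): parking_lot's guards aren't wrapped in a Result and the locks can't be poisoned, so there's no .unwrap() at every call site, unlike std::sync::RwLock.

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;

// Hypothetical shared registry of listener counts, keyed by component name.
type Registry = Arc<RwLock<HashMap<String, usize>>>;

fn add_listener(registry: &Registry, component: &str) {
    // write() returns the guard directly - no poisoning, no unwrap.
    let mut map = registry.write();
    *map.entry(component.to_string()).or_insert(0) += 1;
}

fn listener_count(registry: &Registry, component: &str) -> usize {
    registry.read().get(component).copied().unwrap_or(0)
}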

impl LogEvent {
    /// Log message
    async fn message(&self) -> Option<String> {
        serde_json::from_value(json!(self.0.get("message")?)).ok()
leebenson (Member Author):
This seems a bit round the houses to extract a String. Is there a way to avoid the initial serialisation to a JSON string? I'm pretty sure I'm missing something obvious.

Member:
I think you probably want to match:

self.0.get("message").and_then(|message| match message {
    v @ Value::Bytes(_) => Some(v.to_string_lossy()),
    _ => None,
})

leebenson (Member Author):
Thanks! I actually missed an existing method of event::Value that reduces this to:

Some(self.0.get("message")?.to_string_lossy())
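Applied to the resolver above, the method then reduces to something like this (sketch, trimmed to the relevant part of the diff):

impl LogEvent {
    /// Log message
    async fn message(&self) -> Option<String> {
        Some(self.0.get("message")?.to_string_lossy())
    }
}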

    &'a self,
    ctx: &'a Context<'a>,
    component_name: String,
) -> impl Stream<Item = LogEvent> + 'a {
leebenson (Member Author):
I can see this eventually being extracted out into a general filter, but think it's okay inline for now.

src/sinks/tap.rs (review thread outdated, resolved)
leebenson and others added 3 commits February 7, 2021 20:09
Co-authored-by: Pablo Sichert <mail@pablosichert.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
@sghall (Member) commented Feb 8, 2021

Hey @leebenson I'm trying this out. I can't get it to work with a remap transform. Are you thinking that should work?

Update: I wasn't able to get this to work with transforms. I tried the json-parser and the remap transforms. It does mention transforms above, but I'll wait to hear back from Lee.

Also, re:

Currently, running a subscription { logEvents(componentName:"bla") } against an unknown component, simply terminates the subscription. This is the default behaviour when a stream has been exhausted... and since the receiver isn't there to return a Some(LogEvent), that's exactly what's happening right now.

I kinda think a non-existent component should return an error.

@leebenson (Member Author):
Thanks @sghall. The shift to a sink may have dislodged something with transforms -- will audit tomorrow. Thanks for checking this out.

👍 on returning an error, too.

src/app.rs (outdated)
let tap_controller = if cfg!(feature = "api") {
let tap_controller = TapController::new();

let tap_sinks = config
Member:
Thinking the issue with transforms is here. I tried this locally and it seemed to do the trick:

let tap_sinks = config
    .sources
    .keys().chain(config.transforms.keys())
    .map(|name| {
        (format!("_tap_{}", &name), tap_controller.make_sink(name))
    })
    .collect::<IndexMap<String, SinkOuter>>();

leebenson (Member Author):
Yep that’s it! Nicely caught 🎉

Member:
I don't believe the tap is surviving a config change (hot update). Oh, yep, you said that:

Handling reloads. There's currently no mechanism to re-register components that may have changed.

Think that would be a nice follow-up, or include it here. I think that's definitely needed if you are trying to use this to debug your config.

@leebenson leebenson marked this pull request as draft February 9, 2021 11:36
@leebenson (Member Author):
Putting this back into draft for now, to implement the reload plumbing.

Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson (Member Author):
I think this should be alright to review.

I've done some refactoring to work with existing topology plumbing, and attempted to limit the surface area of tap, so that it's mostly an implementation detail of building config and not something that needs directly configuring in app.rs.

I've also attempted to improve the log output to make it clearer what's being instantiated, e.g. (full example):

➜  vector git:(leebenson/tap) target/debug/vector -w 1 -c ~/Desktop/vector.toml
Feb 10 19:34:23.065  INFO vector::app: Log level is enabled. level="vector=info,codec=info,file_source=info,tower_limit=trace,rdkafka=info"
Feb 10 19:34:23.066  INFO vector::config::watcher: Creating configuration file watcher.
Feb 10 19:34:23.068  INFO vector::config::watcher: Watching configuration files.
Feb 10 19:34:23.068  INFO vector::app: Loading configs. path=[("/Users/leebenson/Desktop/vector.toml", None)]
Feb 10 19:34:23.084  INFO vector::topology: Running healthchecks.
Feb 10 19:34:23.085  INFO vector::topology: Starting source. name="wifi"
Feb 10 19:34:23.091  INFO vector::topology::builder: Healthcheck: Passed.
Feb 10 19:34:23.091  INFO vector::topology: Starting source. name="gen2"
Feb 10 19:34:23.097  INFO source{component_kind="source" component_name=wifi component_type=file}: vector::sources::file: Starting file server. include=["/var/log/wifi*.log"] exclude=[]
Feb 10 19:34:23.097  INFO vector::topology: Starting transform. name="json"
Feb 10 19:34:23.097  INFO vector::topology: Starting sink. name="blackhole2"
Feb 10 19:34:23.097  INFO vector::topology: Connecting tap component="wifi" sink="3813bf66-948b-4578-9211-d8a577167fe8"
Feb 10 19:34:23.098  INFO vector::topology: Connecting tap component="gen2" sink="505e7f25-5e7c-421b-9eb6-bf5b9332ad33"
Feb 10 19:34:23.098  INFO vector::topology: Connecting tap component="json" sink="04672559-60d2-4c6d-bc18-3b3ed10d42ac"
Feb 10 19:34:23.098  INFO vector: Vector has started. version="0.12.0" git_version="v0.11.0-557-g24af539d" released="Wed, 10 Feb 2021 19:27:16 +0000" arch="x86_64"
Feb 10 19:34:23.098  INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=http://127.0.0.1:8686/playground
Feb 10 19:34:23.099  INFO source{component_kind="source" component_name=wifi component_type=file}:file_server: file_source::checkpointer: Loaded checkpoint data.
Feb 10 19:34:23.110  INFO source{component_kind="source" component_name=wifi component_type=file}:file_server: vector::internal_events::file::source: Resuming to watch file. path=/var/log/wifi.log file_position=92033

Here, the following lines are pertinent during startup:

Feb 10 19:34:23.097  INFO vector::topology: Connecting tap component="wifi" sink="3813bf66-948b-4578-9211-d8a577167fe8"
Feb 10 19:34:23.098  INFO vector::topology: Connecting tap component="gen2" sink="505e7f25-5e7c-421b-9eb6-bf5b9332ad33"
Feb 10 19:34:23.098  INFO vector::topology: Connecting tap component="json" sink="04672559-60d2-4c6d-bc18-3b3ed10d42ac"

If json in this example changes its input from gen2 -> gen3, we get:

Feb 10 19:37:25.593  INFO vector::topology: Starting source. name="gen3"
Feb 10 19:37:25.593  INFO vector::topology: Rebuilding transform. name="json"
Feb 10 19:37:25.593  INFO vector::topology: Removing tap component="gen2"
Feb 10 19:37:25.593  INFO vector::topology: Connecting tap component="gen3" sink="99e6a2ad-fcb4-46f0-9c57-e420e0818045"
Feb 10 19:37:25.593  INFO vector::topology: Connecting tap component="json" sink="04672559-60d2-4c6d-bc18-3b3ed10d42ac"

This effectively:

  • Keeps the tapped json sink running over reloads, so active subscriptions aren't interrupted.
  • Re-wires gen2 -> gen3 as an input; this will appear as one continuous stream to the client, so no events are missed.
  • Re-instantiates both taps (on gen3 and json respectively) as sinks in topology.

Additionally:

  • In order to avoid introducing secondary behaviour for 'internal' sinks such as tap, I'm generating a UUID for each tap sink in RunningTopology.
  • Since neither the config nor the config diff is mutated any longer, running an API query for sinks won't return tap sinks.

Follow-up work

There are at least a couple of areas I'd like to improve on in follow-up work:

  • When a component name doesn't exist, an error should be returned instead of just ending the subscription.
  • Add control messages, so that when a component goes away, the receiver can terminate an active subscription.

The other work mentioned in the opening PR notes will also be tackled separately - filtering, velocity control, etc.

@leebenson leebenson marked this pull request as ready for review February 10, 2021 19:58
    &self,
    cx: SinkContext,
) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
    let mut lock = self.locked_tx.write();
Contributor:
Using a blocking mutex in an async context will lead to locking up the reactor. I suggest using async-aware mutexes - they'll park the task while waiting, rather than blocking the thread.

leebenson (Member Author):
Do you have a suggested lib/lock that you'd recommend in this scenario? I saw https://lib.rs/crates/future-parking_lot, but have no experience with it.

leebenson (Member Author):
(Or at least the current tokio version equivalent of it.)

Contributor:
Yep, tokio mutexes are good! Also, https://docs.rs/futures/0.3.12/futures/lock/struct.Mutex.html, and a potentially more interesting https://docs.rs/futures/0.3.12/futures/lock/struct.BiLock.html - it's more lightweight than the usual mutex, but it is unconventional.
There's also https://docs.rs/arc-swap/1.2.0/arc_swap/ - it also doesn't block, so it's fine to use within an async context.
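For example, an async-aware variant of the guarded sender could look roughly like this (hypothetical field and type names; just a sketch of the tokio flavour being suggested):

use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

#[derive(Clone, Debug)]
struct LogEvent(String); // stand-in for event::LogEvent

struct TapSinkConfig {
    // tokio's RwLock parks the task while waiting instead of blocking the
    // worker thread, so it's safe to acquire inside async fns like build().
    locked_tx: Arc<RwLock<Option<mpsc::Sender<LogEvent>>>>,
}

impl TapSinkConfig {
    async fn take_tx(&self) -> Option<mpsc::Sender<LogEvent>> {
        let mut guard = self.locked_tx.write().await;
        guard.take()
    }
}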

@@ -212,8 +212,9 @@ tokio-postgres = { version = "0.5.5", features = ["runtime", "with-chrono-0_4"],
toml = "0.5.8"
typetag = "0.1.6"
url = "2.2.0"
-uuid = { version = "0.8", features = ["serde", "v4"], optional = true }
+uuid = { version = "0.8", features = ["serde", "v4"] }
leebenson (Member Author):
I realise I'm undoing a recent PR to add uuid as optional. The crate size is ~37kb, so I don't think this adds much bloat. This prevents having to add feature guards in a few places. If this is contentious, I can add the guards.

Signed-off-by: Lee Benson <lee@leebenson.com>
@lukesteensen (Member):
Based on the discussion in #3212, I initially attempted to model the emitter as a sink to use the existing fanout pattern. This seemed unfeasible - the StreamSink trait's run method receives an event without awareness of its origin. Since a key characteristic of tap is the ability to narrow the scope to particular components, this appeared to require wholesale changes to existing implementors, which felt redundant and potentially wasteful given the narrow use-case.

I'm not sure I follow this line of reasoning. Sinks very much narrow the scope of their input to particular components, just not via the sink API itself. It seems like we're reimplementing some existing functionality of the topology here.

In RunningTopology we keep an outputs field that's a map of component names to a fanout::ControlChannel. The purpose of these control channels is to dynamically connect new components into the running topology. During a config reload that adds a new sink, for example, what roughly happens is the following.

// Build the new sink and give it a dedicated input channel.
let new_sink = config.build();
let (tx, rx) = mpsc::channel();
let sink_task = tokio::spawn(new_sink.run(rx));

// Ask each upstream component's fanout to forward its output into the channel.
for input in config.inputs {
    topology.outputs[input].send(ControlMessage::Add(config.name, tx.clone()));
}

And with that, the output of the components specified in inputs is forwarded to our new sink.

Important for the tap use case is that the fanout component here deals with futures::Sink, not our user-level SinkConfig sinks. So you can easily add a channel sender as above, or a custom futures::Sink with load shedding, throttling, extra filtering, etc.

How I imagined this working was roughly that the API endpoint would have a reference to the current RunningTopology, receive a tap request that contains the component names to connect to (potentially more, but at least that), build one of these simple tap sinks wired up to the ws connection, and add it to the relevant fanout instances. Then when the connection is dropped or otherwise times out, it would send the appropriate ControlMessage::Remove signal.
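A rough sketch of that lifecycle, with simplified, hypothetical types (the real fanout::ControlChannel and ControlMessage signatures in the codebase may differ):

use futures::channel::mpsc;
use futures::SinkExt;

#[derive(Clone, Debug)]
struct LogEvent(String); // stand-in for event::LogEvent

// Simplified control messages; the real enum lives in the fanout module.
enum ControlMessage {
    Add(String, mpsc::Sender<LogEvent>),
    Remove(String),
}

// Attach a tap: the API handler looks up the component's control channel
// (something like RunningTopology::outputs[component]) and registers a plain
// channel sender as an extra output. The returned receiver feeds the ws/API
// subscription.
async fn tap_component(
    mut control: mpsc::Sender<ControlMessage>,
    tap_name: String,
) -> mpsc::Receiver<LogEvent> {
    let (tx, rx) = mpsc::channel(100);
    let _ = control.send(ControlMessage::Add(tap_name, tx)).await;
    rx
}

// Detach the tap when the subscription is dropped or otherwise times out.
async fn untap_component(mut control: mpsc::Sender<ControlMessage>, tap_name: String) {
    let _ = control.send(ControlMessage::Remove(tap_name)).await;
}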

This would be simpler with a somewhat more ergonomic API around the topology, but fundamentally it should be possible without introducing any tap-specific concepts into the topology code itself. This would really be ideal, because the code is already somewhat hard to follow and simplifying it will only get more difficult if we pile in additional concepts.

This certainly isn't the only way to build this, as you've demonstrated, but I'm not convinced I'd explained it properly and wanted to make sure we at least consider this approach thoroughly before deciding to add to the topology as we're doing here.

@leebenson (Member Author):
I'm not sure I follow this line of reasoning. Sinks very much narrow the scope of their input to particular components, just not via the sink API itself. It seems like we're reimplementing some existing functionality of the topology here.

@lukesteensen - please ignore the original PR notes regarding this not being viable as a sink. I wasn't thinking about it in the right way. This is no longer relevant.

Thanks for your other notes re: simplifying the connection between topology and tap. That makes a lot of sense. I'll give this a play today.

@leebenson leebenson marked this pull request as draft February 11, 2021 13:39
@leebenson (Member Author):
Based on feedback here, I wound up working from a clean branch to reimplement.

This has drifted quite a lot, and I'll open a new PR when ready -- closing.

@leebenson leebenson closed this Mar 1, 2021
@leebenson leebenson deleted the leebenson/tap branch March 1, 2021 14:56
Labels: domain: api · domain: observability · type: enhancement