feat(new transform): Add initial transaction transform #1991
Conversation
Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
```rust
fn new(e: LogEvent, strategies: &IndexMap<Atom, MergeStrategy>) -> Self {
    Self {
        stale_since: Instant::now(),
        // TODO: all_fields alternative that consumes
```
Being forced to clone below, which could be removed if we add a consuming version of `all_fields`.
I think an `into_fields` method would be perfectly reasonable to add.
Polling works as documented now thanks to the compat layer, so this is ready for review. Added a comment which in my view isn't a killer, but would improve things. Also haven't bothered fixing the
Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
This is looking awesome so far! Before merging I think we need to fix up the stream implementation, and it'd also be nice to see some more tests around the individual value mergers. Nothing crazy, but there are definitely some edge cases there I want to make sure we're exercising.
```rust
let poll_flush = move |ctx: &mut Context| -> Poll<Option<Result<StreamEvent, ()>>> {
    let p = Pin::new(&mut input_rx).poll_next(ctx);
    match p {
        Poll::Pending => {
            // If our input channel hasn't yielded anything
            let now = Instant::now();
            let since_last_flush = now.duration_since(last_flush);

            // But it hasn't been long enough to flush again yet.
            if let Some(flush_in) = poll_period.checked_sub(since_last_flush) {
                // Wake up when we're ready.
                let thread_waker = ctx.waker().clone();
                thread::spawn(move || {
                    thread::sleep(flush_in);
                    thread_waker.wake();
                });
                Poll::Pending
            } else {
                // Otherwise, it has been long enough since the last
                // flush to trigger another.
                last_flush = now;
                Poll::Ready(Some(Ok(StreamEvent::Flush)))
            }
        }
        // Pass `Poll<Option<Result<Event>>>` as `Poll<Option<Result<StreamEvent, ()>>>`
        _ => return p.map(|p| p.map(|p| p.map(|p| StreamEvent::Event(p)))),
    }
};
```
We should be able to structure this in a simpler way. If I'm understanding correctly, we have two "triggers": incoming events and flush/expiration timers. Incoming events are already modeled as a stream, and we should be able to use a tokio `DelayQueue` as a stream of flush/expirations (with the additional benefit of keys, so we don't have to iterate and check our whole state in `flush_into`).

With both of those streams available, we should be able to "just" loop and `select` over them and do the appropriate flush/flush all/insert action directly (i.e. no need for the `StreamEvent` intermediary). We may need to use `async-stream` for that (/cc @LucioFranco) and then compat the resulting stream before returning it.

If that all works out (🤞), we can get rid of the thread spawn, most of the manual plumbing, the full state iteration, etc.
I had a brief play around with creating a flush stream, but I backed out as I couldn't see a simple and clean way of closing it once our event stream shuts down (without needing to return futures from `transform_stream`).

Not sure I'll have time this week to experiment.
Not sure if this is useful or not, but @MOZGIII has been working on an expiring type of hashmap for the new file sink (https://github.com/timberio/vector/blob/tokio-compat-file-sink/src/expiring_hash_map/mod.rs); this may be useful.
Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
Signed-off-by: binarylogic <bjohnson@binarylogic.com>
@lukesteensen @Jeffail where do we sit with the stream changes? Are these absolutely necessary? @Jeffail is transitioning to our other project, so I'd like to limit follow-up changes to a day or less. Otherwise, it might make sense to have someone else take this to the finish line. It might also be good practice with the new streaming transform API.
Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
It'd be nice to not have to iterate the state of pending transactions for each event, so I think it's worth exploring an alternative. However, my gut feeling is that joining a channel owned by the transform itself within

Given the use case for this transform I'm not convinced that these changes are worth blocking it unless someone wants to try it out themselves. If there's a significant and noticeable performance hit from the
Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
Great PR! Please pay attention to the following items before merging:

- Files matching
- Files matching
- Files matching

This is an automatically generated QA checklist based on modified files.
This looks great! Does it make sense to add (maybe as a follow-up) an option which would limit the maximum number of events in a single transaction? This would act as protection in case the transactions are created from data coming from untrusted sources and the downstream sinks can't process transactions which are too large.
The only thing I might consider blocking with the stream stuff is the thread spawn and sleep. That could end up having some weird runtime effects I'd rather avoid. Could we do something similar by spawning a `Delay` task, or even better, just having one flush timer that we poll?

/cc @LucioFranco for ideas
From planning: @lukesteensen will be finishing this up.
@lukesteensen should we close this in favor of #2870?
This is my fault 😄, but we should rename this transform for clarity. We'll be directing customers towards this pattern, and using a term that aligns with industry terms will benefit us. I'm leaning towards two different names:

- `canonical` - Already established by Stripe. Devs curious about this can google this term and see that it's an adopted pattern.
- `denormalize` - This is the more correct term, which is also much clearer in its purpose.

I vote for `denormalize` since I don't fully understand why the term "canonical" was used by Stripe. Unless I'm missing technical context, it doesn't accurately describe what's happening.
Closing to avoid future confusion.
This PR adds a new `transaction` transform. In order to flush events periodically (and one final flush during shutdown) it implements `transform_stream`.

Closes #1437