Asynchronous pipeline, initial integration #105
Codecov Report
@@            Coverage Diff             @@
##             main     #105      +/-   ##
==========================================
+ Coverage   54.01%   54.63%   +0.61%
==========================================
  Files          46       47       +1
  Lines        2662     2709      +47
==========================================
+ Hits         1438     1480      +42
- Misses       1128     1129       +1
- Partials       96      100       +4
jpinsonneau left a comment
LGTM
I have left some questions, but nothing that needs to be updated, so feel free to merge.
pkg/pipeline/pipeline.go
Outdated
    IsRunning bool

    start    []*node.Init
    terminal []*node.Terminal
pkg/pipeline/pipeline.go
Outdated
    writes := node.AsTerminal(p.applyWrite)
    extracts := node.AsMiddle(p.applyExtract)
    encodes := node.AsTerminal(p.applyEncode)
At present, @eranra feeds the same transformed data into both write and extract (which in turn feeds into encode). We probably want to combine write and encode into something like export. We may need to keep the notion of decode, since its input format differs from that of the other stage types.
I see the point. I think it's an interesting discussion.
@mariomac we talked about a wrapper function that would allow stages that do have output to be used as AsTerminal phases. The function would simply drop the output of the phase so that it fits where needed.
An example is the Prometheus encode phase: it does have output, and in some cases (debugging, for example) you want to push that output into another stdout write phase, while in other cases it should be a terminal phase. With such a wrapper, we can support both.
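For illustration, here is a minimal sketch of what such a wrapper could look like. It does not use the gopipes API; `Record`, `MiddleFunc`, `TerminalFunc`, `DropOutput` and `countingEncode` are hypothetical names built on plain channels, assumed only for this example.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for a flow log entry.
type Record map[string]interface{}

// MiddleFunc is a stage with output: it reads from in and forwards to out.
type MiddleFunc func(in <-chan []Record, out chan<- []Record)

// TerminalFunc is a stage without output: it only reads from in.
type TerminalFunc func(in <-chan []Record)

// DropOutput adapts a stage that has output so it can be used where a
// terminal stage is expected. Everything the stage emits is drained and
// discarded.
func DropOutput(stage MiddleFunc) TerminalFunc {
	return func(in <-chan []Record) {
		out := make(chan []Record)
		done := make(chan struct{})
		go func() {
			defer close(done)
			for range out {
				// Discard the stage output.
			}
		}()
		stage(in, out)
		close(out)
		<-done
	}
}

// countingEncode is a toy encode stage: it counts the records it sees and
// forwards every batch unchanged.
func countingEncode(counter *int) MiddleFunc {
	return func(in <-chan []Record, out chan<- []Record) {
		for batch := range in {
			*counter += len(batch)
			out <- batch
		}
	}
}

func main() {
	encoded := 0
	terminal := DropOutput(countingEncode(&encoded))

	in := make(chan []Record, 2)
	in <- []Record{{"bytes": 10}, {"bytes": 20}}
	in <- []Record{{"bytes": 30}}
	close(in)

	terminal(in)
	fmt.Println("encoded records:", encoded)
}
```

The same encode function can still be wired as a middle stage when its output is wanted (for example, for a debug stdout writer), and wrapped with `DropOutput` when it should end the pipeline.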
Now I understand this proposal better after another comment from @KalmanMeth.
We can do two things:
- Change the "write_stdout" interface to work as a middle node: it receives an entry, submits it, and forwards it.
- Keep it as now. Just consider that you can currently attach terminal nodes to any stage of the pipeline:
It also divides the Pipeline between Pipeline and builder.
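As a rough sketch of the first option listed above (a writer that behaves as a middle node), assuming plain channels rather than the actual gopipes types: `Record` and `forwardAfter` are hypothetical names, and the `submit` callback stands in for whatever the writer does with an entry.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for a flow log entry.
type Record map[string]interface{}

// forwardAfter builds a middle-style stage: each record is handed to submit
// (for example, printed to stdout) and the batch is then forwarded unchanged
// to the next stage. The output channel is closed when the input is closed.
func forwardAfter(submit func(Record)) func(in <-chan []Record, out chan<- []Record) {
	return func(in <-chan []Record, out chan<- []Record) {
		defer close(out)
		for batch := range in {
			for _, r := range batch {
				submit(r)
			}
			out <- batch
		}
	}
}

func main() {
	writeStdout := forwardAfter(func(r Record) { fmt.Println(r) })

	in := make(chan []Record)
	out := make(chan []Record)
	go writeStdout(in, out)
	go func() {
		in <- []Record{{"SrcAddr": "10.0.0.1"}, {"SrcAddr": "10.0.0.2"}}
		close(in)
	}()

	// Whatever is attached downstream still receives every batch.
	for batch := range out {
		fmt.Println("downstream received", len(batch), "record(s)")
	}
}
```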
    const chunkLines = 100

    // FileChunks ingest entries from a file and resends them in chunks of fixed number of lines.
Please add some README tests to describe this new Ingest option
What do you mean by "readme test"? An entry in the README documentation? I see that the ingesters are not documented there
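Until the ingester is documented, the behaviour described in the FileChunks comment can be sketched roughly as follows. This is not the PR's implementation: `sendChunks` and the `next` line source are hypothetical, and a `strings.Reader` stands in for the file.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// sendChunks pulls lines from next until it reports false, and sends them to
// out in slices of at most size lines. The last chunk may be shorter.
// out is closed once the source is exhausted.
func sendChunks(next func() (string, bool), size int, out chan<- []string) {
	defer close(out)
	chunk := make([]string, 0, size)
	for {
		line, ok := next()
		if !ok {
			break
		}
		chunk = append(chunk, line)
		if len(chunk) == size {
			out <- chunk
			chunk = make([]string, 0, size)
		}
	}
	if len(chunk) > 0 {
		out <- chunk
	}
}

func main() {
	// A strings.Reader stands in for the ingested file.
	sc := bufio.NewScanner(strings.NewReader("a\nb\nc\nd\ne\n"))
	next := func() (string, bool) {
		if sc.Scan() {
			return sc.Text(), true
		}
		return "", false
	}

	out := make(chan []string)
	go sendChunks(next, 2, out)
	for chunk := range out {
		fmt.Println(len(chunk), chunk)
	}
}
```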
    if len(records) > 0 {
        process(records)
        log.Debugf("ingestKafka sending %d records", len(records))
        out <- records
@KalmanMeth @mariomac do we even need this timer? Now that we have channels in between, maybe we can just send the records each time we get one. Why do we need to batch them together?
You are right. Sending batches still improves performance a bit because it minimizes communication over channels, but I think it is worth a try to see whether the simpler code and testing make up for the overhead.
In a later PR I have fixed the timer and also made it so that, when the batch length reaches a given size, the batch is forwarded without waiting for the timer. We can try removing batches in yet another PR and evaluate the impact on the pipeline benchmark. That would also allow removing the "file_chunk" functionality.
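To make the behaviour under discussion concrete, here is a minimal sketch of batching with both a size limit and a timer. It is not the code from the later PR: `Record`, `batch`, `maxLen` and the `tick` channel parameter are assumptions made for this example. The timer is passed in as a channel so the size-based path can be exercised without waiting on real time.

```go
package main

import (
	"fmt"
	"time"
)

// Record is a hypothetical stand-in for a flow log entry.
type Record map[string]interface{}

// batch accumulates records from in and forwards them to out either when
// maxLen records are pending or when tick fires with records pending,
// whichever happens first. Remaining records are flushed when in is closed,
// and out is closed afterwards.
func batch(in <-chan Record, out chan<- []Record, maxLen int, tick <-chan time.Time) {
	defer close(out)
	var pending []Record
	flush := func() {
		if len(pending) > 0 {
			out <- pending
			pending = nil
		}
	}
	for {
		select {
		case r, ok := <-in:
			if !ok {
				flush()
				return
			}
			pending = append(pending, r)
			if len(pending) >= maxLen {
				// Size limit reached: forward without waiting for the timer.
				flush()
			}
		case <-tick:
			flush()
		}
	}
}

func main() {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	in := make(chan Record)
	out := make(chan []Record)
	go batch(in, out, 3, ticker.C)
	go func() {
		for i := 0; i < 7; i++ {
			in <- Record{"seq": i}
		}
		close(in)
	}()

	for b := range out {
		fmt.Println("forwarded batch of", len(b))
	}
}
```

Setting `maxLen` to 1 makes every record go out immediately, which is one way to compare batched and unbatched behaviour in a benchmark without changing the surrounding wiring.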
    "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
    "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform"
    "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write"
    "github.com/netobserv/gopipes/pkg/node"
Do we have to include a code review of github.com/netobserv/gopipes/pkg/node in this PR?
Actually, it is already merged in its original repo. Since it's a completely independent project, I'd handle that separately. Feel free to open a PR in gopipes with suggestions, questions, or comments, and I can address them. Or just file issues if you notice any bug or potential improvement.


Wraps the current interfaces into channel loops that are triggered asynchronously in goroutines and communicate each step of the pipeline.
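As a rough illustration of the idea (not the code in this PR), a synchronous stage interface can be wrapped into a channel loop that runs in its own goroutine. `Transformer`, `upper`, `tag` and `runAsync` are hypothetical names used only for this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// Transformer is a hypothetical synchronous stage interface, standing in for
// the pipeline's existing interfaces.
type Transformer interface {
	Transform(entry string) string
}

// upper is a toy stage that upper-cases each entry.
type upper struct{}

func (upper) Transform(e string) string { return strings.ToUpper(e) }

// tag is a toy stage that appends a fixed suffix to each entry.
type tag struct{ suffix string }

func (t tag) Transform(e string) string { return e + t.suffix }

// runAsync wraps t into a loop that reads entries from in, applies the
// transformation and forwards the result. The loop runs in its own
// goroutine; the returned channel is closed once in is closed and drained.
func runAsync(t Transformer, in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for e := range in {
			out <- t.Transform(e)
		}
	}()
	return out
}

func main() {
	source := make(chan string)
	go func() {
		defer close(source)
		for _, e := range []string{"flow-a", "flow-b"} {
			source <- e
		}
	}()

	// Each stage runs concurrently and talks to the next one over a channel.
	for e := range runAsync(tag{"!"}, runAsync(upper{}, source)) {
		fmt.Println(e)
	}
}
```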
This PR is a starting point for 2 other branches: