How to parametrize stream/pipeline creation? #456

Closed
anovv opened this issue Oct 4, 2022 · 2 comments


anovv commented Oct 4, 2022

Hi, thanks for the great project!

What I'm trying to achieve is a custom builder interface/wrapper around streamz that provides predefined, preconfigured stream building blocks which can be combined. For example:


from streamz import Stream


def _custom_stream_builder_1(input_stream):
    return input_stream.accumulate(_custom_acc, returns_state=True, start=_custom_state)


def _custom_stream_builder_2(input_stream):
    return input_stream.filter(lambda event: _custom_condition(event))


def _custom_stream_builder_3(input_stream):
    return input_stream.map(lambda event: _custom_map(event))

stream = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream)))
stream.sink(print)

for event in events:
    stream.emit(event)

However, it looks like the code inside those functions does not alter the initial stream object, and all emitted events go straight to the sink. What am I doing wrong? Can you please point me in the right direction?

Another question: what is the difference between


stream = Stream()
stream.sink(print)

for event in events:
    stream.emit(event)

and


stream = Stream()
stream = stream.sink(print) # any other function map/filter/etc. here

for event in events:
    stream.emit(event)

I feel like the answer is somewhere in this example, but I don't understand where. Thanks!

@martindurant
Member

The problem is that you have redefined what stream refers to, so when you call emit() you are emitting on the last node of your stream graph, not on the input.
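To see why, here is a deliberately simplified model of a push-based node graph. The Node class and its methods are hypothetical, written only for this illustration, and are not streamz's implementation. They mirror two behaviours relevant here: map() and sink() return a new downstream node instead of modifying the node they are called on, and emit() forwards a value to the downstream nodes of whichever node it is called on, without applying that node's own function.

```python
class Node:
    """Hypothetical push-based node; a toy model, not streamz's implementation."""

    def __init__(self, func=None):
        self.func = func        # applied when a value arrives from upstream
        self.downstreams = []   # nodes that receive this node's output

    def _connect(self, func):
        node = Node(func)
        self.downstreams.append(node)
        return node             # the NEW downstream node; self is unchanged

    def map(self, func):
        return self._connect(func)

    def sink(self, func):
        # the sink's func is called for its side effect; nothing is attached after it
        return self._connect(func)

    def update(self, x):
        # called by an upstream node: apply this node's func, then push the result on
        self.emit(self.func(x) if self.func else x)

    def emit(self, x):
        # push x to the downstream nodes WITHOUT applying self.func
        for node in self.downstreams:
            node.update(x)


# Question 1, as written: `stream` is rebound to the map node.
buggy = []
stream = Node()
stream = stream.map(lambda x: x * 10)
stream.sink(buggy.append)
stream.emit(1)           # emitted below the map, so x * 10 is never applied
print(buggy)             # [1]

# Fixed: keep a separate handle on the input node.
fixed = []
stream_in = Node()
stream = stream_in.map(lambda x: x * 10)
stream.sink(fixed.append)
stream_in.emit(1)
print(fixed)             # [10]

# Question 2: `stream` is rebound to the sink node, which has no downstreams.
sink_only = []
stream = Node()
stream = stream.sink(sink_only.append)
stream.emit(1)
print(sink_only)         # []
```

The last case corresponds to the second question: after stream = stream.sink(print), the name stream refers to the sink node, so in this model emitting on it reaches nothing and nothing is printed, whereas emitting on the original Stream() sends each event into the sink.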

If you instead keep a separate reference to the input node,

from streamz import Stream

stream_in = Stream()
stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream_in)))
stream.sink(print)

for event in events:
    stream_in.emit(event)

you will get the behaviour you are after.
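For the original goal of reusable building blocks, one option is a small helper that applies the builders in order and hands back both ends of the chain, so the input node is never lost. The name build_pipeline is hypothetical and not part of streamz; it only relies on each builder taking a node and returning a new one, which is how the builder functions in the question are written.

```python
from functools import reduce
from typing import Callable, Tuple, TypeVar

N = TypeVar("N")  # any node type, e.g. a streamz Stream


def build_pipeline(source: N, *builders: Callable[[N], N]) -> Tuple[N, N]:
    """Hypothetical helper (not part of streamz).

    Applies each builder to the node returned by the previous one,
    starting from `source`, and returns both ends of the chain:
    call emit() on the first, attach sinks to the second.
    """
    tail = reduce(lambda node, build: build(node), builders, source)
    return source, tail
```

With streamz, the chain from the question would become `stream_in, stream = build_pipeline(Stream(), _custom_stream_builder_3, _custom_stream_builder_2, _custom_stream_builder_1)`, followed by `stream.sink(print)` and `stream_in.emit(event)` in the loop. Builders are listed in the order their nodes appear in the graph, so _custom_stream_builder_3 (the innermost call in the original nesting) comes first.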

@anovv
Author

anovv commented Jan 16, 2023

Thanks @martindurant, that worked!
