fix(examples): Migrate billing example #146
Conversation
This changes the billing example to use the new chain syntax instead of the old method. It also changes the signature of the OutcomesBuffer. There is also a test that puts a message through the pipeline and verifies the output.
The test I added doesn't actually work; it fails with this stack trace:
Not sure why this error is happening in the runtime code.
We have not yet provided a way to run the pipeline end to end on either rust-arroyo or python arroyo in unit tests. Here is the task: https://github.com/getsentry/streaming-planning/issues/192. Feel free to pick it up if you want.
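Until that harness exists, a minimal sketch of what an in-memory end-to-end test could look like (all names here, including `run_pipeline_in_memory` and the stand-in `build_outcome`, are hypothetical and not part of the sentry_streams API): messages flow through the map steps in plain Python and the output is collected into a list instead of being produced to a Kafka topic.

```python
import json
from typing import Callable, List

def run_pipeline_in_memory(steps: List[Callable], messages: List[bytes]) -> List:
    """Push each message through every step in order, collecting the results."""
    out = []
    for msg in messages:
        value = msg
        for step in steps:
            value = step(value)
        out.append(value)
    return out

def build_outcome(raw: bytes) -> dict:
    # Stand-in for the example's map function: decode and project one field.
    return {"outcome": json.loads(raw)["outcome"]}

results = run_pipeline_in_memory([build_outcome], [b'{"outcome": "accepted"}'])
# results == [{"outcome": "accepted"}]
```

This keeps the unit test hermetic: no broker, no consumer group, no offsets.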
I'm not sure I understand this. The
I would like us to avoid depending on a real Kafka running for unit tests; let's use a real Kafka only for some integration tests. That dependency causes a lot of issues, as seen in snuba and arroyo:
Also, your test is running with the python adapter rather than the rust one, and we are decommissioning the python one: https://github.com/getsentry/streams/pull/146/files#diff-23f223c3b3e4a5189f9081e41f4ab27d70aa3e4ae1b3f9c07213f6ddc866c32aR44

Taking a step back: what is the goal of the test you introduced? If running the examples is meant as a series of integration tests, all good, we want to do this: https://github.com/getsentry/streaming-planning/issues/205. But in that case they should be a separate test suite, so that we can run them in CI only with the test environment properly set up, like sentry acceptance tests. I would like to avoid running all the examples against a real Kafka every time an engineer runs the unit tests locally.
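One common way to keep such a suite out of the default unit-test run is a pytest marker that is deselected by default. A sketch (the marker name `integration` and the test name are hypothetical, not anything this repo defines):

```python
import pytest

# Hypothetical: mark example-based tests so the default run skips them.
# Deselecting by default would be configured in pytest.ini:
#   [pytest]
#   markers = integration: needs a real Kafka broker
#   addopts = -m "not integration"

@pytest.mark.integration
def test_billing_example_end_to_end():
    # Would run the billing example against a real Kafka broker in CI
    # with the test environment properly set up.
    pass
```

Engineers then run `pytest` locally without touching Kafka, while CI runs `pytest -m integration` in the prepared environment.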
name="myinput",
stream_name="events",
)
.apply("mymap", Map(function=build_outcome))
Please use a Parser applier rather than a custom map: https://getsentry.github.io/streams/build_pipeline.html#sentry_streams.pipeline.chain.Parser
We have the outcome schema in the schema registry so it should just work https://github.com/getsentry/sentry-kafka-schemas/blob/main/schemas/outcomes.v1.schema.json
As people will refer to these examples, we should follow the practices we want to recommend.
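To illustrate why a schema-aware parse step beats a hand-rolled map, here is a self-contained stand-in (not the sentry_streams Parser itself; `REQUIRED_FIELDS` is an assumed subset of the outcomes.v1 schema): the parse step validates required fields up front, so malformed messages fail at the edge of the pipeline instead of deep inside a later step.

```python
import json

# Assumed subset of the fields in outcomes.v1.schema.json -- for illustration only.
REQUIRED_FIELDS = {"org_id", "outcome", "timestamp"}

def parse_outcome(raw: bytes) -> dict:
    """Decode an outcome message and reject it early if required fields are missing."""
    msg = json.loads(raw)
    missing = REQUIRED_FIELDS - msg.keys()
    if missing:
        raise ValueError(f"outcome message missing fields: {sorted(missing)}")
    return msg

msg = parse_outcome(
    b'{"org_id": 1, "outcome": "accepted", "timestamp": "2024-01-01T00:00:00Z"}'
)
```

A real Parser applier gets the schema from the registry, so the example code would not need to hand-maintain a field list at all.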
def run_pipeline(pipeline: Pipeline) -> None:
    with open("./sentry_streams/deployment_config/simple_map_filter.yaml", "r") as f:
        environment_config = yaml.safe_load(f)
        print(":HERE", environment_config)
Was this forgotten?
This means that either:
I'd verify the auto offset reset parameter you are passing to Kafka. The initial offset reset parameter can be provided to the consumer in the YAML config file: https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L89-L94
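For context, a sketch of what that setting does, using librdkafka-style property names (the actual key in the sentry_streams YAML config may differ):

```python
# Hypothetical consumer configuration fragment illustrating auto.offset.reset.
consumer_config = {
    "bootstrap.servers": "127.0.0.1:9092",
    "group.id": "billing-example",
    # "earliest": a brand-new consumer group starts from the beginning of the
    # topic; "latest" (the librdkafka default) only sees messages produced
    # after the consumer joins -- a common reason a test "sees nothing".
    "auto.offset.reset": "earliest",
}
```

If the test produces a message before the consumer has joined and the reset policy is `latest`, the message is silently never consumed.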
I'll remove the test from this PR; I was trying to do some kind of integration test. I'll put that in a separate PR.