fix(examples): Migrate billing example #146

Open
wants to merge 3 commits into
base: main

Conversation

evanh
Member

@evanh evanh commented Jun 18, 2025

This changes the billing example to use the new chain syntax instead of the old method. It also
changes the signature of the OutcomesBuffer.

There is also a test that puts a message through the pipeline and verifies the output.

@evanh
Member Author

evanh commented Jun 18, 2025

The test I added doesn't actually work, it fails with this stack trace:

Traceback (most recent call last):
  File "/Users/evanhicks/code/getsentry/streams/sentry_streams/.venv/lib/python3.13/site-packages/arroyo/processing/processor.py", line 334, in run
    self._run_once()
    ~~~~~~~~~~~~~~^^
  File "/Users/evanhicks/code/getsentry/streams/sentry_streams/.venv/lib/python3.13/site-packages/arroyo/processing/processor.py", line 405, in _run_once
    self.__message = self.__consumer.poll(timeout=1.0)
                     ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^
  File "/Users/evanhicks/code/getsentry/streams/sentry_streams/.venv/lib/python3.13/site-packages/arroyo/backends/kafka/consumer.py", line 427, in poll
    raise OffsetOutOfRange(str(error))
arroyo.errors.OffsetOutOfRange: KafkaError{code=_AUTO_OFFSET_RESET,val=-140,str="fetch failed due to requested offset not available on the broker: Broker: Offset out of range (broker 1001)"}

Not sure why this error is happening in the runtime code.

@fpacifici
Collaborator

The test I added doesn't actually work, it fails with this stack trace:

We have not yet provided a way to run the pipeline end to end in unit tests, on either rust-arroyo or Python arroyo.
That would have to be done via the in-memory Arroyo broker.

Here is the task https://github.com/getsentry/streaming-planning/issues/192. Feel free to pick it up if you want.
Until then, trying to run the full pipeline in unit tests is unlikely to be useful.

@evanh
Member Author

evanh commented Jun 18, 2025

That would have to be done via the in-memory Arroyo broker.

I'm not sure I understand this. The runner.py file is calling runtime.run(), why can't a test do that? I have Kafka running locally, why would I need to run anything else?

@fpacifici
Collaborator

fpacifici commented Jun 18, 2025

I have Kafka running locally, why would I need to run anything else?

I would like us to avoid depending on a real Kafka instance for unit tests. Let's use a real Kafka only for some integration tests. Depending on it in unit tests causes a lot of issues, as seen in snuba and arroyo:

  • Tests are really slow. Using the in-memory broker allows you to run a ton of corner-case tests quickly.
  • They become flaky due to the asynchronous nature of the Kafka rebalancing protocol, produce and consume. Your test may get stuck waiting to be assigned a partition.
  • The stateful nature of Kafka means that either you create and destroy topics at every run (which adds a lot of latency), or you risk spurious failures caused by state left over from a previous test, so only the first failure can be trusted when troubleshooting.

Also your test is running with the python adapter rather than the rust one. We are decommissioning the python one. https://github.com/getsentry/streams/pull/146/files#diff-23f223c3b3e4a5189f9081e41f4ab27d70aa3e4ae1b3f9c07213f6ddc866c32aR44

Taking a step back, though: what's the goal of the test you introduced? If running the examples is meant as a series of integration tests, all good, we want to do this: https://github.com/getsentry/streaming-planning/issues/205. But in that case they should be a separate test suite, so that we can run them in CI only, with the test environment properly set up, like the sentry acceptance tests. I would like to avoid running all the examples against a real Kafka every time an engineer runs the unit tests locally.
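
For illustration, here is a minimal sketch of one way to keep real-Kafka tests out of the default local run, using only the standard library. The environment variable name, the test class and the test name are all hypothetical; this is not how the streams repository is set up, and a pytest marker would be an equally reasonable choice.

```python
import os
import unittest

# Hypothetical opt-in switch. A CI job that provisions Kafka would export it;
# a plain local run would leave it unset.
KAFKA_ENV_VAR = "STREAMS_KAFKA_INTEGRATION"


def kafka_integration_enabled() -> bool:
    """Return True only when the environment opts in to real-Kafka tests."""
    return os.environ.get(KAFKA_ENV_VAR) == "1"


class BillingExampleIntegrationTest(unittest.TestCase):
    """Hypothetical integration test case, skipped unless Kafka is opted in."""

    def setUp(self) -> None:
        # Checked at run time so that every test in the class is skipped
        # cleanly when the opt-in variable is missing.
        if not kafka_integration_enabled():
            self.skipTest(f"set {KAFKA_ENV_VAR}=1 to run against a real Kafka")

    def test_billing_pipeline_end_to_end(self) -> None:
        # Placeholder body: a real test would run the example pipeline
        # against the broker here and verify the produced output.
        pass
```

With the variable unset, the test is reported as skipped rather than failing or hanging on a missing broker; with it set to 1, the test runs as usual.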

name="myinput",
stream_name="events",
)
.apply("mymap", Map(function=build_outcome))
Collaborator

Please use a Parser applier rather than a custom map. https://getsentry.github.io/streams/build_pipeline.html#sentry_streams.pipeline.chain.Parser

We have the outcome schema in the schema registry so it should just work https://github.com/getsentry/sentry-kafka-schemas/blob/main/schemas/outcomes.v1.schema.json

As people will refer to these examples, we should follow the practices we want to recommend.

def run_pipeline(pipeline: Pipeline) -> None:
    with open("./sentry_streams/deployment_config/simple_map_filter.yaml", "r") as f:
        environment_config = yaml.safe_load(f)
    print(":HERE", environment_config)
Collaborator

Was this forgotten?

@fpacifici
Collaborator

arroyo.errors.OffsetOutOfRange: KafkaError{code=_AUTO_OFFSET_RESET,val=-140,str="fetch failed due to requested offset not available on the broker: Broker: Offset out of range (broker 1001)"}

This means that either:

  • the consumer was started with auto-offset-reset = error and there was no offset in Kafka (new topic or consumer group?)
  • the consumer was started with auto-offset-reset = error and the committed offset had already been collected and dropped, 24 hours after it was created.

I'd verify the auto offset reset parameter you are passing to Kafka. The initial offset reset parameter can be provided to the consumer in the YAML config file: https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L89-L94
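
To make the two cases above concrete, here is a simplified model of how a consumer picks its starting offset. It is a sketch for reasoning only, not Kafka's or Arroyo's actual code: the function name, its parameters and the local exception class are hypothetical, and only the three policy values ("earliest", "latest", "error") correspond to the real auto.offset.reset setting.

```python
from typing import Optional


class OffsetOutOfRange(Exception):
    """Local stand-in for the error raised when no valid start offset exists."""


def resolve_start_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str,
) -> int:
    """Pick the offset a consumer would start from (simplified model).

    committed: last committed offset for the group, or None if there is none.
    log_start: earliest offset still retained on the broker.
    log_end: offset that the next produced message will receive.
    """
    # A committed offset that still points inside the retained log is used as is.
    if committed is not None and log_start <= committed <= log_end:
        return committed

    # Otherwise the reset policy decides what happens.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    if auto_offset_reset == "error":
        raise OffsetOutOfRange(
            f"no valid offset: committed={committed}, "
            f"retained range=[{log_start}, {log_end}]"
        )
    raise ValueError(f"unknown auto_offset_reset policy: {auto_offset_reset!r}")
```

In this model a brand-new consumer group (committed is None) and a committed offset that fell out of the retained range both end in the same place: with the "error" policy the consumer fails, while "earliest" or "latest" would let it start.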

@evanh
Member Author

evanh commented Jun 19, 2025

I'll remove the test from this PR. I was trying to do some kind of integration test, so I'll put that in a separate PR.

@evanh evanh requested a review from fpacifici June 19, 2025 17:28
2 participants