feat(chain): add BatchParser #134


Open · wants to merge 3 commits into main

Conversation

@victoria-yining-huang (Contributor) commented Jun 12, 2025

This BatchParser is meant to be used after a Batch step; it should not be confused with the Parser step, which is used before a Batch step.

I also updated the example in Batching.py to use this BatchParser, and tested it successfully locally.
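For context, the batch-in/batch-out contract described above can be sketched in plain Python. This is an illustrative sketch, not the real sentry_streams API; `batch_parse` and its signature are hypothetical:

```python
import json
from typing import Any, Callable, List, Sequence

# Illustrative sketch of the batch-in/batch-out contract: the parser
# receives a whole batch of raw payloads and returns a whole batch of
# parsed messages, so batch boundaries survive the step.
def batch_parse(
    payloads: Sequence[bytes],
    decode: Callable[[bytes], Any] = json.loads,
) -> List[Any]:
    # One decoded message per input payload; the batch stays one unit.
    return [decode(p) for p in payloads]

batch = [b'{"a": 1}', b'{"b": 2}']
parsed = batch_parse(batch)  # one output batch per input batch
```

The key property is that the step never splits the batch: a downstream step still sees one list per upstream batch.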

@fpacifici (Collaborator) left a comment

Please write a unit test for the new parser function before merging.

@fpacifici (Collaborator) left a comment

Actually, CI does not pass now.

Comment on lines 44 to 49:

```python
class FakeCodec:
    def decode(self, payload: bytes, _: Any) -> Mapping[str, int]:
        if payload == b'{"a": 1}':
            return {"a": 1}
        else:
            return {"b": 2}
```
Member:

Instead of returning a fake codec here, I would suggest using the Codec class from kafka schemas directly, just to exercise more code. If there is a breaking change in sentry-kafka-schemas (intentional or not), we're more likely to catch it with this test.

Same for FakeMessage: it seems feasible to me to create a real message?

Contributor Author:

Used real classes.

@fpacifici (Collaborator) left a comment
Please address the comments

Comment on lines 39 to 42:

```python
monkeypatch.setattr(
    "sentry_streams.pipeline.msg_parser._get_codec_from_msg",
    lambda _: JsonCodec(json_schema={}),
)
```
Collaborator:

Why do you need to monkeypatch? Please consider using a real schema like ingest-metrics.
While in unit tests you want to keep your test restricted in scope and fast, mocking the schema does not buy you anything here: there is no dependency on external systems.
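The alternative the reviewer is pointing at can be sketched as follows. This is an illustrative sketch with hypothetical names (`SimpleJsonCodec`, `parse_message`), not the real sentry_streams or sentry-kafka-schemas API: when the codec is supplied to the parse path instead of being patched into it, the test exercises the real decode logic without any monkeypatching.

```python
import json
from typing import Any, Mapping

class SimpleJsonCodec:
    # Stand-in for a schema-aware codec; a real JsonCodec would also
    # validate the payload against its JSON schema.
    def decode(self, payload: bytes) -> Mapping[str, Any]:
        return json.loads(payload)

def parse_message(payload: bytes, codec: SimpleJsonCodec) -> Mapping[str, Any]:
    # The codec arrives as an argument, so the test needs no
    # monkeypatching and the decode path runs for real.
    return codec.decode(payload)

result = parse_message(b'{"a": 1}', SimpleJsonCodec())
```

The design point is dependency injection: patching `_get_codec_from_msg` hides the codec lookup from the test, while passing a codec (real or otherwise) keeps that code under test.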

Member:

If we remove the schema, or even just alter it, we'll have to change this. I personally regret using real schemas/datasets in some unit tests in Snuba: I changed or removed a schema and suddenly had to change random, irrelevant tests that fully relied on it.

Contributor Author:

I can hardcode a real schema instead of importing it; then the tests won't be affected if the schema changes. But that makes no difference compared to using a fake schema, IMO, since they're both hardcoded.

Contributor Author:

done ^

Collaborator:

> If we remove the schema, or even just alter it, we'll have to change this. I personally regret using real schemas/datasets in some unit tests in Snuba: I changed or removed a schema and suddenly had to change random, irrelevant tests that fully relied on it.

Don't worry, sentry-kafka-schemas will not be there for long; we need to decouple that repo from this one (even the licenses are incompatible). So feel free to use a real schema.

Collaborator:

@victoria-yining-huang my issue is having to hijack the behavior of the parser by monkeypatching the codec, not whether the schema is the real one. Any schema is fine; I'd avoid monkeypatching because it makes the test cover less code.

Contributor Author:

I used a real example and a real schema imported from the sentry_kafka_schema lib for the test now.

Contributor Author:

Removed the monkeypatch.

Contributor Author:

Added test cases for a missing schema and a wrong schema. This covers the _get_codec_from_msg function.

Commits:

- add all draft
- remove prints
- frmt
- move exception block
- frmt
- typing
- typing
- input output type
- typing
- add generic tout
- push for riya
- type fixes
- remove comment
- add unit test
- add unit test
- use real classes
- dont cast
- fix module path
- hardcode a real codec
- use real things from sentry kafka schema
@evanh (Member) left a comment

Why does the BatchParser output another batch? Should it use a FlatMap instead?

@victoria-yining-huang (Contributor Author) commented Jun 18, 2025

@evanh I addressed the question "Why does the BatchParser output another batch?" in the weekly meeting. The design choice for batching is batch in, batch out: batches stay intact throughout every single step.

Re "Should it use a FlatMap instead?": FlatMap is not fully implemented yet (

```python
def flat_map(self, step: FlatMap, stream: Route) -> Route:
    """
    Builds a flat-map operator for the platform the adapter supports.
    """
    raise NotImplementedError
```

) so that will not work. I'm not sure whether it should eventually use FlatMap, though. @fpacifici?

@victoria-yining-huang victoria-yining-huang changed the title add BatchParser feat(chain): add BatchParser Jun 18, 2025
@fpacifici (Collaborator):

> I'm not sure if it eventually should use FlatMap or not tho. @fpacifici ?

No. The BatchParser is just syntactic sugar over a Batch -> custom Map.
If you want to expand the batch after parsing, you would plug in a FlatMap.
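That composition can be sketched as follows. This is illustrative code, not the sentry_streams API (`parse_batch` and `flatten` are hypothetical names): the BatchParser behaves like a Map applied to each whole batch, and a FlatMap afterwards would expand parsed batches back into individual messages.

```python
import json
from typing import Any, List, Sequence

def parse_batch(batch: Sequence[bytes]) -> List[Any]:
    # The "Map" half of the sugar: one parsed batch out per batch in.
    return [json.loads(p) for p in batch]

def flatten(batches: Sequence[Sequence[Any]]) -> List[Any]:
    # What a FlatMap would do: expand batches back into single messages.
    return [msg for b in batches for msg in b]

parsed = parse_batch([b'{"a": 1}', b'{"b": 2}'])
messages = flatten([parsed])
```

This mirrors the reviewer's point: parsing keeps the batch intact, and only an explicit FlatMap step changes the batch boundaary back to per-message granularity.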

5 participants