feat(chain): add ParquetSerializer #140

Open · wants to merge 4 commits into main from vic/add_parquet_serializer

Conversation

victoria-yining-huang
Contributor

@victoria-yining-huang victoria-yining-huang commented Jun 16, 2025

Serialization to Parquet is functionality that should be provided by the streaming platform.

@victoria-yining-huang victoria-yining-huang force-pushed the vic/add_parquet_serializer branch 2 times, most recently from dfbdab3 to e012443 Compare June 18, 2025 04:36
add code first pass no tests

remove overwrite of Serializer

add pyarrow-stubs

add ignore missing imports

pre commit happy

figure out polars datatypes
@victoria-yining-huang victoria-yining-huang force-pushed the vic/add_parquet_serializer branch from e012443 to 1782aee Compare June 18, 2025 04:50
@victoria-yining-huang victoria-yining-huang marked this pull request as ready for review June 18, 2025 08:21
@victoria-yining-huang victoria-yining-huang changed the title from "(wip) add parquet serializer" to "add parquet serializer" Jun 18, 2025
@victoria-yining-huang victoria-yining-huang changed the title from "add parquet serializer" to "add ParquetSerializer" Jun 18, 2025
@@ -167,6 +173,20 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
)


@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
Member

Did you try using this class in an example or test to make sure it works?

@victoria-yining-huang victoria-yining-huang changed the title from "add ParquetSerializer" to "feat(chain): add ParquetSerializer" Jun 18, 2025
Collaborator

@fpacifici fpacifici left a comment


Please see my comments on the dependencies.

Comment on lines +18 to +21
"polars==1.30.0",
"pyarrow==19.0.0",
"pyarrow-stubs==19.0",
"pandas==2.2.3",
Collaborator

@fpacifici fpacifici Jun 18, 2025

Please do not pin versions:

polars>=1.30.0
pyarrow>=19.0.0
pandas>=2.2.3

If you pin versions and a client of the library requires a different version, they will not be able to use this library.
Also, please set the minimum versions to something older. These are the most recent releases of all of those packages, which makes things harder for users: they would have to be up to date with everything for no reason.

Collaborator

Also, you should not need pyarrow-stubs in the main dependencies.
Please add it to the dev section.

Comment on lines +18 to +21
"polars==1.30.0",
"pyarrow==19.0.0",
"pyarrow-stubs==19.0",
"pandas==2.2.3",
Collaborator

Do you need pandas only for tests?
If so, please do not add it as a main dependency. We should minimize the dependencies the library pulls in; that makes it easier to use in other code bases.

Option 1: do without pandas.
Option 2: add it only to the dev section we use for tests.
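A sketch of option 2, assuming the project keeps its test tooling in a `dev` extra in pyproject.toml (the section name is an assumption; the project may use a differently named group):

```toml
[project.optional-dependencies]
dev = [
    # Test-only and typing-only dependencies stay out of the runtime install.
    "pandas>=2.2.3",
    "pyarrow-stubs>=19.0",
]
```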

Comment on lines +18 to +21
"polars==1.30.0",
"pyarrow==19.0.0",
"pyarrow-stubs==19.0",
"pandas==2.2.3",
Collaborator

Also consider making these libraries optional dependencies.
https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#dependencies-optional-dependencies

That way, if snuba does not want to use parquet or arrow, together with all their transitive dependencies, it does not need to install those libraries. This, too, makes the library easier for clients to use.

This would have an impact on how the classes are imported: the ParquetSerializer would have to live in its own module that the client imports. The streaming code would not be able to import it directly, which is likely ok.
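A minimal sketch of what that could look like in pyproject.toml; the extra's name `parquet` is illustrative:

```toml
[project.optional-dependencies]
parquet = [
    "polars>=1.30.0",
    "pyarrow>=19.0.0",
]
```

Clients that want Parquet support would then install the extra (for example `pip install <library>[parquet]`) and import ParquetSerializer from its dedicated module, while everyone else skips polars, pyarrow, and their transitive dependencies.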


def _get_parquet(msg: Message[Any], schema_fields: PolarsSchema) -> bytes:
    df = pl.DataFrame(
        [i for i in msg.payload if i is not None],
Collaborator

How does this work? Message[Any] is not known to be iterable to the type checker.

Collaborator

Also, what's the type of msg.payload?
If it is a pyarrow.Table or pyarrow.Array, there is a from_arrow method (https://docs.pola.rs/api/python/stable/reference/api/polars.from_arrow.html#polars-from-arrow) that should auto-infer the types.

If instead it is a Python dictionary, then why do we need arrow at all?
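For reference, a minimal sketch of the from_arrow route; the sample table is made up:

```python
import io

import polars as pl
import pyarrow as pa

# If the payload is already Arrow data, polars converts it directly and
# infers the column types from the Arrow schema; no manual mapping needed.
table = pa.table({"event_id": ["a1", "b2"], "project_id": [1, 2]})
df = pl.from_arrow(table)

# Serialize to Parquet bytes in memory.
buf = io.BytesIO()
df.write_parquet(buf)
parquet_bytes = buf.getvalue()
```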

@@ -167,6 +173,20 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
)


@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
Collaborator

I am not sure the type of the message is right. Isn't the input message supposed to be a Message[pyarrow.Table] or Message[pyarrow.Array]?

Or is it a Sequence of Python dictionaries, as it seems to be from the unit test?

    if isinstance(schema_fields, PolarsSchema):
        return _get_parquet(msg, schema_fields)
    else:
        polars_schema = _map_arrow_to_polars_schema(schema_fields)
Collaborator

Please let's do the schema conversion in the build_step method, which is executed only once at startup, rather than doing it at runtime for each message.
Doing it in build_step means that:

  • It is done only once, so you waste fewer resources.
  • If there is an error, we learn about it before we start consuming, and we can even validate the schema up front.
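A minimal sketch of that idea, reusing the names from the excerpts in this PR; the `make_apply_step` call at the end is a hypothetical stand-in for however the library actually wires up a step:

```python
from dataclasses import dataclass
from typing import Generic, Sequence, Tuple

@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
    schema_fields: Sequence[Tuple[str, PADataType]]

    def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
        # Convert (and thereby validate) the schema once, at startup.
        polars_schema = _map_arrow_to_polars_schema(self.schema_fields)

        # The per-message callable closes over the converted schema,
        # so nothing is re-converted on the hot path.
        def serialize(msg: Message[TIn]) -> bytes:
            return _get_parquet(msg, polars_schema)

        return make_apply_step(name, ctx, previous, serialize)  # hypothetical wiring
```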

def parquet_serializer(
    msg: Message[Any], schema_fields: Union[Sequence[Tuple[str, PADataType]], PolarsSchema]
) -> bytes:
    if isinstance(schema_fields, PolarsSchema):
Collaborator

Why do we need to support both PolarsSchema and Sequence[Tuple[str, PADataType]] if the ParquetSerializer can only support Sequence[Tuple[str, PADataType]]?

@@ -167,6 +173,20 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
)


@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
    schema_fields: Sequence[Tuple[str, PADataType]]
Collaborator

This is only ok if the input type is an Arrow table/batch/array.
If it is not, defining the types as Arrow data types would not work and would also leak the implementation details of how this is implemented.
If the expected payload is a Python dictionary, the schema cannot be defined with Arrow; we need an abstract schema definition that depends only on Parquet, as this is a Parquet serializer. Arrow and polars are implementation details.
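One way to read that suggestion, as a sketch; the FieldType enum and the type mapping are illustrative, not part of this PR:

```python
from enum import Enum
from typing import Dict, Sequence, Tuple

import polars as pl

class FieldType(Enum):
    # Abstract, library-owned field types; callers never see Arrow or polars.
    STRING = "string"
    INT64 = "int64"
    FLOAT64 = "float64"

Schema = Sequence[Tuple[str, FieldType]]

# Internal mapping from the abstract types to polars dtypes.
_POLARS_TYPES: Dict[FieldType, type] = {
    FieldType.STRING: pl.String,
    FieldType.INT64: pl.Int64,
    FieldType.FLOAT64: pl.Float64,
}

def _to_polars_schema(schema: Schema) -> Dict[str, type]:
    # Translate the abstract schema inside the serializer, keeping
    # polars an implementation detail.
    return {name: _POLARS_TYPES[ftype] for name, ftype in schema}
```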



def test_parquet_parser_nominal_case() -> None:
    schema_fields: Sequence[Tuple[str, Any]] = [
Collaborator

Please test with a simpler schema.
What this test checks is whether the Parquet serialization works and whether it takes the schema into account.
Any non-trivial schema would allow you to accomplish that goal.
This one is the whole schema of the Sentry error, which is out of scope for what you are trying to test.
The test would be considerably simpler with a small, made-up schema, and would lose almost nothing.
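A sketch of what that could look like; the Message constructor and the parquet_serializer call follow the shapes quoted in this review and are assumptions about the PR's actual API:

```python
from typing import Any, Sequence, Tuple

import pyarrow as pa
import pyarrow.parquet as pq

def test_parquet_serializer_simple_schema() -> None:
    # A small made-up schema is enough to show that serialization works
    # and that the schema is applied.
    schema_fields: Sequence[Tuple[str, Any]] = [
        ("id", pa.int64()),
        ("name", pa.string()),
    ]
    payload = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

    result = parquet_serializer(Message(payload), schema_fields)

    table = pq.read_table(pa.BufferReader(result))
    assert table.column("id").to_pylist() == [1, 2]
    assert table.column("name").to_pylist() == ["a", "b"]
```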

Comment on lines +237 to +240
# for testing purposes, only compare the contents of the parquet tables
expected_table = pq.read_table(pa.BufferReader(expected))
expected_table = pq.read_table(pa.BufferReader(result))
assert expected_table == expected_table
Collaborator

This does not compare two tables, as the two variables have the same name. The assertion would always pass.
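The intended comparison was presumably something like this, assuming expected and result are the two Parquet byte buffers from the test:

```python
expected_table = pq.read_table(pa.BufferReader(expected))
result_table = pq.read_table(pa.BufferReader(result))
assert expected_table == result_table
```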
