feat(chain): add ParquetSerializer #140
base: main
Conversation
Force-pushed from dfbdab3 to e012443.
Commits:
- add code first pass no tests
- remove overwrite of Serializer
- add pyarrow-stubs
- add ignore missing imports
- pre commit happy
- figure out polars datatypes
Force-pushed from e012443 to 1782aee.
@@ -167,6 +173,20 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
    )


@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
Did you try using this class in an example or test to make sure it works?
Please see my comment on the dependencies
"polars==1.30.0", | ||
"pyarrow==19.0.0", | ||
"pyarrow-stubs==19.0", | ||
"pandas==2.2.3", |
Please do not pin versions.
polars>=1.30.0
pyarrow>=19.0.0
pandas>=2.2.3
If you pin versions and a client of the library requires a different version, they will not be able to use this library.
Also, please set the minimum versions to something older. These are the most recent releases of all those packages; requiring them would force users to be fully up to date with everything for no reason.
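For context, a minimal pyproject.toml sketch of what unpinned constraints could look like (the layout below is illustrative, not the repo's actual file):

```toml
[project]
dependencies = [
    # Lower bounds only, no pins; per the review, these minimums should
    # ideally be older than the latest releases shown here.
    "polars>=1.30.0",
    "pyarrow>=19.0.0",
    "pandas>=2.2.3",
]
```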
Also, you should not need pyarrow-stubs in the main dependencies. Please add it to the dev section.
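A sketch of what that could look like, assuming the dev section is an optional-dependencies extra (this repo's actual pyproject.toml layout is an assumption):

```toml
[project.optional-dependencies]
dev = [
    # Type stubs are only needed by mypy/CI, never at runtime.
    "pyarrow-stubs>=19.0",
]
```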
"polars==1.30.0", | ||
"pyarrow==19.0.0", | ||
"pyarrow-stubs==19.0", | ||
"pandas==2.2.3", |
Do you need pandas only for tests?
If yes, please do not add this dependency. We should minimize the dependencies a library imports; that makes it easier to use in other code bases.
Option 1: do without pandas (see the sketch below).
Option 2: add it only to the dev section we use for tests.
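A hypothetical sketch of Option 1, building the expected frame with polars itself so the test needs no pandas at all (variable names assumed):

```python
import io

import polars as pl

# Build the expected parquet bytes directly with polars instead of pandas.
expected_df = pl.DataFrame({"id": [1, 2], "name": ["a", "b"]})
buf = io.BytesIO()
expected_df.write_parquet(buf)
expected = buf.getvalue()
```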
"polars==1.30.0", | ||
"pyarrow==19.0.0", | ||
"pyarrow-stubs==19.0", | ||
"pandas==2.2.3", |
Also consider making these libraries optional dependencies.
https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#dependencies-optional-dependencies
That way, if snuba does not want to use parquet or arrow together with all their transitive dependencies, it does not need to import those libraries. This also makes the library easier for clients to use.
This would affect how the classes are imported: the ParquetSerializer would have to live in its own module that the client imports, and the streaming code would not be able to import it directly, which is likely OK.
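A sketch of what such an extra could look like (the extra's name and bounds are illustrative):

```toml
[project.optional-dependencies]
parquet = [
    # Installed only by clients that opt in, e.g. `pip install <pkg>[parquet]`;
    # everyone else avoids polars/pyarrow and their transitive dependencies.
    "polars>=1.30.0",
    "pyarrow>=19.0.0",
]
```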
def _get_parquet(msg: Message[Any], schema_fields: PolarsSchema) -> bytes:
    df = pl.DataFrame(
        [i for i in msg.payload if i is not None],
How does this work? Message[Any] is not known to be iterable to the type checker.
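For illustration, a hypothetical tightening of the signature that would make the iteration type-check explicitly (the real payload type is not confirmed by the PR; Message and PolarsSchema come from the PR's own module):

```python
from typing import Any, Mapping, Optional, Sequence

def _get_parquet(
    msg: Message[Sequence[Optional[Mapping[str, Any]]]],
    schema_fields: PolarsSchema,
) -> bytes:
    # With a concrete payload type, mypy can verify this comprehension
    # instead of silently accepting Any.
    rows = [row for row in msg.payload if row is not None]
    ...
```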
Also, what's the type of msg.payload? If it is pyarrow.Table or pyarrow.Array, there should be a from_arrow method (https://docs.pola.rs/api/python/stable/reference/api/polars.from_arrow.html#polars-from-arrow) which should try to auto-infer the types.
If instead it is a python dictionary, then why do we need arrow at all?
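A minimal sketch of the from_arrow path, assuming the payload really is an arrow table:

```python
import io

import polars as pl
import pyarrow as pa

# If the payload is already arrow data, polars ingests it directly and
# infers the schema; no manual row iteration or dtype mapping needed.
arrow_table = pa.table({"id": [1, 2], "name": ["a", "b"]})
df = pl.from_arrow(arrow_table)
assert isinstance(df, pl.DataFrame)

buf = io.BytesIO()
df.write_parquet(buf)
parquet_bytes = buf.getvalue()
```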
@@ -167,6 +173,20 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
    )


@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
I am not sure the type of the message is right. Isn't the input message supposed to be a Message[pyarrow.Table] or Message[pyarrow.Array]?
Or is it a Sequence of python dictionaries, as it seems from the unit test?
    if isinstance(schema_fields, PolarsSchema):
        return _get_parquet(msg, schema_fields)
    else:
        polars_schema = _map_arrow_to_polars_schema(schema_fields)
Please let's do the schema conversion in the build_step method, which is executed only once at startup, rather than doing it at runtime for each message.
Doing it in build_step means that:
- It is done only once, so you waste fewer resources.
- If there is an error, we learn about it before we try to consume, and we can even validate it.
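A rough sketch of that shape, reusing names from the diff (the actual step wiring is assumed):

```python
from dataclasses import dataclass
from functools import partial

@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
    schema_fields: Sequence[Tuple[str, PADataType]]

    def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
        # Convert arrow -> polars once at pipeline build time; an invalid
        # schema fails here, before we ever start consuming.
        polars_schema = _map_arrow_to_polars_schema(self.schema_fields)
        serializer = partial(parquet_serializer, schema_fields=polars_schema)
        # ... wire `serializer` into the step the way other Appliers do
        ...
```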
def parquet_serializer(
    msg: Message[Any], schema_fields: Union[Sequence[Tuple[str, PADataType]], PolarsSchema]
) -> bytes:
    if isinstance(schema_fields, PolarsSchema):
Why do we need to support both PolarsSchema and Sequence[Tuple[str, PADataType]] if the ParquetSerializer can only support Sequence[Tuple[str, PADataType]]?
@@ -167,6 +173,20 @@ def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
    )


@dataclass
class ParquetSerializer(Applier[Message[TIn], bytes], Generic[TIn]):
    schema_fields: Sequence[Tuple[str, PADataType]]
This is only OK if the input type is an arrow table/batch/array. If not, defining the types as arrow data types would not work, and it would also leak the implementation details of how this is built.
If the expected payload is a python dictionary, the schema cannot be defined with arrow; we need an abstract definition that depends only on parquet, as this is a parquet serializer. Arrow and polars are implementation details.
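One possible shape for such an abstraction (entirely hypothetical, names made up): the public schema speaks only in neutral field types, and the mapping to polars dtypes stays internal.

```python
from enum import Enum
from typing import Sequence, Tuple

import polars as pl

# Engine-neutral field types exposed by the serializer's public API.
class FieldType(Enum):
    STRING = "string"
    INT64 = "int64"
    FLOAT64 = "float64"
    BOOL = "bool"

# Internal detail: how each neutral type maps onto a polars dtype.
_POLARS_DTYPES = {
    FieldType.STRING: pl.String,
    FieldType.INT64: pl.Int64,
    FieldType.FLOAT64: pl.Float64,
    FieldType.BOOL: pl.Boolean,
}

def _to_polars_schema(fields: Sequence[Tuple[str, FieldType]]) -> pl.Schema:
    return pl.Schema({name: _POLARS_DTYPES[ftype] for name, ftype in fields})
```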
def test_parquet_parser_nominal_case() -> None:
    schema_fields: Sequence[Tuple[str, Any]] = [
Please test with a simpler schema.
What this test checks is whether the parquet serialization works and whether it takes the schema into account. Any non-trivial schema would let you accomplish that goal.
This is the whole schema of the sentry error, which is out of scope for what you are trying to test. The test would be considerably simpler with a simpler, made-up schema, without losing almost anything.
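For illustration, a made-up two-field schema would cover the same ground (parquet_serializer and the message constructor are assumed from the PR):

```python
import io
from typing import Any, Sequence, Tuple

import polars as pl
import pyarrow as pa

def test_parquet_serializer_simple_schema() -> None:
    # Two fields are enough to prove serialization works and that the
    # declared schema is applied.
    schema_fields: Sequence[Tuple[str, Any]] = [
        ("id", pa.int64()),
        ("name", pa.string()),
    ]
    rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

    result = parquet_serializer(make_message(rows), schema_fields)  # assumed helpers

    df = pl.read_parquet(io.BytesIO(result))
    assert df.columns == ["id", "name"]
    assert df.height == 2
```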
# for testing purposes, only compare the contents of the parquet tables
expected_table = pq.read_table(pa.BufferReader(expected))
expected_table = pq.read_table(pa.BufferReader(result))
assert expected_table == expected_table
This does not compare two tables as the two variables have the same name. The condition would always pass.
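A fixed version of the comparison might read:

```python
# Read each buffer into its own variable so the assertion actually
# compares the serializer's output against the expected table.
expected_table = pq.read_table(pa.BufferReader(expected))
result_table = pq.read_table(pa.BufferReader(result))
assert result_table == expected_table
```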
Serializing to Parquet is functionality that should be provided by the streaming platform.