feat(flamepy): add fast-path serialization for numpy/arrow types#444
Conversation
Add optimized serialization paths for numpy arrays and PyArrow types to avoid cloudpickle overhead for common data science workloads. - Add type markers to identify serialization format - Use Arrow tensor format for contiguous numpy arrays (~10x faster) - Use Arrow IPC stream for PyArrow Table/RecordBatch/Array - Fall back to cloudpickle for arbitrary Python objects - Maintain backward compatibility with legacy serialized data - Add comprehensive test coverage for all fast-path types
There was a problem hiding this comment.
Code Review
This pull request introduces a fast-path serialization mechanism for NumPy arrays and PyArrow types to improve cache performance, while maintaining a cloudpickle fallback for other objects. Feedback focuses on a potential collision between the new type markers and standard Pickle opcodes which could corrupt legacy data. Additionally, the reviewer suggested optimizing serialization by writing markers directly to the output stream to avoid unnecessary memory copies and identified several unused imports.
| # Using bytes outside printable ASCII to avoid collision with pickle opcodes | ||
| _TYPE_CLOUDPICKLE = b"\x00" # Default: cloudpickle | ||
| _TYPE_NUMPY = b"\x01" # numpy array via Arrow tensor | ||
| _TYPE_ARROW_TABLE = b"\x02" # PyArrow Table via IPC | ||
| _TYPE_ARROW_ARRAY = b"\x03" # PyArrow Array via IPC | ||
| _TYPE_ARROW_BATCH = b"\x04" # PyArrow RecordBatch via IPC |
There was a problem hiding this comment.
The type markers \x00 through \x04 collide with valid Pickle opcodes (e.g., \x00 is MARK in Protocol 0/1). If legacy data (which the code intends to support at line 337) happens to start with one of these bytes, it will be misidentified as a fast-path type or a prefixed cloudpickle. This will lead to a deserialization failure because the first byte will be stripped before calling cloudpickle.loads. Additionally, the comment about printable ASCII is misleading as pickle opcodes utilize the full byte range.
Consider using a more unique multi-byte magic prefix (e.g., b"FLM\x01") or checking for the standard pickle protocol header (\x80) to distinguish legacy data.
| import io | ||
| import logging | ||
| import struct | ||
| import threading | ||
| import uuid | ||
| from collections import OrderedDict | ||
| from dataclasses import asdict, dataclass | ||
| from pathlib import Path | ||
| from typing import Any, Callable, Dict, List, Optional | ||
| from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple |
There was a problem hiding this comment.
The imports io, struct, and Tuple appear to be unused in this file. While Tuple is imported from typing, the code uses the built-in tuple for type hinting (e.g., line 650), which is preferred in modern Python (3.9+).
| import io | |
| import logging | |
| import struct | |
| import threading | |
| import uuid | |
| from collections import OrderedDict | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import Any, Callable, Dict, List, Optional | |
| from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple | |
| import logging | |
| import threading | |
| import uuid | |
| from collections import OrderedDict | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional |
References
- Use built-in generics for type hinting (PEP 585) in Python 3.9+. (link)
| sink = pa.BufferOutputStream() | ||
| pa.ipc.write_tensor(tensor, sink) | ||
| return _TYPE_NUMPY + sink.getvalue().to_pybytes() |
There was a problem hiding this comment.
Concatenating the type marker with the serialized bytes (_TYPE_NUMPY + sink.getvalue().to_pybytes()) causes an additional full copy of the data. For large numpy arrays, this significantly increases memory pressure and latency. You can avoid this copy by writing the marker directly into the BufferOutputStream before writing the tensor.
| sink = pa.BufferOutputStream() | |
| pa.ipc.write_tensor(tensor, sink) | |
| return _TYPE_NUMPY + sink.getvalue().to_pybytes() | |
| sink = pa.BufferOutputStream() | |
| sink.write(_TYPE_NUMPY) | |
| pa.ipc.write_tensor(tensor, sink) | |
| return sink.getvalue().to_pybytes() |
| sink = pa.BufferOutputStream() | ||
| with pa.ipc.new_stream(sink, table.schema) as writer: | ||
| writer.write_table(table) | ||
| return _TYPE_ARROW_TABLE + sink.getvalue().to_pybytes() |
There was a problem hiding this comment.
Writing the type marker directly to the sink avoids an unnecessary full copy of the serialized table data.
| sink = pa.BufferOutputStream() | |
| with pa.ipc.new_stream(sink, table.schema) as writer: | |
| writer.write_table(table) | |
| return _TYPE_ARROW_TABLE + sink.getvalue().to_pybytes() | |
| sink = pa.BufferOutputStream() | |
| sink.write(_TYPE_ARROW_TABLE) | |
| with pa.ipc.new_stream(sink, table.schema) as writer: | |
| writer.write_table(table) | |
| return sink.getvalue().to_pybytes() |
| sink = pa.BufferOutputStream() | ||
| with pa.ipc.new_stream(sink, batch.schema) as writer: | ||
| writer.write_batch(batch) | ||
| return _TYPE_ARROW_BATCH + sink.getvalue().to_pybytes() |
There was a problem hiding this comment.
Writing the type marker directly to the sink avoids an unnecessary full copy of the serialized record batch data.
| sink = pa.BufferOutputStream() | |
| with pa.ipc.new_stream(sink, batch.schema) as writer: | |
| writer.write_batch(batch) | |
| return _TYPE_ARROW_BATCH + sink.getvalue().to_pybytes() | |
| sink = pa.BufferOutputStream() | |
| sink.write(_TYPE_ARROW_BATCH) | |
| with pa.ipc.new_stream(sink, batch.schema) as writer: | |
| writer.write_batch(batch) | |
| return sink.getvalue().to_pybytes() |
| sink = pa.BufferOutputStream() | ||
| with pa.ipc.new_stream(sink, batch.schema) as writer: | ||
| writer.write_batch(batch) | ||
| return _TYPE_ARROW_ARRAY + sink.getvalue().to_pybytes() |
There was a problem hiding this comment.
Writing the type marker directly to the sink avoids an unnecessary full copy of the serialized array data.
| sink = pa.BufferOutputStream() | |
| with pa.ipc.new_stream(sink, batch.schema) as writer: | |
| writer.write_batch(batch) | |
| return _TYPE_ARROW_ARRAY + sink.getvalue().to_pybytes() | |
| sink = pa.BufferOutputStream() | |
| sink.write(_TYPE_ARROW_ARRAY) | |
| with pa.ipc.new_stream(sink, batch.schema) as writer: | |
| writer.write_batch(batch) | |
| return sink.getvalue().to_pybytes() |
- Use 4-byte magic prefix (FLM + type byte) instead of single-byte markers to avoid collision with pickle opcodes - Write type marker directly to BufferOutputStream to avoid extra memory copy during serialization - Remove unused imports (io, struct, Tuple)
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
Summary
Changes
sdk/python/src/flamepy/core/cache.py_TYPE_CLOUDPICKLE,_TYPE_NUMPY,_TYPE_ARROW_TABLE, etc.) to identify serialization formatsdk/python/tests/test_cache.pyTestFastPathSerializationclass with 14 new tests covering all fast-path typesPerformance Impact
Testing
All 31 cache tests pass including 14 new fast-path serialization tests.