Skip to content

Conversation

@dayesouza
Copy link
Contributor

@dayesouza dayesouza commented Feb 11, 2026

This pull request introduces a new streaming table abstraction to the storage layer, enabling efficient row-by-row access and transformation for both CSV and Parquet backends. The changes add a unified Table interface, implement CSV and Parquet streaming table classes, and update the provider modules to support these new interfaces and transformers. The overall design improves memory efficiency and extensibility for large dataset processing.

Table Abstraction and Streaming Interface

  • Added a new abstract base class Table in table.py to provide a unified streaming row-by-row interface with async iteration, write, and close methods. Supports row transformation and async context management.
  • Introduced the RowTransformer type for flexible row transformation, allowing both callable functions and classes (e.g., Pydantic models) to be used for row processing.

CSV Table Streaming Implementation

  • Added CSVTable in csv_table.py for streaming access to CSV tables, supporting async iteration, row transformation, row existence checks, and writing rows with automatic header management.
  • Updated CSVTableProvider to enforce FileStorage backend, import new streaming classes, and expose an open method for streaming table access with optional row transformation. [1] [2] [3]

Parquet Table Streaming Implementation

  • Added ParquetTable in parquet_table.py to simulate streaming for Parquet tables by loading DataFrames and yielding rows, with support for row transformation and batch writing.
  • Updated ParquetTableProvider to import new streaming classes and expose an open method for streaming table access with optional row transformation. [1] [2]

Provider Interface Unification

  • Updated TableProvider base class to include the new open method, standardizing streaming table access across different storage backends. [1] [2]
  • Updated module exports in tables/__init__.py to include the new Table class.

Supporting Changes

  • Added a get_path method to FileStorage for easier file path access needed by streaming table implementations.
  • Updated imports in workflow modules to use the new Table abstraction.

Memory change with streaming in these two workflows:

  • Reduces peak memory by up to 95%
  • Lowers retained memory by 33%
  • cuts allocation churn and is ~4% faster overall.

@dayesouza dayesouza force-pushed the text_units_streaming branch from fd38049 to bacbab4 Compare February 11, 2026 23:31
@dayesouza dayesouza changed the title create_base_text_units streaming load_input_documents and create_base_text_units streaming Feb 12, 2026
@dayesouza dayesouza marked this pull request as ready for review February 12, 2026 00:22
@dayesouza dayesouza requested a review from a team as a code owner February 12, 2026 00:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants