Implement WithStateSync interface with schema derivation fallback #10
Merged
nicosuave merged 5 commits into sidequery:main on Feb 1, 2026
Conversation
Test verifies that fresh pipeline instances can restore schemas from the destination when loading data with missing columns. Currently fails because IcebergRestClient does not implement WithStateSync.
Add schema and state restoration from Iceberg catalog, enabling pipelines in different execution contexts to share state.

- Add get_stored_schema() to retrieve latest schema by name
- Add get_stored_schema_by_hash() for exact version lookup
- Add get_stored_state() to retrieve pipeline state
- Use PyIceberg row_filter for predicate pushdown optimization
- Add unit tests for WithStateSync methods
This test demonstrates the scenario where:

1. Pipeline creates table with columns [a, b, c, d]
2. _dlt_version is deleted (simulating corrupted/empty state)
3. New pipeline run with data missing column 'd' fails

The test currently fails with SchemaEvolutionError because the destination doesn't derive the schema from existing Iceberg tables when _dlt_version has no stored schema.
When get_stored_schema() finds no stored schema in _dlt_version, it now falls back to deriving the schema from existing Iceberg table metadata. This handles scenarios where:

- _dlt_version is deleted or corrupted
- Pipeline runs in different execution contexts with empty state
- Historical columns exist in Iceberg but not in the current data batch

The derived schema includes all columns from existing Iceberg tables, ensuring they are not treated as "dropped" during schema evolution.

Implementation:

- Add _derive_schema_from_iceberg_tables() to scan the catalog for tables
- Add _iceberg_type_to_dlt_type() for type conversion
- Modify get_stored_schema() to use derivation as fallback
- Add unit tests for derivation and precedence behavior
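To make the type-conversion step concrete, here is a minimal sketch of what a helper like `_iceberg_type_to_dlt_type()` could look like. The mapping table and the fallback-to-`text` behavior are assumptions for illustration, not the PR's actual implementation.

```python
# Hypothetical mapping from Iceberg primitive type names to dlt data types.
# The real helper in this PR may cover more types or map them differently.
_ICEBERG_TO_DLT = {
    "string": "text",
    "long": "bigint",
    "int": "bigint",
    "double": "double",
    "float": "double",
    "boolean": "bool",
    "date": "date",
    "timestamp": "timestamp",
    "timestamptz": "timestamp",
    "binary": "binary",
}


def iceberg_type_to_dlt_type(iceberg_type: str) -> str:
    """Map an Iceberg primitive type name to a dlt column data type.

    Unknown types fall back to "text" so that schema derivation never
    fails on an unrecognized column (a lossy but safe default).
    """
    # decimal(p, s) carries parameters, so match on the prefix
    if iceberg_type.startswith("decimal"):
        return "decimal"
    return _ICEBERG_TO_DLT.get(iceberg_type, "text")
```

A lossy fallback keeps derivation total: it is better to restore a column with a widened type than to abort the run on a type the mapping does not know.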
Member: Looks great, thanks for this

Member: Just released a new version, v0.3.0, with these changes as well
Summary
While using dlt-iceberg, I encountered issues when running pipelines in ephemeral/stateless runtimes (ECS Fargate, Kubernetes Jobs). These environments don't preserve local dlt state between runs, which exposed several gaps in how dlt-iceberg handles schema management.

Sorry for the large PR, lots of test code 😬
Issues encountered
No schema restoration from destination

dlt-iceberg didn't implement the WithStateSync interface, so pipelines running in fresh containers couldn't restore schemas from the _dlt_version table. Each run only knew about the columns in the current data batch.

"Dropped columns" errors with varying data

When source data varies between runs (e.g., some API responses have optional fields), columns present in the Iceberg table but missing from the current batch were treated as "dropped", causing SchemaEvolutionError failures.

Features
WithStateSync Interface
- get_stored_schema() – Retrieves the newest schema from the _dlt_version table
- get_stored_schema_by_hash() – Retrieves a schema by exact version hash
- get_stored_state() – Retrieves pipeline state from the _dlt_pipeline_state table

Schema Derivation Fallback
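The three lookups above can be sketched with an in-memory model. This is purely illustrative: the real client queries Iceberg tables (with PyIceberg row_filter predicates for pushdown) rather than Python lists, and the names `StorageSchemaInfo`, `VersionTable`, and the row shapes below are assumptions, not the PR's actual classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StorageSchemaInfo:
    # Assumed shape of one _dlt_version row: name, hash, load order, payload
    schema_name: str
    version_hash: str
    inserted_at: int  # monotonically increasing load sequence
    schema: str       # serialized schema JSON


class VersionTable:
    """In-memory stand-in for the _dlt_version table."""

    def __init__(self, rows: list[StorageSchemaInfo]):
        self.rows = rows

    def get_stored_schema(self, schema_name: str) -> Optional[StorageSchemaInfo]:
        # Newest row for this schema name; the real client would push the
        # name predicate down into the Iceberg scan instead of filtering here.
        matches = [r for r in self.rows if r.schema_name == schema_name]
        return max(matches, key=lambda r: r.inserted_at) if matches else None

    def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
        # Exact version lookup: at most one row per hash
        return next((r for r in self.rows if r.version_hash == version_hash), None)


def get_stored_state(state_rows: list[tuple[str, int, str]],
                     pipeline_name: str) -> Optional[str]:
    """Latest serialized state for a pipeline, given _dlt_pipeline_state-like
    rows of (pipeline_name, version, state_json)."""
    matches = [r for r in state_rows if r[0] == pipeline_name]
    return max(matches, key=lambda r: r[1])[2] if matches else None
```

The common pattern is "filter by key, take the row with the highest version"; only the by-hash lookup is an exact match with no ordering involved.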
When _dlt_version has no stored schema but Iceberg tables exist, the destination now derives the schema from existing table metadata. This handles:

- _dlt_version deleted or corrupted

Test plan

- Unit tests for WithStateSync methods
- Test for schema derivation when _dlt_version is empty
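The precedence described above — a stored schema always wins, and derivation only kicks in when _dlt_version yields nothing — can be sketched as follows. The dict shapes for schemas and catalog metadata are simplified placeholders, not dlt's real schema format.

```python
from typing import Optional


def get_stored_schema(stored: Optional[dict],
                      iceberg_tables: dict[str, list[str]]) -> Optional[dict]:
    """Return the stored schema if present, else derive one from catalog metadata.

    `iceberg_tables` maps table name -> column names found in the catalog
    (a stand-in for scanning real Iceberg table metadata).
    """
    if stored is not None:
        return stored  # precedence: an explicitly stored schema is authoritative
    if not iceberg_tables:
        return None    # nothing in the catalog to derive from
    # Derivation: include every column that exists in the catalog, so columns
    # missing from the current data batch are not treated as dropped.
    return {
        "tables": {
            name: {"columns": {c: {"name": c} for c in cols}}
            for name, cols in iceberg_tables.items()
        }
    }
```

With this ordering, a corrupted or deleted _dlt_version degrades to a derived schema instead of a SchemaEvolutionError, while normal runs are unaffected.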