-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for incremental loads to FDW plugins #647
Conversation
Add a `cursor_columns` field to the table parameters (used as a list of columns that form an increasing-only replication bookmark). Store the ingestion state in a table inside of the image, similarly to Airbyte.
splitgraph/hooks/data_source/fdw.py
Outdated
with delete_schema_at_end(repository.object_engine, staging_schema): | ||
repository.object_engine.delete_schema(staging_schema) | ||
repository.object_engine.create_schema(staging_schema) | ||
repository.commit_engines() | ||
|
||
self._mount_and_copy( | ||
staging_schema, | ||
tables, | ||
cursor_values=None if not state else state.get("cursor_values"), | ||
) | ||
|
||
logging.info("Storing tables as Splitgraph images") | ||
for table_name in repository.object_engine.get_all_tables(staging_schema): | ||
logging.info("Storing %s", table_name) | ||
new_schema = repository.object_engine.get_full_table_schema( | ||
staging_schema, table_name | ||
) | ||
|
||
if base_image: | ||
try: | ||
current_schema = base_image.get_table(table_name).table_schema | ||
if current_schema != new_schema: | ||
raise AssertionError( | ||
"Schema for %s changed! Old: %s, new: %s" | ||
% ( | ||
table_name, | ||
current_schema, | ||
new_schema, | ||
) | ||
) | ||
except TableNotFoundError: | ||
pass | ||
|
||
repository.objects.record_table_as_base( | ||
repository, | ||
table_name, | ||
new_image_hash, | ||
chunk_size=DEFAULT_CHUNK_SIZE, | ||
source_schema=staging_schema, | ||
source_table=table_name, | ||
table_schema=new_schema, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's some overlap between this code and the code in https://github.com/splitgraph/splitgraph/blob/master/splitgraph/ingestion/airbyte/data_source.py#L290-L329 but I don't know if there's a nice way to factor it out + when writeable LQ lands, we should be able to simplify this code anyway by writing into the LQ checkout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
splitgraph/hooks/data_source/fdw.py
Outdated
with delete_schema_at_end(repository.object_engine, staging_schema): | ||
repository.object_engine.delete_schema(staging_schema) | ||
repository.object_engine.create_schema(staging_schema) | ||
repository.commit_engines() | ||
|
||
self._mount_and_copy( | ||
staging_schema, | ||
tables, | ||
cursor_values=None if not state else state.get("cursor_values"), | ||
) | ||
|
||
logging.info("Storing tables as Splitgraph images") | ||
for table_name in repository.object_engine.get_all_tables(staging_schema): | ||
logging.info("Storing %s", table_name) | ||
new_schema = repository.object_engine.get_full_table_schema( | ||
staging_schema, table_name | ||
) | ||
|
||
if base_image: | ||
try: | ||
current_schema = base_image.get_table(table_name).table_schema | ||
if current_schema != new_schema: | ||
raise AssertionError( | ||
"Schema for %s changed! Old: %s, new: %s" | ||
% ( | ||
table_name, | ||
current_schema, | ||
new_schema, | ||
) | ||
) | ||
except TableNotFoundError: | ||
pass | ||
|
||
repository.objects.record_table_as_base( | ||
repository, | ||
table_name, | ||
new_image_hash, | ||
chunk_size=DEFAULT_CHUNK_SIZE, | ||
source_schema=staging_schema, | ||
source_table=table_name, | ||
table_schema=new_schema, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, makes sense.
Also handle the case where it's not specified.
It adds this table in cases like a CSV upload, polluting the repository. Instead, when loading the state during a sync, fall back to getting the cursor values by just querying the max fields in the current image if it doesn't exist.
@@ -134,22 +135,42 @@ def copy_table( | |||
target_schema: str, | |||
target_table: str, | |||
with_pk_constraints: bool = True, | |||
cursor_fields: Optional[Dict[str, str]] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a
cursor_columns
field to the table parameters (used as a listof columns that form an increasing-only replication bookmark). Store the
ingestion state in a table inside of the image, similarly to Airbyte.