Skip to content

Commit

Permalink
Fix FDW syncs when there's no new data to sync
Browse files Browse the repository at this point in the history
In that case, we can't find out the new cursor values (since the new table
fragments are empty). Instead, combine them with the old cursor values.
  • Loading branch information
mildbyte committed Mar 11, 2022
1 parent 244b2fd commit 37acbb1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 12 deletions.
32 changes: 20 additions & 12 deletions splitgraph/hooks/data_source/fdw.py
Expand Up @@ -353,15 +353,18 @@ def _sync(
# We override the main sync() instead
pass

def _get_cursor_value(self, schema: str, table: str, cursor_fields: List[str]) -> List[str]:
def _get_cursor_value(
self, schema: str, table: str, cursor_fields: List[str]
) -> Optional[List[str]]:
query = (
SQL("SELECT ")
+ SQL(",").join(Identifier(p) for p in cursor_fields)
+ SQL(" FROM {}.{} ORDER BY ").format(Identifier(schema), Identifier(table))
+ SQL(",").join(Identifier(p) + SQL(" DESC") for p in cursor_fields)
) + SQL(" LIMIT 1")

return [str(r) for r in self.engine.run_sql(query, return_shape=ResultShape.ONE_MANY)]
result = self.engine.run_sql(query, return_shape=ResultShape.ONE_MANY)
return [str(r) for r in result] if result else None

def sync(
self,
Expand Down Expand Up @@ -389,7 +392,7 @@ def sync(
cursor_values = state.get("cursor_values")
elif base_image:
with base_image.query_schema() as s:
cursor_values = self._get_cursor_values(s, tables)
cursor_values = self._get_cursor_values(s, tables, {})
else:
state = None
cursor_values = None
Expand Down Expand Up @@ -445,7 +448,11 @@ def sync(
# This is so that we don't have a _sg_ingestion_state table hanging around
# when doing something like a CSV upload.
if use_state:
new_state = {"cursor_values": self._get_cursor_values(staging_schema, tables)}
new_state = {
"cursor_values": self._get_cursor_values(
staging_schema, tables, old_cursor_values=cursor_values or {}
)
}
store_ingestion_state(
repository,
new_image_hash,
Expand All @@ -458,7 +465,7 @@ def sync(
return new_image_hash

def _get_cursor_values(
self, schema: str, tables: Optional[TableInfo]
self, schema: str, tables: Optional[TableInfo], old_cursor_values: Dict[str, Dict[str, str]]
) -> Dict[str, Dict[str, str]]:
cursor_values: Dict[str, Dict[str, str]] = {}
cursor_fields: Dict[str, List[str]] = {}
Expand All @@ -471,13 +478,14 @@ def _get_cursor_values(
for table_name in self.engine.get_all_tables(schema):
if table_name in cursor_fields:
table_cursor_fields = cursor_fields[table_name]
cursor_values[table_name] = {
c: v
for c, v in zip(
table_cursor_fields,
self._get_cursor_value(schema, table_name, table_cursor_fields),
)
}
cursor_value = self._get_cursor_value(schema, table_name, table_cursor_fields)

if cursor_value:
cursor_values[table_name] = dict(zip(table_cursor_fields, cursor_value))

elif old_cursor_values.get(table_name):
cursor_values[table_name] = old_cursor_values[table_name]

return cursor_values


Expand Down
16 changes: 16 additions & 0 deletions test/splitgraph/ingestion/test_common.py
Expand Up @@ -228,6 +228,22 @@ def test_fdw_data_source_with_cursors(pg_repo_local):
}
}

# Do a sync without changes
image_hash_4 = handler.sync(output, image_hash=image_hash_3, tables=tables)
image = output.images[image_hash_4]
image.checkout()

assert output.run_sql("SELECT COUNT(*) FROM fruits") == [(4,)]
assert output.run_sql("SELECT COUNT(*) FROM vegetables") == [(2,)]
assert _get_state(output) == {
"cursor_values": {
"fruits": {
"fruit_id": "4",
},
"vegetables": {"vegetable_id": "2"},
}
}


def test_fdw_data_source_with_composite_cursors(pg_repo_local):
# Same as previous test, but make sure it still works with composite cursors (more than one
Expand Down

0 comments on commit 37acbb1

Please sign in to comment.