Load FDW data directly into a single object
On first load, instead of using a staging PG table and then chunking it up into
objects, load the data directly into a single partition. This will mean one
large partition, but it dramatically decreases the staging disk space
requirements and the load time (a 4x speedup for the tested 14M-row dataset).
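
To make the change concrete, here is a minimal, self-contained Python sketch of the two ingestion strategies; `chunked_ingest`, `direct_ingest`, and the list-based "objects" are illustrative stand-ins, not the actual splitgraph API:

    from typing import Iterable, List, Tuple

    Row = Tuple[int, str]

    def chunked_ingest(rows: Iterable[Row], chunk_size: int) -> List[List[Row]]:
        """Old path: materialize everything in a staging table, then carve
        the staged copy into chunk_size-row objects (one extra full copy)."""
        staged = list(rows)  # stand-in for the staging PG table
        return [staged[i:i + chunk_size] for i in range(0, len(staged), chunk_size)]

    def direct_ingest(rows: Iterable[Row]) -> List[List[Row]]:
        """New first-load path: stream the FDW output straight into a single
        object, skipping the staging copy entirely."""
        return [list(rows)]  # one large partition, no intermediate table

    data = [(i, f"row-{i}") for i in range(10)]
    assert len(chunked_ingest(data, chunk_size=4)) == 3  # three small objects
    assert len(direct_ingest(data)) == 1                 # one large object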
mildbyte committed Oct 10, 2022
1 parent 15e56d7 commit 1c30c52
Showing 1 changed file with 64 additions and 43 deletions: splitgraph/hooks/data_source/fdw.py
@@ -407,51 +407,51 @@ def sync(
         repository.object_engine.create_schema(staging_schema)
         repository.commit_engines()

-        self._mount_and_copy(
-            staging_schema,
-            tables,
-            cursor_values=cursor_values,
-        )
-
-        logging.info("Storing tables as Splitgraph images")
-        for table_name in repository.object_engine.get_all_tables(staging_schema):
-            logging.info("Storing %s", table_name)
-            new_schema = repository.object_engine.get_full_table_schema(
-                staging_schema, table_name
-            )
-
-            if base_image:
-                try:
-                    current_schema = base_image.get_table(table_name).table_schema
-                    # Compare schemas ignoring ordinals/comments
-                    if [(c.name, c.pg_type, c.is_pk) for c in current_schema] != [
-                        (c.name, c.pg_type, c.is_pk) for c in new_schema
-                    ]:
-                        raise AssertionError(
-                            "Schema for %s changed! Old: %s, new: %s"
-                            % (
-                                table_name,
-                                current_schema,
-                                new_schema,
-                            )
-                        )
-                except TableNotFoundError:
-                    pass
-
-            repository.objects.record_table_as_base(
-                repository,
-                table_name,
-                new_image_hash,
-                chunk_size=DEFAULT_CHUNK_SIZE,
-                source_schema=staging_schema,
-                source_table=table_name,
-                table_schema=new_schema,
-            )
-
-        # Store the ingestion state if we're running a sync (instead of a full reload).
-        # This is so that we don't have a _sg_ingestion_state table hanging around
-        # when doing something like a CSV upload.
-        if use_state:
+        if use_state:
+            self._mount_and_copy(
+                staging_schema,
+                tables,
+                cursor_values=cursor_values,
+            )
+
+            logging.info("Storing tables as Splitgraph images")
+            for table_name in repository.object_engine.get_all_tables(staging_schema):
+                logging.info("Storing %s", table_name)
+                new_schema = repository.object_engine.get_full_table_schema(
+                    staging_schema, table_name
+                )
+
+                if base_image:
+                    try:
+                        current_schema = base_image.get_table(table_name).table_schema
+                        # Compare schemas ignoring ordinals/comments
+                        if [(c.name, c.pg_type, c.is_pk) for c in current_schema] != [
+                            (c.name, c.pg_type, c.is_pk) for c in new_schema
+                        ]:
+                            raise AssertionError(
+                                "Schema for %s changed! Old: %s, new: %s"
+                                % (
+                                    table_name,
+                                    current_schema,
+                                    new_schema,
+                                )
+                            )
+                    except TableNotFoundError:
+                        pass
+
+                repository.objects.record_table_as_base(
+                    repository,
+                    table_name,
+                    new_image_hash,
+                    chunk_size=DEFAULT_CHUNK_SIZE,
+                    source_schema=staging_schema,
+                    source_table=table_name,
+                    table_schema=new_schema,
+                )
+
+            # Store the ingestion state if we're running a sync (instead of a full reload).
+            # This is so that we don't have a _sg_ingestion_state table hanging around
+            # when doing something like a CSV upload.
             new_state = {
                 "cursor_values": self._get_cursor_values(
                     staging_schema, tables, old_cursor_values=cursor_values or {}
@@ -463,8 +463,29 @@ def sync(
                 current_state=state,
                 new_state=json.dumps(new_state) if new_state else "{}",
             )
-        add_timestamp_tags(repository, new_image_hash)
-        repository.commit_engines()
+        # For a first load, don't chunk the table and save the data from the FDW directly
+        # into the object (a 4x speed and a massive space improvement)
+        else:
+            errors = self.mount(staging_schema, tables=tables)
+            if errors:
+                raise DataSourceError("Error mounting tables for ingestion")
+
+            self.engine.commit()
+
+            for t in self.engine.get_all_tables(staging_schema):
+                logging.info("Snapshotting %s", t)
+                repository.objects.record_table_as_base(
+                    repository,
+                    t,
+                    new_image_hash,
+                    chunk_size=None,
+                    source_schema=staging_schema,
+                    source_table=t,
+                    overwrite=False,
+                )
+
+        add_timestamp_tags(repository, new_image_hash)
+        repository.commit_engines()

         return new_image_hash
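
The two branches above differ mainly in `chunk_size`: the sync path stores each table as DEFAULT_CHUNK_SIZE-row objects, while the first-load path passes `chunk_size=None` to snapshot the whole table into one object. The cost of that single large partition is coarser reads. As a toy model (not splitgraph internals), assuming min/max-style per-object metadata is used to skip fragments at query time, a point lookup touches one small chunk in the first layout but the entire table in the second:

    from typing import List, Tuple

    Fragment = Tuple[int, int]  # hypothetical (min_pk, max_pk) per stored object

    def rows_scanned(fragments: List[Fragment], pk: int) -> int:
        """Total rows in objects whose key range could contain pk."""
        return sum(hi - lo + 1 for lo, hi in fragments if lo <= pk <= hi)

    chunked = [(0, 9999), (10000, 19999), (20000, 29999)]  # sync path: 3 objects
    single = [(0, 29999)]                                   # first load: 1 object

    assert rows_scanned(chunked, pk=12345) == 10_000  # one chunk scanned
    assert rows_scanned(single, pk=12345) == 30_000   # the whole partition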

