Skip to content

Commit

Permalink
Disable usage of placeholder dates in Singer
Browse files Browse the repository at this point in the history
(has own failure handling)
  • Loading branch information
mildbyte committed Oct 18, 2022
1 parent ee79e2e commit 9547ad3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
13 changes: 11 additions & 2 deletions splitgraph/hooks/data_source/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,17 @@ def prepare_new_image(
hash_or_tag: Optional[str],
comment: str = "Singer tap ingestion",
copy_latest: bool = True,
use_placeholder_dt: bool = False,
) -> Tuple[Optional[Image], str]:
new_image_hash = "{:064x}".format(getrandbits(256))
if repository_exists(repository) and copy_latest:
# Clone the base image and delta compress against it
base_image: Optional[Image] = repository.images[hash_or_tag] if hash_or_tag else None
repository.images.add(
parent_id=None, image=new_image_hash, comment=comment, created=PLACEHOLDER_DATE
parent_id=None,
image=new_image_hash,
comment=comment,
created=PLACEHOLDER_DATE if use_placeholder_dt else None,
)
if base_image:
repository.engine.run_sql(
Expand All @@ -311,7 +315,12 @@ def prepare_new_image(
)
else:
base_image = None
repository.images.add(parent_id=None, image=new_image_hash, comment=comment)
repository.images.add(
parent_id=None,
image=new_image_hash,
comment=comment,
created=PLACEHOLDER_DATE if use_placeholder_dt else None,
)
return base_image, new_image_hash


Expand Down
6 changes: 3 additions & 3 deletions splitgraph/ingestion/singer/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from splitgraph.hooks.data_source.base import (
SyncableDataSource,
get_ingestion_state,
make_image_latest,
prepare_new_image,
)
from splitgraph.ingestion.common import add_timestamp_tags
Expand Down Expand Up @@ -129,7 +128,9 @@ def sync(
catalog = self._run_singer_discovery(config)
catalog = self.build_singer_catalog(catalog, tables)

base_image, new_image_hash = prepare_new_image(repository, image_hash)
base_image, new_image_hash = prepare_new_image(
repository, image_hash, use_placeholder_dt=False
)
state = get_ingestion_state(repository, image_hash) if use_state else None
logging.info("Current ingestion state: %s", state)

Expand Down Expand Up @@ -159,7 +160,6 @@ def sync(
store_ingestion_state(repository, new_image_hash, state, latest_state)

add_timestamp_tags(repository, new_image_hash)
make_image_latest(repository, new_image_hash)
repository.commit_engines()

if failure:
Expand Down

0 comments on commit 9547ad3

Please sign in to comment.