From 0988a60c4f6979ca1686d7a45eb3a364efabce1d Mon Sep 17 00:00:00 2001 From: SKairinos Date: Thu, 20 Nov 2025 16:39:17 +0000 Subject: [PATCH 1/2] fix: remove magnitude from naming convention --- codeforlife/tasks/data_warehouse.py | 44 ++++-------------------- codeforlife/tasks/data_warehouse_test.py | 40 ++------------------- 2 files changed, 9 insertions(+), 75 deletions(-) diff --git a/codeforlife/tasks/data_warehouse.py b/codeforlife/tasks/data_warehouse.py index 54d0049..fa6161e 100644 --- a/codeforlife/tasks/data_warehouse.py +++ b/codeforlife/tasks/data_warehouse.py @@ -213,35 +213,28 @@ class ChunkMetadata: timestamp: str # when the task was first run obj_i_start: int # object index span start obj_i_end: int # object index span end - obj_count_digits: int # number of digits in the object count def to_blob_name(self): """Convert this chunk metadata into a blob name.""" - # Left-pad the object indexes with zeros. - obj_i_start_fstr = str(self.obj_i_start).zfill( - self.obj_count_digits - ) - obj_i_end_fstr = str(self.obj_i_end).zfill(self.obj_count_digits) - - # E.g. "user/2025-01-01_00:00:00__0001_1000.csv" + # E.g. "user/2025-01-01_00:00:00__1_1000.csv" return ( f"{self.bq_table_name}/{self.timestamp}__" - f"{obj_i_start_fstr}_{obj_i_end_fstr}.csv" + f"{self.obj_i_start}_{self.obj_i_end}.csv" ) @classmethod def from_blob_name(cls, blob_name: str): """Extract the chunk metadata from a blob name.""" - # E.g. "user/2025-01-01_00:00:00__0001_1000.csv" - # "2025-01-01_00:00:00__0001_1000.csv" + # E.g. "user/2025-01-01_00:00:00__1_1000.csv" + # "2025-01-01_00:00:00__1_1000.csv" bq_table_name, blob_name = blob_name.split("/", maxsplit=1) - # "2025-01-01_00:00:00__0001_1000" + # "2025-01-01_00:00:00__1_1000" blob_name = blob_name.removesuffix(".csv") - # "2025-01-01_00:00:00", "0001_1000" + # "2025-01-01_00:00:00", "1_1000" timestamp, obj_i_span_fstr = blob_name.split("__") - # "0001", "1000" + # "1", "1000" obj_i_start_fstr, obj_i_end_fstr = obj_i_span_fstr.split("_") return cls( @@ -249,7 +242,6 @@ def from_blob_name(cls, blob_name: str): timestamp=timestamp, obj_i_start=int(obj_i_start_fstr), obj_i_end=int(obj_i_end_fstr), - obj_count_digits=len(obj_i_start_fstr), ) def _get_gcs_bucket(self): @@ -356,9 +348,6 @@ def _save_query_set_as_csvs_in_gcs_bucket( if obj_count == 0: return - # Get the number of digits in the object count. - obj_count_digits = len(str(obj_count)) - # If the queryset is not ordered, order it by ID by default. if not queryset.ordered: queryset = queryset.order_by(self.settings.id_field) @@ -394,24 +383,6 @@ def _save_query_set_as_csvs_in_gcs_bucket( f"{self.settings.bq_table_name}/{timestamp}" ) ): - chunk_metadata = self.ChunkMetadata.from_blob_name(blob_name) - - # If the number of digits in the object count has changed... - if obj_count_digits != chunk_metadata.obj_count_digits: - # ...update the number of digits in the object count... - chunk_metadata.obj_count_digits = obj_count_digits - # ...and update the blob name... - blob_name = chunk_metadata.to_blob_name() - # ...and copy the blob with the updated name... - bucket.copy_blob( - blob=blob, - destination_bucket=bucket, - new_name=blob_name, - ) - # ...and delete the old blob. - logging.info('Deleting blob "%s".', blob.name) - blob.delete() - last_blob_name_from_current_timestamp = blob_name # Check if blobs not from the current timestamp should be deleted. 
elif self.settings.delete_blobs_not_from_current_timestamp: @@ -457,7 +428,6 @@ def upload_csv(obj_i_end: int): timestamp=timestamp, obj_i_start=obj_i_start, obj_i_end=obj_i_end, - obj_count_digits=obj_count_digits, ).to_blob_name() # Create a blob object for the CSV file's path and upload it. diff --git a/codeforlife/tasks/data_warehouse_test.py b/codeforlife/tasks/data_warehouse_test.py index d33cb78..4de9fd5 100644 --- a/codeforlife/tasks/data_warehouse_test.py +++ b/codeforlife/tasks/data_warehouse_test.py @@ -60,12 +60,7 @@ def __repr__(self): @classmethod # pylint: disable-next=too-many-arguments def generate_list( - cls, - task: DWT, - timestamp: str, - obj_i_start: int, - obj_i_end: int, - obj_count_digits: int, + cls, task: DWT, timestamp: str, obj_i_start: int, obj_i_end: int ): """Generate a list of mock GCS blobs. @@ -88,7 +83,6 @@ def generate_list( obj_i_end=min( obj_i_start + task.settings.chunk_size - 1, obj_i_end ), - obj_count_digits=obj_count_digits, ) ) for obj_i_start in range( @@ -127,14 +121,10 @@ def setUp(self): self.timestamp = DWT.to_timestamp(self.datetime) self.obj_i_start = 1 self.obj_i_end = 100 - self.obj_count_digits = 4 - - obj_i_start_fstr = str(self.obj_i_start).zfill(self.obj_count_digits) - obj_i_end_fstr = str(self.obj_i_end).zfill(self.obj_count_digits) self.blob_name = ( f"{self.bq_table_name}/{self.timestamp}__" - f"{obj_i_start_fstr}_{obj_i_end_fstr}.csv" + f"{self.obj_i_start}_{self.obj_i_end}.csv" ) return super().setUp() @@ -213,7 +203,6 @@ def test_chunk_metadata__to_blob_name(self): timestamp=self.timestamp, obj_i_start=self.obj_i_start, obj_i_end=self.obj_i_end, - obj_count_digits=self.obj_count_digits, ).to_blob_name() assert blob_name == self.blob_name @@ -224,7 +213,6 @@ def test_chunk_metadata__from_blob_name(self): assert chunk_metadata.timestamp == self.timestamp assert chunk_metadata.obj_i_start == self.obj_i_start assert chunk_metadata.obj_i_end == self.obj_i_end - assert chunk_metadata.obj_count_digits == self.obj_count_digits # Init CSV writer @@ -319,11 +307,6 @@ def _test_task( assert uploaded_obj_count <= obj_count assert (obj_count - uploaded_obj_count) > 0 - # Get the object count's current magnitude (number of digits) and - # simulate a higher order of magnitude during the previous run. - obj_count_digits = len(str(obj_count)) - uploaded_obj_count_digits = obj_count_digits + 1 - # Get the current datetime. now = datetime.now(timezone.utc) @@ -335,7 +318,6 @@ def _test_task( timestamp=DWT.to_timestamp(now - since_previous_run), obj_i_start=1, obj_i_end=obj_count, - obj_count_digits=obj_count_digits, ) if since_previous_run is not None else [] @@ -348,14 +330,12 @@ def _test_task( timestamp=timestamp, obj_i_start=1, obj_i_end=uploaded_obj_count, - obj_count_digits=uploaded_obj_count_digits, ) non_uploaded_blobs_from_current_timestamp = MockGcsBlob.generate_list( task=task, timestamp=timestamp, obj_i_start=uploaded_obj_count + 1, obj_i_end=obj_count, - obj_count_digits=obj_count_digits, ) # Generate a mock GCS bucket. @@ -421,22 +401,6 @@ def _test_task( ] ) - # Assert that the uploaded blobs in the current timestamp were copied - # with the magnitude corrected in their name and the old blobs deleted. 
- for blob in uploaded_blobs_from_current_timestamp: - blob.chunk_metadata.obj_count_digits = obj_count_digits - blob.delete.assert_called_once() - bucket.copy_blob.assert_has_calls( - [ - call( - blob=blob, - destination_bucket=bucket, - new_name=blob.chunk_metadata.to_blob_name(), - ) - for blob in uploaded_blobs_from_current_timestamp - ] - ) - # Assert that each blob was uploaded from a CSV string. for blob in non_uploaded_blobs_from_current_timestamp: csv_content, csv_writer = task.init_csv_writer() From 75c1aab4079ecc42fdf42e8499c4ffceb5f7a7af Mon Sep 17 00:00:00 2001 From: SKairinos Date: Thu, 20 Nov 2025 17:25:08 +0000 Subject: [PATCH 2/2] fix: dir name in convention --- codeforlife/tasks/data_warehouse.py | 49 +++++++++++++++--------- codeforlife/tasks/data_warehouse_test.py | 15 ++++++-- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/codeforlife/tasks/data_warehouse.py b/codeforlife/tasks/data_warehouse.py index fa6161e..4b8a6d0 100644 --- a/codeforlife/tasks/data_warehouse.py +++ b/codeforlife/tasks/data_warehouse.py @@ -31,17 +31,16 @@ class DataWarehouseTask(Task): timestamp_key = "_timestamp" GetQuerySet: t.TypeAlias = t.Callable[..., QuerySet[t.Any]] + BqTableWriteMode: t.TypeAlias = t.Literal["overwrite", "append"] # pylint: disable-next=too-many-instance-attributes class Settings: """The settings for a data warehouse task.""" - BqTableWriteMode: t.TypeAlias = t.Literal["overwrite", "append"] - # pylint: disable-next=too-many-arguments,too-many-branches def __init__( self, - bq_table_write_mode: BqTableWriteMode, + bq_table_write_mode: "DataWarehouseTask.BqTableWriteMode", chunk_size: int, fields: t.List[str], id_field: str = "id", @@ -210,6 +209,7 @@ class ChunkMetadata: """All of the metadata used to track a chunk.""" bq_table_name: str # the name of the BigQuery table + bq_table_write_mode: "DataWarehouseTask.BqTableWriteMode" timestamp: str # when the task was first run obj_i_start: int # object index span start obj_i_end: int # object index span end @@ -217,31 +217,39 @@ class ChunkMetadata: def to_blob_name(self): """Convert this chunk metadata into a blob name.""" - # E.g. "user/2025-01-01_00:00:00__1_1000.csv" + # E.g. "user__append/2025-01-01_00:00:00__1_1000.csv" return ( - f"{self.bq_table_name}/{self.timestamp}__" - f"{self.obj_i_start}_{self.obj_i_end}.csv" + f"{self.bq_table_name}__{self.bq_table_write_mode}/" + f"{self.timestamp}__{self.obj_i_start}_{self.obj_i_end}.csv" ) @classmethod def from_blob_name(cls, blob_name: str): """Extract the chunk metadata from a blob name.""" - # E.g. "user/2025-01-01_00:00:00__1_1000.csv" - # "2025-01-01_00:00:00__1_1000.csv" - bq_table_name, blob_name = blob_name.split("/", maxsplit=1) + # E.g. 
"user__append/2025-01-01_00:00:00__1_1000.csv" + # "user__append", "2025-01-01_00:00:00__1_1000.csv" + dir_name, file_name = blob_name.split("/") + # "user", "append" + bq_table_name, bq_table_write_mode = dir_name.rsplit( + "__", maxsplit=1 + ) + assert bq_table_write_mode in ("overwrite", "append") # "2025-01-01_00:00:00__1_1000" - blob_name = blob_name.removesuffix(".csv") + file_name = file_name.removesuffix(".csv") # "2025-01-01_00:00:00", "1_1000" - timestamp, obj_i_span_fstr = blob_name.split("__") + timestamp, obj_i_span = file_name.split("__") # "1", "1000" - obj_i_start_fstr, obj_i_end_fstr = obj_i_span_fstr.split("_") + obj_i_start, obj_i_end = obj_i_span.split("_") return cls( bq_table_name=bq_table_name, + bq_table_write_mode=t.cast( + DataWarehouseTask.BqTableWriteMode, bq_table_write_mode + ), timestamp=timestamp, - obj_i_start=int(obj_i_start_fstr), - obj_i_end=int(obj_i_end_fstr), + obj_i_start=int(obj_i_start), + obj_i_end=int(obj_i_end), ) def _get_gcs_bucket(self): @@ -362,11 +370,17 @@ def _save_query_set_as_csvs_in_gcs_bucket( # The name of the last blob from the current timestamp. last_blob_name_from_current_timestamp: t.Optional[str] = None + # The name of the directory where the blobs are expected to be located. + blob_dir_name = ( + f"{self.settings.bq_table_name}__" + f"{self.settings.bq_table_write_mode}/" + ) + # List all the existing blobs. for blob in t.cast( t.Iterator[gcs.Blob], bucket.list_blobs( - prefix=f"{self.settings.bq_table_name}/" + prefix=blob_dir_name + ( timestamp if self.settings.only_list_blobs_from_current_timestamp @@ -379,9 +393,7 @@ def _save_query_set_as_csvs_in_gcs_bucket( # Check if found first blob from current timestamp. if ( self.settings.only_list_blobs_from_current_timestamp - or blob_name.startswith( - f"{self.settings.bq_table_name}/{timestamp}" - ) + or blob_name.startswith(blob_dir_name + timestamp) ): last_blob_name_from_current_timestamp = blob_name # Check if blobs not from the current timestamp should be deleted. @@ -425,6 +437,7 @@ def upload_csv(obj_i_end: int): # Generate the path to the CSV in the bucket. 
blob_name = self.ChunkMetadata( bq_table_name=self.settings.bq_table_name, + bq_table_write_mode=self.settings.bq_table_write_mode, timestamp=timestamp, obj_i_start=obj_i_start, obj_i_end=obj_i_end, diff --git a/codeforlife/tasks/data_warehouse_test.py b/codeforlife/tasks/data_warehouse_test.py index 4de9fd5..9fb9e3f 100644 --- a/codeforlife/tasks/data_warehouse_test.py +++ b/codeforlife/tasks/data_warehouse_test.py @@ -78,6 +78,7 @@ def generate_list( cls( chunk_metadata=DWT.ChunkMetadata( bq_table_name=task.settings.bq_table_name, + bq_table_write_mode=task.settings.bq_table_write_mode, timestamp=timestamp, obj_i_start=obj_i_start, obj_i_end=min( @@ -118,13 +119,14 @@ def setUp(self): self.datetime = datetime.combine(self.date, self.time) self.bq_table_name = "example" + self.bq_table_write_mode: DWT.BqTableWriteMode = "append" self.timestamp = DWT.to_timestamp(self.datetime) self.obj_i_start = 1 self.obj_i_end = 100 self.blob_name = ( - f"{self.bq_table_name}/{self.timestamp}__" - f"{self.obj_i_start}_{self.obj_i_end}.csv" + f"{self.bq_table_name}__{self.bq_table_write_mode}/" + f"{self.timestamp}__{self.obj_i_start}_{self.obj_i_end}.csv" ) return super().setUp() @@ -134,7 +136,7 @@ def setUp(self): def _test_settings( self, code: str, - bq_table_write_mode: DWT.Settings.BqTableWriteMode = ("append"), + bq_table_write_mode: DWT.BqTableWriteMode = "append", chunk_size: int = 10, fields: t.Optional[t.List[str]] = None, **kwargs, @@ -200,6 +202,7 @@ def test_chunk_metadata__to_blob_name(self): """Can successfully convert a chunk's metadata into a blob name.""" blob_name = DWT.ChunkMetadata( bq_table_name=self.bq_table_name, + bq_table_write_mode=self.bq_table_write_mode, timestamp=self.timestamp, obj_i_start=self.obj_i_start, obj_i_end=self.obj_i_end, @@ -210,6 +213,7 @@ def test_chunk_metadata__from_blob_name(self): """Can successfully convert a chunk's metadata into a blob name.""" chunk_metadata = DWT.ChunkMetadata.from_blob_name(self.blob_name) assert chunk_metadata.bq_table_name == self.bq_table_name + assert chunk_metadata.bq_table_write_mode == self.bq_table_write_mode assert chunk_metadata.timestamp == self.timestamp assert chunk_metadata.obj_i_start == self.obj_i_start assert chunk_metadata.obj_i_end == self.obj_i_end @@ -378,7 +382,10 @@ def _test_task( # table's write-mode is append, assert only the blobs in the current # timestamp were listed. bucket.list_blobs.assert_called_once_with( - prefix=f"{task.settings.bq_table_name}/" + prefix=( + f"{task.settings.bq_table_name}__" + f"{task.settings.bq_table_write_mode}/" + ) + ( timestamp if task.settings.only_list_blobs_from_current_timestamp
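
For reference, here is a minimal, standalone sketch of the blob naming convention these two patches settle on: unpadded object indexes, and a directory name that carries both the BigQuery table name and its write mode. It mirrors the DataWarehouseTask.ChunkMetadata shown in the diff, but is simplified into a module-level dataclass with no GCS or task dependencies so the round trip can be checked in isolation; it is illustrative only and not part of the patch.

# Standalone sketch of the naming convention:
#   "<bq_table_name>__<write_mode>/<timestamp>__<obj_i_start>_<obj_i_end>.csv"
# e.g. "user__append/2025-01-01_00:00:00__1_1000.csv"
import typing as t
from dataclasses import dataclass

BqTableWriteMode = t.Literal["overwrite", "append"]


@dataclass
class ChunkMetadata:
    bq_table_name: str
    bq_table_write_mode: BqTableWriteMode
    timestamp: str  # when the task was first run
    obj_i_start: int  # object index span start
    obj_i_end: int  # object index span end

    def to_blob_name(self) -> str:
        # E.g. "user__append/2025-01-01_00:00:00__1_1000.csv"
        return (
            f"{self.bq_table_name}__{self.bq_table_write_mode}/"
            f"{self.timestamp}__{self.obj_i_start}_{self.obj_i_end}.csv"
        )

    @classmethod
    def from_blob_name(cls, blob_name: str) -> "ChunkMetadata":
        # "user__append", "2025-01-01_00:00:00__1_1000.csv"
        dir_name, file_name = blob_name.split("/")
        # rsplit keeps table names that themselves contain "__" intact.
        bq_table_name, bq_table_write_mode = dir_name.rsplit("__", maxsplit=1)
        assert bq_table_write_mode in ("overwrite", "append")
        # "2025-01-01_00:00:00", "1_1000"
        timestamp, obj_i_span = file_name.removesuffix(".csv").split("__")
        # "1", "1000"
        obj_i_start, obj_i_end = obj_i_span.split("_")
        return cls(
            bq_table_name=bq_table_name,
            bq_table_write_mode=t.cast(BqTableWriteMode, bq_table_write_mode),
            timestamp=timestamp,
            obj_i_start=int(obj_i_start),
            obj_i_end=int(obj_i_end),
        )


if __name__ == "__main__":
    metadata = ChunkMetadata(
        bq_table_name="user",
        bq_table_write_mode="append",
        timestamp="2025-01-01_00:00:00",
        obj_i_start=1,
        obj_i_end=1000,
    )
    blob_name = metadata.to_blob_name()
    assert blob_name == "user__append/2025-01-01_00:00:00__1_1000.csv"
    assert ChunkMetadata.from_blob_name(blob_name) == metadata

Two design points follow from this convention: rsplit("__", maxsplit=1) on the directory name tolerates table names containing double underscores, and dropping the zero-padded indexes removes the need to copy and rename existing blobs whenever the object count gains a digit, which is exactly the branch the first patch deletes.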