Adds some operations to debug goldsky data dumps (#1559)
ravenac95 committed May 30, 2024
1 parent d198849 commit 457ae9d
Showing 1 changed file with 152 additions and 21 deletions.
173 changes: 152 additions & 21 deletions warehouse/oso_dagster/factories/goldsky/assets.py
@@ -30,6 +30,10 @@
DefaultSensorStatus,
AssetsDefinition,
AssetChecksDefinition,
TableRecord,
MetadataValue,
TableColumn,
TableSchema,
)
from dagster_gcp import BigQueryResource, GCSResource
from google.api_core.exceptions import (
@@ -649,6 +653,7 @@ def __init__(
self.cached_blobs_to_process: List[re.Match[str]] | None = None
self.schema = None
self.pointer_table_suffix = pointer_table_suffix
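# Tracks per-dump file statistics keyed by "{job_id}-{timestamp}"; each entry
# holds the job_id, timestamp, file count, and the distinct workers seen
# (populated by record_bucket_stats_from_match while scanning the bucket).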
self.bucket_stats = {}

async def materialize(
self,
@@ -670,6 +675,13 @@

await self.clean_working_destination(context, workers)

def load_schema_from_job_id(self, log: DagsterLogManager, job_id: str, timestamp: int):
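# Queue a single file for the given job_id/timestamp (via a checkpoint range
# starting at checkpoint 0) and derive the schema from it, so a dump's schema
# can be inspected without materializing the whole asset.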
queues = self.load_queues(log, max_objects_to_load=1, checkpoint_range=GoldskyCheckpointRange(
start=GoldskyCheckpoint(job_id, timestamp, 0),
))
self.load_schema(queues)
return self.schema

def load_schema(self, queues: GoldskyQueues):
item = queues.peek()
client = self.gcs.get_client()
@@ -1032,6 +1044,13 @@ def clean_up(self, log: DagsterLogManager):
gcs_client = self.gcs.get_client()
# batch_delete_blobs(gcs_client, self.config.source_bucket_name, blobs, 1000)

def gather_stats(self, log: DagsterLogManager):
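# Scan the bucket via load_queues_to_process purely for its side effect of
# populating self.bucket_stats, then return the collected per-dump stats.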
self.load_queues_to_process(
log,
None
)
return self.bucket_stats

def _uncached_blobs_loader(self, log: DagsterLogManager):
log.info("Loading blobs list for processing")
gcs_client = self.gcs.get_client()
@@ -1054,6 +1073,22 @@ def _cached_blobs_loader(self, log: DagsterLogManager):
log.info("using cached blobs")
return self.cached_blobs_to_process

def record_bucket_stats_from_match(self, match: re.Match[str]):
key = f"{match.group('job_id')}-{match.group('timestamp')}"
if key not in self.bucket_stats:
self.bucket_stats[key] = dict(
job_id=match.group("job_id"),
timestamp=int(match.group("timestamp")),
count=1,
workers=[match.group("worker")],
)
else:
self.bucket_stats[key]["count"] += 1
worker = match.group("worker")
if worker not in self.bucket_stats[key]["workers"]:
self.bucket_stats[key]["workers"].append(worker)


def load_queues(
self,
log: DagsterLogManager,
@@ -1091,6 +1126,8 @@ def load_queues(
worker_checkpoint = int(match.group("checkpoint"))
checkpoint = GoldskyCheckpoint(job_id, timestamp, worker_checkpoint)

self.record_bucket_stats_from_match(match)

# If there's a checkpoint range only queue checkpoints within that range
if checkpoint_range:
if not checkpoint_range.in_range(checkpoint):
@@ -1114,26 +1151,26 @@
match,
),
)
keys = list(worker_status.keys())
if len(keys) > 0:
expected_timestamp_of_worker_status = worker_status.get(keys[0])
# Originally, multiple timestamp keys were considered an error, but it
# turns out this is a normal part of the process. This check just logs
# when the timestamp changes, which might be useful for our own
# tracing/debugging purposes.
if expected_timestamp_of_worker_status.timestamp != latest_timestamp:
log.info(
{
"message": (
"Pipeline timestamp changed."
" This is a normal part of the goldsky process."
" Continuing to load chronologically"
),
"expected": expected_timestamp_of_worker_status,
"actual": latest_timestamp,
}
)

if worker_status:
keys = list(worker_status.keys())
if len(keys) > 0:
expected_timestamp_of_worker_status = worker_status.get(keys[0])
# Originally, multiple timestamp keys were considered an error, but it
# turns out this is a normal part of the process. This check just logs
# when the timestamp changes, which might be useful for our own
# tracing/debugging purposes.
if expected_timestamp_of_worker_status.timestamp != latest_timestamp:
log.info(
{
"message": (
"Pipeline timestamp changed."
" This is a normal part of the goldsky process."
" Continuing to load chronologically"
),
"expected": expected_timestamp_of_worker_status,
"actual": latest_timestamp,
}
)
return queues

def load_queues_to_process(
@@ -1234,10 +1271,104 @@ def goldsky_backfill_op(
# Hack for now.
return "Done"

@op(name=f"{related_ops_prefix}_files_stats_op")
def goldsky_files_stats_op(
context: OpExecutionContext,
bigquery: BigQueryResource,
gcs: GCSResource,
cbt: CBTResource
):
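# Debug op: summarize the raw files sitting in the GCS bucket per goldsky
# job_id/timestamp and surface the summary as Dagster table metadata.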
table_schema = TableSchema(
columns=[
TableColumn(
name="timestamp", type="integer", description="The job timestamp"
),
TableColumn(
name="job_id", type="string", description="The goldsky job ID"
),
TableColumn(
name="worker_count",
type="integer",
description="The number of distinct workers for the job",
),
TableColumn(
name="avg_files_per_worker",
type="float",
description="The average number of files per worker",
),
TableColumn(
name="total_files",
type="integer",
description="The total number of files for the job",
),
]
)
gs_asset = GoldskyAsset(gcs, bigquery, cbt, asset_config)
bucket_stats = gs_asset.gather_stats(context.log)

records = []
sorted_job_stats = sorted(bucket_stats.values(), key=lambda a: a["timestamp"])

for job_stats in sorted_job_stats:
worker_count = len(job_stats["workers"])
records.append(
TableRecord(dict(
job_id=job_stats["job_id"],
worker_count=worker_count,
timestamp=job_stats["timestamp"],
avg_files_per_worker=job_stats["count"]/worker_count,
total_files=job_stats["count"],
))
)

# Create a TableMetadataValue
table_metadata = MetadataValue.table(records=records, schema=table_schema)

# Log the metadata
context.add_output_metadata({"bucket_stats": table_metadata})

@op(name=f"{related_ops_prefix}_load_schema_op")
def goldsky_load_schema_op(
context: OpExecutionContext,
bigquery: BigQueryResource,
gcs: GCSResource,
cbt: CBTResource,
config: dict
):
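# Debug op: load the schema of the dump selected by the "job_id" and
# "timestamp" keys of the op config and surface it as Dagster table metadata.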
table_schema = TableSchema(
columns=[
TableColumn(
name="column_name", type="string", description="The column name"
),
TableColumn(
name="column_type", type="string", description="The type"
),
]
)
gs_asset = GoldskyAsset(gcs, bigquery, cbt, asset_config)
schema: List[SchemaField] = gs_asset.load_schema_from_job_id(context.log, config["job_id"], config["timestamp"])
records = []
for field in schema:
records.append(TableRecord(dict(column_name=field.name, column_type=field.field_type)))

table_metadata = MetadataValue.table(records=records, schema=table_schema)
# Log the metadata
context.add_output_metadata({"schema": table_metadata})


@job(name=f"{related_ops_prefix}_clean_up_job")
def goldsky_clean_up_job():
goldsky_clean_up_op()

@job(name=f"{related_ops_prefix}_files_stats_job")
def goldsky_files_stats_job():
goldsky_files_stats_op()

@job(name=f"{related_ops_prefix}_load_schema_job")
def goldsky_load_schema_job():
goldsky_load_schema_op()

@job(name=f"{related_ops_prefix}_backfill_job")
def goldsky_backfill_job():
goldsky_backfill_op()
@@ -1269,6 +1400,6 @@ def goldsky_clean_up_sensor(
return AssetFactoryResponse(
assets=[generated_asset],
sensors=[goldsky_clean_up_sensor],
jobs=[goldsky_clean_up_job, goldsky_backfill_job],
jobs=[goldsky_clean_up_job, goldsky_backfill_job, goldsky_files_stats_job, goldsky_load_schema_job],
checks=checks,
)
