In [0]:
%pip install pyyaml

In [0]:
# Declare the job parameters as text widgets. Each widget's label is its own name;
# values are read back (and typed) in the next cell.
dbutils.widgets.text(name="process_id", defaultValue="-1", label="process_id")
dbutils.widgets.text(
  name="hold_file_if_schema_failed",
  defaultValue="true",
  label="hold_file_if_schema_failed",
)

In [0]:
# Parse the widget values into typed job parameters.
# int() tolerates surrounding whitespace; a non-numeric value fails fast with ValueError.
process_id = int(dbutils.widgets.get("process_id"))
# Compare case-insensitively after trimming so a job parameter supplied as "True" or
# " true " still enables holding; only the exact string "true" matched before, and any
# other spelling silently disabled the schema-failure hold.
hold_file_if_schema_failed: bool = dbutils.widgets.get("hold_file_if_schema_failed").strip().lower() == "true"

In [0]:
from etl import get_environment, PROJECT
import logging

In [0]:
# Per-run logger, named after the project and the process id being finalised.
logger = logging.getLogger(f"finalise {PROJECT} process_id={process_id}")

# Resolve the deployment environment and make its "<env>_hub" catalog the session default,
# so later unqualified names resolve there.
env = get_environment(spark=spark).name
catalog = "{}_hub".format(env)
spark.sql("USE CATALOG " + catalog)
logger.info("default catalog set to %s", catalog)

In [0]:

# Load every audit row recorded for this process run from {catalog}.{PROJECT}._audit,
# ordered by snapshot_date then table. The cells below treat each row as one file
# (schema check result plus total/valid/invalid row counts for that file).
df = spark.sql(f"""
select
  snapshot_date,
  `table`,
  file_name,
  file_size,
  schema_valid,
  total_count,
  valid_count,
  invalid_count,
  invalid_ratio,     
  process_id,
  load_date
from {catalog}.{PROJECT}._audit
where process_id = {process_id}
order by snapshot_date, `table`
""")

# Render the run's audit rows in the notebook / job output for inspection.
display(df)

In [0]:
# Audit rows whose schema check failed: the Column-API form of the SQL predicate "!schema_valid".
# NOTE(review): NOT NULL evaluates to NULL, so rows with schema_valid NULL are excluded here
# and never trip the schema-failure gate — confirm the audit writer always sets this column.
df_failed = df.filter(~df["schema_valid"])
display(df_failed)

In [0]:
# Gate the run on schema validation, then stamp the audit rows and end the notebook.
failed_count = df_failed.count()

if failed_count > 0 and hold_file_if_schema_failed:
  # Fail the job (raising ends the run) so the schema-failed files are held back.
  msg = f"ingest_{PROJECT} job to load {catalog}.{PROJECT} has {failed_count} file(s) with schema failures. See the finalisation step for details."
  logger.error(msg)
  raise Exception(msg)

if failed_count > 0:
  # Holding is disabled: carry on, but leave a trace that schema failures were tolerated.
  logger.warning(
    f"{failed_count} file(s) with schema failures were not held because "
    f"hold_file_if_schema_failed is false"
  )

# Stamp hub_load_date on this run's audit rows. The statement must be passed to spark.sql;
# a bare f-string expression here would build the text and silently discard it.
spark.sql(f"""
  UPDATE {catalog}.{PROJECT}._audit
  SET hub_load_date = now()
  WHERE process_id = {process_id}
  """)

# Counts every audit row for this process, including any schema failures tolerated above.
succeeded_count = df.count()
msg = f"ingest_{PROJECT} job to load {catalog}.{PROJECT} has successfully loaded {succeeded_count} files."
dbutils.notebook.exit(msg)