# Main Entry Point for a Table Load Task

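This notebook is executed by the Databricks Workflows job defined in resources/fnz_pb_job.yml. It runs the stage, audit, extract, transform and load steps for the single table selected by the `table` and `load_type` widgets.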

In [None]:
# Notebook parameters. Each call is dbutils.widgets.text(name, default, label); every value
# arrives as a string and is parsed in the next cell.
dbutils.widgets.text("process_id", "-1", "process_id")
dbutils.widgets.text("table", "default", "table")
dbutils.widgets.text("load_type", "default", "load_type")

# stage load parameters
dbutils.widgets.text("force", "false", "force")
dbutils.widgets.text("stage_merge_schema", "true", "stage_merge_schema")
# optional bounds; leave empty to disable, otherwise use the format "YYYY-MM-DD HH:MM:SS"
dbutils.widgets.text("modified_after", "", "modified_after")
dbutils.widgets.text("modified_before", "", "modified_before")

# base load parameters
dbutils.widgets.text("hold_file_if_schema_failed", "true", "hold_file_if_schema_failed")
# label matches the widget name (it was previously shown as "stage_merge_schema")
dbutils.widgets.text("base_merge_schema", "true", "base_merge_schema")

In [None]:
from etl import get_table, Table
import logging
from datetime import datetime

def _widget_flag(name: str) -> bool:
    """Return True when widget `name` holds "true", ignoring case and surrounding whitespace."""
    return dbutils.widgets.get(name).strip().lower() == "true"


def _widget_timestamp(name: str, fmt: str = "%Y-%m-%d %H:%M:%S"):
    """Parse widget `name` with `fmt` into a datetime; an empty or blank widget yields None.

    Raises ValueError (from datetime.strptime) when a non-blank value does not match `fmt`.
    """
    raw = dbutils.widgets.get(name).strip()
    return datetime.strptime(raw, fmt) if raw else None


process_id: int = int(dbutils.widgets.get("process_id"))
# keep the widget string separate from the Table object bound to `table` below
table_name: str = dbutils.widgets.get("table")
load_type: str = dbutils.widgets.get("load_type")

# stage load parameters
force: bool = _widget_flag("force")
stage_merge_schema: bool = _widget_flag("stage_merge_schema")
modified_after = _widget_timestamp("modified_after")
modified_before = _widget_timestamp("modified_before")

# base load parameters
# if true the entire file will be held at stage if it has schema errors.
hold_file_if_schema_failed: bool = _widget_flag("hold_file_if_schema_failed")
base_merge_schema: bool = _widget_flag("base_merge_schema")

# resolve the Table object for the requested table / load type; later cells call its steps
table: Table = get_table(spark=spark, table=table_name, load_type=load_type)
logger = logging.getLogger(f"load_table(table = {table.name}, load_type = {load_type})")

DEBUG : 2023-12-05 18:30:41,666 : fnz_pb.utils : utils.py.get_environment: line(26) : Detected the workspace adb-1593923126743168 for environment configuration
INFO : 2023-12-05 18:30:41,668 : fnz_pb.utils : utils.py.get_environment: line(39) : Detected the dev environment
INFO : 2023-12-05 18:30:41,668 : fnz_pb.utils : utils.py.load_sql: line(9) : loading sql /Workspace/Repos/pau_ryasha@quilter.com/QDP-FNZ-PB/fnz_pb/src/./../sql/fnz_pb.balance.sql


In [None]:

# Make "<environment>_hub" the session's current catalog for the rest of the notebook.
# `catalog` is also read by the final exit message.
catalog = f"{table.environment}_hub"
use_catalog_stmt = f"USE CATALOG {catalog}"
spark.sql(use_catalog_stmt)
logger.info(f"default catalog set to {catalog}")

INFO : 2023-12-05 18:30:41,991 : load_table(table = balance, load_type = batch) : command-1412197260571917-2469796052.<module>: line(3) : default catalog set to dev_hub


In [None]:
# Run the table's stage step with the stage-load widget parameters.
# modified_after / modified_before are None when their widgets were left empty.
stage_options = {
    "process_id": process_id,
    "merge_schema": stage_merge_schema,
    "force": force,
    "modified_after": modified_after,
    "modified_before": modified_before,
}
table.stage_into(**stage_options)

INFO : 2023-12-05 18:30:42,079 : TableBalance : _table_balance.py.stage_into: line(28) : not applicable, skipping


In [None]:
# Run the audit load for this process and show its result when there is one.
# NOTE(review): load_audit appears to return None when the step is not applicable
# (the run log shows "not applicable, skipping") - confirm against the Table implementation.
audit_df = table.load_audit(process_id=process_id)
# Explicit None check: DataFrame truthiness is ambiguous (pandas-style frames raise on bool()).
if audit_df is not None:
    display(audit_df)

INFO : 2023-12-05 18:30:42,179 : TableBalance : _table_balance.py.load_header_footer: line(34) : not applicable, skipping


In [None]:
# Run the table's extract step for this process; the hold-file flag is passed through
# unchanged from its widget. The result is handed to the transform cell as `df`.
df = table.extract(process_id=process_id, hold_file_if_schema_failed=hold_file_if_schema_failed)

INFO : 2023-12-05 18:30:42,279 : TableBalance : _table_balance.py.extract: line(43) : extracting balance


In [None]:
# Run the table's transform step on the result of the extract cell.
# NOTE(review): this rebinds `df` to its own transformed value, so re-running this cell on
# its own would presumably transform already-transformed data - re-run from the extract cell.
df = table.transform(df=df)

INFO : 2023-12-05 18:30:42,583 : TableBalance : _table_balance.py.transform: line(91) : transforming balance


In [None]:
# Run the table's load step on the transformed data and show what it returns.
# The result gets its own name so `df` still holds the load input; re-running this cell
# therefore loads the same data again instead of feeding the load result back in.
load_result = table.load(df=df)
display(load_result)

INFO : 2023-12-05 18:30:42,681 : TableBalance : _table_balance.py.load: line(99) : loading balance
DEBUG : 2023-12-05 18:30:42,682 : TableBalance : _table_balance.py.load: line(101) : create table if not exists `fnz_pb`.`balance`
(
  fnz_hdr_extract_date TIMESTAMP,
  load_flag STRING,
  effective_date INT,
  sub_account_id STRING,
  instrument_code STRING,
  location STRING,
  custodian_account_id STRING,
  quantity DECIMAL(19,8),
  system_currency_value DECIMAL(19,8),
  head_account_currency_value DECIMAL(19,8),
  price_currency_value DECIMAL(19,8),
  sub_account_currency_value DECIMAL(19,8),
  valuation_book_cost DECIMAL(19,8),
  price_currency_clean_value DECIMAL(19,8),
  price_currency_accrued_interest DECIMAL(19,8),
  system_currency_iso_code STRING,
  head_account_currency_iso_code STRING,
  price_currency_iso_code STRING,
  sub_account_currency_iso_code STRING,
  source_system_id INT,
  _file_name STRING,
  _snapshot_date TIMESTAMP,
  _load_date TIMESTAMP,
  _process_id BIGINT,


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
22108800,0,0,22108800


In [None]:
# End the notebook run and return a status message to the caller.
# The schema in the message is `fnz_pb`, the schema the load step creates the table in.
dbutils.notebook.exit(f"load_table {catalog}.fnz_pb.{table.name} succeeded with process_id = {process_id}")