In [0]:
%run "./Configuration"

In [0]:
# Define the metadata widget:
dbutils.widgets.text("entityName", "")
dbutils.widgets.text("entityColumns", "")
dbutils.widgets.text("sourceSystem", "")

# Save parameters to variables:
entityName =       dbutils.widgets.get('entityName')
sourceSystemName = dbutils.widgets.get('sourceSystem')
columns =          json.loads(dbutils.widgets.get('entityColumns'))

In [0]:
# Define query to extract entityNames from sourceSystem table:
query = f"""
  SELECT entityNames
  FROM   sourceSystem
  WHERE  sourceEntityName = '{sourceSystemName}'
  """

# Extract the array from entityNames DataFrame and convert to list:
entity_names = json.loads(([row.entityNames for row in query_db(query).collect()])[0])

In [0]:
# For each entity, create a dataFrame:
for entity in entity_names:
   # Read from BRONZE path in datalake:
   globals()[f"df_{entity}"] = spark.read.format('parquet').load(BRONZE + f'/TotesysDB/{entity}/{datetime.datetime.now().strftime("%Y/%m/%d")}')

In [0]:
# Iterate through entities to create temp views:
for entity in entity_names:
    # Create or replace tempView:
    globals()[f'df_{entity}'].createOrReplaceTempView(f"tv_{entity}")

In [0]:
if entityName == 'date':
    dbutils.notebook.exit('None')

In [0]:
if entityName == 'design':
    
    # Define Dynamic Query:
    query = f"""
    SELECT CAST(design_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      CAST(design_name AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(file_location AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(file_name AS {columns[3]['dataType']}) AS {columns[3]['columnName']}
    FROM tv_design
    """
    
    # Execute Query:
    df = spark.sql(query)

In [0]:
if entityName == 'transaction':
    query = f"""
    SELECT CAST(transaction_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      CAST(transaction_type AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(sales_order_id AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(purchase_order_id AS {columns[3]['dataType']}) AS {columns[3]['columnName']}
    FROM tv_transaction
    """

    # Execute Query:
    df = spark.sql(query)

In [0]:
if entityName == 'payment_type':
    query = f"""
    SELECT CAST(payment_type_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      CAST(payment_type_name AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    FROM tv_payment_type
    """
    
    # Execute Query:
    df = spark.sql(query)


In [0]:
if entityName == 'location':
    query = f"""
    SELECT CAST(address_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      CAST(address_line_1 AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(address_line_2 AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(district AS {columns[3]['dataType']}) AS {columns[3]['columnName']}
    ,      CAST(city AS {columns[4]['dataType']}) AS {columns[4]['columnName']}
    ,      CAST(postal_code AS {columns[5]['dataType']}) AS {columns[5]['columnName']}
    ,      CAST(country AS {columns[6]['dataType']}) AS {columns[6]['columnName']}
    ,      CAST(phone AS {columns[7]['dataType']}) AS {columns[7]['columnName']}
    FROM tv_location
    """

    # Execute Query:
    df = spark.sql(query)

In [0]:
if entityName == 'currency':
    query = f"""
    SELECT CAST(currency_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      {columns[1]['dataType']}(currency_code) AS {columns[1]['columnName']}
    ,      CASE 
                WHEN currency_code = 'USD' THEN 'United States Dollar'
                WHEN currency_code = 'EUR' THEN 'Euro'
                WHEN currency_code = 'JPY' THEN 'Japanese Yen'
                WHEN currency_code = 'GBP' THEN 'British Pound Sterling'
                WHEN currency_code = 'AUD' THEN 'Australian Dollar'
                WHEN currency_code = 'CAD' THEN 'Canadian Dollar'
                WHEN currency_code = 'CHF' THEN 'Swiss Franc'
                WHEN currency_code = 'CNY' THEN 'Chinese Yuan'
                WHEN currency_code = 'NZD' THEN 'New Zealand Dollar'
                ELSE 'Unknown Currency'
           END AS {columns[2]['columnName']}   
    FROM tv_currency
    """
    
    # Execute Query:
    df = spark.sql(query)

In [0]:
if entityName == 'counterparty':
    query = f"""
    SELECT CAST(cp.counterparty_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      CAST(cp.counterparty_legal_name AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(l.address_line_1 AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(l.address_line_2 AS {columns[3]['dataType']}) AS {columns[3]['columnName']}
    ,      CAST(l.district AS {columns[4]['dataType']}) AS {columns[4]['columnName']}
    ,      CAST(l.city AS {columns[5]['dataType']}) AS {columns[5]['columnName']}
    ,      CAST(l.postal_code AS {columns[6]['dataType']}) AS {columns[6]['columnName']}
    ,      CAST(l.country AS {columns[7]['dataType']}) AS {columns[7]['columnName']}
    ,      CAST(l.phone AS {columns[8]['dataType']}) AS {columns[8]['columnName']}
    FROM tv_counterparty cp
    JOIN tv_location l ON cp.legal_address_id = l.address_id
    """

    # Execute Query:
    df = spark.sql(query)

In [0]:
if entityName == 'department':
    query = f"""
    SELECT CAST(department_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      CAST(department_name AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(location AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(manager AS {columns[3]['dataType']}) AS {columns[3]['columnName']}
    ,      CAST(created_at AS {columns[4]['dataType']}) AS {columns[4]['columnName']}
    ,      CAST(last_updated AS {columns[5]['dataType']}) AS {columns[5]['columnName']}
    FROM tv_department
    """

    # Execute Query:
    df = spark.sql(query)

In [0]:
if entityName == 'staff':
    query = f"""
    SELECT CAST(s.staff_id AS {columns[0]['dataType']}) AS {columns[0]['columnName']}
    ,      CAST(s.first_name AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(s.last_name AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(d.department_name AS {columns[3]['dataType']}) AS {columns[3]['columnName']}
    ,      CAST(d.location AS {columns[4]['dataType']}) AS {columns[4]['columnName']}
    ,      CAST(s.email_address AS {columns[5]['dataType']}) AS {columns[5]['columnName']}
    FROM tv_staff s
    JOIN tv_department d ON s.department_id = d.department_id
    """

    # Execute Query:
    df = spark.sql(query)


In [0]:
if entityName == 'payment':
    query = f"""
    SELECT
          ROW_NUMBER() OVER (ORDER BY {columns[1]['columnName']}) as {columns[0]['columnName']}
        , CAST(p.payment_id AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
        , CAST(p.created_at AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
        , CAST(date_format(p.created_at, 'HH:mm:ss') AS STRING) AS {columns[3]['columnName']}
        , CAST(p.last_updated AS {columns[4]['dataType']}) AS {columns[4]['columnName']}
        , CAST(date_format(p.last_updated, 'HH:mm:ss') AS STRING) AS {columns[5]['columnName']}
        , CAST(t.transaction_id AS {columns[6]['dataType']}) AS {columns[6]['columnName']}
        , CAST(cp.counterparty_id AS {columns[7]['dataType']}) AS {columns[7]['columnName']}
        , CAST(p.payment_amount AS DECIMAL(10,2)) AS {columns[8]['columnName']}
        , CAST(c.currency_id AS {columns[9]['dataType']}) AS {columns[9]['columnName']}
        , CAST(pt.payment_type_id AS {columns[10]['dataType']}) AS {columns[10]['columnName']}
        , CAST(p.paid AS {columns[11]['dataType']}) AS {columns[11]['columnName']}
      --  , CAST(p.payment_date AS {columns[12]['dataType']}) AS {columns[12]['columnName']}
    FROM tv_payment p
    JOIN tv_counterparty cp ON p.counterparty_id = cp.counterparty_id
    JOIN tv_currency c ON p.currency_id = c.currency_id
    JOIN tv_payment_type pt ON p.payment_type_id = pt.payment_type_id
    JOIN tv_transaction t ON p.transaction_id = t.transaction_id
    """
 
    # Execute Query:
    df = spark.sql(query)

In [0]:
if entityName == 'purchase_order':
    query = f"""
    SELECT
           ROW_NUMBER() OVER (ORDER BY po.{columns[1]['columnName']}) as {columns[0]['columnName']}
    ,      CAST(po.purchase_order_id AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(po.created_at AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(date_format(po.created_at, 'HH:mm:ss') AS STRING) AS {columns[3]['columnName']}  
    ,      CAST(po.last_updated AS {columns[4]['dataType']}) AS {columns[4]['columnName']}
    ,      CAST(date_format(po.last_updated, 'HH:mm:ss') AS STRING) AS {columns[5]['columnName']}
    ,      CAST(s.staff_id AS {columns[6]['dataType']}) AS {columns[6]['columnName']}
    ,      CAST(cp.counterparty_id AS {columns[7]['dataType']}) AS {columns[7]['columnName']}
    ,      CAST(po.item_code AS {columns[8]['dataType']}) AS {columns[8]['columnName']}
    ,      CAST(po.item_quantity AS {columns[9]['dataType']}) AS {columns[9]['columnName']}
    ,      CAST(po.item_unit_price AS DECIMAL(10,2)) AS {columns[10]['columnName']}
    ,      CAST(c.currency_id AS {columns[11]['dataType']}) AS {columns[11]['columnName']}
    ,      CAST(po.agreed_delivery_date AS {columns[12]['dataType']}) AS {columns[12]['columnName']}
    ,      CAST(po.agreed_payment_date AS {columns[13]['dataType']}) AS {columns[13]['columnName']}
    ,      CAST(l.address_id AS {columns[14]['dataType']}) AS {columns[14]['columnName']}

    FROM tv_purchase_order po
       JOIN tv_staff        s      ON po.staff_id = s.staff_id 
       JOIN tv_counterparty cp     ON po.counterparty_id = cp.counterparty_id
       JOIN tv_currency     c      ON po.currency_id = c.currency_id
       JOIN tv_location     l      ON po.agreed_delivery_location_id = l.address_id
   
    """

    # Execute Query:
    df = spark.sql(query)



In [0]:
if entityName == 'sales_order':
    query = f"""
    SELECT 
           ROW_NUMBER() OVER (ORDER BY so.{columns[1]['columnName']}) as {columns[0]['columnName']}
    ,      CAST(so.sales_order_id AS {columns[1]['dataType']}) AS {columns[1]['columnName']}
    ,      CAST(so.created_at AS {columns[2]['dataType']}) AS {columns[2]['columnName']}
    ,      CAST(date_format(so.created_at, 'HH:mm:ss') AS STRING) AS {columns[3]['columnName']}
    ,      CAST(so.last_updated AS {columns[4]['dataType']}) AS {columns[4]['columnName']}
    ,      CAST(date_format(so.last_updated, 'HH:mm:ss') AS STRING) AS {columns[5]['columnName']}
    ,      CAST(s.staff_id AS {columns[6]['dataType']}) AS {columns[6]['columnName']}
    ,      CAST(cp.counterparty_id AS {columns[7]['dataType']}) AS {columns[7]['columnName']}
    ,      CAST(so.units_sold AS {columns[8]['dataType']}) AS {columns[8]['columnName']}
    ,      CAST(so.unit_price AS {columns[9]['dataType']}) AS {columns[9]['columnName']}
    ,      CAST(c.currency_id AS {columns[10]['dataType']}) AS {columns[10]['columnName']}
    ,      CAST(d.design_id AS {columns[11]['dataType']}) AS {columns[11]['columnName']}
    ,      CAST(so.agreed_payment_date AS {columns[12]['dataType']}) AS {columns[12]['columnName']}
    ,      CAST(so.agreed_delivery_date AS {columns[13]['dataType']}) AS {columns[13]['columnName']}
    ,      CAST(l.address_id AS {columns[14]['dataType']}) AS {columns[14]['columnName']}
    FROM tv_sales_order so
    JOIN tv_counterparty cp ON so.counterparty_id = cp.counterparty_id
    JOIN tv_staff s ON so.staff_id = s.staff_id
    JOIN tv_currency c ON so.currency_id = c.currency_id
    JOIN tv_design d ON so.design_id = d.design_id
    JOIN tv_location l on so.agreed_delivery_location_id = l.address_id
    """
    
    # Execute Query:
    df = spark.sql(query)

In [0]:
# Check data exists:
if df.count() > 0:
    folder_path = datetime.datetime.now().strftime("%Y/%m/%d")

    # Set location for delta table:
    location = SILVER + f"/{sourceSystemName}/{entityName}/{folder_path}/"
    
    # Write to delta table:
    df.write.mode('overwrite').format('delta').option('header', 'true').save(location)

    # Output location:
    dbutils.notebook.exit(location)
else:
    dbutils.notebook.exit(-1)
