In [0]:
import posixpath
import json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType
import uuid
from datetime import datetime

In [0]:
# Notebook parameters: the target table and a comma-separated list of run dates.
dbutils.widgets.text("table_name", "")
dbutils.widgets.text("run_date_values", "")

table_name = dbutils.widgets.get("table_name").strip()

# Strip whitespace around each entry so an input such as
# "2024-01-01, 2024-01-02" does not produce ' 2024-01-02', which would later be
# used verbatim in the landing path and in run_date comparisons.
# Empty entries are intentionally kept: an empty parameter still yields [''],
# which fails in the read step rather than silently doing nothing.
run_date_values = [
    value.strip()
    for value in dbutils.widgets.get("run_date_values").split(",")
]

In [0]:
# --- Identity of this pipeline run -------------------------------------------
pipeline_name = 'bronze_ingestion'
pipeline_run_id = str(uuid.uuid4())

# Landing location for the requested table
base_path = f'/Volumes/otc/volumn/landingfiles/{table_name}/'

# Partition directory names ('run_date=<value>'), one per requested date
run_date = [f'run_date={d}' for d in run_date_values]

# Exactly one requested date -> 'single'; anything else -> 'multiple'
if len(run_date) == 1:
    run_mode = 'single'
else:
    run_mode = 'multiple'

# Naive UTC timestamp marking the start of the run
start_ts = datetime.utcnow()
run_status = 'failed'


# Human-readable summary of the run parameters; printed here and echoed back in
# every dbutils.notebook.exit payload further down.
ingestion_params = f"""
{pipeline_name},
{pipeline_run_id},
{table_name},
{run_date_values},
{run_mode},
{base_path}
"""

print(ingestion_params)

In [0]:
def get_ddl_cols(schema):
    """Split a DDL schema string into ``[column_name, column_type]`` pairs.

    Columns are separated on commas that sit outside any ``(...)`` or ``<...>``
    group, so types such as ``DECIMAL(10,2)``, ``MAP<STRING,INT>`` or
    ``STRUCT<a: INT, b: STRING>`` stay intact. Each column is then split on its
    first run of whitespace into name and type, which tolerates both
    ``"id INT,name STRING"`` and ``"id INT, name STRING"``.

    Args:
        schema: DDL string of the form ``"<name> <type>[,<name> <type>...]"``.

    Returns:
        A list of two-element lists ``[name, type]`` in declaration order.

    Raises:
        ValueError: if a column has no type, or no columns are found at all.
    """
    # Pass 1: cut the string on top-level commas only.
    fields = []
    current = []
    depth = 0
    for ch in schema:
        if ch in '(<':
            depth += 1
        elif ch in ')>':
            depth -= 1

        if ch == ',' and depth == 0:
            fields.append(''.join(current))
            current = []
        else:
            current.append(ch)
    fields.append(''.join(current))

    # Pass 2: separate each field into name and type.
    ddl_cols = []
    for field in fields:
        field = field.strip()
        if not field:
            # Tolerate a stray/trailing comma.
            continue
        parts = field.split(None, 1)
        if len(parts) != 2:
            raise ValueError(f"Cannot parse column definition '{field}' in schema DDL")
        ddl_cols.append([parts[0], parts[1].strip()])

    if not ddl_cols:
        raise ValueError('Schema DDL does not define any columns')

    return ddl_cols


# Get landing files as dataframe
def read_landing_files(PATH, read_options_json, schema):
    """Read landing files from ``PATH`` and project them onto the registry schema.

    Args:
        PATH: file or directory path handed to the Spark reader.
        read_options_json: dict of reader options, passed as ``.options(**...)``.
        schema: DDL string describing the expected columns (see ``get_ddl_cols``).

    Returns:
        A DataFrame containing exactly the DDL columns, each cast to its declared
        type, plus a ``source_file`` column populated from ``input_file_name()``.
    """
    # Get name and type separately
    ddl_cols = get_ddl_cols(schema)

    # The file format is decided from the notebook-level table_name (not a
    # parameter of this function): these two sources are JSON, the rest CSV.
    file_format = 'json' if table_name in ('src_payments', 'src_shipments') else 'csv'

    return_df = spark.read.format(file_format)\
        .options(**read_options_json)\
        .load( PATH )

    # Enforce schema: keep only the declared columns, cast to the declared types.
    # Any column not listed in the DDL is dropped here.
    return_df = return_df.select(
        *[ F.col(col_name).cast(col_type).alias(col_name) for col_name, col_type in ddl_cols ]
    )

    return_df = return_df.withColumn('source_file', F.input_file_name())

    return return_df


try:
    # Read source_schema_registry for table
    source_schema_registry = spark.table('otc_config.config.source_schema_registry')\
        .filter( F.col('table_name')==table_name )

    # Fetch both settings with a single Spark action so they are guaranteed to
    # come from the same registry row.
    registry_rows = source_schema_registry\
        .select('read_options_json', 'schema_ddl')\
        .limit(1)\
        .collect()

    if not registry_rows:
        # Surfaces in the exit remark instead of "list index out of range".
        raise ValueError(f"No source_schema_registry entry found for table '{table_name}'")

    # Get read_options_json (stored as a JSON string, used as reader options)
    read_options_json = json.loads(registry_rows[0][0])

    # Get table_schema
    table_schema = registry_rows[0][1]

except Exception as e:
    # NOTE(review): the exit value is a dict here, as in the other cells; confirm
    # how the caller expects this payload to be serialized.
    dbutils.notebook.exit({
        'notebook_run_status': 'failed',
        'ingestion_params': ingestion_params,
        'remark': f'Reading config failed: {e}'
    })

In [0]:
def write_in_bronze(write_df, table_name, run_date):
    """Replace one run_date slice of ``otc.bronze.<table_name>`` with ``write_df``.

    Existing rows for ``run_date`` are deleted and ``write_df`` is appended, so
    re-running the same date does not duplicate data.

    NOTE: the delete and the append are two separate operations. If the append
    fails after the delete succeeded, the rows for ``run_date`` are gone until
    the date is re-run.

    Args:
        write_df: DataFrame to append to the bronze table.
        table_name: bronze table name (without catalog/schema).
        run_date: run date whose existing rows are removed before the append.

    Returns:
        ``{'status': 'success', 'error': ''}`` on success, otherwise
        ``{'status': 'failed', 'error': <message>}``. ``error`` is always a
        string. Exceptions are not propagated to the caller.
    """
    try:
        # Delete the partition
        delete_query = f" DELETE FROM otc.bronze.{table_name} WHERE run_date='{run_date}' "
        spark.sql(delete_query)

        # Write to bronze
        write_df.write.mode('append').saveAsTable(f'otc.bronze.{table_name}')

        return {
            'status': 'success',
            'error': ''
        }

    except Exception as e:
        print('Failed due to ',e)
        return {
            'status': 'failed',
            # Return the message text rather than the exception object so the
            # result has the same shape (str) on both paths.
            'error': str(e)
        }


In [0]:
try:
    # Recover the bare date values from the 'run_date=<value>' partition names.
    run_date_values = [partition.split("=")[-1] for partition in run_date]

    if run_mode=='single':
        # One date: read just that partition directory. The run_date column is
        # added explicitly from the requested value.
        FULL_PATH = posixpath.join( base_path, run_date[0] )
        src_bronze = read_landing_files(FULL_PATH, read_options_json, table_schema)

        run_date_column = F.lit( run_date_values[0] ).cast('date')
        src_bronze = src_bronze.withColumn('run_date', run_date_column)

    elif run_mode=='multiple':
        # Several dates: read the whole table folder and keep the requested
        # dates. This relies on a run_date column being present in the frame
        # returned by read_landing_files.
        FULL_PATH = base_path
        src_bronze = read_landing_files(FULL_PATH, read_options_json, table_schema)

        requested_dates = F.col('run_date').isin( run_date_values )
        src_bronze = src_bronze.filter( requested_dates )


except Exception as e:
    # Report the read failure to the caller and stop the notebook.
    dbutils.notebook.exit({
        'notebook_run_status': 'failed',
        'ingestion_params': ingestion_params,
        'remark': f'Reading landing files failed: {e}'
    })

In [0]:
# Run dates whose bronze write did not succeed; drives the final exit status.
failed_run_dates = []

for rn in run_date_values:
    wr_src_bronze = src_bronze.filter( F.col('run_date')==rn )

    rows_read = wr_src_bronze.count()

    ingestion_status = write_in_bronze(wr_src_bronze, table_name, rn)

    # Stamp the end time after the write so the audited interval covers it.
    end_ts = datetime.utcnow()

    if ingestion_status['status'] != 'success':
        failed_run_dates.append(rn)

    # Exception text routinely contains quotes (e.g. quoted table names).
    # Escape backslashes and single quotes so the message cannot terminate the
    # SQL string literal and break the audit insert for a failed run.
    error_message = str(ingestion_status['error']).replace('\\', '\\\\').replace("'", "\\'")

    insert_into_pipeline_run = f"""
        INSERT INTO otc.audit.pipeline_run (pipeline_name, pipeline_run_id, table_name, run_date, run_mode, start_ts, end_ts, status, rows_read , error_message) 
        VALUES (
            '{pipeline_name}',
            '{pipeline_run_id}',
            '{table_name}',
            '{rn}',
            '{run_mode}',
            '{start_ts}',
            '{end_ts}',
            '{ingestion_status['status']}',
            '{rows_read}',
            '{error_message}'
            )
    """

    spark.sql(insert_into_pipeline_run)


# Return the overall outcome to the ADF pipeline. The run is only reported as
# successful when every requested run_date was written.
if failed_run_dates:
    failed_dates_text = ', '.join(failed_run_dates)
    dbutils.notebook.exit({
        'notebook_run_status': 'failed',
        'ingestion_params': ingestion_params,
        'remark': f'Ingestion failed for run_date(s): {failed_dates_text}'
        })
else:
    dbutils.notebook.exit({
        'notebook_run_status': 'success',
        'ingestion_params': ingestion_params,
        'remark': 'Ingestion Completed'
        })