In [0]:
import dlt
from pyspark.sql.functions import *
from datetime import date,timedelta,datetime
from pyspark.sql import SparkSession
import json

# Catalog and schema (used as "<catalog>.<schema>" in queries) that hold the
# ingestion control tables: ingestion_load_control_table and ingestion_entities.
audit_schema = "prod_hhobi.hhobi_audit"

In [0]:
# Column-level encryption helper (applies the main.tokenizer.Encrypt SQL function)
import base64
from pyspark.sql.types import *
from pyspark.sql.functions import expr

def column_native_encrypt(df_enc, Tokenizationdetails):
    """Encrypt the listed columns of ``df_enc`` with the ``main.tokenizer.Encrypt`` SQL function.

    Each listed column is replaced by its encrypted value. The encrypted column is
    re-added under the original name, so it ends up at the end of the schema.

    Args:
        df_enc: Spark DataFrame to transform.
        Tokenizationdetails: comma-separated column names, e.g. ``"ssn, email"``
            (surrounding whitespace is ignored). ``None``, an empty string or
            ``"NA"`` means "encrypt nothing".

    Returns:
        The DataFrame with every listed column that exists encrypted. Listed
        columns that are not present are reported and skipped. They no longer
        abort the loop and leave the remaining columns in plaintext.

    Errors raised by Spark while building the encryption expression (e.g. the
    tokenizer function is unavailable) propagate. A broken tokenizer therefore
    fails the load instead of silently passing plaintext through.
    """
    if Tokenizationdetails is None:
        return df_enc

    requested = [name.strip() for name in Tokenizationdetails.split(",")]
    requested = [name for name in requested if name]
    if not requested or requested == ["NA"]:
        return df_enc

    missing = []
    for column in requested:
        if column not in df_enc.columns:
            missing.append(column)
            continue
        # Backtick-quote so names with spaces/special characters parse in expr();
        # embedded backticks are escaped by doubling.
        quoted = "`" + column.replace("`", "``") + "`"
        encrypted_name = f"{column}_encrypted"
        df_enc = (
            df_enc.withColumn(encrypted_name, expr(f"main.tokenizer.Encrypt({quoted})"))
            .drop(column)
            .withColumnRenamed(encrypted_name, column)
        )

    if missing:
        print(f"column_native_encrypt: columns not found, nothing to encrypt for: {missing}")
    return df_enc



In [0]:
#VOLUME_TO_DLT
import os
import gnupg
import zipfile
import json
from shutil import copyfile
import time
# Helpers for staging volume files (GPG decrypt, copy, unzip) and registering the volume-to-DLT loader.



def setup_gpg(*, secret_key: str):
    """Create a python-gnupg handle and import ``secret_key`` into its keyring.

    Returns:
        ``(gpg, import_result)``: the handle and whatever ``import_keys`` returned.
    """
    handle = gnupg.GPG()
    return handle, handle.import_keys(secret_key)

def copy_files_target(source_dir: str, target_dir: str, pattern: str,DEC_DIR_FLAG :str,UNZIP_DIR_FLAG : str):
    """Copy the files in ``source_dir`` whose name contains ``pattern`` into ``target_dir``.

    Which files are copied depends on the staging flags:
      * DEC_DIR_FLAG == 'N' and UNZIP_DIR_FLAG == 'Y' -> ``.zip`` archives
        (they are unzipped by a later step);
      * any other combination -> ``.dat`` data files.

    Args:
        source_dir: directory listed with ``dbutils.fs.ls``.
        target_dir: destination directory; created if missing.
        pattern: substring a file name must contain to be copied.
        DEC_DIR_FLAG: 'Y' when the landed files are encrypted (the caller passes ``is_encrypted``).
        UNZIP_DIR_FLAG: 'Y' when the landed files are zipped (the caller passes ``is_zipped``).
    """
    os.makedirs(target_dir, exist_ok=True)

    # The former three branches differed only in this extension (two were identical).
    extension = '.zip' if (DEC_DIR_FLAG == 'N' and UNZIP_DIR_FLAG == 'Y') else '.dat'

    for file_info in dbutils.fs.ls(source_dir):
        if file_info.name.endswith(extension) and pattern in file_info.name:
            file_name = file_info.path.split('/')[-1]
            dbutils.fs.cp(file_info.path, f"{target_dir}/{file_name}")
        

def decrypt_files(*, gpg: "gnupg.GPG", passphrase: str,source_dir:str, dec_dir: str,pattern: str):
    """Decrypt every ``*.zip.gpg`` file in ``source_dir`` whose name contains ``pattern``.

    Each output is written to ``dec_dir`` under the input name minus the trailing
    ``.gpg`` (``x.zip.gpg`` -> ``x.zip``). Files are processed in sorted name order.

    Args:
        gpg: python-gnupg handle with the private key already imported (setup_gpg).
        passphrase: passphrase of that private key.
        source_dir: directory read with ``os.listdir``.
        dec_dir: output directory; created if missing.
        pattern: substring a file name must contain to be decrypted.

    Raises:
        RuntimeError: if GPG reports a file could not be decrypted. A bad key or
            passphrase therefore fails loudly instead of silently producing no output.
    """
    # NOTE(review): only ``.zip.gpg`` inputs are handled. With DEC_DIR_FLAG='Y' and
    # UNZIP_DIR_FLAG='N', the downstream copy step looks for ``.dat`` files, which this
    # function never produces. Confirm the expected naming of encrypted, unzipped feeds.
    os.makedirs(dec_dir, exist_ok=True)

    for file in sorted(os.listdir(source_dir)):
        if pattern not in file or not file.endswith('.zip.gpg'):
            continue
        encrypted_file = os.path.join(source_dir, file)
        decrypted_file = os.path.join(dec_dir, file[:-len('.gpg')])

        with open(encrypted_file, 'rb') as encrypted_f:
            status = gpg.decrypt_file(encrypted_f, passphrase=passphrase, always_trust=True, output=decrypted_file)
        if not status.ok:
            raise RuntimeError(f"GPG decryption failed for {encrypted_file}: {status.status}")
                

def unzip_files(unzip_dir,dec_dir_src, pattern):
    """Extract every ``.zip`` archive in ``dec_dir_src`` whose name contains ``pattern``.

    All matching archives are extracted into ``unzip_dir``, which is created when absent.
    """
    if not os.path.exists(unzip_dir):
        os.makedirs(unzip_dir)

    candidates = os.listdir(dec_dir_src)
    for archive_name in filter(lambda name: name.endswith('.zip') and pattern in name, candidates):
        with zipfile.ZipFile(os.path.join(dec_dir_src, archive_name), 'r') as archive:
            archive.extractall(unzip_dir)
    

def Volumes_to_DLT(target_table_name, source_path,dec_dir, unzip_dir, pattern, file_format, DEC_DIR_FLAG, UNZIP_DIR_FLAG,encrypted_col,quality = 'bronze', delimiter = "\u001c"):
    """Register a DLT table that stages files from a volume and streams them in with Auto Loader.

    Staging happens inside the decorated table function, i.e. whenever DLT invokes it:
      1. DEC_DIR_FLAG == 'Y': decrypt matching ``*.zip.gpg`` files from
         ``source_path`` into ``dec_dir``, using the GPG key and passphrase from the
         ``dataops``/``gpg`` secret. Otherwise copy matching files into ``dec_dir``.
      2. UNZIP_DIR_FLAG == 'Y': extract matching ``*.zip`` archives from
         ``dec_dir`` into ``unzip_dir``. Otherwise copy matching files into ``unzip_dir``.
      3. Stream ``unzip_dir`` with Auto Loader (``cloudFiles``) and encrypt the
         columns listed in ``encrypted_col``.

    Args:
        target_table_name: name of the DLT table to register.
        source_path: directory holding the landed (possibly encrypted/zipped) files.
        dec_dir: staging directory for decrypted (or copied) files.
        unzip_dir: staging directory Auto Loader reads from.
        pattern: substring a file name must contain to be staged.
        file_format: value for the Auto Loader ``cloudFiles.format`` option.
        DEC_DIR_FLAG: 'Y' if the landed files are GPG-encrypted.
        UNZIP_DIR_FLAG: 'Y' if the (decrypted) files are zip archives.
        encrypted_col: comma-separated columns to encrypt, or 'NA'.
        quality: unused; kept for call-site compatibility.
        delimiter: value for the reader's ``delimiter`` option. Defaults to
            U+001C, the value previously hard-coded here.
    """

    @dlt.table(
        name=target_table_name,
        comment=f"Load {target_table_name} from volumes"
    )
    def data_load():
        # NOTE(review): this changes the process-wide working directory. It is kept
        # in case a configured dec_dir/unzip_dir is relative to source_path.
        os.chdir(source_path)

        # Step 1: decrypt (or copy) into dec_dir.
        if DEC_DIR_FLAG == 'Y':
            # Parse the secret directly. Do not stash it in spark.conf, where the
            # private key and passphrase are readable by anyone who can inspect
            # the Spark configuration.
            gpg_secret = json.loads(dbutils.secrets.get(scope="dataops", key="gpg"))
            gpg, _ = setup_gpg(secret_key=gpg_secret['secret_key'])
            decrypt_files(gpg=gpg, passphrase=gpg_secret['passphrase'], source_dir=source_path, dec_dir=dec_dir, pattern=pattern)
        else:
            copy_files_target(source_dir=source_path, target_dir=dec_dir, pattern=pattern, DEC_DIR_FLAG=DEC_DIR_FLAG, UNZIP_DIR_FLAG=UNZIP_DIR_FLAG)

        # Step 2: unzip (or copy) into unzip_dir.
        if UNZIP_DIR_FLAG == 'Y':
            # NOTE(review): a fixed 20 s pause before reading dec_dir. Presumably this
            # lets freshly written files become visible on the volume; TODO confirm
            # or replace with an explicit readiness check.
            time.sleep(20)
            unzip_files(unzip_dir, dec_dir, pattern)
        else:
            copy_files_target(source_dir=dec_dir, target_dir=unzip_dir, pattern=pattern, DEC_DIR_FLAG=DEC_DIR_FLAG, UNZIP_DIR_FLAG=UNZIP_DIR_FLAG)

        # Step 3: stream the staged files and encrypt the sensitive columns.
        source_df = (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", file_format)
            .option("delimiter", delimiter)
            .load(unzip_dir)
        )
        return column_native_encrypt(source_df, encrypted_col)


In [0]:
def fetch_matching_rows_Load_Control(spark, param1=None, param2=None, param3=None):
    """Return the load-control rows matching the given identifiers.

    Each identifier that is not ``None`` adds an equality filter. Values are
    compared as strings, as in the original quoted-SQL version.

    Args:
        spark: active SparkSession.
        param1: ``dataGroupId`` to match, or ``None`` for any.
        param2: ``dataFlowGroup`` to match, or ``None`` for any.
        param3: ``dataFlowId`` to match, or ``None`` for any.

    Returns:
        A lazy DataFrame over ``<audit_schema>.ingestion_load_control_table``.
    """
    control_df = spark.table(f"{audit_schema}.ingestion_load_control_table")
    # Filter through the DataFrame API rather than string-built SQL, so a value
    # containing a quote cannot break (or inject into) the query.
    for column_name, value in (("dataGroupId", param1), ("dataFlowGroup", param2), ("dataFlowId", param3)):
        if value is not None:
            control_df = control_df.filter(control_df[column_name] == str(value))
    return control_df

def fetch_matching_rows_Entities_List(spark, PK_Column):
    """Return the ingestion_entities rows attached to one data flow.

    Args:
        spark: active SparkSession.
        PK_Column: ``dataFlowFK`` value to match. The driver cell builds it as
            "<dataGroupId>-<dataFlowGroup>-<dataFlowId>".

    Returns:
        A lazy DataFrame of the matching rows.
    """
    table_name = f"{audit_schema}.ingestion_entities"
    # Log the equivalent SQL for traceability in the pipeline output.
    print(f"SELECT * FROM {table_name} WHERE dataFlowFK ='{PK_Column}'")
    entities_df = spark.table(table_name)
    # Use the DataFrame API so a quote inside the key cannot break (or inject into) the query.
    return entities_df.filter(entities_df["dataFlowFK"] == str(PK_Column))

In [0]:
def Delta_to_bronze(target_Table, source_table_name, quality = 'bronze'):
    """Register a DLT table named ``target_Table`` that mirrors an existing table.

    The registered function performs a batch read (``spark.read.table``) of
    ``source_table_name``. ``quality`` is accepted for call-site compatibility but unused.
    Returns ``None``; registration is the only effect besides the progress print.
    """
    print('Delta_to_bronze')

    def process_delta_table():
        # Batch (non-streaming) read of the source table.
        return spark.read.table(f"{source_table_name}")

    # Apply the DLT decorator explicitly; the returned wrapper is not needed here.
    dlt.table(name=target_Table)(process_delta_table)

In [0]:
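# # Legacy Volume-to-DLT loader, superseded by the Volumes_to_DLT function defined earlier in this notebook (kept for reference only)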
# def Volumes_to_DLT(target_table_name, source_path, file_format):
    
#     @dlt.table(
#         name=target_table_name,
#         comment=f"Load {target_table_name} from volumes"
#     )
#     def data_load():
#         df = spark.readStream.format("cloudFiles") \
#             .option("cloudFiles.format", file_format) \
#             .option("delimiter", "\x1d") \
#             .load(source_path)
        
#         return df

#     return data_load

In [0]:
def fetch_bronze_data_flows():
    """Return the load-control rows of Bronze data flows whose status is 'Ready for DLT'."""
    control_table = f"{audit_schema}.ingestion_load_control_table"
    return spark.sql(
        f"select * from {control_table} where targetLayer = 'Bronze' and DataFlow_Status = 'Ready for DLT'"
    )

In [0]:
def _sql_string_literal(value):
    """Render ``value`` as a single-quoted Spark SQL string literal with backslash escaping."""
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def update_bronze_dataflow_status(dataFlowGroup, status):
    """Set ``DataFlow_status`` for the Bronze rows of one data-flow group.

    Both values are emitted as escaped string literals. Previously they were
    interpolated bare, so ``status="Completed"`` was parsed as a column reference.

    Args:
        dataFlowGroup: ``dataFlowGroup`` whose Bronze rows are updated.
        status: new ``DataFlow_status`` value, e.g. "Completed".
    """
    query = (
        f"update {audit_schema}.ingestion_load_control_table "
        f"set DataFlow_status = {_sql_string_literal(status)} "
        f"where dataFlowGroup = {_sql_string_literal(dataFlowGroup)} and targetLayer in ('Bronze')"
    )
    spark.sql(query)

In [0]:
spark = SparkSession.builder.appName("flow").getOrCreate()

# Example identifiers for restricting a run manually (see fetch_matching_rows_Load_Control):
# dataGroupId = 'A'
# dataFlowGroup = 'A1'
# dataFlowId = 'A102'
# TargetLayer = 'Bronze'

# Source formats this cell knows how to register; anything else is logged and skipped.
SUPPORTED_SOURCE_FORMATS = ('Oracle DB', 'Delta Table', 'Files')

# Register one DLT table per entity of every Bronze data flow marked 'Ready for DLT'.
data_flows = fetch_bronze_data_flows().collect()

for data_flow in data_flows:
    dataGroupId = data_flow['dataGroupId']
    dataFlowGroup = data_flow['dataFlowGroup']
    dataFlowId = data_flow['dataFlowId']
    sourceFormat = data_flow['sourceFormat']
    # Entities reference their flow through this composite key.
    pk_column_value = dataGroupId + "-" + dataFlowGroup + "-" + dataFlowId

    if sourceFormat not in SUPPORTED_SOURCE_FORMATS:
        print(f"Skipping data flow {pk_column_value}: unsupported sourceFormat {sourceFormat!r}")
        continue

    entities = fetch_matching_rows_Entities_List(spark, pk_column_value)

    if sourceFormat == 'Oracle DB':
        display(entities)
        for table_load in entities.collect():
            # NOTE(review): `delta_to_DLT` is not defined in this notebook (only
            # `Delta_to_bronze` is), so this raises NameError unless it is provided
            # elsewhere. Confirm the intended loader before enabling Oracle flows.
            delta_to_DLT(table_load['targetTable'], table_load['SourceTableName'], quality = 'bronze')

    elif sourceFormat == 'Delta Table':
        for table_load in entities.collect():
            Delta_to_bronze(table_load['targetTablename'], table_load['SourceTableName'], quality = 'bronze')

    else:  # 'Files'
        for table_load in entities.collect():
            volume = table_load['volume_details']
            Volumes_to_DLT(
                table_load['targetTablename'],
                volume['working_dir'],
                volume['dec_dir'],
                volume['unzip_dir'],
                volume['file_pattern'],
                volume['file_format'],
                volume['is_encrypted'],
                volume['is_zipped'],
                table_load['encrypted_col'],
                quality = 'bronze',
            )
    #update_bronze_dataflow_status(data_flow['dataFlowGroup'], "Completed")