In [None]:
# Receive parameters
# Empty strings are placeholders for interactive runs; the calling pipeline is
# expected to supply the real values (presumably through the notebook activity's
# parameter injection — confirm in the pipeline definition). Keep one plain
# `name = ""` assignment per line so the parameter cell stays easy to parse.
#   pipelineruntime     -> first argument of PipelineLogger.log_data for each log row
#   executionYear/Month/Day -> path segments of the Bronze parquet file and the
#                          "{Year}-{Month}-{Day}" folder name of the log output
#   workspaceid         -> container (workspace) part of the OneLake abfss URL
#   lakehouseid         -> Lakehouse segment of the OneLake abfss URL
#   files_action_silver -> JSON array whose elements are themselves JSON-encoded
#                          task objects (task_id, task_name, source_folder, target_table, ...)
pipelineruntime = ""
executionYear = ""
executionMonth = ""
executionDay = ""
workspaceid = ""
lakehouseid = ""
files_action_silver = ""

In [None]:
!pip install great_expectations

import json
import great_expectations as gx

import pyspark.sql.types as types
from pyspark.sql.functions import *

import traceback

In [None]:
# Root of this Lakehouse in OneLake. Deliberately no trailing slash: every path
# in this notebook is built as f"{base_lakehouse_path}/...", so a trailing slash
# here would put a doubled "//" into all of them.
base_lakehouse_path = f"abfss://{workspaceid}@onelake.dfs.fabric.microsoft.com/{lakehouseid}"

# Shared helper module (PipelineLogger, ValidateData) lives in a separate Lakehouse.
# addPyFile registers it with the Spark context so it can be imported below.
UDF_MODULE_PATH = "abfss://Global_Electronics_Retailer_Meta@onelake.dfs.fabric.microsoft.com/LH_Python_Files.Lakehouse/Files/FabricUDFManagement.py"
sc.addPyFile(UDF_MODULE_PATH)
# Explicit import: only these two classes are used by this notebook.
from FabricUDFManagement import PipelineLogger, ValidateData
pipelineLogger = PipelineLogger()
validateData = ValidateData()

# Great Expectations data context; the main loop reads each Bronze parquet file through it.
context = gx.get_context()

# Folder name of this run's log output: "{executionYear}-{executionMonth}-{executionDay}".
executionDate = f"{executionYear}-{executionMonth}-{executionDay}"

# files_action_silver is a JSON array; each element is itself a JSON string that
# the main loop decodes into one task description.
bronze_to_silver_json_array = json.loads(files_action_silver)

In [None]:
# Expected Bronze columns per task, keyed by task_id.
#   "columns": passed to ValidateData.check_schema; a non-empty result fails the task.
#   "key":     column intended for ValidateData.check_null (that check is currently disabled).
# Tasks not listed here skip validation and are loaded as-is.
EXPECTED_SCHEMAS = {
    # Customer
    6: {"columns": ["CustomerKey", "Gender", "Name", "City", "State_Code", "State", "Zip_Code",
                    "Country", "Continent", "Birthday"],
        "key": "CustomerKey"},
    # Stores
    7: {"columns": ["StoreKey", "Country", "State", "Square_Meters", "Open_Date"],
        "key": "StoreKey"},
    # Products
    8: {"columns": ["ProductKey", "Product_Name", "Brand", "Color", "Unit_Cost_USD", "Unit_Price_USD",
                    "SubcategoryKey", "Subcategory", "CategoryKey", "Category"],
        "key": "ProductKey"},
    # Sales
    9: {"columns": ["Order_Number", "Line_Item", "Order_Date", "Delivery_Date", "CustomerKey", "StoreKey",
                    "ProductKey", "Quantity", "Currency_Code"],
        "key": "Order_Number"},
    # Exchange_Rates
    10: {"columns": ["Date", "Currency", "Exchange"],
         "key": "Currency"},
}

_NOW_HCM_SQL = ''' SELECT from_utc_timestamp(CURRENT_TIMESTAMP(), 'Asia/Ho_Chi_Minh') as current_time '''


def _now_hcm() -> str:
    """Return the current Asia/Ho_Chi_Minh time as 'YYYY-MM-DD HH:MM:SS', computed by Spark SQL.

    NOTE(review): from_utc_timestamp treats CURRENT_TIMESTAMP() as UTC, so this
    assumes the Spark session time zone is UTC — confirm.
    """
    row = spark.sql(_NOW_HCM_SQL).collect()[0]
    return row["current_time"].strftime('%Y-%m-%d %H:%M:%S')


def _bronze_path(source_folder: str) -> str:
    """Return the path of this run's Bronze parquet file for `source_folder`."""
    return (f"{base_lakehouse_path}/Files/Bronze/{source_folder}/{executionYear}/{executionMonth}/{executionDay}/"
            f"{source_folder}_{executionYear}_{executionMonth}_{executionDay}.parquet")


# Per-task state, kept module-level so it can be inspected after the cell runs.
start_time = ""
end_time = ""
error = ""
status = ""
data_validate_path = ""
validator = None

for bronze_to_silver_json in bronze_to_silver_json_array:
    # Each array element is itself a JSON-encoded task description.
    bronze_to_silver = json.loads(bronze_to_silver_json)
    task_id = bronze_to_silver["task_id"]
    task_name = bronze_to_silver["task_name"]
    source_folder = bronze_to_silver["source_folder"]
    target_table = bronze_to_silver["target_table"]

    # Validation results reported in the log row ("" means no problem found).
    columnMissing = ""
    columnNull = ""
    # Rows written to the Silver table; stays 0 unless the write succeeds.
    numInserted = 0

    start_time = _now_hcm()

    # Open the Bronze file through Great Expectations. A failed first read is
    # retried once (original note: "Validate authentication (Fake account)");
    # presumably the first OneLake access can fail transiently — TODO confirm.
    # A second failure is intentionally not caught and stops the notebook.
    data_validate_path = _bronze_path(source_folder)
    try:
        validator = context.sources.pandas_default.read_parquet(data_validate_path)
    except Exception:
        validator = context.sources.pandas_default.read_parquet(data_validate_path)

    try:
        schema = EXPECTED_SCHEMAS.get(task_id)
        if schema is not None:
            columnMissing = validateData.check_schema(validator, schema["columns"])
            # TODO: null check is disabled; re-enable with
            # columnNull = validateData.check_null(validator, schema["key"])

        if (columnMissing != "") or (columnNull != ""):
            error = "Missing column or Have null value in data"
            status = "Failed"
        else:
            error = ""
            status = "Success"

            # Load the validated Bronze file with Spark and overwrite the Silver Delta table.
            df_save_delta = spark.read.format("parquet").load(data_validate_path)
            row_count = df_save_delta.count()
            (df_save_delta.write
                .mode("overwrite")
                .option("parquet.vorder.enabled", "true")
                .format("delta")
                .save(f"{base_lakehouse_path}/Tables/{target_table}"))
            # Only report rows as inserted once the write has succeeded.
            numInserted = row_count

    except Exception:
        # Record the failure for this task and continue with the next one.
        error = traceback.format_exc()
        status = "Failed"

    end_time = _now_hcm()

    # One log row per task, appended under this run's date folder.
    df_log = pipelineLogger.log_data(pipelineruntime, task_id, task_name, start_time,
                                     end_time, 0, numInserted, 0, columnMissing, columnNull, error, status, types, spark)
    df_log.write.mode("append").format("parquet").save(f"{base_lakehouse_path}/Files/Log_BronzetoSilver/{executionDate}/")

In [2]:
# Manual inspection snippet (commented out): reads one day's Bronze->Silver log
# files and displays them ordered by TaskId.
# NOTE(review): the path hardcodes a workspace/lakehouse GUID and the date
# 2024-05-29. Before reuse, rebuild it from this notebook's variables, e.g.
# f"{base_lakehouse_path}/Files/Log_BronzetoSilver/{executionDate}/*.parquet".
# df_log = spark.read.format("parquet").load("abfss://a2719874-ed1d-47f0-be1d-c88fd0db2547@onelake.dfs.fabric.microsoft.com/d81ba17b-89ed-46b5-a1a9-e020f10bf176/Files/Log_BronzetoSilver/2024-05-29/*.parquet")

# display(df_log.orderBy("TaskId", ascending = True))

StatementMeta(, 4dd63d81-ff4b-422b-8596-7d4cb6c7109b, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4f34a44f-9953-4e57-bb7e-c615b8977224)