# Preparation Library

In [0]:
import sys

# Copy the shared helper module from DBFS onto the driver's local filesystem
# so that Python's import machinery can find it.
dbutils.fs.cp("dbfs:/FileStore/tables/coreFunction.py", "file:/databricks/driver/coreFunction.py")

# Make the driver directory importable. The membership check keeps this cell
# idempotent: re-running it no longer appends a duplicate entry each time.
if "/databricks/driver/" not in sys.path:
    sys.path.append("/databricks/driver/")

# These imports intentionally come after the copy/path setup above, because
# coreFunction only becomes importable once the file is on the driver.
import coreFunction
from pyspark.sql import functions as F

# Extract from Silver - Transform in DataFrame - Load to Gold

In [0]:
# Extract: read the cleaned transaction rows from the Silver layer.
silver_df = spark.read.format("delta").table("silver_data.transactions_clean")

# Transform: collapse to one row per account with TotalDeposits,
# TotalWithdrawals, NetBalance, and LastTransactionDate.
transformed_df = silver_df.groupBy("AccountNo").agg(
    F.sum("DepositAMT").alias("TotalDeposits"),
    F.sum("WithdrawalAMT").alias("TotalWithdrawals"),
    # F.last() inside a grouped aggregation returns whichever row happens to be
    # processed last after the shuffle, so the balance could come from any
    # transaction and change between runs. Structs compare field by field, so
    # the max of (Date, BalanceAMT) is the row with the greatest Date, which
    # keeps NetBalance consistent with LastTransactionDate below.
    # NOTE(review): if several rows share the latest Date, the largest
    # BalanceAMT among them is used - confirm that tie-break is acceptable.
    F.max(F.struct("Date", "BalanceAMT")).getField("BalanceAMT").alias("NetBalance"),
    F.max("Date").alias("LastTransactionDate")
)

# Load: hand the aggregated frame to the project helper, keyed on AccountNo,
# targeting the Gold datamart table.
coreFunction.DataIngestion.DeltaTables(
    tableName="gold_data.transactions_datamart",
    dataFrameSource=transformed_df,
    primaryKey="AccountNo"
)

gold_data.transactions_datamart is a Delta table.
Primary key AccountNo is unique. Starting merge into Delta table.
Number of rows updated: 10
Number of rows inserted: 0


## Data Quality: Silver vs Gold

In [0]:
# --- Source side -------------------------------------------------------------
# The Gold datamart is built with one row per AccountNo, so the source to
# compare against is the set of distinct account keys in Silver.
# select().distinct() yields the same keys as re-running the full groupBy/agg
# and then discarding every aggregate, without the redundant computation.
# Fresh names are used for the Spark frames so that `silver_df` and
# `transformed_df` from the ETL cell are not silently rebound here.
silver_keys_df = (
    spark.read.format("delta")
    .table("silver_data.transactions_clean")
    .select("AccountNo")
    .distinct()
)

# The profiler is given a pandas frame; only the key column is collected.
silver_df_profile = silver_keys_df.toPandas()

# NOTE(review): the title reads "Silver: Gold" while the destination report is
# titled just "Gold" - confirm whether that wording is intentional.
profiler_silver = coreFunction.DataProfiling(titleProfile="Profiling Report Silver: Gold")
profiler_sources = profiler_silver.profile(silver_df_profile)

# 'n' from the table summary and 'n_distinct' for AccountNo are the two
# figures passed to the data-quality report below.
source_observation = profiler_sources.description_set.table['n']
source_rows = profiler_sources.description_set.variables['AccountNo']['n_distinct']

# --- Destination side --------------------------------------------------------
gold_keys_df = (
    spark.read.format("delta")
    .table("gold_data.transactions_datamart")
    .select("AccountNo")
)

gold_df_profile = gold_keys_df.toPandas()

profiler_gold = coreFunction.DataProfiling(titleProfile="Profiling Report Gold")
profiler_destination = profiler_gold.profile(gold_df_profile)

destination_observation = profiler_destination.description_set.table['n']
destination_rows = profiler_destination.description_set.variables['AccountNo']['n_distinct']

# --- Report ------------------------------------------------------------------
# Pass the source/destination counts to the project helper together with the
# metadata describing this Silver -> Gold step.
coreFunction.dataQuality.generate_data_quality_report(
    tableDQ='quality_data.quality_monitoring',
    source_observation=source_observation,
    destination_observation=destination_observation,
    source_rows=source_rows,
    destination_rows=destination_rows,
    table_name='transactions_datamart',
    process='silver_to_gold',
    etlType='datamart',
    dataFormat='delta',
    file_path='dbfs:/user/hive/warehouse/gold_data.db/transactions_datamart'
)

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

[INFO] 'quality_data.quality_monitoring' is an existing Delta table.
[INFO] Created DataFrame for DQ results: {'ID': '14988c846caf2ab0e6bc67bc20d7e9929fe901dd42d0c111a05e0b9774b9619d', 'Updated_Date': '2024-09-11', 'Updated_Timestamp': '2024-09-11 19:15:58', 'Type': 'datamart', 'Process': 'silver_to_gold', 'Type_File_Sources': 'delta', 'File_Sources': 'dbfs:/user/hive/warehouse/gold_data.db/transactions_datamart', 'Table_Name': 'transactions_datamart', 'Status_Observation': 'PASS', 'Source_Observation': 10, 'Destination_Observation': 10, 'Difference_Observation': 0, 'Status_Rows': 'PASS', 'Source_Rows': 10, 'Destination_Rows': 10, 'Difference_Rows': 0}.
[INFO] Successfully merged DQ results into the Delta table.
Number of rows updated: 1
Number of rows inserted: 0
