In [1]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *


StatementMeta(, 976e6abe-71f1-4bf1-8eee-1e2c7ee23b64, 3, Finished, Available, Finished)

In [23]:
# Data loading from bronze layer

# The three bronze datasets sit in the same OneLake folder; only the file name differs.
BRONZE_DIR = ('abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/Rk_Bank_proj_Bronze'
              '.Lakehouse/Files/bronzeFinanceData')


def read_bronze_parquet(file_name):
    """Read one Parquet file from the bronze folder and return it as a DataFrame.

    No 'header' / 'inferSchema' options are passed: those are CSV reader
    options, and a Parquet file already carries its own column names and types.

    Args:
        file_name: Name of the file inside BRONZE_DIR, e.g. 'bank_data.parquet'.

    Returns:
        The DataFrame produced by the Spark Parquet reader for that file.
    """
    return spark.read.format('parquet').load(f'{BRONZE_DIR}/{file_name}')


bank_df = read_bronze_parquet('bank_data.parquet')
customer_df = read_bronze_parquet('customer_data.parquet')
transaction_df = read_bronze_parquet('transaction_data.parquet')

# Print the schema of each source as read from the files.
bank_df.printSchema()
customer_df.printSchema()
transaction_df.printSchema()


StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 25, Finished, Available, Finished)

root
 |-- Branch_ID: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Firm_Revenue: decimal(18,0) (nullable = true)
 |-- Expenses: decimal(18,0) (nullable = true)
 |-- Profit_Margin: float (nullable = true)

root
 |-- Customer_ID: integer (nullable = true)
 |-- Age: short (nullable = true)
 |-- Customer_Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Bank_Name: string (nullable = true)
 |-- Branch_ID: string (nullable = true)

root
 |-- Transaction_ID: integer (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Account_Type: string (nullable = true)
 |-- Total_Balance: integer (nullable = true)
 |-- Transaction_Amount: double (nullable = true)
 |-- Investment_Amount: integer (nullable = true)
 |-- Investment_Type: string (nullable = true)
 |-- Transaction_Date: timestamp (nullable = true)



Silver Layer Transformation


In [25]:
# Preview the bank data before cleaning (first 20 rows, long values truncated).
bank_df.show(n=20, truncate=True)

StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 27, Finished, Available, Finished)

+---------+------+------+------------+--------+-------------+
|Branch_ID|  City|Region|Firm_Revenue|Expenses|Profit_Margin|
+---------+------+------+------------+--------+-------------+
|     1069|Nagpur|  West|           0|  279148|        64.56|
|     1092|Nagpur|  East|           0|  493705|        48.98|
|     1344|Nagpur| South|           0|  362711|        25.62|
|     1348|Nagpur|  East|           0|  259141|        94.84|
|     1546|Nagpur|  East|           0|   67502|        11.03|
|     1610|Nagpur| North|           0|  344764|        24.06|
|     1724|Nagpur|  East|           0|  203754|        69.43|
|     1763|Nagpur|  East|           0|   32371|       -20.86|
|     1900|Nagpur|  East|           0|   67800|       -13.88|
|     1954|Nagpur|  West|           0|  246251|        -1.99|
|     1004|Nagpur|  West|      282701|  148194|        -14.1|
|     1005|Nagpur|  East|      657164|   90729|        11.57|
|     1009|Nagpur| South|      690932|   44992|        56.47|
|     10

In [27]:
# Cleaning bank data
# Count nulls per column before any cleaning so the starting quality is visible.
null_counts = bank_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in bank_df.columns])
null_counts.show()

# Handling missing values: drop rows where every column is null, then exact duplicate rows.
bank_df = bank_df.dropna(how='all').dropDuplicates()

# Branch_ID is intentionally not given a text placeholder: the column is cast
# to an integer below, and a non-numeric string such as 'Unknown_Branch' would
# be turned straight back into null by that cast. A missing Branch_ID therefore
# stays null.
# NOTE(review): if null branch IDs must not reach the silver table, drop those
# rows or fill a numeric sentinel after the cast - confirm with downstream users.
bank_df = bank_df.fillna({
    'City': 'Unknown_City',
    'Region': 'Unknown_Region',
    'Firm_Revenue': 0,
    'Expenses': 0,
    'Profit_Margin': 0.0
})

# Normalise column types for the silver layer.
# NOTE(review): FloatType is single precision (about 7 significant digits);
# confirm that is sufficient for revenue and expense amounts.
bank_df = (
    bank_df
    .withColumn("Branch_ID", col("Branch_ID").cast(IntegerType()))
    .withColumn("Firm_Revenue", col("Firm_Revenue").cast(FloatType()))
    .withColumn("Expenses", col("Expenses").cast(FloatType()))
)


StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 29, Finished, Available, Finished)

+---------+----+------+------------+--------+-------------+
|Branch_ID|City|Region|Firm_Revenue|Expenses|Profit_Margin|
+---------+----+------+------------+--------+-------------+
|        0|   0|     0|           0|       0|            0|
+---------+----+------+------------+--------+-------------+



In [29]:
# Mean customer age; the customer cleaning cell uses it to fill missing Age values.
avg_age = customer_df.select(avg(col('Age')))
# The aggregate yields a single row with a single column; pull out the scalar.
mean_age = avg_age.collect()[0][0]

StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 31, Finished, Available, Finished)

In [30]:
# Cleaning customer data
# Per-column null tally, taken before any rows are removed or filled.
null_flags = [sum(col(name).isNull().cast("int")).alias(name) for name in customer_df.columns]
null_counts = customer_df.select(null_flags)
null_counts.show()

# Handling missing values: discard fully-empty rows, then exact duplicate rows.
customer_df = customer_df.dropna(how='all').dropDuplicates()

# Only the columns listed here are filled; the others are left untouched.
# NOTE(review): Age is still a short column in the schema printed after this
# fill, so the fractional part of mean_age is presumably dropped - confirm.
customer_fill_values = {
    'Age': mean_age,
    'Customer_Type': 'Unknown',
    'City': 'Not_Provided',
}
customer_df = customer_df.fillna(customer_fill_values)

customer_df.printSchema()

StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 32, Finished, Available, Finished)

+-----------+---+-------------+----+------+---------+---------+
|Customer_ID|Age|Customer_Type|City|Region|Bank_Name|Branch_ID|
+-----------+---+-------------+----+------+---------+---------+
|          0|500|          500| 500|     0|        0|        0|
+-----------+---+-------------+----+------+---------+---------+

root
 |-- Customer_ID: integer (nullable = true)
 |-- Age: short (nullable = true)
 |-- Customer_Type: string (nullable = false)
 |-- City: string (nullable = false)
 |-- Region: string (nullable = true)
 |-- Bank_Name: string (nullable = true)
 |-- Branch_ID: string (nullable = true)



In [31]:
# Cleaning transaction data
# Per-column null tally, taken before any rows are removed or filled.
null_flags = [sum(col(name).isNull().cast("int")).alias(name) for name in transaction_df.columns]
null_counts = transaction_df.select(null_flags)
null_counts.show()

# Handling missing values: discard fully-empty rows, then exact duplicate rows.
transaction_df = transaction_df.dropna(how='all').dropDuplicates()

# Replacement values for nulls; Transaction_ID and Transaction_Date are not filled.
transaction_fill_values = {
    'Customer_ID': 0,
    'Account_Type': 'Unknown',
    'Total_Balance': 0,
    'Transaction_Amount': 0.0,
    'Investment_Amount': 0,
    'Investment_Type': 'None',
}
transaction_df = transaction_df.fillna(transaction_fill_values)

transaction_df.printSchema()

StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 33, Finished, Available, Finished)

+--------------+-----------+------------+-------------+------------------+-----------------+---------------+----------------+
|Transaction_ID|Customer_ID|Account_Type|Total_Balance|Transaction_Amount|Investment_Amount|Investment_Type|Transaction_Date|
+--------------+-----------+------------+-------------+------------------+-----------------+---------------+----------------+
|             0|          0|           0|            0|                 0|                0|              0|               0|
+--------------+-----------+------------+-------------+------------------+-----------------+---------------+----------------+

root
 |-- Transaction_ID: integer (nullable = true)
 |-- Customer_ID: integer (nullable = false)
 |-- Account_Type: string (nullable = false)
 |-- Total_Balance: integer (nullable = false)
 |-- Transaction_Amount: double (nullable = false)
 |-- Investment_Amount: integer (nullable = false)
 |-- Investment_Type: string (nullable = false)
 |-- Transaction_Date: timesta

Writing the silver layer data to the lakehouse

In [33]:
# Save the cleaned bank data as a Delta table, overwriting the table's existing data.
(
    bank_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('SilverFinanceData.bank_table')
)

StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 35, Finished, Available, Finished)

In [12]:
# Save the cleaned customer data as a Delta table, overwriting the table's existing data.
(
    customer_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('SilverFinanceData.customer_table')
)

StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 14, Finished, Available, Finished)

In [11]:
# Save the cleaned transaction data as a Delta table, overwriting the table's existing data.
(
    transaction_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('SilverFinanceData.transaction_table')
)

StatementMeta(, 12392733-863f-4efd-b045-2c1191fea4c6, 13, Finished, Available, Finished)