In [276]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [277]:
# Specify the path to your CSV file
csv_file_path = "gs://capstondata/accounts.csv"

# Read the CSV file into a DataFrame with inferred schema
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Show the contents of the DataFrame
df.show()

+-----------+----------+-----------+-----------+----------------+---------+---------------+
|  AccountId|CustomerID|AccountType|    Balance|last_kyc_updated|branch_id|account_created|
+-----------+----------+-----------+-----------+----------------+---------+---------------+
|BUS-1990298|3GFT1ZDW4G|   Business|30342570.25|      10/30/2022| BR-00015|      1/29/2006|
|SAV-5186099|VRWR0O400I|    Savings|41896208.43|        6/4/2022| BR-00133|      8/21/2008|
|CUR-9367339|D55UJI8WV0|    Current|32272303.31|       6/18/2021| BR-00001|     10/20/1998|
|SAV-4887415|53CR23T46S|    Savings|72880621.93|        3/9/2020| BR-00004|     11/15/2012|
|SAV-1692074|HZDGV2JYU1|    Savings|29188271.76|       5/28/2021| BR-00099|     10/11/2020|
|BUS-7665995|MSUHT18DWD|   Business|24902534.91|      10/15/2019| BR-00011|      7/23/1999|
|CUR-8976663|VE674KTE1E|    Current|59816713.45|       7/15/2020| BR-00187|      2/26/2016|
|BUS-2814494|BVZA30CZ4F|   Business| 92399809.3|       1/15/2021| BR-00194|     

In [278]:
# Specify the path to your CSV file
csv_file_path = "gs://capstondata/customers.csv"

# Read the CSV file into a DataFrame with inferred schema
customers = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Show the contents of the DataFrame
customers.show()

+-----------+--------------------+--------------------+----------+--------------+
|Customer_id|           Full_Name|      Customer_Email|       dob|Customer_Phone|
+-----------+--------------------+--------------------+----------+--------------+
| V0H084B3FA|         Mahika Gill|mahika.gill@gmail...| 6/23/1993|     785877743|
| 7SI1YKSDMB|        Elakshi Gour|elakshi.gour@outl...| 6/14/1981|    9638371516|
| H0L4L7VHXT|    Tanya Srinivasan|tanyasrinivasan@y...| 3/22/2003|     559420656|
| 6I324KGK08|        Kashvi Saraf|kashvisaraf@yahoo...| 6/27/2001|    4985572504|
| WKJ0VXQFIY|            Piya Ram| piyaram@hotmail.com| 3/31/1994|    5549849574|
| HT4P3N5R38|    Aniruddh Kothari|aniruddhkothari@h...| 4/20/1994|    8609260186|
| Y4EHCD6U3G|           Riya Gala|riya.gala@hotmail...| 8/10/1989|    6947567136|
| KTYHYNJG3Q|    Tarini Sabharwal|tarini.sabharwal@...| 12/3/1973|    7701533431|
| 2N2119NGLF|     Shanaya Sampath|shanaya_sampath@g...|10/29/1988|    4587205438|
| CT2JZ4HB0I|   

In [279]:
df.printSchema()

root
 |-- AccountId: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- AccountType: string (nullable = true)
 |-- Balance: string (nullable = true)
 |-- last_kyc_updated: string (nullable = true)
 |-- branch_id: string (nullable = true)
 |-- account_created: string (nullable = true)



In [280]:
#REMOVE DUPLICATES
df=df.dropDuplicates()

In [281]:
#dropping entire row if null
df= df.na.drop(subset='AccountId')

In [282]:
df= df.join(customers, col("CustomerID") == col("Customer_id"), how="left")\
       .select('AccountId', 'Customer_id','AccountType', 'Balance', 'last_kyc_updated', 'branch_id', 'account_created')

In [283]:
df.show()

+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|  AccountId|Customer_id|AccountType|    Balance|last_kyc_updated|branch_id|account_created|
+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|CUR-5421058| T09KEB6NAK|    Current|40174182.62|        9/4/2022| BR-00132|       6/9/2015|
|BUS-2480869| N7YBSW74IO|   Business|19396837.38|      11/30/2020| BR-00001|      7/26/2001|
|SAV-8659231| QZZPP38OPK|    Savings|88274879.06|       12/2/2022| BR-00185|      10/2/1985|
|CUR-7107091| Q3E0OUZH11|    Current| 28079282.9|       7/21/2020| BR-00043|      1/30/2019|
|CUR-8563313| 0U5AJPYF2R|    Current|11819827.12|       5/12/2022| BR-00108|       4/3/2001|
|BUS-9750558| HA55ZPB9IF|   Business|21386318.46|       1/10/2019| BR-00122|       3/2/2017|
|BUS-3956882| RYWN8BMLQQ|   Business|54923664.52|       11/7/2021| BR-00180|     12/21/2020|
|BUS-8640431| A9V7MDXUO5|   Business|21650202.11|       8/28/2021| BR-

In [284]:
# Calculate the mode of the "AccountType" column
mode_value = df.groupBy("AccountType").count().sort(col("count").desc()).first()["AccountType"]

# Fill null values with the mode
df = df.na.fill(mode_value, subset=["AccountType"])

In [285]:
df= df.na.fill(0, subset=["Balance"])

In [286]:
# Specify the path to your CSV file
csv_file_path = "gs://capstondata/branches.csv"

# Read the CSV file into a DataFrame with inferred schema
branches = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Show the contents of the DataFrame
branches.show()

+--------+---------+-------------------+----------+
|BranchId|Bank_Name|Branch_Registration| Bank_city|
+--------+---------+-------------------+----------+
|BR-00001|     HDFC|       INBR48747244|  Varanasi|
|BR-00002|     AXIS|       INBR11941555|    Nanded|
|BR-00003|      SBI|       INBR80849249|  Parbhani|
|BR-00004|     HSBC|       INBR48851610|    Ratlam|
|BR-00005|     IDFC|       INBR79215276| Karaikudi|
|BR-00006|     HSBC|       INBR13412552|    Nanded|
|BR-00007|     AXIS|       INBR12827025|    Ratlam|
|BR-00008|     HDFC|       INBR28462915|    Hospet|
|BR-00009|     HSBC|       INBR21050894|  Jamalpur|
|BR-00010|     HSBC|       INBR67066394|  Parbhani|
|BR-00011|     AXIS|       INBR73539041|    Tezpur|
|BR-00012|     HSBC|       INBR68860717| Karaikudi|
|BR-00013|     HDFC|       INBR57531262|   Nellore|
|BR-00014|    ICICI|       INBR67941345|    Tezpur|
|BR-00015|    ICICI|       INBR25228856|   Lucknow|
|BR-00016|     HSBC|       INBR61059225|   Panipat|
|BR-00017|  

In [287]:
df= df.join(branches, col("BranchId") == col("branch_id"), how="left")\
       .select('AccountId', 'Customer_id','AccountType', 'Balance', 'last_kyc_updated', 'branch_id', 'account_created')

In [288]:
df.show()

+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|  AccountId|Customer_id|AccountType|    Balance|last_kyc_updated|branch_id|account_created|
+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|CUR-5421058| T09KEB6NAK|    Current|40174182.62|        9/4/2022| BR-00132|       6/9/2015|
|BUS-2480869| N7YBSW74IO|   Business|19396837.38|      11/30/2020| BR-00001|      7/26/2001|
|SAV-8659231| QZZPP38OPK|    Savings|88274879.06|       12/2/2022| BR-00185|      10/2/1985|
|CUR-7107091| Q3E0OUZH11|    Current| 28079282.9|       7/21/2020| BR-00043|      1/30/2019|
|CUR-8563313| 0U5AJPYF2R|    Current|11819827.12|       5/12/2022| BR-00108|       4/3/2001|
|BUS-9750558| HA55ZPB9IF|   Business|21386318.46|       1/10/2019| BR-00122|       3/2/2017|
|BUS-3956882| RYWN8BMLQQ|   Business|54923664.52|       11/7/2021| BR-00180|     12/21/2020|
|BUS-8640431| A9V7MDXUO5|   Business|21650202.11|       8/28/2021| BR-

In [289]:
df = df.withColumn("Balance",round( df["Balance"].cast(DecimalType(10,2)),2))

In [290]:
df.show()


+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|  AccountId|Customer_id|AccountType|    Balance|last_kyc_updated|branch_id|account_created|
+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|CUR-5421058| T09KEB6NAK|    Current|40174182.62|        9/4/2022| BR-00132|       6/9/2015|
|BUS-2480869| N7YBSW74IO|   Business|19396837.38|      11/30/2020| BR-00001|      7/26/2001|
|SAV-8659231| QZZPP38OPK|    Savings|88274879.06|       12/2/2022| BR-00185|      10/2/1985|
|CUR-7107091| Q3E0OUZH11|    Current|28079282.90|       7/21/2020| BR-00043|      1/30/2019|
|CUR-8563313| 0U5AJPYF2R|    Current|11819827.12|       5/12/2022| BR-00108|       4/3/2001|
|BUS-9750558| HA55ZPB9IF|   Business|21386318.46|       1/10/2019| BR-00122|       3/2/2017|
|BUS-3956882| RYWN8BMLQQ|   Business|54923664.52|       11/7/2021| BR-00180|     12/21/2020|
|BUS-8640431| A9V7MDXUO5|   Business|21650202.11|       8/28/2021| BR-

In [291]:
df = df.withColumn("last_kyc_updated",date_format(to_date(col("last_kyc_updated"),"M/d/yyyy"),"MM/dd/yyyy"))

In [292]:
df = df.withColumn("account_created",date_format(to_date(col("account_created"),"M/d/yyyy"),"MM/dd/yyyy"))

In [293]:
df.show()

+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|  AccountId|Customer_id|AccountType|    Balance|last_kyc_updated|branch_id|account_created|
+-----------+-----------+-----------+-----------+----------------+---------+---------------+
|CUR-5421058| T09KEB6NAK|    Current|40174182.62|      09/04/2022| BR-00132|     06/09/2015|
|BUS-2480869| N7YBSW74IO|   Business|19396837.38|      11/30/2020| BR-00001|     07/26/2001|
|SAV-8659231| QZZPP38OPK|    Savings|88274879.06|      12/02/2022| BR-00185|     10/02/1985|
|CUR-7107091| Q3E0OUZH11|    Current|28079282.90|      07/21/2020| BR-00043|     01/30/2019|
|CUR-8563313| 0U5AJPYF2R|    Current|11819827.12|      05/12/2022| BR-00108|     04/03/2001|
|BUS-9750558| HA55ZPB9IF|   Business|21386318.46|      01/10/2019| BR-00122|     03/02/2017|
|BUS-3956882| RYWN8BMLQQ|   Business|54923664.52|      11/07/2021| BR-00180|     12/21/2020|
|BUS-8640431| A9V7MDXUO5|   Business|21650202.11|      08/28/2021| BR-

In [294]:
df.printSchema()

root
 |-- AccountId: string (nullable = true)
 |-- Customer_id: string (nullable = true)
 |-- AccountType: string (nullable = false)
 |-- Balance: decimal(11,2) (nullable = true)
 |-- last_kyc_updated: string (nullable = true)
 |-- branch_id: string (nullable = true)
 |-- account_created: string (nullable = true)



In [296]:
# Specify the desired file name and path
desired_file_name = "gs://capstondata/silver_layer/Accounts.csv"

# Write the cleaned DataFrame to a CSV file with the desired file name
df.write.csv(desired_file_name, header=True, mode="append")

                                                                                