In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

### Customer Data Ingestion 

In [0]:
# Explicit schema for customers.csv; every column is declared nullable.
customer_schema = (
    StructType()
    .add("customer_id", StringType(), True)
    .add("name", StringType(), True)
    .add("email", StringType(), True)
    .add("phone", StringType(), True)
    .add("address", StringType(), True)
    .add("credit_score", IntegerType(), True)
    .add("join_date", DateType(), True)
    .add("last_update", TimestampType(), True)
)

# Read the customer CSV (first line is the header) using the schema above.
customer_df = (
    spark.read
    .schema(customer_schema)
    .option("header", True)
    .csv("dbfs:/FileStore/Databricks_Capstone_Rajesh/Datasets/customers.csv")
)

customer_df.display()

# Append the rows to the bronze Delta table.
# NOTE(review): with mode "append", re-running this cell adds the same file's
# rows again - confirm that is intended.
customer_df.write.saveAsTable(
    "bronze_schema.customers_raw",
    format="delta",
    mode="append",
)


customer_id,name,email,phone,address,credit_score,join_date,last_update
C1000,Joseph Taylor,joseph.taylor@hotmail.com,(807) 253-7279,"66537 Christina Highway, Lake Mistyview, TN 68898",822,2017-01-02,2023-12-05T00:00:00Z
C1001,Jennifer Hendrix,jennifer.hendrix@outlook.com,(358) 643-3523,"934 Ryan Avenue Suite 021, Lake Kimberly, UT 56417",754,2017-01-03,2023-06-05T00:00:00Z
C1002,Dana Arnold,dana.arnold@hotmail.com,(386) 828-8286,"0185 Klein Cliffs, Hendrixchester, GA 25173",790,2017-01-04,2023-07-20T00:00:00Z
C1003,Alexander Thomas,alexander.thomas@yahoo.com,(186) 526-7232,"3131 Daniels Drives Apt. 144, West Saraberg, AR 60483",850,2017-01-04,2023-10-10T00:00:00Z
C1004,Andrea Hall,andrea.hall@yahoo.com,(639) 820-0428,"4107 Franco Ridge Apt. 873, Jamesberg, AS 87522",693,2017-01-05,2023-03-13T00:00:00Z
C1005,Mark Jackson,mark.jackson@aol.com,(001) 440-7899,"4291 Russell Lodge Suite 685, New Jameschester, WV 91945",618,2017-01-06,2023-08-19T00:00:00Z
C1006,Destiny Reynolds,destiny.reynolds@gmail.com,(745) 791-5046,"810 Walter Port, Horneview, NE 87158",628,2017-01-06,2023-04-25T00:00:00Z
C1007,Karen Reed,karen.reed@hotmail.com,(142) 131-1275,"2361 Crystal Extension Apt. 029, West Kevin, AK 55543",830,2017-01-07,2023-03-04T00:00:00Z
C1008,Robert Edwards,robert.edwards@yahoo.com,(414) 461-3883,"7970 Brianna Court Apt. 350, Lake Jacobchester, MS 22698",565,2017-01-12,2023-05-02T00:00:00Z
C1009,James Stewart,james.stewart@outlook.com,(478) 627-2159,"3423 Villanueva Branch, Sarahland, UT 51641",678,2017-01-14,2023-05-18T00:00:00Z


In [0]:
%sql
-- Show every row currently stored in bronze_schema.customers_raw.
SELECT *
FROM bronze_schema.customers_raw;

customer_id,name,email,phone,address,credit_score,join_date,last_update
C1000,Joseph Taylor,joseph.taylor@hotmail.com,(807) 253-7279,"66537 Christina Highway, Lake Mistyview, TN 68898",822,2017-01-02,2023-12-05T00:00:00Z
C1001,Jennifer Hendrix,jennifer.hendrix@outlook.com,(358) 643-3523,"934 Ryan Avenue Suite 021, Lake Kimberly, UT 56417",754,2017-01-03,2023-06-05T00:00:00Z
C1002,Dana Arnold,dana.arnold@hotmail.com,(386) 828-8286,"0185 Klein Cliffs, Hendrixchester, GA 25173",790,2017-01-04,2023-07-20T00:00:00Z
C1003,Alexander Thomas,alexander.thomas@yahoo.com,(186) 526-7232,"3131 Daniels Drives Apt. 144, West Saraberg, AR 60483",850,2017-01-04,2023-10-10T00:00:00Z
C1004,Andrea Hall,andrea.hall@yahoo.com,(639) 820-0428,"4107 Franco Ridge Apt. 873, Jamesberg, AS 87522",693,2017-01-05,2023-03-13T00:00:00Z
C1005,Mark Jackson,mark.jackson@aol.com,(001) 440-7899,"4291 Russell Lodge Suite 685, New Jameschester, WV 91945",618,2017-01-06,2023-08-19T00:00:00Z
C1006,Destiny Reynolds,destiny.reynolds@gmail.com,(745) 791-5046,"810 Walter Port, Horneview, NE 87158",628,2017-01-06,2023-04-25T00:00:00Z
C1007,Karen Reed,karen.reed@hotmail.com,(142) 131-1275,"2361 Crystal Extension Apt. 029, West Kevin, AK 55543",830,2017-01-07,2023-03-04T00:00:00Z
C1008,Robert Edwards,robert.edwards@yahoo.com,(414) 461-3883,"7970 Brianna Court Apt. 350, Lake Jacobchester, MS 22698",565,2017-01-12,2023-05-02T00:00:00Z
C1009,James Stewart,james.stewart@outlook.com,(478) 627-2159,"3423 Villanueva Branch, Sarahland, UT 51641",678,2017-01-14,2023-05-18T00:00:00Z


### Branch Data Ingestion 

In [0]:
# Explicit schema for branches.csv; all four columns are nullable strings.
branches_schema = (
    StructType()
    .add("branch_id", StringType(), True)
    .add("branch_name", StringType(), True)
    .add("location", StringType(), True)
    .add("timezone", StringType(), True)
)

# Read the branch CSV (first line is the header) using the schema above.
branch_df = (
    spark.read
    .schema(branches_schema)
    .option("header", True)
    .csv("dbfs:/FileStore/Databricks_Capstone_Rajesh/Datasets/branches.csv")
)

# Append the rows to the bronze Delta table.
# NOTE(review): with mode "append", re-running this cell adds the same file's
# rows again - confirm that is intended.
branch_df.write.saveAsTable(
    "bronze_schema.branches_raw",
    format="delta",
    mode="append",
)


In [0]:
%sql
-- Show every row currently stored in bronze_schema.branches_raw.
SELECT *
FROM bronze_schema.branches_raw;

branch_id,branch_name,location,timezone
B0000,South Jamie Branch,Rome,CET
B0001,Warrenfurt Branch,Perth,AWST
B0002,North Robertborough Branch,Chicago,CST
B0003,Littleview Branch,Dallas,CST
B0004,East Roybury Branch,Madrid,CET
B0005,East Annaton Branch,Brisbane,AEST
B0006,Lake Kennethchester Branch,Los Angeles,PST
B0007,Port Debbie Branch,Phoenix,MST
B0008,North Shelbyfurt Branch,Melbourne,AEST
B0009,East Brent Branch,Philadelphia,EST


### Transaction Streaming Data Ingestion 

In [0]:

# Schema applied to the incoming transaction files. All columns are nullable;
# "amount" is a double and every other column, including "timestamp", is kept
# as a plain string.
transactions_schema = (
    StructType()
    .add("transaction_id", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("branch_id", StringType(), True)
    .add("channel", StringType(), True)
    .add("transaction_type", StringType(), True)
    .add("amount", DoubleType(), True)
    .add("currency", StringType(), True)
    .add("timestamp", StringType(), True)
    .add("status", StringType(), True)
)

def ingest():
    """Start the Auto Loader stream that lands transaction CSVs in bronze.

    Reads header-bearing CSV files from the ``Transactions_batch_file``
    directory through the ``cloudFiles`` source, applying the explicit
    ``transactions_schema``, and appends them to the Delta table
    ``bronze_schema.transactions_raw``.

    Returns:
        The ``StreamingQuery`` handle produced by ``toTable`` so the caller
        can monitor, await, or stop the stream.
    """
    # The schema is supplied explicitly, so no column types are inferred.
    # The former "cloudFiles.inferColumnTypes" and "timestampFormat" options
    # were dropped: the first only applies to schema inference, and the
    # second only affects date/timestamp-typed columns, of which
    # transactions_schema has none ("timestamp" is a StringType column).
    source_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(transactions_schema)
        .option("cloudFiles.schemaLocation", "dbfs:/FileStore/Databricks_Capstone_Rajesh/Datasets/schema")
        .load("dbfs:/FileStore/Databricks_Capstone_Rajesh/Datasets/Transactions_batch_file")
    )

    # NOTE: the checkpoint directory is spelled "chekpoints". It is kept
    # unchanged on purpose - the stream's progress is stored under this path,
    # so renaming it would start the stream from scratch.
    write_query = (
        source_df.writeStream
        .format("delta")
        .option("checkpointLocation", "dbfs:/FileStore/Databricks_Capstone_Rajesh/Datasets/chekpoints")
        .option("mergeSchema", "true")
        .outputMode("append")
        .toTable("bronze_schema.transactions_raw")
    )

    # Hand the handle back instead of discarding it.
    return write_query


# Keep the handle so the stream can be inspected or stopped later, e.g.
# transactions_query.status or transactions_query.stop().
# NOTE(review): the query runs in the background, so a cell executed right
# after this one may read the table before the first micro-batch is
# committed - confirm whether the notebook should wait for the stream here.
transactions_query = ingest()

In [0]:
%sql
-- Show every row currently stored in bronze_schema.transactions_raw.
SELECT *
FROM bronze_schema.transactions_raw;

transaction_id,customer_id,branch_id,channel,transaction_type,amount,currency,timestamp,status
T29950,C1468,B0013,mobile,deposit,4.26,USD,2020-11-20 05:52:00,completed
T29951,C1224,B0011,mobile,payment,45.49,USD,2020-11-20 07:26:00,completed
T29952,C1173,B0010,mobile,deposit,75.92,GBP,2020-11-20 08:22:00,completed
T29953,C1044,B0003,web,payment,54.19,USD,2020-11-20 09:17:00,completed
T29954,C1158,B0012,ATM,deposit,52.82,EUR,2020-11-20 10:51:00,completed
T29955,C1065,B0011,branch,payment,56.58,USD,2020-11-20 11:17:00,completed
T29956,C1084,B0003,web,payment,73.82,USD,2020-11-20 11:57:00,completed
T29957,C1584,B0009,mobile,withdrawal,40.26,GBP,2020-11-20 12:43:00,completed
T29958,C1277,B0002,web,withdrawal,27.85,USD,2020-11-20 13:51:00,denied
T29959,C1638,B0006,,withdrawal,3.31,EUR,2020-11-20 14:29:00,pending
