<a href="https://colab.research.google.com/github/scar110497/Shubham/blob/main/Capstone_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#install Apache Spark 3.0.1 with Hadoop 2.7 from here.
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz

# Now, we just need to unzip that folder.
!tar -xvzf spark-3.0.0-bin-hadoop2.7.tgz
!pip install findspark


import os
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("SparkDemoApp").getOrCreate()
sc = spark.sparkContext
print(type(spark))

In [106]:
#CUSTOMER TABLE
import sqlite3
import pandas as pd

db = sqlite3.connect('customers.db')

query_create_table = 'CREATE TABLE customers(customer_id INTEGER, first_name VARCHAR, last_name VARCHAR, date_of_birth DATE)'
db.execute(query_create_table)

query_insert_data = '''
INSERT INTO customers (customer_id, first_name, last_name, date_of_birth) VALUES
(1, 'John', 'Doe', '1980-05-15'),
(2, 'Jane', 'Smith', '1992-08-21'),
(3, 'Alice', 'Johnson', '1975-02-10'),
(4, 'Sarah', 'Jones', '1988-12-03'),
(5, 'David', 'Brown', '1995-04-18'),
(6, 'Emma', 'Miller', '1982-07-25');
'''

db.execute(query_insert_data)

query_select_all = 'SELECT * FROM customers'
df_customers = pd.read_sql_query(query_select_all, db)
print(df_customers)

db.commit()

   customer_id first_name last_name date_of_birth
0            1       John       Doe    1980-05-15
1            2       Jane     Smith    1992-08-21
2            3      Alice   Johnson    1975-02-10
3            4      Sarah     Jones    1988-12-03
4            5      David     Brown    1995-04-18
5            6       Emma    Miller    1982-07-25


In [239]:
#Employee Table
import xml.etree.ElementTree as ET
import pandas as pd

tree = ET.parse('//content/employees.xml')
root = tree.getroot()

data = []
for child in root:
    record = {}
    for subchild in child:
        record[subchild.tag] = subchild.text
    data.append(record)

df_emp = pd.DataFrame(data)
print(df_emp)

   employee_id branch_id first_name last_name position
0            1         2       Mike   Johnson  Manager
1            2         3      Emily  Williams   Teller
2            3         2     Robert     Davis   Teller
3            4         3     Olivia    Wilson   Teller
4            5         2     Daniel   Johnson  Analyst
5            6         3     Sophia     Clark  Manager
6            7         2       Mike   Johnson  Manager
7            8         3      Emily  Williams   Teller
8            9         2     Robert     Davis   Teller
9           10         3     Olivia    Wilson   Teller
10          11         2     Daniel   Johnson  Analyst
11          12         3     Sophia     Clark  Manager


In [None]:
#Branches file

import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

json_file_path = "/content/branches.json"

with open(json_file_path, 'r') as file:
    json_data = json.load(file)

schema = StructType([
    StructField("branch_id", IntegerType(), True),
    StructField("branch_name", StringType(), True),
    StructField("location", StringType(), True),
])

rows = [(branch["branch_id"], branch["branch_name"], branch["location"]) for branch in json_data["branches"]]
branches = spark.createDataFrame(rows, schema=schema)

branches.show(truncate=False)

+---------+---------------+-------------+
|branch_id|branch_name    |location     |
+---------+---------------+-------------+
|1        |Main Branch    |Downtown City|
|2        |Suburban Branch|Suburbville  |
|3        |Regional Branch|Regional City|
+---------+---------------+-------------+



In [83]:
#Accounts file

import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

json_file_path = "/content/accounts.json"

with open(json_file_path, 'r') as file:
    json_data = json.load(file)

schema = StructType([
    StructField("account_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("employee_id", IntegerType(), True),
    StructField("account_type", StringType(), True),
    StructField("balance", IntegerType(), True)
])

rows = [(accounts["account_id"], accounts["customer_id"], accounts["employee_id"], accounts["account_type"], accounts["balance"]) for accounts in json_data["accounts"]]
accounts = spark.createDataFrame(rows, schema=schema)

accounts.show(truncate=False)

+----------+-----------+-----------+------------+-------+
|account_id|customer_id|employee_id|account_type|balance|
+----------+-----------+-----------+------------+-------+
|1         |1          |2          |Savings     |5000   |
|2         |1          |3          |Checking    |1000   |
|3         |2          |4          |Savings     |8000   |
|4         |3          |1          |Checking    |3000   |
|5         |2          |2          |Checking    |2500   |
|6         |3          |3          |Savings     |6000   |
|7         |4          |4          |Checking    |12000  |
|8         |5          |2          |Savings     |3000   |
+----------+-----------+-----------+------------+-------+



In [171]:
 #/content/loans.csv with the schema definition

loans_df=spark.read.csv(path='/content/loans.csv', header=True)
loans_df.show()

 #/content/payment_history.csv with the schema definition
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id
spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

pf_df = spark.read.csv(path = "/content/payment_history.csv", header=True)
pf_df = pf_df.withColumn("payment_id", monotonically_increasing_id())
pf_df.show(truncate=False)



 #/content/transactions.csv with the schema definition

transactions_df=spark.read.csv(path='/content/transactions.csv', header=True)
transactions_df.show()

+-------+-----------+-----------+-------------+----------+----------+------+
|loan_id|customer_id|loan_amount|interest_rate|start_date|  end_date|status|
+-------+-----------+-----------+-------------+----------+----------+------+
|      1|          1|      10000|         0.05|2023-01-01|2023-12-31|Active|
|      2|          1|      15000|         0.04|2023-02-15|2023-12-31|Active|
|      3|          2|       8000|         0.03|2023-03-20|2023-11-30|Active|
|      4|          3|      20000|         0.05|2023-01-10|2023-10-31|Active|
+-------+-----------+-----------+-------------+----------+----------+------+

+----------+-------+------------+-----------+
|payment_id|loan_id|payment_date|amount_paid|
+----------+-------+------------+-----------+
|0         |1      |2023-03-01  |2000       |
|1         |1      |2023-04-01  |1500       |
|2         |2      |2023-04-10  |1000       |
|3         |3      |2023-02-20  |3000       |
|4         |3      |2023-03-15  |2500       |
+----------+---

In [85]:
#Basic Problem 1:- Write a spark dataframe to show the balance amount for an account_id = 1

accounts.filter(accounts['account_id'] == 1).select('balance').show()

+-------+
|balance|
+-------+
|   5000|
+-------+



In [111]:
#Basic Problem 2:- List Transactions for an account_id = 1:

transactions_df.filter(transactions_df['account_id'] == 1).show()

+--------------+----------+----------------+------+-------------------+
|transaction_id|account_id|transaction_type|amount|   transaction_date|
+--------------+----------+----------------+------+-------------------+
|             1|         1|         Deposit|  1000|2023-01-15 08:30:00|
|             2|         1|      Withdrawal|   500|2023-02-02 12:45:00|
|            10|         1|         Deposit|  1200|2023-10-01 10:00:00|
+--------------+----------+----------------+------+-------------------+



In [112]:
#Basic Problem 3:- List Accounts with a zero balance

accounts.filter(accounts['balance'] == 0).select('account_id').show()
accounts.filter(accounts['balance'] == 0).show()

+----------+
|account_id|
+----------+
+----------+

+----------+-----------+-----------+------------+-------+
|account_id|customer_id|employee_id|account_type|balance|
+----------+-----------+-----------+------------+-------+
+----------+-----------+-----------+------------+-------+



In [113]:
#Basic Problem 4:- Find the Oldest Customer

df_customers.sort_values(by=['date_of_birth'], ascending=True).head(1)


Unnamed: 0,customer_id,first_name,last_name,date_of_birth
2,3,Alice,Johnson,1975-02-10


In [224]:
#Basic Problem 5:- Calculate the Total Interest Earned Across All Accounts:

interest_earned = transactions_df.agg(F.sum('amount').alias('total_interest'))
interest_earned.show()

+--------------+
|total_interest|
+--------------+
|       14500.0|
+--------------+



#Account Problems


In [267]:
#Problem 1:- List All Accounts with Customer Information:

from pyspark.sql import *
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("SparkDemoApp").getOrCreate()

df = spark.createDataFrame(df_customers)


df_new = accounts.join(df, on = 'customer_id', how = 'inner')
result = df_new.select('customer_id', 'first_name', 'last_name', 'account_id', 'account_type', 'balance')
result.show()


  for column, series in pdf.iteritems():


+-----------+----------+---------+----------+------------+-------+
|customer_id|first_name|last_name|account_id|account_type|balance|
+-----------+----------+---------+----------+------------+-------+
|          5|     David|    Brown|         8|     Savings|   3000|
|          1|      John|      Doe|         1|     Savings|   5000|
|          1|      John|      Doe|         2|    Checking|   1000|
|          3|     Alice|  Johnson|         4|    Checking|   3000|
|          3|     Alice|  Johnson|         6|     Savings|   6000|
|          2|      Jane|    Smith|         3|     Savings|   8000|
|          2|      Jane|    Smith|         5|    Checking|   2500|
|          4|     Sarah|    Jones|         7|    Checking|  12000|
+-----------+----------+---------+----------+------------+-------+



In [231]:
#Problem 2:- Calculate Total Balance for Each Customer

import pyspark.sql.functions as F
test = accounts.groupBy("customer_id").agg(F.sum('balance').alias('total_bal'))
test1 = test.join(df, "customer_id", 'left')  #df_customer = df
test_new= test1.orderBy(F.desc("total_bal")).show(5)

+-----------+---------+----------+---------+-------------+
|customer_id|total_bal|first_name|last_name|date_of_birth|
+-----------+---------+----------+---------+-------------+
|          4|    12000|     Sarah|    Jones|   1988-12-03|
|          2|    10500|      Jane|    Smith|   1992-08-21|
|          3|     9000|     Alice|  Johnson|   1975-02-10|
|          1|     6000|      John|      Doe|   1980-05-15|
|          5|     3000|     David|    Brown|   1995-04-18|
+-----------+---------+----------+---------+-------------+



In [234]:
#Problem 3:- Find Customers with Multiple Accounts
from pyspark.sql.types import *

multi_df = accounts.groupBy('customer_id').agg(F.count('account_id').alias('total_accounts'))
tb_df = multi_df.filter(F.col('total_accounts')>1)
tb_df.show()


+-----------+--------------+
|customer_id|total_accounts|
+-----------+--------------+
|          1|             2|
|          3|             2|
|          2|             2|
+-----------+--------------+



#Customer Transactions Reports


In [236]:
#Problem 1:- List Transactions with Account and Customer Information:

join_df = transactions_df.join(accounts, "account_id").join(df, "customer_id")
join_df1 = join_df.select('transaction_id', 'transaction_type', 'amount', 'transaction_date', 'customer_id', 'first_name', 'last_name', 'account_id', 'account_type' )
join_df1.show()

+--------------+----------------+------+-------------------+-----------+----------+---------+----------+------------+
|transaction_id|transaction_type|amount|   transaction_date|customer_id|first_name|last_name|account_id|account_type|
+--------------+----------------+------+-------------------+-----------+----------+---------+----------+------------+
|            10|         Deposit|  1200|2023-10-01 10:00:00|          1|      John|      Doe|         1|     Savings|
|             2|      Withdrawal|   500|2023-02-02 12:45:00|          1|      John|      Doe|         1|     Savings|
|             1|         Deposit|  1000|2023-01-15 08:30:00|          1|      John|      Doe|         1|     Savings|
|             8|         Deposit|  3000|2023-08-22 16:45:00|          1|      John|      Doe|         2|    Checking|
|             3|         Deposit|  2000|2023-03-10 15:20:00|          1|      John|      Doe|         2|    Checking|
|             9|      Withdrawal|  1500|2023-09-14 09:30

In [143]:
#Problem 2:- Calculate Average Transaction Amount:


spark = SparkSession.builder.appName("Average_Transaction_Amount").getOrCreate()

transactions_df.select(avg('amount')).show()

+-----------+
|avg(amount)|
+-----------+
|     1450.0|
+-----------+



In [238]:
#Problem 3:- Identify High-Value Customers with Total Balance:

import pyspark.sql.functions as F
test = accounts.groupBy("customer_id").agg(F.sum('balance').alias('total_bal'))
test1 = test.join(df, "customer_id", 'inner')   #df_customer = df
test_new= test1.orderBy(F.desc("total_bal")).show(5)



+-----------+---------+----------+---------+-------------+
|customer_id|total_bal|first_name|last_name|date_of_birth|
+-----------+---------+----------+---------+-------------+
|          4|    12000|     Sarah|    Jones|   1988-12-03|
|          2|    10500|      Jane|    Smith|   1992-08-21|
|          3|     9000|     Alice|  Johnson|   1975-02-10|
|          1|     6000|      John|      Doe|   1980-05-15|
|          5|     3000|     David|    Brown|   1995-04-18|
+-----------+---------+----------+---------+-------------+



In [260]:
#Problem 4:- List Employees and Their Assigned Customers:
from pyspark.sql import *
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("SparkDemoApp").getOrCreate()

df_emp2 = spark.createDataFrame(df_emp)

lec_df = df_new.join(df_emp2, on = 'employee_id', how = 'inner')

lec_df.show()

  for column, series in pdf.iteritems():


+-----------+-----------+----------+------------+-------+----------+---------+-------------+---------+----------+---------+--------+
|employee_id|customer_id|account_id|account_type|balance|first_name|last_name|date_of_birth|branch_id|first_name|last_name|position|
+-----------+-----------+----------+------------+-------+----------+---------+-------------+---------+----------+---------+--------+
|          1|          3|         4|    Checking|   3000|     Alice|  Johnson|   1975-02-10|        2|      Mike|  Johnson| Manager|
|          3|          1|         2|    Checking|   1000|      John|      Doe|   1980-05-15|        2|    Robert|    Davis|  Teller|
|          3|          3|         6|     Savings|   6000|     Alice|  Johnson|   1975-02-10|        2|    Robert|    Davis|  Teller|
|          4|          2|         3|     Savings|   8000|      Jane|    Smith|   1992-08-21|        3|    Olivia|   Wilson|  Teller|
|          4|          4|         7|    Checking|  12000|     Sarah| 

In [262]:
#Problem 5:- Calculate the Total Number of Transactions for Each Account Type

joined_df = transactions_df.join(accounts, "account_id", "inner")

total_transactions_by_type = joined_df.groupBy("account_type").agg(F.count("transaction_id").alias("total_transactions"))

total_transactions_by_type.show()

+------------+------------------+
|account_type|total_transactions|
+------------+------------------+
|     Savings|                 5|
|    Checking|                 5|
+------------+------------------+



In [291]:
#Problem 6 & 7:- Find Customers with No Accounts

from pyspark.sql.functions import *
customers_alias = df.alias("c")
accounts_alias = accounts.alias("a")



customers_with_no_accounts = customers_alias.join(accounts_alias, col("c.customer_id") == col("a.customer_id"), "left_anti")
customers_with_no_accounts.show()

#df.show()
#accounts.show()

+-----------+----------+---------+-------------+
|customer_id|first_name|last_name|date_of_birth|
+-----------+----------+---------+-------------+
|          6|      Emma|   Miller|   1982-07-25|
+-----------+----------+---------+-------------+



In [286]:
#Problem 8:- List the Latest Transaction for Each Account:

window_spec = Window.partitionBy("account_id").orderBy(F.desc("transaction_date"))

df_transactions_with_row_number = transactions_df.withColumn("row_number", F.row_number().over(window_spec))

latest_transactions = df_transactions_with_row_number.filter("row_number = 1")

latest_transactions = latest_transactions.drop("row_number")

latest_transactions.show()

+--------------+----------+----------------+------+-------------------+
|transaction_id|account_id|transaction_type|amount|   transaction_date|
+--------------+----------+----------------+------+-------------------+
|             7|         3|      Withdrawal|   800|2023-07-08 14:15:00|
|             6|         5|         Deposit|  2000|2023-06-12 11:30:00|
|            10|         1|         Deposit|  1200|2023-10-01 10:00:00|
|             9|         4|      Withdrawal|  1500|2023-09-14 09:30:00|
|             8|         2|         Deposit|  3000|2023-08-22 16:45:00|
+--------------+----------+----------------+------+-------------------+



In [304]:
#Problem 9:- Calculate the Total Withdrawals for Each Customer

trans_df = df_new.join(transactions_df, 'account_id')
trans_df2 = trans_df[trans_df['transaction_type'] == 'Withdrawal']
trans_df3 = trans_df2.select('customer_id', 'first_name', 'last_name', 'amount')
trans_df4 = trans_df3.groupBy('customer_id', 'first_name', 'last_name').agg(F.sum('amount').alias('total_withdrwals'))
trans_df4.show()


+-----------+----------+---------+----------------+
|customer_id|first_name|last_name|total_withdrwals|
+-----------+----------+---------+----------------+
|          3|     Alice|  Johnson|          1500.0|
|          2|      Jane|    Smith|          1800.0|
|          1|      John|      Doe|           500.0|
+-----------+----------+---------+----------------+



In [305]:
#Prolem 10: Find Duplicate Transactions

df_dup1 = transactions_df.groupby('transaction_id').count().alias("duplicate_count")
df_dup = df_dup1.filter(col("count" )> 1).alias("duplicate")
df_dup.show()

+--------------+-----+
|transaction_id|count|
+--------------+-----+
+--------------+-----+

