<a href="https://colab.research.google.com/github/shweetak/Sweta/blob/main/Capstone_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#install Apache Spark 3.0.1 with Hadoop 2.7 from here.
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz

# Now, we just need to unzip that folder.
!tar -xvzf spark-3.0.0-bin-hadoop2.7.tgz
!pip install findspark


import os
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
import findspark
findspark.init()

# Creating Customer table and inserting data

In [None]:
import sqlite3
import pandas as pd

db = sqlite3.connect('customers.db')

query_create_table = 'CREATE TABLE customers(customer_id INTEGER, first_name VARCHAR, last_name VARCHAR, date_of_birth DATE)'
db.execute(query_create_table)

query_insert_data = '''
INSERT INTO customers (customer_id, first_name, last_name, date_of_birth) VALUES
(1, 'John', 'Doe', '1980-05-15'),
(2, 'Jane', 'Smith', '1992-08-21'),
(3, 'Alice', 'Johnson', '1975-02-10'),
(4, 'Sarah', 'Jones', '1988-12-03'),
(5, 'David', 'Brown', '1995-04-18'),
(6, 'Emma', 'Miller', '1982-07-25');
'''

db.execute(query_insert_data)

query_select_all = 'SELECT * FROM customers'
df_customers = pd.read_sql_query(query_select_all, db)
print(df_customers)

db.commit()


   customer_id first_name last_name date_of_birth
0            1       John       Doe    1980-05-15
1            2       Jane     Smith    1992-08-21
2            3      Alice   Johnson    1975-02-10
3            4      Sarah     Jones    1988-12-03
4            5      David     Brown    1995-04-18
5            6       Emma    Miller    1982-07-25


#Creating braches dataframe

In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

json_file_path = "/content/branches.json"

with open(json_file_path, 'r') as file:
    json_data = json.load(file)

schema = StructType([
    StructField("branch_id", IntegerType(), True),
    StructField("branch_name", StringType(), True),
    StructField("location", StringType(), True),
])

rows = [(branch["branch_id"], branch["branch_name"], branch["location"]) for branch in json_data["branches"]]
branches = spark.createDataFrame(rows, schema=schema)

branches.show(truncate=False)


+---------+---------------+-------------+
|branch_id|branch_name    |location     |
+---------+---------------+-------------+
|1        |Main Branch    |Downtown City|
|2        |Suburban Branch|Suburbville  |
|3        |Regional Branch|Regional City|
+---------+---------------+-------------+



#Creating accounts dataframe

In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

json_file_path = "/content/accounts.json"

with open(json_file_path, 'r') as file:
    json_data = json.load(file)

schema = StructType([
    StructField("account_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("employee_id", IntegerType(), True),
    StructField("account_type", StringType(), True),
    StructField("balance", IntegerType(), True),
])

rows = [
    (
        account["account_id"],
        account["customer_id"],
        account["employee_id"],
        account["account_type"],
        account["balance"]
    ) for account in json_data["accounts"]
]
accounts = spark.createDataFrame(rows, schema=schema)

accounts.show(truncate=False)


+----------+-----------+-----------+------------+-------+
|account_id|customer_id|employee_id|account_type|balance|
+----------+-----------+-----------+------------+-------+
|1         |1          |2          |Savings     |5000   |
|2         |1          |3          |Checking    |1000   |
|3         |2          |4          |Savings     |8000   |
|4         |3          |1          |Checking    |3000   |
|5         |2          |2          |Checking    |2500   |
|6         |3          |3          |Savings     |6000   |
|7         |4          |4          |Checking    |12000  |
|8         |5          |2          |Savings     |3000   |
+----------+-----------+-----------+------------+-------+



#Creating transaction dataframe

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

schema = StructType([
    StructField("transaction_id", IntegerType()),
    StructField("account_id", IntegerType()),
    StructField("transaction_type", StringType()),
    StructField("amount", IntegerType()),
    StructField("transaction_date", TimestampType())
])
transaction = spark.read.csv(path = "/content/transactions.csv", header=True, schema=schema)
transaction.show(truncate=False)


+--------------+----------+----------------+------+-------------------+
|transaction_id|account_id|transaction_type|amount|transaction_date   |
+--------------+----------+----------------+------+-------------------+
|1             |1         |Deposit         |1000  |2023-01-15 08:30:00|
|2             |1         |Withdrawal      |500   |2023-02-02 12:45:00|
|3             |2         |Deposit         |2000  |2023-03-10 15:20:00|
|4             |3         |Withdrawal      |1000  |2023-04-05 10:10:00|
|5             |4         |Deposit         |1500  |2023-05-20 09:00:00|
|6             |5         |Deposit         |2000  |2023-06-12 11:30:00|
|7             |3         |Withdrawal      |800   |2023-07-08 14:15:00|
|8             |2         |Deposit         |3000  |2023-08-22 16:45:00|
|9             |4         |Withdrawal      |1500  |2023-09-14 09:30:00|
|10            |1         |Deposit         |1200  |2023-10-01 10:00:00|
+--------------+----------+----------------+------+-------------

#Creating employees datframe

In [None]:
import xml.etree.ElementTree as ET
import pandas as pd

tree = ET.parse('/content/employees.xml')
root = tree.getroot()

data = []
for child in root:
    record = {}
    for subchild in child:
        record[subchild.tag] = subchild.text
    data.append(record)

employees = pd.DataFrame(data)
print(employees)

   employee_id branch_id first_name last_name position
0            1         2       Mike   Johnson  Manager
1            2         3      Emily  Williams   Teller
2            3         2     Robert     Davis   Teller
3            4         3     Olivia    Wilson   Teller
4            5         2     Daniel   Johnson  Analyst
5            6         3     Sophia     Clark  Manager
6            7         2       Mike   Johnson  Manager
7            8         3      Emily  Williams   Teller
8            9         2     Robert     Davis   Teller
9           10         3     Olivia    Wilson   Teller
10          11         2     Daniel   Johnson  Analyst
11          12         3     Sophia     Clark  Manager


#Creating loan dataframe

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

schema = StructType([
    StructField("loan_id", IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("loan_amount", IntegerType()),
    StructField("interest_rate", DoubleType()),
    StructField("start_date", DateType()),
    StructField("end_date", DateType()),
    StructField("status", StringType())
])
loans = spark.read.csv(path = "/content/loans.csv", header=True, schema=schema)
loans.show(truncate=False)


+-------+-----------+-----------+-------------+----------+----------+------+
|loan_id|customer_id|loan_amount|interest_rate|start_date|end_date  |status|
+-------+-----------+-----------+-------------+----------+----------+------+
|1      |1          |10000      |0.05         |2023-01-01|2023-12-31|Active|
|2      |1          |15000      |0.04         |2023-02-15|2023-12-31|Active|
|3      |2          |8000       |0.03         |2023-03-20|2023-11-30|Active|
|4      |3          |20000      |0.05         |2023-01-10|2023-10-31|Active|
+-------+-----------+-----------+-------------+----------+----------+------+



# Creating payment_history dataframe

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id
spark = SparkSession.builder.appName("CapstoneProject").master("local").getOrCreate()

schema = StructType([
    StructField("payment_id", IntegerType()),
    StructField("loan_id", IntegerType()),
    StructField("payment_date", DateType()),
    StructField("amount_paid", DoubleType()),
])
payment_history = spark.read.csv(path = "/content/payment_history.csv", header=True, schema=schema)
payment_history = payment_history.withColumn("payment_id", monotonically_increasing_id())
payment_history.show(truncate=False)


+----------+-------+------------+-----------+
|payment_id|loan_id|payment_date|amount_paid|
+----------+-------+------------+-----------+
|0         |1      |2023-03-01  |2000.0     |
|1         |1      |2023-04-01  |1500.0     |
|2         |2      |2023-04-10  |1000.0     |
|3         |3      |2023-02-20  |3000.0     |
|4         |3      |2023-03-15  |2500.0     |
+----------+-------+------------+-----------+



# Basic reports

In [None]:
# Question 1. Write a spark dataframe to show the balance amount for an account_id = 1

result_df = accounts.filter(accounts["account_id"] == 1).select("balance")
result_df.show(truncate=False)

+-------+
|balance|
+-------+
|5000   |
+-------+



In [None]:
# Question 2: List Transactions for an account_id = 1:

account_transactions = transaction.filter(transaction["account_id"] == 1)
account_transactions.show(truncate=False)

+--------------+----------+----------------+------+-------------------+
|transaction_id|account_id|transaction_type|amount|transaction_date   |
+--------------+----------+----------------+------+-------------------+
|1             |1         |Deposit         |1000  |2023-01-15 08:30:00|
|2             |1         |Withdrawal      |500   |2023-02-02 12:45:00|
|10            |1         |Deposit         |1200  |2023-10-01 10:00:00|
+--------------+----------+----------------+------+-------------------+



In [None]:
# Question 3: List Accounts with a zero balance:

zero_balance_accounts = accounts.filter(accounts["balance"] == 0)
zero_balance_accounts.show(truncate=False)

+----------+-----------+-----------+------------+-------+
|account_id|customer_id|employee_id|account_type|balance|
+----------+-----------+-----------+------------+-------+
+----------+-----------+-----------+------------+-------+



In [None]:
# Question 4: Find the Oldest Customer

from pyspark.sql.types import *
from pyspark.sql.functions import *

customers = spark.createDataFrame(df_customers)

oldest_customer = customers.orderBy("date_of_birth").limit(1)
oldest_customer.show(truncate=False)

  for column, series in pdf.iteritems():


+-----------+----------+---------+-------------+
|customer_id|first_name|last_name|date_of_birth|
+-----------+----------+---------+-------------+
|3          |Alice     |Johnson  |1975-02-10   |
+-----------+----------+---------+-------------+



In [None]:
# Question 5: Calculate the Total Interest Earned Across All Accounts

from pyspark.sql.types import *
from pyspark.sql.functions import *
total_interest_earned = accounts.select(sum(col("balance") * 'interest_rate').alias("total_interest_earned")).collect()[0]["total_interest_earned"]

print("Total Interest Earned:", total_interest_earned)

Total Interest Earned: None


# Accounts Report

In [64]:
# Question 1: List All Accounts with Customer Information

all_accounts_info = accounts.join(customers, on='customer_id', how='inner')

result = all_accounts_info.select("customer_id", "first_name", "last_name", "account_id", "account_type", "balance")

result.show(truncate=False)


+-----------+----------+---------+----------+------------+-------+
|customer_id|first_name|last_name|account_id|account_type|balance|
+-----------+----------+---------+----------+------------+-------+
|5          |David     |Brown    |8         |Savings     |3000   |
|1          |John      |Doe      |1         |Savings     |5000   |
|1          |John      |Doe      |2         |Checking    |1000   |
|3          |Alice     |Johnson  |4         |Checking    |3000   |
|3          |Alice     |Johnson  |6         |Savings     |6000   |
|2          |Jane      |Smith    |3         |Savings     |8000   |
|2          |Jane      |Smith    |5         |Checking    |2500   |
|4          |Sarah     |Jones    |7         |Checking    |12000  |
+-----------+----------+---------+----------+------------+-------+



In [68]:
# Question 2: . Calculate Total Balance for Each Customer:

total_balance_per_customer = result.groupBy("customer_id", "first_name", "last_name").agg(sum("balance").alias("total_balance"))

total_balance_per_customer.show(truncate=False)

+-----------+----------+---------+-------------+
|customer_id|first_name|last_name|total_balance|
+-----------+----------+---------+-------------+
|3          |Alice     |Johnson  |9000         |
|5          |David     |Brown    |3000         |
|2          |Jane      |Smith    |10500        |
|4          |Sarah     |Jones    |12000        |
|1          |John      |Doe      |6000         |
+-----------+----------+---------+-------------+



In [69]:
# Question 3: Find Customers with Multiple Accounts:

customer_account_count = accounts.groupBy("customer_id").agg(count("account_id").alias("account_count"))

customers_with_multiple_accounts = customer_account_count.filter(col("account_count") > 1)

customers_with_multiple_accounts.show(truncate=False)

+-----------+-------------+
|customer_id|account_count|
+-----------+-------------+
|1          |2            |
|3          |2            |
|2          |2            |
+-----------+-------------+



# Customer Transaction report

In [71]:
# Question 1: List Transactions with Account and Customer Information:

joined_df = transaction.join(accounts, "account_id").join(customers, "customer_id")
joined_df.show(truncate=False)

+-----------+----------+--------------+----------------+------+-------------------+-----------+------------+-------+----------+---------+-------------+
|customer_id|account_id|transaction_id|transaction_type|amount|transaction_date   |employee_id|account_type|balance|first_name|last_name|date_of_birth|
+-----------+----------+--------------+----------------+------+-------------------+-----------+------------+-------+----------+---------+-------------+
|1          |1         |10            |Deposit         |1200  |2023-10-01 10:00:00|2          |Savings     |5000   |John      |Doe      |1980-05-15   |
|1          |1         |2             |Withdrawal      |500   |2023-02-02 12:45:00|2          |Savings     |5000   |John      |Doe      |1980-05-15   |
|1          |1         |1             |Deposit         |1000  |2023-01-15 08:30:00|2          |Savings     |5000   |John      |Doe      |1980-05-15   |
|1          |2         |8             |Deposit         |3000  |2023-08-22 16:45:00|3    

In [72]:
# Question 2: Calculate Average Transaction Amount

average_transaction_amount = transaction.groupBy().agg(avg("amount").alias("average_transaction_amount"))

average_transaction_amount.show(truncate=False)

+--------------------------+
|average_transaction_amount|
+--------------------------+
|1450.0                    |
+--------------------------+



In [75]:
# Question 3: Identify High-Value Customers with Total Balance

total_balance_per_customer = joined_df.groupBy("customer_id", "first_name", "last_name").agg(sum("balance").alias("total_balance"))

high_value_threshold = 1000

high_value_customers = total_balance_per_customer.filter("total_balance >= {}".format(high_value_threshold))

high_value_customers.show(truncate=False)

+-----------+----------+---------+-------------+
|customer_id|first_name|last_name|total_balance|
+-----------+----------+---------+-------------+
|3          |Alice     |Johnson  |6000         |
|2          |Jane      |Smith    |18500        |
|1          |John      |Doe      |17000        |
+-----------+----------+---------+-------------+



In [86]:
# Question 5: Calculate the Total Number of Transactions for Each Account Type

joined_df = transaction.join(accounts, "account_id", "inner")

total_transactions_by_type = joined_df.groupBy("account_type").agg(count("transaction_id").alias("total_transactions"))

total_transactions_by_type.show()

+------------+------------------+
|account_type|total_transactions|
+------------+------------------+
|     Savings|                 5|
|    Checking|                 5|
+------------+------------------+



In [90]:
# Question 6 & 7: Find Customers with No Accounts:

customers_alias = customers.alias("c")
accounts_alias = accounts.alias("a")

customers_with_no_accounts = customers_alias.join(accounts_alias, col("c.customer_id") == col("a.customer_id"), "left_anti")

customers_with_no_accounts.show()

+-----------+----------+---------+-------------+
|customer_id|first_name|last_name|date_of_birth|
+-----------+----------+---------+-------------+
|          6|      Emma|   Miller|   1982-07-25|
+-----------+----------+---------+-------------+



In [91]:
# Question 8: List the Latest Transaction for Each Account

from pyspark.sql.window import Window
from pyspark.sql import functions as F

window_spec = Window.partitionBy("account_id").orderBy(F.desc("transaction_date"))

df_transactions_with_row_number = transaction.withColumn("row_number", F.row_number().over(window_spec))

latest_transactions = df_transactions_with_row_number.filter("row_number = 1")

latest_transactions = latest_transactions.drop("row_number")

latest_transactions.show()

+--------------+----------+----------------+------+-------------------+
|transaction_id|account_id|transaction_type|amount|   transaction_date|
+--------------+----------+----------------+------+-------------------+
|            10|         1|         Deposit|  1200|2023-10-01 10:00:00|
|             7|         3|      Withdrawal|   800|2023-07-08 14:15:00|
|             6|         5|         Deposit|  2000|2023-06-12 11:30:00|
|             9|         4|      Withdrawal|  1500|2023-09-14 09:30:00|
|             8|         2|         Deposit|  3000|2023-08-22 16:45:00|
+--------------+----------+----------------+------+-------------------+



In [93]:
# Question 10: Find Duplicate Transactions

selected_columns = ["account_id", "transaction_type", "amount", "transaction_date"]
duplicate_counts = transaction.groupBy(selected_columns).count()
duplicate_transactions = duplicate_counts.filter(col("count") > 1)
duplicate_transactions.show(truncate=False)

+----------+----------------+------+----------------+-----+
|account_id|transaction_type|amount|transaction_date|count|
+----------+----------------+------+----------------+-----+
+----------+----------------+------+----------------+-----+

