
**FIRST SPARK NOTEBOOK**

Dataset: https://www.kaggle.com/datasets/computingvictor/transactions-fraud-datasets

In [1]:
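from pyspark.sql import SparkSession
# Note: `sum` imported here is Spark's aggregate and shadows Python's built-in sum.
from pyspark.sql.functions import (
    col, count, isnan, isnull, regexp_replace, sum, to_date, when, year,
)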

# Colors
BLUE = "\033[34m"
RESET = "\033[0m"

### Import data

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Trial").getOrCreate()

In [3]:
transactions_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/content/drive/MyDrive/data/transactions_data.csv")
users_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/content/drive/MyDrive/data/users_data.csv")
cards_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/content/drive/MyDrive/data/cards_data.csv")

In [4]:
print(BLUE + "Transactions Table:" + RESET)
transactions_df.show(3)
print(BLUE + "Users Table:" + RESET)
users_df.show(3)
print(BLUE + "Cards Table:" + RESET)
cards_df.show(3)

Transactions Table:
+-------+-------------------+---------+-------+-------+-----------------+-----------+-------------+--------------+-------+----+------+
|     id|               date|client_id|card_id| amount|         use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|errors|
+-------+-------------------+---------+-------+-------+-----------------+-----------+-------------+--------------+-------+----+------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00|Swipe Transaction|      59935|       Beulah|            ND|58523.0|5499|  NULL|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57|Swipe Transaction|      67570|   Bettendorf|            IA|52722.0|5311|  NULL|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00|Swipe Transaction|      27092|        Vista|            CA|92084.0|4829|  NULL|
+-------+-------------------+---------+-------+-------+-----------------+-----------+-------------+--------------+-------+----+------+
only showing top 3 rows



### Clean data

Drop unnecessary columns

In [5]:
transactions_df1 = transactions_df.drop("merchant_state", "zip", "mcc")
users_df1 = users_df.drop("birth_month", "address", "latitude", "longitude")
cards_df1 = cards_df.drop("card_number", "cvv", "num_cards_issued", "year_pin_last_changed")

Number of nulls and empty values in columns

In [6]:
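# Utility functions
def get_null_counts(df):
  # Count missing values per column: NULL or empty string for every column,
  # plus NaN for every column that is not a timestamp or date.
  df.select([
      count(when(isnull(c) | (col(c) == ""), c)).alias(c) if d in ("timestamp", "date")
      else count(when(isnull(c) | isnan(c) | (col(c) == ""), c)).alias(c)
      for c, d in df.dtypes
  ]).show()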

def check_duplicates(df):
  if df.count() > df.dropDuplicates().count():
    print("Duplicate records exist!")
  else:
    print("No duplicate records")
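
The rule inside `get_null_counts` (a value counts as missing when it is NULL, NaN, or an empty string) can be tried on a few rows without a Spark session. The following is a plain-Python sketch of that rule only, not the Spark implementation; `is_missing`, `count_missing`, and the sample rows are hypothetical names and data introduced for illustration.

```python
import math


def is_missing(value):
    """Return True for None, a float NaN, or an empty string."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return value == ""


def count_missing(rows, columns):
    """Count missing values per column in a list of dict rows."""
    return {c: sum(is_missing(row.get(c)) for row in rows) for c in columns}


# Hypothetical sample rows (assumption: not taken from the dataset).
sample_rows = [
    {"id": 1, "amount": "$14.57", "score": 700.0, "errors": None},
    {"id": 2, "amount": "", "score": float("nan"), "errors": "Bad PIN"},
    {"id": 3, "amount": "$80.00", "score": 650.0, "errors": None},
]

missing = count_missing(sample_rows, ["id", "amount", "score", "errors"])
print(missing)
```

As in the Spark version, a NULL in `errors` is counted as missing even though, in this dataset, it simply marks a transaction that went through.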

In [7]:
print(BLUE + "Transactions Table Nulls:" + RESET)
get_null_counts(transactions_df1)
print(BLUE + "Users Table Nulls:" + RESET)
get_null_counts(users_df1)
print(BLUE + "Cards Table Nulls:" + RESET)
get_null_counts(cards_df1)

Transactions Table Nulls:
+---+----+---------+-------+------+--------+-----------+-------------+--------+
| id|date|client_id|card_id|amount|use_chip|merchant_id|merchant_city|  errors|
+---+----+---------+-------+------+--------+-----------+-------------+--------+
|  0|   0|        0|      0|     0|       0|          0|            0|13094522|
+---+----+---------+-------+------+--------+-----------+-------------+--------+

Users Table Nulls:
+---+-----------+--------------+----------+------+-----------------+-------------+----------+------------+----------------+
| id|current_age|retirement_age|birth_year|gender|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+---+-----------+--------------+----------+------+-----------------+-------------+----------+------------+----------------+
|  0|          0|             0|         0|     0|                0|            0|         0|           0|               0|
+---+-----------+--------------+----------+------+-----------------+-------------+----------+------------+----------------+

Check for duplicate records

In [8]:
check_duplicates(transactions_df1)
check_duplicates(users_df1)
check_duplicates(cards_df1)

No duplicate records
No duplicate records
No duplicate records
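
`check_duplicates` only reports whether any duplicate exists. If it ever reported one, the next step would be to see which records repeat. The sketch below shows that idea in plain Python on a few tuples; `find_duplicates` and the repeated sample record are hypothetical. In Spark, a comparable query would presumably group by all columns and keep the groups whose count exceeds 1.

```python
from collections import Counter


def find_duplicates(rows):
    """Return every row that occurs more than once, mapped to its count."""
    counts = Counter(rows)
    return {row: n for row, n in counts.items() if n > 1}


# Rows as (id, client_id, amount); the third row is a hypothetical repeat.
sample_rows = [
    (7475327, 1556, "$-77.00"),
    (7475328, 561, "$14.57"),
    (7475327, 1556, "$-77.00"),
]

duplicates = find_duplicates(sample_rows)
has_duplicates = len(sample_rows) > len(set(sample_rows))  # same test as check_duplicates
print(has_duplicates, duplicates)
```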


Clean datatypes and other inconsistencies

In [9]:
print(BLUE + "Transactions Schema:" + RESET)
transactions_df1.printSchema()
print(BLUE + "Users Schema:" + RESET)
users_df1.printSchema()
print(BLUE + "Cards Schema:" + RESET)
cards_df1.printSchema()

Transactions Schema:
root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- errors: string (nullable = true)

Users Schema:
root
 |-- id: integer (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- per_capita_income: string (nullable = true)
 |-- yearly_income: string (nullable = true)
 |-- total_debt: string (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)

Cards Schema:
root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)

In [10]:
# Strip the dollar sign from the currency columns, cast them to float, parse the MM/yyyy date columns, and rename the id columns ahead of the joins

transactions_df2 = transactions_df1.withColumn("amount", regexp_replace(transactions_df1["amount"], r"\$", ""))\
                                    .withColumnRenamed("id", "transaction_id")
cleaned_transactions_df = transactions_df2.withColumn("amount", transactions_df2["amount"].cast("float"))

users_df2 = users_df1.withColumn("per_capita_income", regexp_replace(users_df1["per_capita_income"], r"\$", ""))\
                      .withColumn("yearly_income", regexp_replace(users_df1["yearly_income"], r"\$", ""))\
                      .withColumn("total_debt", regexp_replace(users_df1["total_debt"], r"\$", ""))\
                      .withColumnRenamed("id", "join_client_id")
cleaned_users_df = users_df2.withColumn("per_capita_income", users_df2["per_capita_income"].cast("float"))\
                             .withColumn("yearly_income", users_df2["yearly_income"].cast("float"))\
                             .withColumn("total_debt", users_df2["total_debt"].cast("float"))

cards_df2 = cards_df1.withColumn("expires", to_date(cards_df1["expires"], "MM/yyyy"))\
                      .withColumn("credit_limit", regexp_replace(cards_df1["credit_limit"], r"\$", ""))\
                      .withColumn("acct_open_date", to_date(cards_df1["acct_open_date"], "MM/yyyy"))\
                      .withColumnRenamed("id", "join_card_id").withColumnRenamed("client_id", "card_client_id")
cleaned_cards_df = cards_df2.withColumn("credit_limit", cards_df2["credit_limit"].cast("float"))
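
The dollar-sign cleanup can be checked on the three sample amounts shown earlier. This is a plain-Python sketch of the same pattern, not the Spark code; `parse_dollars` is a hypothetical helper. The backslash matters because an unescaped `$` in a regular expression is an end-of-string anchor, not a literal dollar sign. The sketch parses into `Decimal`, which keeps cents exact; a binary float may carry small rounding noise into large sums, so a decimal type might be worth considering on the Spark side as well.

```python
import re
from decimal import Decimal


def parse_dollars(text):
    """Remove the literal dollar sign and parse the rest as an exact decimal."""
    return Decimal(re.sub(r"\$", "", text))


# Amounts copied from the first three transaction rows shown above.
amounts = [parse_dollars(s) for s in ["$-77.00", "$14.57", "$80.00"]]
total = sum(amounts)
print(amounts, total)
```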

### Analysis

Join the users and cards table with the transactions table

In [11]:
join_table1 = cleaned_transactions_df.join(cleaned_users_df, cleaned_transactions_df["client_id"] == cleaned_users_df["join_client_id"], "left").drop("join_client_id")
join_table2 = join_table1.join(cleaned_cards_df, join_table1["card_id"] == cleaned_cards_df["join_card_id"], "left").drop("join_card_id", "card_client_id")
join_table2.show(5)

+--------------+-------------------+---------+-------+------+-----------------+-----------+-------------+------+-----------+--------------+----------+------+-----------------+-------------+----------+------------+----------------+----------+---------------+----------+--------+------------+--------------+----------------+
|transaction_id|               date|client_id|card_id|amount|         use_chip|merchant_id|merchant_city|errors|current_age|retirement_age|birth_year|gender|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|card_brand|      card_type|   expires|has_chip|credit_limit|acct_open_date|card_on_dark_web|
+--------------+-------------------+---------+-------+------+-----------------+-----------+-------------+------+-----------+--------------+----------+------+-----------------+-------------+----------+------------+----------------+----------+---------------+----------+--------+------------+--------------+----------------+
|       7475327|2010-01-01 00:0

Q. Number of successful transactions every year

In [12]:
suc_trans_df = join_table2.filter("errors is null").withColumn("year", year(join_table2["date"]))
suc_trans_df.groupBy("year").count().orderBy("year").show()

+----+-------+
|year|  count|
+----+-------+
|2010|1221497|
|2011|1270206|
|2012|1300851|
|2013|1331226|
|2014|1343995|
|2015|1365712|
|2016|1369872|
|2017|1377019|
|2018|1372832|
|2019|1141312|
+----+-------+



Q. Count of unsuccessful transactions along with reason

In [13]:
unsuc_trans_df = cleaned_transactions_df.filter("errors is not null")
unsuc_trans_df.groupBy("errors").agg(count("*").alias("count")).orderBy("count", ascending=False).show()

+--------------------+------+
|              errors| count|
+--------------------+------+
|Insufficient Balance|130902|
|             Bad PIN| 32119|
|    Technical Glitch| 26271|
|     Bad Card Number|  7767|
|      Bad Expiration|  6161|
|             Bad CVV|  6106|
|         Bad Zipcode|  1126|
|Bad PIN,Insuffici...|   293|
|Insufficient Bala...|   243|
|Bad Card Number,I...|    71|
|Bad PIN,Technical...|    70|
|Bad CVV,Insuffici...|    57|
|Bad Expiration,In...|    47|
|Bad Card Number,B...|    38|
|Bad Card Number,B...|    33|
|Bad Expiration,Ba...|    32|
|Bad Expiration,Te...|    21|
|Bad Card Number,T...|    15|
|Bad CVV,Technical...|     8|
|Bad Zipcode,Insuf...|     7|
+--------------------+------+
only showing top 20 rows
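
Several rows in this result hold more than one reason in a single comma-separated string, and `show` cuts them off (passing `truncate=False` to `show` would print them in full). To count each reason on its own, the strings would have to be split first. Below is a plain-Python sketch of that split-and-count step; `count_error_reasons` is a hypothetical helper, and the combined sample string is illustrative, not read from the truncated output.

```python
from collections import Counter


def count_error_reasons(error_values):
    """Count each individual reason across comma-separated error strings."""
    counts = Counter()
    for value in error_values:
        if value is None:  # successful transaction, nothing to count
            continue
        counts.update(part.strip() for part in value.split(","))
    return counts


# Hypothetical sample of the `errors` column (assumption: illustrative values).
sample_errors = [
    None,
    "Bad PIN",
    "Insufficient Balance",
    "Bad PIN,Insufficient Balance",
]

reason_counts = count_error_reasons(sample_errors)
print(reason_counts.most_common())
```

With this counting, a transaction that failed for two reasons contributes to both totals, so the per-reason counts can add up to more than the number of failed transactions.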



Q. Number of transactions by gender, card brand and card type

In [14]:
join_table2.groupBy("gender", "card_brand", "card_type").agg(count("transaction_id").alias("count")).show()

+------+----------+---------------+-------+
|gender|card_brand|      card_type|  count|
+------+----------+---------------+-------+
|  Male|Mastercard|          Debit|2522592|
|Female|      Visa|          Debit|1495094|
|Female|      Amex|         Credit| 459159|
|  Male|      Amex|         Credit| 395331|
|Female|Mastercard|Debit (Prepaid)| 338025|
|  Male|      Visa|          Debit|1518942|
|Female|      Visa|Debit (Prepaid)| 131562|
|Female|      Visa|         Credit| 848870|
|  Male|      Visa|Debit (Prepaid)| 151487|
|Female|Mastercard|         Credit| 627660|
|  Male|      Visa|         Credit| 811608|
|  Male|  Discover|         Credit| 165285|
|  Male|Mastercard|         Credit| 630098|
|  Male|Mastercard|Debit (Prepaid)| 294656|
|Female|Mastercard|          Debit|2744368|
|Female|  Discover|         Credit| 171178|
+------+----------+---------------+-------+



Q. Number of transactions and total amount spent by gender, age group and financial stability (yearly income at or above per capita income)

In [15]:
age_group_df = suc_trans_df.withColumn("age_group", when(suc_trans_df["current_age"] < 18, "Below 18")\
                        .when((suc_trans_df["current_age"] >= 18) & (suc_trans_df["current_age"] <= 35), "18 to 35")\
                        .otherwise("Above 35"))\
                        .withColumn("above_per_capita", when(suc_trans_df["yearly_income"] >= suc_trans_df["per_capita_income"], "Yes")\
                                    .otherwise("No"))
age_group_df.groupBy("gender", "age_group", "above_per_capita").agg(count("transaction_id").alias("number_of_trans"),\
                                                sum("amount").alias("amount_spent")).orderBy("amount_spent", ascending=False).show()

+------+---------+----------------+---------------+--------------------+
|gender|age_group|above_per_capita|number_of_trans|        amount_spent|
+------+---------+----------------+---------------+--------------------+
|Female| Above 35|             Yes|        5758793|2.4654847559925961E8|
|  Male| Above 35|             Yes|        5596134|2.3448838117738235E8|
|Female| 18 to 35|             Yes|         774050|3.4092037300454125E7|
|  Male| 18 to 35|             Yes|         648857| 3.102110402926568E7|
|  Male| Above 35|              No|         140568|   6737874.449065417|
|Female| Above 35|              No|         176120|   6267352.169977255|
+------+---------+----------------+---------------+--------------------+
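
The bucketing rules behind `age_group` and `above_per_capita` are easiest to verify at their boundaries. The sketch below restates them as plain Python functions; the function names and the income figures are hypothetical. The result table above contains no "Below 18" rows, which suggests that no successful transaction in this data belongs to a user under 18.

```python
def age_group(age):
    """Bucket an age with the same thresholds as the when/otherwise chain."""
    if age < 18:
        return "Below 18"
    if age <= 35:  # 18 to 35, both ends inclusive
        return "18 to 35"
    return "Above 35"


def above_per_capita(yearly_income, per_capita_income):
    """Return "Yes" when yearly income is at or above per capita income."""
    return "Yes" if yearly_income >= per_capita_income else "No"


# Boundary ages, plus hypothetical income figures for illustration.
groups = [age_group(a) for a in (17, 18, 35, 36)]
flags = [above_per_capita(50000.0, 30000.0), above_per_capita(20000.0, 30000.0)]
print(groups, flags)
```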

