1. Load the CSV Data

In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session that the rest of the notebook works with.
session_builder = SparkSession.builder.appName("FinancePySparkProject")
spark = session_builder.getOrCreate()

# Read the transactions CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("financial_data.csv")
df.show()


+--------------+----------+-------------------+----------------+------+--------+--------------+-------------+
|transaction_id|account_id|   transaction_date|transaction_type|amount|currency|      category|     location|
+--------------+----------+-------------------+----------------+------+--------+--------------+-------------+
|             1|      1001|2023-07-01 00:00:00|          Credit|1200.5|     USD|        Salary|     New York|
|             2|      1001|2023-07-03 00:00:00|           Debit|150.75|     USD|     Groceries|     New York|
|             3|      1002|2023-07-02 00:00:00|          Credit|2500.0|     EUR|        Salary|       Berlin|
|             4|      1003|2023-07-04 00:00:00|           Debit|1000.0|     USD|          Rent|  Los Angeles|
|             5|      1002|2023-07-05 00:00:00|          Credit| 200.0|     EUR|    Investment|       Berlin|
|             6|      1001|2023-07-06 00:00:00|           Debit| 500.0|     USD|     Utilities|     New York|
|         

2. Data Cleaning

Handle Missing Values: Replace or remove rows with missing values.

In [4]:
from pyspark.sql.functions import col, sum, when

# Count the nulls in every column: each value becomes a 0/1 flag, and the
# flags are summed under the column's own name.
# NOTE: `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
null_count_exprs = []
for column_name in df.columns:
    null_flag = when(col(column_name).isNull(), 1).otherwise(0)
    null_count_exprs.append(sum(null_flag).alias(column_name))
df.select(null_count_exprs).show()

# Impute missing 'amount' values with the column median
# (a relative error of 0.0 asks approxQuantile for the exact quantile).
amount_quantiles = df.approxQuantile("amount", [0.5], 0.0)
median_amount = amount_quantiles[0]
df = df.fillna({'amount': median_amount})

# Discard rows that lack a 'transaction_type', 'currency', or 'category'.
required_columns = ["transaction_type", "currency", "category"]
df = df.dropna(subset=required_columns)
df.show()


+--------------+----------+----------------+----------------+------+--------+--------+--------+
|transaction_id|account_id|transaction_date|transaction_type|amount|currency|category|location|
+--------------+----------+----------------+----------------+------+--------+--------+--------+
|             0|         0|               0|               0|     0|       0|       0|       0|
+--------------+----------+----------------+----------------+------+--------+--------+--------+

+--------------+----------+-------------------+----------------+------+--------+--------------+-------------+
|transaction_id|account_id|   transaction_date|transaction_type|amount|currency|      category|     location|
+--------------+----------+-------------------+----------------+------+--------+--------------+-------------+
|             1|      1001|2023-07-01 00:00:00|          Credit|1200.5|     USD|        Salary|     New York|
|             2|      1001|2023-07-03 00:00:00|           Debit|150.75|     USD

Remove Outliers: Identify and remove outliers in the amount column.

In [5]:
# Use the 95th percentile of 'amount' as the ceiling: rows above it are
# treated as outliers and dropped (rows equal to the ceiling are kept).
amount_cutoffs = df.approxQuantile("amount", [0.95], 0.0)
upper_limit = amount_cutoffs[0]
df = df.where(df.amount <= upper_limit)
df.show()


+--------------+----------+-------------------+----------------+------+--------+--------------+-------------+
|transaction_id|account_id|   transaction_date|transaction_type|amount|currency|      category|     location|
+--------------+----------+-------------------+----------------+------+--------+--------------+-------------+
|             1|      1001|2023-07-01 00:00:00|          Credit|1200.5|     USD|        Salary|     New York|
|             2|      1001|2023-07-03 00:00:00|           Debit|150.75|     USD|     Groceries|     New York|
|             3|      1002|2023-07-02 00:00:00|          Credit|2500.0|     EUR|        Salary|       Berlin|
|             4|      1003|2023-07-04 00:00:00|           Debit|1000.0|     USD|          Rent|  Los Angeles|
|             5|      1002|2023-07-05 00:00:00|          Credit| 200.0|     EUR|    Investment|       Berlin|
|             6|      1001|2023-07-06 00:00:00|           Debit| 500.0|     USD|     Utilities|     New York|
|         

3. Data Validation

Ensure Correct Data Types: Validate and correct data types.

In [6]:
# Schema before the cast, for comparison with the one printed below.
df.printSchema()

# Force 'amount' to double; this leaves the column unchanged when it is
# already a double.
amount_as_double = df["amount"].cast("double")
df = df.withColumn("amount", amount_as_double)
df.printSchema()


root
 |-- transaction_id: integer (nullable = true)
 |-- account_id: integer (nullable = true)
 |-- transaction_date: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: double (nullable = false)
 |-- currency: string (nullable = true)
 |-- category: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- transaction_id: integer (nullable = true)
 |-- account_id: integer (nullable = true)
 |-- transaction_date: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: double (nullable = false)
 |-- currency: string (nullable = true)
 |-- category: string (nullable = true)
 |-- location: string (nullable = true)



Validate Date Formats: Ensure transaction_date is in the correct format.

In [7]:
from pyspark.sql.functions import to_date

# Reduce 'transaction_date' to a plain date (no time-of-day part),
# passing the yyyy-MM-dd pattern to to_date.
date_pattern = "yyyy-MM-dd"
df = df.withColumn("transaction_date", to_date(df.transaction_date, date_pattern))
df.show()


+--------------+----------+----------------+----------------+------+--------+--------------+-------------+
|transaction_id|account_id|transaction_date|transaction_type|amount|currency|      category|     location|
+--------------+----------+----------------+----------------+------+--------+--------------+-------------+
|             1|      1001|      2023-07-01|          Credit|1200.5|     USD|        Salary|     New York|
|             2|      1001|      2023-07-03|           Debit|150.75|     USD|     Groceries|     New York|
|             3|      1002|      2023-07-02|          Credit|2500.0|     EUR|        Salary|       Berlin|
|             4|      1003|      2023-07-04|           Debit|1000.0|     USD|          Rent|  Los Angeles|
|             5|      1002|      2023-07-05|          Credit| 200.0|     EUR|    Investment|       Berlin|
|             6|      1001|      2023-07-06|           Debit| 500.0|     USD|     Utilities|     New York|
|             7|      1004|      2023

Check for Duplicate Records: Remove duplicates based on specific columns.

In [8]:
# Keep a single row for every (transaction_id, account_id) pair.
dedup_keys = ["transaction_id", "account_id"]
df = df.drop_duplicates(subset=dedup_keys)
df.show()


+--------------+----------+----------------+----------------+------+--------+--------------+-------------+
|transaction_id|account_id|transaction_date|transaction_type|amount|currency|      category|     location|
+--------------+----------+----------------+----------------+------+--------+--------------+-------------+
|             1|      1001|      2023-07-01|          Credit|1200.5|     USD|        Salary|     New York|
|             2|      1001|      2023-07-03|           Debit|150.75|     USD|     Groceries|     New York|
|             3|      1002|      2023-07-02|          Credit|2500.0|     EUR|        Salary|       Berlin|
|             4|      1003|      2023-07-04|           Debit|1000.0|     USD|          Rent|  Los Angeles|
|             5|      1002|      2023-07-05|          Credit| 200.0|     EUR|    Investment|       Berlin|
|             6|      1001|      2023-07-06|           Debit| 500.0|     USD|     Utilities|     New York|
|             7|      1004|      2023

4. Data Transformation

   
Categorize Transactions: Group transactions into broader categories.

In [12]:
from pyspark.sql.functions import when

# Map each fine-grained category onto a broader bucket; any category that
# is not listed below falls into "Other".
category_col = df["category"]
is_living_expense = category_col.isin("Groceries", "Rent")
is_income = category_col == "Salary"
broad_category_expr = (
    when(is_living_expense, "Living Expenses")
    .when(is_income, "Income")
    .otherwise("Other")
)
df = df.withColumn("broad_category", broad_category_expr)
df.show()


+--------------+----------+----------------+----------------+------+--------+--------------+-------------+---------------+
|transaction_id|account_id|transaction_date|transaction_type|amount|currency|      category|     location| broad_category|
+--------------+----------+----------------+----------------+------+--------+--------------+-------------+---------------+
|             1|      1001|      2023-07-01|          Credit|1200.5|     USD|        Salary|     New York|         Income|
|             2|      1001|      2023-07-03|           Debit|150.75|     USD|     Groceries|     New York|Living Expenses|
|             3|      1002|      2023-07-02|          Credit|2500.0|     EUR|        Salary|       Berlin|         Income|
|             4|      1003|      2023-07-04|           Debit|1000.0|     USD|          Rent|  Los Angeles|Living Expenses|
|             5|      1002|      2023-07-05|          Credit| 200.0|     EUR|    Investment|       Berlin|          Other|
|             6|

Add Time-Based Features: Extract month and day of the week from transaction_date.

In [15]:
from pyspark.sql.functions import month, dayofweek

# Derive calendar features from 'transaction_date':
#   transaction_month - month number
#   transaction_day   - day of the week (the output below shows Saturday
#                       2023-07-01 as 7 and Sunday 2023-07-02 as 1)
df = (
    df.withColumn("transaction_month", month("transaction_date"))
    .withColumn("transaction_day", dayofweek("transaction_date"))
)
df.show()


+--------------+----------+----------------+----------------+------+--------+--------------+-------------+---------------+-----------------+---------------+
|transaction_id|account_id|transaction_date|transaction_type|amount|currency|      category|     location| broad_category|transaction_month|transaction_day|
+--------------+----------+----------------+----------------+------+--------+--------------+-------------+---------------+-----------------+---------------+
|             1|      1001|      2023-07-01|          Credit|1200.5|     USD|        Salary|     New York|         Income|                7|              7|
|             2|      1001|      2023-07-03|           Debit|150.75|     USD|     Groceries|     New York|Living Expenses|                7|              2|
|             3|      1002|      2023-07-02|          Credit|2500.0|     EUR|        Salary|       Berlin|         Income|                7|              1|
|             4|      1003|      2023-07-04|           Deb

Aggregate Data: Calculate the total, average, maximum, and minimum transaction amount by category and location.

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, max, min, trunc
from pyspark.sql.window import Window

# Summary statistics of 'amount' for every (category, location) pair.
# NOTE: sum / max / min are the pyspark.sql.functions versions imported
# above, which shadow the Python builtins of the same name.
amount_stats = [
    sum("amount").alias("total_amount"),
    avg("amount").alias("average_amount"),
    max("amount").alias("max_amount"),
    min("amount").alias("min_amount"),
]
grouped_by_category_location = df.groupBy("category", "location")
total_by_category_location = grouped_by_category_location.agg(*amount_stats)

total_by_category_location.show()


+--------------+-------------+------------+------------------+----------+----------+
|      category|     location|total_amount|    average_amount|max_amount|min_amount|
+--------------+-------------+------------+------------------+----------+----------+
|     Groceries|     New York|      350.75|           175.375|     200.0|    150.75|
|    Investment|       Berlin|       200.0|             200.0|     200.0|     200.0|
|     Utilities|San Francisco|       800.0|             800.0|     800.0|     800.0|
|     Freelance|     New York|      1700.0|             850.0|     900.0|     800.0|
|        Coffee|       Berlin|       130.0|              65.0|      70.0|      60.0|
|    Investment|  Los Angeles|       100.0|             100.0|     100.0|     100.0|
|     Groceries|       Berlin|       650.0|             325.0|     500.0|     150.0|
|     Utilities|      Chicago|       700.0|             700.0|     700.0|     700.0|
|        Coffee|San Francisco|        90.0|              45.0|   

Monthly Aggregation: Calculate the total amount by month and location.

In [23]:
# Truncate every date to the first day of its month so that all rows from
# the same month share one grouping key.
month_start = trunc("transaction_date", "MM")
df_with_month = df.withColumn("month", month_start)

# Sum 'amount' for every (month, location) pair.
monthly_total = sum("amount").alias("total_amount")
total_by_month_location = df_with_month.groupBy("month", "location").agg(monthly_total)

total_by_month_location.show()


+----------+-------------+------------+
|     month|     location|total_amount|
+----------+-------------+------------+
|2023-08-01|San Francisco|       950.0|
|2023-08-01|     New York|      1700.0|
|2023-07-01|       Berlin|      9900.0|
|2023-08-01|      Chicago|      2600.0|
|2023-07-01|      Chicago|      3825.0|
|2023-07-01|  Los Angeles|      2950.0|
|2023-08-01|       Berlin|      2680.0|
|2023-07-01|San Francisco|       430.0|
|2023-07-01|     New York|     4981.25|
|2023-08-01|  Los Angeles|      2800.0|
+----------+-------------+------------+



Window Functions


Running Total by Location

In [24]:
# Running-total window: within each location, rows are ordered
# chronologically and the frame spans from the first row of the partition
# up to the current row.
# 'transaction_id' is added as a tie-breaker. A ROWS frame accumulates in
# row order, so when several transactions share a date, ordering by the date
# alone lets Spark pick any order among them and the intermediate totals can
# differ between runs.
window_spec = (
    Window.partitionBy("location")
    .orderBy("transaction_date", "transaction_id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Cumulative sum of 'amount' over that window
# (`sum` is pyspark.sql.functions.sum, not the Python builtin).
df_with_running_total = df.withColumn("running_total", sum("amount").over(window_spec))

df_with_running_total.show()


+--------------+----------+----------------+----------------+------+--------+-------------+--------+---------------+-----------------+---------------+-------------+
|transaction_id|account_id|transaction_date|transaction_type|amount|currency|     category|location| broad_category|transaction_month|transaction_day|running_total|
+--------------+----------+----------------+----------------+------+--------+-------------+--------+---------------+-----------------+---------------+-------------+
|             3|      1002|      2023-07-02|          Credit|2500.0|     EUR|       Salary|  Berlin|         Income|                7|              1|       2500.0|
|             5|      1002|      2023-07-05|          Credit| 200.0|     EUR|   Investment|  Berlin|          Other|                7|              4|       2700.0|
|            10|      1002|      2023-07-10|          Credit|2200.0|     EUR|       Salary|  Berlin|         Income|                7|              2|       4900.0|
|         

7-Day Moving Average by Location

In [25]:
from pyspark.sql.functions import col, datediff, lit

# 7-day moving-average window.
# A ROWS frame of (-6, 0) would average the last seven *transactions*,
# however many days they span. To get a true calendar window, order by an
# integer day number and use a RANGE frame: (-6, 0) then covers the current
# day plus the six days before it, and all rows sharing a date fall into the
# same frame.
day_number = datediff(col("transaction_date"), lit("1970-01-01").cast("date"))
moving_avg_window = (
    Window.partitionBy("location")
    .orderBy(day_number)
    .rangeBetween(-6, 0)
)

# Average 'amount' over the trailing 7-day window of each location.
df_with_moving_avg = df.withColumn("moving_avg", avg("amount").over(moving_avg_window))

df_with_moving_avg.show()


+--------------+----------+----------------+----------------+------+--------+-------------+--------+---------------+-----------------+---------------+------------------+
|transaction_id|account_id|transaction_date|transaction_type|amount|currency|     category|location| broad_category|transaction_month|transaction_day|        moving_avg|
+--------------+----------+----------------+----------------+------+--------+-------------+--------+---------------+-----------------+---------------+------------------+
|             3|      1002|      2023-07-02|          Credit|2500.0|     EUR|       Salary|  Berlin|         Income|                7|              1|            2500.0|
|             5|      1002|      2023-07-05|          Credit| 200.0|     EUR|   Investment|  Berlin|          Other|                7|              4|            1350.0|
|            10|      1002|      2023-07-10|          Credit|2200.0|     EUR|       Salary|  Berlin|         Income|                7|              2|