# Financial KPI Reporting with PySpark & Power BI
This notebook uses **PySpark** to clean and enrich financial transaction data and prepare it for visualization in **Power BI**. It fills missing amounts with simulated values, derives KPIs (cost, profit, margin), assigns each transaction a risk segment, and exports a single CSV for business intelligence dashboards.

---

## Table of Contents
1. [Environment Setup](#1-environment-setup)
2. [Data Upload & Extraction](#2-data-upload--extraction)
3. [Load Data](#3-load-data)
4. [Join Datasets](#4-join-datasets)
5. [Clean and Simulate Amount](#5-clean-and-simulate-amount)
6. [Date & KPI Calculations](#6-date--kpi-calculations)
7. [Risk Segmentation](#7-risk-segmentation)
8. [Select Clean Columns](#8-select-clean-columns)
9. [Export for Power BI](#9-export-for-power-bi)


## 1. Environment Setup
### Install and configure Spark environment

In [None]:
# --- 0) Spark setup (run once per session) ---
!apt-get install -qq openjdk-11-jdk-headless > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar -xzf spark-3.4.1-bin-hadoop3.tgz
!pip -q install findspark

import os, findspark
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("FinancialKPIReporting").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
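
A wrong `JAVA_HOME` or `SPARK_HOME` is a common reason for `findspark.init()` to fail. The sketch below is one possible pre-flight check, not part of the original notebook; the helper name `missing_env_paths` is hypothetical, and it only verifies that each variable points at an existing directory.

```python
import os

def missing_env_paths(names=("JAVA_HOME", "SPARK_HOME"), environ=None):
    """Return the variable names that are unset or do not point at a directory."""
    environ = os.environ if environ is None else environ
    # os.path.isdir("") is False, so unset variables are reported too
    return [n for n in names if not os.path.isdir(environ.get(n) or "")]

problems = missing_env_paths()
if problems:
    print("Check these variables before calling findspark.init():", problems)
```

Running it right after the `os.environ[...]` assignments would flag a path that does not match the installed JDK or the extracted Spark folder.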

## 2. Data Upload & Extraction
### Upload and extract Kaggle dataset

In [None]:
# --- 1) Upload and extract your Kaggle zip ---
from google.colab import files
uploaded = files.upload()   # upload your "archive (2).zip"

import zipfile
import io

zip_name = next(iter(uploaded))   # first (normally the only) uploaded file
with zipfile.ZipFile(io.BytesIO(uploaded[zip_name]), "r") as z:
    z.extractall("/content/transaction_data")

base = "/content/transaction_data"
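
The later cells read three files directly under `base`. As a hedged sketch, the archive could be inspected before extraction to confirm those files sit at its top level; `missing_members` and `EXPECTED_FILES` are hypothetical names introduced here, and the check assumes the archive has no wrapping folder.

```python
import io
import zipfile

# File names taken from the load step of this notebook
EXPECTED_FILES = ("transactions_data.csv", "cards_data.csv", "train_fraud_labels.json")

def missing_members(zip_bytes, expected=EXPECTED_FILES):
    """Return expected file names not found at the top level of the archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        names = set(z.namelist())
    return [name for name in expected if name not in names]

# In Colab, after files.upload():
# print(missing_members(uploaded[zip_name]))
```

An empty list means the paths used in the load step should resolve; anything else points to a different archive layout.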

## 3. Load Data
### Load transactions, card details, and fraud labels

In [None]:
# --- 2) Load data (NO MCC JOIN) ---
from pyspark.sql import functions as F

df_tx    = spark.read.option("header", True).option("inferSchema", True).csv(f"{base}/transactions_data.csv")
df_cards = spark.read.option("header", True).option("inferSchema", True).csv(f"{base}/cards_data.csv")

# Fraud labels JSON is nested: {"target": {transaction_id: "Yes"/"No"}}
import pandas as pd
labels_pd = pd.read_json(f"{base}/train_fraud_labels.json").reset_index().rename(columns={"index":"transaction_id", "target":"fraud"})
df_labels = spark.createDataFrame(labels_pd)
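
If the pandas round trip is undesirable, the same labels can be flattened with the standard library. This is a minimal sketch that assumes the nested `{"target": {...}}` shape implied by the rename above and integer-like transaction ids; `labels_to_rows` is a hypothetical helper.

```python
import json

def labels_to_rows(payload, key="target"):
    """Flatten {"target": {id: label}} into a list of (transaction_id, fraud) tuples."""
    mapping = payload[key]
    return [(int(tx_id), label) for tx_id, label in mapping.items()]

# Tiny made-up sample in the assumed shape
sample = json.loads('{"target": {"101": "No", "102": "Yes"}}')
rows = labels_to_rows(sample)

# With a live session:
# df_labels = spark.createDataFrame(rows, ["transaction_id", "fraud"])
```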

## 4. Join Datasets
### Normalize IDs and join dataframes

In [None]:
# --- 3) Normalize ids to avoid join errors ---
df_tx    = df_tx.withColumnRenamed("id", "transaction_id")
df_cards = df_cards.withColumnRenamed("id", "card_id").withColumnRenamed("client_id", "card_client_id")

# --- 4) Join: transactions + labels + cards ---
df = (df_tx
      .join(df_labels, on="transaction_id", how="left")        # adds 'fraud' (Yes/No)
      .fillna({"fraud": "No"})                                  # unlabeled transactions are treated as non-fraud
      .join(df_cards, on="card_id", how="left")                 # adds card attributes
     )
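
The left join plus `fillna` means a transaction with no label ends up indistinguishable from one labeled `"No"`. A plain-Python sketch of that behaviour on made-up rows (the helper `left_join_labels` is hypothetical and only mirrors the labels join, not the cards join):

```python
def left_join_labels(transactions, labels, default="No"):
    """Attach a 'fraud' value to each transaction; unlabeled rows get `default`."""
    return [{**tx, "fraud": labels.get(tx["transaction_id"], default)} for tx in transactions]

txs = [{"transaction_id": 1, "card_id": 10}, {"transaction_id": 2, "card_id": 11}]
joined = left_join_labels(txs, {1: "Yes"})   # transaction 2 has no label
```

Every transaction is kept, and the unlabeled one receives `"No"`. If the distinction matters downstream, a separate "has label" flag would have to be captured before the fill.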

## 5. Clean and Simulate Amount
### Clean 'amount' field and simulate missing values

In [None]:
# --- 5) Clean amount and simulate where null ---
from pyspark.sql.functions import regexp_replace, col, when, rand

# If 'amount' looks like "$1,234.50", strip symbols then cast
df = df.withColumn("amount", regexp_replace("amount", "[$,]", ""))
df = df.withColumn("amount", col("amount").cast("double"))

# Simulate amount for nulls: uniform in [50, 3050); the seed repeats values only while partitioning stays the same
df = df.withColumn(
    "amount",
    when(col("amount").isNull(), (rand(seed=42) * F.lit(3000.0) + F.lit(50.0))).otherwise(col("amount"))
)
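
To spot-check the cleaning rule on a few raw strings without Spark, the logic can be mirrored in plain Python. This is a sketch under assumptions: `parse_amount` and `fill_amount` are hypothetical helpers, unparseable strings become `None` (the analogue of a null after the cast), and Python's `random` will not reproduce the values that Spark's `rand(seed=42)` generates.

```python
import random
import re

def parse_amount(raw):
    """Strip '$' and ',' then convert to float; return None if that fails."""
    if raw is None:
        return None
    cleaned = re.sub(r"[$,]", "", str(raw)).strip()
    try:
        return float(cleaned)
    except ValueError:
        return None

def fill_amount(value, rng):
    """Keep a real amount; otherwise draw a uniform value in [50, 3050)."""
    return value if value is not None else rng.random() * 3000.0 + 50.0

rng = random.Random(42)
amounts = [fill_amount(parse_amount(s), rng) for s in ["$1,234.50", "$-77.00", None, "n/a"]]
```

The first two inputs keep their parsed values (negative amounts pass through unchanged), while the last two are replaced by simulated ones.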

## 6. Date & KPI Calculations
### Extract date parts and calculate KPIs

In [None]:
# --- 6) Dates and derived fields ---
from pyspark.sql.functions import to_timestamp, year, month

df = (df
      .withColumn("timestamp", to_timestamp("date"))    # default parsing; pass a format string if values come back null
      .withColumn("year",  year("timestamp"))
      .withColumn("month", month("timestamp"))
     )

# KPIs under a flat assumption: cost is 70% of amount, so profit_margin is 30% for every positive amount (0.0 otherwise)
df = (df
      .withColumn("cost",   col("amount") * F.lit(0.70))
      .withColumn("profit", col("amount") - col("cost"))
      .withColumn("profit_margin",
                  when(col("amount") > 0, col("profit")/col("amount")).otherwise(F.lit(0.0)))
      .withColumn("is_fraud", when(col("fraud") == "Yes", F.lit(1)).otherwise(F.lit(0)))
     )
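
As a worked example of the KPI arithmetic: an amount of 200 gives a cost of about 140, a profit of about 60, and a margin of about 0.30, up to floating-point rounding. The sketch below mirrors the column expressions in plain Python; `kpis` and `COST_RATE` are hypothetical names, and the 0.70 rate is the notebook's own simplifying assumption.

```python
COST_RATE = 0.70  # same flat cost share used in the Spark expressions

def kpis(amount):
    """Return cost, profit and profit_margin for one transaction amount."""
    cost = amount * COST_RATE
    profit = amount - cost
    # Margin is defined as 0.0 for zero or negative amounts (e.g. refunds)
    margin = profit / amount if amount > 0 else 0.0
    return {"cost": cost, "profit": profit, "profit_margin": margin}

example = kpis(200.0)
refund = kpis(-50.0)
```

Because the cost share is constant, `profit_margin` carries no information beyond the sign of `amount`; it only becomes meaningful once real cost data replaces the flat rate.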

## 7. Risk Segmentation
### Assign risk segments with built-in column expressions (no UDF) for better performance

In [None]:
# Risk segment (no UDF; faster). Rules are checked in order: fraud first, then amount > 1000
df = df.withColumn(
    "risk_segment",
    when(col("fraud") == "Yes", "High Risk")
    .when(col("amount") > 1000, "Medium Risk")
    .otherwise("Low Risk")
)
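
The chained `when` calls act as an ordered rule list where the first match wins. A plain-Python mirror makes the edge cases easy to test; `risk_segment` here is a hypothetical function, not something the notebook defines, and a missing amount is assumed to fall through to the default as a null comparison does in Spark.

```python
def risk_segment(fraud, amount):
    """Return the segment for one transaction, checking rules in order."""
    if fraud == "Yes":
        return "High Risk"
    if amount is not None and amount > 1000:
        return "Medium Risk"
    return "Low Risk"

segments = [
    risk_segment("Yes", 5.0),      # fraud outranks a small amount
    risk_segment("No", 1000.0),    # threshold is strict: exactly 1000 is not Medium
    risk_segment("No", 1000.01),
    risk_segment("No", None),
]
```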

## 8. Select Clean Columns
### Select relevant columns for export

In [None]:
# --- 7) Select clean columns (no MCC) & avoid ambiguity ---
wanted_cols = [
    "transaction_id","date","timestamp","year","month",
    "client_id","card_id","card_client_id",
    "amount","cost","profit","profit_margin",
    "use_chip","merchant_id","merchant_city","merchant_state","zip",
    "fraud","is_fraud","risk_segment",
    "card_brand","card_type","credit_limit","card_on_dark_web"
]
# Keep only the columns that exist; names missing from df are skipped without warning
df_final = df.select(*[c for c in wanted_cols if c in df.columns])

# Quick sanity checks
df_final.printSchema()
df_final.groupBy("risk_segment").count().orderBy(F.col("count").desc()).show()
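
Because the selection filters on `df.columns`, a misspelled or absent column simply disappears from the export. One way to surface that, sketched with a hypothetical helper `split_columns` that works on plain lists of names:

```python
def split_columns(wanted, available):
    """Split `wanted` into (present, missing), preserving the wanted order."""
    available_set = set(available)
    present = [c for c in wanted if c in available_set]
    missing = [c for c in wanted if c not in available_set]
    return present, missing

present, missing = split_columns(
    ["transaction_id", "amount", "card_brand"],
    ["amount", "transaction_id", "fraud"],
)

# In the notebook:
# present, missing = split_columns(wanted_cols, df.columns)
# if missing: print("Not in df, skipped:", missing)
```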

## 9. Export for Power BI
### Save cleaned data to CSV and download

In [None]:
# --- 8) Export for Power BI (Spark writes a folder; coalesce(1) yields one part-*.csv inside it) ---
out_dir = "/content/cleaned_financial_data"
(df_final
 .coalesce(1)
 .write.mode("overwrite")
 .option("header", True)
 .csv(out_dir)
)

# Zip and download (cd first so the command does not depend on the current directory)
!cd /content && zip -qr cleaned_financial_data.zip cleaned_financial_data
from google.colab import files
files.download("/content/cleaned_financial_data.zip")
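
Spark names the output file `part-<number>-<uuid>-c000.csv` and places marker files such as `_SUCCESS` beside it, which is awkward to point Power BI at. A hedged sketch of copying the single part file to a stable name follows; `publish_single_csv` and the target file name are hypothetical.

```python
import glob
import os
import shutil

def publish_single_csv(out_dir, target_path):
    """Copy the only part-*.csv in out_dir to target_path and return that path."""
    parts = sorted(glob.glob(os.path.join(out_dir, "part-*.csv")))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file in {out_dir}, found {len(parts)}")
    shutil.copyfile(parts[0], target_path)
    return target_path

# In the notebook, after the write step:
# publish_single_csv(out_dir, "/content/financial_kpis.csv")
```

The explicit count check guards against the case where `coalesce(1)` was removed and several part files were written.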