In [1]:
import os
import sys
import findspark # We use this function to find PySpark in venv

In [None]:
# --- ENVIRONMENT CONFIGURATION ---
# Initialize findspark to locate the Spark installation in the virtual env.
# It sets SPARK_HOME and puts pyspark on the Python path, so it must run
# before any `pyspark` import below.
findspark.init()

# Point Spark at a JVM. Fall back to the default Ubuntu OpenJDK path, but keep
# a JAVA_HOME that is already configured (e.g. a different OS or a CI runner)
# instead of silently overwriting it. This must be set before the JVM is
# launched by SparkSession.builder.getOrCreate() further down.
os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/default-java")

# These imports must come AFTER findspark.init(), which makes pyspark importable.
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

# --- INITIALIZE SPARK SESSION ---
print(" Starting Spark Engine with local JDBC driver...")

# Absolute path to the PostgreSQL JDBC driver (.jar), expected in the current
# working directory. Fail fast with a clear message if it is missing. Otherwise
# Spark does not stop at startup, and the problem only shows up later as a
# ClassNotFoundException for org.postgresql.Driver on the first JDBC read.
jar_path = os.path.abspath("postgresql-42.6.0.jar")
if not os.path.isfile(jar_path):
    raise FileNotFoundError(
        f"PostgreSQL JDBC driver not found at {jar_path}. "
        "Place postgresql-42.6.0.jar in the notebook's working directory."
    )

# Build the SparkSession.
# The driver jar is registered via spark.jars and also put on the driver
# classpath so the JDBC reader can load org.postgresql.Driver.
# Note: if a session already exists in this kernel, getOrCreate() returns it,
# and these configs may not take effect until the kernel is restarted.
spark = (
    SparkSession.builder
    .appName("BankFraudAnalysis")
    .config("spark.jars", jar_path)
    .config("spark.driver.extraClassPath", jar_path)
    .getOrCreate()
)

print(f"Spark Engine INITIALIZED! Version: {spark.version}")

 Starting Spark Engine with local JDBC driver...


26/01/05 23:49:42 WARN Utils: Your hostname, bruce-Latitude-3420 resolves to a loopback address: 127.0.1.1; using 192.168.1.2 instead (on interface wlp0s20f3)
26/01/05 23:49:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
26/01/05 23:49:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark Engine INITIALIZED! Version: 3.5.7


In [3]:
# --- DATABASE CONNECTION ---
print(" Connecting to PostgreSQL database...")

# Connection settings are read from environment variables, so real credentials
# never need to be committed with the notebook. The fallbacks are the local
# Docker development values; 5434 is the Docker port exposed to the host.
# This cell only builds the settings. The database is first contacted when
# spark.read.jdbc(...) runs.
db_host = os.environ.get("FRAUD_DB_HOST", "localhost")
db_port = os.environ.get("FRAUD_DB_PORT", "5434")
db_name = os.environ.get("FRAUD_DB_NAME", "bank_fraud_db")

# JDBC Connection String
jdbc_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"

connection_properties = {
    "user": os.environ.get("FRAUD_DB_USER", "admin"),
    "password": os.environ.get("FRAUD_DB_PASSWORD", "admin_password"),
    "driver": "org.postgresql.Driver",
}

 Connecting to PostgreSQL database...


In [4]:
# --- DATA INGESTION (READ) ---
# Read the 'transactions' table into a Spark DataFrame.
# No partitioning options (column / lowerBound / upperBound / numPartitions) are
# given, so Spark reads the whole table into a single partition.
# count() and show() below are two separate Spark actions. Without cache(),
# each one would re-scan the table over JDBC; caching reads it only once.
df_spark = spark.read.jdbc(
    url=jdbc_url,
    table="transactions",
    properties=connection_properties,
).cache()

# --- EXPLORATORY DATA ANALYSIS (EDA) ---
record_count = df_spark.count()  # first action: also materializes the cache
print(f" Total Transactions Loaded: {record_count}")

print("\n === Top 5 Highest Value Transactions ===")
df_spark.orderBy(desc("amount")).show(5)

# The JDBC source takes its schema from the database's column metadata rather
# than by sampling rows.
print("\n=== Schema Inference ===")
df_spark.printSchema()

 Total Transactions Loaded: 1850

 === Top 5 Highest Value Transactions ===
+--------------------+---------+-------------------+--------+---------------+---------+--------+
|      transaction_id|client_id|   transaction_date|  amount|       merchant| location|is_fraud|
+--------------------+---------+-------------------+--------+---------------+---------+--------+
|b203321f-f70c-447...|     4672|2026-01-06 05:12:00|19525.49|      Smith-Kim|   Poland|    true|
|362772bf-7665-404...|     8772|2026-01-06 05:44:01|19491.27|Martinez-Bowers| Botswana|    true|
|537a2efd-e15b-47c...|     4647|2026-01-06 05:40:01|19442.86|   Juarez Group|Guatemala|    true|
|04e96818-5052-4ad...|     7400|2026-01-06 05:08:00|19394.50|     Forbes Ltd|  Moldova|    true|
|d66daace-f537-449...|     6455|2026-01-06 05:08:00|19231.56|   Gonzalez Inc|    Nepal|    true|
+--------------------+---------+-------------------+--------+---------------+---------+--------+
only showing top 5 rows


=== Schema Inference ===
