In [2]:
import findspark
findspark.init()  # Automatically finds and sets SPARK_HOME

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .getOrCreate()

# Check Spark version
print("Spark version:", spark.version)
sc=spark.sparkContext

Spark version: 3.5.5


In [3]:
df=spark.read.csv('final_data.csv',header=True,inferSchema=True,mode="DROPMALFORMED")
df.show(10)

+---------------------+----------------+--------------------+--------------+------+-----------+------+------+--------------------+-----------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|trans_date_trans_time|          cc_num|            merchant|      category|   amt|      first|  last|gender|              street|             city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|        merch_long|is_fraud|
+---------------------+----------------+--------------------+--------------+------+-----------+------+------+--------------------+-----------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|  2019-08-11 19:38:33|4681601008538160|fraud_Hermann and...|  shopping_pos|

In [4]:
df.printSchema()

root
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



# 1. What are the top 5 cities where most fraudulent transactions occur

In [5]:
df.filter(df.is_fraud == 1).groupBy("city").count().orderBy("count", ascending=False).show(5)

+-------------+-----+
|         city|count|
+-------------+-----+
|       Dallas|   39|
|      Houston|   39|
|   Birmingham|   36|
|New York City|   35|
|    Allentown|   34|
+-------------+-----+
only showing top 5 rows



# 2.What are the most frequent transaction categories associated with fraudulent transactions

In [6]:
df.filter(df.is_fraud == 1).groupBy('category').count().orderBy("count",ascending=False).show()

+--------------+-----+
|      category|count|
+--------------+-----+
|   grocery_pos| 2228|
|  shopping_net| 2219|
|      misc_net| 1182|
|  shopping_pos| 1056|
| gas_transport|  772|
|      misc_pos|  322|
|     kids_pets|  304|
| entertainment|  292|
| personal_care|  290|
|          home|  265|
|   food_dining|  205|
|health_fitness|  185|
|   grocery_net|  175|
|        travel|  156|
+--------------+-----+



# 3. What are top 10 most frequent merchants for fraudulent transactions

In [7]:
df.filter(df.is_fraud == 1).groupBy("merchant").count().orderBy("count",ascending=False).show(10)

+--------------------+-----+
|            merchant|count|
+--------------------+-----+
|   fraud_Kilback LLC|   62|
|  fraud_Rau and Sons|   60|
|   fraud_Kozey-Boehm|   60|
|     fraud_Doyle Ltd|   57|
|    fraud_Terry-Huel|   56|
|      fraud_Kuhn LLC|   55|
|     fraud_Boyer PLC|   55|
|     fraud_Kuhic LLC|   53|
|fraud_Moen, Reing...|   53|
|fraud_Kiehn-Emmerich|   53|
+--------------------+-----+
only showing top 10 rows



# 4. Calculate the avg amount spend by each gender and visualize it

In [8]:
from pyspark.sql.functions import mean
df.groupBy("gender").mean("amt").show()

+------+------------------+
|gender|          avg(amt)|
+------+------------------+
|     F|271.71298766157497|
|     M|330.50721562156156|
+------+------------------+



# 5. Top 5 merchant with highest average transaction amount

In [9]:
from pyspark.sql.functions import mean
df.groupBy("merchant").agg(mean("amt").alias("mean_amt")).orderBy("mean_amt", ascending=False).show(5)

+--------------------+-----------------+
|            merchant|         mean_amt|
+--------------------+-----------------+
|fraud_Bashirian G...|891.0655102040814|
|     fraud_Kuhic LLC|869.5223437500001|
|fraud_Schmidt and...|852.2311111111113|
|fraud_Heathcote, ...|838.5803225806453|
|   fraud_Kozey-Boehm| 832.279864864865|
+--------------------+-----------------+
only showing top 5 rows



# 6. Calculate the age of each customer 

In [11]:
from pyspark.sql.functions import col, year, to_timestamp
from datetime import datetime

df = df.withColumn("dob_timestamp", to_timestamp(col("dob")))

current_year = datetime.now().year

df1 = df.withColumn("Age", current_year - year(col("dob_timestamp")))

df1.select("first","last", "Age").distinct().show()


+---------+---------+---+
|    first|     last|Age|
+---------+---------+---+
|   Robert|   Haynes| 28|
|  Vincent|   Waller| 71|
|     Anna|    Logan| 30|
|   Melvin|   Wright| 24|
|    James|  Baldwin| 45|
|Stephanie|    Crane| 70|
|     Jodi|Rodriguez| 48|
|     Adam| Mcdonald| 34|
|  Katelyn|     Wise| 88|
|    Brian| Williams| 38|
|     Sara|   Harris| 49|
|   Andrea|  Perkins| 53|
| Kimberly|     Webb| 75|
| Michelle|     Beck| 58|
|    Scott|     Cole| 65|
| Jennifer|    Black| 44|
|  Michael|    Jones| 64|
|  Bethany|  Andrade| 59|
|  William|  Johnson| 68|
|  Charles|  Preston| 63|
+---------+---------+---+
only showing top 20 rows



# 7. Top 5 states with highest number of transactions

In [12]:
df.groupBy('state').count().orderBy('count',ascending=False).show(5)

+-----+-----+
|state|count|
+-----+-----+
|   NY| 1290|
|   TX| 1287|
|   PA| 1153|
|   CA|  857|
|   OH|  687|
+-----+-----+
only showing top 5 rows



# 8.  Distribution of transaction over the course of month

In [13]:
from datetime import datetime
from pyspark.sql.functions import col, month, to_timestamp
df = df.withColumn("dob_timestamp", to_timestamp(col("dob")))

df = df.withColumn("month", month(col("dob_timestamp")))

df.groupBy("month").count().orderBy('month',ascending=True).show()

+-----+-----+
|month|count|
+-----+-----+
|    1| 1646|
|    2| 1275|
|    3| 1804|
|    4| 1663|
|    5| 1747|
|    6| 1642|
|    7| 1631|
|    8| 1505|
|    9| 1631|
|   10| 1514|
|   11| 1675|
|   12| 1569|
+-----+-----+



# 10. Analyse spending pattern based on age groups and avg spending amount for each group

In [14]:
from pyspark.sql.functions import col, year, to_timestamp
from datetime import datetime
from pyspark.sql.functions import col, when, avg


df = df.withColumn("dob_timestamp", to_timestamp(col("dob")))

current_year = datetime.now().year

df1 = df.withColumn("age", current_year - year(col("dob_timestamp")))


df1 = df1.withColumn("age_group", 
    when(col("age").between(18, 25), "18-25")
    .when(col("age").between(26, 35), "26-35")
    .when(col("age").between(36, 45), "36-45")
    .when(col("age").between(46, 55), "46-55")
    .when(col("age").between(56, 65), "56-65")
    .when(col("age") > 65, "65+")
)


avg_spent = df1.groupBy("age_group").agg(avg("amt").alias("avg_spent")).orderBy("age_group").show()


+---------+------------------+
|age_group|         avg_spent|
+---------+------------------+
|    18-25| 341.1335061728393|
|    26-35|  300.274189364462|
|    36-45| 256.7721430363867|
|    46-55|244.01207727044644|
|    56-65|354.12696461824964|
|      65+| 341.5525411255414|
+---------+------------------+



# Python libraries and functions from PySpark (pyspark.sql.functions)
# ----------------------------------------------------------------------------
# col- references DataFrame columns.
# year - extracts the year.
# to_timestamp - converts a string to a timestamp.
# month - extracts the month.
# datetime - provides date/time functions.
# when - applies conditional logic.
# avg - calculates the average.
# .withColumn() - to add new columns based on existing ones.
# agg() - applies multiple aggregate functions (like avg(), sum(), etc.) to a column.