## *Introduction to PySpark*

<img src="lets_go.gif" width="250" height="150">  

#### *Why PySpark?*
#### Pandas :  
  - Data manipulation and analysis  
  - Limited by a single computer's memory  
  - Easy-to-use data structures like DataFrames and Series  
  - But when it comes to Big Data...
  - <img src="BigDatahumor.webp" width="250" height="150">
#### PySpark :  
  - Built on top of Apache Spark, designed for processing and analyzing large-scale datasets
  - A DataFrame API similar to Pandas  
  - Python code that can be distributed and executed in parallel across a cluster of computers/GPUs or the CPU cores of a single computer, like Naruto's Shadow Clone Jutsu  
  - <img src="distr_comp.webp" width="250" height="150">

##### *What is PySpark?*
*PySpark is the Python API for Apache Spark, an open source, distributed computing framework and set of libraries for real-time, large-scale data processing.*  
<img src="kehna_kya_chahte_hogif.gif" width="300" height="200">

### *Let's Dive into the Practical Part!!*

<img src="all_right_be_practical.gif" width="300" height="200">

#### *Data Reading Comparison*

In [1]:
# Import Libraries
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import time
import sys
pd.set_option('display.max_columns', None)



In [2]:
# Function to read CSV file using Pandas and measure time
def read_csv_with_pandas(file_path):
    start_time = time.time()
    df = pd.read_csv(file_path, header = 0, sep = ',',low_memory = False,)
    end_time = time.time()
    # memory use in GB (bytes / 1024**3)
    memory_use = sys.getsizeof(df)/(1024**3)
    return df, end_time - start_time, memory_use

In [3]:
# File path
file_path = "Fraud.csv/Fraud.csv"

# Read CSV file using Pandas
pandas_df, pandas_time, pandas_memory = read_csv_with_pandas(file_path)
print("Pandas reading time:", pandas_time, "seconds")
print("memory Used by Pandas:",pandas_memory,"GB")

Pandas reading time: 30.387768030166626 seconds
memory Used by Pandas: 1.5607366245239973 GB


*Pandas' memory usage after reading data is like Joey piling up all the food on his plate at Thanksgiving dinner – it starts off reasonable, but by the end, it'll struggle to keep it all together!*  
<img src="thanksgiving-day-turkey.gif" width="300" height="200">

In [4]:
# To start working with PySpark, we first need to create a Spark session.
spark = SparkSession.builder \
        .appName("ReadCSVWithPySpark") \
        .getOrCreate()

#### *Why Create a PySpark Session?*

<img src="creating pyspark session.webp" width="300" height="200">

*Think of creating a PySpark session like summoning Megumi's shikigami hounds in 'Jujutsu Kaisen.' Just as Megumi calls upon his loyal companions to assist him in battle, creating a PySpark session summons your faithful connection to the Spark cluster. With this connection, you can command your PySpark hounds to create DataFrames, register them as tables, execute data jujutsu (SQL queries), cache tables for quick access, and even read files effortlessly.*

In [5]:
# Function to read CSV file using PySpark and measure time
def read_csv_with_pyspark(file_path):
    start_time = time.time()
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    end_time = time.time()
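    # memory use in GB; note that sys.getsizeof only sees the small Python
    # wrapper object here, not the data that Spark manages on the JVM side
    memory_use = sys.getsizeof(df)/(1024**3)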
    return df, end_time - start_time, memory_use

In [6]:
# Read CSV file using PySpark
pyspark_df, pyspark_time, pyspark_memory = read_csv_with_pyspark(file_path)
print("PySpark reading time:", pyspark_time, "seconds")
print("memory Used by Pyspark:",pyspark_memory,"GB")

PySpark reading time: 28.72043251991272 seconds
memory Used by Pyspark: 5.21540641784668e-08 GB
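
The tiny PySpark figure above deserves a second look. `sys.getsizeof` reports only the size of the Python object it is handed, and a PySpark DataFrame is a thin Python handle to data that Spark manages elsewhere. The sketch below imitates that situation with a made-up `Handle` class (hypothetical, not part of PySpark) using only the standard library:

```python
import sys

class Handle:
    """Hypothetical stand-in for a thin wrapper around data stored elsewhere."""

    def __init__(self, payload):
        self.payload = payload  # only a reference is kept on the wrapper

big_list = list(range(1_000_000))    # the "real" data
handle = Handle(big_list)            # the thin wrapper around it

list_size = sys.getsizeof(big_list)  # size of the list object itself
handle_size = sys.getsizeof(handle)  # size of the wrapper only

print("list object :", list_size, "bytes")
print("wrapper     :", handle_size, "bytes")
```

The wrapper stays a few dozen bytes no matter how large the data behind it grows, so the two "memory" numbers in this comparison are not measuring the same thing.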


In [7]:
# Storing a result in a variable
describe_stat = pandas_df.describe()
describe_stat

Unnamed: 0,step,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
count,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0
mean,243.3972,179861.9,833883.1,855113.7,1100702.0,1224996.0,0.00129082,2.514687e-06
std,142.332,603858.2,2888243.0,2924049.0,3399180.0,3674129.0,0.0359048,0.001585775
min,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,156.0,13389.57,0.0,0.0,0.0,0.0,0.0,0.0
50%,239.0,74871.94,14208.0,0.0,132705.7,214661.4,0.0,0.0
75%,335.0,208721.5,107315.2,144258.4,943036.7,1111909.0,0.0,0.0
max,743.0,92445520.0,59585040.0,49585040.0,356015900.0,356179300.0,1.0,1.0


#### *Now the memory for this variable adds up too...*
<img src="pandas_memory_use.gif" width="300" height="200">

In [8]:
print("Memory used by describe_stat variable for pandas (in bytes) : ")
print(sys.getsizeof(describe_stat))

Memory used by describe_stat variable for pandas (in bytes) : 
1027


In [9]:
describe_stat = pyspark_df.describe()
describe_stat.show()  # show() prints the table itself and returns None, so no print() is needed

+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|   newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|           6362620|    6362620|           6362620|          6362620|    6362620|           6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.90354913048|       NULL| 833883.1040744851|855113.6685785787|       NULL|1100701.6665196505|1224996.3982019336|0.001290820448180152| 2.51468734577894E-6|
| stddev| 

In [10]:
# the wide table is hard to read, so print one record per block instead
describe_stat.show(vertical = True)

-RECORD 0------------------------------
 summary        | count                
 step           | 6362620              
 type           | 6362620              
 amount         | 6362620              
 nameOrig       | 6362620              
 oldbalanceOrg  | 6362620              
 newbalanceOrig | 6362620              
 nameDest       | 6362620              
 oldbalanceDest | 6362620              
 newbalanceDest | 6362620              
 isFraud        | 6362620              
 isFlaggedFraud | 6362620              
-RECORD 1------------------------------
 summary        | mean                 
 step           | 243.39724563151657   
 type           | NULL                 
 amount         | 179861.90354913048   
 nameOrig       | NULL                 
 oldbalanceOrg  | 833883.1040744851    
 newbalanceOrig | 855113.6685785787    
 nameDest       | NULL                 
 oldbalanceDest | 1100701.6665196505   
 newbalanceDest | 1224996.3982019336   
 isFraud        | 0.001290820448180152 


In [11]:
print("Memory used by describe_stat variable for pyspark (in bytes) : ")
print(sys.getsizeof(describe_stat))

Memory used by describe_stat variable for pyspark (in bytes) : 
56


<img src="lazy_eval.jpg" width="300" height="200">

### *It's Lazy Evaluation !!!!*
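
Lazy evaluation means that transformations only describe the work, and nothing runs until an action asks for a result. As a rough analogy (plain Python generators, not Spark's actual execution engine), the sketch below builds a small pipeline and counts how many rows are really processed. The names `expensive_parse` and `calls` are made up for this illustration.

```python
calls = {"parsed": 0}   # counts how many rows were really processed

def expensive_parse(row):
    calls["parsed"] += 1
    return int(row)

rows = ["1", "2", "3", "4", "5"]

# "Transformations": building the pipeline does no work yet
parsed = (expensive_parse(r) for r in rows)
small = (x for x in parsed if x < 4)
parsed_before_action = calls["parsed"]

# "Action": asking for a result finally pulls rows through the pipeline
result = list(small)
parsed_after_action = calls["parsed"]

print(parsed_before_action, parsed_after_action, result)
```

In PySpark, calls such as `filter`, `select` and `withColumn` are transformations, while `show`, `count` and `collect` are actions that trigger the computation.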

In [12]:
# Stopping spark session
spark.stop()

In [13]:
# deleting unused variables
del pandas_df, describe_stat, pyspark_df

In [14]:
# Stored Variables in the memory
%who

SparkSession	 file_path	 pandas_memory	 pandas_time	 pd	 ps	 pyspark_memory	 pyspark_time	 read_csv_with_pandas	 
read_csv_with_pyspark	 spark	 sys	 time	 


#### *Practical - Functions, Methods, Implementations!!*

##### *Let's Explore!!*
<img src="gojo-satoru-explore.gif" width="300" height="200">

In [15]:
spark = SparkSession.builder \
        .appName("DataAnalysisWithSpark") \
        .getOrCreate()

<img src="information_gif.gif" width="300" height="200">

In [16]:
spark

In [17]:
data = spark.read.csv(file_path)
data.show(n = 5)

+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
| _c0|     _c1|    _c2|        _c3|          _c4|           _c5|        _c6|           _c7|           _c8|    _c9|          _c10|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
|   1| PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|  181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0

In [18]:
data = spark.read.csv(file_path, header = True,inferSchema = True)
data.show(n = 5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

<img src="data_everywhere.gif" width="300" height="200">

In [19]:
# another way to read the files
# data = spark.read.option('header','true').csv(file_path)

#### Here's what inferSchema does:

  - *Automatic schema inference:* PySpark examines the data in the input source and determines the data type of each column from the actual values. For example, if a column contains only numerical values, PySpark infers a numeric data type (e.g., integer or double).
  - *Efficient data typing:* With an inferred schema, numeric columns are stored as numbers instead of strings, which helps memory usage and query performance. Without it, every CSV column is read as a string.
  - *Convenience:* Setting inferSchema to True means you don't need to specify the schema manually, saving time and effort. The trade-off is an extra pass over the data while loading.
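
To make the idea of schema inference concrete, here is a deliberately simplified sketch in plain Python. It is not Spark's actual algorithm; the helper `infer_type` and its three type names are assumptions chosen for illustration. It scans every value in a column and picks the narrowest type that fits all of them, which also hints at why inference needs to read the data before the DataFrame can be defined.

```python
import csv
import io

def infer_type(values):
    """Return 'int', 'double' or 'string' for a list of raw text values."""
    def fits(cast):
        try:
            for v in values:
                cast(v)
            return True
        except ValueError:
            return False
    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"

# A tiny in-memory CSV with three of the columns used in this notebook
sample = io.StringIO(
    "step,type,amount\n"
    "1,PAYMENT,9839.64\n"
    "1,TRANSFER,181.0\n"
)
reader = csv.DictReader(sample)
rows = list(reader)
schema = {name: infer_type([row[name] for row in rows]) for name in reader.fieldnames}
print(schema)
```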

In [20]:
# type of the data variable
type(data)

pyspark.sql.dataframe.DataFrame

In [21]:
# to get data glimpses
data.show(n=2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



In [22]:
data.head(n=2)
# similarly, tail(n) returns the bottom n rows of a PySpark SQL DataFrame

[Row(step=1, type='PAYMENT', amount=9839.64, nameOrig='C1231006815', oldbalanceOrg=170136.0, newbalanceOrig=160296.36, nameDest='M1979787155', oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0, isFlaggedFraud=0),
 Row(step=1, type='PAYMENT', amount=1864.28, nameOrig='C1666544295', oldbalanceOrg=21249.0, newbalanceOrig=19384.72, nameDest='M2044282225', oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0, isFlaggedFraud=0)]

In [23]:
# printing data columns
data.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

#### *Talk to me Data 😤!!!!*

<img src="Talk_to_me.gif" width="400" height="300">

In [24]:
# shape of the data
print('Shape of the Given Data is : ')
print(f'Number of Rows : {data.count()}')
print(f'Number of columns : {len(data.columns)}')

Shape of the Given Data is : 
Number of Rows : 6362620
Number of columns : 11


In [25]:
data.printSchema() # much like data.info in pandas

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [26]:
# show data types same as pandas
data.dtypes

[('step', 'int'),
 ('type', 'string'),
 ('amount', 'double'),
 ('nameOrig', 'string'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('nameDest', 'string'),
 ('oldbalanceDest', 'double'),
 ('newbalanceDest', 'double'),
 ('isFraud', 'int'),
 ('isFlaggedFraud', 'int')]

In [27]:
# describe stats
data.describe().show()

+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|   newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|           6362620|    6362620|           6362620|          6362620|    6362620|           6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.90354913048|       NULL| 833883.1040744851|855113.6685785787|       NULL|1100701.6665196505|1224996.3982019336|0.001290820448180152| 2.51468734577894E-6|
| stddev| 

In [28]:
# describe stats
# ps is pyspark.pandas
ps.DataFrame(data).describe(percentiles = [0.5,0.9])


Unnamed: 0,step,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
count,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0
mean,243.3972,179861.9,833883.1,855113.7,1100702.0,1224996.0,0.00129082,2.514687e-06
std,142.332,603858.2,2888243.0,2924049.0,3399180.0,3674129.0,0.0359048,0.001585775
min,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,239.0,74864.38,14211.23,0.0,132623.7,214605.8,0.0,0.0
90%,399.0,365352.4,1821945.0,1968910.0,2913495.0,3194287.0,0.0,0.0
max,743.0,92445520.0,59585040.0,49585040.0,356015900.0,356179300.0,1.0,1.0


In [29]:
# summary stats: like describe, plus approximate 25%/50%/75% percentiles by default
data.summary().show()

+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|   newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|           6362620|    6362620|           6362620|          6362620|    6362620|           6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.90354913048|       NULL| 833883.1040744851|855113.6685785787|       NULL|1100701.6665196505|1224996.3982019336|0.001290820448180152| 2.51468734577894E-6|
| stddev| 

In [30]:
# summary stats with a custom selection of statistics and percentiles
data.summary("count","mean","stddev","min","25%","50%","90%","99%","max").show()

+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|   newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|           6362620|    6362620|           6362620|          6362620|    6362620|           6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.90354913048|       NULL| 833883.1040744851|855113.6685785787|       NULL|1100701.6665196505|1224996.3982019336|0.001290820448180152| 2.51468734577894E-6|
| stddev| 

In [31]:
# select single column
amount = data.select('amount')
amount.show(n=2)

+-------+
| amount|
+-------+
|9839.64|
|1864.28|
+-------+
only showing top 2 rows



In [32]:
# select multiple columns
data.select(['amount','oldbalanceDest']).show(2)

+-------+--------------+
| amount|oldbalanceDest|
+-------+--------------+
|9839.64|           0.0|
|1864.28|           0.0|
+-------+--------------+
only showing top 2 rows



In [33]:
from pyspark.sql.functions import mean, stddev

In [34]:
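# adding a new column: standardize amount as (amount - mean) / stddev
# both statistics are computed in one aggregation instead of two separate collect() calls
stats = data.agg(mean('amount').alias('mean_amount'), stddev('amount').alias('std_amount')).collect()[0]
data = data.withColumn('Norm_Amount', (data['amount'] - stats['mean_amount']) / stats['std_amount'])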

In [35]:
data.agg(mean(data['amount'])).collect()

[Row(avg(amount)=179861.90354913048)]

In [36]:
data.show(n = 2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-------------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|        Norm_Amount|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-------------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|-0.2815599004707184|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|-0.2947672388565485|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-------------------+
only showing top 2 rows
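
The `Norm_Amount` column is a z-score: each amount minus the column mean, divided by the column standard deviation. As a quick sanity check in plain Python, the sketch below reuses the mean printed above and the standard deviation from the earlier pandas `describe()` output. That standard deviation is rounded, so the last digits differ slightly from Spark's result; the helper name `z_score` is made up.

```python
mean_amount = 179861.90354913048   # from data.agg(mean(...)) above
std_amount = 603858.2              # rounded value from the describe() output

def z_score(amount):
    """Standardize one amount with the column mean and standard deviation."""
    return (amount - mean_amount) / std_amount

z_first = z_score(9839.64)    # first row shown above
z_second = z_score(1864.28)   # second row shown above
print(z_first, z_second)
```

Both values agree with the `Norm_Amount` column to several decimal places, which confirms what the `withColumn` expression computes.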



In [37]:
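# some other useful pyspark sql functions
# note: min, max and sum imported this way shadow Python's built-ins of the same name for the rest of the notebook
from pyspark.sql.functions import mean, stddev, variance, corr, skewness, kurtosis, min, max, sum, count, col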

In [38]:
# To drop a column
data = data.drop('Norm_Amount')
data.show(n = 2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



#### *Let's Deal with Missing Values*

<img src="lets_deal_with_missing.gif" width="300" height="300">

In [39]:
# Missing Analysis
# col : Returns a Column based on the given column name.
missing_values_counts = data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])
missing_values_counts.show()


+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



<img src="where_is_missing.jpg" width="400" height="300">

In [40]:
data.na.drop(how = 'any') # drops every row that contains at least one null (how = 'any' is the default)
data.na.drop(how = 'any', thresh = 3) # thresh overrides how: drops rows that have fewer than 3 non-null values
data.na.drop(how = 'all') # drops a row only if all of its values are null
data.na.drop(how = 'any', subset = ['amount']) # drops a row if its amount is null
data.na.fill(0.0, subset = ['amount']) # fill takes 1) a value and 2) an optional subset; columns whose type does not match the value are skipped
# note: none of these results is assigned to a variable, so data itself is unchanged

DataFrame[step: int, type: string, amount: double, nameOrig: string, oldbalanceOrg: double, newbalanceOrig: double, nameDest: string, oldbalanceDest: double, newbalanceDest: double, isFraud: int, isFlaggedFraud: int]
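
The `how` and `thresh` rules are easy to mix up, so here is a small plain-Python imitation of them on a list of dicts. The helper `keep_row` is hypothetical and only mirrors the documented behaviour: `thresh`, when given, takes priority over `how` and keeps rows that have at least that many non-null values.

```python
def keep_row(row, how="any", thresh=None):
    """Return True if the row survives an na.drop-style rule."""
    non_null = len([v for v in row.values() if v is not None])
    if thresh is not None:          # thresh overrides how
        return non_null >= thresh
    if how == "any":                # drop if any value is null
        return non_null == len(row)
    if how == "all":                # drop only if every value is null
        return non_null > 0
    raise ValueError("how must be 'any' or 'all'")

rows = [
    {"a": 1, "b": 2, "c": 3},
    {"a": 1, "b": None, "c": None},
    {"a": None, "b": None, "c": None},
]

kept_any = [r for r in rows if keep_row(r, how="any")]
kept_all = [r for r in rows if keep_row(r, how="all")]
kept_thresh = [r for r in rows if keep_row(r, thresh=2)]
print(len(kept_any), len(kept_all), len(kept_thresh))
```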

In [41]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['amount', 'oldbalanceOrg'],
    outputCols = [f"{c}_imputed" for c in ['amount', 'oldbalanceOrg']]  # c avoids reusing the name of the imported col function
).setStrategy("mean")

imputer.fit(data).transform(data).show(n = 2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|amount_imputed|oldbalanceOrg_imputed|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|       9839.64|             170136.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|       1864.28|              21249.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------------------+
only

#### Data Filtering 

<img src="filter.webp" width="400" height="400">

In [42]:
data.filter("amount < 1000").show(n = 4)

+----+--------+------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|TRANSFER| 181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT| 181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|671.64|C2033524545|      15123.0|      14451.36| M473053293|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 38.66| C343345308|      16174.0|      16135.34|M1714688478|           0.0|           0.0|      0|             0|
+----+--------+------+-----------+-------------+--------------+-----------+--------------+-------

In [43]:
data.filter(data['amount'] < 1000).show(n = 4)

+----+--------+------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|TRANSFER| 181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT| 181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|671.64|C2033524545|      15123.0|      14451.36| M473053293|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 38.66| C343345308|      16174.0|      16135.34|M1714688478|           0.0|           0.0|      0|             0|
+----+--------+------+-----------+-------------+--------------+-----------+--------------+-------

In [44]:
# using SQL query
# Register the DataFrame as a temporary view
data.createOrReplaceTempView("Fraud_Data")
# Run SQL query to filter data
spark.sql("SELECT * FROM Fraud_Data WHERE amount < 1000").show(n = 2)
# Drop the temporary view
spark.catalog.dropTempView("Fraud_Data")

+----+--------+------+-----------+-------------+--------------+----------+--------------+--------------+-------+--------------+
|step|    type|amount|   nameOrig|oldbalanceOrg|newbalanceOrig|  nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+------+-----------+-------------+--------------+----------+--------------+--------------+-------+--------------+
|   1|TRANSFER| 181.0|C1305486145|        181.0|           0.0|C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT| 181.0| C840083671|        181.0|           0.0| C38997010|       21182.0|           0.0|      1|             0|
+----+--------+------+-----------+-------------+--------------+----------+--------------+--------------+-------+--------------+
only showing top 2 rows



True

In [45]:
# use of the & (and) and | (or) operators; wrap each condition in parentheses
data.filter((data["amount"] < 10000) & (data["isFlaggedFraud"] == 0)).show(n = 4)

+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|  181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------
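
The parentheses around each condition in the filter above are not optional. In Python, `&` binds more tightly than `<` and `==`, so leaving them out changes what the expression means. A minimal sketch with plain integers (the variable names are made up) shows the difference:

```python
amount, is_flagged = 5, 0

# With parentheses: each comparison is evaluated first, then combined
with_parens = (amount < 10) & (is_flagged == 0)

# Without parentheses: & binds tighter than < and ==, so Python reads this
# as the chained comparison  amount < (10 & is_flagged) == 0
without_parens = amount < 10 & is_flagged == 0

print(with_parens, without_parens)
```

With plain integers the unparenthesized form silently gives a different answer. With Spark columns it usually fails with an error instead, because a column expression cannot be converted to a single Python boolean.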

In [46]:
# using SQL query
# Register the DataFrame as a temporary view
data.createOrReplaceTempView("Fraud_Data")
# Run SQL query to filter data
spark.sql("SELECT * FROM Fraud_Data WHERE amount < 10000 and isFlaggedFraud = 0").show(n = 2)
# Drop the temporary view
spark.catalog.dropTempView("Fraud_Data")

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



True

### PySpark GroupBy and Aggregations

In [47]:
# very similar to pandas
# groupby type of transaction for total aggregate amount  
data.groupBy('type').sum('amount').show()
# groupby type of transaction for max amount  
data.groupBy('type').max('amount').show()
# can use min, max, count, mean ....

+--------+--------------------+
|    type|         sum(amount)|
+--------+--------------------+
|TRANSFER|4.852919872631695...|
| CASH_IN|2.363673919124604...|
|CASH_OUT|3.944129952244902...|
| PAYMENT|2.809337113837001...|
|   DEBIT|2.2719922127999982E8|
+--------+--------------------+

+--------+-------------+
|    type|  max(amount)|
+--------+-------------+
|TRANSFER|9.244551664E7|
| CASH_IN|    1915267.9|
|CASH_OUT|        1.0E7|
| PAYMENT|    238637.98|
|   DEBIT|    569077.51|
+--------+-------------+



#### Renaming Column Names

In [48]:
print('Aggregating two columns ...')
agg = data.groupBy('type').agg({'amount':'sum', 'oldbalanceOrg':'max'})
agg.show(n = 2)
print('Renaming Column Name...')
agg = agg.withColumnRenamed("type", "Type")
agg.show(n = 2)


Aggregating two columns ...
+--------+--------------------+------------------+
|    type|         sum(amount)|max(oldbalanceOrg)|
+--------+--------------------+------------------+
|TRANSFER|4.852919872631695...|     5.958504037E7|
| CASH_IN|2.363673919124604...|     3.893942403E7|
+--------+--------------------+------------------+
only showing top 2 rows

Renaming Column Name...
+--------+--------------------+------------------+
|    Type|         sum(amount)|max(oldbalanceOrg)|
+--------+--------------------+------------------+
|TRANSFER|4.852919872631695...|     5.958504037E7|
| CASH_IN|2.363673919124604...|     3.893942403E7|
+--------+--------------------+------------------+
only showing top 2 rows



In [49]:
# Rename multiple columns using select() with aliasing
print('Renaming multiple columns at the same time...')
agg = agg.select(col("type").alias("Type"), col("sum(amount)").alias("Agg_Sum_Amount"), col("max(oldbalanceOrg)").alias("Maximum_Old_Balance_ORG"))
agg.show(n = 2)
# or simply chain withColumnRenamed calls
print('Alternate Way..')
agg = (agg.withColumnRenamed("type", "Type").withColumnRenamed("Agg_Sum_Amount", "SUM_AMT").withColumnRenamed("Maximum_Old_Balance_ORG", "MAX_OLd_BAL"))
agg.show(n = 2)

Renaming multiple columns at the same time...
+--------+--------------------+-----------------------+
|    Type|      Agg_Sum_Amount|Maximum_Old_Balance_ORG|
+--------+--------------------+-----------------------+
|TRANSFER|4.852919872631695...|          5.958504037E7|
| CASH_IN|2.363673919124604...|          3.893942403E7|
+--------+--------------------+-----------------------+
only showing top 2 rows

Alternate Way..
+--------+--------------------+-------------+
|    Type|             SUM_AMT|  MAX_OLd_BAL|
+--------+--------------------+-------------+
|TRANSFER|4.852919872631695...|5.958504037E7|
| CASH_IN|2.363673919124604...|3.893942403E7|
+--------+--------------------+-------------+
only showing top 2 rows



In [50]:
agg.columns

['Type', 'SUM_AMT', 'MAX_OLd_BAL']

#### Model Building

In [51]:
data.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [52]:
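# Feature engineering: collapse each originating account (nameOrig) into one row of aggregated features
agg1 = data.groupby('nameOrig').agg({'amount':'avg','oldbalanceOrg':'sum','newbalanceOrig':'stddev',
                                     'oldbalanceDest':'sum','newbalanceDest':'stddev'})
agg1.show(n = 2)
# count() returns the number of rows, i.e. the number of distinct accounts
print(f'Shape of the aggregated data : {agg1.count()}')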

+-----------+-------------------+----------------------+-----------+------------------+----------------------+
|   nameOrig|sum(oldbalanceDest)|stddev(newbalanceDest)|avg(amount)|sum(oldbalanceOrg)|stddev(newbalanceOrig)|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+
|C1305004711|                0.0|                  NULL|    8586.98|            3763.0|                  NULL|
|C1016856028|                0.0|                  NULL|    1795.67|        6300966.92|                  NULL|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+
only showing top 2 rows

Shape of the aggregated data : 6353307


In [53]:
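# Backticks make Spark read stddev(newbalanceDest) as an existing column name, not a function call
agg1.filter("`stddev(newbalanceDest)` > 0").show(2)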

+-----------+-------------------+----------------------+-----------+------------------+----------------------+
|   nameOrig|sum(oldbalanceDest)|stddev(newbalanceDest)|avg(amount)|sum(oldbalanceOrg)|stddev(newbalanceOrig)|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+
|C1146819885|          121847.22|    32231.214020824595|   33762.45|           50258.0|    13603.405109280542|
|C1154436417|         7291602.02|     5629485.844161115| 203673.835|           99394.0|    62680.151257532874|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+
only showing top 2 rows
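
The `NULL` entries in the `stddev(...)` columns are expected: Spark's `stddev` is the sample standard deviation, which divides by n - 1 and is therefore undefined for an account with a single transaction. The same behaviour can be reproduced with Python's standard `statistics` module, used here only as an illustration (the single amount is taken from the first row of the earlier output; the two-transaction account is hypothetical):

```python
import statistics

single_txn = [8586.98]       # an account with exactly one transaction
two_txns = [100.0, 300.0]    # hypothetical account with two transactions

# Sample standard deviation (n - 1 in the denominator) is undefined for n = 1
try:
    sample_std_single = statistics.stdev(single_txn)
except statistics.StatisticsError:
    sample_std_single = None  # plays the role of Spark's NULL

# Population standard deviation (n in the denominator) is defined and equals 0
pop_std_single = statistics.pstdev(single_txn)

# With two or more values the sample standard deviation is well defined
sample_std_two = statistics.stdev(two_txns)
print(sample_std_single, pop_std_single, sample_std_two)
```

If a 0 is preferable to `NULL` at aggregation time, `stddev_pop` is the Spark counterpart of `pstdev`.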



In [54]:
# Distinct values in the two fraud indicator columns
data.select("isFraud").distinct().show()
data.select("isFlaggedFraud").distinct().show()

+-------+
|isFraud|
+-------+
|      1|
|      0|
+-------+

+--------------+
|isFlaggedFraud|
+--------------+
|             0|
|             1|
+--------------+



In [55]:
# Value counts for the two fraud indicator columns (the equivalent of pandas value_counts())
data.groupBy("isFraud").count().show()
data.groupBy("isFlaggedFraud").count().show()

+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   8213|
|      0|6354407|
+-------+-------+

+--------------+-------+
|isFlaggedFraud|  count|
+--------------+-------+
|             0|6362604|
|             1|     16|
+--------------+-------+
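
These counts show how rare the positive class is, which matters for how any classifier built on this data should be judged. A quick calculation from the `isFraud` counts above (plain Python, no Spark needed):

```python
fraud_rows = 8213
non_fraud_rows = 6354407
total_rows = fraud_rows + non_fraud_rows

# Fraction of transactions labelled as fraud
fraud_share = fraud_rows / total_rows

# Accuracy of a trivial "model" that labels every transaction as non-fraud
majority_baseline_accuracy = non_fraud_rows / total_rows

print(f"Total transactions      : {total_rows}")
print(f"Fraud share             : {fraud_share:.4%}")
print(f"Majority-class accuracy : {majority_baseline_accuracy:.4%}")
```

A model that always predicts 0 already scores about 99.87% transaction-level accuracy, so a high accuracy figure on its own is not evidence of a useful fraud detector.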



In [56]:
# Per-account target: 1 if any transaction from the account is fraudulent, else 0
target = data.groupBy('nameOrig').max('isFraud')
print(target.count())

6353307


In [57]:
# Attach the per-account fraud flag to the aggregated features
# Inner join on nameOrig; both sides hold one row per account
merged_df = agg1.join(target, on="nameOrig", how="inner")
print(f'Shape of the merged data : {merged_df.count()},{len(merged_df.columns)}')
merged_df.show(n = 2)

Shape of the merged data : 6353307,7
+-----------+-------------------+----------------------+-----------+------------------+----------------------+------------+
|   nameOrig|sum(oldbalanceDest)|stddev(newbalanceDest)|avg(amount)|sum(oldbalanceOrg)|stddev(newbalanceOrig)|max(isFraud)|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+------------+
|C1000001337|                0.0|                  NULL|    3170.28|           58089.0|                  NULL|           0|
|C1000003615|          935378.26|                  NULL|   49360.77|        1472658.31|                  NULL|           0|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+------------+
only showing top 2 rows



In [58]:
# Fill NULLs with 0 (stddev is NULL for accounts with only one transaction)
merged_df = merged_df.na.fill(0)
merged_df.show(n = 2)

+-----------+-------------------+----------------------+-----------+------------------+----------------------+------------+
|   nameOrig|sum(oldbalanceDest)|stddev(newbalanceDest)|avg(amount)|sum(oldbalanceOrg)|stddev(newbalanceOrig)|max(isFraud)|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+------------+
|C1000001337|                0.0|                   0.0|    3170.28|           58089.0|                   0.0|           0|
|C1000003615|          935378.26|                   0.0|   49360.77|        1472658.31|                   0.0|           0|
+-----------+-------------------+----------------------+-----------+------------------+----------------------+------------+
only showing top 2 rows



In [59]:
# Let's Build a Random Forest for the Fraud Classification

In [60]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator


In [61]:
# nameOrig is an account identifier, not a feature, so drop it before modelling
merged_df = merged_df.drop('nameOrig')

In [62]:
# Split the data into training and test sets
train_data, test_data = merged_df.randomSplit([0.7, 0.3])

In [63]:
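# Assemble every column except the label into a single features vector
# (loop variable is `c` so it does not shadow pyspark's col() function)
assembler = VectorAssembler(inputCols = [c for c in merged_df.columns if c != 'max(isFraud)'], outputCol = 'Features')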

# Transform the data using the features vector
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

In [64]:
train_data.show(n=5)

+-------------------+----------------------+-----------+------------------+----------------------+------------+--------------------+
|sum(oldbalanceDest)|stddev(newbalanceDest)|avg(amount)|sum(oldbalanceOrg)|stddev(newbalanceOrig)|max(isFraud)|            Features|
+-------------------+----------------------+-----------+------------------+----------------------+------------+--------------------+
|                0.0|                   0.0|       0.02|               0.0|                   0.0|           0|      (5,[2],[0.02])|
|                0.0|                   0.0|       0.02|          33192.97|                   0.0|           0|(5,[2,3],[0.02,33...|
|                0.0|                   0.0|       0.03|          205196.0|                   0.0|           0|(5,[2,3],[0.03,20...|
|                0.0|                   0.0|        0.2|          573881.0|                   0.0|           0|(5,[2,3],[0.2,573...|
|                0.0|                   0.0|       0.23|           87

In [None]:
# Train a Random Forest classifier
rf = RandomForestClassifier(labelCol="max(isFraud)", featuresCol="Features", numTrees=10)
model = rf.fit(train_data)

In [None]:
# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="max(isFraud)", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Confusion matrix: row counts per (actual, predicted) pair
# predictions.groupBy("max(isFraud)", "prediction").count().show()

<img src="accuracy-hits-off.gif" width="300" height="300">