In [None]:
pip install pyspark



# Loading the required libraries


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils
import seaborn as sns

In [None]:
# Build (or reuse) the Spark session.
# SparkContext.getOrCreate() always returns a context, so the previous
# `is None` fallback could never run and the intended master / application
# name were silently dropped. Passing them to the builder applies them
# whenever a new context has to be created; an already-running context is
# reused unchanged.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("BDA-PROJECT")
    .getOrCreate()
)
# Keep the context handle under its original name for any cell that uses it.
spark_context = spark.sparkContext

### Loading the Dataset

In [None]:
# SECURITY: a real Kaggle username and API key used to be pasted in this cell.
# Credentials must never be stored in a notebook: revoke the exposed key on
# kaggle.com, then supply fresh credentials at the opendatasets prompt or via a
# local kaggle.json file that is kept out of version control.
# Template of the expected kaggle.json content (placeholders only):
'''{"username":"<your-kaggle-username>","key":"<your-kaggle-api-key>"}'''

'{"username":"<your-kaggle-username>","key":"<your-kaggle-api-key>"}'

In [None]:
# Install the opendatasets helper and download the Kaggle credit-card fraud
# dataset. The recorded run shows that o.download asks for Kaggle credentials
# and stores the archive under ./creditcardfraud.
!pip install opendatasets
import opendatasets as o
o.download('https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud/data')

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: santhoshkim
Your Kaggle Key: ··········
Downloading creditcardfraud.zip to ./creditcardfraud


100%|██████████| 66.0M/66.0M [00:04<00:00, 16.5MB/s]





In [None]:
# Load the CSV into a Spark DataFrame, treating the first row as the header
# and letting Spark infer the column types.
# NOTE(review): the absolute /content/... path presumably assumes a Colab
# working directory, while the download step reports ./creditcardfraud --
# confirm before running elsewhere.
Creditcard = spark.read.csv('/content/creditcardfraud/creditcard.csv', inferSchema = True, header = True)

In [None]:
# Number of rows in the loaded dataset.
Creditcard.count()

284807

In [None]:
# Remove the 'Time' column; the frame is rebound to the same name.
Creditcard=Creditcard.drop('Time')

In [None]:
# Preview the first five rows.
Creditcard.show(5)

+------------------+-------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------+-----+
|                V1|                 V2|              V3|                V4|                 V5|                 V6|                 V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|               V16|               V17|                V18|               V19|                V20|                 V21|                V22|               

In [None]:
# Column names remaining after dropping 'Time'.
Creditcard.columns

['V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'Amount',
 'Class']

### Finding missing (null) values in each column

In [None]:
def missing(df=None):
    """Print the number of missing values in every column of a DataFrame.

    A value counts as missing when it is NULL or, for float/double columns,
    NaN. The previous implementation compared each column with the string
    'NA', which never matches a real null in the numeric columns and
    therefore always reported 0.

    All counts are gathered in a single aggregation instead of one Spark job
    per column.

    Args:
        df: DataFrame to inspect. Defaults to the notebook-level `Creditcard`
            frame so existing `missing()` calls keep working.
    """
    # Local import keeps this cell runnable without touching the imports cell.
    from pyspark.sql import functions as F

    frame = Creditcard if df is None else df

    aggregates = []
    for name, dtype in frame.dtypes:
        is_missing = F.col(name).isNull()
        # isnan is only defined for floating-point columns.
        if dtype in ('double', 'float'):
            is_missing = is_missing | F.isnan(F.col(name))
        # when() yields NULL for non-missing rows, so count() tallies only
        # the rows where the value is missing.
        aggregates.append(F.count(F.when(is_missing, 1)).alias(name))

    counts = frame.select(aggregates).first().asDict()

    for name in frame.columns:
        print(name + ' has number of NULLs : ' + str(counts[name]))

In [None]:
# Report the per-column counts for the Creditcard frame.
missing()

V1 has number of NULLs : 0
V2 has number of NULLs : 0
V3 has number of NULLs : 0
V4 has number of NULLs : 0
V5 has number of NULLs : 0
V6 has number of NULLs : 0
V7 has number of NULLs : 0
V8 has number of NULLs : 0
V9 has number of NULLs : 0
V10 has number of NULLs : 0
V11 has number of NULLs : 0
V12 has number of NULLs : 0
V13 has number of NULLs : 0
V14 has number of NULLs : 0
V15 has number of NULLs : 0
V16 has number of NULLs : 0
V17 has number of NULLs : 0
V18 has number of NULLs : 0
V19 has number of NULLs : 0
V20 has number of NULLs : 0
V21 has number of NULLs : 0
V22 has number of NULLs : 0
V23 has number of NULLs : 0
V24 has number of NULLs : 0
V25 has number of NULLs : 0
V26 has number of NULLs : 0
V27 has number of NULLs : 0
V28 has number of NULLs : 0
Amount has number of NULLs : 0
Class has number of NULLs : 0


In [None]:
# Rename the target column 'Class' to 'LABEL'; later cells refer to it by the new name.
Creditcard=Creditcard.withColumnRenamed('Class','LABEL')

In [None]:
# Confirm the rename: the last column should now be 'LABEL'.
Creditcard.columns

['V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'Amount',
 'LABEL']

### Data Transformation

In [None]:
# Columns that are cast to double and assembled into the model's feature vector.
# NOTE(review): V20-V28 and Amount exist in the DataFrame but are not listed
# here, so they are never fed to the models -- confirm this is intentional.
Features_List = ['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7','V8', 'V9','V10','V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17','V18', 'V19']

In [None]:
# Re-type every selected feature column as double, one column at a time.
for feature_name in Features_List:
    as_double = Creditcard[feature_name].cast(DoubleType())
    Creditcard = Creditcard.withColumn(feature_name, as_double)

In [None]:
# Show the column types after the casts.
Creditcard.printSchema()

root
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nullable = true)
 |-- Amount: double (nul

## Create the Feature Vector and Divide the Dataset

In [None]:
# Pack the selected feature columns into a single vector column named
# 'feature_vector', then display the assembled frame.
assembler = VectorAssembler(
    inputCols=Features_List,
    outputCol='feature_vector',
)
dataframe = assembler.transform(Creditcard)
dataframe.show()

+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+--------------------+
|                V1|                 V2|                V3|                 V4|                 V5|                 V6|                  V7|                 V8|                V9|                V10|               V11|               V12|                V13|                V14|                V15|                V16|                 V17|                V18|                V19|                V20|       

In [None]:
# Preview the first five assembled rows.
dataframe.show(5)

+------------------+-------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------+-----+--------------------+
|                V1|                 V2|              V3|                V4|                 V5|                 V6|                 V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|               V16|               V17|                V18|               V19|                V20|                 V21|              

In [None]:
# Randomly split the assembled frame with weights 0.8 / 0.2 and a fixed seed.
# NOTE(review): the recorded counts (256162 train / 28645 test) are roughly a
# 90/10 split, so the saved outputs appear to come from different weights --
# re-run to refresh them.
df_train,df_test= dataframe.randomSplit([0.8, 0.2], seed = 1)

In [None]:
# Size of the test split.
df_test.count()

28645

In [None]:
# Size of the training split.
df_train.count()

256162

# Apply Machine Learning Classification Algorithms on the Dataset and Compare their Accuracy.

### Decision Trees

In [None]:
# Decision-tree estimator reading 'feature_vector' and predicting 'LABEL',
# fitted on the training split.
Decision_Tree_Classifier = DecisionTreeClassifier(
    labelCol='LABEL',
    featuresCol='feature_vector',
)
Decision_Tree_Model = Decision_Tree_Classifier.fit(df_train)

In [None]:
# Apply the fitted tree to the test split.
Decision_Tree_Prediction = Decision_Tree_Model.transform(df_test)

In [None]:
# Preview the first five scored rows.
Decision_Tree_Prediction.show(5)

+-----------------+-----------------+-----------------+----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+-------+-----+--------------------+--------------+--------------------+----------+
|               V1|               V2|               V3|              V4|                V5|               V6|               V7|                V8|                V9|               V10|              V11|               V12|              V13|              V14|               V15|                V16|               V17|               V18|              V19|              V20|              V21|             

In [None]:
# Evaluator for the decision-tree predictions.
# metricName is set explicitly: without it the evaluator falls back to its
# default metric ("f1"), and that value was then reported as "accuracy".
# This also matches the logistic-regression evaluator defined further down.
Decision_Tree_Evaluator = MulticlassClassificationEvaluator(
    labelCol='LABEL',
    predictionCol='prediction',
    metricName='accuracy',
)

In [None]:
# Score the decision-tree predictions with the evaluator configured above.
Decision_Tree_Accuracy = Decision_Tree_Evaluator.evaluate(Decision_Tree_Prediction)

In [None]:
# Report the score as a percentage and its complement as the test error.
print(f'Decision Tree Accuracy is : {Decision_Tree_Accuracy * 100}')
print(f'Test Error is : {1 - Decision_Tree_Accuracy}')

Decision Tree Accuracy is : 99.92319776575317
Test Error is : 0.0007680223424681998


### Random Forest

In [None]:
# Random-forest estimator: 500 trees, depth limited to 5, 32 bins per
# feature; fitted on the training split.
Random_Forest_Classifier = RandomForestClassifier(
    labelCol='LABEL',
    featuresCol='feature_vector',
    maxDepth=5,
    maxBins=32,
    numTrees=500,
)
Random_Forest_Model = Random_Forest_Classifier.fit(df_train)

In [None]:
# Apply the fitted forest to the test split.
Random_Forest_Prediction = Random_Forest_Model.transform(df_test)

In [None]:
# Preview the first five scored rows.
Random_Forest_Prediction.show(5)

In [None]:
# Evaluator for the random-forest predictions.
# metricName is set explicitly: without it the evaluator falls back to its
# default metric ("f1"), and that value was then reported as "accuracy".
# This also matches the logistic-regression evaluator defined further down.
Random_Forest_Evaluator = MulticlassClassificationEvaluator(
    labelCol='LABEL',
    predictionCol='prediction',
    metricName='accuracy',
)

In [None]:
# Score the random-forest predictions with the evaluator configured above.
Random_Forest_Accuracy = Random_Forest_Evaluator.evaluate(Random_Forest_Prediction)

In [None]:

# Report the score as a percentage and its complement as the test error.
print(f'Random Forest Accuracy is : {Random_Forest_Accuracy * 100}')
print(f'Test Error is : {1 - Random_Forest_Accuracy}')

### Logistic Regression

In [None]:
# Logistic-regression estimator with elastic-net regularisation
# (regParam 0.3, elasticNetParam 0.8) and at most 20 iterations.
Logistic_Regression_Classifier = LogisticRegression(
    regParam=0.3,
    labelCol="LABEL",
    featuresCol='feature_vector',
    maxIter=20,
    elasticNetParam=0.8,
)

In [None]:
# Fit the logistic-regression estimator on the training split.
Logistic_Regression_Model = Logistic_Regression_Classifier.fit(df_train)

In [None]:
# Apply the fitted model to the test split and preview five scored rows.
Logistic_Regression_Prediction = Logistic_Regression_Model.transform(df_test)

Logistic_Regression_Prediction.show(5)

In [None]:
# Accuracy evaluator comparing the 'prediction' column with 'LABEL'.
Logistic_Regression_Evaluator = MulticlassClassificationEvaluator(
    labelCol="LABEL",
    predictionCol="prediction",
    metricName="accuracy",
)

In [None]:
# Accuracy of the logistic-regression predictions on the test split.
Logistic_Regression_Accuracy = Logistic_Regression_Evaluator.evaluate(Logistic_Regression_Prediction)

In [None]:
# Report the accuracy as a percentage and its complement as the test error.
print(f'Logistic Regression Accuracy is : {Logistic_Regression_Accuracy * 100}')
print(f'Test Error is : {1 - Logistic_Regression_Accuracy}')

# Calculate the confusion matrix and find the precision, recall, and F1 score of each classification algorithm.

### Decision Trees Model Performance

In [None]:
# Build the (prediction, label) pairs for the RDD-based metrics API.
# MulticlassMetrics works on double-valued pairs, so the integer LABEL column
# is cast to double before the conversion to an RDD.
Decision_Tree_Prediction_Labels = Decision_Tree_Prediction.select(
    'prediction',
    Decision_Tree_Prediction['LABEL'].cast(DoubleType()).alias('LABEL'),
)
Decision_Tree_KPI = MulticlassMetrics(Decision_Tree_Prediction_Labels.rdd.map(list))

# The precision / recall cells index into this matrix, but it was never
# defined in the notebook, so a fresh "Restart & Run All" raised NameError.
# Rows are actual classes and columns predicted classes, in ascending label
# order.
Decision_Tree_confusion_matrix = Decision_Tree_KPI.confusionMatrix().toArray()

In [None]:
# Ratio of cell [0][0] to the sum of the first column of the confusion matrix.
# NOTE(review): assumes Decision_Tree_confusion_matrix holds
# MulticlassMetrics.confusionMatrix().toArray() (rows = actual, columns =
# predicted) -- confirm. Under that layout this is the precision of class 0
# (the majority class), not of class 1.
Decision_Tree_precision = (Decision_Tree_confusion_matrix[0][0]) / (Decision_Tree_confusion_matrix[0][0] + Decision_Tree_confusion_matrix[1][0])
print('Decision_Tree Precision = ' + str(Decision_Tree_precision))

Decision_Tree Precision = 1.0


In [None]:
# Ratio of cell [0][0] to the sum of the first row of the confusion matrix.
# NOTE(review): with rows = actual and columns = predicted (to be confirmed)
# this is the recall of class 0 (the majority class), not of class 1.
Decision_Tree_recall = (Decision_Tree_confusion_matrix[0][0]) / (Decision_Tree_confusion_matrix[0][0] + Decision_Tree_confusion_matrix[0][1])
print('Decision_Tree Recall = ' + str(Decision_Tree_recall))

Decision_Tree Recall = 1.0


In [None]:
# F1 score: harmonic mean of the precision and recall computed above.
Decision_Tree_f1Score = (Decision_Tree_precision * Decision_Tree_recall) / (Decision_Tree_precision + Decision_Tree_recall) * 2
print('Decision_Tree F1 Score = ' + str(Decision_Tree_f1Score))

Decision_Tree F1 Score = 1.0


### Random Forest Model Performance

In [None]:
# Build the (prediction, label) pairs for the RDD-based metrics API.
# MulticlassMetrics works on double-valued pairs, so the integer LABEL column
# is cast to double before the conversion to an RDD.
Random_Forest_Prediction_Labels = Random_Forest_Prediction.select(
    'prediction',
    Random_Forest_Prediction['LABEL'].cast(DoubleType()).alias('LABEL'),
)
Random_Forest_KPI = MulticlassMetrics(Random_Forest_Prediction_Labels.rdd.map(list))

# The precision / recall cells index into this matrix, but it was never
# defined in the notebook, so a fresh "Restart & Run All" raised NameError.
# Rows are actual classes and columns predicted classes, in ascending label
# order.
Random_Forest_confusion_matrix = Random_Forest_KPI.confusionMatrix().toArray()

In [None]:
# Ratio of cell [0][0] to the sum of the first column of the confusion matrix.
# NOTE(review): assumes Random_Forest_confusion_matrix holds
# MulticlassMetrics.confusionMatrix().toArray() (rows = actual, columns =
# predicted) -- confirm. Under that layout this is the precision of class 0
# (the majority class), not of class 1.
Random_Forest_precision = (Random_Forest_confusion_matrix[0][0]) / (Random_Forest_confusion_matrix[0][0] + Random_Forest_confusion_matrix[1][0])
print('Random_Forest Precision = ' + str(Random_Forest_precision))

Random_Forest Precision = 1.0


In [None]:
# Ratio of cell [0][0] to the sum of the first row of the confusion matrix.
# NOTE(review): with rows = actual and columns = predicted (to be confirmed)
# this is the recall of class 0 (the majority class), not of class 1.
Random_Forest_recall = (Random_Forest_confusion_matrix[0][0]) / (Random_Forest_confusion_matrix[0][0] + Random_Forest_confusion_matrix[0][1])
print('RF Recall = ' + str(Random_Forest_recall))

RF Recall = 1.0


In [None]:
# F1 score: harmonic mean of the precision and recall computed above.
Random_Forest_f1Score = (Random_Forest_precision * Random_Forest_recall) / (Random_Forest_precision + Random_Forest_recall) * 2
print('RF F1 Score = ' + str(Random_Forest_f1Score))

RF F1 Score = 1.0


### Logistic Regression Model Performance

In [None]:
# Build the (prediction, label) pairs for the RDD-based metrics API.
# MulticlassMetrics works on double-valued pairs, so the integer LABEL column
# is cast to double before the conversion to an RDD.
Logistic_Regression_Prediction_Labels = Logistic_Regression_Prediction.select(
    'prediction',
    Logistic_Regression_Prediction['LABEL'].cast(DoubleType()).alias('LABEL'),
)

Logistic_Regression_KPI = MulticlassMetrics(Logistic_Regression_Prediction_Labels.rdd.map(list))

# The precision / recall cells index into this matrix, but it was never
# defined in the notebook, so a fresh "Restart & Run All" raised NameError.
# Rows are actual classes and columns predicted classes, in ascending label
# order.
Logistic_Regression_confusion_matrix = Logistic_Regression_KPI.confusionMatrix().toArray()

In [None]:
# Ratio of cell [0][0] to the sum of the first column of the confusion matrix.
# NOTE(review): assumes Logistic_Regression_confusion_matrix holds
# MulticlassMetrics.confusionMatrix().toArray() (rows = actual, columns =
# predicted) -- confirm. Under that layout this is the precision of class 0
# (the majority class), not of class 1.
Logistic_Regression_precision = (Logistic_Regression_confusion_matrix[0][0]) / (Logistic_Regression_confusion_matrix[0][0] + Logistic_Regression_confusion_matrix[1][0])
print('LogReg Precision = ' + str(Logistic_Regression_precision))

LogReg Precision = 0.9981829498454001


In [None]:
# Ratio of cell [0][0] to the sum of the first row of the confusion matrix.
# NOTE(review): with rows = actual and columns = predicted (to be confirmed)
# this is the recall of class 0 (the majority class), not of class 1.
Logistic_Regression_recall = (Logistic_Regression_confusion_matrix[0][0]) / (Logistic_Regression_confusion_matrix[0][0] + Logistic_Regression_confusion_matrix[0][1])
print('LogReg Recall = ' + str(Logistic_Regression_recall))

LogReg Recall = 1.0


In [None]:
# F1 score: harmonic mean of the precision and recall computed above.
Logistic_Regression_f1Score = (Logistic_Regression_precision * Logistic_Regression_recall) / (Logistic_Regression_precision + Logistic_Regression_recall) * 2
print('LogReg F1 Score = ' + str(Logistic_Regression_f1Score))

LogReg F1 Score = 0.9990906487542893


### Using the Pipeline API

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Split the raw (un-assembled) frame; each pipeline adds 'feature_vector' itself.
train_data, test_data = Creditcard.randomSplit([0.8, 0.2], seed=42)

# Pipeline stages: the assembler followed by an *unfitted* estimator.
# The previous version passed the already-fitted models, which Pipeline.fit
# treats as plain transformers: nothing was trained on train_data, and the
# models (fitted on the seed=1 split) were then scored on a seed=42 test split
# that overlaps their training rows, inflating the reported accuracy.
stag_dt = [assembler, Decision_Tree_Classifier]
stag_lr = [assembler, Logistic_Regression_Classifier]

DT = Pipeline(stages=stag_dt)
LR = Pipeline(stages=stag_lr)

# After fitting, DT and LR hold the fitted pipeline models (names kept as before).
DT = DT.fit(train_data)
LR = LR.fit(train_data)

dt_pred, lr_pred = DT.transform(test_data), LR.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="LABEL", predictionCol="prediction", metricName="accuracy")
dt_acc = evaluator.evaluate(dt_pred)
lr_acc = evaluator.evaluate(lr_pred)

print("\t\t\tTEST ACCURACY", f"DECISION TREE : {dt_acc:.2f}",f"LOGISTIC REGRESSION : {lr_acc:.2f}",sep='\n')

			TEST ACCURACY
DECISION TREE : 1.00
LOGISTIC REGRESSION : 1.00


In [None]:
# Fresh, unfitted decision-tree estimator; same constructor arguments as the
# one created in the "Decision Trees" section.
Decision_Tree_Classifier = DecisionTreeClassifier(labelCol = 'LABEL', featuresCol = 'feature_vector')

In [None]:
# Hyper-parameter grid for the decision tree: 3 depths x 3 minimum leaf sizes.
# The grid is keyed on the estimator's own Param objects; the previous version
# took them from the fitted model, which would not line up with the estimator
# that a tuner has to fit.
paramGrid = (
    ParamGridBuilder()
    .addGrid(Decision_Tree_Classifier.maxDepth, [3, 5, 7])
    .addGrid(Decision_Tree_Classifier.minInstancesPerNode, [1, 3, 5])
    .build()
)

In [None]:
# Prints the DataFrame's summary (column names and types), not its rows.
print(train_data)

DataFrame[V1: double, V2: double, V3: double, V4: double, V5: double, V6: double, V7: double, V8: double, V9: double, V10: double, V11: double, V12: double, V13: double, V14: double, V15: double, V16: double, V17: double, V18: double, V19: double, V20: double, V21: double, V22: double, V23: double, V24: double, V25: double, V26: double, V27: double, V28: double, Amount: double, LABEL: int]


In [None]:
# Split the assembled frame (the output of assembler.transform) with weights
# 0.8 / 0.2 and a fixed seed; used by the cross-validation cell.
train_data1, test_data1 = dataframe.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fixed seed for both the forest (bootstrap / feature sampling) and the
# cross-validator (fold assignment). Without it the selected model and the
# reported accuracy can differ between kernel restarts.
CV_SEED = 42

label_column_name = "LABEL"
features_column_name = "feature_vector"

rf = RandomForestClassifier(
    labelCol=label_column_name,
    featuresCol=features_column_name,
    seed=CV_SEED,
)

# 3 x 3 grid over forest size and tree depth (9 candidate settings).
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

evaluator = MulticlassClassificationEvaluator(labelCol=label_column_name, predictionCol="prediction", metricName="accuracy")

# 5-fold cross-validation over the grid, scored by accuracy.
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=CV_SEED)

cvModel = crossval.fit(train_data1)

# Best model found by the cross-validator.
best_model = cvModel.bestModel

# Score the held-out split with the selected model.
predictions = best_model.transform(test_data1)

accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")

# Last cell of the notebook: shut the Spark session down.
spark.stop()


Test Accuracy: 1.00
