In [0]:
# File location and type

# kaggle dataset --- https://www.kaggle.com/datasets/dhanushnarayananr/credit-card-fraud

file_location = "/FileStore/tables/card_data.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.repartition(4)

display(df)

distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,fraud
57.87785658389723,0.3111400080477545,1.9459399775518595,1.0,1.0,0.0,0.0,0.0
10.829942699255543,0.1755915022816658,1.294218810619857,1.0,0.0,0.0,0.0,0.0
5.091079490616996,0.8051525945853258,0.4277145611942758,1.0,0.0,0.0,1.0,0.0
2.2475643282963613,5.60004354707232,0.3626625780570958,1.0,1.0,0.0,1.0,0.0
44.19093600261837,0.5664862680583477,2.2227672978404707,1.0,1.0,0.0,1.0,0.0
5.586407674186407,13.26107326805812,0.0647684653704633,1.0,0.0,0.0,0.0,0.0
3.7240191247148102,0.9568379284821842,0.2784649449081555,1.0,0.0,0.0,1.0,0.0
4.848246572280567,0.3207354272228163,1.2730495235601782,1.0,0.0,1.0,0.0,0.0
0.8766322564943629,2.503608926692144,1.5169993152858177,0.0,0.0,0.0,0.0,0.0
8.83904670372637,2.9705122760243827,2.36168254706846,1.0,0.0,0.0,1.0,0.0


In [0]:
# Create a view or table

temp_table_name = "card_data_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `card_data_csv`

distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,fraud
57.87785658389723,0.3111400080477545,1.9459399775518595,1.0,1.0,0.0,0.0,0.0
10.829942699255543,0.1755915022816658,1.294218810619857,1.0,0.0,0.0,0.0,0.0
5.091079490616996,0.8051525945853258,0.4277145611942758,1.0,0.0,0.0,1.0,0.0
2.2475643282963613,5.60004354707232,0.3626625780570958,1.0,1.0,0.0,1.0,0.0
44.19093600261837,0.5664862680583477,2.2227672978404707,1.0,1.0,0.0,1.0,0.0
5.586407674186407,13.26107326805812,0.0647684653704633,1.0,0.0,0.0,0.0,0.0
3.7240191247148102,0.9568379284821842,0.2784649449081555,1.0,0.0,0.0,1.0,0.0
4.848246572280567,0.3207354272228163,1.2730495235601782,1.0,0.0,1.0,0.0,0.0
0.8766322564943629,2.503608926692144,1.5169993152858177,0.0,0.0,0.0,0.0,0.0
8.83904670372637,2.9705122760243827,2.36168254706846,1.0,0.0,0.0,1.0,0.0


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "card_data_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
#No of rows and columns
print(df.count(),len(df.columns))

1000000 8


In [0]:
#No of null values in each column
from pyspark.sql.functions import col,isnan, when, count

df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in df.columns]).display()

distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,fraud
0,0,0,0,0,0,0,0


In [0]:
#describing numerical columns
pd = df.toPandas()
pd.describe()

Unnamed: 0,distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,fraud
count,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0,1000000.0
unique,1000000.0,1000000.0,1000000.0,2.0,2.0,2.0,2.0,2.0
top,78.80067845695436,8.838484043974422,0.2915701762061419,1.0,0.0,0.0,1.0,0.0
freq,1.0,1.0,1.0,881536.0,649601.0,899392.0,650552.0,912597.0


In [0]:
#rename fraud column name to label
df = df.withColumnRenamed("fraud","label")

In [0]:
#No of fraudulent and non-fradulent records
df.groupBy("label").count().display()

label,count
1.0,87403
0.0,912597


In [0]:
from pyspark.sql.functions import expr

fraud_df = df.filter(expr("label = 1"))
print("Fraud Count --- " + str(fraud_df.count()))

Fraud Count --- 87403


In [0]:
no_fraud_df = df.filter(expr("label = 0"))
print("Not Fraud Count --- " + str(no_fraud_df.count()))

Not Fraud Count --- 912597


In [0]:
# sampling unbalanced data to balanced data by selecting 60k records from each fraud and not fraud dataframes

#For fraudulent data, sampling 
col_names = df.columns
fraud_sample_list = fraud_df.sample(False,0.8,2321).take(60000)
fraud_sample_df = spark.createDataFrame(fraud_sample_list, col_names)
fraud_sample_df.display()

distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,label
15.694985541059944,175.98918151972342,0.8556228290724207,1.0,0.0,0.0,1.0,1.0
26.711462023719893,1.5520081259491354,4.603600688206188,1.0,1.0,0.0,1.0,1.0
10.664473716016785,1.565769086201661,4.886520843107555,1.0,0.0,0.0,1.0,1.0
2.530145018135443,3.6897810904940105,8.297406866219774,1.0,0.0,0.0,1.0,1.0
21.12611616077098,0.2719873964398006,6.081770719263626,1.0,0.0,0.0,1.0,1.0
9.59840134057398,0.4545563461632137,6.084828700634962,1.0,0.0,0.0,1.0,1.0
22.545587575167414,0.3539392546618495,4.095442206076704,1.0,0.0,0.0,1.0,1.0
335.1893199927121,1.1141676662234867,0.0982428061882022,1.0,0.0,0.0,1.0,1.0
1.1158806875176952,0.1702369279899755,4.127313378541158,0.0,0.0,0.0,0.0,1.0
5.7938900041198025,2.758511012799268,5.085685575610163,1.0,0.0,0.0,1.0,1.0


In [0]:
#For fraudulent data, sampling 
col_names = df.columns
not_fraud_sample_list = no_fraud_df.sample(False,0.8,2321).take(60000)
not_fraud_sample_df = spark.createDataFrame(not_fraud_sample_list, col_names)
not_fraud_sample_df.display()

distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,label
5.091079490616996,0.8051525945853258,0.4277145611942758,1.0,0.0,0.0,1.0,0.0
2.2475643282963613,5.60004354707232,0.3626625780570958,1.0,1.0,0.0,1.0,0.0
44.19093600261837,0.5664862680583477,2.2227672978404707,1.0,1.0,0.0,1.0,0.0
5.586407674186407,13.26107326805812,0.0647684653704633,1.0,0.0,0.0,0.0,0.0
3.7240191247148102,0.9568379284821842,0.2784649449081555,1.0,0.0,0.0,1.0,0.0
0.8766322564943629,2.503608926692144,1.5169993152858177,0.0,0.0,0.0,0.0,0.0
8.83904670372637,2.9705122760243827,2.36168254706846,1.0,0.0,0.0,1.0,0.0
14.26352973506908,0.1587580860346303,1.1361019405394772,1.0,1.0,0.0,1.0,0.0
765.2825592612469,0.3715619621963146,0.5512447476281009,1.0,1.0,0.0,0.0,0.0
13.95597236704443,0.2715215283485692,2.798901124938246,1.0,0.0,0.0,1.0,0.0


In [0]:
# Sampled no of each type of records
print(fraud_sample_df.count(), not_fraud_sample_df.count())

60000 60000


In [0]:
# union of all samples
dataframe = fraud_sample_df.union(not_fraud_sample_df)
print(dataframe.count())

120000


In [0]:
#casting all columns to float
from pyspark.sql.functions import col

cols = dataframe.columns
for col_name in cols:
    dataframe = dataframe.withColumn(col_name,col(col_name).cast('float'))

dataframe.printSchema()

root
 |-- distance_from_home: float (nullable = true)
 |-- distance_from_last_transaction: float (nullable = true)
 |-- ratio_to_median_purchase_price: float (nullable = true)
 |-- repeat_retailer: float (nullable = true)
 |-- used_chip: float (nullable = true)
 |-- used_pin_number: float (nullable = true)
 |-- online_order: float (nullable = true)
 |-- label: float (nullable = true)



In [0]:
# corelation between all columns
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *


assembler = VectorAssembler(inputCols=dataframe.columns, outputCol="features",handleInvalid='keep')
assembled_df = assembler.transform(dataframe).select("features")

correlation = Correlation.corr(assembled_df,"features","pearson").collect()

rows = correlation[0][0].toArray().tolist()
corr_df = spark.createDataFrame(rows,dataframe.columns)

corr_df.display()
"""
    Since all values are less than 0.8 and greater than -0.8, all columns are dependent on each other
"""



distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,label
1.0,-0.0142548477990866,-0.074968878760346,0.149379318464247,-0.0992699358869285,-0.0313685563751383,0.0536436658270424,0.2034237239607886
-0.0142548477990866,1.0,-0.0368708101575127,-0.0174528954877886,-0.0355128752623047,-0.0157853479168518,0.0195954872378879,0.1122653720678468
-0.074968878760346,-0.0368708101575127,1.0,-0.0411269341399975,0.0345604381317569,-0.0916637057877601,0.1423433187678534,0.4878107893193141
0.149379318464247,-0.0174528954877886,-0.0411269341399975,1.0,-0.004931687386543,0.0004097134212601359,0.0734955141055144,-0.0041920917606198
-0.0992699358869285,-0.0355128752623047,0.0345604381317569,-0.004931687386543,1.0,0.0188952454790714,-0.0254200170212631,-0.1122357567288977
-0.0313685563751383,-0.0157853479168518,-0.0916637057877601,0.0004097134212601359,0.0188952454790714,1.0,-0.0748704097836509,-0.2333913672426755
0.0536436658270424,0.0195954872378879,0.1423433187678534,0.0734955141055144,-0.0254200170212631,-0.0748704097836509,1.0,0.3932372849135667
0.2034237239607886,0.1122653720678468,0.4878107893193141,-0.0041920917606198,-0.1122357567288977,-0.2333913672426755,0.3932372849135667,1.0


Out[16]: '\n    Since all values are less than 0.8 and greater than -0.8, all columns are dependent on each other\n'

In [0]:
train_df, test_df = dataframe.randomSplit(weights=[0.7,0.3], seed=2321)

In [0]:
print(train_df.count(), test_df.count())

84054 35946


In [0]:
from pyspark.ml.feature import VectorAssembler

cols_without_label = train_df.columns
cols_without_label.remove("label")
assembled_train_df = VectorAssembler(inputCols=cols_without_label, outputCol="cols_vector",handleInvalid='keep').transform(train_df)
assembled_test_df = VectorAssembler(inputCols=cols_without_label, outputCol="cols_vector",handleInvalid='keep').transform(test_df)

In [0]:
from pyspark.ml.feature import MinMaxScaler

scaled_train_df = MinMaxScaler(inputCol="cols_vector",outputCol="cols_scaled").fit(assembled_train_df).transform(assembled_train_df)
scaled_test_df = MinMaxScaler(inputCol="cols_vector",outputCol="cols_scaled").fit(assembled_test_df).transform(assembled_test_df)

In [0]:
scaled_train_df.display()

distance_from_home,distance_from_last_transaction,ratio_to_median_purchase_price,repeat_retailer,used_chip,used_pin_number,online_order,label,cols_vector,cols_scaled
0.08547844,0.060152363,6.157767,0.0,1.0,0.0,1.0,1.0,"Map(vectorType -> dense, length -> 7, values -> List(0.08547843992710114, 0.060152363032102585, 6.157766819000244, 0.0, 1.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 7, values -> List(6.033889858216556E-6, 2.818296235331626E-5, 0.049392701388342855, 0.0, 1.0, 0.0, 1.0))"
0.093916506,0.008813569,5.05939,0.0,0.0,0.0,1.0,1.0,"Map(vectorType -> dense, length -> 7, values -> List(0.09391650557518005, 0.008813569322228432, 5.059390068054199, 0.0, 0.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 7, values -> List(6.827485446744144E-6, 3.965399330667166E-6, 0.04056364464066715, 0.0, 0.0, 0.0, 1.0))"
0.10451535,6.9127526,6.984204,0.0,0.0,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 2), values -> List(0.10451535135507584, 6.912752628326416, 6.984203815460205))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 2), values -> List(7.824301257998763E-6, 0.0032606951972847073, 0.056035831019263176))"
0.12000077,0.02414789,12.3019,0.0,0.0,0.0,1.0,1.0,"Map(vectorType -> dense, length -> 7, values -> List(0.12000077217817307, 0.024147890508174896, 12.301899909973145, 0.0, 0.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 7, values -> List(9.280696854364034E-6, 1.1198913460913994E-5, 0.09878094606736432, 0.0, 0.0, 0.0, 1.0))"
0.121058896,1.4599624,4.263229,0.0,0.0,0.0,1.0,1.0,"Map(vectorType -> dense, length -> 7, values -> List(0.12105889618396759, 1.4599623680114746, 4.263228893280029, 0.0, 0.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 7, values -> List(9.380212854610472E-6, 6.885020810084668E-4, 0.034163880455634246, 0.0, 0.0, 0.0, 1.0))"
0.1287654,0.074746646,4.185111,0.0,0.0,0.0,1.0,1.0,"Map(vectorType -> dense, length -> 7, values -> List(0.12876540422439575, 0.07474664598703384, 4.185111045837402, 0.0, 0.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 7, values -> List(1.0105005839034403E-5, 3.506738525637024E-5, 0.03353594754692767, 0.0, 0.0, 0.0, 1.0))"
0.14182104,0.31494662,5.4253464,0.0,0.0,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 2), values -> List(0.14182104170322418, 0.31494662165641785, 5.425346374511719))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 2), values -> List(1.1332881603543597E-5, 1.4837464529223027E-4, 0.043505302859580014))"
0.14406335,20.260767,6.465733,0.0,0.0,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 2), values -> List(0.14406335353851318, 20.260766983032227, 6.465733051300049))","Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 2), values -> List(1.1543769839639945E-5, 0.009557227614643391, 0.05186821931878057))"
0.15148231,0.5478149,5.1589985,0.0,0.0,0.0,1.0,1.0,"Map(vectorType -> dense, length -> 7, values -> List(0.15148231387138367, 0.5478149056434631, 5.158998489379883, 0.0, 0.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 7, values -> List(1.2241519115028387E-5, 2.582233958355792E-4, 0.04136432473852371, 0.0, 0.0, 0.0, 1.0))"
0.15858257,4.5996222,4.008739,0.0,0.0,0.0,1.0,1.0,"Map(vectorType -> dense, length -> 7, values -> List(0.15858256816864014, 4.5996222496032715, 4.008738994598389, 0.0, 0.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 7, values -> List(1.2909294257191406E-5, 0.002169544107079841, 0.03211822011619684, 0.0, 0.0, 0.0, 1.0))"


In [0]:
from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(featuresCol="cols_scaled", labelCol="label",maxIter=10000)
model = lsvc.fit(scaled_train_df)

In [0]:
predictions = model.transform(scaled_test_df)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix

evaluator=MulticlassClassificationEvaluator(metricName="accuracy")
acc = evaluator.evaluate(predictions)
 
print("Prediction Accuracy: ", acc)

y_pred=predictions.select("prediction").collect()
y_orig=predictions.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm) 


Prediction Accuracy:  0.8928114393812941
Confusion Matrix:
[[14260  3703]
 [  150 17833]]
