In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session (or reuse one that already exists) used by every
# later cell in this notebook.
spark = (
    SparkSession.builder
    .appName('big-dta-2nd')
    .getOrCreate()
)

In [None]:
# Display the session object as the cell result to confirm it was created.
spark

In [None]:
# Load the transactions CSV. The first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    '/content/credit_card_fraud_dataset.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Print a preview of the freshly loaded transactions.
df.show()

+-------------+--------------------+-------+----------+---------------+------------+-------+
|TransactionID|     TransactionDate| Amount|MerchantID|TransactionType|    Location|IsFraud|
+-------------+--------------------+-------+----------+---------------+------------+-------+
|            1|2024-04-03 14:15:...|4189.27|       688|         refund| San Antonio|      0|
|            2|2024-03-19 13:20:...|2659.71|       109|         refund|      Dallas|      0|
|            3|2024-01-08 10:08:...|  784.0|       394|       purchase|    New York|      0|
|            4|2024-04-13 23:50:...| 3514.4|       944|       purchase|Philadelphia|      0|
|            5|2024-07-12 18:51:...| 369.07|       475|       purchase|     Phoenix|      0|
|            6|2024-08-30 01:52:...|3086.22|       302|       purchase|    New York|      0|
|            7|2024-01-02 11:31:...| 1466.9|       902|         refund|Philadelphia|      0|
|            8|2024-05-12 12:25:...| 917.21|       266|       purchase

In [None]:
# Print the inferred column names and types to check the CSV was parsed as expected.
df.printSchema()

root
 |-- TransactionID: integer (nullable = true)
 |-- TransactionDate: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- MerchantID: integer (nullable = true)
 |-- TransactionType: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- IsFraud: integer (nullable = true)



In [None]:

# Number of transactions recorded for each Location value.
(
    df.groupby('Location')
    .count()
    .show()
)

+------------+-----+
|    Location|count|
+------------+-----+
|     Phoenix| 9960|
|      Dallas|10076|
| San Antonio|10062|
|Philadelphia| 9873|
| Los Angeles| 9936|
|   San Diego|10111|
|     Chicago|10193|
|    San Jose| 9805|
|     Houston| 9991|
|    New York| 9993|
+------------+-----+



In [None]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second

# New column name -> function that extracts that component of the timestamp.
# Insertion order is the order in which the columns are appended.
date_parts = {
    "year": year,
    "month": month,
    "day": dayofmonth,
    "hour": hour,
    "minute": minute,
    "second": second,
}

# Start from the raw frame each time so re-running this cell is idempotent.
df_with_integers = df
for part_name, extract in date_parts.items():
    df_with_integers = df_with_integers.withColumn(part_name, extract("TransactionDate"))

df_with_integers.show()

+-------------+--------------------+-------+----------+---------------+------------+-------+----+-----+---+----+------+------+
|TransactionID|     TransactionDate| Amount|MerchantID|TransactionType|    Location|IsFraud|year|month|day|hour|minute|second|
+-------------+--------------------+-------+----------+---------------+------------+-------+----+-----+---+----+------+------+
|            1|2024-04-03 14:15:...|4189.27|       688|         refund| San Antonio|      0|2024|    4|  3|  14|    15|    35|
|            2|2024-03-19 13:20:...|2659.71|       109|         refund|      Dallas|      0|2024|    3| 19|  13|    20|    35|
|            3|2024-01-08 10:08:...|  784.0|       394|       purchase|    New York|      0|2024|    1|  8|  10|     8|    35|
|            4|2024-04-13 23:50:...| 3514.4|       944|       purchase|Philadelphia|      0|2024|    4| 13|  23|    50|    35|
|            5|2024-07-12 18:51:...| 369.07|       475|       purchase|     Phoenix|      0|2024|    7| 12|  18

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# String columns that get a numeric "<name>_index" companion column.
categorical_cols = ["Location", "TransactionType"]

indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in categorical_cols
]
pipeline = Pipeline(stages=indexers)

# Fit the indexers on the full frame, then apply them to that same frame.
indexer_model = pipeline.fit(df_with_integers)
df_indexed = indexer_model.transform(df_with_integers)

df_indexed.show()

+-------------+--------------------+-------+----------+---------------+------------+-------+----+-----+---+----+------+------+--------------+---------------------+
|TransactionID|     TransactionDate| Amount|MerchantID|TransactionType|    Location|IsFraud|year|month|day|hour|minute|second|Location_index|TransactionType_index|
+-------------+--------------------+-------+----------+---------------+------------+-------+----+-----+---+----+------+------+--------------+---------------------+
|            1|2024-04-03 14:15:...|4189.27|       688|         refund| San Antonio|      0|2024|    4|  3|  14|    15|    35|           3.0|                  0.0|
|            2|2024-03-19 13:20:...|2659.71|       109|         refund|      Dallas|      0|2024|    3| 19|  13|    20|    35|           2.0|                  0.0|
|            3|2024-01-08 10:08:...|  784.0|       394|       purchase|    New York|      0|2024|    1|  8|  10|     8|    35|           4.0|                  1.0|
|            4|2

In [None]:
# The raw string columns are no longer needed now that their *_index
# encodings exist on df_indexed.
raw_categoricals = ('Location', 'TransactionType')
df_new = df_indexed.drop(*raw_categoricals)

In [None]:
# Display the DataFrame repr (column names and types) after dropping the string columns.
df_new

DataFrame[TransactionID: int, TransactionDate: timestamp, Amount: double, MerchantID: int, IsFraud: int, year: int, month: int, day: int, hour: int, minute: int, second: int, Location_index: double, TransactionType_index: double]

In [None]:
# NOTE(review): this wildcard pulls every public name from pyspark.sql.functions
# into the notebook namespace, including ones named like Python builtins
# (e.g. sum, min, max). The null-count cell that follows only uses col, count
# and when; consider importing just those names.
from pyspark.sql.functions import *

In [None]:
# One aggregate per column: count the rows where that column is NULL.
# when(...) yields a non-null value only for NULL rows, and count() ignores nulls.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df_new.columns
]
df_new.select(null_counts).show()

+-------------+---------------+------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+
|TransactionID|TransactionDate|Amount|MerchantID|IsFraud|year|month|day|hour|minute|second|Location_index|TransactionType_index|
+-------------+---------------+------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+
|            0|              0|     0|         0|      0|   0|    0|  0|   0|     0|     0|             0|                    0|
+-------------+---------------+------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+



In [None]:
# Remove rows that are exact duplicates across all columns.
df1 = df_new.dropDuplicates()

In [None]:
# Print a preview of the de-duplicated frame.
df1.show()

+-------------+--------------------+-------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+
|TransactionID|     TransactionDate| Amount|MerchantID|IsFraud|year|month|day|hour|minute|second|Location_index|TransactionType_index|
+-------------+--------------------+-------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+
|          197|2023-11-02 11:45:...|2805.81|       345|      0|2023|   11|  2|  11|    45|    35|           6.0|                  0.0|
|          918|2023-11-03 13:12:...|2504.79|       527|      0|2023|   11|  3|  13|    12|    35|           8.0|                  0.0|
|          944|2024-06-06 09:49:...|2332.68|         6|      0|2024|    6|  6|   9|    49|    35|           4.0|                  1.0|
|          990|2024-07-30 08:29:...|1946.25|       480|      0|2024|    7| 30|   8|    29|    35|           5.0|                  1.0|
|         1088|2024-09-01 07:26:...|3390.95|       831|

In [None]:
# List the available column names before choosing the model's input features.
df1.columns

['TransactionID',
 'TransactionDate',
 'Amount',
 'MerchantID',
 'IsFraud',
 'year',
 'month',
 'day',
 'hour',
 'minute',
 'second',
 'Location_index',
 'TransactionType_index']

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the model's feature vector.
# TransactionID is deliberately excluded: in the previews it is a sequential
# row identifier (1, 2, 3, ...), not a property of the transaction, so the
# model could only learn spurious patterns from it.
# NOTE(review): MerchantID, Location_index and TransactionType_index are
# category codes used as plain numbers here; consider one-hot encoding them.
feature_cols = [
    'Amount',
    'TransactionType_index',
    'year',
    'month',
    'day',
    'hour',
    'minute',
    'second',
    'Location_index',
    'MerchantID',
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Adds a 'features' vector column alongside the existing columns of df1.
output = assembler.transform(df1)

In [None]:
# Print a preview that includes the assembled 'features' vector column.
output.show()

+-------------+--------------------+-------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+--------------------+
|TransactionID|     TransactionDate| Amount|MerchantID|IsFraud|year|month|day|hour|minute|second|Location_index|TransactionType_index|            features|
+-------------+--------------------+-------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+--------------------+
|          197|2023-11-02 11:45:...|2805.81|       345|      0|2023|   11|  2|  11|    45|    35|           6.0|                  0.0|[2805.81,0.0,2023...|
|          918|2023-11-03 13:12:...|2504.79|       527|      0|2023|   11|  3|  13|    12|    35|           8.0|                  0.0|[2504.79,0.0,2023...|
|          944|2024-06-06 09:49:...|2332.68|         6|      0|2024|    6|  6|   9|    49|    35|           4.0|                  1.0|[2332.68,1.0,2024...|
|          990|2024-07-30 08:29:...|1946.25|       480|      0|2

In [None]:
# 80/20 train/test split. The seed is fixed so that re-running the notebook
# draws the same random sample instead of a different split (and different
# metrics) on every execution.
train_data, test_data = output.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

# Logistic-regression estimator: reads the assembled 'features' vector and
# learns to predict the 'IsFraud' label column.
sp = LogisticRegression(
    labelCol='IsFraud',
    featuresCol='features',
)

In [None]:
# Fit the logistic-regression estimator on the training split.
model=sp.fit(train_data)

In [None]:
# Evaluate on the same data the model was fitted on: this is the training
# score, not a held-out estimate.
result=model.evaluate(train_data)

In [None]:
# Training-set accuracy.
# NOTE(review): the previews show IsFraud = 0 on every visible row; if fraud is
# rare, a high accuracy may just reflect the majority class. Consider also
# checking recall / precision / AUC.
result.accuracy

0.9898764199226064

In [None]:
# Score the held-out split with the fitted model.
pred=model.transform(test_data)

In [None]:
# Print a preview of the scored test rows, including the model's output columns.
pred.show()

+-------------+--------------------+-------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+--------------------+--------------------+--------------------+----------+
|TransactionID|     TransactionDate| Amount|MerchantID|IsFraud|year|month|day|hour|minute|second|Location_index|TransactionType_index|            features|       rawPrediction|         probability|prediction|
+-------------+--------------------+-------+----------+-------+----+-----+---+----+------+------+--------------+---------------------+--------------------+--------------------+--------------------+----------+
|            8|2024-05-12 12:25:...| 917.21|       266|      0|2024|    5| 12|  12|    25|    35|           4.0|                  1.0|[917.21,1.0,2024....|[4.58966251292932...|[0.98994582763405...|       0.0|
|           31|2024-07-24 17:19:...|1062.73|       595|      0|2024|    7| 24|  17|    19|    35|           0.0|                  0.0|[1062.73,0.0,2024...|[4.620185

In [None]:
# Evaluate the fitted model on the held-out test split.
result_test=model.evaluate(test_data)

In [None]:
# Test-set accuracy, to compare against the training accuracy shown earlier.
result_test.accuracy

0.9904977375565611