<a href="https://colab.research.google.com/github/shashankv05/Colab_Notebooks/blob/main/Spark%20MLLIB%20Revised.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive so the dataset under '/content/gdrive/My Drive/...' can be read below.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=80fb2f83490cefbe9b2425f7a06f8d5ade2bc190d60d70bd1951313ec66fe9f0
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession

In [None]:
# Local Spark session ('local' = a single worker thread). The web UI is moved to
# port 4050 (presumably to avoid a clash on the default port — confirm).
spark = (
    SparkSession.builder
    .appName('Spark MLLIB')
    .master('local')
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Display the active SparkSession object.
spark

In [None]:
# Location of the raw transactions CSV on the mounted Google Drive.
DATA_PATH = ('/content/gdrive/My Drive/Shashank Python/Projects/DataSet/'
             'Synthetic Financial Datasets For Fraud Detection/train.csv')

# Explicit schema, identical to the one inferSchema produced (see printSchema below).
# Passing it avoids the extra full scan of the file that inferSchema=True performs.
RAW_SCHEMA = ('step INT, type STRING, amount DOUBLE, nameOrig STRING, '
              'oldbalanceOrg DOUBLE, newbalanceOrig DOUBLE, nameDest STRING, '
              'oldbalanceDest DOUBLE, newbalanceDest DOUBLE, isFraud INT, '
              'isFlaggedFraud INT')

df = spark.read.csv(DATA_PATH, header=True, schema=RAW_SCHEMA)

In [None]:
# Peek at the first five raw rows.
df.show(n=5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [None]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [None]:
# Optional: list (column, type) pairs; printSchema() above shows the same types.
# df.dtypes

In [None]:
from pyspark.sql.functions import *

**Checking for Null Values**

In [None]:
# Count missing values per column.
# NOTE: the pattern `count(when(isnull(c), c))` always returns 0. For missing rows,
# `when` yields the column value itself (null), and `count` skips nulls. Emitting a
# non-null marker (1) for every missing row makes `count` tally them correctly.
# For floating-point columns NaN is counted too, because isnull(NaN) is false.
from pyspark.sql import functions as F

_float_cols = {name for name, dtype in df.dtypes if dtype in ('float', 'double')}

def _is_missing(c):
    """Return a boolean Column that is true where column `c` is null (or NaN for floats)."""
    cond = F.isnull(c)
    return (cond | F.isnan(c)) if c in _float_cols else cond

df.select([F.count(F.when(_is_missing(c), 1)).alias(c) for c in df.columns]).show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [None]:
# Model inputs: transaction type, amount, origin-account balances, plus the label.
selected_cols = ['type', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'isFraud']
df_selected_features = df.select(selected_cols)

In [None]:
# First five rows of the reduced feature set.
df_selected_features.show(n=5)

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
| PAYMENT| 9839.64|     170136.0|     160296.36|      0|
| PAYMENT| 1864.28|      21249.0|      19384.72|      0|
|TRANSFER|   181.0|        181.0|           0.0|      1|
|CASH_OUT|   181.0|        181.0|           0.0|      1|
| PAYMENT|11668.14|      41554.0|      29885.86|      0|
+--------+--------+-------------+--------------+-------+
only showing top 5 rows



In [None]:
# Number of rows per transaction type (the categories of the 'type' feature).
df_selected_features.groupBy('type').count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



In [None]:
# (column, type) pairs; used below to split the features into categorical and numerical.
df_selected_features.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [None]:
# Categorical features = every string-typed column.
categorical_f = [name for name, dtype in df_selected_features.dtypes if dtype == 'string']
categorical_f

['type']

In [None]:
# Numerical features = every numeric column except the label.
# Spark reports types as simple strings ('int', 'bigint', 'double', ...). Match all
# integral and floating-point types, not just 'int' and 'double', so that e.g.
# 'bigint' or 'float' columns are not silently dropped.
LABEL_COL = 'isFraud'
NUMERIC_DTYPES = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
numerical_f = [name for name, dtype in df_selected_features.dtypes
               if dtype in NUMERIC_DTYPES and name != LABEL_COL]
numerical_f

['amount', 'oldbalanceOrg', 'newbalanceOrig']

---

#### **A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow. Spark ML also provides tools for constructing, evaluating, and tuning ML Pipelines.**

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

https://towardsdatascience.com/apache-spark-mllib-tutorial-7aba8a1dce6e

In [None]:
# For every categorical column, chain two stages:
#   StringIndexer: category string -> numeric index (most frequent category gets 0.0)
#   OneHotEncoder: index -> sparse binary vector
# Both are collected in `stages`, which later feeds the Pipeline.
stages = []
for cat_col in categorical_f:  # not `col`: that would shadow pyspark.sql.functions.col
  stringIndexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + '_indexed')
  encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(), outputCol=cat_col + '_encoded')
  # Extend rather than reassign; otherwise only the last column's stages survive.
  stages += [stringIndexer, encoder]

In [None]:
# Inspect the indexer/encoder stages built so far.
stages

[StringIndexer_b08d879e5bff, OneHotEncoder_5656ac60a995]

In [None]:
# VectorAssembler inputs: the one-hot column of each categorical feature, then the numeric columns.
assemblerInput = [f'{name}_encoded' for name in categorical_f]
assemblerInput += numerical_f

In [None]:
# Final assembler input order: encoded categoricals first, then numerics.
assemblerInput

['type_encoded', 'amount', 'oldbalanceOrg', 'newbalanceOrig']

In [None]:
# Concatenate all feature columns into a single 'features' vector column.
vectorAssembler = VectorAssembler(
    outputCol='features',
    inputCols=assemblerInput,
)

In [None]:
# Display the assembler (its repr is the stage uid).
vectorAssembler

VectorAssembler_d0bb8f79b683

In [None]:
# Append the assembler as the final stage. First drop any VectorAssembler left from
# an earlier run of this cell: `stages += [...]` mutated the list in place, so each
# re-run stacked another assembler stage writing to the same 'features' column.
stages = [stage for stage in stages if not isinstance(stage, VectorAssembler)] + [vectorAssembler]

In [None]:
# Full stage list: indexer/encoder pair(s) followed by the assembler.
stages

[StringIndexer_b08d879e5bff,
 OneHotEncoder_5656ac60a995,
 VectorAssembler_d0bb8f79b683]

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Chain the preprocessing stages into one estimator.
pipeline = Pipeline().setStages(stages)

In [None]:
# Fit the indexer/encoder on the data, yielding a PipelineModel.
model = pipeline.fit(dataset=df_selected_features)

In [None]:
# Superseded by the later cell that transforms and displays the result.
# model.transform(df_selected_features)

In [None]:
# The distinct transaction types the one-hot encoder has to represent.
df_selected_features.select('type').distinct().show(n=20)

+--------+
|    type|
+--------+
|TRANSFER|
| CASH_IN|
|CASH_OUT|
| PAYMENT|
|   DEBIT|
+--------+



In the next code block, the "type_encoded" column is a SparseVector, which saves storage by recording only the non-zero entries. For example, (4, [1], [1.0]) is a vector of length 4 whose value is 1.0 at index 1 and 0 in every other position. But why 4 values when we have FIVE unique categories? By default, Spark ML's OneHotEncoder omits the last category (dropLast=True). That category is then represented by an all-zero vector, and dropping it removes the perfect linear dependence among the one-hot columns. Normally you do not have to worry about it. If you want Spark ML to keep the last column, simply pass dropLast=False to the OneHotEncoder constructor. (Note also that the "features" column mixes dense [...] and sparse (n, [...], [...]) forms: Spark stores each row in whichever form is more compact.)

In [None]:
# Pipeline output for the first 20 rows, untruncated so the index,
# one-hot and assembled feature-vector columns are fully visible.
model.transform(df_selected_features).show(n=20, truncate=False)

+--------+---------+-------------+--------------+-------+------------+-------------+---------------------------------------------+
|type    |amount   |oldbalanceOrg|newbalanceOrig|isFraud|type_indexed|type_encoded |features                                     |
+--------+---------+-------------+--------------+-------+------------+-------------+---------------------------------------------+
|PAYMENT |9839.64  |170136.0     |160296.36     |0      |1.0         |(4,[1],[1.0])|[0.0,1.0,0.0,0.0,9839.64,170136.0,160296.36] |
|PAYMENT |1864.28  |21249.0      |19384.72      |0      |1.0         |(4,[1],[1.0])|[0.0,1.0,0.0,0.0,1864.28,21249.0,19384.72]   |
|TRANSFER|181.0    |181.0        |0.0           |1      |3.0         |(4,[3],[1.0])|(7,[3,4,5],[1.0,181.0,181.0])                |
|CASH_OUT|181.0    |181.0        |0.0           |1      |0.0         |(4,[0],[1.0])|(7,[0,4,5],[1.0,181.0,181.0])                |
|PAYMENT |11668.14 |41554.0      |29885.86      |0      |1.0         |(4,[1],[1.0])

In [None]:
# Keep only the assembled feature vector, and expose the target as 'label'
# (the column name the classifier below reads).
final_data = (
    model
    .transform(df_selected_features)
    .selectExpr('features', 'isFraud as label')
)

In [None]:
# First five modelling rows, with vectors shown in full.
final_data.show(n=5, truncate=False)

+--------------------------------------------+-----+
|features                                    |label|
+--------------------------------------------+-----+
|[0.0,1.0,0.0,0.0,9839.64,170136.0,160296.36]|0    |
|[0.0,1.0,0.0,0.0,1864.28,21249.0,19384.72]  |0    |
|(7,[3,4,5],[1.0,181.0,181.0])               |1    |
|(7,[0,4,5],[1.0,181.0,181.0])               |1    |
|[0.0,1.0,0.0,0.0,11668.14,41554.0,29885.86] |0    |
+--------------------------------------------+-----+
only showing top 5 rows



#### **Train-Test Split**

In [None]:
# 70/30 random train/test split; a fixed seed keeps it repeatable.
train, test = final_data.randomSplit(weights=[0.7, 0.3], seed=42)

In [None]:
# Rows in the training split (~70% of the data).
train.count()

4454014

In [None]:
# Rows in the test split (~30% of the data).
test.count()

1908606

#### **Model Creation**



In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic regression with default hyper-parameters.
# NOTE(review): fraud labels are probably rare; consider weightCol or resampling
# once the class balance has been checked.
lr = LogisticRegression(featuresCol='features', labelCol='label')

In [None]:
# Display the unfitted estimator (its repr is the uid).
lr

LogisticRegression_0ea5bfbf982e

In [None]:
# Train the classifier on the training split.
lr_model = lr.fit(dataset=train)

In [None]:
# AUC on the TRAINING data, taken from the fit summary; the evaluator cell at the
# end reports the test-set AUC.
lr_model.summary.areaUnderROC

0.9916165861188908

In [None]:
# Optional: precision-recall curve points from the training summary.
# lr_model.summary.pr.show()

In [None]:
# Fitted coefficients in assembler order: the 4 one-hot 'type' slots, then
# amount, oldbalanceOrg, newbalanceOrig.
# NOTE(review): the numeric coefficients print as +/-0.0, probably because they are
# tiny on these unscaled monetary features rather than exactly zero — confirm.
lr_model.coefficientMatrix

DenseMatrix(1, 7, [69.8637, -222.1004, -228.0726, 71.6426, -0.0, 0.0, -0.0], 1)

In [None]:
# Optional: score the training split as well, e.g. to compare train vs. test AUC.
# train_transform = lr_model.transform(train)


In [None]:
# Optional: inspect the scored training rows.
# train_transform.show(truncate=False)

In [None]:
# Score the held-out test split (adds rawPrediction, probability and prediction).
test_transform = lr_model.transform(dataset=test)

In [None]:
# First five scored test rows.
test_transform.show(n=5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(7,[0,4],[1.0,2.05])|    0|[5.87699683132360...|[0.99720464481241...|       0.0|
|(7,[0,4],[1.0,7.57])|    0|[5.87711152908025...|[0.99720496451890...|       0.0|
|(7,[0,4],[1.0,9.38])|    0|[5.87714913830842...|[0.99720506934225...|       0.0|
|(7,[0,4],[1.0,9.73])|    0|[5.87715641081111...|[0.99720508961151...|       0.0|
|(7,[0,4],[1.0,15....|    0|[5.87727360199724...|[0.99720541621590...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:
# True label next to predicted class for the first five test rows.
test_transform.select(['label', 'prediction']).show(n=5)

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Scores rawPrediction against label. areaUnderROC is already the evaluator's
# default metric; it is spelled out here for clarity.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)


In [None]:
# Label alongside the model's raw scores, probabilities and predicted class.
test_transform.select(['label', 'rawPrediction', 'prediction', 'probability']).show(n=5)

+-----+--------------------+----------+--------------------+
|label|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|    0|[5.87699683132360...|       0.0|[0.99720464481241...|
|    0|[5.87711152908025...|       0.0|[0.99720496451890...|
|    0|[5.87714913830842...|       0.0|[0.99720506934225...|
|    0|[5.87715641081111...|       0.0|[0.99720508961151...|
|    0|[5.87727360199724...|       0.0|[0.99720541621590...|
+-----+--------------------+----------+--------------------+
only showing top 5 rows



In [None]:
# Held-out AUC; compare with the training-summary AUC reported earlier.
test_auc = evaluator.evaluate(test_transform)
print(f'The area under ROC for test set is {test_auc}')

The area under ROC for test set is 0.9937907277293401
