In [105]:
!spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_275
Branch HEAD
Compiled by user ubuntu on 2020-08-28T08:58:35Z
Revision 2b147c4cd50da32fe2b4167f97c8142102a0510d
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


# Imports

In [64]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.util import MLUtils
from pyspark.mllib.feature import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MultilabelMetrics
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import PCA, StandardScaler

# Environment

In [2]:
# Spark Session
appName = "clip"
master = "local"

conf = (SparkConf()
    .set("spark.driver.maxResultSize", "8g")
    .set("spark.driver.memory", "16g") )

sc = SparkContext(master, appName, conf = conf)
sqlContext = SQLContext(sc)
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# EDA

In [72]:
# Inbound
raw = spark.read.csv("raw/datos.csv", header = True, inferSchema = True)
raw.show(1, vertical = True)
raw.printSchema()

# Total count
print("Total count: " + str(raw.count()))
# Is fraud count
print("Is fraud count: " + str(raw.filter(F.col("is_fraud") == 1.0).count()))
# Is not fraud count
print("Is not fraud count: " + str(raw.filter(F.col("is_fraud") == 0.0).count()))

-RECORD 0------------------------------
 transaction_id | 99899e9e02c4b41fc... 
 timestamp      | 155270.0             
 amount         | 12.0                 
 variable_01    | -0.0713300864873049  
 variable_02    | -0.0328999520808001  
 variable_03    | 0.10998943181331698  
 variable_04    | 0.33988859549026507  
 variable_05    | -0.6261306478334321  
 variable_06    | -0.11685326903066098 
 variable_07    | 1.22082562064008     
 variable_08    | 0.39404086545839706  
 variable_09    | 0.0517052313610874   
 variable_10    | 0.7000841408460489   
 variable_11    | -1.18526338095099    
 variable_12    | -0.183050156425376   
 variable_13    | 1.05102866940126     
 variable_14    | 0.267423177397213    
 variable_15    | -0.22056869472414398 
 variable_16    | 1.3582066520249299   
 variable_17    | -0.321922279007069   
 variable_18    | -1.12124634073183    
 variable_19    | 0.852399772357995    
 variable_20    | -0.6359347108629929  
 variable_21    | -0.445326616622055   


In [78]:
# General statistics
raw.select("timestamp", "amount", "is_fraud").summary().show()

+-------+-----------------+-----------------+--------------------+
|summary|        timestamp|           amount|            is_fraud|
+-------+-----------------+-----------------+--------------------+
|  count|           284807|           284807|              284807|
|   mean|94813.85957508067|88.34961925094817|0.001727485630620034|
| stddev|47488.14595456631|250.1201092401885| 0.04152718963546499|
|    min|              0.0|              0.0|                   0|
|    25%|          54196.0|              5.6|                   0|
|    50%|          84687.0|             22.0|                   0|
|    75%|         139317.0|            77.15|                   0|
|    max|         172792.0|         25691.16|                   1|
+-------+-----------------+-----------------+--------------------+



In [79]:
# Distribution of is_fraud
raw.select("amount").filter(F.col("is_fraud") == 1.0).summary().show()

+-------+-----------------+
|summary|           amount|
+-------+-----------------+
|  count|              492|
|   mean|122.2113211382113|
| stddev|256.6832882977122|
|    min|              0.0|
|    25%|              1.0|
|    50%|             9.21|
|    75%|           105.89|
|    max|          2125.87|
+-------+-----------------+



In [75]:
raw.select("variable_01", "variable_02", "variable_03", "variable_04", "variable_05", "variable_06", "variable_07", "variable_08", "variable_09", "variable_10").summary().show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|         variable_01|         variable_02|         variable_03|         variable_04|         variable_05|         variable_06|         variable_07|         variable_08|         variable_09|         variable_10|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|
|   mean|-1.19352278837118...|-3.50871749491060...|1.690591100150187...|5.482371085797822...|4.470221848251438...|2.6744

In [76]:
raw.select("variable_11", "variable_12", "variable_13", "variable_14", "variable_15", "variable_16", "variable_17", "variable_18", "variable_19", "variable_20").summary().show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|         variable_11|         variable_12|         variable_13|         variable_14|         variable_15|         variable_16|         variable_17|         variable_18|         variable_19|         variable_20|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|
|   mean|9.582111884565311...|-3.86547702404329...|1.456776393364761...|4.873085685764354...|1.200757771829120...|8.8216

In [77]:
raw.select("variable_21", "variable_22", "variable_23", "variable_24", "variable_25", "variable_26", "variable_27", "variable_28", "variable_29", "variable_30", "variable_31", "variable_32").summary().show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|         variable_21|         variable_22|         variable_23|         variable_24|         variable_25|         variable_26|         variable_27|         variable_28|         variable_29|         variable_30|         variable_31|         variable_32|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              284807|              28

# Classifier (MLP)

## v. Base

In [102]:
# SVMLIB format, to abstract labels and features.
!rm -r libsvm/
r = raw.rdd.map(lambda line:LabeledPoint(line[-1], Vectors.dense(line[3:35])))
MLUtils.saveAsLibSVMFile(r, "libsvm/")
libsvm = spark.read.format("libsvm").load("libsvm/")
# libsvm.show(truncate = True)

# Classifier (MLP).
data = libsvm
# Define network
layers = [32, 23, 18, 2]
trainer = MultilayerPerceptronClassifier(maxIter = 100, layers = layers, 
                                         blockSize = 128, seed = 1234)
# Training.
print("Training...")
model = trainer.fit(data)
# Accuracy.
print("Testing over training data...")
result = model.transform(data)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Training...
Testing over training data...
Accuracy = 0.9997963533199675


## v. 32 features

In [96]:
# SVMLIB format
!rm -r libsvm/
r = raw.rdd.map(lambda line:LabeledPoint(line[-1], Vectors.dense(line[3:35])))
MLUtils.saveAsLibSVMFile(r, "libsvm/")
libsvm = spark.read.format("libsvm").load("libsvm/")
# libsvm.show(truncate = True)

# Classifier (MLP)
data = libsvm
# Train and test split.
splits = data.randomSplit([0.7, 0.3], 1234)
train = splits[0]
test = splits[1]
# Define network
layers = [32, 23, 18, 2]
trainer = MultilayerPerceptronClassifier(maxIter = 100, layers = layers, 
                                         blockSize = 128, seed = 1234)
# Training.
print("Training...")
model = trainer.fit(train)
# Accuracy of test.
print("Testing...")
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

# Metrics 
metrics = MulticlassMetrics(predictionAndLabels.rdd)

print("Confusion matrix")
print(metrics.confusionMatrix().toArray())

evaluator = MulticlassClassificationEvaluator()
evaluator.setPredictionCol("prediction")

print("Accuracy")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "accuracy"}))
print("True positive rate, is fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 1.0}))
print("True positive rate, is not fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 0.0}))

Training...
Testing...
Test set accuracy = 0.9994486156733928
Confusion matrix
[[8.5056e+04 2.4000e+01]
 [2.3000e+01 1.3700e+02]]
Accuracy
0.9994486156733928
True positive rate, is fraud
0.85625
True positive rate, is not fraud
0.9997179125528914


## v. Features Normalized

In [88]:
# SVMLIB format
!rm -r libsvm/
r = raw.rdd.map(lambda line:LabeledPoint(line[-1], Vectors.dense(line[3:35])))
MLUtils.saveAsLibSVMFile(r, "libsvm/")
libsvm = spark.read.format("libsvm").load("libsvm/")
# libsvm.show(truncate = True)

# Normalize
scaler = StandardScaler(inputCol = "features", outputCol = "norm_features",
                        withStd = True, withMean = True)
scaler_model = scaler.fit(libsvm)
libsvm = scaler_model.transform(libsvm)
libsvm = libsvm.select("label", "norm_features")
libsvm = libsvm.withColumnRenamed("norm_features", "features")
# libsvm.show(truncate = False)

# Classifier (MLP)
data = libsvm
# Train and test
splits = data.randomSplit([0.7, 0.3], 1234)
train = splits[0]
test = splits[1]
# Define network
layers = [32, 23, 18, 2]
trainer = MultilayerPerceptronClassifier(maxIter = 100, layers = layers, 
                                         blockSize = 128, seed = 1234)
# Training.
print("Training...")
model = trainer.fit(train)
# Accuracy of test
print("Testing...")
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

# Metrics
metrics = MulticlassMetrics(predictionAndLabels.rdd)

print("Confusion matrix")
print(metrics.confusionMatrix().toArray())

evaluator = MulticlassClassificationEvaluator()
evaluator.setPredictionCol("prediction")

print("Accuracy")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "accuracy"}))
print("True positive rate, is fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 1.0}))
print("True positive rate, is not fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 0.0}))

Training...
Testing...
Test set accuracy = 0.9992022524636321
Confusion matrix
[[8.5047e+04 3.3000e+01]
 [3.5000e+01 1.2500e+02]]
Accuracy
0.9992022524636321
True positive rate, is fraud
0.78125
True positive rate, is not fraud
0.9996121297602256


## v. Dimensionality Reduction PCA (k = 10)

In [94]:
# SVMLIB format
!rm -r libsvm/
r = raw.rdd.map(lambda line:LabeledPoint(line[-1], Vectors.dense(line[3:35])))
MLUtils.saveAsLibSVMFile(r, "libsvm/")
libsvm = spark.read.format("libsvm").load("libsvm/")
# libsvm.show(truncate = True)

# PCA
pca = PCA(k = 10, inputCol = "features", outputCol = "pca_features")
model = pca.fit(libsvm)
libsvm = model.transform(libsvm)
libsvm = libsvm.select("label", "pca_features")
libsvm = libsvm.withColumnRenamed("pca_features", "features")
# libsvm.show(1, truncate = False)

# Classifier (MLP)
data = libsvm
# Train and test
splits = data.randomSplit([0.7, 0.3], 1234)
train = splits[0]
test = splits[1]
# Define network
layers = [10, 15, 12, 2]
trainer = MultilayerPerceptronClassifier(maxIter = 100, layers = layers, 
                                         blockSize = 128, seed = 1234)
# Training.
print("Training...")
model = trainer.fit(train)
# Accuracy of test
print("Testing...")
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

# Metrics
metrics = MulticlassMetrics(predictionAndLabels.rdd)

print("Confusion matrix")
print(metrics.confusionMatrix().toArray())

evaluator = MulticlassClassificationEvaluator()
evaluator.setPredictionCol("prediction")

print("Accuracy")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "accuracy"}))
print("True positive rate, is fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 1.0}))
print("True positive rate, is not fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 0.0}))

Training...
Testing...
Test set accuracy = 0.9991670577193805
Confusion matrix
[[8.5066e+04 1.4000e+01]
 [5.7000e+01 1.0300e+02]]
Accuracy
0.9991670577193805
True positive rate, is fraud
0.64375
True positive rate, is not fraud
0.9998354489891866


## v. 33 features

In [99]:
# SVMLIB format
!rm -r libsvm/
r = raw.rdd.map(lambda line:LabeledPoint(line[-1], Vectors.dense(line[2:35])))
MLUtils.saveAsLibSVMFile(r, "libsvm/")
libsvm = spark.read.format("libsvm").load("libsvm/")
# libsvm.show(truncate = True)

# Classifier (MLP)
data = libsvm
# Train and test splits.
splits = data.randomSplit([0.7, 0.3], 1234)
train = splits[0]
test = splits[1]
# Define network
layers = [33, 23, 18, 2]
trainer = MultilayerPerceptronClassifier(maxIter = 100, layers = layers, 
                                         blockSize = 128, seed = 1234)
# Training.
print("Training...")
model = trainer.fit(train)
# Accuracy of test.
print("Testing...")
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

# Metrics 
metrics = MulticlassMetrics(predictionAndLabels.rdd)

print("Confusion matrix")
print(metrics.confusionMatrix().toArray())

evaluator = MulticlassClassificationEvaluator()
evaluator.setPredictionCol("prediction")

print("Accuracy")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "accuracy"}))
print("True positive rate, is fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 1.0}))
print("True positive rate, is not fraud")
print(evaluator.evaluate(predictionAndLabels, {evaluator.metricName: "truePositiveRateByLabel",
    evaluator.metricLabel: 0.0}))

Training...
Testing...
Test set accuracy = 0.999296129887498
Confusion matrix
[[8.5076e+04 2.5000e+01]
 [3.5000e+01 1.0700e+02]]
Accuracy
0.999296129887498
True positive rate, is fraud
0.7535211267605634
True positive rate, is not fraud
0.9997062314191373
