In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression

In [2]:
# Load the fraud sample through the SparkSession entry point rather than the legacy
# `sqlContext` global, which plain PySpark sessions do not define. In a notebook that
# already has an active session (presumably Databricks here), getOrCreate() returns it.
DATA_PATH = '/FileStore/tables/frauddetectionsmall.csv'
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)


In [3]:
# Show the raw loaded frame. display() is provided by the notebook environment
# (presumably Databricks) — it is not defined in this file.
display(df)

In [4]:
# Print the column types produced by inferSchema at load time, so the numeric
# columns used as features below can be checked before selection.
df.printSchema()

In [5]:
# Keep the transaction type and the five amount/balance columns, and expose the
# fraud flag as an integer column named `label` for the classifiers.
selected_cols = ["type", "amount", "oldbalanceOrg", "newbalanceOrig",
                 "oldbalanceDest", "newbalanceDest"]
label_expr = col("isFraud").cast("Int").alias("label")
df2 = df.select(*(selected_cols + [label_expr]))
display(df2)

In [6]:
# Split the prepared data 70/30 into training and hold-out sets. A fixed seed makes
# the split reproducible across re-runs and kernel restarts; without it every
# Restart & Run All trains and tests on different rows.
SPLIT_SEED = 42
splits = df2.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train = splits[0]

In [7]:
# The 30% hold-out is used for scoring; its `label` column is renamed to `trueLabel`
# so the raw flag can be shown next to the model output.
# NOTE(review): with `label` absent, the fitted label StringIndexer stage is presumably
# skipped when the pipeline transforms this frame — confirm for the Spark version in use.
holdout = splits[1]
test = holdout.withColumnRenamed("label", "trueLabel")
train_rows, test_rows = train.count(), test.count()

In [8]:
# Report split sizes and preview both frames. A single pre-formatted string prints
# identically under Python 2 and Python 3 (the original `print a, b` statement is a
# SyntaxError on Python 3).
print("Training Rows: %d  Testing Rows: %d" % (train_rows, test_rows))
train.show(5)
test.show(5)

In [9]:
# Check the hold-out frame's columns: it should carry `trueLabel` (renamed from `label`)
# rather than `label`.
test.printSchema()

In [10]:
# Pipeline stages 1-2, which give a simple way to construct, tune, and test the ML
# workflow: turn the string `type` column and the integer `label` column into
# category indices.
# NOTE(review): StringIndexer orders indices by frequency by default, so `idxLabel`
# 0/1 need not coincide with the raw 0/1 `label` — confirm before comparing them.
strIdx = StringIndexer(inputCol="type",
                       outputCol="typeCat")
labelIdx = StringIndexer(inputCol="label",
                         outputCol="idxLabel")

In [11]:
# The indexed `typeCat` column is wrapped in a one-element vector so VectorIndexer can
# decide whether to treat it as categorical. The five amount/balance columns carry
# meaningful magnitudes, so they are assembled separately as continuous features.
catVect = VectorAssembler(inputCols=["typeCat"],
                          outputCol="catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(),
                       outputCol="idxCatFeatures")
numeric_cols = ["amount", "oldbalanceOrg", "newbalanceOrig",
                "oldbalanceDest", "newbalanceDest"]
numVect = VectorAssembler(inputCols=numeric_cols,
                          outputCol="numFeatures")

In [12]:
# Rescale each numeric feature with MinMaxScaler (default range [0, 1]), then
# concatenate the indexed category vector and the scaled numeric vector into the
# single `features` column the classifiers read.
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(),
                      outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"],
                           outputCol="features")

In [13]:
# Empty containers for the candidate classifiers and their matching pipelines.
cl, pipeline = [], []


In [14]:
# The three candidate classifiers, all trained on the indexed label and the assembled
# feature vector. The list is rebound rather than grown with list.insert so that
# re-running this cell replaces the candidates. With insert, earlier entries would
# shift and cl[i] would no longer be the estimator inside pipeline[i], so grid
# params built from cl[i] would not apply to it.
cl = [
    DecisionTreeClassifier(labelCol="idxLabel", featuresCol="features"),
    RandomForestClassifier(labelCol="idxLabel", featuresCol="features"),
    LogisticRegression(labelCol="idxLabel", featuresCol="features"),
]

In [15]:
# Each pipeline chains the seven feature/label transformations above with one
# candidate classifier, giving pipeline[i] <-> cl[i]. The list is rebound (not grown
# with insert) so re-running this cell is idempotent.
feature_stages = [strIdx, labelIdx, catVect, catIdx, numVect, minMax, featVect]
pipeline = [Pipeline(stages=feature_stages + [clf]) for clf in cl]
print("Pipeline complete!")

In [16]:
# Tune the decision tree (pipeline[0]) with 5-fold cross-validation.
# This sample is small enough for CrossValidator. On the full dataset, switch to the
# commented-out TrainValidationSplit, which fits each parameter combination once (on
# an 80/20 split) instead of once per fold.
model = []

paramGrid = (ParamGridBuilder()
             .addGrid(cl[0].impurity, ("gini", "entropy"))
             .addGrid(cl[0].maxDepth, [5, 10, 20])
             # NOTE(review): Spark requires maxBins >= the category count of every
             # categorical feature, so maxBins=5 assumes `type` has at most 5 distinct
             # values — confirm on the full dataset.
             .addGrid(cl[0].maxBins, [5, 10, 20])
             .build())
# Score against `idxLabel`, the column the classifier is trained on. The evaluator's
# default labelCol ("label") holds the raw 0/1 flag, which only matches the model's
# rawPrediction if StringIndexer happened to map "0" -> 0.0.
cv = CrossValidator(estimator=pipeline[0],
                    evaluator=BinaryClassificationEvaluator(labelCol="idxLabel"),
                    estimatorParamMaps=paramGrid, numFolds=5)
#cv = TrainValidationSplit(estimator=pipeline[0], evaluator=BinaryClassificationEvaluator(labelCol="idxLabel"), estimatorParamMaps=paramGrid, trainRatio=0.8)
model.append(cv.fit(train))
print("Model 1 completed")


In [17]:
# Tune the random forest (pipeline[1]) over an impurity/depth/bin grid with 5-fold
# cross-validation.
paramGrid2 = (ParamGridBuilder()
              .addGrid(cl[1].impurity, ("gini", "entropy"))
              .addGrid(cl[1].maxDepth, [5, 10, 20])
              .addGrid(cl[1].maxBins, [5, 10, 20])
              .build())
# Evaluate on `idxLabel` (the classifier's training label) rather than the raw `label`
# column, so AUC is computed in the same label space as rawPrediction.
cv2 = CrossValidator(estimator=pipeline[1],
                     evaluator=BinaryClassificationEvaluator(labelCol="idxLabel"),
                     estimatorParamMaps=paramGrid2, numFolds=5)
#cv2 = TrainValidationSplit(estimator=pipeline[1], evaluator=BinaryClassificationEvaluator(labelCol="idxLabel"), estimatorParamMaps=paramGrid2, trainRatio=0.8)
model.insert(1, cv2.fit(train))
print("Model 2 completed")


In [18]:
# Tune logistic regression (pipeline[2]) with 5-fold cross-validation.
# NOTE(review): `threshold` only affects the hard `prediction` column, not
# rawPrediction, so areaUnderROC should tie across the three threshold values. The
# tie-break then decides the final threshold. Dropping it from the grid would cut
# the number of fits by 3x but change which threshold the final model uses — decide
# deliberately.
paramGrid3 = (ParamGridBuilder()
              .addGrid(cl[2].regParam, [0.01, 0.5, 2.0])
              .addGrid(cl[2].threshold, [0.30, 0.35, 0.5])
              .addGrid(cl[2].maxIter, [1, 5])
              .addGrid(cl[2].elasticNetParam, [0.0, 0.5, 1])
              .build())
# Evaluate on `idxLabel`, the label the classifier is trained on.
cv3 = CrossValidator(estimator=pipeline[2],
                     evaluator=BinaryClassificationEvaluator(labelCol="idxLabel"),
                     estimatorParamMaps=paramGrid3, numFolds=5)
# Full-dataset alternative; it must use this model's own grid (paramGrid3).
#cv3 = TrainValidationSplit(estimator=pipeline[2], evaluator=BinaryClassificationEvaluator(labelCol="idxLabel"), estimatorParamMaps=paramGrid3, trainRatio=0.8)
model.insert(2, cv3.fit(train))
print("Model 3 completed")

In [19]:
# Score the hold-out set with each of the three tuned models and show 30 rows of
# features, prediction, and probability next to the original label.
# NOTE(review): `prediction` is expressed in the StringIndexer's `idxLabel` space,
# while `trueLabel` is the raw 0/1 flag; the two line up only if "0" was indexed
# as 0.0 — confirm.
prediction = []
predicted = []
for idx in range(3):
    scored = model[idx].transform(test)
    prediction.append(scored)
    view = scored.select("features", "prediction", "probability", "trueLabel")
    predicted.append(view)
    view.show(30)