In [1]:
from pyspark.sql import SparkSession 
# Start (or reuse) a Spark session running in local mode for this notebook.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting
# rather than a real Spark option -- confirm whether it is still needed.
spark = (
    SparkSession.builder
    .master("local")
    .appName("ML LIb")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Input path and the Spark reader format used to parse it.
file_location = "dataset.csv"
file_type = "csv"

# Reader settings. Schema inference is switched off, so every column is loaded
# as a string; the numeric columns are cast explicitly in a later cell.
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# These options are CSV-specific; for other file types they are ignored.
df = (
    spark.read
    .format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

# Show the loaded DataFrame (column names and types) as the cell output.
display(df)

DataFrame[step: string, type: string, amount: string, nameOrig: string, oldbalanceOrg: string, newbalanceOrig: string, nameDest: string, oldbalanceDest: string, newbalanceDest: string, isFraud: string, isFlaggedFraud: string]

In [2]:
# Print the DataFrame schema as a tree (column name, type, nullability) to
# check how each column was read from the CSV.
df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)



In [4]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.types import FloatType,IntegerType

# --- Categorical feature ---------------------------------------------------
# 'type' is the only categorical input: map it to a numeric index, then
# one-hot encode that index into a vector column.
categoricalCol = 'type'
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'out')
encoder = OneHotEncoderEstimator(
    inputCols=[stringIndexer.getOutputCol()],
    outputCols=[categoricalCol + "Vec"],
)

# --- Numeric features and label --------------------------------------------
# Every column was read as a string, so cast the numeric inputs before they are
# assembled. withColumn replaces each column in place, which keeps this cell
# safe to re-run.
# NOTE(review): FloatType is single precision; consider DoubleType if exact
# amounts/balances matter -- confirm against the data's value range.
numericCols = ['amount','oldbalanceOrg','newbalanceOrig','oldbalanceDest','newbalanceDest','isFlaggedFraud']
for numeric_name in numericCols:
    df = df.withColumn(numeric_name, df[numeric_name].cast(FloatType()))

# The classifier target: the isFraud column as an integer column named "label".
df = df.withColumn("label", df['isFraud'].cast(IntegerType()))

# --- Feature vector --------------------------------------------------------
# Concatenate the one-hot vector and the numeric columns into "features".
assemblerInputs = encoder.getOutputCols() + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Stage order matters: index -> encode -> assemble.
stages = [stringIndexer, encoder, assembler]

In [5]:
from pyspark.ml import Pipeline

# Chain the feature stages into one pipeline, fit it on the data, and apply it
# so the indexed, encoded and assembled columns are added to the DataFrame.
# NOTE(review): this rebinds `df` to the transformed frame, so re-running this
# cell without reloading the data will likely fail because the stage output
# columns already exist.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)

In [6]:
# Randomly split the rows roughly 80/20 into training and test sets; the seed
# is fixed so the split can be repeated.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=12345)

# Mark both splits for caching since they are reused by the cells below.
train.cache()
test.cache()

DataFrame[step: string, type: string, amount: float, nameOrig: string, oldbalanceOrg: float, newbalanceOrig: float, nameDest: string, oldbalanceDest: float, newbalanceDest: float, isFraud: string, isFlaggedFraud: float, label: int, typeout: double, typeVec: vector, features: vector]

In [7]:
from pyspark.ml.classification import GBTClassifier
# Train a gradient-boosted tree classifier on the training split, then score
# the held-out test split.
# The label/feature column names are spelled out (they match the estimator's
# defaults, so the inputs are unchanged) and the seed is pinned so training no
# longer depends on the library's default seed between runs.
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10, seed=12345)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)

In [8]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the test-set predictions with area under the ROC curve.
# NOTE(review): if the fraud label is heavily imbalanced, area under the PR
# curve may be a more informative companion metric -- confirm class balance.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("Test Area Under ROC: " + str(test_auc))

Test Area Under ROC: 0.9941566625530367


In [9]:
# Persist the trained model to disk. Going through write().overwrite() replaces
# a model left behind by a previous run; a bare save() raises when the target
# path already exists, which breaks re-running the notebook from the top.
gbtModel.write().overwrite().save("gbtModel.model")