In [1]:
import pyspark
from pyspark.sql.types import DateType
from pyspark.sql import SQLContext, SparkSession, Window
from pyspark.sql.functions import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import abs, sqrt
import pyspark.sql.functions as func

# Reuse the running SparkContext (or create one) and wrap it in the
# SQLContext that the notebook uses for reading CSV files.
sc = pyspark.SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext=sc)


In [3]:
# Load the raw transactions; without a schema every column is read as a string.
df = sqlContext.read.csv('original.csv', header = True)

#cast to double
# Under Spark's default (non-ANSI) settings a value that cannot be parsed as a
# number becomes null after the cast -- not NaN.
df = df.select(*(col(c).cast("double").alias(c) for c in df.columns))

#check for null values
# isnan() on its own only flags NaN, so missing or unparseable values (null)
# would go unnoticed. Count a row when the value is either NaN or null.
# The counted value is the literal 1 so that null cells are not skipped by count().
na = df.select([
    count(when(isnan(col(c)) | col(c).isNull(), 1)).alias(c)
    for c in df.columns
])
na.show()

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|Time| V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|Amount|Class|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|     0|    0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+



In [4]:
# Number of rows labelled as fraud (Class == 1)

df.filter(col("Class") == 1.0).select("Class").count()

492

In [5]:
# Number of rows labelled as non-fraud (Class == 0)

df.filter(col("Class") == 0.0).select("Class").count()

284315

In [6]:
# Total number of rows in the raw data (duplicates have not been removed yet)

df.count()

284807

In [7]:
# Number of rows left once exact duplicates (all columns equal) are collapsed

df.dropDuplicates().count()

283726

In [8]:
# Number of distinct fraud rows (Class == 1)

df.filter(col("Class") == 1.0).distinct().count()

473

In [9]:
# Number of distinct non-fraud rows (Class == 0)

df.filter(col("Class") == 0.0).distinct().count()

283253

In [10]:
# Keep one copy of every fully identical row, then show the remaining row count
df = df.distinct()
df.count()

283726

## Split the data into 80% modeling and 20% out-of-time testing and save as modeling.csv and oot.csv

In [12]:
# Chronological split: number every row in ascending Time order, then put the
# earliest 80% of rows in the modeling set and the remaining 20% in the
# out-of-time (OOT) set.
#
# The boundary is derived from the actual row count instead of a hard-coded
# total, so the split stays correct if the de-duplicated data ever changes.
#
# A window with no partitionBy gathers all rows into a single partition, which
# is exactly what partitioning by a constant helper column did before.
# NOTE(review): rows that share the same Time have no tie-breaker, so their
# relative index is arbitrary -- confirm this is acceptable near the boundary.
MODELING_FRACTION = 0.80
total_rows = df.count()
last_modeling_index = int(total_rows * MODELING_FRACTION)

df = df.withColumn("index", func.row_number().over(Window.orderBy(asc("Time"))))
modeling_df = df.where(col("index").between(1, last_modeling_index))
oot_df = df.where(col("index").between(last_modeling_index + 1, total_rows))

In [3]:
# Shuffle the modeling rows: sampling without replacement with a sample size
# equal to the row count returns every row exactly once, in random order.
# Note that takeSample collects the whole modeling set onto the driver.
#
# A fixed seed makes the shuffle (and every split derived from it) reproducible.
# The original schema is passed back explicitly so column names and types are
# not re-inferred from the sampled rows (inference fails on an empty list).
SHUFFLE_SEED = 7
modeling_schema = modeling_df.schema
nrows = modeling_df.count()
shuffled = modeling_df.rdd.takeSample(False, nrows, seed=SHUFFLE_SEED)
modeling_df = sqlContext.createDataFrame(shuffled, schema=modeling_schema)

In [15]:
# Persist the shuffled modeling set and the out-of-time set, each coalesced to
# a single partition (disabled; uncomment to regenerate the files).
# NOTE(review): write.csv("modeling") creates a directory named "modeling",
# while the next cell reads 'modeling.csv' -- presumably the part file was
# renamed by hand; confirm before re-running.
#modeling_df.coalesce(1).write.csv("modeling", header = True)
#oot_df.coalesce(1).write.csv("oot", header = True)

## Split the modeling data into 80% base_training and 20% base_testing and save as base_train.csv and base_test.csv

In [None]:
# Reload the saved modeling set and cast every column to double
modeling_df = sqlContext.read.csv('modeling.csv', header = True)
modeling_df = modeling_df.select(*(col(c).cast("double").alias(c) for c in modeling_df.columns))


#split by row range
# Number the rows in the order Spark reads them from the file. Ordering the
# window by a constant column leaves row_number() free to hand out indices in
# any order, and because base_train and base_test are evaluated separately
# they would not be guaranteed to form a clean 80/20 partition. A
# monotonically increasing id gives the window a real ordering key.
modeling_df = modeling_df.withColumn('_read_order', func.monotonically_increasing_id())
modeling_df = modeling_df.withColumn("index", func.row_number().over(Window.orderBy('_read_order')))
modeling_df = modeling_df.drop('_read_order')

# First 80% of the numbered rows -> base_train, remaining 20% -> base_test
nrow = modeling_df.count()
last_train_index = int(nrow*0.8)
base_train = modeling_df.where(col('index').between(1, last_train_index))
base_test = modeling_df.where(col('index').between(last_train_index + 1, nrow))

# Sanity check: number of distinct rows that ended up in base_train
print(base_train.distinct().count())
#base_train.coalesce(1).write.csv("base_train", header = True)
#base_test.coalesce(1).write.csv("base_test", header = True)

## Normalization

In [2]:
sqlContext.clearCache()


def _read_csv_as_double(path):
    """Read a CSV file with a header row and cast every column to double."""
    frame = sqlContext.read.csv(path, header = True)
    return frame.select(*(col(c).cast("double").alias(c) for c in frame.columns))


# Load the raw (un-normalized) splits that are normalized below.
# Each frame is cast from its OWN data: previously base_train and base_test
# were both built with oot.select(...), which made them copies of the
# out-of-time set.
# NOTE(review): the normalization cell also uses a `modeling` DataFrame that
# is not loaded here -- confirm where it is expected to come from.
oot = _read_csv_as_double('oot_raw.csv')
base_train = _read_csv_as_double('base_train_raw.csv')
base_test = _read_csv_as_double('base_test_raw.csv')


In [33]:
# min-max normalization
#
# Every feature is rescaled with the minimum and maximum observed in
# `modeling`, and the same bounds are applied to oot, base_train and
# base_test so that all splits share one scale.
# NOTE(review): `modeling` is not defined in any cell visible above -- it must
# be loaded before this cell runs; confirm its source.

# Same columns, same order as before: Amount followed by V1..V28
feature_cols = ['Amount'] + ['V%d' % k for k in range(1, 29)]

# Collect all minima and maxima in a single aggregation instead of launching
# two Spark jobs per column. func.min / func.max are used explicitly because
# the wildcard import of pyspark.sql.functions shadows the Python built-ins.
stat_exprs = (
    [func.min(c).alias('min_' + c) for c in feature_cols]
    + [func.max(c).alias('max_' + c) for c in feature_cols]
)
bounds = modeling.agg(*stat_exprs).first()


def _min_max_scale(frame, name, low, high):
    """Return `frame` with column `name` rescaled to (value - low) / (high - low).

    When `high == low` the column is constant in the reference data and the
    denominator would be zero (Spark then yields null, or fails under ANSI
    mode). In that case the column is only shifted by `low`.
    """
    span = high - low
    if span == 0:
        return frame.withColumn(name, frame[name] - low)
    return frame.withColumn(name, (frame[name] - low) / span)


for name in feature_cols:
    low = bounds['min_' + name]
    high = bounds['max_' + name]

    modeling = _min_max_scale(modeling, name, low, high)
    oot = _min_max_scale(oot, name, low, high)
    base_train = _min_max_scale(base_train, name, low, high)
    base_test = _min_max_scale(base_test, name, low, high)

In [9]:
# Persist the normalized splits, each coalesced to a single partition
# (disabled; uncomment to regenerate the files).
# NOTE(review): these target names match the un-normalized exports written
# earlier in the notebook ("modeling", "oot", "base_train", "base_test") --
# confirm the earlier outputs were moved or renamed before enabling this.
#modeling.coalesce(1).write.csv("modeling", header = True)
#oot.coalesce(1).write.csv("oot", header = True)
#base_train.coalesce(1).write.csv("base_train", header = True)
#base_test.coalesce(1).write.csv("base_test", header = True)