In [2]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('missing').getOrCreate()

In [3]:
# Importing data which has a header. Schema is automatically configured.
df = spark.read.csv('rebalanced_dataset.csv', header=True, inferSchema=True)
df_uc = spark.read.csv('rebalanced_dataset_dirty.csv', header=True, inferSchema=True)
df_h1 = spark.read.csv('rebalanced_dataset_half1.csv', header=True, inferSchema=True)
df_h2 = spark.read.csv('rebalanced_dataset_half2.csv', header=True, inferSchema=True)

In [4]:
# Show number of columns and rows
print((df.count(), len(df.columns)))

(984, 32)


In [5]:
# Show the columns contained in the dataset
print(df.columns)

['_c0', 'Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount', 'Class']


In [6]:
# Show the data type of each column
df.dtypes

[('_c0', 'int'),
 ('Time', 'int'),
 ('V1', 'double'),
 ('V2', 'double'),
 ('V3', 'double'),
 ('V4', 'double'),
 ('V5', 'double'),
 ('V6', 'double'),
 ('V7', 'double'),
 ('V8', 'double'),
 ('V9', 'double'),
 ('V10', 'double'),
 ('V11', 'double'),
 ('V12', 'double'),
 ('V13', 'double'),
 ('V14', 'double'),
 ('V15', 'double'),
 ('V16', 'double'),
 ('V17', 'double'),
 ('V18', 'double'),
 ('V19', 'double'),
 ('V20', 'double'),
 ('V21', 'double'),
 ('V22', 'double'),
 ('V23', 'double'),
 ('V24', 'double'),
 ('V25', 'double'),
 ('V26', 'double'),
 ('V27', 'double'),
 ('V28', 'double'),
 ('Amount', 'double'),
 ('Class', 'int')]

In [7]:
# Get summary statistics
df.select('Time','V1', 'V2', 'V3', 'V4').describe().show()
df.select('V5','V6', 'V7', 'V8', 'V9').describe().show()
df.select('V10','V11', 'V12', 'V13', 'V14').describe().show()
df.select('V15','V16', 'V17', 'V18', 'V19').describe().show()
df.select('V20','V21', 'V22', 'V23', 'V24').describe().show()
df.select('V25','V26', 'V27', 'V28', 'Amount').describe().show()
df.select('Class').describe().show()

+-------+-----------------+-------------------+-----------------+-------------------+------------------+
|summary|             Time|                 V1|               V2|                 V3|                V4|
+-------+-----------------+-------------------+-----------------+-------------------+------------------+
|  count|              984|                984|              984|                984|               984|
|   mean| 88299.5325203252|-2.3261755370132096|1.811482649214429|-3.5201620340619932|2.3300934150193044|
| stddev|48878.96431277863|  5.529796067601339|3.697605750483812|  6.213435530803077|3.1748910196942064|
|    min|              187|       -30.55238004|     -16.46099886|       -31.10368482|      -3.503648015|
|    max|           172069|        2.323150188|      22.05772899|        3.275071757|       12.11467184|
+-------+-----------------+-------------------+-----------------+-------------------+------------------+

+-------+-------------------+-------------------+-----

In [8]:
from pyspark.sql.functions import isnan, when, count, col

a = ['Time','V1', 'V2', 'V3', 'V4', 'V5','V6', 'V7', 'V8', 'V9', 'V10','V11', 'V12', 'V13', 'V14', 'V15','V16', 'V17', 'V18', 'V19']
b = ['V20','V21', 'V22', 'V23', 'V24', 'V25','V26', 'V27', 'V28', 'Amount', 'Class']

df_uc.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in a]).show()
df_uc.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in b]).show()

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|Time| V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|   1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

+---+---+---+---+---+---+---+---+---+------+-----+
|V20|V21|V22|V23|V24|V25|V26|V27|V28|Amount|Class|
+---+---+---+---+---+---+---+---+---+------+-----+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|     0|    0|
+---+---+---+---+---+---+---+---+---+------+-----+



In [9]:
df_uc.select("Class").distinct().show()

+-----+
|Class|
+-----+
|    0|
| Zero|
|    1|
+-----+



In [10]:
from pyspark.sql.functions import *

clean_df = df_uc.withColumn('Class', regexp_replace('Class', 'Zero', '0'))
clean_df.select("Class").distinct().show()

+-----+
|Class|
+-----+
|    0|
|    1|
+-----+



In [11]:
df_stand = df.select('Time','Amount')
df_stand.show()

+------+------+
|  Time|Amount|
+------+------+
|129707| 76.87|
|146022|  1.18|
| 58762|  11.5|
| 40276|   1.0|
|143438|   8.9|
| 63470|100.92|
|134766|   1.0|
| 93856|209.65|
| 68899| 198.0|
| 67250| 10.96|
|  8886|   1.0|
|171483|  3.69|
| 76394| 46.86|
| 80524|  5.49|
| 55279|   1.0|
| 25198| 99.99|
| 91524|   1.0|
|120591| 45.58|
|161929|  2.28|
|129186|  7.18|
+------+------+
only showing top 20 rows



## Normalisation/standardisation

In [12]:
from pyspark.sql.functions import col, mean, stddev
from pyspark.sql import Window

input_col = "Time"
output_col = "Time_scaled"

w = Window.partitionBy()

mu = mean(input_col).over(w)
sigma = stddev(input_col).over(w)

df_sc = df.withColumn(output_col, (col(input_col) - mu)/(sigma))

In [13]:
df_sc.select("Time_scaled").show()

+--------------------+
|         Time_scaled|
+--------------------+
|  0.8471428980104121|
|  1.1809265660848745|
| -0.6042994759732067|
| -0.9824989787635539|
|  1.1280612888366728|
| -0.5079799228445172|
|  0.9506434543566398|
| 0.11367809358886415|
| -0.3969096480068674|
|-0.43064604204025925|
|  -1.624697528616902|
|  1.7018254917878404|
|-0.24357170180900675|
|-0.15907727648583606|
| -0.6755571232857015|
|  -1.290975236638316|
| 0.06596840839427963|
|  0.6606414013406726|
|  1.5063630851201473|
|  0.8364839160265448|
+--------------------+
only showing top 20 rows



## Creation of a new variable 

In [14]:
from pyspark.sql import functions as f
df_withcol = df.withColumn('LargeTransaction', f.when(f.col('Amount') > 150, "Yes").otherwise("No"))
df_withcol.select("LargeTransaction", "Amount").show()


+----------------+------+
|LargeTransaction|Amount|
+----------------+------+
|              No| 76.87|
|              No|  1.18|
|              No|  11.5|
|              No|   1.0|
|              No|   8.9|
|              No|100.92|
|              No|   1.0|
|             Yes|209.65|
|             Yes| 198.0|
|              No| 10.96|
|              No|   1.0|
|              No|  3.69|
|              No| 46.86|
|              No|  5.49|
|              No|   1.0|
|              No| 99.99|
|              No|   1.0|
|              No| 45.58|
|              No|  2.28|
|              No|  7.18|
+----------------+------+
only showing top 20 rows



## Dealing with missing values

In [15]:
#### from pyspark.sql.functions import mean

# Filling missing values with mean
mean_Time = df.select(mean(df['Time'])).collect()
mean_V1 = df.select(mean(df['V1'])).collect()

# Create variables to fill with
mean_Time_val = mean_Time[0][0]
mean_V1_val = mean_V1[0][0]

# Print variables
print(mean_Time_val)
print(mean_V1_val)

88299.5325203252
-2.3261755370132096


In [16]:
# Fill null values
df_uc.na.fill(mean_Time_val, subset=['Time']).select('Time').show()
df_uc.na.fill(mean_V1_val, subset=['V1']).select('V1').show()

+------+
|  Time|
+------+
| 88299|
|146022|
| 58762|
| 40276|
|143438|
| 63470|
|134766|
| 93856|
| 68899|
| 67250|
|  8886|
|171483|
| 76394|
| 80524|
| 55279|
| 25198|
| 91524|
|120591|
|161929|
|129186|
+------+
only showing top 20 rows

+------------+
|          V1|
+------------+
| -0.98432002|
| 0.908636658|
| -0.37396496|
| 1.159373145|
|-5.256433834|
|-1.564204151|
|-1.071284007|
|-6.750508516|
| 0.901017558|
| 1.018612133|
|-2.535852117|
|-10.32558182|
|-0.465170262|
|-0.721462598|
|-5.753851922|
| -15.9036352|
| 1.954851738|
| 1.841628474|
|-0.190794633|
| 0.290155158|
+------------+
only showing top 20 rows



## Combining dataframes

In [17]:
df_comb = df_h1.union(df_h2)

print("df_h1: ", (df_h1.count(), len(df_h1.columns)))
print("df_h2: ", (df_h2.count(), len(df_h2.columns)))
print("df_comb: ", (df_comb.count(), len(df_comb.columns)))

df_h1:  (571, 32)
df_h2:  (485, 32)
df_comb:  (1056, 32)


## Sorting a column

In [18]:
df.select("Amount", "Time", "V1").orderBy(df['Amount'].desc()).show()

+-------+------+------------+
| Amount|  Time|          V1|
+-------+------+------------+
|5026.26|138975|-7.418658885|
| 2693.5|126331|-5.982639392|
|2528.61| 50296|-4.295369193|
|2125.87|122608|-2.003459531|
|1809.68|  9064|-3.499107537|
|1563.24|134021| -2.87553428|
|1504.93|154278|-1.600211299|
|1402.16| 62467|-5.344665375|
|1389.56| 59011| -2.32692237|
|1354.25| 65385|-2.923826664|
| 1335.0|133184|-1.212681701|
|1218.89| 18088|-12.22402062|
|1096.99|154309|-0.082983465|
| 996.27|147501|-1.611877338|
| 966.91|125511| 0.341343499|
| 925.31|134769|-0.967767137|
| 829.41| 87883|-1.360292634|
| 824.83| 70536|-2.271754537|
|  810.0| 34681| -0.23108011|
| 802.52| 41743|-2.144411474|
+-------+------+------------+
only showing top 20 rows



## Feature selecition

In [19]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [20]:
# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
assembler = VectorAssembler(
    inputCols=['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9',
               'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18',
               'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27',
               'V28', 'Amount', 'Class'],
    outputCol="features")

In [21]:
# Now that we've created the assembler variable, let's actually transform the data.
output = assembler.transform(df)

In [22]:
# Using print schema, you see that the features output column has been added. 
output.printSchema()

# You can see that the features column is a dense vector that combines the various features as expected.
output.head(1)

root
 |-- _c0: integer (nullable = true)
 |-- Time: integer (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nul

[Row(_c0=192463, Time=129707, V1=-0.98432002, V2=0.809110442, V3=1.313431834, V4=0.378640098, V5=1.077080777, V6=0.102875633, V7=1.223544978, V8=-0.064319527, V9=-1.811779644, V10=-0.095408155, V11=-1.177043196, V12=-0.203626696, V13=1.139408969, V14=-0.361021109, V15=-1.274237257, V16=0.967896107, V17=-0.769116363, V18=-0.762870643, V19=-1.50974795, V20=0.209450417, V21=0.258492675, V22=0.522263091, V23=-0.347560932, V24=0.762405289, V25=0.701891989, V26=2.495761332, V27=-0.220928833, V28=0.024871946, Amount=76.87, Class=0, features=DenseVector([129707.0, -0.9843, 0.8091, 1.3134, 0.3786, 1.0771, 0.1029, 1.2235, -0.0643, -1.8118, -0.0954, -1.177, -0.2036, 1.1394, -0.361, -1.2742, 0.9679, -0.7691, -0.7629, -1.5097, 0.2095, 0.2585, 0.5223, -0.3476, 0.7624, 0.7019, 2.4958, -0.2209, 0.0249, 76.87, 0.0]))]

In [23]:
# Let's select two columns (the feature and predictor).
# This is now in the appropriate format to be processed by Spark.
final_data = output.select("features",'Class')
final_data.show()

+--------------------+-----+
|            features|Class|
+--------------------+-----+
|[129707.0,-0.9843...|    0|
|[146022.0,0.90863...|    1|
|[58762.0,-0.37396...|    0|
|[40276.0,1.159373...|    1|
|[143438.0,-5.2564...|    1|
|[63470.0,-1.56420...|    0|
|[134766.0,-1.0712...|    0|
|[93856.0,-6.75050...|    1|
|[68899.0,0.901017...|    0|
|[67250.0,1.018612...|    0|
|[8886.0,-2.535852...|    1|
|[171483.0,-10.325...|    0|
|[76394.0,-0.46517...|    0|
|[80524.0,-0.72146...|    0|
|[55279.0,-5.75385...|    1|
|[25198.0,-15.9036...|    1|
|[91524.0,1.954851...|    1|
|[120591.0,1.84162...|    0|
|[161929.0,-0.1907...|    0|
|[129186.0,0.29015...|    1|
+--------------------+-----+
only showing top 20 rows



In [24]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors


selector = ChiSqSelector(numTopFeatures=10, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="Class")

result = selector.fit(final_data).transform(final_data)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()

ChiSqSelector output with top 10 features selected
+--------------------+-----+--------------------+
|            features|Class|    selectedFeatures|
+--------------------+-----+--------------------+
|[129707.0,-0.9843...|    0|[129707.0,-0.9843...|
|[146022.0,0.90863...|    1|[146022.0,0.90863...|
|[58762.0,-0.37396...|    0|[58762.0,-0.37396...|
|[40276.0,1.159373...|    1|[40276.0,1.159373...|
|[143438.0,-5.2564...|    1|[143438.0,-5.2564...|
|[63470.0,-1.56420...|    0|[63470.0,-1.56420...|
|[134766.0,-1.0712...|    0|[134766.0,-1.0712...|
|[93856.0,-6.75050...|    1|[93856.0,-6.75050...|
|[68899.0,0.901017...|    0|[68899.0,0.901017...|
|[67250.0,1.018612...|    0|[67250.0,1.018612...|
|[8886.0,-2.535852...|    1|[8886.0,-2.535852...|
|[171483.0,-10.325...|    0|[171483.0,-10.325...|
|[76394.0,-0.46517...|    0|[76394.0,-0.46517...|
|[80524.0,-0.72146...|    0|[80524.0,-0.72146...|
|[55279.0,-5.75385...|    1|[55279.0,-5.75385...|
|[25198.0,-15.9036...|    1|[25198.0,-15.9036...|

## Dealing with imbalances - undersampling

In [25]:
from pyspark.sql.functions import col, explode, array, lit

major_df = df.filter(col("Class") == 0)
minor_df = df.filter(col("Class") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

# We can see that the ratio is 1 i.e. not imbalanced, since the dataset has already been rebalanced

ratio: 1


In [26]:
sampled_majority_df = major_df.sample(False, 1/ratio)
combined_df_2 = sampled_majority_df.unionAll(minor_df)
combined_df_2.show()

+------+------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------+-----+
|   _c0|  Time|          V1|          V2|          V3|          V4|          V5|          V6|          V7|          V8|          V9|         V10|         V11|         V12|         V13|         V14|         V15|         V16|         V17|         V18|         V19|         V20|         V21|         V22|         V23|         V24|         V25|         V26|         V27|         V28|Amount|Class|
+------+------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+----

## Statistical transformation

In [27]:
from pyspark.sql.functions import col
from pyspark.sql.functions import log

df_demonstration = df.select("Amount")
df_log = df_demonstration.withColumn("logvalue", log10(col("Amount")))
df_log.show()

+------+-------------------+
|Amount|           logvalue|
+------+-------------------+
| 76.87| 1.8857568810692675|
|  1.18|0.07188200730612536|
|  11.5| 1.0606978403536116|
|   1.0|                0.0|
|   8.9| 0.9493900066449128|
|100.92|  2.003977241845537|
|   1.0|                0.0|
|209.65|  2.321494866739587|
| 198.0|  2.296665190261531|
| 10.96| 1.0398105541483504|
|   1.0|                0.0|
|  3.69| 0.5670263661590603|
| 46.86|  1.670802284260944|
|  5.49| 0.7395723444500919|
|   1.0|                0.0|
| 99.99| 1.9999565683801925|
|   1.0|                0.0|
| 45.58| 1.6587743208443568|
|  2.28|0.35793484700045375|
|  7.18| 0.8561244442423003|
+------+-------------------+
only showing top 20 rows



## Regression

In [28]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [29]:
df = df.withColumnRenamed("Class","label")

# Print data schema.
df.printSchema()

# Print data columns.
df.columns

root
 |-- _c0: integer (nullable = true)
 |-- Time: integer (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nul

['_c0',
 'Time',
 'V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'Amount',
 'label']

In [30]:
my_cols = df.select(['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9',
               'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18',
               'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27',
               'V28', 'Amount', 'label'])

In [31]:
# Remove NA values
my_final_data = my_cols.na.drop()

In [32]:
df.columns

['_c0',
 'Time',
 'V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'Amount',
 'label']

In [33]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [34]:
# Now we can assemble all of this as one vector in the features column. 
assembler = VectorAssembler(inputCols=['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9',
               'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18',
               'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27',
               'V28', 'Amount'],outputCol='features')

In [35]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Train/test split (80/20 split chosen)
data_train, data_test = my_final_data.randomSplit([0.8,0.2])

In [36]:
log_reg = LogisticRegression(featuresCol='features',labelCol='label')

In [37]:
# Lists everything we want to do. Index data, encode data, assemble data and then pass in the actual model.
pipeline = Pipeline(stages=[assembler,log_reg])

In [38]:
# Hyper parameter tuning
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(log_reg.aggregationDepth,[2,5,10])\
    .addGrid(log_reg.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(log_reg.fitIntercept,[False, True])\
    .addGrid(log_reg.maxIter,[10, 100, 1000])\
    .addGrid(log_reg.regParam,[0.01, 0.5, 2.0]) \
    .build()

In [39]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2,
                         ) 

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(data_train)

In [40]:
# Note pipeline. Call it as you would call a machine learning object.
prediction = cvModel.transform(data_train)

In [41]:
# Evaluate the model using the binary classifer.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='label')

In [42]:
prediction.select('label','prediction').show()

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 20 rows



In [43]:
AUC = my_eval.evaluate(prediction)

AUC

0.9434122253535178

In [44]:
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval1 = BinaryClassificationEvaluator(labelCol = 'label')

In [45]:
sumModel = cvModel.bestModel.stages[-1]

In [46]:
print("Coefficients: " + str(sumModel.coefficients) + "\n")
print("Intercept: " + str(sumModel.intercept))

Coefficients: [0.0,-0.01575735836320621,0.0,-0.02862765968368443,0.4464185027414538,0.0,-0.255975237693972,0.0,-0.0394912488614178,0.0,-0.13056737959609294,0.198612924846988,-0.2350708179242306,-0.1297143145854754,-0.37847780773504003,-0.03474611021826392,-0.07194334002085218,0.0,0.0,0.0,-0.007199255421488341,0.030275518624218952,0.0032234454456543838,-0.05845346873321611,0.0,0.0,-0.06370411378510321,0.0,0.6187912448310326,0.0011296313794410884]

Intercept: -2.395357795449395


In [47]:
print ('Best Param (regParam): ', sumModel._java_obj.getRegParam())

print('Best Param (MaxIter): ', sumModel._java_obj.getMaxIter())

print('Best Param (elasticNetParam): ', sumModel._java_obj.getElasticNetParam())

Best Param (regParam):  0.01
Best Param (MaxIter):  100
Best Param (elasticNetParam):  0.5


## Trialing other ML algorithms

In [48]:
assembler1 = VectorAssembler(inputCols=['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9',
               'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18',
               'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27',
               'V28', 'Amount'],outputCol='features')

In [49]:
output = assembler1.transform(df)
df.columns

['_c0',
 'Time',
 'V1',
 'V2',
 'V3',
 'V4',
 'V5',
 'V6',
 'V7',
 'V8',
 'V9',
 'V10',
 'V11',
 'V12',
 'V13',
 'V14',
 'V15',
 'V16',
 'V17',
 'V18',
 'V19',
 'V20',
 'V21',
 'V22',
 'V23',
 'V24',
 'V25',
 'V26',
 'V27',
 'V28',
 'Amount',
 'label']

In [50]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
output_fixed = indexer.fit(output).transform(output)

In [51]:
final_data = output_fixed.select("features",'labelIndex')

In [52]:
train_data,test_data = final_data.randomSplit([0.8,0.2])

In [53]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [59]:
# Create models
dt = DecisionTreeClassifier(labelCol='labelIndex',featuresCol='features')
rf = RandomForestClassifier(labelCol='labelIndex',featuresCol='features')
gb = GBTClassifier(labelCol='labelIndex',featuresCol='features')

In [60]:
# Train models
dt_model = dt.fit(train_data)
rf_model = rf.fit(train_data)
gb_model = gb.fit(train_data)

In [61]:
# Test models
dt_predictions = dt_model.transform(test_data)
rf_predictions = rf_model.transform(test_data)
gb_predictions = gb_model.transform(test_data)

In [62]:
# Evaluate models
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'labelIndex')

In [64]:
print("Decision tree AUC ROC")
print(my_binary_eval.evaluate(dt_predictions))

print("Random forest AUC ROC")
print(my_binary_eval.evaluate(rf_predictions))

my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='labelIndex', rawPredictionCol='prediction')
print("Gradient boosted AUC ROC")
print(my_binary_gbt_eval.evaluate(gb_predictions))

Decision tree AUC ROC
0.9297531512605042
Random forest AUC ROC
0.9842436974789917
Gradient boosted AUC ROC
0.9455532212885153


In [65]:
# Let's import the evaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [66]:
# Select (prediction, true label) and compute test error. 
acc_evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")

In [68]:
dt_acc = acc_evaluator.evaluate(dt_predictions)
rf_acc = acc_evaluator.evaluate(rf_predictions)
gb_acc = acc_evaluator.evaluate(gb_predictions)

In [69]:
print('A decision tree model has an accuracy of: {0:2.2f}%'.format(dt_acc*100))
print('-'*40)
print('A random forest model has an accuracy of: {0:2.2f}%'.format(rf_acc*100))
print('-'*40)
print('An GBT model has an accuracy of: {0:2.2f}%'.format(gb_acc*100))

A decision tree model has an accuracy of: 93.46%
----------------------------------------
A random forest model has an accuracy of: 94.86%
----------------------------------------
An GBT model has an accuracy of: 94.39%


In [73]:
feature_importances_dt = dt_model.featureImportances
print(type(feature_importances_dt))

<class 'pyspark.ml.linalg.SparseVector'>


In [88]:
from  pyspark.mllib.linalg import SparseVector
import pandas as pd

col_n = ['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9',
               'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18',
               'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27',
               'V28', 'Amount']

df_dt = pd.DataFrame(feature_importances_dt.toArray())
df_dt["Features"] = col_n
df_dt

Unnamed: 0,0,Features
0,0.004275,Time
1,0.0,V1
2,0.0,V2
3,0.0,V3
4,0.041457,V4
5,0.0,V5
6,0.019692,V6
7,0.006042,V7
8,0.020218,V8
9,0.009772,V9


In [89]:
feature_importances_rf = rf_model.featureImportances
df_rf = pd.DataFrame(feature_importances_rf.toArray())
df_rf["Features"] = col_n
df_rf

Unnamed: 0,0,Features
0,0.001302,Time
1,0.003013,V1
2,0.017686,V2
3,0.047178,V3
4,0.138585,V4
5,0.021478,V5
6,0.010456,V6
7,0.068081,V7
8,0.00362,V8
9,0.002965,V9


In [97]:
feature_importances_gb = rf_model.featureImportances
df_gb = pd.DataFrame(feature_importances_gb.toArray())
df_gb["Features"] = col_n
df_gb

Unnamed: 0,0,Features
0,0.001302,Time
1,0.003013,V1
2,0.017686,V2
3,0.047178,V3
4,0.138585,V4
5,0.021478,V5
6,0.010456,V6
7,0.068081,V7
8,0.00362,V8
9,0.002965,V9
