# Import packages, mount drive, spark session:

In [1]:
import numpy as np
import pandas as pd
from sklearn.metrics import roc_curve, roc_auc_score, classification_report, accuracy_score, confusion_matrix 
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from datetime import datetime, timedelta
import re
from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve, f1_score
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.decomposition import PCA
from google.colab import drive
import os
drive.mount('/content/drive')
!pip install pandasql
import pandasql as ps
!pip install pyspark -q
import pyspark
from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,isnan,when,count

Mounted at /content/drive
Collecting pandasql
  Downloading pandasql-0.7.3.tar.gz (26 kB)
Building wheels for collected packages: pandasql
  Building wheel for pandasql (setup.py) ... [?25l[?25hdone
  Created wheel for pandasql: filename=pandasql-0.7.3-py3-none-any.whl size=26784 sha256=fad31319d98c8246178d29cb8661a3f3b32f1cc288da16fb8e78256b9562c9e5
  Stored in directory: /root/.cache/pip/wheels/5c/4b/ec/41f4e116c8053c3654e2c2a47c62b4fca34cc67ef7b55deb7f
Successfully built pandasql
Installing collected packages: pandasql
Successfully installed pandasql-0.7.3
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[K     |████████████████████████████████| 198 kB 60.9 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Import data, and turn data into train and test set:

Since there are 50 files for training set, and 50 more for test set, i loop through all the files and append them into dataframes 'train' and 'test'. The data is large, so I used spark dataframe and google cloud platform to run this model. The variable to be predicted is saved in seperate files. Here we import them as 'train_target', 'test_target', and 'sample'.

In [3]:
emp_RDD = spark.sparkContext.emptyRDD()
columns1 = StructType([StructField('app_id', IntegerType(), False),
                       StructField('amnt', IntegerType(), False),
                       StructField('currency', IntegerType(), False),
                       StructField('operation_kind', IntegerType(), False),
                       StructField('card_type', IntegerType(), False),
                       StructField('operation_type', IntegerType(), False),
                       StructField('operation_type_group', IntegerType(), False),
                       StructField('ecommerce_flag', IntegerType(), False),
                       StructField('payment_system', IntegerType(), False),
                       StructField('income_flag', IntegerType(), False),
                       StructField('mcc', IntegerType(), False),
                       StructField('country', IntegerType(), False),
                       StructField('city', IntegerType(), False),
                       StructField('mcc_category', IntegerType(), False),
                       StructField('day_of_week', IntegerType(), False),
                       StructField('hour', IntegerType(), False),
                       StructField('days_before', IntegerType(), False),
                       StructField('weekofyear', IntegerType(), False),
                       StructField('hour_diff', IntegerType(), False),
                       StructField('transaction_number', IntegerType(), False),
                       StructField('__index_level_0__', IntegerType(), False)])
train = spark.createDataFrame(data=emp_RDD, schema=columns1)
entries = os.listdir('/content/drive/MyDrive/MF810 Project/archive/train/train_transactions_contest')
for entry in entries:
  train = train.union(spark.read.parquet(f'/content/drive/MyDrive/MF810 Project/archive/train/train_transactions_contest/{entry}'))

test = spark.createDataFrame(data=emp_RDD, schema=columns1)
entries = os.listdir('/content/drive/MyDrive/MF810 Project/archive/test/test_transactions_contest')
for entry in entries:
  test = test.union(spark.read.parquet(f'/content/drive/MyDrive/MF810 Project/archive/test/test_transactions_contest/{entry}'))

train_target = spark.read.csv('/content/drive/MyDrive/MF810 Project/archive/alfabattle2_train_target.csv')
test_target = spark.read.csv('/content/drive/MyDrive/MF810 Project/archive/alfabattle2_test_target_contest.csv')
sample = spark.read.csv('/content/drive/MyDrive/MF810 Project/archive/alfabattle2_alpha_sample.csv')

# Perform exploratory data analysis, data processing, and data manipulation:

Let's first take a look at what's within each dataframe.

In [None]:
train.show(5)

+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|app_id|               amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag|mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|
+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|     0| 0.4654254330729043|       1|             4|       98|             4|                   2|             3|             7|          3|  2|      1|  37|           2|          4|  19|        351|        34|       -1|      

There is no null or nan in all columns.

In [None]:
train.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train.columns]).show()

+------+----+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|app_id|amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag|mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|
+------+----+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|     0|   0|       0|             0|        0|             0|                   0|             0|             0|          0|  0|      0|   0|           0|          0|   0|          0|         0|        0|                 0|                0|
+------+----+--------+------

In [None]:
#train.printSchema()
train.select("operation_kind").distinct().show()
#train.describe().show()
#train.select("operation_type").distinct().show()
#train.select("operation_type_group").distinct().show()
#train.select("ecommerce_flap").distinct().show()
#train.select("payment_system").distinct().show()
#train.select("income_flap").distinct().show()
#train.select("mcc").distinct().show()
#train.select("country").distinct().show()
#train.select("city").distinct().show()
#train.select("mcc_category").distinct().show()
#train.select("hour_diff").distinct().show()

+--------------+
|operation_kind|
+--------------+
|             1|
|             6|
|             3|
|             5|
|             4|
|             7|
|             2|
+--------------+



Check for distinct variables for each column:

In [None]:
train.select("app_id").distinct().count()

963811

In [None]:
train.select("currency").distinct().count()

11

In [None]:
train.select("operation_kind").distinct().count()

7

In [None]:
train.select("card_type").distinct().count()

173

In [None]:
train.select("operation_type").distinct().count()

22

In [None]:
train.select("operation_type_group").distinct().count()

4

In [None]:
train.select("ecommerce_flap").distinct().count()

3

In [None]:
train.select("payment_system").distinct().count()

7

In [None]:
train.select("income_flap").distinct().count()

3

In [None]:
train.select("mcc").distinct().count()

108

In [None]:
train.select("country").distinct().count()

24

In [None]:
train.select("city").distinct().count()

163

In [None]:
train.select("mcc_category").distinct().count()

28

In [None]:
train.select("hour_diff").distinct().count()

6345

In [None]:
test.show(5)

+-------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+----+----------+---------+----------+------------------+
| app_id|               amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flap|payment_system|income_flap|mcc|country|city|mcc_category|day_of_week|hour|days|weekofyear|hour_diff|hour_diff1|transaction_number|
+-------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+----+----------+---------+----------+------------------+
|1063620| 0.3676867203808272|       1|             1|        5|             2|                   1|             2|             2|          1| 28|      1|   1|          16|          4|  23| 359|        46|       -1|         1|                 0|
|1063620|0.403606655

In [None]:
train_target.show(5)

+------+-------+----+
|   _c0|    _c1| _c2|
+------+-------+----+
|app_id|product|flag|
|     0|      3|   0|
|     1|      1|   0|
|     2|      1|   0|
|     3|      1|   0|
+------+-------+----+
only showing top 5 rows



In [None]:
train_target.select('_c1').distinct().show()

+-------+
|    _c1|
+-------+
|      3|
|      0|
|      1|
|      4|
|product|
|      2|
+-------+



In [None]:
test_target.show(5)

+-------+-------+
|    _c0|    _c1|
+-------+-------+
| app_id|product|
|1063620|      0|
|1063621|      0|
|1063622|      1|
|1063623|      1|
+-------+-------+
only showing top 5 rows



In [None]:
sample.show(5)

+-------+----+
|    _c0| _c1|
+-------+----+
| app_id|flag|
|1063620| 0.5|
|1063621| 0.5|
|1063622| 0.5|
|1063623| 0.5|
+-------+----+
only showing top 5 rows



Here are the variables within 'train' and 'test':
*   app_id: loan application identifier
*   amnt: normalized value of the logarithm of the transaction amount
*   currency: Transaction currency
*   operation_kind: 
*   card_type: Unique identifier of the card type (Visa Classic, Visa Gold, Visa Platinum, etc.)
*   operation_type: Identifier of the type of operation on a plastic card
*   operation_type_group: Card transaction group identifier (Debit Card (DR), Credit Card (CR), NOOP)
*   ecommerce_flap: E-commerce flag
*   payment_system: Payment system type identifier
*   income_flap: Sign of debiting / depositing funds to the card card
*   mcc: Unique identifier for the outlet type
*   country: ID of the country of the transaction
*   city: Transaction city ID
*   mcc_category: Transaction store category ID
*   day_of_week: Day of the week when the transaction was committed
*   hour: The hour the transaction was committed 
*   days_before: The number of days before taking the loan issue date
*   weekofyear: Number of the week of the year when the transaction was committed
*   hour_diff: Number of hours since the last transaction for this client
*   transaction_number: This is the #th transaction done by this client

And here are the variables within 'train_target', 'test_target', and 'flag'
*   train_target contains the unique identifier for each user 'app_id', the one product each user choosed 'product', and the binary indicator for delinquency 'flag'
*   test_target contains the unique identifier for each user 'app_id', and the one product each user choosed 'product'
*   sample contains the unique identifier for each user 'app_id', and the dummy indicator for delinquency 'flag' (currently 0.5)

'train' and 'test' are already in tidy data form. Three principles from Wickham (2014) stated that each variable forms a column, each observation forms a row, and each type of observational unit forms a table. Also, all variables are already pre-transformed into numerical form, so there's no further data transformation need to be done at the first glance.

However, when I specifically pick one user (uniquely identified with app_id)out, say app_id = 5, and look over his/her data, I found that each user in 'train' and 'test 'has one or more transactions. 

In [None]:
train.filter(train.app_id == 5).show()

+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+----+----------+---------+----------+------------------+
|app_id|               amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flap|payment_system|income_flap|mcc|country|city|mcc_category|day_of_week|hour|days|weekofyear|hour_diff|hour_diff1|transaction_number|
+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+----+----------+---------+----------+------------------+
|     5| 0.4779440811646806|       1|             3|       52|            11|                   2|             1|             2|          2|  9|      1|  30|           2|          1|  16| 352|        50|       -1|         1|                 0|
|     5| 0.4827189761640

In [None]:
train.groupby('app_id').count().show()

+------+-----+
|app_id|count|
+------+-----+
|   148|   46|
|   463|   35|
|   471|  300|
|   496|  666|
|   833|  255|
|  1088|   13|
|  1238|  249|
|  1342|  337|
|  1580|  112|
|  1591|  941|
|  1645|   25|
|  1829|   62|
|  1959|  180|
|  2122|    7|
|  2142|  706|
|  2366|  330|
|  2659|   83|
|  2866|  160|
|  3175|  276|
|  3749|  113|
+------+-----+
only showing top 20 rows



Since users have different numbers of transactions, performing exploratory data analysis based on current 'train' does not tell the story for users. Beacuse users have differnt amount of transactions, they weight differently on graphs. With the fact that each transaction are shown as a row, putting raw variables into classification models will not generate accurate prediction. Thus, each user's information should be contained in one row only.

To do that, I plan to summarize transactions records for each user and form variables to describe all transactions, in order to fit all information into 1 row for each user. Specifically, I will describe transactions by using: count, min, max, average, variance, momentum using MA and RSI. To do this, I imported pandasql to write sql select query.

I appended the new variables to consistent variables for each user, and delete transaction related variables, thus each user now only has one row.

# SparkSQL to manipulate variables:

In [4]:
# left join train_target to train with similar app_id
trainadd = train.join(train_target,train.app_id == train_target._c0,"left")
trainadd.show()

# create dataframe group by app_id, add variables, save SQL as 'new_train'
trainadd.createOrReplaceTempView("train0")
new_train = spark.sql("SELECT app_id,max(amnt),min(amnt),AVG(amnt),VARIANCE(amnt),avg(currency),avg(operation_kind),avg(card_type),avg(operation_type),avg(operation_type_group),avg(ecommerce_flag),avg(payment_system),avg(income_flag),avg(mcc),avg(mcc_category),avg(hour_diff),max(transaction_number),max(_c0),max(_c1),max(_c2) FROM train0 GROUP BY app_id ORDER BY app_id")
#new_train.show()

+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+---+---+---+
|app_id|               amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag|mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|_c0|_c1|_c2|
+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+---+---+---+
|     0| 0.4654254330729043|       1|             4|       98|             4|                   2|             3|             7|          3|  2|      1|  37|           2|          4|  19|   

In [None]:
trainadd.show()

+------+----------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+----+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+---+---+---+
|app_id|      amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag| mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|_c0|_c1|_c2|
+------+----------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+----+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+---+---+---+
|   0.0|0.46542543|     1.0|           4.0|     98.0|           4.0|                 2.0|           3.0|           7.0|        3.0| 2.0|    1.0|37.0|         2.0|        4.0|19.0|      351.0|      34.0|     -1.0|  

As all data were sorted out, we can finally perform some exploratory data analysis:

In [None]:
#now just use example
print(train.shape)
print(train.isnull().sum().sum())

#relationship with variables
print(train.iloc[:,:].describe())
#skewness & kurtosis
print(pd.DataFrame({"Skewness": train.skew(), "Kurtosis": train.kurt()}))
#If skewness is less than −1 or greater than +1, the distribution is highly skewed. 
#If skewness is between −1 and −½ or between +½ and +1, the distribution is moderately skewed. 
#If skewness is between −½ and +½, the distribution is approximately symmetric.

fig, axs = plt.subplots(4,5, figsize=(16, 6), facecolor='w', edgecolor='k')
fig.subplots_adjust(hspace = .5, wspace=.001)
axs = axs.ravel()
for i,j in zip([i for i in train.columns.to_list()[1:] if len(i) >3],range(19)):
    axs[j].hist(train[i])
    axs[j].set_title(i+': '+str(np.round(train[i].skew(),2)))

# Correlation matrix
f, ax = plt.subplots(figsize=(12, 9))
sns.heatmap(train.corr())

As we can see.......

Also, with all these variables included, we could perform a PCA to shorten the amount of variables being used. I choosed to include the components that explains at least 95% of the variance.

In [None]:
#PCA on variables
pca = PCA(0.95)
pca.fit(X_train)
X_train = pca.transform(X_train)
X_test = pca.transform(X_test)

print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)

Also, since there are 5 products and each user only has one product, I seperated 'train' into 5 sub-trains, each targeting one product.
Thus, 5 models will be performed for each classification model included. Here we only pick group 0 to demonstrate our model.

In [5]:
#subsect train into 5 DataFrame where each contains only product as 0/1/2/3/4
#train.createOrReplaceTempView("train0")
#second = spark.sql("SELECT app_id,max(amnt),min(amnt),AVG(amnt),VARIANCE(amnt),avg(currency),avg(operation_kind),avg(card_type),avg(operation_type),avg(operation_type_group),avg(ecommerce_flag),avg(payment_system),avg(income_flag),avg(mcc),avg(mcc_category),avg(hour_diff),max(transaction_number),max(_c0),max(_c1),max(_c2) FROM train0 GROUP BY app_id ORDER BY app_id")

#final_train = new_train.join(train_target,new_train.app_id == train_target._c0,"left")
train_p0 = new_train.filter(new_train._c1 == 0)
train_p1 = new_train.filter(new_train._c1 == 1)
train_p2 = new_train.filter(new_train._c1 == 2)
train_p3 = new_train.filter(new_train._c1 == 3)
train_p4 = new_train.filter(new_train._c1 == 4)
#new_train.show()

# Perform Machine Learning model fitting for group 0:

In [6]:
#use spark random forest
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [15]:
#train test split using train_p0, which only include user with product 0
a, b = train_p0.randomSplit([0.8,0.2])

#general classification head
FEATURES_COL = ['app_id','max(amnt)','min(amnt)','avg(amnt)','variance(amnt)','avg(currency)','avg(operation_kind)','avg(card_type)','avg(operation_type)','avg(operation_type_group)',
                'avg(ecommerce_flag)','avg(payment_system)','avg(income_flag)','avg(mcc)','avg(mcc_category)','avg(hour_diff)','max(transaction_number)','max(_c0)','max(_c1)','max(_c2)'
                ]
for col in a.columns:
  if col in FEATURES_COL:
    a = a.withColumn(col,a[col].cast('float'))
#a.show()

for col in b.columns:
  if col in FEATURES_COL:
    b = b.withColumn(col,b[col].cast('float'))
#b.show()
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
from pyspark.ml.feature import StandardScaler
stdScaler = StandardScaler(inputCol="features", \
                        outputCol="scaledFeatures", \
                        withStd=True, \
                        withMean=False)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator( \
                  labelCol="max(_c2)", \
                  predictionCol="prediction", \
                  metricName="accuracy")

In [None]:
# naive bayes
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
train_fv = vecAssembler.transform(train_f).select('max(_c2)', 'features')
#train_fv.show()
test_fv = vecAssembler.transform(test_f).select('max(_c2)', 'features')
#test_fv.show()
"""
print(train_f.count(), len(train_f.columns))
print(test_f.count(), len(test_f.columns))
"""
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol='max(_c2)')
model = nb.fit(train_fv)
predictions = model.transform(test_fv)
predictions.show()

In [20]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100, \
                        regParam=0.3, \
                        elasticNetParam=0.1, \
                        featuresCol="scaledFeatures", \
                        family = "binomial", \
                        labelCol="max(_c2)")

from pyspark.ml import Pipeline
pipeline_lr = Pipeline(stages=[vecAssembler, stdScaler, lr])
pipelineModel_lr = pipeline_lr.fit(a)
predDF_lr = pipelineModel_lr.transform(b)
"""
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator( \
                  labelCol="max(_c2)", \
                  predictionCol="prediction", \
                  metricName="accuracy")
"""
lr_accuracy = evaluator.evaluate(predDF_lr)
print("Accuracy of LogisticRegression is = %g"%(lr_accuracy))
print("Test Error of LogisticRegression = %g "%(1.0 - lr_accuracy))

Accuracy of LogisticRegression is = 0.977109
Test Error of LogisticRegression = 0.022891 


In [None]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, \
                 regParam=0.1, \
                 featuresCol="scaledFeatures", \
                 labelCol="max(_c2)")
from pyspark.ml import Pipeline
pipeline_lsvc = Pipeline(stages=[vecAssembler, stdScaler, lsvc])
pipelineModel_lsvc = pipeline_lsvc.fit(a)
predDF_lsvc = pipelineModel_lsvc.transform (b) #predict test result
lr_accuracy = evaluator.evaluate(predDF_lsvc) #measure accuracy
print("Accuracy of LogisticRegression is = %g"%(lr_accuracy))
print("Test Error of LogisticRegression = %g "%(1.0 - lr_accuracy))

In [18]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1.0, \
                modelType="gaussian", \
                featuresCol="scaledFeatures", \
                labelCol="max(_c2)")
from pyspark.ml import Pipeline
pipeline_nb = Pipeline(stages=[vecAssembler, stdScaler, nb])
pipelineModel_nb = pipeline_nb.fit(a)
predDF_nb = pipelineModel_nb.transform (b)
nb_accuracy = evaluator.evaluate(predDF_nb)
print("Accuracy of Naïve Bayes is = %g"%(nb_accuracy))
print("Error of Naïve Bayes is = %g "%(1.0 - nb_accuracy))

Accuracy of Naïve Bayes is = 1
Error of Naïve Bayes is = 0 


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="max(_c2)", \
                            featuresCol="scaledFeatures", \
                            impurity="gini")
from pyspark.ml import Pipeline
pipeline_dt = Pipeline(stages=[vecAssembler, stdScaler, dt])
pipelineModel_dt = pipeline_nb.fit(a)
predDF_dt = pipelineModel_dt.transform (b)
dt_accuracy = evaluator.evaluate(predDF_dt)
print("Accuracy of Decision Tree is = %g"%(dt_accuracy))
print("Error of Decision Tree is = %g "%(1.0 - dt_accuracy))

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="max(_c2)", \
                            featuresCol="scaledFeatures", \
                            numTrees=50)
from pyspark.ml import Pipeline
pipeline_rf = Pipeline(stages=[vecAssembler, stdScaler, rf])
pipelineModel_rf = pipeline_rf.fit(a)
predDF_rf = pipelineModel_rf.transform (b)
rf_accuracy = evaluator.evaluate(predDF_rf)
print("Accuracy of Random Tree is = %g"%(rf_accuracy))
print("Error of Random Tree is = %g "%(1.0 - rf_accuracy))

In [None]:
from pyspark.ml.classification import GBTClassifier
# Train a GBT model.
gbt = GBTClassifier(labelCol="max(_c2)", \
                    featuresCol="scaledFeatures", \
                    maxIter=10)
from pyspark.ml import Pipeline
pipeline_gbt = Pipeline(stages=[vecAssembler, stdScaler, gbt])
pipelineModel_gbt = pipeline_gbt.fit(a)
predDF_gbt = pipelineModel_gbt.transform (b)
gbt_accuracy = evaluator.evaluate(predDF_gbt)
print("Accuracy of Gradient-Boosted Tree is = %g"%(gbt_accuracy))
print("Error of Gradient-Boosted Tree is = %g "%(1.0 - gbt_accuracy))

In [None]:
from pyspark.ml.classification import OneVsRest
ovr = OneVsRest(classifier=lr, \
                labelCol="max(_c2)", \
                featuresCol="scaledFeatures")
from pyspark.ml import Pipeline
pipeline_ovr = Pipeline(stages=[vecAssembler, stdScaler, ovr])
pipelineModel_ovr = pipeline_ovr.fit(a)
predDF_ovr = pipelineModel_ovr.transform (b)
ovr_accuracy = evaluator.evaluate(predDF_ovr)
print("Accuracy of One-vs-Rest is = %g"%(ovr_accuracy))
print("Error of One-vs-Rest is = %g "%(1.0 - ovr_accuracy))

In [None]:
from pyspark.ml.classification import FMClassifier
# Train a FM model.
fm = FMClassifier(labelCol="max(_c2)", \
                  featuresCol="scaledFeatures", \
                  stepSize=0.001)
from pyspark.ml import Pipeline
pipeline_fm = Pipeline(stages=[vecAssembler, stdScaler, fm])
pipelineModel_fm = pipeline_fm.fit(a)
predDF_fm = pipelineModel_fm.transform (b)
fm_accuracy = evaluator.evaluate(predDF_fm)
print("Accuracy of Factorization Machines is = %g"%(fm_accuracy))
print("Error of Factorization Machines is = %g "%(1.0 - fm_accuracy))

In [None]:
#convert all data columns to float
FEATURES_COL = ['app_id', 'amnt', 'currency', 'operation_kind', 'card_type', 'operation_type', 'operation_type_group', 'ecommerce_flag', 'payment_system','income_flag', 
                'mcc', 'country', 'city', 'mcc_category', 'day_of_week', 'hour', 'days_before', 'weekofyear', 'hour_diff', 'transaction_number', '__index_level_0__']
for col in train.columns:
  if col in FEATURES_COL:
    train = train.withColumn(col,train[col].cast('float'))
train.show()
#create a feature column to be used in the clustering
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
df_kmeans = vecAssembler.transform(train).select('app_id', 'features')
df_kmeans.show()
#optimize choice of k
cost = np.zeros(20)
for k in range(2,10):
    print(k)
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_kmeans.sample(False,0.1, seed=42))
    cost[k] = model.computeCost(df_kmeans)
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,10),cost[2:10])
ax.set_xlabel('k')
ax.set_ylabel('cost')

k = 10
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(df_kmeans)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

+------+----------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+----+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|app_id|      amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag| mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|
+------+----------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+----+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|   0.0|0.46542543|     1.0|           4.0|     98.0|           4.0|                 2.0|           3.0|           7.0|        3.0| 2.0|    1.0|37.0|         2.0|        4.0|19.0|      351.0|      34.0|     -1.0|               1.0|              0.0|


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans().setK(5).setSeed(1)
model = kmeans.fit(train)

# Make predictions
predictions = model.transform(train)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

In [None]:
#turn spark df back to pandas df, and seperate into x and y
trainpd = new_train.toPandas()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

# Use loop spark than turn to pandas

In [9]:
train_target = spark.read.csv('/content/drive/MyDrive/MF810 Project/archive/alfabattle2_train_target.csv')
test_target = spark.read.csv('/content/drive/MyDrive/MF810 Project/archive/alfabattle2_test_target_contest.csv')
sample = spark.read.csv('/content/drive/MyDrive/MF810 Project/archive/alfabattle2_alpha_sample.csv')

train = pd.DataFrame()
entries = os.listdir('/content/drive/MyDrive/MF810 Project/archive/train/train_transactions_contest')
for entry in entries:
  entry = spark.read.parquet(f'/content/drive/MyDrive/MF810 Project/archive/train/train_transactions_contest/{entry}')
  entry.createOrReplaceTempView("train0")
  bigdf = spark.sql("SELECT app_id,max(amnt),min(amnt),AVG(amnt),VARIANCE(amnt),avg(currency),avg(operation_kind),avg(card_type),avg(operation_type),avg(operation_type_group),avg(ecommerce_flag),avg(payment_system),avg(income_flag),avg(mcc),avg(mcc_category),avg(hour_diff),max(transaction_number) FROM train0 GROUP BY app_id ORDER BY app_id")
  bigdf = bigdf.join(train_target,bigdf.app_id ==  train_target._c0,"inner")
  bigdf = bigdf.toPandas()
  train = train.append(bigdf)

In [11]:
train

Unnamed: 0,app_id,max(amnt),min(amnt),avg(amnt),variance(amnt),avg(currency),avg(operation_kind),avg(card_type),avg(operation_type),avg(operation_type_group),avg(ecommerce_flag),avg(payment_system),avg(income_flag),avg(mcc),avg(mcc_category),avg(hour_diff),max(transaction_number),_c0,_c1,_c2
0,148,0.639497,0.258972,0.505867,0.009130,1.000000,2.391304,48.695652,3.195652,1.195652,1.000000,4.630435,1.195652,2.000000,2.000000,180.934783,46,148,0,0
1,463,0.590245,0.288846,0.450289,0.005697,1.000000,2.028571,60.028571,2.742857,1.285714,1.057143,2.914286,1.285714,9.028571,4.114286,204.200000,35,463,2,0
2,471,0.634003,0.000000,0.338660,0.007613,1.000000,1.220000,63.503333,1.696667,1.030000,1.033333,1.450000,1.023333,6.946667,3.433333,28.643333,300,471,1,0
3,496,0.684931,0.038895,0.425572,0.007686,1.286787,1.551051,31.675676,3.061562,1.046547,1.500000,1.939940,1.055556,22.983483,6.908408,12.875375,666,496,1,0
4,833,0.612489,0.217228,0.398154,0.008628,1.000000,1.627451,37.129412,2.639216,1.101961,1.113725,2.741176,1.101961,9.662745,3.831373,33.701961,255,833,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
15353,1002074,0.698225,0.000000,0.399242,0.010031,1.044847,1.169847,53.566794,1.452290,1.047710,1.001908,1.416031,1.045802,19.166985,5.208969,8.159351,1048,1002074,0,0
15354,1002279,0.572565,0.000000,0.311480,0.004069,1.000000,1.143836,1.000000,1.705479,1.013699,1.000000,1.006849,1.027397,7.089041,2.965753,26.280822,146,1002279,2,0
15355,1002321,0.601229,0.192513,0.385930,0.008940,1.000000,1.907895,2.000000,2.375000,1.250000,1.000000,2.243421,1.250000,10.092105,3.861842,51.776316,152,1002321,0,0
15356,1002711,0.607141,0.000000,0.340700,0.010926,2.340206,1.412371,8.000000,1.536082,1.082474,1.164948,1.350515,1.082474,5.649485,3.556701,42.030928,97,1002711,0,0


In [12]:
X_tv = train.drop(columns=['_c2'])
y_tv = train['_c2']

#split X and y into training and validation set
X_train,X_valid,y_train,y_valid = train_test_split(X_tv,y_tv,test_size=0.2,random_state=1)
print(X_train.shape, y_train.shape)
print(X_valid.shape, y_valid.shape)

(771048, 19) (771048,)
(192763, 19) (192763,)


In [36]:
sum = y_valid.sum()
total = 0
for s in sum:
  total += int(s)
print(total)

5305


In [14]:
# Random Forest
rf = RandomForestClassifier(n_estimators=70,oob_score=True,n_jobs=-1,random_state=101,max_features=None,min_samples_leaf=30)
rf.fit(X_train,y_train)
y_pred_rf = rf.predict(X_valid)

print(f'\nNumber of default for Decision Tree: {sum(y_pred_rf)}')
print(f'Model Accuracy: {accuracy_score(y_valid,y_pred_rf)}')
print(f'Model F1 Score: {f1_score(y_valid,y_pred_rf)}')
print(f'Confusion Matrix:\n{confusion_matrix(y_valid,y_pred_rf)}')
ns_probs = [0 for _ in range(len(y_valid))]
lr_probs = rf.predict_proba(X_valid)[:, 1]
ns_auc = roc_auc_score(y_valid, ns_probs)
lr_auc = roc_auc_score(y_valid, lr_probs)
print('No Skill: ROC AUC=%.3f' % (ns_auc))
print('Random Forest: ROC AUC=%.3f' % (lr_auc))
ns_fpr, ns_tpr, _ = roc_curve(y_valid, ns_probs)
lr_fpr, lr_tpr, _ = roc_curve(y_valid, lr_probs)
plt.plot(ns_fpr, ns_tpr, linestyle='--', label='No Skill')
plt.plot(lr_fpr, lr_tpr, marker='.', label='Random Forest')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

TypeError: ignored

In [37]:
sum = 0
for i in range(0, len(y_pred_rf)):    
   sum = sum + int(y_pred_rf[i])
print(sum)  

0


In [19]:
# Logistic Regression
lr = LogisticRegression(solver = 'lbfgs',max_iter = 1000)
lr.fit(X_train,y_train)
y_pred_lr = lr.predict(X_valid)

print(f'\nNumber of default for Decision Tree: {sum(y_pred_lr)}')
print(f'Model Accuracy: {accuracy_score(y_valid,y_pred_lr)}')
print(f'Model F1 Score: {f1_score(y_valid,y_pred_lr)}')
print(f'Confusion Matrix:\n{confusion_matrix(y_valid,y_pred_lr)}')
ns_probs = [0 for _ in range(len(y_valid))]
lr_probs = lr.predict_proba(X_valid)[:, 1]
ns_auc = roc_auc_score(y_valid, ns_probs)
lr_auc = roc_auc_score(y_valid, lr_probs)
print('No Skill: ROC AUC=%.3f' % (ns_auc))
print('SVM: ROC AUC=%.3f' % (lr_auc))
ns_fpr, ns_tpr, _ = roc_curve(y_valid, ns_probs)
lr_fpr, lr_tpr, _ = roc_curve(y_valid, lr_probs)
plt.plot(ns_fpr, ns_tpr, linestyle='--', label='No Skill')
plt.plot(lr_fpr, lr_tpr, marker='.', label='SVM')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

TypeError: ignored

In [None]:
sum = 0
for i in range(0, len(y_pred_lr)):    
   print(int(y_pred_lr[i]))

In [41]:
# find the best n for KNN classification model
def knn(n):
    knn = KNeighborsClassifier(n_neighbors=n)
    knn.fit(X_train,y_train)
    y_pred_knn = knn.predict(X_valid)
    return f1_score(y_valid,y_pred_knn)

knn_bestaccuracy = 0
for i in range(2,20):
    accuracy = knn(i)
    if accuracy > knn_bestaccuracy:
        knn_bestn = i
print(f'Best n for KNN classification: {knn_bestn}')

# KNN
knn = KNeighborsClassifier(n_neighbors=19)
knn.fit(X_train,y_train)
y_pred_knn = knn.predict(X_valid)

print(f'\nNumber of default for KNN: {sum(y_pred_knn)}')
print(f'Model Accuracy: {accuracy_score(y_valid,y_pred_knn)}')
print(f'Model F1 Score: {f1_score(y_valid,y_pred_knn)}')
print(f'Confusion Matrix:\n{confusion_matrix(y_valid,y_pred_knn)}')
ns_probs = [0 for _ in range(len(y_valid))]
lr_probs = knn.predict_proba(X_valid)[:, 1]
ns_auc = roc_auc_score(y_valid, ns_probs)
lr_auc = roc_auc_score(y_valid, lr_probs)
print('No Skill: ROC AUC=%.3f' % (ns_auc))
print('Logistic: ROC AUC=%.3f' % (lr_auc))
ns_fpr, ns_tpr, _ = roc_curve(y_valid, ns_probs)
lr_fpr, lr_tpr, _ = roc_curve(y_valid, lr_probs)
plt.plot(ns_fpr, ns_tpr, linestyle='--', label='No Skill')
plt.plot(lr_fpr, lr_tpr, marker='.', label='Logistic')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

ValueError: ignored

In [42]:
# Decision Tree
dtree = DecisionTreeClassifier(max_depth=10,random_state=101,max_features=None,min_samples_leaf=15)
dtree.fit(X_train,y_train)
y_pred_dtree = dtree.predict(X_valid)

print(f'\nNumber of default for Decision Tree: {sum(y_pred_dtree)}')
print(f'Model Accuracy: {accuracy_score(y_valid,y_pred_dtree)}')
print(f'Model F1 Score: {f1_score(y_valid,y_pred_dtree)}')
print(f'Confusion Matrix:\n{confusion_matrix(y_valid,y_pred_dtree)}')
ns_probs = [0 for _ in range(len(y_valid))]
lr_probs = dtree.predict_proba(X_valid)[:, 1]
ns_auc = roc_auc_score(y_valid, ns_probs)
lr_auc = roc_auc_score(y_valid, lr_probs)
print('No Skill: ROC AUC=%.3f' % (ns_auc))
print('Logistic: ROC AUC=%.3f' % (lr_auc))
ns_fpr, ns_tpr, _ = roc_curve(y_valid, ns_probs)
lr_fpr, lr_tpr, _ = roc_curve(y_valid, lr_probs)
plt.plot(ns_fpr, ns_tpr, linestyle='--', label='No Skill')
plt.plot(lr_fpr, lr_tpr, marker='.', label='Decision Tree')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

TypeError: ignored

In [44]:
sum = 0
for i in range(0, len(y_pred_dtree)):    
   print(int(y_pred_dtree[i]))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0

First, we can perform some unsupervised learning to the training set.

Since I seperated training set into 5 sub training set, I will run through classfication algorithms only for training set targeting product 0.

# Fit testing set and see performance: