# AMEX Kaggle Competetion

## Data Fecthing

In [1]:
#Setup the virtual Enviroment 
! pip install -q kaggle

In [3]:
#download the kaggle API json file by 
#1. Go to your account, Scroll to API section and Click Expire API Token to remove previous tokens
#2. Click on Create New API Token - It will download kaggle.json file on your machine.
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"shijiemao","key":"229b59052239087c849d1ab667e93913"}'}

In [4]:
# Fetching the data from kaggle
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets list

ref                                                                title                                                size  lastUpdated          downloadCount  voteCount  usabilityRating  
-----------------------------------------------------------------  --------------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
victorsoeiro/netflix-tv-shows-and-movies                           Netflix TV Shows and Movies                           2MB  2022-05-15 00:01:23          15241        444  1.0              
ruchi798/data-science-job-salaries                                 Data Science Job Salaries                             7KB  2022-06-15 08:59:12           3839        138  1.0              
zusmani/petrolgas-prices-worldwide                                 Petrol/Gas Prices Worldwide                          10KB  2022-06-24 01:25:33           1839         88  1.0              
imoore/age-dataset                           

In [5]:
! kaggle competitions download -c 'amex-default-prediction'

Downloading amex-default-prediction.zip to /content
100% 20.5G/20.5G [02:39<00:00, 109MB/s] 
100% 20.5G/20.5G [02:39<00:00, 138MB/s]


In [6]:
! mkdir AMEX
! unzip amex-default-prediction.zip -d AMEX

Archive:  amex-default-prediction.zip
  inflating: AMEX/sample_submission.csv  
  inflating: AMEX/test_data.csv      
  inflating: AMEX/train_data.csv     
  inflating: AMEX/train_labels.csv   


In [7]:
! rm amex-default-prediction.zip

## Spark session

In [8]:
!pip install -q pyspark
import os
from pprint import pprint

import pandas as pd
from pyspark.sql import SparkSession, types

spark = SparkSession.builder.master("local[*]").getOrCreate()

[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[K     |████████████████████████████████| 199 kB 62.5 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [9]:
test_path = "../content/AMEX/test_data.csv"
train_path = "../content/AMEX/train_data.csv"
label_path = "../content/AMEX/train_labels.csv"

In [10]:
train_df = pd.read_csv(train_path, nrows=100)
test_df = pd.read_csv(test_path, nrows=100)
label_df = pd.read_csv(label_path, nrows=100)

In [11]:
## Train types
train_types = train_df.dtypes
train_types_count = train_types.value_counts()

## Test types
test_types = test_df.dtypes
test_types_count = test_types.value_counts()

## Label types
label_types = label_df.dtypes
label_types_count = label_types.value_counts()

In [12]:
def print_splits(*msg):
    for m in msg:
        print(m)
        print()
        
print_splits(train_types_count, test_types_count, label_types_count)

float64    185
object       4
int64        1
dtype: int64

float64    185
object       4
int64        1
dtype: int64

object    1
int64     1
dtype: int64



In [13]:
# Types mapper
types_map = {
    "object": types.StringType(),
    "float64": types.FloatType(),
    "int64": types.IntegerType(),
}

# Known dtypes
string_dtypes = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
date_dtypes = ['S_2']

In [14]:
def create_spark_schema(series):
    fields = []
    
    for index, value in series.items():
        if index in string_dtypes:
            field = types.StructField(index, types.StringType(), True)
            
        elif index in date_dtypes:
            field = types.StructField(index, types.DateType(), True)
        
        else:
            field = types.StructField(index, types_map.get(str(value)), True)
            
        fields.append(field)
    return types.StructType(fields)

In [15]:
train_schema = create_spark_schema(train_types) 
test_schema = create_spark_schema(test_types)
label_schema = create_spark_schema(label_types)

In [16]:
# Set header to True or else it will be included as row
train_psdf = spark.read.option("header", "true").csv(train_path, schema=train_schema)
test_psdf = spark.read.option("header", "true").csv(test_path, schema=test_schema)
label_psdf = spark.read.option("header", "true").csv(label_path, schema=label_schema)

In [17]:
# Check schema
print_splits(train_psdf.schema[:3], test_psdf.schema[:3], label_psdf.schema[:3])

StructType([StructField('customer_ID', StringType(), True), StructField('S_2', DateType(), True), StructField('P_2', FloatType(), True)])

StructType([StructField('customer_ID', StringType(), True), StructField('S_2', DateType(), True), StructField('P_2', FloatType(), True)])

StructType([StructField('customer_ID', StringType(), True), StructField('target', IntegerType(), True)])



In [None]:
train_psdf.write.parquet("train_amex")
test_psdf.write.parquet("test_amex")
label_psdf.write.parquet("label_amex")

## Data preprocessing

In [None]:
import itertools
import multiprocessing
import re
from IPython import display

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import pyspark.pandas as ps
from pyspark import StorageLevel
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# SESSION PARAMETER
CORES = multiprocessing.cpu_count()
MAX_PARTITION_SIZE = "134217728b"

In [None]:
spark = (SparkSession.builder.master(f"local[{CORES}]")
                             .config("spark.memory.offHeap.enabled", "true")
                             .config("spark.memory.offHeap.size","5g")
                             .config("spark.sql.shuffle.partitions", CORES * 3)
                             .config("spark.default.parallelism", CORES * 3)
                             .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", MAX_PARTITION_SIZE)
                             .appName("ML_spark")
                             .getOrCreate())

In [None]:
train_path = "../content/train_amex"
test_path = "../content/test_amex"
label_path = "../content/label_amex"

In [None]:
%%time
train_df = spark.read.parquet(train_path)
test_df = spark.read.parquet(test_path)
label_df = spark.read.parquet(label_path)

CPU times: user 9.06 ms, sys: 1.41 ms, total: 10.5 ms
Wall time: 851 ms


In [None]:
train_df.select("customer_ID").explain()  # select one column to simplify the output

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [customer_ID#1908] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/train_amex], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customer_ID:string>




In [None]:
def get_null_count(sql_df, colname):
    count = (sql_df.select(colname)
                   .filter(F.col(colname).isNull())
                   .count())
    return count

In [None]:
missing_customer_train = get_null_count(train_df, "customer_ID") 
missing_customer_test = get_null_count(test_df, "customer_ID")

total_miss = missing_customer_train + missing_customer_test
print(f"Missing customer_ID: {total_miss}")

Missing customer_ID: 0


## Feature Engineering

In [None]:
# Known Columns
info_cols = ['customer_ID', 'S_2']
target_cols = ['target']
cat_cols = [
    'B_30', 'B_38', 
    'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']


# Define Numeric Columns
excluded = info_cols + cat_cols
num_cols = [col for col in train_df.columns if col not in excluded]

# Define Feature Columns
features_cols =  cat_cols + num_cols

print(f"Number of categoric cols: {len(cat_cols)}")
print(f"Number of numeric cols: {len(num_cols)}")

Number of categoric cols: 11
Number of numeric cols: 177


In [None]:
train_df = (train_df.fillna(0, subset=num_cols)
                    .fillna("null", subset=cat_cols))

test_df = (test_df.fillna(0, subset=num_cols)
                  .fillna("null", subset=cat_cols))

In [None]:
def add_suffix(names, suffix):
    return [name + suffix for name in names]
    
# Create columns aliases
cat_index_cols = add_suffix(cat_cols, "_index")

# Fit StringIndexer
indexers = StringIndexer(inputCols=cat_cols, outputCols=cat_index_cols)
indexers_model = indexers.fit(train_df)

# Transform to data
train_df_indexed = indexers_model.transform(train_df)
test_df_indexed = indexers_model.transform(test_df)

In [None]:
# See what columns the indexer handle
indexers.getInputCols()

# See the indexed columns
train_df_indexed.select("B_30_index").show(5)

+----------+
|B_30_index|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 5 rows



In [None]:
# Create columns aliases
cat_ohe_cols = add_suffix(cat_cols, "_ohe")

# Fit OneHotEncoder
ohe = OneHotEncoder(inputCols=cat_index_cols, outputCols=cat_ohe_cols)
ohe_model = ohe.fit(train_df_indexed)

# Transform to data
train_df_ohed = ohe_model.transform(train_df_indexed)
test_df_ohed = ohe_model.transform(test_df_indexed)

In [None]:
train_df_ohed.select("B_30_ohe").show(5)

+-------------+
|     B_30_ohe|
+-------------+
|(3,[0],[1.0])|
|(3,[0],[1.0])|
|(3,[0],[1.0])|
|(3,[0],[1.0])|
|(3,[0],[1.0])|
+-------------+
only showing top 5 rows



In [None]:
# Functions for each type
# each tuple consist of: (function, column's suffix)
num_funcs = [
    (F.mean, "_mean"),
    (F.stddev, "_std"),
    (F.min, "_min"),
    (F.max, "_max"),
]

cat_funcs = [
    (F.count, "_count"),
    (F.last, "_last"),
    (F.countDistinct, "_nunique"),
]

In [None]:
# Arguments for .agg method
# each arg consist of: func(colname).alias(colname + suffix)
agg_num_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(num_cols, num_funcs)]

agg_cols_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(cat_ohe_cols, cat_funcs)]

# Combine numeric and categoric agg arguments
agg_args = agg_num_args + agg_cols_args
agg_args[0]

Column<'avg(P_2) AS P_2_mean'>

In [None]:
# Columns that we won't use
unused_cols = cat_cols + num_cols + cat_index_cols + cat_ohe_cols
print(f"Unused columns {len(unused_cols)}")

Unused columns 210


In [None]:
# Apply the agg while also dropping unused columns
train_df_grouped = (train_df_ohed.groupBy("customer_ID")
                                 .agg(*agg_cols_args)
                                 .drop(*unused_cols))

test_df_grouped = (test_df_ohed.groupBy("customer_ID")
                                .agg(*agg_cols_args)
                                .drop(*unused_cols))

## Modeling

In [None]:
train_joined_df = train_df_grouped.join(F.broadcast(label_df), on="customer_ID")

In [None]:
dim = len(train_joined_df.columns)
print(f"Total features: {dim}")

Total features: 35


In [None]:
va = VectorAssembler(
    inputCols=train_joined_df.drop("customer_ID", "target").columns,
    outputCol="features",
    handleInvalid="error",
)

train_ready_df = (va.transform(train_joined_df)
                    .select(["customer_ID", "features", "target"])
                    .persist(StorageLevel.DISK_ONLY))

test_ready_df = (va.transform(test_df_grouped)
                   .select(["customer_ID", "features"])
                   .persist(StorageLevel.DISK_ONLY))

## Training

In [None]:
logres = LogisticRegression(featuresCol="features", labelCol="target")
logres_model = logres.fit(train_ready_df)

In [None]:
gbt = GBTClassifier(labelCol="target", featuresCol="features", maxIter=10)

# Train model.  This also runs the indexers.
model = gbt.fit(train_ready_df)

## Prediction

In [None]:
test_predictions = logres_model.transform(test_ready_df)
test_predictions

DataFrame[customer_ID: string, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [None]:
predictions = model.transform(test_ready_df)

# Select example rows to display.
predictions.select("prediction", "probability", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="probability", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only

+----------+--------------------+--------------------+
|prediction|         probability|            features|
+----------+--------------------+--------------------+
|       0.0|[0.69636525161398...|(66,[0,1,4,5,6,13...|
|       0.0|[0.91370669401726...|(66,[0,1,4,5,8,13...|
|       0.0|[0.91456496284576...|(66,[0,1,4,5,8,13...|
|       0.0|[0.89406083609393...|(66,[0,1,4,5,6,13...|
|       0.0|[0.92214781722093...|(66,[0,1,4,5,6,13...|
+----------+--------------------+--------------------+
only showing top 5 rows



IllegalArgumentException: ignored

## More and More