# Download and Unzip Function

To see the download and unzipping progress

In [1]:
!pip install lemma-dev-utils
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting lemma-dev-utils
  Downloading lemma_dev_utils-0.0.2.tar.gz (3.0 kB)
Building wheels for collected packages: lemma-dev-utils
  Building wheel for lemma-dev-utils (setup.py) ... [?25l[?25hdone
  Created wheel for lemma-dev-utils: filename=lemma_dev_utils-0.0.2-py3-none-any.whl size=3524 sha256=a5d4b29a0723d6f0eb5264037e09cd56b73e01185c70250b57ab3892e15cab3a
  Stored in directory: /root/.cache/pip/wheels/be/6a/ac/93a152a4146982dfdfc411e32037c303ada53bf5ab93f8939f
Successfully built lemma-dev-utils
Installing collected packages: lemma-dev-utils
Successfully installed lemma-dev-utils-0.0.2
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-non

# Downloading the Dataset

In [2]:
with open('dataset_url.txt', 'r') as f:
    url = f.read()

In [3]:
from lemma_dev_utils import download_unzip
import os

path = 'dataset'

download_unzip(path, url)
os.chdir(path)

Downloading amex-default-prediction.zip: 100%|##########| 21981528023/21981528023 [02:36<00:00, 140755241.14it/s]
Unzipping sample_submission.csv: 100%|##########| 61956097/61956097 [00:01<00:00, 43955787.81it/s]
Unzipping test_data.csv: 100%|##########| 33824849921/33824849921 [07:50<00:00, 71822258.27it/s]
Unzipping train_data.csv: 100%|##########| 16393289729/16393289729 [03:43<00:00, 73423787.95it/s]
Unzipping train_labels.csv: 100%|##########| 30752769/30752769 [00:00<00:00, 62095775.23it/s]


# Setting up Spark

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
        
sc = spark.sparkContext

# Loading the Training Set

In [5]:
raw_data = sc.textFile("train_data.csv")
raw_labels = sc.textFile("train_labels.csv")

# Preprocessing



## Managing Numeric and NaN values

In [6]:
import math

def isnumeric_val(val):
    non_negative = val.replace("-", "", 1)
    non_decimal = non_negative.replace(".", "", 1)
    non_scientific = non_decimal.replace("e-", "", 1).replace("e", "", 1)
    isitnumeric = non_scientific.isdigit()
    return isitnumeric

def clean_split_csv(row):
    # Split Row
    split_row = row.split(',')
    new_row = list()

    for val in split_row:
        # Fill NaN
        val = val if val != '' else '0.0'
        # Rounding numbers
        val_toadd = math.floor(float(val)*1000)/1000 if isnumeric_val(val) else val
        new_row.append(val_toadd)

    return tuple(new_row)

## Getting features, labels and the header

In [7]:
# cleaning the training data
clean_data = raw_data.map(lambda row: clean_split_csv(row))
header_features = clean_data.take(1)[0]
features = clean_data.filter(lambda x: x != header_features)

# cleaning the training labels
clean_labels = raw_labels.map(lambda row: clean_split_csv(row))
header_labels = clean_labels.take(1)[0]
labels = clean_labels.filter(lambda x: x != header_labels)

## Sampling the dataset

In [8]:
sampled_data = features.sample(withReplacement = False, fraction = 0.1, seed = 42)

## Get number of partitions I have

In [9]:
sampled_data.getNumPartitions()

489

## Getting the feature dense shape

# From RDD to DF

In [10]:
%%time
# around 26 minutes
df = sampled_data.toDF()
df_schema = df.schema
df.write.csv("sample_train_data_(spark).csv")
del df

CPU times: user 9.01 s, sys: 1.28 s, total: 10.3 s
Wall time: 26min 26s


In [11]:
df = spark.read.format("csv") \
    .option("header", True) \
    .schema(df_schema) \
    .load("sample_train_data_(spark).csv")

In [12]:
df.rdd.getNumPartitions()

20

In [13]:
df.show()

+--------------------+----------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+---+-----+-----+-----+-----+-----+---+-----+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+-----+-----+----+-----+-----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+-----+----+----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+----+-----+-----+-----+-----+----

## Checking column names

In [14]:
df_columns = df.columns
len(header_features) == len(df_columns)

True

In [15]:
header_features == df_columns

False

In [16]:
header_features[:5], df_columns[:5]

(('customer_ID', 'S_2', 'P_2', 'D_39', 'B_1'), ['_1', '_2', '_3', '_4', '_5'])

## Rename with original column names

In [17]:
for to_rename, correct_name in zip(df_columns, header_features):
    df = df.withColumnRenamed(to_rename, correct_name)

## Looking at the dataset shape and schema

In [18]:
df.show()

+--------------------+----------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+----+-----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+-----+-----+----+-----+-----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----

In [19]:
schema = {col.name : str(col.dataType) for col in df.schema}
print(*[f"{x}\n" for x in schema.items() if x[1] != 'DoubleType()'])

('customer_ID', 'StringType()')
 ('S_2', 'StringType()')
 ('D_63', 'StringType()')
 ('D_64', 'StringType()')



In [20]:
n_rows = df.count()
n_columns = len(df_columns)
print(f"n° o Rows: {n_rows}")
print(f"n° o Columns: {n_columns}")

n° o Rows: 552289
n° o Columns: 190


In [21]:
df.printSchema()

root
 |-- customer_ID: string (nullable = true)
 |-- S_2: string (nullable = true)
 |-- P_2: double (nullable = true)
 |-- D_39: double (nullable = true)
 |-- B_1: double (nullable = true)
 |-- B_2: double (nullable = true)
 |-- R_1: double (nullable = true)
 |-- S_3: double (nullable = true)
 |-- D_41: double (nullable = true)
 |-- B_3: double (nullable = true)
 |-- D_42: double (nullable = true)
 |-- D_43: double (nullable = true)
 |-- D_44: double (nullable = true)
 |-- B_4: double (nullable = true)
 |-- D_45: double (nullable = true)
 |-- B_5: double (nullable = true)
 |-- R_2: double (nullable = true)
 |-- D_46: double (nullable = true)
 |-- D_47: double (nullable = true)
 |-- D_48: double (nullable = true)
 |-- D_49: double (nullable = true)
 |-- B_6: double (nullable = true)
 |-- B_7: double (nullable = true)
 |-- B_8: double (nullable = true)
 |-- D_50: double (nullable = true)
 |-- D_51: double (nullable = true)
 |-- B_9: double (nullable = true)
 |-- R_3: double (nullable = t

In [22]:
# input from American Express challenge
categorical_variables = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']

In [23]:
categorical_df = df.select(categorical_variables)

In [24]:
categorical_df.show()

+----+----+-----+-----+-----+-----+-----+----+----+----+----+
|B_30|B_38|D_114|D_116|D_117|D_120|D_126|D_63|D_64|D_66|D_68|
+----+----+-----+-----+-----+-----+-----+----+----+----+----+
| 0.0| 2.0|  0.0|  0.0|  3.0|  1.0|  1.0|  CO|   U| 0.0| 4.0|
| 0.0| 3.0|  1.0|  0.0| -1.0|  0.0|  1.0|  CR|   R| 0.0| 2.0|
| 0.0| 3.0|  1.0|  0.0|  3.0|  0.0|  1.0|  CR|   U| 0.0| 2.0|
| 0.0| 2.0|  1.0|  0.0|  2.0|  0.0|  1.0|  CL|   O| 0.0| 6.0|
| 0.0| 2.0|  1.0|  0.0|  2.0|  0.0|  1.0|  CL|   O| 0.0| 6.0|
| 0.0| 2.0|  0.0|  0.0|  2.0|  0.0|  1.0|  CL|   O| 0.0| 6.0|
| 0.0| 2.0|  0.0|  0.0|  2.0|  0.0|  1.0|  CL|   O| 0.0| 6.0|
| 1.0| 5.0|  1.0|  0.0|  2.0|  0.0|  1.0|  CO|   O| 0.0| 6.0|
| 1.0| 5.0|  1.0|  0.0|  2.0|  0.0|  1.0|  CO|   O| 0.0| 6.0|
| 0.0| 2.0|  0.0|  0.0|  2.0|  0.0| -1.0|  CL|   R| 1.0| 5.0|
| 0.0| 2.0|  0.0|  0.0|  2.0|  0.0|  0.0|  CL|   R| 1.0| 5.0|
| 0.0| 2.0|  0.0|  0.0|  6.0|  0.0|  1.0|  CO|   O| 0.0| 6.0|
| 0.0| 2.0|  0.0|  0.0|  0.0|  0.0|  0.0|  CO| 0.0| 0.0| 0.0|
| 0.0| 2

In [25]:
categorical_df.describe().show()

+-------+-------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------+--------------------+-------------------+------------------+
|summary|               B_30|              B_38|              D_114|               D_116|             D_117|              D_120|             D_126|  D_63|                D_64|               D_66|              D_68|
+-------+-------------------+------------------+-------------------+--------------------+------------------+-------------------+------------------+------+--------------------+-------------------+------------------+
|  count|             552289|            552289|             552289|              552289|            552289|             552289|            552289|552289|              552289|             552289|            552289|
|   mean|0.15865787658273114|2.7179719313620225| 0.5993800347281948|0.001249345904046...|2.2408901861163266|0.11325954346365762|0.7236157156

In [26]:
categorical_df.describe().filter("summary = 'max'").collect()

[Row(summary='max', B_30='2.0', B_38='7.0', D_114='1.0', D_116='1.0', D_117='6.0', D_120='1.0', D_126='1.0', D_63='XZ', D_64='U', D_66='1.0', D_68='6.0')]

In [27]:
categorical_df.describe().filter("summary = 'min'").collect()

[Row(summary='min', B_30='0.0', B_38='0.0', D_114='0.0', D_116='0.0', D_117='-1.0', D_120='0.0', D_126='-1.0', D_63='CL', D_64='-1.0', D_66='0.0', D_68='0.0')]

D_117 and D_126 may be a problem as they were known values but we changed the nan values to 0, they have negative values I'll make them 0.5 to adjust to a category semantically closer to the original one

# Fixing the negative values appearing in the categories

In [28]:
from pyspark.sql.functions import col, when

for i in ['D_117', 'D_126', 'D_64']:
    df = df.withColumn(i, when((col(i) == -1.0), 0.5).otherwise(col(i)))

In [29]:
df.select(categorical_variables).describe().filter("summary = 'min'").collect()

[Row(summary='min', B_30='0.0', B_38='0.0', D_114='0.0', D_116='0.0', D_117='0.0', D_120='0.0', D_126='0.0', D_63='CL', D_64='0.0', D_66='0.0', D_68='0.0')]

# Casting string-type categories to numeric
As per the request of Spark

In [30]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

new_categorical_variables = [f'{col}_numeric' for col in categorical_variables]
indexer = StringIndexer(inputCols=categorical_variables, outputCols=new_categorical_variables).fit(df)
df = indexer.transform(df)

In [31]:
categorical_df = df.select(categorical_variables+new_categorical_variables)

In [32]:
categorical_df.groupby(['B_38_numeric', 'B_38']).count().show()

+------------+----+------+
|B_38_numeric|B_38| count|
+------------+----+------+
|         3.0| 5.0| 44289|
|         6.0| 6.0| 16372|
|         4.0| 4.0| 29921|
|         7.0| 0.0|   198|
|         2.0| 1.0|116053|
|         1.0| 3.0|124785|
|         0.0| 2.0|194672|
|         5.0| 7.0| 25999|
+------------+----+------+



In [33]:
categorical_df.groupby(['D_63_numeric', 'D_63']).count().show()

+------------+----+------+
|D_63_numeric|D_63| count|
+------------+----+------+
|         3.0|  XZ|  2611|
|         5.0|  XL|   661|
|         4.0|  XM|  1038|
|         0.0|  CO|411835|
|         2.0|  CL| 43521|
|         1.0|  CR| 92623|
+------------+----+------+



In [34]:
# somewhat a heavy operation (it could be avoided) but I prefer to keep the original category labels
reverse_dictionary = {col : dict(categorical_df.groupby([f'{col}_numeric', col]).count()\
                                 .drop('count').collect()) for col in categorical_variables}

reverse_dictionary['D_63']

{3.0: 'XZ', 5.0: 'XL', 4.0: 'XM', 0.0: 'CO', 2.0: 'CL', 1.0: 'CR'}

In [35]:
reverse_dictionary.keys()

dict_keys(['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68'])

In [36]:
# dropping old columns from dataframe
drop_statement = 'df = df'
for col in categorical_variables:
    drop_statement += f'.drop("{col}")'
exec(drop_statement)

# One hot encoding Categorical variables

In [37]:
from pyspark.ml.feature import OneHotEncoder

output_col_names = [f'{col}_enc' for col in categorical_variables]
encoder = OneHotEncoder(inputCols = new_categorical_variables, 
                        outputCols = output_col_names)

model = encoder.fit(df)
df = model.transform(df)

# dropping the numeric columns from df
drop_statement = 'df = df'
for col in new_categorical_variables:
    drop_statement += f'.drop("{col}")'
exec(drop_statement)

df.select(output_col_names).show()

+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|     B_30_enc|     B_38_enc|    D_114_enc|    D_116_enc|    D_117_enc|    D_120_enc|    D_126_enc|     D_63_enc|     D_64_enc|     D_66_enc|     D_68_enc|
+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|(2,[0],[1.0])|(7,[0],[1.0])|    (1,[],[])|(1,[0],[1.0])|(7,[1],[1.0])|    (1,[],[])|(2,[0],[1.0])|(5,[0],[1.0])|(4,[1],[1.0])|(1,[0],[1.0])|(6,[3],[1.0])|
|(2,[0],[1.0])|(7,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|(7,[0],[1.0])|(1,[0],[1.0])|(2,[0],[1.0])|(5,[1],[1.0])|(4,[2],[1.0])|(1,[0],[1.0])|(6,[5],[1.0])|
|(2,[0],[1.0])|(7,[1],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|(7,[1],[1.0])|(1,[0],[1.0])|(2,[0],[1.0])|(5,[1],[1.0])|(4,[1],[1.0])|(1,[0],[1.0])|(6,[5],[1.0])|
|(2,[0],[1.0])|(7,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|(7,[3],

In [38]:
# renaming back columns to original names
for n in range(len(output_col_names)):
    df = df.withColumnRenamed(output_col_names[n], categorical_variables[n])

# Getting the Labels to Join them

In [39]:
label_df = labels.toDF()
label_cols = label_df.columns
for n in range(len(label_cols)):
    label_df = label_df.withColumnRenamed(label_cols[n], header_labels[n])
label_df.show()

+--------------------+------+
|         customer_ID|target|
+--------------------+------+
|0000099d6bd597052...|   0.0|
|00000fd6641609c6e...|   0.0|
|00001b22f846c82c5...|   0.0|
|000041bdba6ecadd8...|   0.0|
|00007889e4fcd2614...|   0.0|
|000084e5023181993...|   0.0|
|000098081fde4fd64...|   0.0|
|0000d17a1447b25a0...|   0.0|
|0000f99513770170a...|   1.0|
|00013181a0c5fc8f1...|   1.0|
|0001337ded4e1c253...|   1.0|
|00013c6e1cec7c21b...|   1.0|
|0001812036f155833...|   1.0|
|00018dd4932409baf...|   0.0|
|000198b3dc70edd65...|   0.0|
|000201146e53cacdd...|   0.0|
|0002d381bdd8048d7...|   0.0|
|0002e335892f7998f...|   1.0|
|00031e8be98bc3411...|   0.0|
|000333075fb8ec6d5...|   1.0|
+--------------------+------+
only showing top 20 rows



In [40]:
df = df.join(label_df, ['customer_ID'])

In [41]:
from pyspark.ml.feature import VectorAssembler

columns_final = ['customer_ID', 'S_2', 'target']
feature_to_assemble = [col for col in header_features if col not in columns_final]

vecAssembler = VectorAssembler(inputCols=feature_to_assemble, 
                               outputCol="features")

assembled_df = vecAssembler.transform(df).select(columns_final+['features'])

In [42]:
assembled_df.show()

+--------------------+----------+------+--------------------+
|         customer_ID|       S_2|target|            features|
+--------------------+----------+------+--------------------+
|000201146e53cacdd...|2017-06-30|   0.0|[0.948,0.12,0.194...|
|000473eb907b57c8c...|2017-12-08|   1.0|[0.498,0.029,0.69...|
|00050d84c6d26e26c...|2017-08-25|   0.0|[0.937,0.208,0.03...|
|00055add5eaee481b...|2017-03-17|   0.0|[0.841,0.002,0.00...|
|00055add5eaee481b...|2017-07-12|   0.0|[0.834,0.002,0.01...|
|00055add5eaee481b...|2018-03-17|   0.0|[0.828,0.003,0.00...|
|000678921d09c5503...|2017-06-03|   0.0|[0.955,0.005,0.00...|
|000678921d09c5503...|2017-07-04|   0.0|[0.963,0.007,0.00...|
|0008ef32824d3067b...|2017-06-12|   0.0|[0.866,0.007,0.00...|
|0008ef32824d3067b...|2017-07-13|   0.0|[0.822,0.0,0.004,...|
|00093b69756b1afe3...|2017-12-11|   0.0|[0.741,0.008,0.56...|
|00093b69756b1afe3...|2018-01-14|   0.0|[1.004,0.001,0.67...|
|000940eb997a3356d...|2017-04-26|   0.0|[1.002,0.478,0.11...|
|000940e

# Feature Selection

In [43]:
from pyspark.ml.feature import UnivariateFeatureSelector

# ChiSqSelector - Deprecated since version 3.1.0: Use UnivariateFeatureSelector

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="target", selectionMode="numTopFeatures")

selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(20)

columns_final.append('selectedFeatures')

result = selector.fit(assembled_df).transform(assembled_df).select(columns_final)

print("UnivariateFeatureSelector output with top %d features selected using f_classif"
      % selector.getSelectionThreshold())

UnivariateFeatureSelector output with top 20 features selected using f_classif


In [44]:
result.take(1)

[Row(customer_ID='000201146e53cacdde1c7e9d29f4d3c46fd4d9231a3744aa39fb9c6afa79b708', S_2='2017-06-30', target=0.0, selectedFeatures=DenseVector([0.948, 0.12, 0.194, 0.027, 0.004, 0.124, 0.0, 0.695, 0.0, 0.257, 0.002, 0.143, 0.12, 0.007, 0.007, 0.237, 0.36, 0.079, 0.0, 0.053]))]

# Train-test split
Not stratified (yet to implement)

In [45]:
train, validation = result.randomSplit(weights=[0.8,0.2], seed=42)

# Random Forest

In [46]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="target", featuresCol="selectedFeatures", seed=42, numTrees=30)
model = rf.fit(train)

In [47]:
model.featureImportances

SparseVector(20, {0: 0.2998, 2: 0.0934, 3: 0.0237, 4: 0.0728, 5: 0.0263, 6: 0.0018, 7: 0.0529, 8: 0.0308, 9: 0.0037, 10: 0.1148, 11: 0.0282, 12: 0.032, 13: 0.003, 14: 0.0029, 15: 0.0005, 16: 0.0091, 17: 0.1487, 19: 0.0556})

In [48]:
predictions = model.transform(validation)

In [49]:
predictions.show()

+--------------------+----------+------+--------------------+--------------------+--------------------+----------+
|         customer_ID|       S_2|target|    selectedFeatures|       rawPrediction|         probability|prediction|
+--------------------+----------+------+--------------------+--------------------+--------------------+----------+
|00050d84c6d26e26c...|2017-08-25|   0.0|[0.937,0.208,0.03...|[28.9932153755336...|[0.96644051251778...|       0.0|
|000678921d09c5503...|2017-06-03|   0.0|[0.955,0.005,0.00...|[28.9932153755336...|[0.96644051251778...|       0.0|
|0008ef32824d3067b...|2017-06-12|   0.0|[0.866,0.007,0.00...|[28.3309970231094...|[0.94436656743698...|       0.0|
|000940eb997a3356d...|2017-12-02|   0.0|[0.913,0.0,0.079,...|[25.4202619549414...|[0.84734206516471...|       0.0|
|0013037420169086d...|2017-05-31|   0.0|[0.801,0.006,0.01...|[26.1965848707431...|[0.87321949569143...|       0.0|
|001470351ad71323c...|2017-06-09|   0.0|[0.243,0.534,0.41...|[7.19606766826092..

In [50]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="target",
                                              predictionCol="prediction",
                                              metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")
print(f"Test Error = {(1.0 - accuracy)}")

Accuracy: 0.8552130364365144
Test Error = 0.14478696356348564
