# Install

pip install pyspark

# Template

Data preprocessing is done in plain Python (pandas and scikit-learn); PySpark is used only for the machine-learning part.

In [None]:
import pandas as pd
from sklearn.decomposition import PCA

def PCA_transformation(X_train, X_test, categorial_columns, n_components=24):
    """Project the non-categorical features of train and test onto shared PCA axes.

    PCA is fitted on the row-wise concatenation of ``X_train`` and ``X_test``,
    so both frames are expressed in the same component basis (this means the
    test rows also influence the projection). Each component is then divided
    by its standard deviation over the combined rows. The columns listed in
    ``categorial_columns`` are excluded from PCA and passed through unchanged.

    Args:
        X_train: Training feature frame.
        X_test: Test feature frame; assumed to have the same columns as
            ``X_train`` -- TODO confirm.
        categorial_columns: Column labels to keep as-is instead of projecting.
        n_components: Forwarded to ``sklearn.decomposition.PCA``. Defaults to
            24; the original note says the explained variance ratio exceeds
            0.99999 at that size for this data.

    Returns:
        ``(train_frame, test_frame)``: the categorical columns followed by
        ``Feature_00``, ``Feature_01``, ... (one per kept component). Each
        frame carries the row index of the corresponding input frame, so it
        still aligns with objects indexed like ``X_train`` / ``X_test``.
    """
    pca = PCA(n_components=n_components)

    X = pd.concat((X_train, X_test), ignore_index=True)
    X_apply = X.drop(labels=categorial_columns, axis=1)

    X_pca = pca.fit_transform(X_apply)

    # Scale each component to unit standard deviation. A component with zero
    # spread would otherwise become 0/0 = NaN; PCA output is centred, so such
    # a component is all zeros and dividing it by 1 leaves it as zeros.
    X_pca_std = X_pca.std(axis=0)
    X_pca_std[X_pca_std == 0] = 1.0
    X_pca_norm = X_pca / X_pca_std

    # Name the columns from the number of components actually kept.
    feature_names = ["Feature_{:02d}".format(i) for i in range(X_pca_norm.shape[1])]
    df_pca = pd.DataFrame(data=X_pca_norm, columns=feature_names)

    df_combined = pd.concat([X[categorial_columns], df_pca], axis=1)

    n_train = len(X_train)
    train_part = df_combined.iloc[:n_train].copy()
    test_part = df_combined.iloc[n_train:].copy()
    # ignore_index=True above replaced the caller's row labels with 0..N-1;
    # restore them so later index-based joins (e.g. with y_train) line up.
    train_part.index = X_train.index
    test_part.index = X_test.index
    return train_part, test_part

"""
Start loading the data
"""
# Read the training table; its last column is the label, which is split off
# into y_train (and removed from X_train) in a single step.
X_train = pd.read_csv("../python/KNN_minkowski_features.csv", index_col=0)
y_train = X_train.pop(X_train.columns[-1])

X_test = pd.read_csv("../python/KNN_minkowski_features_test.csv", index_col=0)

# Finished loading. Next, reduce the dimensionality of the numeric features
# with PCA; the three identifier columns are passed through untouched.
X_train_pca, X_test_pca = PCA_transformation(X_train, X_test, ['shop_id', 'item_id', 'cats'])

# Mean (target) encoding of item_id, shop_id, and cats

In [None]:
# Smoothed target (mean) encoding of item_id, shop_id and cats.
#
# For every key value the encoding is
#     (group_mean * group_count + TARGET_PRIOR * SMOOTHING_ALPHA)
#     / (group_count + SMOOTHING_ALPHA),
# i.e. the group's mean target shrunk towards TARGET_PRIOR, strongly so for
# rarely seen keys. The label column of y_train is assumed to be named
# 'target'.
#
# NOTE(review): the statistics are computed on the full training set, so each
# training row's own target contributes to its encoding; this leaks into
# cross-validation scores. An out-of-fold scheme would avoid that.

index_cols = ['shop_id', 'item_id', 'cats']

TARGET_PRIOR = 0.3343    # presumably the global mean of 'target' -- TODO confirm
SMOOTHING_ALPHA = 100.0  # weight of the prior, in pseudo-rows

# (grouping key, column-name prefix) for each encoding.
_ENCODINGS = (('item_id', 'item'), ('shop_id', 'shop'), ('cats', 'cat'))

all_data = pd.concat((X_train_pca, y_train), axis=1, join='inner')
all_data.sort_values(index_cols, inplace=True)


def _add_smoothed_target_encoding(df, key, prefix):
    """Add ``<prefix>_target_enc``, ``_target_count`` and ``_target_enc_smooth`` columns to ``df`` in place."""
    by_key = df.groupby(key)['target']
    mean_col = prefix + '_target_enc'
    count_col = prefix + '_target_count'
    df[mean_col] = by_key.transform('mean')
    df[count_col] = by_key.transform('count')
    df[prefix + '_target_enc_smooth'] = (
        (df[mean_col] * df[count_col] + TARGET_PRIOR * SMOOTHING_ALPHA)
        / (df[count_col] + SMOOTHING_ALPHA)
    )


for _key, _prefix in _ENCODINGS:
    _add_smoothed_target_encoding(all_data, _key, _prefix)

all_data_item_encode = all_data[index_cols + ['item_target_enc_smooth']]
all_data_shop_encode = all_data[index_cols + ['shop_target_enc_smooth']]
all_data_cat_encode = all_data[index_cols + ['cat_target_enc_smooth']]

# Attach each encoding to the training rows by looking it up on its own key.
# An inner merge on all three index columns against the per-row tables above
# would duplicate rows whenever a (shop_id, item_id, cats) triple occurs more
# than once, and would replace the index so X_train_pca no longer lines up
# with y_train.
X_train_pca = X_train_pca.copy()
for _key, _prefix in _ENCODINGS:
    _column = _prefix + '_target_enc_smooth'
    # The encoding is constant per key (groupby-transform), so one row per key suffices.
    _lookup = all_data.drop_duplicates(_key).set_index(_key)[_column]
    X_train_pca[_column] = X_train_pca[_key].map(_lookup)

X_train_pca.drop(labels=index_cols, axis=1, inplace=True)

In [None]:
# Attach the training-set encodings to the test rows, looking each one up by
# its own key. Keys never seen in training get 0.3343, the smoothing prior
# (the value the smoothed formula yields for a count of zero), so every test
# row is kept. An inner merge on all three index columns would instead drop
# any test row whose (shop_id, item_id, cats) triple is absent from training
# and duplicate rows whose triple occurs several times there.
_UNSEEN_KEY_ENCODING = 0.3343

X_test_pca = X_test_pca.copy()
for _key, _encode_df, _column in (
        ('item_id', all_data_item_encode, 'item_target_enc_smooth'),
        ('shop_id', all_data_shop_encode, 'shop_target_enc_smooth'),
        ('cats', all_data_cat_encode, 'cat_target_enc_smooth'),
):
    # Each encoding is constant per key, so keep one row per key.
    _lookup = _encode_df.drop_duplicates(_key).set_index(_key)[_column]
    X_test_pca[_column] = X_test_pca[_key].map(_lookup).fillna(_UNSEEN_KEY_ENCODING)

X_test_pca.drop(labels=index_cols, axis=1, inplace=True)

# PySpark Machine Learning

In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.classification import LogisticRegression

# Start (or reuse) a local Spark session and expose its context as `sc`.
# getOrCreate() returns the already-running session when this cell is run
# again; constructing a second SparkContext('local') would instead fail,
# because only one SparkContext may be active per process.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [None]:
# Put the label column next to the training features (rows matched on the
# index, keeping only rows present in both).
X_train_pca_pyspark = pd.concat([X_train_pca, y_train], join='inner', axis=1)

# Hand the pandas frames over to Spark.
X_train_pyspark = spark.createDataFrame(X_train_pca_pyspark)
X_test_pyspark = spark.createDataFrame(X_test_pca)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Pack every remaining training column into a single vector column named
# 'features', the input format Spark ML estimators expect.
numericCol = list(X_train_pca.columns)
featureAssembler = VectorAssembler(inputCols=numericCol, outputCol='features')
stages = [featureAssembler]

# Gradient-boosted tree regressor on the assembled features. Everything except
# the column names keeps the PySpark defaults, e.g. maxDepth=5, maxBins=32,
# minInstancesPerNode=1, maxIter=20, stepSize=0.1, lossType="squared",
# impurity="variance".
gbt = GBTRegressor(featuresCol="features", labelCol='target')

# Feature assembly followed by the regressor, fitted as a single estimator.
pipeline = Pipeline(stages=[featureAssembler, gbt])

In [None]:
import numpy as np

# Grid search over tree depth and minimum instances per node, evaluated with
# 5-fold cross-validation on mean squared error (lower is better).
#
# The grid values are plain Python ints: PySpark's integer params pass values
# through TypeConverters.toInt, which accepts Python ints but only a fixed set
# of numpy scalar types, so np.arange output (int32 on some platforms) can be
# rejected with a TypeError.
max_depth_grid = list(range(3, 12, 2))      # 3, 5, 7, 9, 11
min_instances_grid = list(range(1, 12, 5))  # 1, 6, 11

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, max_depth_grid)
             .addGrid(gbt.minInstancesPerNode, min_instances_grid)
             .build())

evaluator = RegressionEvaluator(labelCol='target', metricName='mse')

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

gbt_cvModel = crossval.fit(X_train_pyspark)