In [1]:
%%time
!pip install gcsfs
import pandas as pd
import os, sys, time, json, re, string, datetime

import gcsfs

from pyspark import SparkContext, SparkConf, StorageLevel, keyword_only

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.param.shared import HasInputCol, HasInputCols, HasOutputCol, HasOutputCols, Param
from pyspark.ml.feature import OneHotEncoder, HashingTF, IDF, Tokenizer, RegexTokenizer, NGram, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler, PCA, OneHotEncoderEstimator,StringIndexer

from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, RandomForestClassifier, LinearSVC
from pyspark.ml.classification import GBTClassifier, MultilayerPerceptronClassifier

from pyspark.ml import Pipeline, Transformer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from platform import python_version
print("python version: ", python_version())
sc

python version:  3.6.10
CPU times: user 127 ms, sys: 15.5 ms, total: 143 ms
Wall time: 1.59 s


In [2]:
%%time
spark = SparkSession.builder \
        .appName("A0186371Y") \
        .config("spark.master", "yarn") \
        .config("spark.submit.deployMode", "cluster") \
        .config("spark.driver.memory", "10g") \
        .config("spark.driver.cores", "4") \
        .config("spark.executor.instances", "3") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.memory", "10g") \
        .getOrCreate()

CPU times: user 7.32 ms, sys: 0 ns, total: 7.32 ms
Wall time: 8.2 ms


In [3]:
%%time

pd_df_train = pd.read_csv('gs://dataproc-6ca41800-27b4-47d5-abee-55c011dfa389-asia-southeast1/data/kaggle/train.csv')
pd_df_test = pd.read_csv('gs://dataproc-6ca41800-27b4-47d5-abee-55c011dfa389-asia-southeast1/data/kaggle/test.csv')
pd_df_train.info()
pd_df_train['Outcome'].value_counts()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 44183 entries, 0 to 44182
Data columns (total 3 columns):
Comment    44183 non-null object
Outcome    44183 non-null int64
Id         44183 non-null int64
dtypes: int64(2), object(1)
memory usage: 1.0+ MB
CPU times: user 689 ms, sys: 80.1 ms, total: 769 ms
Wall time: 2.17 s


1    24992
0    19191
Name: Outcome, dtype: int64

In [4]:
py_df_train = spark.createDataFrame(pd_df_train, schema = "Comment STRING, Outcome INT, Id STRING")
py_df_test = spark.createDataFrame(pd_df_test, schema = "Comment STRING, Id STRING")

py_df_train.printSchema()
py_df_train.groupby('Outcome').count().show()

data_train, data_test = py_df_train.randomSplit([0.8, 0.2], seed=123)

root
 |-- Comment: string (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- Id: string (nullable = true)

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|24992|
|      0|19191|
+-------+-----+



In [5]:
# customized transformer class to manually extract some counting based word features
class TextFeatureTransformer(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol="Comment", outputCol="word_features"):
        super(TextFeatureTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
        
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)        

    def _transform(self, dataset):        
        def f(data):
            word_count = len(data)
            unique_word_count = len(set(data))
            char_count = 0 
            dict_punct_count = {}                      
            for c in data:
                if str(c) in string.punctuation:
                    dict_punct_count[str(c)] = dict_punct_count.get(str(c), 0) + 1                
                else: 
                    char_count += 1

            punctuation_count = sum(dict_punct_count.values())        
            feature_list = [word_count,
                            char_count, char_count / (word_count + 1e-10),
                            unique_word_count, unique_word_count / (word_count + 1e-10),
                            punctuation_count, punctuation_count / (word_count + 1e-10)]

            puntuation_list = ["!", "?", "=", "{", "<", ">", "(", "+", "-", "*", "/", "["]             
            for p in puntuation_list:
                value = 0.0
                for key in dict_punct_count:
                    if key == p:
                        value = dict_punct_count.get(key, 0)

                feature_list.append(value)
                feature_list.append(value / (word_count + 1e-10))
            
            return Vectors.dense(feature_list)
        
        return dataset.withColumn(self.getOutputCol(), F.udf(f, VectorUDT())(dataset[self.getInputCol()]))

In [6]:
def build_pipeline(vocab_size=5000):
    preproc_steps = [
        RegexTokenizer(inputCol="Comment", outputCol="all_words", pattern=r"\W"),
        StopWordsRemover(inputCol="all_words", outputCol="words"),
        CountVectorizer(inputCol="words", outputCol="tf_features", vocabSize=vocab_size),
        IDF(inputCol="tf_features", outputCol="tfidf_features"),
        TextFeatureTransformer(inputCol="words", outputCol="word_features"),
        VectorAssembler(inputCols=["tfidf_features", "word_features"], outputCol="features")
    ]
    return Pipeline(stages=preproc_steps)

In [7]:
def build_models(df_train, df_test, labelCol = 'Outcome', featuresCol='features'):
    lr_model = LogisticRegression(featuresCol=featuresCol, labelCol=labelCol,
                                  predictionCol='prediction', probabilityCol='probability', 
                                  rawPredictionCol='rawPrediction',
                                  family='binomial', fitIntercept=True, 
                                  threshold=0.5, standardization=False, 
                                  maxIter=200, regParam=0.001, elasticNetParam=0, tol=1e-06,  aggregationDepth=2)
    
    lr_param_grid = ParamGridBuilder() \
                    .addGrid(lr_model.regParam, [0.001, 0.005]) \
                    .addGrid(lr_model.standardization, [False, True]) \
                    .addGrid(lr_model.elasticNetParam, [0.0, 0.5 , 1]) \
                    .build()
    
    svm_model = LinearSVC(featuresCol=featuresCol, labelCol=labelCol, 
                          predictionCol='prediction', rawPredictionCol='rawPrediction', 
                          maxIter=100, regParam=0.001, tol=1e-06,
                          fitIntercept=True, standardization=False, threshold=0.0, aggregationDepth=2)
    
    svm_param_grid = ParamGridBuilder() \
                        .addGrid(svm_model.regParam, [0.01, 0.005]) \
                        .addGrid(svm_model.standardization, [False, True]) \
                        .build()
    
    dt_model = DecisionTreeClassifier(featuresCol=featuresCol, labelCol=labelCol,
                                      predictionCol='prediction', probabilityCol='probability', rawPredictionCol='rawPrediction', 
                                      maxDepth=10, maxBins=32, 
                                      minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=2048, 
                                      cacheNodeIds=True, checkpointInterval=10, impurity='gini', seed=666) 
    
    dt_param_grid = ParamGridBuilder() \
                     .addGrid(dt_model.maxDepth, [10, 20, 30])  \
                     .addGrid(dt_model.maxBins, [20, 40, 60])  \
                     .build()
        
    rf_model = RandomForestClassifier(featuresCol=featuresCol, labelCol=labelCol,
                                      predictionCol='prediction', probabilityCol='probability', 
                                      rawPredictionCol='rawPrediction',
                                      maxDepth=10, maxBins=32, numTrees=200,
                                      minInstancesPerNode=1, minInfoGain=0.0, 
                                      maxMemoryInMB=2048, cacheNodeIds=True, 
                                      checkpointInterval=10, impurity='gini', 
                                      featureSubsetStrategy='auto', seed=666, subsamplingRate=0.8)
    rf_param_grid = ParamGridBuilder() \
                     .addGrid(rf_model.maxDepth, [10, 20, 30])  \
                     .addGrid(rf_model.maxBins, [20, 40, 60])  \
                     .addGrid(rf_model.numTrees, [100, 200, 300])  \
                     .build()
    
    gbt_model = GBTClassifier(featuresCol=featuresCol, labelCol=labelCol, maxIter=250)
    gbt_param_grid = ParamGridBuilder().build()

    mp_model = MultilayerPerceptronClassifier(featuresCol=featuresCol, labelCol=labelCol,
                                              predictionCol='prediction', 
                                              layers=[df_train.schema["features"].metadata["ml_attr"]["num_attrs"],20, 10, 2],  
                                              maxIter=250, blockSize=128, seed=1234)
    gbt_param_grid = ParamGridBuilder().build()
    
    nb_model = NaiveBayes(featuresCol='features', labelCol='Outcome', 
                          predictionCol='prediction', probabilityCol='probability',
                          rawPredictionCol='rawPrediction',
                          smoothing=1, modelType='multinomial')
    nb_param_grid = ParamGridBuilder().build()
    
    ##[model_name, model, parameter_gride, run_flag, tuning_flag, probability_col]
    models = [["LogisticRegression", lr_model, lr_param_grid, True, False, "probability"],
              ["LinearSVC",svm_model, svm_param_grid, True, False, "rawPrediction"],
              ["DecisionTreeClassifier",dt_model, dt_param_grid, True, False, "probability"],
              ["RandomForestClassifier",rf_model, rf_param_grid, True, False, "probability"],
              ["GBTClassifier", gbt_model, gbt_param_grid, True, False, "probability"],
              ["MultilayerPerceptronClassifier" , mp_model, gbt_param_grid, True, False, "probability"],
              ["NaiveBayes",nb_model, nb_param_grid, True, False, "probability"]]
    return models

In [8]:
def run_models(models, df_train, df_test, label_col="Outcome", featuresCol='features', prediction_col="prediction"): 
    best_model_name = ""
    best_model = None
    best_accuracy = 0
    for item in models:        
        name = item[0]
        model = item[1]
        param_grid = item[2]
        run_flag = item[3]
        tuning_flag = item[4]
        probability_col = item[5]
        
        if run_flag:                
            print("**********Run Model {0}**********".format(name))    
            t = time.time()        
            selected_model = model

            if tuning_flag:
                evaluator = BinaryClassificationEvaluator(rawPredictionCol=probability_col, labelCol=label_col, metricName="areaUnderROC")
                crossval = CrossValidator(estimator=model, evaluator=evaluator,estimatorParamMaps=param_grid,
                                          numFolds=2, parallelism=3, seed=234)
                crossval_model = crossval.fit(df_train)
                selected_model = crossval_model.bestModel 
                print("***Best Params: ") 
                print(selected_model.explainParams())
            else:
                selected_model = model.fit(df_train)

            pred_dataset = selected_model.transform(df_test)

            eval_dataset = pred_dataset.select(label_col, prediction_col, probability_col)
            model_eval = MulticlassClassificationEvaluator(predictionCol=prediction_col, labelCol=label_col)
            accuracy_val = model_eval.evaluate(eval_dataset, {model_eval.metricName: 'accuracy'})
            print("accuracy = {0}".format(accuracy_val))    
            print("time taken for {0}: {1}".format(name, str(datetime.timedelta(seconds=time.time() - t))))

        if accuracy_val > best_accuracy:
            best_model_name = name
            best_model = selected_model
            best_accuracy = accuracy_val
    
    return best_model_name, best_model, best_accuracy

In [9]:
%%time
vocab_size = 2000
preproc_model = build_pipeline(vocab_size).fit(data_train)
df_train = preproc_model.transform(data_train).select("Outcome", "features")
df_test = preproc_model.transform(data_test).select("Outcome", "features")

CPU times: user 174 ms, sys: 27.8 ms, total: 201 ms
Wall time: 10.6 s


In [10]:
print(df_train.take(1))

[Row(Outcome=0, features=SparseVector(2031, {0: 1.8053, 2: 2.3572, 4: 2.9163, 8: 3.6794, 24: 2.085, 30: 1.7174, 33: 3.817, 42: 2.4743, 62: 2.8825, 64: 7.7216, 89: 2.6441, 130: 3.3884, 171: 3.7089, 191: 3.4429, 192: 3.509, 222: 6.5558, 282: 3.4106, 294: 4.2749, 354: 4.0608, 444: 4.1202, 799: 4.8562, 1174: 11.0987, 1768: 15.9829, 1823: 5.5493, 2000: 50.0, 2001: 50.0, 2002: 1.0, 2003: 36.0, 2004: 0.72}))]


In [11]:
print(df_test.take(1))

[Row(Outcome=1, features=SparseVector(2031, {0: 0.9026, 6: 1.6535, 18: 2.0475, 24: 6.2549, 41: 1.9133, 47: 5.1899, 135: 2.711, 145: 3.1581, 164: 2.9046, 283: 3.591, 338: 4.4729, 572: 4.4055, 601: 4.5558, 844: 4.779, 1322: 4.9241, 1471: 5.5865, 1477: 5.2764, 2000: 23.0, 2001: 23.0, 2002: 1.0, 2003: 19.0, 2004: 0.8261}))]


In [None]:
%%time
models = build_models(df_train, df_test, labelCol='Outcome', featuresCol='features')
best_model_name, best_model, best_accuracy = run_models(models, df_train, df_test)
print("**********Winner is {0} with accuracy = {1}**********".format(best_model_name, best_accuracy))

**********Run Model LogisticRegression**********
accuracy = 0.6778942672317645
time taken for LogisticRegression: 0:00:41.824251
**********Run Model LinearSVC**********
accuracy = 0.6792326567031006
time taken for LinearSVC: 0:09:50.264606
**********Run Model DecisionTreeClassifier**********
accuracy = 0.6143207673432969
time taken for DecisionTreeClassifier: 0:00:16.515173
**********Run Model RandomForestClassifier**********
accuracy = 0.6359580637965648
time taken for RandomForestClassifier: 0:00:29.678597
**********Run Model GBTClassifier**********
accuracy = 0.6868168637073389
time taken for GBTClassifier: 0:12:41.413851
**********Run Model MultilayerPerceptronClassifier**********
accuracy = 0.67945572161499
time taken for MultilayerPerceptronClassifier: 0:01:28.141524
**********Run Model NaiveBayes**********
accuracy = 0.6272585322328798
time taken for NaiveBayes: 0:00:05.084994
**********Winner is GBTClassifier with accuracy = 0.6868168637073389**********
CPU times: user 1.55 s, 