# Model training

In [1]:
%%configure -f
{
   "conf":{
      "spark.pyspark.python":"python3",
      "spark.pyspark.virtualenv.enabled":"true",
      "spark.pyspark.virtualenv.type":"native",
      "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
      "spark.jars.packages":"ml.combust.mleap:mleap-spark_2.11:0.16.0"
   }
}

In [2]:
sc.install_pypi_package("pyarrow==0.14")
sc.install_pypi_package("pandas==1.2.0")
sc.install_pypi_package("matplotlib")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1611401876384_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…




In [25]:
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Correlation
from pyspark.mllib.evaluation import MulticlassMetrics

import zipfile
import tarfile
import os
import json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import boto3
from pprint import pprint
from mleap.pyspark.spark_support import SimpleSparkSerializer

pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
pd.set_option('display.float_format', '{:0.4f}'.format)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
bucket_name = "ratemypost-pre"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Data exploration

In [4]:
# Load training data
df = spark.sql("select * from training.model_data")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
df.show(1, vertical=True, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 dataset_name                               | android-stackexchange-com                                                                                                                                                                                                                                                                                                                            
 post_id                                    | 11                                                                                                                                                                                

In [6]:
# check NA
df.select([f.count(f.when(f.isnan(c[0]) | f.isnull(c[0]), c[0])).alias(c[0]) for c in df.dtypes if not c[1] in ("string", "timestamp", "array<string>")])\
.toPandas().transpose().sum().sum() == 0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

True

In [7]:
# Potential target variables
df.agg(*[f.mean(c).alias(c) for c in ["answer_1d_flag",
                                     "answer_7d_flag",
                                     "answer_14d_flag",
                                     "answer_30d_flag",
                                     "answer_accepted_1d_flag",
                                     "answer_accepted_7d_flag",
                                     "answer_accepted_14d_flag",
                                     "answer_accepted_30d_flag",
                                     "post_closed_flag"]]
).toPandas().transpose()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                              0
answer_1d_flag           0.5677
answer_7d_flag           0.6928
answer_14d_flag          0.7079
answer_30d_flag          0.7222
answer_accepted_1d_flag  0.1449
answer_accepted_7d_flag  0.2884
answer_accepted_14d_flag 0.3084
answer_accepted_30d_flag 0.3227
post_closed_flag         0.1064

In [8]:
y = "answer_accepted_7d_flag"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
df.groupBy("dataset_name").agg(f.mean("answer_accepted_7d_flag").alias(y)).sort("dataset_name").toPandas()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                            dataset_name  answer_accepted_7d_flag
0              android-stackexchange-com                   0.1658
1                          askubuntu-com                   0.2089
2                   cs-stackexchange-com                   0.3656
3          datascience-stackexchange-com                   0.2381
4                  dba-stackexchange-com                   0.3717
5               devops-stackexchange-com                   0.2358
6              gamedev-stackexchange-com                   0.3931
7          raspberrypi-stackexchange-com                   0.2262
8  softwareengineering-stackexchange-com                   0.4248
9                 unix-stackexchange-com                   0.3645

In [10]:
# Features vs target
x = ['post_hour', 
     'post_dayofweek', 
     'post_month', 
     'post_year',
     'post_body_char_count',
     'post_body_nocode_char_count',
     'post_body_code_perc',
     'post_body_code_flag',
     'post_body_image_flag',
     'post_body_link_flag',
     'post_body_bold_flag',
     'post_title_upper_flag',
     'post_title_question_flag',
     'post_title_char_count',
     'post_tag_count', 
     'post_body_sentence_count', 
     'post_body_word_count', 
     'post_body_word_distinct_count', 
     'post_body_verb_perc', 
     'post_body_noun_perc', 
     'post_body_pronoun_perc', 
     'post_body_adjective_perc', 
     'post_body_adverb_perc', 
     'post_title_word_count', 
     'post_title_word_distinct_count', 
     'post_title_verb_perc', 
     'post_title_noun_perc', 
     'post_title_pronoun_perc', 
     'post_title_adjective_perc', 
     'post_title_adverb_perc', 
     'post_title_in_body_perc', 
     'tag_post_count_max', 
     'tag_post_count_30d_max', 
     'tag_post_count_365d_max',
     'tag_age_days_max',
     'tag_post_count_avg',
     'tag_post_count_30d_avg',
     'tag_post_count_365d_avg',
     'tag_age_days_avg',
     'user_age_days',
     'user_website_flag',
     'user_location_flag',
     'user_about_me_flag',
     'user_badge_count',
     'user_badge_1_count',
     'user_badge_2_count',
     'user_badge_3_count', 
     'user_post_count',
     'user_question_count',
     'user_answer_count',
     'user_first_post_flag',
     'user_first_question_flag',
     'user_answered_questions_count',
     'user_accepted_answers_count',
     'user_score',
     'user_question_score',
     'user_answer_score',
     'android_stackexchange_com_flag',
     'askubuntu_com_flag',
     'cs_stackexchange_com_flag',
     'datascience_stackexchange_com_flag',
     'dba_stackexchange_com_flag',
     'devops_stackexchange_com_flag',
     'gamedev_stackexchange_com_flag',
     'raspberrypi_stackexchange_com_flag',
     'softwareengineering_stackexchange_com_flag',
     'unix_stackexchange_com_flag'
    ]

df_model = df.select(*x, y).withColumnRenamed(y, "y")

x_mean_vs_y = \
df_model.groupBy("y")\
.agg(*[f.mean(c).alias(c) for c in x])\
.toPandas().transpose()

x_mean_vs_y["diff"] = np.abs(x_mean_vs_y[1]/x_mean_vs_y[0] -1)
x_mean_vs_y.sort_values(by="diff", ascending=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                   0         1   diff
android_stackexchange_com_flag                0.0336    0.0685 1.0383
y                                             1.0000    0.0000 1.0000
askubuntu_com_flag                            0.2873    0.4407 0.5343
softwareengineering_stackexchange_com_flag    0.0991    0.0544 0.4513
user_accepted_answers_count                   6.3641    3.5798 0.4375
raspberrypi_stackexchange_com_flag            0.0273    0.0379 0.3863
gamedev_stackexchange_com_flag                0.0752    0.0471 0.3744
user_question_score                          62.1691   39.2554 0.3686
user_answered_questions_count                 8.6108    5.6087 0.3486
user_question_count                           9.3433    6.2663 0.3293
user_first_post_flag                          0.4039    0.5352 0.3252
dba_stackexchange_com_flag                    0.1240    0.0850 0.3149
devops_stackexchange_com_flag                 0.0034    0.0045 0.3133
user_score          

### Correlation

In [11]:
assembler = VectorAssembler(inputCols=x, outputCol="features")
cor = Correlation.corr(assembler.transform(df_model).select("features"), "features", "spearman")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
cor_df = pd.DataFrame(
    pd.DataFrame(np.tril(cor.collect()[0][0].toArray()), columns=x, index=x).unstack().abs(),
).reset_index()
cor_df.columns = ["x1", "x2", "cor"]
cor_df = cor_df.loc[cor_df["x1"] != cor_df["x2"]].set_index("x1", "x2").sort_values(by="cor", ascending=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
cor_df.head(30)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                          x2    cor
x1                                                                 
user_badge_count                          user_badge_3_count 0.9994
post_body_nocode_char_count             post_body_word_count 0.9921
user_question_count            user_answered_questions_count 0.9787
tag_post_count_max                        tag_post_count_avg 0.9729
post_body_nocode_char_count    post_body_word_distinct_count 0.9716
tag_post_count_30d_max                tag_post_count_30d_avg 0.9682
tag_post_count_365d_max              tag_post_count_365d_avg 0.9672
post_body_word_count           post_body_word_distinct_count 0.9661
tag_age_days_max                            tag_age_days_avg 0.9660
user_post_count                          user_question_count 0.9610
tag_post_count_30d_avg               tag_post_count_365d_avg 0.9610
tag_post_count_30d_max               tag_post_count_365d_max 0.9604
user_score                               user_qu

In [14]:
x_drop = ["user_badge_count", "post_body_word_count", "user_question_count", 
          "tag_post_count_max", "tag_post_count_30d_max", "tag_post_count_365d_max", "tag_age_days_max"]

df_model = df_model.drop(*x_drop)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Estimation

In [22]:
train, test = df_model.randomSplit([0.7, 0.3], seed=1)
print(df.count(), train.count(), test.count())

train.select(f.mean("y")).show()
test.select(f.mean("y")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

837893 586356 251162
+-------------------+
|             avg(y)|
+-------------------+
|0.28840985051071105|
+-------------------+

+------------------+
|            avg(y)|
+------------------+
|0.2882920186970959|
+------------------+

In [23]:
# Baseline model
indexer = StringIndexer(inputCol="y", outputCol='label')
vec_assembler = VectorAssembler(inputCols=df_model.drop("y").columns, outputCol="features")
gbt = GBTClassifier(seed=1)
pipeline = Pipeline(stages=[indexer, vec_assembler, gbt])

model = pipeline.fit(train)
prediction = model.transform(test)

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Accuracy:", evaluator.evaluate(prediction))

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print("ROC AUC:", evaluator.evaluate(prediction))

evaluator = BinaryClassificationEvaluator(metricName="areaUnderPR")
print("PR AUC:", evaluator.evaluate(prediction))

metrics = MulticlassMetrics(prediction.select("prediction", "label").rdd.map(tuple))
print("\nConfusion matrix:\n", metrics.confusionMatrix().toArray()/test.count())

print("\nFeature importance")
pd.DataFrame(model.stages[2].featureImportances.toArray(), index=df_model.drop("y").columns)\
.sort_values(0, ascending=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 0.7212516224588115
ROC AUC: 0.7007285168117464
PR AUC: 0.4661150477993915

Confusion matrix:
 [[0.68021436 0.03149362]
 [0.24725476 0.04103726]]

Feature importance
                                                0
post_year                                  0.1383
post_body_code_perc                        0.0968
user_accepted_answers_count                0.0692
post_body_noun_perc                        0.0624
user_answered_questions_count              0.0512
post_body_word_distinct_count              0.0482
post_title_question_flag                   0.0456
askubuntu_com_flag                         0.0454
cs_stackexchange_com_flag                  0.0326
user_age_days                              0.0302
softwareengineering_stackexchange_com_flag 0.0286
tag_post_count_30d_avg                     0.0264
post_title_noun_perc                       0.0257
tag_post_count_365d_avg                    0.0253
post_body_char_count                       0.0241
gamedev_stackexchange_com

## Tuning

In [None]:
# Grid search
paramGrid = ParamGridBuilder() \
.addGrid(gbt.stepSize, [0.05, 0.1, 0.15])\
.addGrid(gbt.maxDepth, [5, 8, 10])\
.addGrid(gbt.minInstancesPerNode, [10, 50, 100])\
.build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(metricName="areaUnderPR"),
                          numFolds=5,
                         seed=1)

cv_model = crossval.fit(train)
prediction = cv_model.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-25:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 997



In [36]:
model = cv_model.bestModel

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
print("PR AUC:", crossval.getEvaluator().evaluate(prediction))
print("ROC AUC:", BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(prediction))
print("Accuracy:", MulticlassClassificationEvaluator(metricName="accuracy").evaluate(prediction))

metrics = MulticlassMetrics(prediction.select("prediction", "label").rdd.map(tuple))
print("\nConfusion matrix:\n", metrics.confusionMatrix().toArray()/test.count())

print("\nFeature importance")
pd.DataFrame(model.stages[2].featureImportances.toArray(), index=df_model.drop("y").columns)\
.sort_values(0, ascending=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PR AUC: 0.5313153155713395
ROC AUC: 0.7078866009001024
Accuracy: 0.7250738567139934

Confusion matrix:
 [[0.65765124 0.05405674]
 [0.2208694  0.06742262]]

Feature importance
                                                0
user_accepted_answers_count                0.0569
post_body_noun_perc                        0.0393
user_age_days                              0.0389
post_year                                  0.0385
user_answered_questions_count              0.0369
post_body_verb_perc                        0.0344
tag_age_days_avg                           0.0342
post_body_code_perc                        0.0325
post_body_pronoun_perc                     0.0303
post_body_word_distinct_count              0.0294
post_hour                                  0.0280
post_body_adjective_perc                   0.0273
tag_post_count_30d_avg                     0.0267
post_title_noun_perc                       0.0245
tag_post_count_365d_avg                    0.0243
post_body_adverb_perc    

## Save model

In [38]:
# Save model
SimpleSparkSerializer().serializeToBundle(model, "jar:file:/tmp/model.zip", prediction)
with zipfile.ZipFile("/tmp/model.zip") as zf:
    zf.extractall("/tmp/model")
with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
    tar.add("/tmp/model/bundle.json", arcname='bundle.json')
    tar.add("/tmp/model/root", arcname='root')
    
# upload to S3
# MLeap format
s3 = boto3.resource('s3') 
s3.Bucket(bucket_name).upload_file('/tmp/model.tar.gz', "model.tar.gz")

# Spark format
model.write().overwrite().save(f"s3://{bucket_name}/sparkmodel")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load model

In [39]:
model = PipelineModel.load(f"s3://{bucket_name}/sparkmodel")
model

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PipelineModel_9fffa315d506