In [1]:
import csv, os, sys
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as f

In [2]:
# Initialize a spark session.
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [3]:
spark = init_spark()

# Load data from file into spark

In [4]:
filename_train = "../dataset/train.csv"
filename_test = "../dataset/valid.csv"

train_rdd = spark.read.csv(filename_train, header=True, multiLine=True, inferSchema=True, escape='"', quote='"')
test_rdd = spark.read.csv(filename_test, header=True, multiLine=True, inferSchema=True, escape='"', quote='"')
train_rdd.show(5)

+--------+--------------------+--------------------+--------------------+-------------------+--------+
|      Id|               Title|                Body|                Tags|       CreationDate|       Y|
+--------+--------------------+--------------------+--------------------+-------------------+--------+
|34552656|Java: Repeat Task...|<p>I'm already fa...|      <java><repeat>|2016-01-01 00:21:59|LQ_CLOSE|
|34553034|Why are Java Opti...|<p>I'd like to un...|    <java><optional>|2016-01-01 02:03:20|      HQ|
|34553174|Text Overlay Imag...|<p>I am attemptin...|<javascript><imag...|2016-01-01 02:48:24|      HQ|
|34553318|Why ternary opera...|<p>The question i...|<swift><operators...|2016-01-01 03:30:17|      HQ|
|34553755|hide/show fab wit...|<p>I'm using cust...|<android><materia...|2016-01-01 05:21:48|      HQ|
+--------+--------------------+--------------------+--------------------+-------------------+--------+
only showing top 5 rows



In [5]:
training = train_rdd.rdd \
    .map(lambda x: (x["Title"]+" "+x["Body"], x["Y"])) \
    .toDF(["Question", "Output"])# change to collect()

testing = test_rdd.rdd \
    .map(lambda x: (x["Title"]+" "+x["Body"], x["Y"])) \
    .toDF(["Question", "Output"]) # change to collect()

# Prepare pipeline

In [6]:
def get_stop_word_remover(input_col_name, stopwords):
    return StopWordsRemover(inputCol=input_col_name, outputCol="filtered").setStopWords(stopwords)
    

In [7]:
# HEURISTIC 1 - Tokenize the words
regexTokenizer = RegexTokenizer(inputCol="Question", outputCol="words", pattern="\\W")

# HEURISTIC 2 - Remove the stopwords
add_stopwords = ["the", "a", "be", "of", "and", "to", "why"] 
stopwordsRemover = get_stop_word_remover("words", add_stopwords)

In [8]:
def get_bag_of_word_model(features_col_name, label_col_name):
    countVectors = CountVectorizer(inputCol=features_col_name, outputCol="features")
    indexed_features = StringIndexer(inputCol = label_col_name, outputCol = "label")
    return countVectors, indexed_features
    
    

In [9]:
countVectors_h1, indexed_features_h1 = get_bag_of_word_model("words", "Output")
countVectors_h2, indexed_features_h2 = get_bag_of_word_model("filtered", "Output")

In [10]:
def get_pipeline(*args):
    return Pipeline(stages=[*args])

In [11]:
pipeline_h1 = get_pipeline(regexTokenizer, countVectors_h1, indexed_features_h1)
pipeline_h2 = get_pipeline(regexTokenizer, stopwordsRemover, countVectors_h2, indexed_features_h2)

# Process data through Pipeline train & test

In [29]:
def get_pipeline_model(pipeline, data):
    """ We should use the same pipeline model on training and testing """
    return pipeline.fit(data)


In [12]:
model_pipeline = pipeline_h1.fit(training)
# model_pipeline = process_data_in_pipeline(pipeline_h1, training)
data_h1_train  = model_pipeline.transform(training) #what does the transform do?
data_h1_train.show()

+--------------------+--------+--------------------+--------------------+-----+
|            Question|  Output|               words|            features|label|
+--------------------+--------+--------------------+--------------------+-----+
|Java: Repeat Task...|LQ_CLOSE|[java, repeat, ta...|(201488,[0,1,2,3,...|  1.0|
|Why are Java Opti...|      HQ|[why, are, java, ...|(201488,[0,1,4,7,...|  0.0|
|Text Overlay Imag...|      HQ|[text, overlay, i...|(201488,[0,1,2,3,...|  0.0|
|Why ternary opera...|      HQ|[why, ternary, op...|(201488,[0,1,2,3,...|  0.0|
|hide/show fab wit...|      HQ|[hide, show, fab,...|(201488,[0,1,4,5,...|  0.0|
|Accessing pointer...|LQ_CLOSE|[accessing, point...|(201488,[0,1,2,3,...|  1.0|
|How To Disable 2n...| LQ_EDIT|[how, to, disable...|(201488,[1,4,8,15...|  2.0|
|Resizing containe...| LQ_EDIT|[resizing, contai...|(201488,[1,2,3,4,...|  2.0|
|Changing Theme in...|      HQ|[changing, theme,...|(201488,[0,1,2,3,...|  0.0|
|TextBox Value Dis...| LQ_EDIT|[textbox,

**Vocabulary**

In [61]:
# vocabulary frequency without using the countVectorizer helper column that we generated
counts = data_h1_train.select(f.explode('words').alias('col')).groupBy('col').count().take(20)
print({row['col']: row['count'] for row in counts})

{'1293400': 1, 'mypass': 5, 'connected': 433, 'few': 902, 'input': 9663, 'online': 599, '389999': 1, 'travel': 74, '836400': 1, 'those': 1212, 'still': 2194, 'thread1': 26, 'hope': 513, 'recognize': 120, 'parentheses': 89, 'arguments': 696, 'persist': 74, '2bxhsys2c47eyjfhpmroalpxz5suigeubqu7hjuvfvwpoa0xri3iljvhq5qgbwtwpe1x0': 2, 'pabu': 1, 'some': 8532}


In [62]:
# using the countVectorizer helper column that we generated
# https://stackoverflow.com/questions/50255356/pyspark-countvectorizer-and-word-frequency-in-a-corpus
counts = data_h1_train.select('words').take(20)
print(dict(zip(vocabulary, counts[0]['words'].values)))

NameError: name 'model' is not defined

In [73]:
data_h1_test = process_data_in_pipeline(pipeline_h1, testing)
data_h1_test.show()

+--------------------+--------+--------------------+--------------------+-----+
|            Question|  Output|               words|            features|label|
+--------------------+--------+--------------------+--------------------+-----+
|How to get all th...| LQ_EDIT|[how, to, get, al...|(86590,[1,2,4,6,9...|  2.0|
|Retrieve all exce...| LQ_EDIT|[retrieve, all, e...|(86590,[1,2,7,9,1...|  2.0|
|Pandas: read_html...|      HQ|[pandas, read_htm...|(86590,[0,1,2,3,4...|  0.0|
|Reader Always gim...| LQ_EDIT|[reader, always, ...|(86590,[1,2,4,5,7...|  2.0|
|php rearrange arr...| LQ_EDIT|[php, rearrange, ...|(86590,[1,2,4,5,6...|  2.0|
|How do I make a c...|LQ_CLOSE|[how, do, i, make...|(86590,[0,1,2,3,4...|  1.0|
|how can i create ...| LQ_EDIT|[how, can, i, cre...|(86590,[1,5,6,9,1...|  2.0|
|Re-exporting ES6 ...|      HQ|[re, exporting, e...|(86590,[0,1,2,3,4...|  0.0|
|Fetch API with Co...|      HQ|[fetch, api, with...|(86590,[0,1,2,4,5...|  0.0|
|Print list conten...|LQ_CLOSE|[print, l

In [13]:
data_h1_test  = model_pipeline.transform(testing)
data_h1_test.show()

+--------------------+--------+--------------------+--------------------+-----+
|            Question|  Output|               words|            features|label|
+--------------------+--------+--------------------+--------------------+-----+
|How to get all th...| LQ_EDIT|[how, to, get, al...|(201488,[1,2,4,6,...|  2.0|
|Retrieve all exce...| LQ_EDIT|[retrieve, all, e...|(201488,[1,2,7,8,...|  2.0|
|Pandas: read_html...|      HQ|[pandas, read_htm...|(201488,[0,1,2,3,...|  0.0|
|Reader Always gim...| LQ_EDIT|[reader, always, ...|(201488,[1,2,4,5,...|  2.0|
|php rearrange arr...| LQ_EDIT|[php, rearrange, ...|(201488,[1,2,4,5,...|  2.0|
|How do I make a c...|LQ_CLOSE|[how, do, i, make...|(201488,[0,1,2,3,...|  1.0|
|how can i create ...| LQ_EDIT|[how, can, i, cre...|(201488,[1,5,6,8,...|  2.0|
|Re-exporting ES6 ...|      HQ|[re, exporting, e...|(201488,[0,1,2,3,...|  0.0|
|Fetch API with Co...|      HQ|[fetch, api, with...|(201488,[0,1,2,4,...|  0.0|
|Print list conten...|LQ_CLOSE|[print, l

# Split datasets

In [14]:
def split_dataset(data, distribution):
    return data.randomSplit([distribution, 1-distribution], seed = 1234)
    

In [15]:
train_h1, validate_h1 = split_dataset(data_h1_train, 0.7)

In [76]:
# test1,test2 = split_dataset(data_h1_test, 0.7) # just for testing purposes

# Hyperparameter tuning | is this doing anything? we are not using any data

In [16]:
def get_best_smoothing_values(target_col, prediction_col):
    # Create grid to find best smoothing
    nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
    paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]).build()
#     cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol=prediction_col)
    cvEvaluator = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol=prediction_col)

    # Cross-validate all smoothing values
    cv = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
    return cv
    

In [17]:
cv = get_best_smoothing_values("label", "prediction")

# Train Model

In [18]:
cvModel = cv.fit(train_h1)

# Predict

In [19]:
# Make predictions on validation set
cvPredictions = cvModel.transform(validate_h1)

In [20]:
type(cvPredictions)

pyspark.sql.dataframe.DataFrame

In [21]:
cvPredictions.select("label","prediction").show()

+-----+----------+
|label|prediction|
+-----+----------+
|  2.0|       1.0|
|  1.0|       1.0|
|  2.0|       2.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  0.0|       1.0|
|  2.0|       2.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows



In [22]:
cvPredictions_test = cvModel.transform(data_h1_test)

In [23]:
cvPredictions_test.select("label","prediction").show()

+-----+----------+
|label|prediction|
+-----+----------+
|  2.0|       2.0|
|  2.0|       2.0|
|  0.0|       0.0|
|  2.0|       2.0|
|  2.0|       2.0|
|  1.0|       1.0|
|  2.0|       2.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  2.0|       1.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  2.0|       2.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  2.0|       2.0|
|  2.0|       2.0|
+-----+----------+
only showing top 20 rows



# Evaluate Performance of model

In [25]:
def evaluate_model(target_col, prediction_col, predictionAndTarget):
#     evaluator = BinaryClassificationEvaluator(rawPredictionCol=prediction_col)
    evaluatorMulti = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol=prediction_col)
#     accuracy = evaluatorMulti.evaluate(predictions)
    # Get metrics
    acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
    f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
    weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
    weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})
#     auc = evaluator.evaluate(predictionAndTarget)
    print ("Model Accuracy: ", acc)
    print ("Model f1-score: ", f1)
    print ("Model weightedPrecision: ", weightedPrecision)
    print ("Model weightedRecall: ", weightedRecall)
#     return accuracy

In [26]:
# Validation Dataset
evaluate_model("label", "prediction", cvPredictions.select("label","prediction"))

Model Accuracy:  0.7653008490217793
Model f1-score:  0.7675841714238029
Model weightedPrecision:  0.7901628512971111
Model weightedRecall:  0.7653008490217792


In [27]:
# testing dataset
evaluate_model("label", "prediction", cvPredictions_test.select("label","prediction"))

Model Accuracy:  0.7633333333333333
Model f1-score:  0.7649182102284617
Model weightedPrecision:  0.7866094786891012
Model weightedRecall:  0.7633333333333333


# Creating one big iteration for all the pipelines

In [30]:
%%time
for pipeline, pipe_name in zip([pipeline_h1, pipeline_h2],["Pipeline 1","Pipeline 2"]):
    print("#######"+ pipe_name + "#######")
    print("**"+ "Creating pipeline model" + "**") 
    model_pipeline = get_pipeline_model(pipeline, training)
    print("**"+ "Transforming training data" + "**") 
    data_train  = model_pipeline.transform(training)
    print("**"+ "Transforming testing data" + "**") 
    data_test  = model_pipeline.transform(testing)
    print("**"+ "Hyperparameter tuning" + "**") 
    cv = get_best_smoothing_values("label", "prediction")
    print("**"+ "Training the model" + "**") 
    cvModel = cv.fit(data_train)
    print("**"+ "predicting on testing dataset" + "**") 
    cvPredictions = cvModel.transform(data_test)
    print("**"+ "Measuring performance" + "**")
    evaluate_model("label", "prediction", cvPredictions.select("label","prediction"))

#######Pipeline 1#######
**Creating pipeline model**
**Transforming training data**
**Transforming testing data**
**Hyperparameter tuning**
**Training the model**
**predicting on testing dataset**
**Measuring performance**
Model Accuracy:  0.7726
Model f1-score:  0.7746909975733439
Model weightedPrecision:  0.7939332260574594
Model weightedRecall:  0.7726
#######Pipeline 2#######
**Creating pipeline model**
**Transforming training data**
**Transforming testing data**
**Hyperparameter tuning**
**Training the model**
**predicting on testing dataset**
**Measuring performance**
Model Accuracy:  0.7788666666666667
Model f1-score:  0.7811193586213604
Model weightedPrecision:  0.7985977934238272
Model weightedRecall:  0.7788666666666667
CPU times: user 1.24 s, sys: 237 ms, total: 1.48 s
Wall time: 4min 22s
