### Introduction


The dataset used was **StackSample: 10% of Stack Overflow Q&A** available at https://www.kaggle.com/datasets/stackoverflow/stacksample?select=Questions.csv

The challenges with this dataset were:

1. The body column contains a large amount of text with many embedded commas.
2. There is no obvious target column, nor an obvious set of features that would predict one.

These issues were addressed as follows:

*  The embedded commas in the body column were handled by reading the CSV files with `multiLine=True` and `escape='"'`, since investigation showed that `"` is used as the escape character.
*  Additional features were calculated by enriching the questions data with information from the answers and tags data.
*  Two columns were used to derive labels: the question score and the answer score (imported from the answers CSV).

### Import Required Libraries

In [0]:
# Import required libraries
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# note: round, min and max imported below shadow Python's built-in functions of the same name
from pyspark.sql.functions import col, when, regexp_replace, lower, avg, round, min, max, percentile_approx, collect_list, countDistinct, expr, length, size, rand

### Import Data

In [0]:
#Import the Questions csv file

question_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("userid", IntegerType(), True),  # may be null; such rows are dropped during pre-processing
    StructField("opendate", TimestampType(), True),
    StructField("closedate", TimestampType(), True),
    StructField("question_score", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("body", StringType(), True)])

# multiLine=True allows line breaks inside quoted fields; escape='"' handles the doubled quotes in the body text
# dfquest = spark.read.csv("/FileStore/Kaggle_Datasets/StackSample/miniQuestions.csv", schema=question_schema, header=True, multiLine=True, escape='"')
dfquest = spark.read.csv("/FileStore/Kaggle_Datasets/StackSample/Questions.csv", schema=question_schema, header=True, multiLine=True, escape='"')

In [0]:
#Import the Answers csv file
answer_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("userid", IntegerType(), True),  # may be null; such rows are dropped during pre-processing
    StructField("answerdate", TimestampType(), True),
    StructField("questionid", IntegerType(), True),
    StructField("score", IntegerType(), True),
    StructField("body", StringType(), True)])

# dfans = spark.read.csv("/FileStore/Kaggle_Datasets/StackSample/miniAnswers.csv", schema=answer_schema, header=True, multiLine=True, escape='"')
dfans = spark.read.csv("/FileStore/Kaggle_Datasets/StackSample/Answers.csv", schema=answer_schema, header=True, multiLine=True, escape='"')


In [0]:
#Import the Tags csv file

# dftags = spark.read.csv('/FileStore/Kaggle_Datasets/StackSample/miniTags.csv', inferSchema=True, header=True)
dftags = spark.read.csv('/FileStore/Kaggle_Datasets/StackSample/Tags.csv', inferSchema=True, header=True)

dftags = dftags.withColumnRenamed('Id', 'questionid').withColumnRenamed('Tag','tag')


### Data Pre-Processing

###### Pre-process the Questions Dataframe

In [0]:
# # get the timestamp of the first and last question in the questions dataframe
# dfquest.agg(min(dfquest["opendate"]), max(dfquest["opendate"])).show()

In [0]:
# remove rows with null values for userid
dfquest_process = dfquest.na.drop(how='any', subset=['userid'])
dfquest_process.count()

# remove the html tags from the body column
dfquest_process = dfquest_process.withColumn('body', regexp_replace('body', "<[^>]+>", ""))
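As a quick check of what the pattern `<[^>]+>` removes, here is a plain-Python sketch using `re.sub` on a made-up body string (not taken from the dataset). Tags are stripped, the text between them is kept, and HTML entities such as `&lt;` are left as they are.

```python
import re

# Same pattern as the regexp_replace call above: '<', one or more non-'>' characters, then '>'
HTML_TAG = re.compile(r"<[^>]+>")

def strip_html(body: str) -> str:
    """Remove HTML tags from a question body, keeping the text between them."""
    return HTML_TAG.sub("", body)

# Hypothetical question body, for illustration only
sample = "<p>How do I <code>join</code> two &lt;DataFrames&gt;?</p>"
print(strip_html(sample))  # How do I join two &lt;DataFrames&gt;?
```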

In [0]:
# add tags to the questions dataset
# use an inner join (the default) to join the dfquest_process and dftags dataframes
questandtags = dfquest_process.join(dftags, dfquest_process.id == dftags.questionid)
 
# Group the tags into one row for each id
grouptags = questandtags.groupBy("id").agg(collect_list("tag").alias("tags"))
 
# Add the tags to the dfquest_process dataframe
dfquest_process = dfquest_process.join(grouptags, "id", "left")
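The tag-grouping steps above (inner join with the tags, `collect_list` per question, then a left join back) can be sketched in plain Python with a dictionary; the ids and tags below are hypothetical. Questions with no tags end up with `None`, mirroring the null `tags` value the left join produces. Note that Spark does not guarantee the order of elements returned by `collect_list`, whereas this sketch keeps input order.

```python
from collections import defaultdict

def group_tags(question_ids, tag_rows):
    """Collect the tags of each question into one list.

    question_ids: iterable of question ids (the questions dataframe)
    tag_rows: iterable of (questionid, tag) pairs (the tags dataframe)
    Returns {id: list of tags, or None when the question has no tags}.
    Tags whose questionid is not a known question are ignored, like the inner join.
    """
    tags_by_id = defaultdict(list)
    for qid, tag in tag_rows:
        tags_by_id[qid].append(tag)
    # .get() on a defaultdict does not create entries, so untagged questions map to None
    return {qid: tags_by_id.get(qid) for qid in question_ids}

print(group_tags([1, 2, 3], [(1, "python"), (1, "pyspark"), (2, "sql"), (99, "orphan")]))
```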

In [0]:
#remove duplicates
dfquest_process = dfquest_process.dropDuplicates()


###### Pre-process the Answers Dataframe

In [0]:
# drop the body column as it's not required as part of the project
dfans_process = dfans.drop('body')

#remove rows with null values for userid
dfans_process = dfans_process.na.drop(how='any', subset=['userid'])

###### Pre-process the Tags Dataframe

In [0]:
#check for null values in the dataframe columns
for column in dftags.columns:
    nullvalues = dftags.filter(col(column).isNull()).count()
    print(f'{column} has {nullvalues} null values')
    

questionid has 0 null values
tag has 0 null values


There are no null values in the Tags dataframe, so no rows need to be dropped.

In [0]:
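# change all text to lowercase
# note: dftags_process is not used later; the tags were already joined to the questions from dftags above
dftags_process = dftags.withColumn('tag', lower(col('tag')))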


#### Add additional details to the Questions dataframe

Add columns classifying questions and answers as having a high or low score. A high score is one strictly greater than the average score (rounded to two decimal places); any other score is classed as low.

In [0]:
# add a column with the number of characters in the title
dfquest_process = dfquest_process.withColumn("title_length", length(dfquest_process['title']))

# add in a column which has the number of characters in the body
dfquest_process = dfquest_process.withColumn("question_length", length(dfquest_process['body']))

# add in a column which has the number of tags
dfquest_process = dfquest_process.withColumn("number_of_tags", size(dfquest_process['tags']))

#get the average score for questions
avg_quest_score = dfquest_process.agg(round(avg("question_score"),2)).first()[0]
print(avg_quest_score)

# Add a column to the question dataframe to show whether a question has a high or low rating
dfquest_process = dfquest_process.withColumn("question_rating", when(col("question_score") > avg_quest_score, "high").otherwise("low"))

1.77
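The high/low rule can be sketched in plain Python as follows (the scores are hypothetical). It mirrors the Spark logic: `avg` ignores nulls, and because `null > avg` evaluates to null, `when(...).otherwise("low")` labels a null score as low. A score exactly equal to the rounded average is also labelled low. One caveat: Python's `round` uses round-half-to-even, while Spark's `round` rounds half up, so the two can differ on exact ties.

```python
def rate_scores(scores):
    """Return (rounded average, labels) using the 'strictly above average' rule."""
    known = [s for s in scores if s is not None]      # Spark's avg() ignores nulls
    average = round(sum(known) / len(known), 2)
    # a null comparison is null in Spark, so otherwise("low") applies
    labels = ["high" if s is not None and s > average else "low" for s in scores]
    return average, labels

print(rate_scores([0, 1, 2, 5, None]))  # (2.0, ['low', 'low', 'low', 'high', 'low'])
```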


#### Add answer information to the questions dataframe

In [0]:
#get the average score for answers
avg_ans_score = dfans_process.agg(round(avg("score"),2)).first()[0]
print(avg_ans_score)

# Add a column to the answer dataframe to show whether an answer has a high or low rating
dfans_process = dfans_process.withColumn("answer_rating", when(col("score") > avg_ans_score, "high").otherwise("low"))

2.48


In [0]:
# find the date of the first answer for each question
firstanswerdate = dfans_process.groupBy("questionid").agg(min("answerdate").alias("first_answerdate"))

# rename the questionid column to make it distinct during later use
firstanswerdate = firstanswerdate.withColumnRenamed("questionid", "questionidfa")

# add data from the answers dataframe to the first answers dataframe
firstanswer = firstanswerdate.join(dfans_process, (firstanswerdate.questionidfa == dfans_process.questionid) & (firstanswerdate.first_answerdate == dfans_process.answerdate), "left")

# When several answers share the earliest timestamp, the join returns more than one row per question; keep one row per question
firstanswer = firstanswer.dropDuplicates(["questionidfa"])

firstanswer.show(5)

+------------+-------------------+------+------+-------------------+----------+-----+-------------+
|questionidfa|   first_answerdate|    id|userid|         answerdate|questionid|score|answer_rating|
+------------+-------------------+------+------+-------------------+----------+-----+-------------+
|          80|2008-08-01 16:09:47|   124|    26|2008-08-01 16:09:47|        80|   12|         high|
|          90|2008-08-01 14:45:37|    92|    61|2008-08-01 14:45:37|        90|   13|         high|
|         120|2008-09-23 22:41:11|124363| 12734|2008-09-23 22:41:11|       120|    9|         high|
|         180|2008-08-01 19:36:46|   199|    50|2008-08-01 19:36:46|       180|    1|          low|
|         260|2008-08-01 23:49:57|   269|    91|2008-08-01 23:49:57|       260|    4|         high|
+------------+-------------------+------+------+-------------------+----------+-----+-------------+
only showing top 5 rows
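Joining on the earliest answer date returns more than one row when several answers share that timestamp, and `dropDuplicates(["questionidfa"])` then keeps an arbitrary one. A deterministic alternative, sketched in plain Python with hypothetical rows, breaks ties by the smaller answer id (an assumed rule, not the one used above). In PySpark the same idea could be expressed with `row_number()` over `Window.partitionBy("questionid").orderBy("answerdate", "id")`.

```python
from datetime import datetime

def first_answers(answers):
    """Pick the earliest answer for each question.

    answers: iterable of (answer_id, question_id, answer_date, score) tuples.
    Returns {question_id: (answer_id, answer_date, score)}; ties on answer_date
    are broken by the smaller answer_id (an assumed, deterministic rule).
    """
    best = {}
    for answer_id, question_id, answer_date, score in answers:
        key = (answer_date, answer_id)  # earliest date first, then lowest id
        if question_id not in best or key < best[question_id][0]:
            best[question_id] = (key, score)
    return {qid: (key[1], key[0], score) for qid, (key, score) in best.items()}

# Hypothetical answers: question 1 has two answers with the same earliest timestamp
rows = [
    (11, 1, datetime(2008, 8, 1, 16, 9, 47), 3),
    (10, 1, datetime(2008, 8, 1, 16, 9, 47), 12),
    (12, 1, datetime(2008, 8, 2, 9, 0, 0), 20),
    (20, 2, datetime(2008, 8, 1, 14, 45, 37), 13),
]
print(first_answers(rows))
```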



In [0]:
# select the columns for use in the first answer dataframe
first_answer = firstanswer.select('questionid','answer_rating', 'first_answerdate')
first_answer = first_answer.withColumnRenamed("answer_rating", "first_answer_rating")

# Join the DataFrames on the questionid column
dfquest_process = dfquest_process.join(first_answer, dfquest_process["id"] == first_answer["questionid"],"left")

In [0]:
# Group the answer data by 'questionid'
temp_df = dfans_process.groupBy("questionid")

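# Get the maximum score and the earliest answer date for each question
# note: min("answerdate") is taken over all answers to the question, not only those with the
# maximum score, so best_answerdate is the date of the first answer rather than of the best answer
tempdf1 = temp_df.agg(max("score").alias("best_answerscore"), min("answerdate").alias("best_answerdate"))

# Select the columns 'questionid', 'best_answerscore' and 'best_answerdate'
bestscores = tempdf1.select("questionid","best_answerscore","best_answerdate")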

# Rename the questionid column to avoid later ambiguity
bestscores = bestscores.withColumnRenamed("questionid", "bestquestionid")

# Delete unneeded dataframes
del temp_df
del tempdf1

# View the first 5 rows of the dataframe
bestscores.show(5)

+--------------+----------------+-------------------+
|bestquestionid|best_answerscore|    best_answerdate|
+--------------+----------------+-------------------+
|          7880|              24|2008-08-11 15:58:06|
|         14570|               2|2008-08-18 13:32:51|
|         32460|               5|2008-08-28 14:52:16|
|         56680|               6|2008-09-11 15:36:40|
|         57020|              18|2008-09-11 16:19:10|
+--------------+----------------+-------------------+
only showing top 5 rows
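Because the aggregation computes `max("score")` and `min("answerdate")` independently, `best_answerdate` is the date of the earliest answer overall, not of the highest-scoring one. A plain-Python sketch of the intended rule (the earliest answer among those with the maximum score), on hypothetical rows, shows the difference. In PySpark this could be done with a window ordered by `score` descending and then `answerdate` ascending; that variant is not part of the original notebook.

```python
from datetime import datetime

def best_answers(answers):
    """Return {question_id: (best_score, best_date)} per question.

    answers: iterable of (answer_id, question_id, answer_date, score) tuples.
    best_date is the earliest date among the answers that reach the maximum score.
    Null scores are skipped, as Spark's max() ignores nulls.
    """
    best = {}
    for _answer_id, question_id, answer_date, score in answers:
        if score is None:
            continue
        current = best.get(question_id)
        # a higher score wins; on an equal score the earlier answer wins
        if (current is None or score > current[0]
                or (score == current[0] and answer_date < current[1])):
            best[question_id] = (score, answer_date)
    return best

rows = [
    (10, 1, datetime(2008, 8, 1), 2),   # earliest answer, low score
    (11, 1, datetime(2008, 8, 3), 9),   # first answer to reach the top score
    (12, 1, datetime(2008, 8, 5), 9),   # ties the top score later
]
print(best_answers(rows))  # the max/min aggregation would pair score 9 with 2008-08-01
```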



In [0]:
# Join the DataFrames on the question id (the left join keeps questions without answers)
dfquest_process = dfquest_process.join(bestscores, dfquest_process["id"] == bestscores["bestquestionid"], "left")

# print the schema of the dataframe
dfquest_process.printSchema()

root
 |-- id: integer (nullable = true)
 |-- userid: integer (nullable = true)
 |-- opendate: timestamp (nullable = true)
 |-- closedate: timestamp (nullable = true)
 |-- question_score: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- title_length: integer (nullable = true)
 |-- question_length: integer (nullable = true)
 |-- number_of_tags: integer (nullable = false)
 |-- question_rating: string (nullable = false)
 |-- questionid: integer (nullable = true)
 |-- first_answer_rating: string (nullable = true)
 |-- first_answerdate: timestamp (nullable = true)
 |-- bestquestionid: integer (nullable = true)
 |-- best_answerscore: integer (nullable = true)
 |-- best_answerdate: timestamp (nullable = true)



#### Calculate how quickly a question is answered

In [0]:

# calculate the time (in seconds) it took to get the first answer
dfquest_process = dfquest_process.withColumn("first_answertime", col("first_answerdate").cast("long") - col("opendate").cast("long"))

# calculate the time (in seconds) it took to get its best answer
dfquest_process = dfquest_process.withColumn("best_answertime", col("best_answerdate").cast("long") - col("opendate").cast("long"))

#Add a column to show if the best answer had a high or low rating
dfquest_process = dfquest_process.withColumn("bestanswer_rating", when(col("best_answerscore") > avg_ans_score, "high").otherwise("low"))

# remove rows with null values for the best_answerscore column as this means there was no answer for that question
dfquest_process = dfquest_process.dropna(how='any', subset=['best_answerscore','best_answerdate'])

#drop columns no longer required
dfquest_process = dfquest_process.drop('opendate', 'closedate', 'first_answerdate', 'best_answerdate','questionid','bestquestionid','best_answerscore')
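Casting a timestamp to `long` gives whole seconds since the Unix epoch, so the difference is the answer time in seconds. The same arithmetic in plain Python, using hypothetical UTC timestamps and returning `None` when either value is missing (as Spark returns null for questions with no answer):

```python
from datetime import datetime, timezone

def seconds_between(opened, answered):
    """Whole seconds from opened to answered, or None if either timestamp is missing."""
    if opened is None or answered is None:
        return None
    return int(answered.timestamp()) - int(opened.timestamp())

opened = datetime(2008, 8, 1, 13, 57, 7, tzinfo=timezone.utc)
answered = datetime(2008, 8, 1, 16, 9, 47, tzinfo=timezone.utc)
print(seconds_between(opened, answered))  # 7960 (2 h 12 min 40 s)
```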

#### Identify trends

In [0]:
# # test if the rating of the best answer for a question is influenced by the question's length
# dfquest_process.groupBy("bestanswer_rating").mean("question_length").show()

In [0]:
# # test if the rating of the best answer for a question is influenced by its question score
# dfquest_process.groupBy("bestanswer_rating").mean("question_score").show()

### Data Transformation

Data transformation is applied to the questions dataset, because analysing it allows a model to be built to predict:
* How quickly a question for a specific language will be answered
* How quickly a high scoring answer will be added for a question

#### Create pipelines

Because of out-of-memory problems and other errors, the pipeline is split into several steps.

Taken together, the steps do the following:

* regextokenizer: splits the body column into lowercase tokens on runs of non-word characters, which removes punctuation; tokens shorter than 4 characters are discarded
* stop_word_remover: removes stop words (e.g. to, from, I), plus a few additional project-specific words, from the token_body column created by regextokenizer
* count_vec: creates a vector holding the count of each token (word) in the stop_tokens column created by stop_word_remover
* idf: applies Inverse Document Frequency weighting, which gives more weight to the less commonly used words in the count vectors created by count_vec
* pca: reduces the dimensionality of the TF-IDF vectors by projecting them onto the top 15 principal components
* assembler: combines separate feature columns into a single vector
* bucketizer: divides the time it took for a question to receive its first answer into the following buckets:
  * less than 15 minutes
  * 15 minutes to 30 minutes
  * 30 minutes to 1 hour
  * 1 hour to 2 hours
  * 2 hours to 24 hours
  * 24 hours to 48 hours
  * greater than 48 hours
* answer_label: adds a column called bestanswer_rating_index containing a numeric index for each string value in bestanswer_rating (by default StringIndexer assigns 0.0 to the most frequent value)
* features: the feature vector produced by the assembler from the pca_features, question_length and question_score columns
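Bucketizer places each value in the bucket whose interval `[lower, upper)` contains it, using the split points in the `buckets` list defined in Pipeline 1 (in seconds). A plain-Python sketch with `bisect` reproduces that lookup for finite values; the human-readable labels are added here for illustration only.

```python
import bisect

# Same boundaries (in seconds) as the buckets list used for the Bucketizer
SPLITS = [-float("inf"), 900, 1800, 3600, 7200, 86400, 172800, float("inf")]
LABELS = ["< 15 min", "15-30 min", "30 min-1 h", "1-2 h", "2-24 h", "24-48 h", "> 48 h"]

def bucket_index(seconds):
    """Return the bucket index for a duration: bucket i covers [SPLITS[i], SPLITS[i+1])."""
    return bisect.bisect_right(SPLITS, seconds) - 1

print(LABELS[bucket_index(7960)])  # 2-24 h
```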

#### Pipeline 1

In [0]:
# import libraries for assemblers and features
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, Bucketizer, HashingTF, PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline

# Create a list of bucket boundaries
buckets = [-float("inf"), 900, 1800, 3600, 7200, 86400, 172800, float("inf")]

# Additional stop words
more_stop_words = ["dont", "also", "gets", "phew", "need", "like"]

# instantiate the data transformers
# a raw string avoids Python's invalid escape sequence warning for \w
regextokenizer = RegexTokenizer(inputCol="body", outputCol="token_body", pattern=r"[^\w]+", gaps=True, minTokenLength=4, toLowercase=True)
stop_word_remover = StopWordsRemover(inputCol="token_body", outputCol="stop_tokens")
stop_word_remover = stop_word_remover.setStopWords(stop_word_remover.getStopWords() + more_stop_words)
bucketizer = Bucketizer(splits=buckets, inputCol="first_answertime", outputCol="first_answertime_buckets")
answer_label = StringIndexer(inputCol="bestanswer_rating", outputCol="bestanswer_rating_index")
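A rough plain-Python equivalent of the RegexTokenizer and StopWordsRemover settings above: lowercase the text, split on runs of non-word characters, drop tokens shorter than 4 characters, then filter stop words. The `STOP_WORDS` set here is a small illustrative subset (Spark's default English list is much longer), and Python's `\w` matches Unicode letters whereas Java's matches ASCII only by default, so results can differ on non-English text.

```python
import re

# Illustrative subset only; Spark's StopWordsRemover default English list is much longer
STOP_WORDS = {"this", "that", "with", "from", "have", "what"}
# The additional project-specific stop words from the cell above
MORE_STOP_WORDS = {"dont", "also", "gets", "phew", "need", "like"}

def tokenize(body, min_token_length=4):
    """Lowercase, split on runs of non-word characters (gaps=True), drop short tokens."""
    tokens = re.split(r"[^\w]+", body.lower())
    return [t for t in tokens if len(t) >= min_token_length]

def remove_stop_words(tokens, stop_words=STOP_WORDS | MORE_STOP_WORDS):
    """Drop tokens that appear in the stop-word set."""
    return [t for t in tokens if t not in stop_words]

text = "I also need this: pandas DataFrame.merge() won't work!"
print(remove_stop_words(tokenize(text)))  # ['pandas', 'dataframe', 'merge', 'work']
```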

In [0]:
# create the pipeline
pipeline1 = Pipeline(stages=[answer_label, bucketizer, regextokenizer, stop_word_remover])

# fit the pipeline to the dataset
transformer1 = pipeline1.fit(dfquest_process)

In [0]:
# transform dataset
dfquest_trans1 = transformer1.transform(dfquest_process)
dfquest_trans1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- userid: integer (nullable = true)
 |-- question_score: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- title_length: integer (nullable = true)
 |-- question_length: integer (nullable = true)
 |-- number_of_tags: integer (nullable = false)
 |-- question_rating: string (nullable = false)
 |-- first_answer_rating: string (nullable = true)
 |-- first_answertime: long (nullable = true)
 |-- best_answertime: long (nullable = true)
 |-- bestanswer_rating: string (nullable = false)
 |-- bestanswer_rating_index: double (nullable = false)
 |-- first_answertime_buckets: double (nullable = true)
 |-- token_body: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [0]:
# remove the body, token_body, tags and title columns as they are no longer required
dfquest_trans1 = dfquest_trans1.drop('body','token_body','tags','title')
del dfquest_process

#### Pipeline 2

In [0]:
# instantiate the data transformers
count_vec = CountVectorizer(inputCol="stop_tokens", outputCol="count_vec", minDF=0.05, maxDF=0.75)
idf = IDF(inputCol="count_vec", outputCol="tf_idf", minDocFreq=63000)
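CountVectorizer produces raw term counts, and Spark's IDF scales each count by `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` the number of documents containing the term. `minDocFreq=63000` is an absolute document count: terms appearing in fewer than 63,000 questions get an IDF of 0. (CountVectorizer's `minDF=0.05` and `maxDF=0.75` are fractions of documents that limit the vocabulary, which this sketch does not model.) A plain-Python sketch on toy token lists:

```python
import math
from collections import Counter

def tf_idf(docs, min_doc_freq=0):
    """docs: list of token lists. Returns one {term: weight} dict per document.

    Term frequency is the raw count and idf = log((m + 1) / (df + 1)),
    with idf set to 0 for terms whose document frequency is below min_doc_freq.
    """
    m = len(docs)
    df = Counter(term for doc in docs for term in set(doc))  # documents containing each term
    idf = {t: (math.log((m + 1) / (d + 1)) if d >= min_doc_freq else 0.0)
           for t, d in df.items()}
    return [{t: c * idf[t] for t, c in Counter(doc).items()} for doc in docs]

docs = [["spark", "join", "join"], ["spark"], ["pandas"]]
print(tf_idf(docs))
print(tf_idf(docs, min_doc_freq=2))  # rarer terms get weight 0
```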

In [0]:
#create pipeline
pipeline2 = Pipeline(stages=[count_vec, idf])

In [0]:
# fit the pipeline to the dataset
transformer2 = pipeline2.fit(dfquest_trans1)

In [0]:
# transform dataset
dfquest_trans2 = transformer2.transform(dfquest_trans1)
dfquest_trans2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- userid: integer (nullable = true)
 |-- question_score: integer (nullable = true)
 |-- title_length: integer (nullable = true)
 |-- question_length: integer (nullable = true)
 |-- number_of_tags: integer (nullable = false)
 |-- question_rating: string (nullable = false)
 |-- first_answer_rating: string (nullable = true)
 |-- first_answertime: long (nullable = true)
 |-- best_answertime: long (nullable = true)
 |-- bestanswer_rating: string (nullable = false)
 |-- bestanswer_rating_index: double (nullable = false)
 |-- first_answertime_buckets: double (nullable = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- count_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)



In [0]:
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.functions import udf

# Define a function to convert the sparse vector to a dense vector
convert_to_dense = udf(lambda v: DenseVector(v.toArray()), VectorUDT())

# Convert the 'tf_idf' column to a dense vector
dfquest_trans2 = dfquest_trans2.withColumn("tf_idf_dense", convert_to_dense(dfquest_trans2["tf_idf"]))


In [0]:
# drop unrequired columns
dfquest_trans2 = dfquest_trans2.drop('stop_tokens','count_vec', 'tf_idf', 'number_of_tags')
del dfquest_trans1

#### PCA

In [0]:
# instantiate the PCA transformer
pca = PCA(k=15, inputCol="tf_idf_dense", outputCol="pca_features")

In [0]:
# fit PCA to the dataset
transformer3 = pca.fit(dfquest_trans2)

In [0]:
# transform dataset
dfquest_trans3 = transformer3.transform(dfquest_trans2)
dfquest_trans3.printSchema()

root
 |-- id: integer (nullable = true)
 |-- userid: integer (nullable = true)
 |-- question_score: integer (nullable = true)
 |-- title_length: integer (nullable = true)
 |-- question_length: integer (nullable = true)
 |-- question_rating: string (nullable = false)
 |-- first_answer_rating: string (nullable = true)
 |-- first_answertime: long (nullable = true)
 |-- best_answertime: long (nullable = true)
 |-- bestanswer_rating: string (nullable = false)
 |-- bestanswer_rating_index: double (nullable = false)
 |-- first_answertime_buckets: double (nullable = true)
 |-- tf_idf_dense: vector (nullable = true)
 |-- pca_features: vector (nullable = true)
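PCA finds orthogonal directions of maximum variance and keeps the first `k` (here 15). A minimal NumPy sketch on random data, computing the components from an SVD of the mean-centred matrix, illustrates the idea. It is not Spark's implementation, and its output values are not expected to match `pca_features` exactly (for example, this sketch centres the data before projecting).

```python
import numpy as np

def pca_project(X, k):
    """Project the rows of X onto the top-k principal components.

    Returns (projected, components) where components has shape (n_features, k).
    """
    X = np.asarray(X, dtype=float)
    centred = X - X.mean(axis=0)
    # rows of vt are the principal directions, ordered by decreasing singular value
    _u, _s, vt = np.linalg.svd(centred, full_matrices=False)
    components = vt[:k].T
    return centred @ components, components

rng = np.random.default_rng(0)
X = rng.normal(size=(50, 6))
Z, W = pca_project(X, 3)
print(Z.shape)  # (50, 3)
```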



In [0]:
# remove the dataframe no longer required
del dfquest_trans2

#### Create Features

In [0]:
# instantiate the Vector Assembler
assembler = VectorAssembler(inputCols=['pca_features', 'question_length','question_score'], outputCol="features")

In [0]:
transformed_df = assembler.transform(dfquest_trans3)
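VectorAssembler concatenates its input columns, in `inputCols` order, into one vector: 15 PCA components plus `question_length` and `question_score` give the length-17 vectors shown in the displayed sample below. A plain-Python sketch of that concatenation (Spark's default `handleInvalid="error"` rejects null inputs; here `float(None)` likewise raises a `TypeError`):

```python
def assemble(pca_features, question_length, question_score):
    """Concatenate a vector column and two numeric columns, in inputCols order."""
    return [float(v) for v in pca_features] + [float(question_length), float(question_score)]

features = assemble([0.0] * 15, 1457, 21)   # hypothetical PCA values
print(len(features), features[-2:])          # 17 [1457.0, 21.0]
```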

In [0]:
# view schema dataset
transformed_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- userid: integer (nullable = true)
 |-- question_score: integer (nullable = true)
 |-- title_length: integer (nullable = true)
 |-- question_length: integer (nullable = true)
 |-- question_rating: string (nullable = false)
 |-- first_answer_rating: string (nullable = true)
 |-- first_answertime: long (nullable = true)
 |-- best_answertime: long (nullable = true)
 |-- bestanswer_rating: string (nullable = false)
 |-- bestanswer_rating_index: double (nullable = false)
 |-- first_answertime_buckets: double (nullable = true)
 |-- tf_idf_dense: vector (nullable = true)
 |-- pca_features: vector (nullable = true)
 |-- features: vector (nullable = true)



#### Split the Data into train and test

In [0]:
# select the label and feature columns required for modelling
selected_df = transformed_df.select(['bestanswer_rating_index', 'first_answertime_buckets','features'])
display(selected_df)

bestanswer_rating_index,first_answertime_buckets,features
1.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.7249977740291387, 2.1026890841869355, -0.4248913929654063, -1.2446791589665003, 0.026881178943391877, 2.019014526704761, -0.3348944614828543, 3.9621024592698606, -0.9883225615579796, 2.773020184784261, 1.2416209506846665, -2.056505846799868, 1.1731648146338585, 1.0156606280855, 0.61443605389763, 1457.0, 21.0))"
1.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.08577328955273153, 0.8019575474715526, -0.5942891469646664, -0.6686307732418455, 0.18347286130306575, -0.36256004195162045, 0.07276924269413121, -0.49924908840098653, 0.4613607441062112, 1.2750844250759048, 0.20349171023468227, -0.11970098070171081, 0.26094711052919584, 0.16210576534580395, 0.1622722801520696, 392.0, 36.0))"
0.0,1.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.26425430517643755, 0.972578019496427, 0.26770044483682953, -1.3832138792994666, -1.8454421839687094, 1.44335122383122, 0.16885647585189964, 1.6144059962536743, 1.4749825866609751, 0.8887662421447716, -0.2408379291605979, -0.822178165933211, -0.7904798279461002, -0.5522357966670668, -0.11291368385393037, 981.0, 3.0))"
1.0,4.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.16632520164228842, 0.25257538353402825, 0.05234890308660873, -0.05222036890902558, -0.059578634290173044, 0.11294197519311709, -0.17756659196265762, 0.25439669033337187, -0.2038704149457234, 0.2667472201321498, 0.28540002532947356, 0.025242723780097565, 0.014316457001992855, 0.2669678980218576, 0.12963943760822041, 410.0, 5.0))"
1.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.1396685823557962, 0.9835244388529453, -0.47362602022875155, -0.29739733395379114, -0.3233394127174895, 0.9037123306570487, -1.24975147791673, 1.3073777404476554, -1.6791215672272524, 1.2890198016774357, 0.17442158957340084, -0.6705157290295141, 1.0108472162644722, 1.6547032473583791, 1.872411504990132, 226.0, 15.0))"
0.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.03192135389187, 0.30542720844162574, -0.15513981411767666, -0.11625719438093957, 0.10712868326620255, 0.06945323699786543, -0.12956103158872362, -0.03368096246653195, -0.02207137318140371, 0.2076802659848125, 0.09492978755925557, -0.12198260887782318, 0.14854491973853007, 0.02625950228005817, -0.0906682201715377, 675.0, 0.0))"
1.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.2623880831621143, 1.772894363538562, 1.9189605531932632, -0.37976036013568554, -0.21827604489222135, -1.3663845327921658, 0.8048220613684676, 0.21059778633760837, -1.8889368978090804, -0.21690270711960974, -1.6341503358618183, -0.7322747606828486, 1.352169641148402, -1.8873306468643323, 1.5715003412054045, 249.0, 12.0))"
1.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(-0.256775169724677, 0.6365747237667869, 0.05216643831724618, -0.2656678771747173, -0.10609795881735763, 0.4567759424778481, -0.3906956572612442, 0.7855796388240402, -0.3826880972951585, 0.44702504923951786, 0.307285777206884, -0.49076061707549434, 0.48525810162609256, 0.9591572665811103, 0.37215824182052176, 950.0, 24.0))"
1.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(0.001746397310586732, -0.006029791164138505, -2.2500276218702093E-4, -0.001005227106413009, -7.764813346044635E-5, -0.001449374615372974, 0.006936734862322093, -9.410637219548006E-4, 5.437562576653519E-4, 0.0024365778532495284, 0.00245751502924687, -0.0031879395805252827, 6.422339061230577E-4, -7.880603051704884E-4, -0.0013206176844688298, 311.0, 3.0))"
1.0,0.0,"Map(vectorType -> dense, length -> 17, values -> List(2.4442901927778065E-4, 0.010041910755774939, -4.533693814623291E-4, 5.712606074779508E-4, -0.002672296462957417, 0.005035753223938484, -0.007354565546273681, 0.01118919863910263, -0.0030706575375176732, 0.013359039725628397, 0.01525638796847585, -0.011758168154044823, 0.01140262193511049, 0.007209264478772036, 1.2572528086225273E-4, 212.0, 15.0))"


In [0]:
# split the data into train and test
train, test = selected_df.randomSplit([0.7, 0.3], seed=42)

### Model Prediction

#### Random Forest

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# instantiate the classifier
rf = RandomForestClassifier(labelCol='bestanswer_rating_index', featuresCol="features", numTrees=20, maxDepth=4, seed=42)

In [0]:
# train the model
rfmodel = rf.fit(train)

In [0]:
# make prediction
rfpred = rfmodel.transform(test)
rfpred.select("prediction", "bestanswer_rating_index", "features").show(5)

+----------+-----------------------+-----------------+
|prediction|bestanswer_rating_index|         features|
+----------+-----------------------+-----------------+
|       0.0|                    0.0| (17,[15],[56.0])|
|       0.0|                    0.0| (17,[15],[80.0])|
|       0.0|                    0.0| (17,[15],[94.0])|
|       0.0|                    0.0| (17,[15],[98.0])|
|       0.0|                    0.0|(17,[15],[108.0])|
+----------+-----------------------+-----------------+
only showing top 5 rows



In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# evaluate the model (note: the name `eval` shadows Python's built-in eval function)
eval = MulticlassClassificationEvaluator(labelCol="bestanswer_rating_index", predictionCol="prediction", metricName="accuracy")
rfacc = eval.evaluate(rfpred)
print(f"Accuracy: {rfacc * 100}")
print(f"Test Error = {1.0 - rfacc}")

Accuracy: 81.15528762481237
Test Error = 0.18844712375187622


In [0]:
# Change the metric to weighted precision
eval.setMetricName("weightedPrecision")
weightedPrecision = eval.evaluate(rfpred)
print("Weighted Precision: ", weightedPrecision)

Weighted Precision:  0.8031597283052485


In [0]:
# Change the metric to weighted recall
eval.setMetricName("weightedRecall")
weightedRecall = eval.evaluate(rfpred)
print("Weighted Recall: ", weightedRecall)

Weighted Recall:  0.8115528762481238


In [0]:
# Change the metric to F1 score
eval.setMetricName("f1")
f1Score = eval.evaluate(rfpred)
print("F1 Score: ", f1Score)

F1 Score:  0.7953694650362086
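The weighted metrics average the per-label precision, recall and F1, weighting each label by its share of the true labels. Weighted recall therefore always equals accuracy, which is why 0.8115... appears twice above. A plain-Python sketch of these definitions follows; it assumes that a label which is never predicted contributes a precision of 0.

```python
from collections import Counter

def weighted_metrics(y_true, y_pred):
    """Accuracy plus label-frequency-weighted precision, recall and F1."""
    n = len(y_true)
    support = Counter(y_true)                     # true count per label
    predicted = Counter(y_pred)                   # predicted count per label
    tp = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    accuracy = sum(tp.values()) / n
    w_prec = w_rec = w_f1 = 0.0
    for label, count in support.items():
        prec = tp[label] / predicted[label] if predicted[label] else 0.0
        rec = tp[label] / count
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        weight = count / n
        w_prec += weight * prec
        w_rec += weight * rec
        w_f1 += weight * f1
    return {"accuracy": accuracy, "weightedPrecision": w_prec,
            "weightedRecall": w_rec, "f1": w_f1}

print(weighted_metrics([0, 0, 1, 1], [0, 1, 1, 1]))
```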


In [0]:
# create model to predict first_answertime_buckets values
rffab = RandomForestClassifier(labelCol="first_answertime_buckets", featuresCol="features", numTrees=20, seed=42)
rffabmodel = rffab.fit(train)

In [0]:
# make prediction
rffabpred = rffabmodel.transform(test)
rffabpred.select("prediction", "first_answertime_buckets", "features").show(5)

+----------+------------------------+-----------------+
|prediction|first_answertime_buckets|         features|
+----------+------------------------+-----------------+
|       0.0|                     0.0| (17,[15],[56.0])|
|       0.0|                     0.0| (17,[15],[80.0])|
|       0.0|                     0.0| (17,[15],[94.0])|
|       0.0|                     0.0| (17,[15],[98.0])|
|       0.0|                     0.0|(17,[15],[108.0])|
+----------+------------------------+-----------------+
only showing top 5 rows



In [0]:
# evaluate the model
rffabeval = MulticlassClassificationEvaluator(labelCol="first_answertime_buckets", predictionCol="prediction", metricName="accuracy")
rffabacc = rffabeval.evaluate(rffabpred)
print(f"Accuracy: {rffabacc * 100}")
print(f"Test Error = {1.0 - rffabacc}")

Accuracy: 41.25553655893845
Test Error = 0.5874446344106155


In [0]:
# Change the metric to weighted precision
rffabeval.setMetricName("weightedPrecision")
weightedPrecision = rffabeval.evaluate(rffabpred)
print("Weighted Precision: ", weightedPrecision)

Weighted Precision:  0.21096326334473522


In [0]:
# Change the metric to weighted recall
rffabeval.setMetricName("weightedRecall")
weightedRecall = rffabeval.evaluate(rffabpred)
print("Weighted Recall: ", weightedRecall)

Weighted Recall:  0.41255536558938455


In [0]:
# Change the metric to F1 score
rffabeval.setMetricName("f1")
f1Score  = rffabeval.evaluate(rffabpred)
print("F1 Score: ", f1Score)

F1 Score:  0.24123959401792527
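With seven buckets, 41% accuracy is hard to judge in isolation, and the much lower weighted precision may indicate that some buckets are rarely or never predicted. A useful reference point is the accuracy of always predicting the most common bucket in the test set. A minimal sketch with hypothetical labels is below; in the notebook the counts could come from `test.groupBy("first_answertime_buckets").count()`.

```python
from collections import Counter

def majority_baseline_accuracy(labels):
    """Accuracy of a classifier that always predicts the most frequent label."""
    counts = Counter(labels)
    return max(counts.values()) / len(labels)

print(majority_baseline_accuracy([0.0, 0.0, 0.0, 1.0, 4.0]))  # 0.6
```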


#### Gradient Boosted Trees

In [0]:
from pyspark.ml.classification import GBTClassifier

# create classifier
gbt = GBTClassifier(labelCol="bestanswer_rating_index", featuresCol="features", predictionCol="prediction", maxIter=20, maxDepth=4)

In [0]:
# create model
gbtmodel = gbt.fit(train)

Exception ignored in: <function JavaWrapper.__del__ at 0x7f75fc0feca0>
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'GBTClassifier' object has no attribute '_java_obj'


In [0]:
# make predictions
gbtpred = gbtmodel.transform(test)
gbtpred.select("prediction", "bestanswer_rating_index", "features").show(5)

+----------+-----------------------+-----------------+
|prediction|bestanswer_rating_index|         features|
+----------+-----------------------+-----------------+
|       0.0|                    0.0| (17,[15],[56.0])|
|       0.0|                    0.0| (17,[15],[80.0])|
|       0.0|                    0.0| (17,[15],[94.0])|
|       0.0|                    0.0| (17,[15],[98.0])|
|       0.0|                    0.0|(17,[15],[108.0])|
+----------+-----------------------+-----------------+
only showing top 5 rows



In [0]:
# evaluate the model
gbteval = MulticlassClassificationEvaluator(labelCol="bestanswer_rating_index", predictionCol="prediction", metricName="accuracy")
gbtacc = gbteval.evaluate(gbtpred)
print(f"Accuracy: {gbtacc * 100}")
print(f"Test Error = {1.0 - gbtacc}")

Accuracy: 81.34620874246828
Test Error = 0.18653791257531716


In [0]:
# Change the metric to weighted precision
gbteval.setMetricName("weightedPrecision")
weightedPrecision = gbteval.evaluate(gbtpred)
print("Weighted Precision: ", weightedPrecision)

Weighted Precision:  0.8068455317627296


In [0]:
# Change the metric to weighted recall
gbteval.setMetricName("weightedRecall")
weightedRecall = gbteval.evaluate(gbtpred)
print("Weighted Recall: ", weightedRecall)

Weighted Recall:  0.8134620874246828


In [0]:
# Change the metric to F1 score
gbteval.setMetricName("f1")
f1Score  = gbteval.evaluate(gbtpred)
print("F1 Score: ", f1Score)

F1 Score:  0.7955241025909798


The GBT model was not run on first_answertime_buckets because Spark's GBTClassifier currently supports only binary classification.

#### Logistic Regression
Logistic Regression is a supervised machine learning algorithm that models the probability of a class as the logistic (sigmoid) function of a weighted sum of the features; thresholding that probability gives a classification. It is a popular choice for text analysis because it copes well with high-dimensional data and is relatively easy to interpret.

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# instantiate the model
logreg = LogisticRegression(labelCol='bestanswer_rating_index', featuresCol='features', predictionCol='prediction', maxIter=80)

In [0]:
# create model
lrm = logreg.fit(train)

In [0]:
# evaluate on the test set; the returned summary's .predictions attribute holds the predictions DataFrame
lrmpred = lrm.evaluate(test)
lrmpred.predictions.show(5)

+-----------------------+------------------------+-----------------+--------------------+--------------------+----------+
|bestanswer_rating_index|first_answertime_buckets|         features|       rawPrediction|         probability|prediction|
+-----------------------+------------------------+-----------------+--------------------+--------------------+----------+
|                    0.0|                     0.0| (17,[15],[56.0])|[1.59207087713372...|[0.83090726151665...|       0.0|
|                    0.0|                     0.0| (17,[15],[80.0])|[1.59665448344736...|[0.83155028353653...|       0.0|
|                    0.0|                     0.0| (17,[15],[94.0])|[1.59932825379698...|[0.83192447839668...|       0.0|
|                    0.0|                     0.0| (17,[15],[98.0])|[1.60009218818259...|[0.83203126930952...|       0.0|
|                    0.0|                     0.0|(17,[15],[108.0])|[1.60200202414660...|[0.83229800965791...|       0.0|
+-----------------------+------------------------+-----------------+--------------------+--------------------+----------+
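For a binary model, the `probability` column is the logistic function applied to the `rawPrediction` margins. Applying it to the first `rawPrediction` value of the first row above (truncated in the display) reproduces the displayed probability of about 0.8309:

```python
import math

def sigmoid(margin):
    """Logistic function: maps a raw margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-margin))

# First rawPrediction value of the first test row shown above (truncated in the display)
raw0 = 1.59207087713372
print(round(sigmoid(raw0), 6))  # 0.830907
```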

In [0]:
# create an evaluator for the area under the ROC curve (its default metric)
lrmeval = BinaryClassificationEvaluator(labelCol="bestanswer_rating_index", rawPredictionCol="rawPrediction")

In [0]:
# calculate accuracy
lrmpreddf = lrmpred.predictions
total_correct = lrmpreddf.filter(lrmpreddf.prediction == lrmpreddf.bestanswer_rating_index).count()
lrmaccuracy = total_correct / lrmpreddf.count()
print(f"Accuracy: {lrmaccuracy * 100}")
print(f"Test Error = {1.0 - lrmaccuracy}")

# Get the AUC (1.0 indicates perfect performance; 0.5 indicates a classifier no better than random guessing)
auc = lrmeval.evaluate(lrmpred.predictions)
print(f"Area Under the Receiver Operating Characteristic Score = {auc}")

Accuracy: 81.02729066972796
Test Error = 0.18972709330272042
Area Under the Receiver Operating Characteristic Score = 0.7997489041346322
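The area under the ROC curve equals the probability that a randomly chosen positive example is scored above a randomly chosen negative one (ties count as half). A brute-force plain-Python sketch of that definition on toy scores is below. Because AUC depends only on the ranking of the scores, it is the same whether raw margins or probabilities are used; Spark computes it from a ROC curve that may be down-sampled into bins, so on large data its result can differ slightly from the exact value.

```python
def roc_auc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative (ties count 1/2)."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in pos:
        for q in neg:
            if p > q:
                wins += 1.0
            elif p == q:
                wins += 0.5
    return wins / (len(pos) * len(neg))

print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```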


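Logistic Regression was not run on first_answertime_buckets. The restriction to length-2 raw prediction vectors applies to the binary evaluation used here (BinaryClassificationEvaluator); Spark's LogisticRegression itself can fit multiclass labels with family="multinomial", which would then be evaluated with a MulticlassClassificationEvaluator.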