In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *       # for datatype conversion
from pyspark.sql.functions import *   # for col() function
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer
import re
import pyspark.sql.functions as f
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from numpy import array
from math import sqrt
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import NGram
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import matplotlib.pyplot as plt
from pyspark.ml.clustering import BisectingKMeans

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Jeopardy Calculation") \
    .config("spark.executor.memory", '2g') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '1') \
    .config("spark.driver.memory",'1g') \
    .getOrCreate()

from nltk.stem.porter import *
stemmer = PorterStemmer()
def stem(in_vec):
    out_vec = []
    for t in in_vec:
        t_stem = stemmer.stem(t)
        out_vec.append(t_stem)       
    return out_vec
from pyspark.sql.types import *
stemmer_udf = udf(lambda x: stem(x), ArrayType(StringType()))

In [2]:
#reading in the jeopardy data set
jeopardy = spark.read.json("JEOPARDY_QUESTIONS1-Copy1.json")
j_categoryCount = jeopardy.groupBy("category").count()
#This is the number of categories in the dataset which have greater than 100 observations
count100 = j_categoryCount.sort(desc("count")).filter(j_categoryCount["count"] > 100).count()
#This is a list of all categories which have a count greater than 100
top_categories = list(j_categoryCount.sort(desc("count")).select("category").limit(count100).toPandas().category)
#new dataset that only contains categories that have greater than 100 osbervations
jeo_f = jeopardy.where(col("category").isin(top_categories))

In [3]:
#stripping punctuation, tokenizing, stop word removing, and stemming for the modified dataset
jeo_fpunc = jeo_f.withColumn("stripped", f.regexp_replace(f.col("question"), "[\!@#$%^&*)(><,';:]", ""))
jeo_fpunc.cache()
tokenizer = Tokenizer(inputCol = "stripped", outputCol = "words")
tokenized_f = tokenizer.transform(jeo_fpunc)
tokenized_f.cache()
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
jeo_fStopRemoved = remover.transform(tokenized_f)
jeo_fStopRemoved.cache()
jeo_fStemmed = jeo_fStopRemoved.withColumn("stemmed", stemmer_udf("filtered"))
jeo_fStemmed.cache()


DataFrame[air_date: string, answer: string, category: string, question: string, round: string, show_number: string, value: string, stripped: string, words: array<string>, filtered: array<string>, stemmed: array<string>]

In [6]:
training,test = jeo_fStemmed.randomSplit([0.8,0.2], seed = 1) 

In [7]:
hashingTF = HashingTF(inputCol="stemmed", outputCol="rawFeatures", numFeatures = 50000)
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
indexer = StringIndexer(inputCol="category", outputCol="label")
nb = NaiveBayes(modelType="multinomial", featuresCol = "features", labelCol = "label")

pipelineNB = Pipeline(stages=[hashingTF,idf,indexer,nb])
paramGrid = ParamGridBuilder() \
    .addGrid(nb.smoothing, [0.1,1.0,5.0,10.0,50.0]) \
    .build()

crossval = CrossValidator(estimator=pipelineNB,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=5)
cvModel = crossval.fit(training)

In [8]:
prediction = cvModel.transform(test)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.29431779516842466


In [9]:
cvModel.avgMetrics

[0.24087622134105305,
 0.2548904255189059,
 0.2639944388359019,
 0.2640067716510574,
 0.2555010420133558]

In [10]:
prediction.groupBy('prediction').count().sort(desc("prediction")).show(145)

+----------+-----+
|prediction|count|
+----------+-----+
|     143.0|    3|
|     142.0|   14|
|     140.0|    9|
|     139.0|   12|
|     138.0|   19|
|     137.0|    4|
|     136.0|    7|
|     135.0|    1|
|     134.0|    9|
|     133.0|    5|
|     132.0|    5|
|     130.0|   13|
|     129.0|   11|
|     128.0|    9|
|     127.0|   16|
|     125.0|    1|
|     123.0|   11|
|     122.0|   13|
|     121.0|    4|
|     120.0|   12|
|     119.0|    3|
|     117.0|    4|
|     115.0|    4|
|     113.0|   14|
|     112.0|    3|
|     111.0|    6|
|     110.0|    9|
|     109.0|    3|
|     108.0|   18|
|     107.0|    2|
|     106.0|    1|
|     105.0|   11|
|     104.0|    4|
|     103.0|    8|
|     101.0|    2|
|     100.0|    3|
|      99.0|    1|
|      98.0|   16|
|      97.0|    4|
|      96.0|    1|
|      95.0|   18|
|      94.0|    2|
|      93.0|    6|
|      92.0|    5|
|      91.0|    2|
|      90.0|    9|
|      89.0|    7|
|      88.0|   21|
|      87.0|   13|
|      86.0|

In [11]:
prediction.filter(prediction['prediction'] == 1).select("category","question").sort("category") \
    .show(424,truncate=False)

+-----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|category               |question                                                                                                                                                                                                                                                                                                                                                                                                                                                                 

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 38246)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pysp