In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('prez').getOrCreate()

In [3]:
# Load in data
from pyspark import SparkFiles
df = spark.read.format("csv").option("header", "true").load("President_data.csv")
df.head()

Row(Presidents='bush', Party_Affliation='Republican', variable='Speech1', value='"LEHRER On behalf of Commission on Presidential Debates am pleased to welcome you to this first presidential debate of 1988 campaign I\'m Jim Lehrer of MacNeil/Lehrer News Hour My colleagues on panel are John Mashek of Atlanta Journal-Constitution; Ann Groer of Orlando Sentinel; and Peter Jennings of ABC News For next 90 minutes we will be questioning candidates for of United States following a format designed and agreed to by representatives of two candidates The candidates are Vice George Bush Republican nominee; Governor Michael Dukakis Democratic nominee LEHRER Our questions this evening will be about equally divided between foreign and domestic policy matters The first question by agreement between two candidates goes to Vice Bush It is a domestic question You have two minutes for an answer sir The polls say number one domestic issue to a majority of voters is drugs What is there about these times tha

In [4]:
# Create a length column to be used as a future feature 
from pyspark.sql.functions import length
df = df.withColumn("length", length(df.value))
df.show()

+----------+----------------+--------+--------------------+------+
|Presidents|Party_Affliation|variable|               value|length|
+----------+----------------+--------+--------------------+------+
|      bush|      Republican| Speech1|"LEHRER On behalf...| 29012|
|      bush|      Republican| Speech1|"Chief Justice Vi...| 11506|
|      bush|      Republican| Speech1|"have many friend...| 20265|
|      bush|      Republican| Speech1|"My fellow citize...|  4383|
|      bush|      Republican| Speech1|"Governor all ver...| 13265|
|      bush|      Republican| Speech1|"and distinguishe...| 25829|
|      bush|      Republican| Speech1|"members of Unite...| 19878|
|      bush|      Republican| Speech1|"Evan so much And...|  9955|
|      bush|      Republican| Speech1|In life of a nati...|  7913|
|      bush|      Republican| Speech1|"and and Members ...| 17975|
|      bush|      Republican| Speech1|very much Secreta...| 14597|
|      bush|      Republican| Speech1|"Tonight want to ...|  5

### Feature Transformations


In [5]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Create the data processing pipeline functions here (note: StringIndexer will be used to encode 
# your target variable column. This column should be named 'label' so our model will recognize it later)
review_data = Tokenizer(inputCol="value", outputCol="Words")
reviewed = review_data.transform(df)
#reviewed.show()
remover = StopWordsRemover(inputCol="Words", outputCol="filtered")
newFrame = remover.transform(reviewed)
#newFrame.show()
hashing = HashingTF(inputCol="filtered", outputCol="hashedValues", numFeatures=pow(2,10))
# Transform in a DF
hashed_df = hashing.transform(newFrame)
hashed_df.show(truncate=False)
idf = IDF(inputCol="hashedValues", outputCol="feature")
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)
rescaledData.select("words", "feature").show(truncate=False)
indexer = StringIndexer(inputCol="Party_Affliation", outputCol="label")
indexed = indexer.fit(rescaledData).transform(rescaledData)
indexed.show()

+----------+----------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



+----------+----------------+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+
|Presidents|Party_Affliation|variable|               value|length|               Words|            filtered|        hashedValues|             feature|label|
+----------+----------------+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+
|      bush|      Republican| Speech1|"LEHRER On behalf...| 29012|["lehrer, on, beh...|["lehrer, behalf,...|(1024,[0,2,3,4,5,...|(1024,[0,2,3,4,5,...|  0.0|
|      bush|      Republican| Speech1|"Chief Justice Vi...| 11506|["chief, justice,...|["chief, justice,...|(1024,[2,4,5,8,9,...|(1024,[2,4,5,8,9,...|  0.0|
|      bush|      Republican| Speech1|"have many friend...| 20265|["have, many, fri...|["have, many, fri...|(1024,[0,2,3,4,5,...|(1024,[0,2,3,4,5,...|  0.0|
|      bush|      Republican| Speech1|"My fellow citize...

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors (note: this step utilizes VectorAssembler to create a new vector from our predictor columns.
# Name this column 'features')
assembler = VectorAssembler(
    inputCols=["feature", "length"],
    outputCol="features")

cleaned = assembler.transform(indexed)
print("Assembled 'features', 'length' to vector column 'features'")

Assembled 'features', 'length' to vector column 'features'


In [7]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline

In [8]:
# Fit and transform the pipeline

In [9]:
# Examine your label and features columns

In [10]:
# Partition data into a training set and a testing set (see documentation for randomSplit())
training, testing = cleaned.randomSplit([0.7, 0.3])

In [15]:
from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes model and fit training data
#nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb = NaiveBayes()

# train the model
model = nb.fit(training)

# select example rows to display.
predictions = model.transform(testing)
predictions.show()

+----------+----------------+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|Presidents|Party_Affliation|variable|               value|length|               Words|            filtered|        hashedValues|             feature|label|            features|       rawPrediction|         probability|prediction|
+----------+----------------+--------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|      bush|      Republican| Speech1|"Jim Lehrer and w...| 29018|["jim, lehrer, an...|["jim, lehrer, we...|(1024,[0,2,3,4,5,...|(1024,[0,2,3,4,5,...|  0.0|(1025,[0,2,3,4,5,...|[-14571.427626058...|[1.18508485758665...|       1.0|
|      bush|      Republican| Speech1|"Just 2 hours ago...|  8140|["just, 2,

In [16]:
# Use the model to predict classifications for our testing partition

In [17]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(predictions)
print("Accuracy of model at predicting reviews was: {}".format(acc))

Accuracy of model at predicting reviews was: 0.6480038480038479


In [None]:
spark.stop()