In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark import SparkFiles


In [2]:
session = SparkSession.builder.appName('nlp').getOrCreate()
session.sparkContext.addFile('tweet_data_nlp.csv')
df = session.read.csv(SparkFiles.get('tweet_data_nlp.csv'), sep=',', header=True)


In [3]:
df = df.withColumn('length', length(df['text']))
df.show()

+----------+---------------+---------+--------------------+------+
|Unnamed: 0|            key|    party|                text|length|
+----------+---------------+---------+--------------------+------+
|         0| RepDarrenSoto0|democrats|And Episode 2 cov...|  3092|
|         1| RepDarrenSoto1|democrats|TorresBruno famil...|  3002|
|         2| RepDarrenSoto2|democrats|SteveLemongello g...|  2657|
|         3| RepDarrenSoto3|democrats|On this NetNeutra...|  3058|
|         4| RepDarrenSoto4|democrats|Happy VeteransDay...|  3336|
|         5| RepDarrenSoto5|democrats|Today we mourn th...|  3257|
|         6| RepDarrenSoto6|democrats| HispanicFed This...|  2775|
|         7| RepDarrenSoto7|democrats| NHCSL We are ple...|  2902|
|         8| RepDarrenSoto8|democrats| rvivian370 RepDa...|  2875|
|         9| RepDarrenSoto9|democrats| ConsulMexOrl En ...|  2944|
|        10|RepDarrenSoto10|democrats| LisaKFrank1 Than...|  3219|
|        11|RepDarrenSoto11|democrats|Our prayers go ou...|  2

In [5]:
labels = StringIndexer(inputCol='party',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [8]:
pipe = Pipeline(stages=[labels, tokenizer, stopremove, hashingTF, idf, clean_up])

In [10]:
cleaner = pipe.fit(df)
cleaned = cleaner.transform(df)
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[1536,559...|
|  0.0|(262145,[2555,405...|
|  0.0|(262145,[1536,243...|
|  0.0|(262145,[1843,390...|
|  0.0|(262145,[1846,232...|
|  0.0|(262145,[1226,214...|
|  0.0|(262145,[632,2718...|
|  0.0|(262145,[304,664,...|
|  0.0|(262145,[329,2326...|
|  0.0|(262145,[632,1536...|
|  0.0|(262145,[632,2731...|
|  0.0|(262145,[1772,538...|
|  0.0|(262145,[211,316,...|
|  0.0|(262145,[168,784,...|
|  0.0|(262145,[2437,523...|
|  0.0|(262145,[890,1016...|
|  0.0|(262145,[2326,386...|
|  0.0|(262145,[1921,232...|
|  0.0|(262145,[1536,302...|
|  0.0|(262145,[441,1138...|
+-----+--------------------+
only showing top 20 rows



In [11]:
training, testing = cleaned.randomSplit([0.7, 0.3])
nb = NaiveBayes()
predictor = nb.fit(training)
test_results = predictor.transform(testing)
test_results.show(5)


+----------+---------------+---------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Unnamed: 0|            key|    party|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+----------+---------------+---------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|       103|   RepTedLieu14|democrats| CNNSitRoom Stati...|  3447|  0.0|[, cnnsitroom, st...|[, cnnsitroom, st...|(262144,[3082,408...|(262144,[3082,408...|(262145,[3082,408...|[-14339.919796545...|[1.0,6.9413084558...|       0.0|
|       106|   RepTedLieu17|democrats|Today we commemor...|  3757|  0.0|[tod

In [12]:
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print(f"Accuracy of model at predicting affilcation was: {acc}")

Accuracy of model at predicting reviews was: 0.9906542056074767


In [None]:
import pandas as pd

df_p = pd.read_csv('compact_tweets.csv')
df_p.drop(columns=['Unnamed: 0'])
#df_p.to_csv('tweet_data_nlp.csv',columns=['key','party','text'], index=False)
df_p['text'] = df_p['text'].apply(lambda x: x.replace('\n',''))
df_p['text'] = df_p['text'].apply(lambda x: x.replace('. ','.'))
df_p['text'] = df_p['text'].apply(lambda x: x.replace(' .','.'))


df_p.to_csv('tweet_data_nlp.csv', index=False)