In [4]:
from pyspark.context import  SparkContext
from pyspark.sql.functions import col, lower
from pyspark.sql import SQLContext
import re
sc = SparkContext('local','test1')
sql = SQLContext(sc)

In [124]:
from pyspark.sql.functions import lit
dems_df =  sql.read.text("dems.txt")
gop_df = sql.read.text("gop.txt")

In [125]:
corpus_df = dems_df.select("value", lit(1).alias("label")).union(gop_df.select("value", lit(0).alias("label")))

In [126]:
corpus_df.select("*").limit(20).show()

+--------------------+-----+
|               value|label|
+--------------------+-----+
|This week @senate...|    1|
|Health care profe...|    1|
|RT @SeemaNanda: G...|    1|
|Republicans keep ...|    1|
|RT @SpeakerPelosi...|    1|
|While the preside...|    1|
|You are not alone...|    1|
|RT @DNCWarRoom: W...|    1|
|RT @DNCWarRoom: T...|    1|
|RT @DNCWarRoom: T...|    1|
|LISTEN. TO. HEALT...|    1|
|RT @SeemaNanda: B...|    1|
|This is a HUGE wi...|    1|
|RT @SenSherrodBro...|    1|
|RT @WisDems: Make...|    1|
|RT @DemConvention...|    1|
|Abortion is healt...|    1|
|RT @RepLucyMcBath...|    1|
|Get counted. Get ...|    1|
+--------------------+-----+



In [127]:
from pyspark.sql.functions import udf,lower,col,trim
from pyspark.sql.types import FloatType,StringType,IntegerType
def clean_text(text):
    text=re.sub(r'@[A-Za-z0-9]+','',text).strip() #remove mentions
    text=re.sub(r'#','',text).strip() #removing #symbol
    text=re.sub(r'RT[\s]+','',text).strip()
    #text=re.sub(r'https?:\/\/\S+','',text)
    #text=re.sub(r'\b(?:(?:https?|ftp)://)?\w[\w-]*(?:\.[\w-]+)+\S*', '', text)
    text=re.sub(r'[?|$|.|!|;|:|&|"|,|""*-]','',text).strip()
    text=re.sub(r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)+',"",text).strip()
    return text

In [128]:
def remove_emoji(string):
    emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           u"\U00002702-\U000027B0"
                           u"\U000024C2-\U0001F251"
                           "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', string)

In [129]:
def check_alphnum(text):
    t=""
    for word in text.split(" "):
        if (word.isalpha() and len(word)>3):
            t=t+" "+word
    return str(t)

In [130]:
# def check_len(text):
#     t=" "
#     for word in text.split(" "):
#         if(len(word)>3):
#             t=t+" "+word
#     return str(t)

In [131]:
#value
clean_udf_str=udf(lambda z: clean_text(z), StringType())
corpus_df=corpus_df.select("label",clean_udf_str("value").alias("value"))
emoji_udf_str=udf(lambda z: remove_emoji(z), StringType())
corpus_df=corpus_df.select("label",emoji_udf_str('value').alias('value'))
corpus_df=corpus_df.select(trim(lower(col('value'))).alias("value"),"label")

#e_udf_str=udf(lambda z: check_alph(z), StringType())
#corpus_df=corpus_df.select("label",e_udf_str('value').alias('value'))
#f_udf_str=udf(lambda z: check_alphnum(z), StringType())
#corpus_df=corpus_df.select("label",f_udf_str('value').alias('value'))
#corpus_df=corpus_df.select(trim(col('value')).alias("value"),"label")


In [132]:
corpus_df.select("*").limit(20).show()

+--------------------+-----+
|               value|label|
+--------------------+-----+
|this week  said w...|    1|
|health care profe...|    1|
|good to see  sign...|    1|
|republicans keep ...|    1|
|the congress has ...|    1|
|while the preside...|    1|
|you are not alone...|    1|
|well this is conc...|    1|
|trump “in the end...|    1|
|trump proposed hu...|    1|
|listen to health ...|    1|
|breaking we  alon...|    1|
|this is a huge wi...|    1|
|update this is th...|    1|
|make sure your vo...|    1|
|in light of the u...|    1|
|abortion is healt...|    1|
|why does completi...|    1|
|get counted get c...|    1|
+--------------------+-----+



In [133]:
train_df, test_df = corpus_df.randomSplit([0.75, 0.25])

In [134]:
#train_df.select(lower(col('value'))).show()
test_df.select("value").show()

+--------------------+
|               value|
+--------------------+
|'s actions on dac...|
|'s campaign says ...|
|(1/3) yesterday i...|
|(ca14) hosted “a ...|
|(co07) held a gov...|
|(fl07) toured ’s ...|
|(fl13) met with p...|
|(fl14) discussed ...|
|(il17) visited go...|
|(in07) greeted ov...|
|(mi12) attended t...|
|(ny03) and 99 ½ y...|
|(ny03) watched ma...|
|(or01) worked wit...|
|(tx18) stood with...|
|(tx33) spoke at h...|
|(tx33) stopped by...|
|(wa10) shadowed t...|
|(•_•) lt)   )the ...|
|1 day of trump he...|
+--------------------+
only showing top 20 rows



In [35]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="value", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="words_cleaned")
vectorizer = CountVectorizer(inputCol="words_cleaned", outputCol="features")
cleaning_pipeline = Pipeline(stages = [tokenizer,stop_words_remover,vectorizer])
#cleaning_pipeline = Pipeline(stages = [stop_words_remover,vectorizer])
cleaning_pipeline_model = cleaning_pipeline.fit(corpus_df)
cleaned_training_df = cleaning_pipeline_model.transform(train_df)
cleaned_testing_df = cleaning_pipeline_model.transform(test_df)

In [168]:
cleaned_training_df.show()

+--------------------+-----+--------------------+--------------------+--------------------+
|               value|label|               words|       words_cleaned|            features|
+--------------------+-----+--------------------+--------------------+--------------------+
|                    |    1|                  []|                  []|(55380,[3944],[1.0])|
|                    |    1|                  []|                  []|(55380,[3944],[1.0])|
|                    |    1|                  []|                  []|(55380,[3944],[1.0])|
|                    |    1|                  []|                  []|(55380,[3944],[1.0])|
|                    |    1|                  []|                  []|(55380,[3944],[1.0])|
|                    |    1|                  []|                  []|(55380,[3944],[1.0])|
|                    |    1|                  []|                  []|(55380,[3944],[1.0])|
|                    |    1|                  []|                  []|(55380,[39

In [66]:
cleaned_training_df.toPandas().to_csv("cleanedtraining.csv")

In [67]:
from pyspark.ml.classification import NaiveBayes
naive_bayes = NaiveBayes(featuresCol="features", labelCol = "label")

In [68]:
naive_bayes_model = naive_bayes.fit(cleaned_training_df)
predictions_df = naive_bayes_model.transform(cleaned_testing_df)

In [69]:
predictions_df.select("features","label","prediction").limit(20).show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|   (45487,[0],[1.0])|    1|       0.0|
|   (45487,[0],[1.0])|    1|       0.0|
|(45487,[0,2,3,323...|    1|       0.0|
|(45487,[0,76,80,5...|    1|       0.0|
|(45487,[0,41,122,...|    1|       1.0|
|(45487,[0,29,107,...|    1|       1.0|
|(45487,[0,13,14,3...|    1|       1.0|
|(45487,[1,4,24,68...|    1|       1.0|
|(45487,[7,10,12,3...|    1|       1.0|
|(45487,[7,10,12,3...|    1|       1.0|
|(45487,[12,18,24,...|    1|       1.0|
|(45487,[0,9,52,10...|    1|       1.0|
|(45487,[0,24,54,8...|    1|       1.0|
|(45487,[0,29,283,...|    1|       1.0|
|(45487,[0,29,283,...|    1|       1.0|
|(45487,[5,18,31,6...|    1|       1.0|
|(45487,[3,12,46,6...|    1|       1.0|
|(45487,[5,101,141...|    1|       1.0|
|(45487,[0,2,13,15...|    1|       1.0|
|(45487,[4,24,68,9...|    1|       1.0|
+--------------------+-----+----------+



In [70]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
eval = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction', metricName = 'accuracy')
eval.evaluate(predictions_df)

0.6744747899159664

In [71]:
cleaned_training_df.select("features").show(4)

+--------------------+
|            features|
+--------------------+
|(45487,[23,28,43,...|
|(45487,[0,12,21,1...|
|(45487,[0,4,52,21...|
|(45487,[0,55,143,...|
+--------------------+
only showing top 4 rows

