In [34]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import col
from pyspark.sql import Row

from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler


In [2]:
sc =SparkContext()
sqlContext = SQLContext(sc)




In [14]:
train_df = sqlContext.read.format("csv").load('file://///Users/zaranabhalala/Desktop/project/trainingandtestdata/training.1600000.processed.noemoticon.csv')
# test_df = sqlContext.read.format("csv").load('file://///Users/zaranabhalala/Desktop/project/trainingandtestdata/testdata.manual.2009.06.14.csv')


#Rename Columns
train_df = train_df.withColumnRenamed("_c0", "sentiment")\
       .withColumnRenamed("_c1", "id")\
        .withColumnRenamed("_c2", "date")\
       .withColumnRenamed("_c3", "query")\
       .withColumnRenamed("_c4", "user")\
       .withColumnRenamed("_c5", "text")


# Distribution of Sentiments in Traning  Set

In [15]:
train_df.groupBy("sentiment") \
    .count() \
    .orderBy(col("count").desc()) \
    .distinct() \
    .show()

+---------+------+
|sentiment| count|
+---------+------+
|        0|800000|
|        4|800000|
+---------+------+



# Distribution of Sentiments in Test Set


In [16]:
# test_df.groupBy("sentiment") \
#     .count() \
#     .orderBy(col("count").desc()) \
#     .distinct() \
#     .show()

+---------+-----+
|sentiment|count|
+---------+-----+
|        0|  177|
|        4|  182|
|        2|  139|
+---------+-----+



# Split Train and Test Set

In [19]:
# set seed for reproducibility
(train_df, test_df) = train_df.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(train_df.count()))
print("Test Dataset Count: " + str(test_df.count()))


Training Dataset Count: 1120280
Test Dataset Count: 479720


# Data Preprofessing and Model Training in Spark

In [23]:
#
## regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
#
## stop words
add_stopwords = ["http","https","amp","rt","t","c","the","@"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
#
## bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
#
## convert string labels to indexes
label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")
#
#print (label_stringIdx)

rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100, seed=100)
##lrModel = lr.fit(trainingData)

In [26]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx, rf])

In [28]:
pipelineFit = pipeline.fit(train_df)
predictions = pipelineFit.transform(test_df)

# Model Inference on sample test set

In [33]:
predictions.select("text","sentiment","probability","label","prediction") \
    .orderBy("probability") \
    .show(n = 30, truncate = 40)

+----------------------------------------+---------+----------------------------------------+-----+----------+
|                                    text|sentiment|                             probability|label|prediction|
+----------------------------------------+---------+----------------------------------------+-----+----------+
|Crap :| Can't attend school. Me sick....|        0| [0.4002160167987572,0.5997839832012428]|  1.0|       1.0|
|@nathanprodi yeah, i'm sick  i feel l...|        0| [0.4042877635215788,0.5957122364784212]|  1.0|       1.0|
|I'm always hungry all the time but no...|        0|[0.40473638082922625,0.5952636191707739]|  1.0|       1.0|
|I'm sooooo sick...sore throat, headac...|        0|[0.40516120663837013,0.5948387933616299]|  1.0|       1.0|
|G'morning! I'm sad cus my back hurts ...|        0|[0.40743924994458935,0.5925607500554106]|  1.0|       1.0|
|i hate being sick. i feel horrible. m...|        0|[0.40889193436898824,0.5911080656310118]|  1.0|       1.0|
|

# Spark Model Validation

In [37]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print("Accuracy: %g" % (evaluator.evaluate(predictions)))


Accuracy: 0.7151


# Save Spark Model

In [38]:
pipelineFit.save("file://///Users/zaranabhalala/Desktop/project/logreg.model")
