# Finding Spark directory

In [1]:
import findspark
findspark.init()

# Declaring SparkContext

In [2]:
from pyspark import SparkContext
sc = SparkContext("local", "First App")

In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
data_sql = sqlContext.read.format('com.databricks.spark.csv').options(header='false', inferschema='true').load('tweets_cleaned.csv')

In [5]:
data_sql.show(10)

+---------+--------------------+
|      _c0|                 _c1|
+---------+--------------------+
|emergency|unable to sit due...|
|emergency|pnr no need mdica...|
|emergency|mobile stolen , n...|
|emergency|no water supply i...|
|emergency|fan isnt working ...|
|emergency|paid for ac but f...|
|emergency|big mouse found b...|
|emergency|hanging chains of...|
|emergency|aggressive copass...|
|emergency|having ticket con...|
+---------+--------------------+
only showing top 10 rows



In [6]:
from pyspark.sql.functions import col
data_sql.groupBy("_c0") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+---------+-----+
|      _c0|count|
+---------+-----+
| feedback|  275|
|emergency|  246|
+---------+-----+



# Importing Dataset and Spliting into train & test

In [38]:
data = sc.textFile("tweets_cleaned.csv")
train_data, test_data = data.randomSplit([0.9, 0.1])
train_fields = train_data.map(lambda x: x.split(","))
train_documents = train_fields.map(lambda x: x[1].lower().split(" "))
test_fields = test_data.map(lambda x: x.split(","))
test_documents = test_fields.map(lambda x: x[1].lower().split(" "))

# Feature Extraction (TF-IDF)

In [39]:
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF

hashingTF = HashingTF(2**14)
tf = hashingTF.transform(train_documents)
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
train_xformedData=train_data.zip(tfidf)
train_xformedData.cache()
train_xformedData.collect()[0]

('emergency,unable to sit due to sticky stains on berth. PNR no . do help immediately',
 SparseVector(16384, {2996: 3.8754, 3408: 5.4848, 6351: 2.1706, 6797: 3.1256, 7294: 4.3862, 7338: 3.4699, 9334: 3.5389, 10620: 2.3278, 10907: 4.7916, 11835: 5.4848, 13751: 2.8106, 14437: 5.4848, 14535: 4.7916, 15094: 5.4848}))

In [40]:

# with open('tfidf.model', 'wb') as f:
#     pickle.dump(idf, f)

# import joblib
# joblib.dump(idf, 'tfidf.model')

In [41]:
hashingTF = HashingTF(2**14)
tf_test = hashingTF.transform(test_documents)
tf_test.cache()
# idf = IDF().fit(tf_test)
tfidf = idf.transform(tf_test)

test_xformedData=test_data.zip(tfidf)
test_xformedData.cache()
test_xformedData.collect()[0]

('emergency,"mobile stolen , need emergency help to track it pnr"',
 SparseVector(16384, {0: 2.8457, 7833: 6.1779, 12550: 5.4848}))

# Creating Labeled Points of training and testing data points

In [42]:
from pyspark.mllib.regression import LabeledPoint
def convertToLabeledPoint(inVal) :
    origAttr=inVal[0].split(",")
    sentiment = 0.0 if origAttr[0] == "feedback" else 1.0
    return LabeledPoint(sentiment, inVal[1])

In [43]:
train_labeled_points=train_xformedData.map(convertToLabeledPoint)
train_labeled_points.cache()
train_labeled_points.collect()[0]

LabeledPoint(1.0, (16384,[2996,3408,6351,6797,7294,7338,9334,10620,10907,11835,13751,14437,14535,15094],[3.8753590210565547,5.484796933490655,2.1706109288181294,3.1256471944186814,4.386184644822546,3.46989391294839,3.5388867844353418,2.3277965123405417,4.791649752930709,5.484796933490655,2.810648284064126,5.484796933490655,4.791649752930709,5.484796933490655]))

In [44]:
test_labeled_points=test_xformedData.map(convertToLabeledPoint)
test_labeled_points.cache()
test_labeled_points.collect()[0]

LabeledPoint(1.0, (16384,[0,7833,12550],[2.8457396038753964,6.1779441140506,5.484796933490655]))

In [45]:
print('Training points: ',len(train_labeled_points.collect()))

Training points:  481


In [46]:
print('Testing Points: ',len(test_labeled_points.collect()))

Testing Points:  40


# Naive Bayes

In [62]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
model = NaiveBayes.train(train_labeled_points, 1.0)

In [63]:
train_predictionAndLabel = train_labeled_points.map(lambda p: (model.predict(p.features), p.label))
accuracy = 1.0 * train_predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / train_labeled_points.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.8544698544698545


In [64]:
test_predictionAndLabel = test_labeled_points.map(lambda p: (model.predict(p.features), p.label))
accuracy = 1.0 * test_predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test_labeled_points.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.75


In [67]:
import joblib
joblib.dump(model,'NB.model')

['NB.model']

# Confusion matrix - train

In [50]:
predDF = sqlContext.createDataFrame(
    [[float(tup[0]), float(tup[1])] for tup in train_predictionAndLabel.collect()],
    ["prediction","label"]
)
predDF.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  186|
|  0.0|       1.0|   31|
|  1.0|       0.0|   39|
|  0.0|       0.0|  225|
+-----+----------+-----+



# Confusion matrix - test

In [51]:
predDF = sqlContext.createDataFrame(
    [[float(tup[0]), float(tup[1])] for tup in test_predictionAndLabel.collect()],
    ["prediction","label"]
)
predDF.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   15|
|  0.0|       1.0|    4|
|  1.0|       0.0|    6|
|  0.0|       0.0|   15|
+-----+----------+-----+



# SVM

In [52]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
model = SVMWithSGD.train(train_labeled_points, iterations=100)

In [53]:
train_predictionAndLabel = train_labeled_points.map(lambda p: (model.predict(p.features), p.label))
accuracy = 1.0 * train_predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / train_labeled_points.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.8711018711018711


In [54]:
test_predictionAndLabel = test_labeled_points.map(lambda p: (model.predict(p.features), p.label))
accuracy = 1.0 * test_predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test_labeled_points.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.625


# Confusion matrix - train

In [55]:
predDF = sqlContext.createDataFrame(
    [[float(tup[0]), float(tup[1])] for tup in train_predictionAndLabel.collect()],
    ["prediction","label"]
)
predDF.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  185|
|  0.0|       1.0|   22|
|  1.0|       0.0|   40|
|  0.0|       0.0|  234|
+-----+----------+-----+



# Confusion matrix - test

In [56]:
predDF = sqlContext.createDataFrame(
    [[float(tup[0]), float(tup[1])] for tup in test_predictionAndLabel.collect()],
    ["prediction","label"]
)
predDF.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   11|
|  0.0|       1.0|    5|
|  1.0|       0.0|   10|
|  0.0|       0.0|   14|
+-----+----------+-----+



# Logistic Regression

In [57]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
model = LogisticRegressionWithLBFGS.train(train_labeled_points)

In [58]:
train_predictionAndLabel = train_labeled_points.map(lambda p: (model.predict(p.features), p.label))
accuracy = 1.0 * train_predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / train_labeled_points.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.8794178794178794


In [59]:
test_predictionAndLabel = test_labeled_points.map(lambda p: (model.predict(p.features), p.label))
accuracy = 1.0 * test_predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count() / test_labeled_points.count()
print('model accuracy {}'.format(accuracy))

model accuracy 0.675


# Confusion matrix - train

In [60]:
predDF = sqlContext.createDataFrame(
    [[float(tup[0]), float(tup[1])] for tup in train_predictionAndLabel.collect()],
    ["prediction","label"]
)
predDF.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  189|
|  0.0|       1.0|   22|
|  1.0|       0.0|   36|
|  0.0|       0.0|  234|
+-----+----------+-----+



# Confusion matrix - test

In [61]:
predDF = sqlContext.createDataFrame(
    [[float(tup[0]), float(tup[1])] for tup in test_predictionAndLabel.collect()],
    ["prediction","label"]
)
predDF.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   12|
|  0.0|       1.0|    4|
|  1.0|       0.0|    9|
|  0.0|       0.0|   15|
+-----+----------+-----+

