# NLP Code 

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark as ps
import warnings
# sc = ps.SparkContext()
# spark.stop()

In [2]:
# spark = SparkSession.builder.appName('nlp').getOrCreate()

try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    sc = ps.SparkContext('local[4]')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

Just created a SparkContext


In [3]:
sc.master

'local[4]'

In [4]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('clean_tweet.csv')
# data = spark.read.csv("./dataFinal.csv",inferSchema=True,sep=',',header=True)

In [5]:
# data = data.withColumnRenamed('_c0','sentiment').withColumnRenamed('_c1','text')

In [6]:
df.show(5)

+---+--------------------+------+
|_c0|                text|target|
+---+--------------------+------+
|  0|awww that s a bum...|     0|
|  1|is upset that he ...|     0|
|  2|i dived many time...|     0|
|  3|my whole body fee...|     0|
|  4|no it s not behav...|     0|
+---+--------------------+------+
only showing top 5 rows



In [7]:
df.count()

1596747

In [8]:
(train_set, val_set, test_set) = df.randomSplit([0.50, 0.25, 0.25], seed = 2000)

In [9]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer,HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

add_stopwords = ["http","https","amp","rt","t","c","the","@"]
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# hashtf = HashingTF(numFeatures=2**16, inputCol="filtered", outputCol='tf')
cv = CountVectorizer(inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features" , minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer,stopwordsRemover,cv, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+
|_c0|                text|target|               words|            filtered|                  cv|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+
|  0|awww that s a bum...|     0|[awww, that, s, a...|[awww, that, s, a...|(179480,[1,2,4,6,...|(179480,[1,2,4,6,...|  0.0|
|  1|is upset that he ...|     0|[is, upset, that,...|[is, upset, that,...|(179480,[2,4,5,7,...|(179480,[2,4,5,7,...|  0.0|
|  2|i dived many time...|     0|[i, dived, many, ...|[i, dived, many, ...|(179480,[0,1,9,11...|(179480,[0,1,9,11...|  0.0|
|  4|no it s not behav...|     0|[no, it, s, not, ...|[no, it, s, not, ...|(179480,[0,4,6,10...|(179480,[0,4,6,10...|  0.0|
|  7|hey long time no ...|     0|[hey, long, time,...|[hey, long, time,...|(179480,[0,2,6,10...|(179480,[0,2,6,10...|  0.0|
+---+---

In [10]:
from pyspark.ml.classification import LogisticRegression,NaiveBayes


In [18]:
# lr = LogisticRegression(maxIter=100)
nb = NaiveBayes(modelType="multinomial")
nbModel = nb.fit(train_df)

In [19]:
nbPredictions = nbModel.transform(val_df)

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
nbEval = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
nbAcc = nbEval.evaluate(nbPredictions)

In [23]:
from pyspark.mllib.evaluation import MulticlassMetrics
predictionAndLabel = nbPredictions.select("prediction", "label").rdd

# Generate confusion matrix
metrics = MulticlassMetrics(predictionAndLabel)
print( metrics.confusionMatrix())
print("Acc of NB: ",nbAcc)

DenseMatrix([[155196.,  44525.],
             [ 48202., 151398.]])
Acc of NB:  0.5356720499219636


In [16]:
lr = LogisticRegression()
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8586692560746748

In [26]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopwordsRemover,cv, idf, label_stringIdx, lr])

pipelineFit1 = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.7953
ROC-AUC: 0.8645


In [28]:
from pyspark.ml.util import MLWritable, MLReadable
# MLWritable.save(lrModel,'rash')
# lrModel.save('rashid')

In [27]:
pipelineFit1.write().overwrite().save("tmp/rasNas")

In [35]:
test_set.repartition(200).write \
    .mode("overwrite") \
    .parquet("tmp/testdata")

In [47]:
trainedLR = Pipeline()

In [28]:
from pyspark.ml import PipelineModel
abc = PipelineModel.load("tmp/rasNas")

In [29]:
testResult = abc.transform(test_set)

In [30]:
testResult.show()

+----+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
| _c0|                text|target|               words|            filtered|                  cv|            features|label|       rawPrediction|         probability|prediction|
+----+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
| 211|but this is canad...|     0|[but, this, is, c...|[but, this, is, c...|(65536,[1,7,18,25...|(65536,[1,7,18,25...|  0.0|[3.03563028398918...|[0.95415807476462...|       0.0|
| 470|cant see the flow...|     0|[cant, see, the, ...|[cant, see, flowe...|(65536,[0,2,3,16,...|(65536,[0,2,3,16,...|  0.0|[1.77173027835149...|[0.85467271611953...|       0.0|
| 484|have watched that...|     0|[have, watched, t...|[have, watched, t...|(65536,[0,3,13,16...|(65536,[0,3,1