In [3]:
from pyspark import SparkContext
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, asc
from pyspark.sql.types import FloatType

### Load data from Azure Storage Account

In [5]:
texttrain = sc.textFile('wasb://ml-gov-demo@mlgovdemo.blob.core.usgovcloudapi.net/HdiSamples/HdiSamples/ml-demo/training.txt')
testtrain = sc.textFile('wasb://ml-gov-demo@mlgovdemo.blob.core.usgovcloudapi.net/HdiSamples/HdiSamples/ml-demo/test.txt')

In [6]:
textfinal = texttrain.map(lambda (text): (float(text.split("\t")[0]), text.split("\t")[1]))
testfinal = testtrain.map(lambda (text): (float(text.split("\t")[0]), text.split("\t")[1]))
textfinallist = textfinal.collect()
testfinallist = testfinal.collect()
print textfinal.take(1)
print testfinal.take(1)

[(1.0, u'lynn and jon at the theaters for da vinci code-other than that, EVERYONE AT THE LUAU, it was awesome..')]
[(1.0, u'The Da Vinci Code book is just awesome.')]

### Structure Dataframe
#### Each sentence or phrase has been labeled as 0 (negative) or 1 (positive) sentiment

In [7]:
schema = StructType([StructField("label", StringType()), 
                     StructField("text", StringType())])

structuredtext = texttrain.map(lambda (textstring): (textstring.split("\t")[0],textstring.split("\t")[1:]))
structuredtest = testtrain.map(lambda (textstring): (textstring.split("\t")[0],textstring.split("\t")[1:]))
df = sqlContext.createDataFrame(structuredtext, schema)
dftest = sqlContext.createDataFrame(structuredtest, schema)
df.registerTempTable("datatable")
dftest.registerTempTable("testtable")
df.show(5)
dftest.show(5)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    1|[lynn and jon at ...|
|    1|[The Da Vinci Cod...|
|    1|[also, the da vin...|
|    1|[* gasp * I LOVE ...|
|    1|[As for movies, T...|
+-----+--------------------+
only showing top 5 rows

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    1|[The Da Vinci Cod...|
|    1|[this was the fir...|
|    1|[i liked the Da V...|
|    1|[i liked the Da V...|
|    1|[I liked the Da V...|
+-----+--------------------+
only showing top 5 rows

In [8]:
zeroes = df.where(col("label") == "0")
ones = df.where(col("label") == "1")
print "Number of negatives:" + str(zeroes.count())
print "Number of positives:" + str(ones.count())

Number of negatives:2939
Number of positives:3845

In [9]:
finaldf = df.select(df.label.cast('double'), df.text)
finaldftest = dftest.select(dftest.label.cast('double'),dftest.text)
finaldf.take(1)

[Row(label=1.0, text=u'[lynn and jon at the theaters for da vinci code-other than that, EVERYONE AT THE LUAU, it was awesome..]')]

### TF - IDF Feature Extraction
#### Applying weighting factor that indicates how important a word is to data

In [62]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokentext = tokenizer.transform(combined_df)
tokentest = tokenizer.transform(combinedtest_df)

In [63]:
print tokentext.first()

Row(label=1.0, text=u'[lynn and jon at the theaters for da vinci code-other than that, EVERYONE AT THE LUAU, it was awesome..]', words=[u'[lynn', u'and', u'jon', u'at', u'the', u'theaters', u'for', u'da', u'vinci', u'code-other', u'than', u'that,', u'everyone', u'at', u'the', u'luau,', u'it', u'was', u'awesome..]'])

In [64]:
newhashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=2000)
hashtext = newhashingTF.transform(tokentext)
hashtest = newhashingTF.transform(tokentest)
print hashtext.first()

Row(label=1.0, text=u'[lynn and jon at the theaters for da vinci code-other than that, EVERYONE AT THE LUAU, it was awesome..]', words=[u'[lynn', u'and', u'jon', u'at', u'the', u'theaters', u'for', u'da', u'vinci', u'code-other', u'than', u'that,', u'everyone', u'at', u'the', u'luau,', u'it', u'was', u'awesome..]'], features=SparseVector(2000, {333: 1.0, 417: 1.0, 436: 1.0, 495: 1.0, 745: 1.0, 756: 2.0, 893: 1.0, 1036: 1.0, 1055: 1.0, 1124: 1.0, 1234: 1.0, 1261: 1.0, 1598: 1.0, 1710: 2.0, 1793: 1.0, 1891: 1.0, 1998: 1.0}))

In [65]:
idf = IDF(inputCol="features").fit(hashtext)
tfidf = idf.transform(hashtest)
print tfidf.first()

Row(label=1.0, text=u'[The Da Vinci Code book is just awesome.]', words=[u'[the', u'da', u'vinci', u'code', u'book', u'is', u'just', u'awesome.]'], features=SparseVector(2000, {307: 1.0, 393: 1.0, 420: 1.0, 524: 1.0, 1281: 1.0, 1521: 1.0, 1793: 1.0, 1998: 1.0}), IDF_4ff699181516231e2af0__output=SparseVector(2000, {307: 1.5145, 393: 1.4861, 420: 3.8031, 524: 2.1383, 1281: 0.4251, 1521: 4.6657, 1793: 4.62, 1998: 3.2739}))

### Machine Learning Pipeline 
#### Tokenizer -> Remove unnecessary words -> TFIDF -> Naive Bayes ML model

In [10]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopremover= StopWordsRemover().setInputCol("words").setOutputCol("removed").setCaseSensitive(False)
newhashingTF = HashingTF(inputCol="removed", outputCol="features", numFeatures=2000)
nb_pipeline = Pipeline(stages=[tokenizer, stopremover, newhashingTF, nb])

In [11]:
nb_model = nb_pipeline.fit(finaldf)

In [12]:
nb_predictions = nb_model.transform(finaldftest)

In [13]:
nb_filterdf = nb_predictions.filter( (nb_predictions.prediction==nb_predictions.label))
print "Accuracy = " + str(float(nb_filterdf.count())/float(nb_predictions.count()))

Accuracy = 0.885191347754

In [15]:
nb_model.save('wasb:///HdiSamples/HdiSamples/sentimentone')

In [16]:
nbmodel = PipelineModel.load('wasb:///HdiSamples/HdiSamples/sentimentone')

In [17]:
testinginput = [('I hate her, she is so mean'),('This book is interesting')]
testrdd = sc.parallelize(testinginput)
temp = testrdd.map(lambda x: Row(text = x))
tempdf = sqlContext.createDataFrame(temp)
testpred =nbmodel.transform(tempdf)
print testpred.take(2)

[Row(text=u'I hate her, she is so mean', words=[u'i', u'hate', u'her,', u'she', u'is', u'so', u'mean'], removed=[u'hate', u'her,', u'mean'], features=SparseVector(2000, {69: 1.0, 493: 1.0, 909: 1.0}), rawPrediction=DenseVector([-20.0025, -23.9995]), probability=DenseVector([0.982, 0.018]), prediction=0.0), Row(text=u'This book is interesting', words=[u'this', u'book', u'is', u'interesting'], removed=[u'book', u'interesting'], features=SparseVector(2000, {393: 1.0, 765: 1.0}), rawPrediction=DenseVector([-18.8335, -14.1577]), probability=DenseVector([0.0092, 0.9908]), prediction=1.0)]

### Training on Amazon review data 

In [18]:
amazontext = sc.textFile('wasb://ml-gov-demo@mlgovdemo.blob.core.usgovcloudapi.net/HdiSamples/HdiSamples/ml-demo/amazonreviewdata.txt')

In [19]:
print amazontext.take(1)

[u'__label__2 Stuning even for the non-gamer: This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^']

In [20]:
testline = amazontext.first()

In [21]:
testlist = testline.split("__label__")
testlabel = testlist[1][0]
testtext = testlist[1][2:]
print testlabel
print testtext

2
Stuning even for the non-gamer: This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^

In [22]:
schema2 = StructType([StructField("label", StringType()), 
                     StructField("text", StringType())])

structuredamazontext = amazontext.map(lambda (textstring): (textstring.split("__label__")[1][0],textstring.split("__label__")[1][2:]))
amazondf = sqlContext.createDataFrame(structuredamazontext, schema2)
amazondf.registerTempTable("amazondatatable")
amazondf.show(5)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    2|Stuning even for ...|
|    2|The best soundtra...|
|    2|Amazing!: This so...|
|    2|Excellent Soundtr...|
|    2|Remember, Pull Yo...|
+-----+--------------------+
only showing top 5 rows

### Change Labels to 0s and 1s

In [23]:
from pyspark.sql import functions as F

amazondf = amazondf.withColumn('label',
    F.when(amazondf['label']== '1','0').when(amazondf['label']=='2','1').
    otherwise(amazondf['label']))

In [24]:
zeroes = amazondf.where(col("label") == "0")
ones = amazondf.where(col("label") == "1")
print "Number of negatives:" + str(zeroes.count())
print "Number of positives:" + str(ones.count())

Number of negatives:1800000
Number of positives:1800000

In [25]:
amazondf = amazondf.select(amazondf.label.cast('double'), amazondf.text)
print amazondf.take(1)

[Row(label=1.0, text=u'Stuning even for the non-gamer: This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^')]

In [26]:
train_amazonset, test_amazonset = amazondf.randomSplit([0.9, 0.1], 12345)

### Training model with Naive Bayes

In [27]:
amazon_nb_model = nb_pipeline.fit(train_amazonset)

In [28]:
amazon_testpredictions = amazon_nb_model.transform(test_amazonset)

In [29]:
amazon_testpredictions.take(1)



In [32]:
amazon_filterdf = amazon_testpredictions.filter( (amazon_testpredictions.prediction==amazon_testpredictions.label))
print "Accuracy = " + str(float(amazon_filterdf.count())/float(amazon_testpredictions.count()))

Accuracy = 0.788299380228

### Training Model with Logistic Regression 

In [33]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_pipeline = Pipeline(stages=[tokenizer,stopremover, newhashingTF, lr])

amazon_lr_model = lr_pipeline.fit(train_amazonset)

### NB Model without removing Stop Words

In [35]:
from pyspark.sql.functions import lower, col

amazondf2 = amazondf.withColumn('text', lower(col('text')));
train_amazonset2, test_amazonset2 = amazondf2.randomSplit([0.9, 0.1], 12345)

#trying without stop remover
newhashingTF2 = HashingTF(inputCol="words", outputCol="features", numFeatures=2000)
nb_pipeline2 = Pipeline(stages=[tokenizer, newhashingTF2, nb])

amazon_nb_model2 = nb_pipeline2.fit(train_amazonset2)

In [36]:
amazon_nbpredictions2 = amazon_nb_model2.transform(test_amazonset2)
prednb = amazon_nbpredictions2.filter( (amazon_nbpredictions2.prediction==amazon_nbpredictions2.label))
print "Accuracy = " + str(float(prednb.count())/float(amazon_nbpredictions2.count()))

Accuracy = 0.797256885578

In [37]:
amazon_nb_model2.save('wasb:///HdiSamples/HdiSamples/sentimenttwo')

### Adding Amazon data to previous model

In [38]:
combined_df = finaldf.unionAll(train_amazonset)

In [39]:
combined_nb_model = nb_pipeline.fit(combined_df)

In [40]:
combinedtest_df = finaldftest.unionAll(test_amazonset)

In [41]:
finalpreds = combined_nb_model.transform(combinedtest_df)
filterred = finalpreds.filter((finalpreds.prediction==finalpreds.label))
print "Accuracy = " + str(float(filterred.count())/float(finalpreds.count()))

Accuracy = 0.788017013909

### Save trained model back to Azure Storage

In [42]:
combined_nb_model.save('wasb:///HdiSamples/HdiSamples/sentimentfinal')

In [43]:
combined_nb_model = nb_pipeline.fit(combined_df)

### Load trained model back into notebook

In [5]:
combinedmodel = PipelineModel.load('wasb:///HdiSamples/HdiSamples/sentimentfinal')

#### We can now run test data on our trained model

In [6]:
testinginput = [('I am excited and love the music.'),('I have been feeling tired and anxious')]
testrdd = sc.parallelize(testinginput)
temp = testrdd.map(lambda x: Row(text = x))
tempdf = sqlContext.createDataFrame(temp)
testpred =combinedmodel.transform(tempdf)
print testpred.take(2)

[Row(text=u'I am excited and love the music.', words=[u'i', u'am', u'excited', u'and', u'love', u'the', u'music.'], removed=[u'excited', u'love', u'music.'], features=SparseVector(2000, {80: 1.0, 203: 1.0, 240: 1.0}), rawPrediction=DenseVector([-21.1538, -20.6054]), probability=DenseVector([0.3662, 0.6338]), prediction=1.0), Row(text=u'I have been feeling tired and anxious', words=[u'i', u'have', u'been', u'feeling', u'tired', u'and', u'anxious'], removed=[u'feeling', u'tired', u'anxious'], features=SparseVector(2000, {162: 1.0, 1110: 1.0, 1582: 1.0}), rawPrediction=DenseVector([-24.9139, -24.9936]), probability=DenseVector([0.5199, 0.4801]), prediction=0.0)]

#### Test input: "I am excited and love the music"
#### Result: probability=DenseVector([0.3662, 0.6338]), prediction=1.0 
#### Probability of being negative sentiment = 0.3662
#### Probability of being positive sentiment = 0.6338

In [None]:
testmodel = LogisticRegressionModel.load()