In [35]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Binarizer

# Obtain Raw Data

## Amazon product data
We will use a [dataset](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books_5.json.gz)[1] that contains 8.9M book reviews from Amazon, spanning May 1996 - July 2014.

The uncompressed file is around 9 GB; once uploaded to HDFS it is stored as 70 blocks, each with 3 replicas.


[1] Image-based recommendations on styles and substitutes
J. McAuley, C. Targett, J. Shi, A. van den Hengel
SIGIR, 2015
http://jmcauley.ucsd.edu/data/amazon/

## Load Data

In [5]:
%%time
rawreviews = sqlContext.read.json('data/amazon/reviews_Books_5.json')

CPU times: user 4.37 ms, sys: 2.13 ms, total: 6.5 ms
Wall time: 22.5 s


In [6]:
rawreviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [12]:
allreviews = rawreviews.select('reviewText', 'overall')
allreviews.cache()

DataFrame[reviewText: string, overall: double]

In [14]:
allreviews.show(2)

+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|Spiritually and m...|    5.0|
|This is one my mu...|    5.0|
+--------------------+-------+
only showing top 2 rows



In [None]:
allreviews.groupBy('overall').count().show()

In [1]:
lines = sc.textFile('data/amazon/reviews_Books_5.json')

In [10]:
# Peek at the first raw JSON line to see which fields a review record carries.
lines.first()

u'{"reviewerID": "A10000012B7CGYKOMPQ4L", "asin": "000100039X", "reviewerName": "Adam", "helpful": [0, 0], "reviewText": "Spiritually and mentally inspiring! A book that allows you to question your morals and will help you discover who you really are!", "overall": 5.0, "summary": "Wonderful!", "unixReviewTime": 1355616000, "reviewTime": "12 16, 2012"}'

In [100]:
import json
Review = Row("text", "rating")
def extract_review_info(line):
    """Parse one JSON-encoded review line into a ``Review`` row.

    :param line: a single JSON document (one review) as a string.
    :return: ``Review(text, rating)`` built from the record's
        ``reviewText`` and ``overall`` fields.
    :raises KeyError: if either field is absent from the record.
    :raises ValueError: if ``line`` is not valid JSON.
    """
    record = json.loads(line)
    text, rating = record['reviewText'], record['overall']
    return Review(text, rating)

In [101]:
allreviews = lines.map(extract_review_info).toDF()
allreviews.cache()

DataFrame[text: string, rating: double]

In [103]:
allreviews.groupBy('rating').count().show()

+------+-------+
|rating|  count|
+------+-------+
|   1.0| 323833|
|   3.0| 955189|
|   5.0|4980815|
|   4.0|2223094|
|   2.0| 415110|
+------+-------+



In [104]:
# Count the reviews whose text contains this exact phrase.
phrase = 'much of what has been thought and said about Dresden'
allreviews.rdd.filter(lambda row: phrase in row.text).count()

1

In [105]:
# Count the reviews whose text is the empty string.
allreviews.rdd.filter(lambda row: row.text == '').count()

0

In [108]:
# Keep only the 1-star and 5-star reviews, then drop any with empty text.
is_extreme = (allreviews.rating == 1.0) | (allreviews.rating == 5.0)
has_text = allreviews.text != ''
extremereviews = allreviews.filter(is_extreme).filter(has_text)

## Binarizer
A transformer to convert numerical features to binary (0/1) features

In [109]:
from pyspark.ml.feature import Binarizer

# Turn the numeric rating into a 0/1 `label` column using a 2.5 threshold.
# Only 1.0 and 5.0 ratings remain in `extremereviews`; the sample output
# shows a 5.0 rating mapped to label 1.0.
binarizer = Binarizer(threshold=2.5, inputCol='rating', outputCol='label')
reviews = binarizer.transform(extremereviews)
# Mark the labelled frame for caching; it is reused by the split and counts.
reviews.cache()
# Release the cached unfiltered frame now that `reviews` is derived from it.
# NOTE(review): `reviews` has presumably not been materialised yet at this
# point, so the first action on it may recompute from the source - confirm.
allreviews.unpersist()

DataFrame[text: string, rating: double]

In [110]:
reviews.count()

5304187

In [111]:
reviews.show(2)

+--------------------+------+-----+
|                text|rating|label|
+--------------------+------+-----+
|Spiritually and m...|   5.0|  1.0|
|This is one my mu...|   5.0|  1.0|
+--------------------+------+-----+
only showing top 2 rows



# Split Data

In [112]:
# 80/20 train/test split. The seed is fixed so the partition, and every metric
# computed from it further down, is reproducible across kernel restarts.
trainingData, testData = reviews.randomSplit([0.8, 0.2], seed=42)

# Pipeline
![pipeline](http://spark.apache.org/docs/latest/img/ml-Pipeline.png)

## Tokenizer
A transformer that converts the input string to lowercase and then splits it on whitespace.

In [113]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [55]:
tokenizedData = tokenizer.transform(trainingData)

In [63]:
tokenizedData.show(2)

+--------------------+-----+--------------------+
|                text|label|               words|
+--------------------+-----+--------------------+
|" .... much of wh...|  5.0|[", ...., much, o...|
|" An amazing piec...|  5.0|[", an, amazing, ...|
+--------------------+-----+--------------------+
only showing top 2 rows



## StopWordsRemover
A transformer that filters out stop words from the input. Note: null values in the input array are preserved unless null is explicitly added to stopWords.

In [114]:
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")

In [71]:
cleanedData = remover.transform(tokenizedData)

In [72]:
cleanedData.show(2)

+--------------------+-----+--------------------+--------------------+
|                text|label|               words|            filtered|
+--------------------+-----+--------------------+--------------------+
|" .... much of wh...|  5.0|[", ...., much, o...|[", ...., thought...|
|" An amazing piec...|  5.0|[", an, amazing, ...|[", amazing, piec...|
+--------------------+-----+--------------------+--------------------+
only showing top 2 rows



In [74]:
f = cleanedData.first()

In [84]:
print f['words'][:10]
print f['filtered'][:10]

[u'"', u'....', u'much', u'of', u'what', u'has', u'been', u'thought', u'and', u'said']
[u'"', u'....', u'thought', u'said', u'dresden', u'destruction', u'(february', u'13,', u'1945)', u'owes']


## HashingTF
A Transformer that converts a sequence of words into a fixed-length feature Vector. It maps a sequence of terms to their term frequencies using a hashing function.

In [115]:
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")

In [87]:
hashedData = hashingTF.transform(cleanedData)

In [88]:
hashedData.show(2)

+--------------------+-----+--------------------+--------------------+--------------------+
|                text|label|               words|            filtered|            features|
+--------------------+-----+--------------------+--------------------+--------------------+
|" .... much of wh...|  5.0|[", ...., much, o...|[", ...., thought...|(262144,[0,34,103...|
|" An amazing piec...|  5.0|[", an, amazing, ...|[", amazing, piec...|(262144,[34,42077...|
+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 2 rows



In [90]:
sample = hashedData.first()

SparseVector(262144, {0: 1.0, 34: 1.0, 1030: 1.0, 2114: 2.0, 2124: 1.0, 4995: 1.0, 6479: 1.0, 7349: 1.0, 9700: 2.0, 9769: 2.0, 9901: 1.0, 11395: 1.0, 12091: 1.0, 14276: 1.0, 14653: 1.0, 16534: 1.0, 16764: 1.0, 18102: 1.0, 19228: 1.0, 20300: 1.0, 20917: 1.0, 23243: 1.0, 23751: 1.0, 23806: 1.0, 24538: 1.0, 26254: 1.0, 26781: 1.0, 26796: 1.0, 29490: 1.0, 29667: 1.0, 29874: 1.0, 30889: 1.0, 34840: 1.0, 35262: 1.0, 36045: 1.0, 37233: 1.0, 37492: 1.0, 37921: 1.0, 38704: 2.0, 39072: 1.0, 39536: 1.0, 39864: 1.0, 40741: 1.0, 43739: 1.0, 43748: 2.0, 44127: 1.0, 44241: 10.0, 44849: 1.0, 46265: 1.0, 48694: 1.0, 48714: 2.0, 48745: 1.0, 49597: 1.0, 50585: 1.0, 50882: 1.0, 51552: 3.0, 51995: 1.0, 52147: 1.0, 52663: 1.0, 52877: 1.0, 53053: 1.0, 53982: 1.0, 54056: 1.0, 54147: 1.0, 55886: 1.0, 56137: 1.0, 57265: 1.0, 58169: 1.0, 59517: 1.0, 60795: 1.0, 60797: 1.0, 61618: 1.0, 61625: 1.0, 61639: 1.0, 62304: 1.0, 63362: 1.0, 64059: 1.0, 65003: 1.0, 65649: 1.0, 65808: 1.0, 66699: 1.0, 66979: 1.0, 69085: 1.

In [91]:
sample['features']

SparseVector(262144, {0: 1.0, 34: 1.0, 1030: 1.0, 2114: 2.0, 2124: 1.0, 4995: 1.0, 6479: 1.0, 7349: 1.0, 9700: 2.0, 9769: 2.0, 9901: 1.0, 11395: 1.0, 12091: 1.0, 14276: 1.0, 14653: 1.0, 16534: 1.0, 16764: 1.0, 18102: 1.0, 19228: 1.0, 20300: 1.0, 20917: 1.0, 23243: 1.0, 23751: 1.0, 23806: 1.0, 24538: 1.0, 26254: 1.0, 26781: 1.0, 26796: 1.0, 29490: 1.0, 29667: 1.0, 29874: 1.0, 30889: 1.0, 34840: 1.0, 35262: 1.0, 36045: 1.0, 37233: 1.0, 37492: 1.0, 37921: 1.0, 38704: 2.0, 39072: 1.0, 39536: 1.0, 39864: 1.0, 40741: 1.0, 43739: 1.0, 43748: 2.0, 44127: 1.0, 44241: 10.0, 44849: 1.0, 46265: 1.0, 48694: 1.0, 48714: 2.0, 48745: 1.0, 49597: 1.0, 50585: 1.0, 50882: 1.0, 51552: 3.0, 51995: 1.0, 52147: 1.0, 52663: 1.0, 52877: 1.0, 53053: 1.0, 53982: 1.0, 54056: 1.0, 54147: 1.0, 55886: 1.0, 56137: 1.0, 57265: 1.0, 58169: 1.0, 59517: 1.0, 60795: 1.0, 60797: 1.0, 61618: 1.0, 61625: 1.0, 61639: 1.0, 62304: 1.0, 63362: 1.0, 64059: 1.0, 65003: 1.0, 65649: 1.0, 65808: 1.0, 66699: 1.0, 66979: 1.0, 69085: 1.

# Estimator
## LogisticRegression

In [116]:
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [117]:
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, lr])

In [118]:
trainingData.groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|  1.0|3982457|
|  0.0| 259147|
+-----+-------+



In [119]:
pipeLineModel = pipeline.fit(trainingData)

In [123]:
%%time
testPredictions = pipeLineModel.transform(testData)
trainingPredictions = pipeLineModel.transform(trainingData)

evaluator = BinaryClassificationEvaluator()
evaluatorParamMap = {evaluator.metricName: 'areaUnderROC'}

aucTraining = evaluator.evaluate(trainingPredictions, evaluatorParamMap)

aucTest = evaluator.evaluate(testPredictions, evaluatorParamMap)

CPU times: user 21.7 ms, sys: 11.3 ms, total: 33 ms
Wall time: 36.2 s


In [146]:
print 'AUR training dataset: ', aucTraining
print 'AUR testing dataset: ', aucTest

AUR training dataset:  0.990321053742
AUR testing dataset:  0.965908598292


# Hyperparameter Tuning

In [None]:
%%time
paramGrid = ParamGridBuilder() \
            .addGrid(hashingTF.numFeatures, [10000, 100000]) \
            .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
            .addGrid(lr.maxIter, [10, 20]) \
            .build()
            
cv = CrossValidator()
cv.setEstimator(pipeline)
cv.setEvaluator(evaluator)
cv.setEstimatorParamMaps(paramGrid)
cv.setNumFolds(3)

crossValidatorModel = cv.fit(trainingData)

In [126]:
newPredictions = crossValidatorModel.transform(testData)

newAucTest = evaluator.evaluate(newPredictions, evaluatorParamMap)

bestModel = crossValidatorModel.bestModel

In [148]:
print 'AUR testing dataset: ', newAucTest

AUR testing dataset:  0.968295085196


## Retrieving the parameters of the best model

In [135]:
bestModel.stages

[Tokenizer_42cca5171a50de7dc04b,
 StopWordsRemover_413fad8413159b4ff1e1,
 HashingTF_4ee1bc268e255b3ed847,
 LogisticRegression_42c1a064cbc547bc7402]

In [137]:
# Per the stage listing above, index 2 is the HashingTF stage and index 3 the
# LogisticRegression stage. Despite its name, `bestTK` is the hashing stage,
# not the tokenizer.
bestTK, bestLR = bestModel.stages[2:4]

In [138]:
bestTK.extractParamMap()
# numFeatures: 100000, regParam: 0.1, maxIter: 20

{Param(parent='HashingTF_4ee1bc268e255b3ed847', name='numFeatures', doc='number of features.'): 100000,
 Param(parent='HashingTF_4ee1bc268e255b3ed847', name='outputCol', doc='output column name.'): 'features',
 Param(parent='LogisticRegression_42c1a064cbc547bc7402', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
 Param(parent='LogisticRegression_42c1a064cbc547bc7402', name='maxIter', doc='max number of iterations (>= 0).'): 20,
 Param(parent='HashingTF_4ee1bc268e255b3ed847', name='inputCol', doc='input column name.'): 'filtered'}

In [145]:
print 'First 5 LR weights: ', bestLR.weights[:5]
print 'First 5 LR coefficients: ', bestLR.coefficients[:5]
print 'Intercept: ', bestLR.intercept

First 5 LR weights:  [-0.00120508  0.11889881 -0.37481442  0.05985811  0.06514214]
First 5 LR coefficients:  [-0.00120508  0.11889881 -0.37481442  0.05985811  0.06514214]
Intercept:  2.60801770355


# Plotting Data

In [4]:
%matplotlib inline

In [5]:
import pandas as pd
df = pd.DataFrame([['hola', 10], ['mundo', 1], ['test', 5]], columns=['Noun', 'Count'])

In [3]:
from bokeh.charts import Bar, show
from bokeh.io import output_notebook
from bokeh.charts.attributes import cat
output_notebook()

p = Bar(df, label=cat(columns='Noun', sort=False), values='Count',
        title='Top N nouns in r/movies subreddit',
        legend='top_right')
show(p)

In [3]:
# The recorded output shows `lines` backed by the Yelp labelled-sentences file,
# but no visible cell rebinds it from the Amazon reviews file. Load it here
# explicitly so the following cells work on a fresh kernel.
lines = sc.textFile('data/sentiment_labelled_sentences/yelp_labelled.txt')
lines.cache()

data/sentiment_labelled_sentences/yelp_labelled.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [4]:
lines.count()

1000

In [5]:
columns = lines.map(lambda line: line.split('\t'))
columns.first()

[u'Wow... Loved this place.', u'1']

In [23]:
Review = Row("text", "label")


def to_review(fields):
    """Build a ``Review`` from a ``[text, label]`` pair, with the label as a float."""
    text, label = fields
    return Review(text, float(label))


reviews = columns.map(to_review).toDF()
reviews.first()

Row(text=u'Wow... Loved this place.', label=1.0)

In [8]:
def to_int_review(fields):
    """Build a ``Review`` from a ``[text, label]`` pair, with the label as an int."""
    text, label = fields
    return Review(text, int(label))


reviews2 = sqlContext.createDataFrame(columns.map(to_int_review))

In [9]:
reviews2.describe()

DataFrame[summary: string, label: string]

In [10]:
reviews2.columns

['text', 'label']

In [11]:
reviews2.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: long (nullable = true)



In [24]:
reviews.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  1.0|  500|
|  0.0|  500|
+-----+-----+

