Create a `SparkSession`. A `SparkSession` initializes both a `SparkContext` and a `SQLContext`, giving access to both the RDD-based and the DataFrame-based functionality of Spark.

In [1]:
import pyspark as ps    # for the pyspark suite
import os               # for environ variables in Part 3

%load_ext autoreload
%autoreload 2

spark = ps.sql.SparkSession.builder \
            .appName("df lecture") \
            .getOrCreate()

# 1. Sentiment Analysis using Naive Bayes

Here we are going to create a text indexing pipeline for user reviews based on the Spark ML library.

We propose to work on Amazon Reviews. These reviews, made available by \[[Julian McAuley, UCSD](http://jmcauley.ucsd.edu/data/amazon/)\], are raw qualitative (text) and quantitative (rating) evaluations of products by users. We propose to apply ML and NLP to positive/negative reviews to see which words carry a positive/negative meaning for users.

Here is the plan:

1.1. We will load Amazon Reviews from json into a dataframe.

1.2. We will create a `label` (positive/negative) for each review and a dataset for ML.

1.3. We will create `features` from a text indexing pipeline using a TF-IDF approach.

1.4. We will use Machine Learning (NaiveBayes classifier) to classify positive/negative reviews.

1.5. We will interpret this classification in terms of positive/negative terms.

We recommend working in a notebook, following our previous instructions.

## 1.1. Load an Amazon Reviews json file

First, we will work on a local json datafile which contains a limited subset of the [Amazon Reviews](http://jmcauley.ucsd.edu/data/amazon/).

1\. Use [`spark.read.json()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json) to create a dataframe containing the content of the file `'data/reviews_Musical_Instruments_5.json.gz'`.

Check the structure of that dataframe, and the columns detected in the json content, by using `.printSchema()`. It should look like:

```
root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 ```

In [2]:
df_reviews = spark.read.json('data/reviews_Musical_Instruments_5.json.gz')

df_reviews.printSchema()

print(df_reviews.count())

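`spark.read.json()` expects JSON Lines by default: one complete JSON object per line, with the schema inferred by merging the fields seen across records, and `.gz` files are decompressed transparently based on their extension. As a minimal sketch of that input shape, the standard library can build and read back a one-record gzipped JSON Lines payload in memory; the record's values are made up, and only its field names follow the schema above.

```python
import gzip
import json

# Hypothetical record: values are made up, field names follow the expected schema.
record = {
    "asin": "B000EXAMPLE",
    "helpful": [0, 0],
    "overall": 5.0,
    "reviewText": "These strings stay in tune.",
    "reviewTime": "01 1, 2014",
    "reviewerID": "AEXAMPLEREVIEWER",
    "reviewerName": "example",
    "summary": "Nice strings",
    "unixReviewTime": 1388534400,
}

# JSON Lines: one JSON object per line, then gzip the whole payload.
payload = gzip.compress((json.dumps(record) + "\n").encode("utf-8"))

# Read it back: decompress, split into lines, parse each line independently.
rows = [json.loads(line) for line in gzip.decompress(payload).decode("utf-8").splitlines()]
print(rows[0]["overall"], rows[0]["helpful"])  # 5.0 [0, 0]
```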

2\. From now on, we will keep only the columns `reviewText` and `overall`. Use `.select()` on the dataframe to keep those two only. You can check your transformation using `.printSchema()` again.

In [3]:
df_corpus = df_reviews.select('reviewText', 'overall')

df_corpus.printSchema()

root
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)



## 1.2. Create a `label` for classification and a balanced dataset

This dataset is made of user reviews and ratings:

* the `reviewText` column (string) contains the raw text of the review.
* the `overall` column (double) contains the rating given by the user, in `{1.0, 2.0, 3.0, 4.0, 5.0}`

1\. Using [`.groupBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) and [`.agg()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.agg), count the number of reviews for each rating value.

In [4]:
from pyspark.sql.functions import count

res_test = df_corpus.groupBy("overall").agg(count("overall"))
#res_test.printSchema()

classes_count = dict(res_test.collect())
print("class representation: {}".format(classes_count))

class representation: {1.0: 217, 2.0: 250, 3.0: 772, 4.0: 2084, 5.0: 6938}
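`groupBy(...).agg(count(...))` is a per-key count. On an in-memory list of hypothetical ratings (standing in for the `overall` column), the same computation can be sketched with `collections.Counter`:

```python
from collections import Counter

# Hypothetical ratings standing in for the `overall` column.
ratings = [5.0, 5.0, 4.0, 1.0, 5.0, 3.0, 1.0, 2.0, 5.0, 4.0]

# Same idea as df.groupBy("overall").agg(count("overall")), on a Python list.
classes_count = dict(Counter(ratings))
print(classes_count)  # {5.0: 4, 4.0: 2, 1.0: 2, 3.0: 1, 2.0: 1}
```

One difference: `count("overall")` skips null values, whereas `Counter` would tally `None` as a key of its own.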


2\. We are going to focus on the extreme ratings `{1.0, 5.0}`. Using your count, identify how many examples of each of these two classes we need to keep to build a balanced set, with the same number of reviews in `1.0` and `5.0`.

**Note**: depending on the ML algorithm you use to classify pos/neg classes, having a balanced dataset can be a prerequisite.

In [5]:
#balanced_classsize = min(classes_count[1.0], classes_count[5.0])
balanced_classsize = min(classes_count[1.0], classes_count[5.0], 10000)
print("using limit size: {}".format(balanced_classsize))

using limit size: 217


3\. By using [`.filter()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter) on your dataframe, create two dataframes:

* one for the reviews having an `overall` of `1.0` (we will call them the `neg` class),
* another for the reviews having an `overall` of `5.0` (we will call them the `pos` class).

Limit the number of reviews in each dataframe by the number you have identified previously. Be sure to shuffle those reviews before you apply your limit (you can use [`.orderBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy) and [`rand()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.rand) for that).

Using [`.union()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.union) between dataframes, create a single dataframe containing the samples from both the balanced `neg` and `pos` classes.

In [6]:
from pyspark.sql.functions import rand

dataset_neg = df_corpus.filter(df_corpus["overall"] <= 1.0).orderBy(rand()).limit(balanced_classsize)
dataset_pos = df_corpus.filter(df_corpus["overall"] >= 5.0).orderBy(rand()).limit(balanced_classsize)

df_posnegdataset = dataset_pos.union(dataset_neg)

print("data points in the neg class: {}".format(dataset_neg.count()))
print("data points in the pos class: {}".format(dataset_pos.count()))

data points in the neg class: 217
data points in the pos class: 217
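The filter / shuffle / limit / union sequence above can be sketched on a hypothetical in-memory list of `(reviewText, overall)` tuples. Here a seeded `random.Random` plays the role of `rand()`, and taking the first `n` items of a shuffled copy plays the role of `.orderBy(rand()).limit(n)`:

```python
import random

# Hypothetical (reviewText, overall) rows; in the notebook these come from df_corpus.
rows = [("awful", 1.0)] * 3 + [("fine", 3.0)] * 4 + [("great", 5.0)] * 8

rng = random.Random(42)  # fixed seed so the sample is reproducible

neg = [r for r in rows if r[1] <= 1.0]  # filter(overall <= 1.0)
pos = [r for r in rows if r[1] >= 5.0]  # filter(overall >= 5.0)

balanced_classsize = min(len(neg), len(pos))

# orderBy(rand()).limit(n): shuffle a copy, then keep the first n rows.
neg_sample = rng.sample(neg, len(neg))[:balanced_classsize]
pos_sample = rng.sample(pos, len(pos))[:balanced_classsize]

posneg = pos_sample + neg_sample  # union(): plain concatenation, duplicates kept
print(len(neg_sample), len(pos_sample), len(posneg))  # 3 3 6
```

Like the concatenation here, Spark's `union()` keeps duplicates and matches columns by position. If you need a reproducible shuffle in Spark, `rand()` also accepts a seed, e.g. `rand(42)`.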


4\. Using [`.withColumn()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn) create a new column called `label` that has a value of `0.0` for the `neg` class, and `1.0` for the `pos` class.

In [7]:
df_posnegdataset = df_posnegdataset.withColumn("label", (df_posnegdataset['overall']-1.0)/4.0)
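The expression `(overall - 1.0) / 4.0` linearly rescales the 1-to-5 scale onto 0-to-1, so it yields clean `0.0`/`1.0` labels only because the dataframe was already restricted to ratings `1.0` and `5.0`. A quick check of the arithmetic, with a hypothetical helper `to_label`:

```python
def to_label(overall):
    """Hypothetical helper: same arithmetic as (overall - 1.0) / 4.0 above."""
    return (overall - 1.0) / 4.0

print([to_label(r) for r in (1.0, 3.0, 5.0)])  # [0.0, 0.5, 1.0]
```

A rating of `3.0` would get a fractional label of `0.5`. A more explicit alternative in Spark is `when(df_posnegdataset['overall'] == 5.0, 1.0).otherwise(0.0)`, with `when` imported from `pyspark.sql.functions`.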

5\. Check your dataframe at this step using `.printSchema()`. It should look like:

```
root
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- label: double (nullable = true)
```

In [8]:
df_posnegdataset.printSchema()

root
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- label: double (nullable = true)



## 1.3. Index every document using a text indexing pipeline

1\. In the file `nlp_pipeline.py` you'll find a function `indexing_pipeline` that implements a full text indexing pipeline.

This function takes as input a DataFrame and an `inputCol=...` argument specifying which column to apply TF-IDF to. It returns two things: first, the same DataFrame with an added `'features'` column, and second, the vocabulary.

Now apply that function to our previous DataFrame to index every review.

In [9]:
from nlp_pipeline import indexing_pipeline

df_output, vocab = indexing_pipeline(df_posnegdataset, inputCol="reviewText")

df_output.printSchema()

print("vocabulary: {}".format(vocab[0:10]))

root
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- label: double (nullable = true)
 |-- bow: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vector_tf: vector (nullable = true)
 |-- features: vector (nullable = true)

vocabulary: ['guitar', 'b', 'string', 'great', 'pedal', 'other', 'sound', 'good', 'cabl', 'amp']
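Assuming `indexing_pipeline` computes raw term counts (the `vector_tf` column) and then applies Spark ML's `IDF`, the `features` values follow idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing t. A stdlib sketch on three toy documents that are already tokenized (the real pipeline also handles tokenization and stemming):

```python
import math
from collections import Counter

# Toy, pre-tokenized documents.
docs = [
    ["great", "guitar", "string"],
    ["cheap", "string", "broke"],
    ["great", "sound", "great"],
]

# Term frequencies per document (what a count vectorizer produces).
tf = [Counter(doc) for doc in docs]

# Document frequency: number of documents containing each term.
doc_freq = Counter(term for doc in docs for term in set(doc))

# Spark ML's IDF formula: log((m + 1) / (df(t) + 1)), with m = number of documents.
m = len(docs)
idf = {t: math.log((m + 1) / (n + 1)) for t, n in doc_freq.items()}

# TF-IDF weight of each term in each document.
tfidf = [{t: c * idf[t] for t, c in counts.items()} for counts in tf]
print(round(tfidf[2]["great"], 4))  # 0.5754
```

A term present in every document gets idf = log(1) = 0, which is how TF-IDF down-weights ubiquitous words.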


## 1.4. Applying the Naive Bayes algorithm for sentiment analysis

ML pipelining in Spark relies on building, step by step, instances of classes drawn from the `pyspark.ml` library. We will now use the following class:

* [`pyspark.ml.classification.NaiveBayes`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.NaiveBayes) : implements the Naive Bayes algorithm

Using it proceeds as follows:

1. you create an instance `i`, specifying the columns on which to operate plus any keyword arguments specific to the class
2. you fit the machine learning algorithm `i` on your dataframe with `i.fit()`, which returns a model `m`
3. you apply the model `m` to your dataframe using `m.transform()`
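In `pyspark.ml`, `NaiveBayes` is an *Estimator* whose `fit()` returns a *Model*, itself a *Transformer* whose `transform()` returns a new DataFrame with extra columns. The three steps above can be imitated in plain Python; `MajorityClassifier` and `MajorityModel` are hypothetical toy classes (not part of `pyspark.ml`), and rows are dicts rather than DataFrame rows.

```python
class MajorityClassifier:
    """Hypothetical estimator: fit() learns the most frequent label."""

    def __init__(self, labelCol="label", predictionCol="prediction"):
        self.labelCol = labelCol
        self.predictionCol = predictionCol

    def fit(self, rows):
        labels = [row[self.labelCol] for row in rows]
        majority = max(set(labels), key=labels.count)
        return MajorityModel(majority, self.predictionCol)


class MajorityModel:
    """Hypothetical model returned by fit(); transform() adds a prediction column."""

    def __init__(self, majority, predictionCol):
        self.majority = majority
        self.predictionCol = predictionCol

    def transform(self, rows):
        # Return new rows; the input rows are left unchanged.
        return [{**row, self.predictionCol: self.majority} for row in rows]


i = MajorityClassifier(labelCol="label", predictionCol="prediction")  # 1. instance
m = i.fit([{"label": 1.0}, {"label": 1.0}, {"label": 0.0}])          # 2. fit -> model
out = m.transform([{"label": 0.0}])                                   # 3. transform
print(out)  # [{'label': 0.0, 'prediction': 1.0}]
```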

Look up the definition of the [`NaiveBayes`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.NaiveBayes) class in the Spark documentation. You should:

* identify the arguments (input/output + parameters) you need to provide.
* identify the right values for these arguments.

As a starter, the following table gives you the input/output arguments for each class.

| Class | input column(s) argument | output column argument | keyword arguments... |
|-------|------------|-------------|----------------------|
| [`NaiveBayes`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.NaiveBayes) | `featuresCol="features"` + `labelCol="label"` | `predictionCol="prediction"` | ? |

1\. Before applying the `NaiveBayes` algorithm we will split our dataset into one training set and one testing set using a random split of 70/30. Use [`.randomSplit()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) to create two distinct dataframes for each of those sets.

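See also [`.sampleBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy), which draws a stratified sample using a given fraction for each value of a column.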

**Note**: You can use `.persist()` to create a persistent training set before applying `NaiveBayes`.

In [10]:
splits = df_output.randomSplit([0.7, 0.3])
df_train = splits[0]
df_test = splits[1]

df_train.persist()

DataFrame[reviewText: string, overall: double, label: double, bow: array<string>, vector_tf: vector, features: vector]
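`randomSplit()` normalizes the weights and assigns each row to exactly one split, so the resulting sizes are only approximately 70/30. A minimal sketch of that idea, assuming one uniform draw per row compared against cumulative weight bounds (a simplification of Spark's per-partition sampling; `random_split` is a hypothetical helper):

```python
import random

def random_split(rows, weights, seed=None):
    """Hypothetical helper: assign each row to one split via a uniform draw."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bounds, e.g. [0.7, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First bound above u; fall back to the last split against rounding error.
        idx = next((k for k, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[idx].append(row)
    return splits

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Passing a seed in Spark, as in `df_output.randomSplit([0.7, 0.3], seed=42)`, helps make the split reproducible across runs.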

2\. Implement `NaiveBayes`, specifying the columns for features (`featuresCol`), labels (`labelCol`) and prediction (`predictionCol`). Then call `.fit()` to obtain a model, and apply this model to the testing set.

In [11]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

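# create the trainer and set its parameters (the column names shown are also the defaults)
nb = NaiveBayes(featuresCol="features", labelCol="label", predictionCol="prediction",
                smoothing=1.0, modelType="multinomial")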

# train the model
model = nb.fit(df_train)

# apply the model on the test set
result = model.transform(df_test)
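A fitted model stores `pi` (smoothed log class priors) and `theta` (smoothed log probability of each term given the class). The sketch below follows the multinomial formulas with additive smoothing as we understand them from the Spark documentation, on toy integer counts; it is not Spark's implementation, and `fit_multinomial_nb` and `predict` are hypothetical helpers. In the notebook the features are TF-IDF weights rather than counts; Spark's multinomial model accepts any non-negative values.

```python
import math

def fit_multinomial_nb(X, y, smoothing=1.0):
    """X: list of non-negative feature vectors, y: list of class indices 0..k-1."""
    k = max(y) + 1
    n_features = len(X[0])
    class_docs = [0] * k
    term_sums = [[0.0] * n_features for _ in range(k)]
    for x, c in zip(X, y):
        class_docs[c] += 1
        for j, v in enumerate(x):
            term_sums[c][j] += v
    # log prior: log((n_c + smoothing) / (n + k * smoothing))
    pi_denom = math.log(len(y) + k * smoothing)
    pi = [math.log(n_c + smoothing) - pi_denom for n_c in class_docs]
    # log conditional: log((sum_cj + smoothing) / (sum_c + n_features * smoothing))
    theta = []
    for c in range(k):
        denom = math.log(sum(term_sums[c]) + n_features * smoothing)
        theta.append([math.log(s + smoothing) - denom for s in term_sums[c]])
    return pi, theta

def predict(pi, theta, x):
    """Return argmax over classes c of pi[c] + sum_j x[j] * theta[c][j]."""
    scores = [p + sum(v * t for v, t in zip(x, row)) for p, row in zip(pi, theta)]
    return max(range(len(scores)), key=scores.__getitem__)

# Toy term-count vectors over a 3-word vocabulary: ["great", "cheap", "guitar"].
X = [[2, 0, 1], [1, 0, 1], [0, 2, 1], [0, 1, 1]]
y = [1, 1, 0, 0]  # 1 = pos, 0 = neg
pi, theta = fit_multinomial_nb(X, y, smoothing=1.0)
print(predict(pi, theta, [1, 0, 0]), predict(pi, theta, [0, 1, 0]))  # 1 0
```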

3\. Use the [`MulticlassClassificationEvaluator`](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator) to obtain an evaluation of the accuracy of your classification.

Like any other component in your pipeline, `MulticlassClassificationEvaluator` needs its columns specified, along with some other arguments you need to identify in the documentation. Then you apply your instance to the prediction and label columns using `.evaluate()`. It computes the accuracy (or any other chosen metric) from the agreement between these two columns.

In [12]:
# keep only label and prediction to compute accuracy
predictionAndLabels = result.select("prediction", "label")

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="accuracy")

print("Accuracy: {}".format(evaluator.evaluate(predictionAndLabels)))

Accuracy: 0.6984126984126984
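With `metricName="accuracy"`, the evaluator reports the fraction of rows where `prediction` equals `label`. A plain-Python sketch on hypothetical `(prediction, label)` pairs, with `accuracy` as a hypothetical helper:

```python
def accuracy(pairs):
    """Hypothetical helper: fraction of (prediction, label) pairs that agree."""
    pairs = list(pairs)
    return sum(1 for p, l in pairs if p == l) / len(pairs)

# Hypothetical rows, shaped like predictionAndLabels.collect().
rows = [(1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 1.0)]
print(accuracy(rows))  # 0.75
```

Because the split above is unseeded and the test set holds only about 130 reviews (roughly 30% of 434), one misclassified review moves accuracy by under a percentage point, and the value will differ between runs.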


## 1.5. Interpretation of the NaiveBayes results

The `NaiveBayes` model provides an internal matrix `model.theta` that you can convert into a numpy array with `model.theta.toArray()`. This matrix has one row per class (row `0` for `neg`, row `1` for `pos`) and one column per vocabulary term; the code below transposes it so that each row is a term and the two columns are the two classes.

The values inside that matrix correspond, for each class, to the conditional probability of each term given the class, which is used to compute the likelihood that a document belongs to that class (the class priors are stored separately in `model.pi`). In this implementation, `model.theta` holds the log of these probabilities rather than the probabilities themselves.

Use this `model.theta` matrix, combined with the vocabulary returned by `indexing_pipeline` in section 1.3 (built by `CountVectorizer`), to find the words most related to the `pos` class and those most related to the `neg` class.

Rank these words by decreasing probability. What do you observe? How could you improve these results?
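To see the layout in miniature: the sketch below uses hypothetical log-probabilities arranged like `model.theta` (one row per class, one column per term), converts them back with `exp`, and ranks terms per class. The four-word vocabulary and its numbers are made up.

```python
import math

# Hypothetical log-probabilities laid out like model.theta:
# one row per class, one column per vocabulary term.
vocab = ["guitar", "great", "cheap", "seller"]
theta = [
    [math.log(0.40), math.log(0.05), math.log(0.30), math.log(0.25)],  # row 0: neg
    [math.log(0.45), math.log(0.40), math.log(0.10), math.log(0.05)],  # row 1: pos
]

# Back to probabilities: each row is a distribution over the vocabulary.
probs = [[math.exp(v) for v in row] for row in theta]

# Rank terms by decreasing P(term | class) for each class.
top_neg = [w for w, _ in sorted(zip(vocab, probs[0]), key=lambda p: p[1], reverse=True)]
top_pos = [w for w, _ in sorted(zip(vocab, probs[1]), key=lambda p: p[1], reverse=True)]
print(top_neg)  # ['guitar', 'cheap', 'seller', 'great']
print(top_pos)  # ['guitar', 'great', 'cheap', 'seller']
```

In this toy example a product word such as `guitar` tops both lists simply because it is frequent in every review.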

In [13]:
import numpy as np

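thetaarray = model.theta.toArray().T   # transpose: one row per term, column 0 = neg, column 1 = pos

vocab_size = len(vocab)

dtype = [('label', 'S10'), ('neg', float), ('pos', float)]   # 'label' holds the term as bytes
# score per term and class: P(term|class) * (1 - P(term|other class)), from the log-probabilities
prob_values = [ (vocab[i],
                 np.exp(thetaarray[i,0])*(1-np.exp(thetaarray[i,1])),
                 (1-np.exp(thetaarray[i,0]))*np.exp(thetaarray[i,1]))
               for i in range(vocab_size) ]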

a = np.array(prob_values, dtype=dtype)       # create a structured array

In [14]:
prob_values[0]

('guitar', 0.0057560313686451899, 0.0093975749182296509)

In [15]:
np.sort(a, order='pos')[::-1][0:20]

array([(b'guitar', 0.00575603136864519, 0.00939757491822965),
       (b'great', 0.0016933786685190068, 0.008658582256422449),
       (b'cabl', 0.0033789793806702118, 0.008192417583327935),
       (b'amp', 0.0034001569730091412, 0.007834345684354128),
       (b'pedal', 0.0047348306556004695, 0.00776656199005429),
       (b'string', 0.005059401122669099, 0.0067799383753098024),
       (b'other', 0.004078548851109216, 0.006072426550338713),
       (b'good', 0.004351915486973183, 0.005986726803388118),
       (b'qualiti', 0.0031672915078594216, 0.005845371284523231),
       (b'sound', 0.003607399814728667, 0.005835045552755042),
       (b'easi', 0.0007085111643986221, 0.005828266978349922),
       (b'stand', 0.0051644450837654374, 0.005647894326469506),
       (b'time', 0.004746183013438898, 0.00548092191410527),
       (b'differ', 0.002280521322310283, 0.005014254214846147),
       (b"b'i", 0.002037761710298428, 0.004936646677319166),
       (b'price', 0.0034176650400916486, 0.00490100948

In [16]:
np.sort(a, order='neg')[::-1][0:20]

array([(b'mic', 0.006599920443538227, 0.0014076421162045943),
       (b'product', 0.005779401283401342, 0.0037354448339636218),
       (b'guitar', 0.00575603136864519, 0.00939757491822965),
       (b'strap', 0.005285362561367388, 0.004869161318960482),
       (b'stand', 0.0051644450837654374, 0.005647894326469506),
       (b'string', 0.005059401122669099, 0.0067799383753098024),
       (b'cheap', 0.00495551514257463, 0.0012895787279319135),
       (b'time', 0.004746183013438898, 0.00548092191410527),
       (b'pedal', 0.0047348306556004695, 0.00776656199005429),
       (b'batteri', 0.0045648173348337975, 0.0004574806806342569),
       (b'tuner', 0.004528548144069985, 0.0023176168678221525),
       (b'thing', 0.004489194923372196, 0.0019119558601155317),
       (b'seller', 0.004420166163973924, 9.58665087677645e-05),
       (b'good', 0.004351915486973183, 0.005986726803388118),
       (b'item', 0.004309878029690998, 0.0007066076349042201),
       (b'b', 0.004181699633989091, 0.004317036

_We can observe some words that clearly carry a positive/negative feeling, but they are mixed with other words that are only related to the products. This is because we ran this analysis on a dataset of musical instrument reviews only. Thus, the positive/negative concept is biased by terms related to the products people generally evaluate positively (or negatively)._
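One possible improvement, swapped in here rather than taken from the notebook, is to rank terms by the log-ratio `theta[pos] - theta[neg]` instead of by each class's own probability: a word equally common in both classes gets a ratio near zero, however frequent it is. A sketch on the same kind of hypothetical `theta` layout (one row per class, made-up numbers):

```python
import math

# Hypothetical log-probabilities, laid out like model.theta: row 0 = neg, row 1 = pos.
vocab = ["guitar", "great", "cheap", "seller"]
theta = [
    [math.log(0.40), math.log(0.05), math.log(0.30), math.log(0.25)],
    [math.log(0.45), math.log(0.40), math.log(0.10), math.log(0.05)],
]

# log P(w|pos) - log P(w|neg): > 0 leans pos, < 0 leans neg, near 0 is neutral.
log_ratio = {w: theta[1][j] - theta[0][j] for j, w in enumerate(vocab)}

most_pos = sorted(vocab, key=log_ratio.get, reverse=True)
print(most_pos)  # ['great', 'guitar', 'cheap', 'seller']
```

Here `guitar`, the most probable word in both classes, drops to a near-neutral score, while `great` and `seller` end up at the two extremes.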