# Overview

This was a small project to while away Xmas last year (2017).

### Goals

* Attempt to predict the variety of wine from published wine reviews
* Build and run a Jupyter Notebook in Docker to run Toree & Spark in a Kernel
* Use Spark ML to train a model
* Explore ways of creating a good data science toolset, with a view to making the experiment completely reproducible regardless of location
* Render some nice charts
* Enjoy some nice wine whilst doing so!

### Reproducibility

Containerisation is a great way to package and share a highly reproducible environment for running Jupyter Notebooks and their runtime kernels. Bind-mounting a directory from the host's physical filesystem also makes it possible to keep the notebooks and data separate from the runtime process, which further reduces coupling and increases the reusability of the images. Because the files live outside the container, they can be edited locally, committed to git repos and shared with others.

### Process


### Links

* [Jupyter Notebooks](https://jupyter.org/)
* [Docker Stack For Jupyter](https://jupyter-docker-stacks.readthedocs.io/en/latest/)
* [Apache Toree](https://toree.apache.org/)
* [Apache Spark MLlib](https://spark.apache.org/mllib/)
* [Kaggle Wine Reviews Dataset](https://www.kaggle.com/zynicide/wine-reviews)
* [Brunel Charts](https://github.com/Brunel-Visualization/Brunel/wiki)

### Extensions

* Perform data analysis to check the spread of the data
* Verify the current feature extraction: which features are most important for training the model?
* Use other feature extraction methods
* Use other model algorithms
* Connect to a real Spark cluster running in the cloud (AWS, Azure, etc.)

#### Setup

This section sets up the rendering library for the data. It uses a "magic" to download and install the [Brunel](https://github.com/Brunel-Visualization/Brunel/wiki) visualization library.

In [None]:
%AddJar -magic http://brunelvis.org/jar/spark-kernel-brunel-all-2.2.jar

###### Create the _local_ Spark session and import its implicits

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

###### Read the gzipped raw JSON document

This will handily gunzip the file as a stream and infer the schema from the data. It loads the data into a Spark DataFrame, against which table-based queries can then be run.
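A minimal plain-Scala sketch of reading a gzipped file as a stream, using only `java.util.zip` (no Spark). It writes two made-up review records as JSON Lines (one object per line, the layout Spark's JSON reader expects by default) to a temporary `.json.gz` file, then decompresses them on the fly. `GzipStreamSketch` and the sample records are hypothetical. Spark picks the gzip codec from the `.gz` extension; because gzip files cannot be split, each one is read by a single task.

```scala
import java.io.{BufferedReader, FileInputStream, FileOutputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipStreamSketch {
  // Write each line, newline-terminated, into a gzip-compressed file (hypothetical sample data only).
  def writeGzipLines(path: String, lines: Seq[String]): Unit = {
    val out = new GZIPOutputStream(new FileOutputStream(path))
    try lines.foreach(l => out.write((l + "\n").getBytes(StandardCharsets.UTF_8)))
    finally out.close()
  }

  // Decompress while reading; nothing is unpacked to disk first.
  def readGzipLines(path: String): List[String] = {
    val reader = new BufferedReader(
      new InputStreamReader(new GZIPInputStream(new FileInputStream(path)), StandardCharsets.UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }
}

val tmp = Files.createTempFile("reviews-sample", ".json.gz").toString
GzipStreamSketch.writeGzipLines(tmp, Seq(
  """{"variety":"Riesling","description":"Crisp and dry."}""",
  """{"variety":"Merlot","description":"Soft plums."}"""
))
GzipStreamSketch.readGzipLines(tmp).foreach(println)
```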

In [None]:
val raw = (
    spark.read.option("inferSchema", "true")
    .json("../data/winemag-data-130k-v2.json.gz")
)

raw.cache
raw.show(5)

#### Clean, dedupe and sanitize

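To make model training as effective as possible later on, an important step is to clean, dedupe and sanitize the data:

* Select just the _variety_ and _description_ columns, applying the trim and lowercase functions to both.
* Drop any duplicate reviews (rows with the same description).
* Drop any reviews without a variety.
* Drop generic "blend", "red", "white" and "rose" varieties, which do not name a single grape.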
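The cleaning steps above can be sketched with plain Scala collections, assuming a hypothetical `Review` record in place of a DataFrame row. This illustrates the logic only and is not the notebook's Spark code; in particular, Spark's `dropDuplicates` keeps an arbitrary row per duplicate, whereas this sketch keeps the first one it sees.

```scala
object CleanSketch {
  // Hypothetical stand-in for one row of the raw DataFrame; a missing variety is None.
  case class Review(variety: Option[String], description: String)

  // Generic labels filtered out because they do not name a single grape.
  val excludedTerms = Seq("blend", "red", "white", "rose")

  def clean(reviews: Seq[Review]): Seq[(String, String)] = {
    // 1. Trim and lowercase both columns.
    val normalised = reviews.map(r => (r.variety.map(_.trim.toLowerCase), r.description.trim.toLowerCase))
    // 2. Keep one review per distinct description (here: the first occurrence).
    val seen = scala.collection.mutable.Set.empty[String]
    val deduped = normalised.filter { case (_, d) => seen.add(d) }
    // 3. Drop missing varieties and 4. generic blend/colour labels.
    deduped.collect {
      case (Some(v), d) if !excludedTerms.exists(t => v.contains(t)) => (v, d)
    }
  }
}
```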

In [None]:
var df = (
    raw
    .select(trim(lower($"variety")) as "variety", trim(lower($"description")) as "description")
    .orderBy($"variety")
).cache

df.show(5)

In [None]:
df = (df.dropDuplicates(Seq("description"))).cache
df.orderBy($"variety").show(5)

In [None]:
df = (df.filter($"variety".isNotNull)).cache
df.orderBy($"variety").show(5)

In [None]:
df = (df        
    .filter(not($"variety".contains("blend")))
    .filter(not($"variety".contains("red")))
    .filter(not($"variety".contains("white")))
    .filter(not($"variety".contains("rose")))
).cache
df.orderBy($"variety").show(5)

#### Remove the variety name and format the description

In [None]:
val sanitizedDf = (df    
    .select($"variety", regexp_replace($"description", $"variety", lit("")) as "description")
    .select($"variety", regexp_replace($"description", "[^\\p{L}\\p{Nd}[0-9]+]+", " ") as "description")
    .select($"variety", trim(lower($"description")) as "description")
    .orderBy($"variety")
).cache

sanitizedDf.show(5)
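A plain-Scala sketch of the same sanitising step for a single review. Two assumptions differ from the notebook: the variety name is escaped with `Pattern.quote`, so names containing regex characters are matched literally (the notebook's `regexp_replace` treats the `variety` column as a pattern), and the simpler character class `[^\p{L}\p{Nd}]+` collapses every run of non-letter, non-digit characters into one space. `SanitiseSketch` is a hypothetical name.

```scala
import java.util.regex.Pattern

object SanitiseSketch {
  def sanitise(variety: String, description: String): String = {
    // Remove every literal occurrence of the variety name (assumption: quoted, not a regex).
    val withoutName = description.replaceAll(Pattern.quote(variety), "")
    // Collapse punctuation and other non-alphanumeric runs into single spaces, then tidy up.
    withoutName.replaceAll("[^\\p{L}\\p{Nd}]+", " ").trim.toLowerCase
  }
}
```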

![alt text](http://i.kinja-img.com/gawker-media/image/upload/s--aUvhMJJ8--/bam2tzwtb5cdfwctsdkc.jpg)


#### Set minimum number of reviews per variety for the data to be included in training the model

This ensures that every variety kept has a reasonable number of reviews to train the model on; varieties with only a handful of reviews would give the model too few examples, making it less effective.
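Selecting the well-reviewed varieties amounts to counting reviews per variety and keeping only the reviews of varieties above the threshold, which is what the later `groupBy`/`where`/`join` cell does in Spark. A plain-Scala sketch over hypothetical `(variety, description)` pairs; `ThresholdSketch` is a made-up name. Note the strict `>`, as in the notebook: with `minimumReviewsPerVariety = 3000`, a variety needs at least 3,001 reviews.

```scala
object ThresholdSketch {
  // Keep only the reviews whose variety has strictly more than `minimum` reviews.
  def keepCommonVarieties(reviews: Seq[(String, String)], minimum: Int): Seq[(String, String)] = {
    val counts: Map[String, Int] = reviews.groupBy(_._1).map { case (v, rs) => v -> rs.size }
    reviews.filter { case (v, _) => counts(v) > minimum }
  }
}
```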

In [None]:
val minimumReviewsPerVariety = 3000

In [None]:
val varietyCounts = (
    raw
    .groupBy($"variety")
    .agg(
        count("variety") as "count",
        min("price") as "min_price",
        max("price") as "max_price",
        round(mean("price")) as "mean_price",
        round(stddev("price")) as "price_stddev",
        round(mean("points")) as "points",
        round(stddev("points")) as "points_stddev"
    )
    .where("count > " + minimumReviewsPerVariety)
    .orderBy($"count".desc)
)

varietyCounts.show(5)
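Spark's `stddev` is an alias for `stddev_samp`, the sample standard deviation (dividing by n - 1), and aggregate functions such as `mean` skip null values, which matters for reviews without a price. The hypothetical `StatsSketch` reproduces those two definitions in plain Scala, modelling a missing price as `None`; returning `NaN` when there are too few values is a choice made for this sketch.

```scala
object StatsSketch {
  // Missing values (None) are skipped, as Spark's aggregates skip nulls.
  def mean(xs: Seq[Option[Double]]): Double = {
    val present = xs.flatten
    present.sum / present.size
  }

  // Sample standard deviation: squared deviations divided by (n - 1).
  def sampleStddev(xs: Seq[Option[Double]]): Double = {
    val present = xs.flatten
    if (present.size < 2) Double.NaN
    else {
      val m = present.sum / present.size
      math.sqrt(present.map(x => (x - m) * (x - m)).sum / (present.size - 1))
    }
  }
}
```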

In [None]:
%%brunel

data('varietyCounts')
bar 
x(variety)
y(mean_price)
color(variety)
sort(mean_price)
style('* {font-size: 7pt}') :: width=1000, height=1000

In [None]:
%%brunel 
    data('varietyCounts') 
    bubble color(count) 
    size(count) 
    sort(count) 
    label(variety, count) 
    tooltip(#all) 
    style('* {font-size: 7pt}') :: width=1000, height=1000

In [None]:
val varietyCounts2 = (
    raw
    .groupBy($"variety", $"country")
    .count()
    .where("count > " + minimumReviewsPerVariety)
)

varietyCounts2.orderBy($"count".desc).show

In [None]:
%%brunel
data('varietyCounts2') 
bar 
x(country) 
sort(count) 
y(count) 
color(country) 
style('* {font-size: 7pt}') :: width=1000, height=1000

In [None]:
%%brunel 
    data('varietyCounts2') 
    chord x(country) y(variety) 
    color(count) 
    size(count) 
    sort(count) 
    label(variety) 
    tooltip(#all) :: width=1000, height=1000

### Select the varieties with more than `minimumReviewsPerVariety` reviews

In [None]:
val counts = (sanitizedDf
    .groupBy($"variety")
    .agg(count($"variety") as "count")
    .where("count > " + minimumReviewsPerVariety)
    .join(sanitizedDf, Seq("variety"))
    .orderBy("variety")
    .select($"variety", $"description", $"count"))

counts.show(5)

###### Create a tokenizer and stop-word remover that strip any remaining variety names from the description

In [None]:
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.Tokenizer

// Split every distinct variety name into lowercase word tokens and flatten them into one list
val varietySplits = (
    new Tokenizer()
    .setInputCol("variety")
    .setOutputCol("variety_splits")
    .transform(counts.select("variety").distinct())
    .select("variety_splits")
    .collect()
    .flatMap(_.getSeq[String](0))
    .toList
)

val tokenizer = (
    new Tokenizer()
    .setInputCol("description")
    .setOutputCol("words")
)

val stopWordsRemover = (
    new StopWordsRemover()
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("filteredWords")
)

val stopwords = stopWordsRemover.setStopWords((varietySplits:::stopWordsRemover.getStopWords.toList).toSet.toArray)

println(stopwords.getStopWords.mkString(","))
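Spark's `Tokenizer` lowercases its input and splits it on whitespace, and `StopWordsRemover` drops every token found in its stop-word list (case-insensitively by default). A plain-Scala sketch of the cell above: tokenise the variety names, add the tokens to a base stop-word list, then filter a description. The three-word base list stands in for Spark's default English list and, like `StopWordSketch`, is hypothetical; unlike Spark's `Tokenizer`, this sketch also drops empty tokens.

```scala
object StopWordSketch {
  // Lowercase, split on whitespace, and (unlike Spark) drop empty tokens.
  def tokenize(s: String): Seq[String] = s.toLowerCase.split("\\s").filter(_.nonEmpty).toSeq

  // Every token of every variety name becomes an extra stop word.
  def stopWords(varieties: Seq[String], defaults: Set[String]): Set[String] =
    defaults ++ varieties.flatMap(tokenize)

  def removeStopWords(description: String, stop: Set[String]): Seq[String] =
    tokenize(description).filterNot(stop.contains)
}
```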


###### Spark ML Pipeline

![Spark ML Pipeline](https://spark.apache.org/docs/2.3.1/img/ml-Pipeline.png)

###### [CountVectorizer](https://spark.apache.org/docs/2.3.1/ml-features.html#countvectorizer)

CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
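A plain-Scala sketch of the fit and transform steps described above, with documents given as token lists and the sparse output modelled as a `Map` from vocabulary index to count. `CountVectorizerSketch` is hypothetical, `minDF` is taken as a document count only (not a fraction), and ties in frequency are broken alphabetically, which Spark does not promise.

```scala
object CountVectorizerSketch {
  // Fit: count each term across the corpus, drop terms found in fewer than `minDF`
  // documents, and keep the `vocabSize` most frequent terms (index 0 = most frequent).
  def fitVocabulary(docs: Seq[Seq[String]], vocabSize: Int, minDF: Int): Vector[String] = {
    val termFreq = docs.flatten.groupBy(identity).map { case (t, ts) => t -> ts.size }
    val docFreq = docs.flatMap(_.distinct).groupBy(identity).map { case (t, ts) => t -> ts.size }
    termFreq.toVector
      .filter { case (t, _) => docFreq(t) >= minDF }
      .sortBy { case (t, n) => (-n, t) }
      .take(vocabSize)
      .map(_._1)
  }

  // Transform: sparse (vocabulary index -> count); terms outside the vocabulary are ignored.
  // With binary = true every non-zero count becomes 1.
  def transform(doc: Seq[String], vocab: Vector[String], binary: Boolean = false): Map[Int, Int] = {
    val index = vocab.zipWithIndex.toMap
    val counts = doc.flatMap(t => index.get(t)).groupBy(identity).map { case (i, is) => i -> is.size }
    if (binary) counts.map { case (i, _) => i -> 1 } else counts
  }
}
```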

In [None]:
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val countVectorizer = (
    new CountVectorizer()
    .setInputCol(stopWordsRemover.getOutputCol)
    .setOutputCol("features")
)

###### [StringIndexer](https://spark.apache.org/docs/2.2.0/ml-features.html#stringindexer)

StringIndexer encodes a string column of labels to a column of label indices. The indices are in \[0, numLabels), ordered by label frequencies, so the most frequent label gets index 0. The unseen labels will be put at index numLabels if user chooses to keep them. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.
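The frequency ordering can be sketched in plain Scala. Like Spark's `StringIndexer`, the hypothetical `StringIndexerSketch` produces `Double` indices with the most frequent label at `0.0`; alphabetical tie-breaking is an assumption of this sketch. Labels not seen during fitting are simply absent from the map here, whereas Spark raises an error for them unless `handleInvalid` is set to `"skip"` or `"keep"`. Since the notebook fits the indexer on `counts` before splitting, every test label has already been seen.

```scala
object StringIndexerSketch {
  // Most frequent label -> 0.0, next -> 1.0, ...; equal counts ordered alphabetically.
  def fit(labels: Seq[String]): Map[String, Double] =
    labels.groupBy(identity).toSeq
      .map { case (l, ls) => (l, ls.size) }
      .sortBy { case (l, n) => (-n, l) }
      .zipWithIndex
      .map { case ((l, _), i) => l -> i.toDouble }
      .toMap
}
```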



In [None]:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.Normalizer

val indexer = (
    new StringIndexer()
    .setInputCol("variety")
    .setOutputCol("label")
)

###### Train a NaiveBayes classifier on the count vectorised features

![alt text](https://i.stack.imgur.com/0QOII.png)

A really important yet subtle point here is that, to ensure the training and test sets are the same on every run (given the same input data), the dataset is split using a fixed seed (`42L`) for the pseudo-random number generator (PRNG).
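The principle can be sketched in plain Scala: each row gets exactly one draw from a generator seeded with 42, and rows whose draw falls below 0.7 go to the training set, so the same seed over the same rows in the same order always yields the same split. `SeededSplitSketch` is hypothetical and only a simplified model of Spark's `randomSplit`, whose result also depends on how the input is partitioned.

```scala
import scala.util.Random

object SeededSplitSketch {
  def split[A](rows: Seq[A], trainFraction: Double, seed: Long): (Vector[A], Vector[A]) = {
    val rng = new Random(seed)
    // Draw exactly one number per row, in input order.
    val tagged = rows.toVector.map(r => (r, rng.nextDouble()))
    val train = tagged.collect { case (r, u) if u < trainFraction => r }
    val test = tagged.collect { case (r, u) if u >= trainFraction => r }
    (train, test)
  }
}
```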

In [None]:
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val Array(trainingData, testData) = (
    indexer           
    .fit(counts)
    .transform(counts)
    .randomSplit(Array(0.7, 0.3), 42L)
)

###### Bind the stages to the pipeline and train against the _training_ data

In [None]:
import org.apache.spark.ml.{Pipeline, PipelineModel}

val naiveBayes = new NaiveBayes()

val pipeline = (
    new Pipeline()
    .setStages(
        Array(
            tokenizer, 
            stopWordsRemover, 
            countVectorizer,
            naiveBayes
        )
    )
)
            
val model = pipeline.fit(trainingData)
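By default Spark's `NaiveBayes` fits a multinomial model (`modelType = "multinomial"`) with additive smoothing `smoothing = 1.0`. A plain-Scala sketch of that idea over term-count maps: each class gets a smoothed log prior and smoothed log probabilities for every vocabulary term, and a document is assigned the class with the highest prior plus count-weighted term log probabilities. `NaiveBayesSketch` is hypothetical, smoothing the prior as well is a choice made here, and terms outside the training vocabulary are skipped, much as `CountVectorizer` ignores unknown terms.

```scala
object NaiveBayesSketch {
  type Counts = Map[String, Int]

  case class Model(logPrior: Map[Double, Double], logTheta: Map[Double, Map[String, Double]])

  def fit(docs: Seq[(Double, Counts)], lambda: Double = 1.0): Model = {
    val vocab = docs.flatMap(_._2.keys).distinct
    val byLabel = docs.groupBy(_._1)
    val numLabels = byLabel.size
    // Smoothed log prior per class.
    val logPrior = byLabel.map { case (c, ds) =>
      c -> math.log((ds.size + lambda) / (docs.size + numLabels * lambda))
    }
    // Smoothed log probability of each vocabulary term within each class.
    val logTheta = byLabel.map { case (c, ds) =>
      val termTotals = vocab.map(t => t -> ds.map(_._2.getOrElse(t, 0)).sum).toMap
      val denom = termTotals.values.sum + vocab.size * lambda
      c -> termTotals.map { case (t, n) => t -> math.log((n + lambda) / denom) }
    }
    Model(logPrior, logTheta)
  }

  // Highest log prior + sum(count * log theta); unknown terms are skipped.
  def predict(model: Model, doc: Counts): Double =
    model.logPrior.keys.maxBy { c =>
      model.logPrior(c) + doc.collect {
        case (t, n) if model.logTheta(c).contains(t) => n * model.logTheta(c)(t)
      }.sum
    }
}
```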

###### Run predictions on the test data to check how well the model performs on unseen data

In [None]:
val predictions = model.transform(testData)

val evaluator = (new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy"))

val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy: $accuracy (expected 0.7738423373759648 for seed 42: ${accuracy == 0.7738423373759648})")
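Accuracy is simply the fraction of test rows whose predicted label equals the true label. A plain-Scala sketch with a hypothetical `AccuracySketch` object; it also includes a tolerance-based comparison, a more forgiving way to check a reproduced floating-point result than exact `==`.

```scala
object AccuracySketch {
  // Fraction of (label, prediction) pairs where the prediction is correct; NaN for no rows.
  def accuracy(labelAndPrediction: Seq[(Double, Double)]): Double =
    if (labelAndPrediction.isEmpty) Double.NaN
    else labelAndPrediction.count { case (label, prediction) => label == prediction }.toDouble /
      labelAndPrediction.size

  // Compare two doubles within an absolute tolerance instead of exact equality.
  def approxEqual(a: Double, b: Double, tolerance: Double = 1e-9): Boolean =
    math.abs(a - b) <= tolerance
}
```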