# Model Definition

---

**Recall:**

We want to use the text in the `description` column to predict the wine variety.

We identified the general steps to do this (_see the [README.md](./README.md#Model-Definition) for further details_):

1. _step1_: Convert the wine variety column to a categorical label.
2. _step2_: Render each word in the review text to a canonical form.
3. _step3_: Use a predefined set of words, a _target word set_, to create a feature vector of binary features, each indicating the presence or absence of one of the words in the target word set.
4. _step4_: Define an ML model that predicts the label from these features.
5. Train the model on a training set and assess its performance.
6. Deploy the model.
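To make step 3 concrete, here is a plain-Python sketch of turning an already canonicalised review into binary features. It is not the `WordSetTracker` used below, and the target word set and review words are invented for illustration:

```python
# Toy illustration of step 3: one binary feature per word in the target word set.
# The word lists below are made-up examples, not the project's target word sets.

def binary_features(words, target_word_set):
    """Return 1 for each target word that occurs in `words`, else 0."""
    present = set(words)  # presence only: repeated words still give 1, not a count
    return [1 if target in present else 0 for target in target_word_set]

target_word_set = ["cherry", "citrus", "oak", "tannin"]       # hypothetical
review_words = ["ripe", "cherry", "firm", "tannin", "cherry"]  # already canonical

print(binary_features(review_words, target_word_set))  # [1, 0, 0, 1]
```

Words outside the target word set (here `ripe` and `firm`) are simply ignored, which is why the choice of target word set matters so much.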


Loosely put, we want to define our model as a pipeline:

    pipeline = Pipeline(stages=[step1, step2, step3, step4])
    
To do this, we need to define _step1_, _step2_ and _step3_, and decide which method to use for _step4_.
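Conceptually, `Pipeline.fit` walks through the stages in order: each estimator stage is fitted on the output of the preceding stages and replaced by its fitted model, while transformer stages are used as they are. The result is a `PipelineModel` that only transforms. The toy classes below are a plain-Python illustration of that idea, not Spark's implementation (Spark additionally avoids transforming the data past the last estimator during `fit`):

```python
# Toy model of how a pipeline chains its stages (plain Python, not Spark's code).

class UpperCase:
    """A transformer: stateless, only has transform()."""
    def transform(self, rows):
        return [r.upper() for r in rows]

class VocabularyIndexer:
    """An estimator: fit() learns state from the data and returns a fitted model."""
    def fit(self, rows):
        vocab = {w: i for i, w in enumerate(sorted(set(rows)))}
        return VocabularyModel(vocab)

class VocabularyModel:
    """The fitted model produced by VocabularyIndexer.fit()."""
    def __init__(self, vocab):
        self.vocab = vocab
    def transform(self, rows):
        return [self.vocab[r] for r in rows]

class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # estimators are fitted on the output of the previous stages
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)

class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages
    def transform(self, rows):
        # a fitted pipeline just applies every fitted stage in order
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

model = ToyPipeline([UpperCase(), VocabularyIndexer()]).fit(["b", "a", "b"])
print(model.transform(["a", "b"]))  # [0, 1]
```

In our case `step1` and `step3` play the role of `VocabularyIndexer` (they learn something from the training data), whereas a stateless stage like `UpperCase` corresponds to a pure transformer.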

Luckily, _step1_ is straightforward:

**Step 1** is identical to our [baseline model](./reviwed_grapes.baseline.method_def.pyspark.v1.ipynb) and is done with PySpark's [StringIndexer](https://spark.apache.org/docs/3.1.1/ml-features.html#stringindexer).
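By default, `StringIndexer` orders labels by frequency (`stringOrderType="frequencyDesc"`), so the most frequent variety gets the label `0.0`, and Spark's documentation notes that ties are sorted alphabetically. A plain-Python sketch of that mapping, using a few varieties from the dataset:

```python
from collections import Counter

def string_index(labels):
    """Map each distinct label to an index, most frequent label first.

    Mimics StringIndexer's default "frequencyDesc" ordering; ties are
    broken alphabetically. Indices are floats, as in Spark's label column.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lbl: (-counts[lbl], lbl))
    mapping = {lbl: float(i) for i, lbl in enumerate(ordered)}
    return [mapping[lbl] for lbl in labels], mapping

varieties = ["pinot noir", "riesling", "pinot noir", "malbec", "riesling", "pinot noir"]
indexed, mapping = string_index(varieties)
print(mapping)  # {'pinot noir': 0.0, 'riesling': 1.0, 'malbec': 2.0}
print(indexed)  # [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```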

For **step 2** and **step 3**, unfortunately, it is not that simple.
Here we need to do some feature engineering and define custom Transformers and Estimators that we can then include in our pipeline.
These steps are carried out and illustrated in the [feature engineering notebook](reviewed_grapes.feature_eng.pyspark.v1.ipynb).

According to the approach we decided to take, the method used in **step 4** should be a traditional ML method.
There are several candidates, and which one we pick should depend on how well each of them performs.
We tested several models in the [model evaluation notebook](reviewed_grapes.model_evaluation.pyspark.v1.ipynb).
It turns out that [Multinomial Logistic Regression](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html) performs best in our case, so we use it for step 4.
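For intuition on step 4: multinomial logistic regression learns one weight vector `w_k` and one intercept `b_k` per class, turns the scores `w_k · x + b_k` into probabilities with the softmax function, `P(k | x) = exp(w_k · x + b_k) / Σ_j exp(w_j · x + b_j)`, and predicts the most probable class. The plain-Python sketch below evaluates this for a single feature vector; the coefficients are invented for illustration, whereas Spark learns them during `fit`:

```python
import math

def softmax_predict(x, coefficients, intercepts):
    """Multinomial logistic regression prediction for one feature vector x.

    coefficients: one weight list per class; intercepts: one bias per class.
    Returns (index of the most probable class, list of class probabilities).
    """
    margins = [sum(w * xi for w, xi in zip(row, x)) + b
               for row, b in zip(coefficients, intercepts)]
    m = max(margins)  # subtract the max margin for numerical stability
    exps = [math.exp(z - m) for z in margins]
    total = sum(exps)
    probs = [e / total for e in exps]
    return max(range(len(probs)), key=probs.__getitem__), probs

# hypothetical model: 3 classes (varieties), 2 binary features
coef = [[2.0, -1.0], [0.0, 1.5], [-1.0, 0.0]]
bias = [0.0, 0.0, 0.5]
label, probs = softmax_predict([1, 0], coef, bias)
print(label)  # 0
```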

Below, we proceed with the model definition, using our findings for steps 2, 3 and 4.

---

```python
# !pip install pandas
# !pip install matplotlib
# !pip install nltk
# !pip install pyspark
```

```python
import json
from IPython.display import Markdown, display
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession, SQLContext
```

```python
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
```

```
[nltk_data] Downloading package stopwords to /home/jonas/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
```


```python
%matplotlib inline
# helpers that print coloured Markdown messages in the notebook output
def warn(string):
    display(Markdown('<span style="color:red">' + string + '</span>'))
def info(string):
    display(Markdown('<span style="color:blue">' + string + '</span>'))
```

```python
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.driver.memory", "20g") \
    .appName("jojoSparkSession") \
    .getOrCreate()
# optional settings, currently disabled:
# .config("spark.default.parallelism", "16")
# .config("spark.executor.cores", "16")
sc = spark.sparkContext
# SQLContext is deprecated since Spark 3.0 in favour of SparkSession; kept for compatibility
sqlContext = SQLContext(sc)
```

```python
# load the dataset
reviews_sdf = spark.read.parquet('data/reviews_cleaned')
reviews_sdf.show()
```

```
+-----+--------------------+------------------+
|index|         description|           variety|
+-----+--------------------+------------------+
|    0|aromas include tr...|       white blend|
|    1|this is ripe and ...|    portuguese red|
|    2|tart and snappy, ...|        pinot gris|
|    3|pineapple rind, l...|          riesling|
|    4|much like the reg...|        pinot noir|
|    7|this dry and rest...|    gewürztraminer|
|    8|savory dried thym...|    gewürztraminer|
|    9|this has great de...|        pinot gris|
|   10|soft, supple plum...|cabernet sauvignon|
|   11|this is a dry win...|    gewürztraminer|
|   12|slightly reduced,...|cabernet sauvignon|
|   14|building on 150 y...|        chardonnay|
|   15|zesty orange peel...|          riesling|
|   16|baked plum, molas...|            malbec|
|   17|raw black-cherry ...|            malbec|
|   18|desiccated blackb...| tempranillo blend|
|   19|red fruit aromas ...|          meritage|
|   20|ripe aromas of da...|         red
```

**Now we can start defining our models**

**Step 1:**

```python
# turn the wine variety into a numeric label column
si = StringIndexer(inputCol='variety', outputCol='label')
```

**Step 2:**

Let's import our custom transformer:

```python
from reviewed_grapes.transformers import NLTKLemmatizer
```

```
[nltk_data] Downloading package punkt to /home/jonas/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /home/jonas/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
```


```python
# get common English stop words:
en_stopwords = set(stopwords.words('english'))
# we also define a custom blacklist:
blacklist = set(['wine', 'drink', 'variety', 'show', 'offer', 'make', 'give', 'well',
                 'open', 'come', 'years', 'bottle', 'mouth', 'like', 'also', 'along',
                 'alongside', 'vineyard', 'ready', 'great', 'one', 'slightly', 'deliver',
                 'yet', 'add', 'need', 'big', 'bring', 'easy', 'oral', 'best', 'end', 'alcohol'])
# combine the NLTK stop words with our custom blacklist:
stop_words = list(en_stopwords.union(blacklist))
# instantiate our custom transformer:
nltkl = NLTKLemmatizer(inputCol='description', outputCol='words', stopWords=stop_words)
```
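`NLTKLemmatizer` is our own transformer, so its internals are not repeated here. As a rough, hypothetical sketch of what step 2 does to a single review, the function below lowercases, tokenizes, maps words to a canonical form and drops stop words. A tiny hand-written lookup stands in for a real lemmatizer, and the order (lemmatize first, then filter) is an assumption; with that order, a blacklist entry such as `offer` also removes `offers`:

```python
import re

def canonical_words(text, stop_words, lemmas):
    """Lowercase, tokenize, map to a canonical form, then drop stop words.

    `lemmas` is a small hypothetical lookup standing in for a real
    lemmatizer such as NLTK's WordNet lemmatizer.
    """
    # runs of letters (Unicode-aware, so 'gewürztraminer' stays one token)
    tokens = re.findall(r"[^\W\d_]+", text.lower())
    words = [lemmas.get(tok, tok) for tok in tokens]
    return [w for w in words if w not in stop_words]

stop_words = {"this", "and", "of", "wine", "offer"}  # tiny stand-in for the real list
lemmas = {"offers": "offer", "aromas": "aroma", "cherries": "cherry"}
print(canonical_words("This wine offers aromas of cherries and oak.", stop_words, lemmas))
# ['aroma', 'cherry', 'oak']
```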

**Step 3:**

Import our custom estimator:

```python
from utils.Estimators import WordSetTracker
```

Finally, let's load the different _target word sets_:

```python
with open('data/targetWordSets/common_words.json', 'r') as fo:
    common_words = json.load(fo)
with open('data/targetWordSets/similar_words.json', 'r') as fo:
    similar_words = json.load(fo)
with open('data/targetWordSets/dissimilar_words.json', 'r') as fo:
    dissimilar_words = json.load(fo)
with open('data/targetWordSets/extreme_words.json', 'r') as fo:
    extremes_words = json.load(fo)
with open('data/targetWordSets/lowentropy_words.json', 'r') as fo:
    lowentropy_words = json.load(fo)
```

Here we use the different feature sets (target word sets) with the best-performing length, 14 × 57 = 798 words (_refer to the [cross-validation part of the model evaluation](reviewed_grapes.model_evaluation.pyspark.v1.ipynb#Target-Word-Set-Evaluation) for details_):

```python
limit_to = 14 * 57  # = 798 words per target word set
# instantiate the estimators for our 5 different models:
wst_common = WordSetTracker(inputCol='words', outputCol='features', wordSet=common_words, limitTo=limit_to)
wst_similar = WordSetTracker(inputCol='words', outputCol='features', wordSet=similar_words, limitTo=limit_to)
wst_dissimilar = WordSetTracker(inputCol='words', outputCol='features', wordSet=dissimilar_words, limitTo=limit_to)
wst_extremes = WordSetTracker(inputCol='words', outputCol='features', wordSet=extremes_words, limitTo=limit_to)
wst_lowentropy = WordSetTracker(inputCol='words', outputCol='features', wordSet=lowentropy_words, limitTo=limit_to)
```
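With `limit_to = 798`, each model sees at most 798 binary features. We don't reproduce `WordSetTracker` here; the sketch below assumes (hypothetically) that `limitTo` keeps the first `limitTo` entries of an ordered word list, and it returns the features in a sparse form, as (size, indices of present words). A single review typically mentions only a handful of the 798 words, so storing those indices is much cheaper than a dense list of mostly zeros:

```python
limit_to = 14 * 57
print(limit_to)  # 798

def sparse_presence(words, target_word_set, limit_to):
    """Binary presence features over the first `limit_to` target words.

    Returns (size, sorted indices of present words); size is the number of
    target words actually used, i.e. at most `limit_to`.
    Hypothetical stand-in, not the project's WordSetTracker.
    """
    targets = target_word_set[:limit_to]
    position = {w: i for i, w in enumerate(targets)}
    indices = sorted({position[w] for w in words if w in position})
    return len(targets), indices

# hypothetical target word set, assumed to be ordered by relevance
target_word_set = ["cherry", "oak", "citrus", "tannin", "honey"]
print(sparse_presence(["tannin", "cherry", "plum"], target_word_set, limit_to=3))
# (3, [0])
```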

**Step 4:**

```python
from pyspark.ml.classification import LogisticRegression
```

Finally, we instantiate the logistic regression.
Note that the values for `regParam` and `elasticNetParam` come from the cross-validation carried out in the [model evaluation notebook](reviewed_grapes.model_evaluation.pyspark.v1.ipynb).

```python
# family defaults to 'auto', which selects the multinomial model for more than two classes
mlrc = LogisticRegression(regParam=0.01, elasticNetParam=0.01)
```
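Spark documents the elastic-net regularization of its linear models as `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`, so with `elasticNetParam=0.01` the penalty is almost entirely L2 (ridge) and only 1 % L1 (lasso). The function below just evaluates that term for a made-up weight vector; it ignores details such as Spark's default feature standardization:

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Elastic-net term: reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2),
    with alpha = elastic_net_param (as in Spark's regParam/elasticNetParam)."""
    l1 = sum(abs(w) for w in weights)
    l2_sq = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_sq)

w = [1.0, -2.0, 0.0]  # hypothetical coefficients
# L1 = 3 and L2^2 = 5, so the penalty is 0.01 * (0.01 * 3 + 0.99 / 2 * 5)
print(elastic_net_penalty(w, reg_param=0.01, elastic_net_param=0.01))
```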

Here are the pipelines for our models.

We put them directly into a `dict` so that we can loop over them during training.

_Note: The keys will also be the final names of the models when saved._

```python
# the stages si, nltkl and mlrc are shared; the pipelines differ only in the target word set
model_pipelines = dict(
    CommonWordsModel = Pipeline(stages=[si, nltkl, wst_common, mlrc]),
    SimilarWordsModel = Pipeline(stages=[si, nltkl, wst_similar, mlrc]),
    DissimilarWordsModel = Pipeline(stages=[si, nltkl, wst_dissimilar, mlrc]),
    ExtremesWordsModel = Pipeline(stages=[si, nltkl, wst_extremes, mlrc]),
    LowentropyWordsModel = Pipeline(stages=[si, nltkl, wst_lowentropy, mlrc])
)
```

---
# Model Training

With our models defined, we can train them.

Recall that the model evaluation is carried out in the [model evaluation notebook](reviewed_grapes.model_evaluation.pyspark.v1.ipynb), so the training here uses hyperparameters that have already been tested and optimized.
Nevertheless, we perform a train/test split of our data to make sure that the training works as expected.

As a first step, we split the data into a training and a test set so that we can check for overfitting:

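```python
# 90 % training / 10 % test; no seed is given, so the split (and the scores below) vary between runs
df_train, df_test = reviews_sdf.randomSplit([0.9, 0.1])
```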
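`randomSplit` does not cut the DataFrame at exactly 90 % / 10 %: it normalises the weights and assigns each row to one split by random sampling, so the split sizes are only approximately proportional, and without a `seed` argument they change from run to run. A plain-Python analogue of that sampling idea (not Spark's implementation):

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability
    proportional to that split's weight; sizes are therefore approximate."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # cumulative, normalised weights
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # the last split also catches rounding leftovers near 1.0
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10_000), [0.9, 0.1], seed=42)
print(len(train), len(test))  # roughly 9000 and 1000
```

Passing a fixed seed (Spark's `randomSplit` also accepts a `seed` argument) makes the split, and therefore the reported scores, reproducible.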

Also, we need to define some evaluation metrics:

```python
accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
# note: metricName='f1' gives the weighted F1 score over all classes
f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
```
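With `metricName='f1'`, `MulticlassClassificationEvaluator` reports the *weighted* F1 score: the F1 score of each class, weighted by how often that class occurs among the true labels. A plain-Python version of both metrics, for intuition (the function names are ours, chosen so they do not clash with the `accuracy` and `f1` evaluators above):

```python
from collections import Counter

def accuracy_score(labels, predictions):
    """Fraction of predictions that match the true label."""
    return sum(l == p for l, p in zip(labels, predictions)) / len(labels)

def weighted_f1_score(labels, predictions):
    """Per-class F1, averaged with weights = share of each class among true labels."""
    n = len(labels)
    support = Counter(labels)
    score = 0.0
    for cls, count in support.items():
        tp = sum(l == cls and p == cls for l, p in zip(labels, predictions))
        predicted = sum(p == cls for p in predictions)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        f1_cls = 2 * precision * recall / (precision + recall) if tp else 0.0
        score += count / n * f1_cls
    return score

y_true = [0, 0, 0, 1, 1, 2]
y_pred = [0, 0, 1, 1, 2, 2]
print(accuracy_score(y_true, y_pred))               # 0.6666666666666666
print(round(weighted_f1_score(y_true, y_pred), 4))  # 0.6778
```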

Now we're ready to train our models.

For each model, the steps are as follows:

  1. Fit the pipeline to obtain a model.
  2. Evaluate the fitted model on the training and the test set.
  3. Export the model for further processing.
Note: After step 3 our models still need to be made ready for deployment.
This final step is explained and carried out in the [model deployment notebook](reviewed_grapes.model_deployment.pyspark.v1.ipynb).

```python
%%time
for model_name, ppl in model_pipelines.items():
    info(f'**Processing model "{model_name}"**')
    # fit the pipeline on the training data
    fitted_model = ppl.fit(df_train)
    # apply the fitted model to the training and the test data
    pred_train = fitted_model.transform(df_train)
    pred_test = fitted_model.transform(df_test)
    # compare training and test scores to check for overfitting
    info('  Training data:')
    info(f'    - Accuracy: **{round(accuracy.evaluate(pred_train)*100, 1)}%**')
    info(f'    - F1 score: **{round(f1.evaluate(pred_train), 4)}**')
    info('  Test data:')
    info(f'    - Accuracy: **{round(accuracy.evaluate(pred_test)*100, 1)}%**')
    info(f'    - F1 score: **{round(f1.evaluate(pred_test), 4)}**')
    # finally, save the fitted pipeline model under its dict key
    path = 'data/interim/' + model_name
    fitted_model.write().overwrite().save(path)
```

| Model | Training accuracy | Training F1 | Test accuracy | Test F1 |
|---|---|---|---|---|
| CommonWordsModel | 55.8% | 0.5248 | 52.0% | 0.4836 |
| SimilarWordsModel | 52.1% | 0.4854 | 48.9% | 0.452 |
| DissimilarWordsModel | 49.7% | 0.4593 | 46.2% | 0.4225 |
| ExtremesWordsModel | 52.4% | 0.4869 | 48.8% | 0.4494 |
| LowentropyWordsModel | 53.3% | 0.4972 | 50.0% | 0.4623 |

```
CPU times: user 926 ms, sys: 249 ms, total: 1.18 s
Wall time: 43min 21s
```
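To read these results: for every model the training accuracy is only about 3 to 4 percentage points above the test accuracy, which suggests mild rather than severe overfitting, and on this single (unseeded) split `CommonWordsModel` scores best on the test data. A small helper that computes the gaps from the reported numbers:

```python
# Training accuracy, test accuracy (in %) and test F1, copied from the results above.
results = {
    "CommonWordsModel":     {"train_acc": 55.8, "test_acc": 52.0, "test_f1": 0.4836},
    "SimilarWordsModel":    {"train_acc": 52.1, "test_acc": 48.9, "test_f1": 0.452},
    "DissimilarWordsModel": {"train_acc": 49.7, "test_acc": 46.2, "test_f1": 0.4225},
    "ExtremesWordsModel":   {"train_acc": 52.4, "test_acc": 48.8, "test_f1": 0.4494},
    "LowentropyWordsModel": {"train_acc": 53.3, "test_acc": 50.0, "test_f1": 0.4623},
}

gaps = {name: r["train_acc"] - r["test_acc"] for name, r in results.items()}
for name, gap in gaps.items():
    print(f"{name}: train/test accuracy gap {gap:.1f} points")

best = max(results, key=lambda name: results[name]["test_f1"])
print("best on test F1:", best)  # best on test F1: CommonWordsModel
```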


We are done training our models with optimized hyperparameters.
All that remains is to get them ready for deployment.

This final step is explained and carried out in the [model deployment notebook](reviewed_grapes.model_deployment.pyspark.v1.ipynb).

---