## QUANTIFY MOVIE REVIEW 

In [10]:
! pip install  spark-nlp

Collecting spark-nlp
  Downloading spark_nlp-2.5.1-py2.py3-none-any.whl (121 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.5.1


In [11]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

import sparknlp
from sparknlp import DocumentAssembler, Finisher
from sparknlp.annotator import *

In [12]:
spark = sparknlp.start()

In [13]:
pos_train = spark.sparkContext.wholeTextFiles  # method reference only; the files are actually loaded in cell 15

**Always negotiate with stakeholders.** As you work on the project, this list may change. The earlier you catch missed constraints, the better. If you discover a constraint just before deployment, it can be very expensive to fix. This is why we want to iterate with stakeholders during development. Now that we have listed our constraints, let’s discuss how we can build our application.

In [14]:
! ls ../datasets/sparkNlp_imdb_review/aclImdb_v1/aclImdb/

imdbEr.txt  imdb.vocab	README	test  train


In [15]:
pos_train = spark.sparkContext.wholeTextFiles('../datasets/sparkNlp_imdb_review/aclImdb_v1/aclImdb/train/pos/')
neg_train = spark.sparkContext.wholeTextFiles('../datasets/sparkNlp_imdb_review/aclImdb_v1/aclImdb/train/neg/')
pos_test = spark.sparkContext.wholeTextFiles('../datasets/sparkNlp_imdb_review/aclImdb_v1/aclImdb/test/pos/')
neg_test = spark.sparkContext.wholeTextFiles('../datasets/sparkNlp_imdb_review/aclImdb_v1/aclImdb/test/neg/')

In [16]:
pos_train = spark.createDataFrame(pos_train, ['path', 'text'])
pos_train = pos_train.repartition(100)
pos_train = pos_train.withColumn('label', lit(1)).persist()

neg_train = spark.createDataFrame(neg_train, ['path', 'text'])
neg_train = neg_train.repartition(100)
neg_train = neg_train.withColumn('label', lit(0)).persist()

pos_test = spark.createDataFrame(pos_test, ['path', 'text'])
pos_test = pos_test.repartition(100)
pos_test = pos_test.withColumn('label', lit(1)).persist()

neg_test = spark.createDataFrame(neg_test, ['path', 'text'])
neg_test = neg_test.repartition(100)
neg_test = neg_test.withColumn('label', lit(0)).persist()

In [17]:
print(pos_train.first()['text'])

One of the most heart-warming foreign films I've ever seen.<br /><br />The young girl is an amazing talent. Stellar performances by her (Doggie), the old man (the king of masks), and Liang (the Living Boddhisatva).<br /><br />(SPOILER) The deplorable treatment of children, especially females is disturbing.<br /><br />Loved the music. The original Chinese dialog heightens the emotional intensity of the performances and the story.<br /><br />This is a MUST SEE -- enjoyable family film, although not for very young children. Would have rated the DVD release even higher if the soundtrack had been transferred better onto the DVD and the transfer had included the widescreen version.


In [18]:
print(neg_train.first()['text'])

This movie is the worst movie i have ever seen... it is humorous how bad it is.. the entire time i was watching it i half expected music to start and the doctor starts dancing..(i've seen porno's with a better plot) When the raptor was trying to get in the door i think someone was throwing a plastic doll against the door from about 2 feet away. But as i said it is so bad you need to watch it so that you can see just how bad it is me explaining it isn't going to do anything compared to if you watch it .. i don't recommend renting it but if it comes on TV watch it for about 30min just to see what i mean. I couldn't watch more than 30min but if you can sit through the whole thing then you have some good willpower


#### check the corpus as a whole 

In [19]:
print('pos_train_size:', pos_train.count())
print('neg_train_size:', neg_train.count())
print('pos_test_size:', pos_test.count())
print('neg_test_size:', neg_test.count())

pos_train_size: 12500
neg_train_size: 12500
pos_test_size: 12500
neg_test_size: 12500


##### check stats about the length of text in pos_train

In [20]:
pos_train.selectExpr('length(text) AS text_len').toPandas().describe()

| | text_len |
|---|---|
| count | 12500.0 |
| mean | 1347.16024 |
| std | 1046.747365 |
| min | 70.0 |
| 25% | 695.0 |
| 50% | 982.0 |
| 75% | 1651.0 |
| max | 13704.0 |


Our project is divided into two parts:
1. training and measuring the model
2. building the script

##### combining positive and negative into two datasets, train and test

In [21]:
train = pos_train.unionAll(neg_train)
test = pos_test.unionAll(neg_test)

Now, let’s use Spark NLP to process the data. We will save both the lemmatized and normalized tokens, as well as GloVe embeddings. This way, we can experiment with different features.

### CREATING PIPELINE

In [22]:
assembler = DocumentAssembler()\
            .setInputCol('text')\
            .setOutputCol('document')

sentence = SentenceDetector()\
            .setInputCols(['document'])\
            .setOutputCol('sentences')

tokenizer = Tokenizer()\
            .setInputCols(['sentences'])\
            .setOutputCol('tokens')

lemmatizer = LemmatizerModel.pretrained()\
            .setInputCols(['tokens'])\
            .setOutputCol('lemmas')

normalizer = Normalizer()\
            .setCleanupPatterns([
            '[^a-zA-Z.-]+',
            '^[^a-zA-Z]+',
            '[^a-zA-Z]+$'
            ])\
            .setInputCols(['lemmas'])\
            .setOutputCol('normalized')\
            .setLowercase(True)

glove = WordEmbeddingsModel.pretrained(name='glove_100d')\
            .setInputCols(['document', 'normalized'])\
            .setOutputCol('embeddings')

nlp_pipeline = Pipeline().setStages([
    assembler, sentence, tokenizer, lemmatizer, normalizer, glove
]).fit(train)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]
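
To see what the three cleanup patterns in the `Normalizer` stage do to individual tokens, here is a minimal plain-Python sketch. It assumes each pattern is applied in order and every match is removed, followed by lowercasing; `normalize_token` is a hypothetical helper for illustration, not part of Spark NLP, and the real annotator may differ in details.

```python
import re

# The same three patterns passed to setCleanupPatterns in the pipeline above.
CLEANUP_PATTERNS = [
    r'[^a-zA-Z.-]+',  # remove anything that is not a letter, period, or hyphen
    r'^[^a-zA-Z]+',   # remove leading non-letters
    r'[^a-zA-Z]+$',   # remove trailing non-letters
]


def normalize_token(token):
    """Hypothetical helper: strip each pattern in order, then lowercase."""
    for pattern in CLEANUP_PATTERNS:
        token = re.sub(pattern, '', token)
    return token.lower()


examples = ['SEE', 'heart-warming', '(SPOILER)', 'seen.<br', '30min', '--']
cleaned = [normalize_token(t) for t in examples]
print(cleaned)
```

Tokens made up only of punctuation or digits end up as empty strings in this sketch, which is worth keeping in mind when counting tokens per review.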


Let’s select just the values we are interested in, namely the original data plus the normalized tokens and embeddings.

In [23]:
train = nlp_pipeline.transform(train)\
        .selectExpr(
    'path', 'text', 'label',
    'normalized.result AS normalized',
    'embeddings.embeddings'
)

test = nlp_pipeline.transform(test)\
    .selectExpr('path', 'text', 'label',
                'normalized.result AS normalized',
                'embeddings.embeddings'
    )

In [24]:
!ls ../

checkpoints  datasets  notebooks


In [25]:
nlp_pipeline.write().overwrite().save('../checkpoints/nlp_pipeline.3.12')

##### we build a simple document vector by averaging the GloVe word embeddings

In [26]:
import numpy as np
from pyspark.sql.types import *
from pyspark.ml.linalg import DenseVector, VectorUDT

def avg_wordvecs_fun(wordvecs):
    return DenseVector(np.mean(wordvecs, axis=0))

avg_wordvecs = spark.udf.register(
    'avg_wordvecs',
    avg_wordvecs_fun,
    returnType=VectorUDT()
    )

train = train.withColumn('avg_wordvec', avg_wordvecs('embeddings'))
test = test.withColumn('avg_wordvec', avg_wordvecs('embeddings'))

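# drop() returns a new DataFrame, so reassign to actually discard the raw embeddings
train = train.drop('embeddings')
test = test.drop('embeddings')
test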

DataFrame[path: string, text: string, label: int, normalized: array<string>, avg_wordvec: vector]
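
The averaging step can be illustrated without Spark. The sketch below is plain Python rather than the UDF used in the notebook; `average_vectors` is a hypothetical helper, and returning a zero vector for a review with no word vectors is an assumption made here, not something the notebook does.

```python
def average_vectors(vectors, dim=100):
    """Hypothetical helper: element-wise mean of equal-length word vectors.

    Returns a zero vector of length `dim` when there is nothing to average
    (an assumption for this sketch, not behaviour taken from the notebook).
    """
    if not vectors:
        return [0.0] * dim
    count = len(vectors)
    # zip(*vectors) groups the values of each dimension across all words
    return [sum(column) / count for column in zip(*vectors)]


doc = [[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]]
doc_vector = average_vectors(doc, dim=3)
empty_vector = average_vectors([], dim=3)
print(doc_vector)
print(empty_vector)
```

Averaging discards word order, so two reviews containing the same words in a different order get the same document vector.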

### to save some space we will save the data as parquet files

In [27]:
train.write.mode('overwrite').parquet('../checkpoints/nlp_pipeline.3.12imdb.train')
test.write.mode('overwrite').parquet('../checkpoints/nlp_pipeline.3.12imdb.test')

##### unpersist to free up memory

In [28]:
pos_train.unpersist()
pos_test.unpersist()
neg_train.unpersist()
neg_test.unpersist()

DataFrame[path: string, text: string, label: int]

##### now load data from parquet file

In [31]:
train = spark.read.parquet('../checkpoints/nlp_pipeline.3.12imdb.train/').persist()
test = spark.read.parquet('../checkpoints/nlp_pipeline.3.12imdb.test/').persist()

## try model with simple TF-IDF features

In [32]:
from pyspark.ml.feature import CountVectorizer, IDF

In [34]:
tf = CountVectorizer()\
    .setInputCol('normalized')\
    .setOutputCol('tf')

idf = IDF()\
    .setInputCol('tf')\
    .setOutputCol('tfidf')

featurizer = Pipeline().setStages([tf, idf])
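
As a rough illustration of what the featurizer above computes, here is a plain-Python sketch of TF-IDF. It assumes raw term counts for TF and the smoothed formula `log((n_docs + 1) / (doc_freq + 1))` that Spark documents for its `IDF` estimator; `tf_idf` is a hypothetical helper, and the toy documents are made up.

```python
import math
from collections import Counter


def tf_idf(docs):
    """Hypothetical helper: one {term: weight} dict per tokenized document."""
    n_docs = len(docs)
    # Document frequency: in how many documents each term appears at least once
    doc_freq = Counter(term for doc in docs for term in set(doc))
    idf = {
        term: math.log((n_docs + 1) / (df + 1))
        for term, df in doc_freq.items()
    }
    # Term frequency is the raw count of the term within the document
    return [
        {term: count * idf[term] for term, count in Counter(doc).items()}
        for doc in docs
    ]


docs = [['good', 'movie', 'good'], ['bad', 'movie']]
weights = tf_idf(docs)
print(weights)
```

A term that appears in every document, such as `movie` here, gets a weight of zero, while terms specific to one review are weighted up.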

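We have our features; now we build a model with **logistic regression**. Note that the `VectorAssembler` below passes only `avg_wordvec` into `features`, so the classifier is trained on the averaged GloVe vectors and the `tfidf` column produced by the featurizer goes unused.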

In [35]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [39]:
vec_assembler = VectorAssembler()\
        .setInputCols(['avg_wordvec'])\
        .setOutputCol('features')

logreg = LogisticRegression()\
        .setFeaturesCol('features')\
        .setLabelCol('label')

model_pipeline = Pipeline()\
        .setStages([featurizer, vec_assembler, logreg])

In [40]:
model = model_pipeline.fit(train)

#### now save the fitted model, as we did with the NLP pipeline

In [41]:
model.write().overwrite().save('../checkpoints/model.3.12')

### now we will use the fitted model to generate predictions

In [42]:
train_preds = model.transform(train)

In [43]:
test_preds = model.transform(test)

## EVALUATING THE LOGISTIC REGRESSION MODEL

In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [45]:
evaluator = MulticlassClassificationEvaluator()\
    .setMetricName('f1')

In [46]:
evaluator.evaluate(train_preds)

0.8027996352582054

In [47]:
evaluator.evaluate(test_preds)

0.8012334966780377
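
To make the metric concrete: with `metricName` set to `'f1'`, the evaluator should report a weighted F-measure, that is, the per-label F1 averaged by how often each label occurs. The plain-Python sketch below shows that calculation on a made-up example; `weighted_f1` is a hypothetical helper, not a Spark function.

```python
from collections import Counter


def weighted_f1(labels, preds):
    """Hypothetical helper: per-class F1 weighted by class frequency."""
    total = len(labels)
    support = Counter(labels)
    score = 0.0
    for cls, n in support.items():
        pairs = list(zip(labels, preds))
        tp = sum(1 for y, p in pairs if y == cls and p == cls)
        fp = sum(1 for y, p in pairs if y != cls and p == cls)
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        # Weight each class by its share of the true labels
        score += (n / total) * f1
    return score


labels = [1, 1, 0, 0]
preds = [1, 0, 0, 0]
score = weighted_f1(labels, preds)
print(score)
```

Because the IMDB classes are balanced (12,500 reviews each), the weighted F1 here is simply the mean of the F1 scores for the positive and negative classes.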

## DEPLOYMENT OF THE MODEL

In this app we only write a script. Offline "deployment" otherwise often involves creating a workflow that can be run periodically. For this app we will create a script that can be run on new reviews:

In [None]:
%%writefile movie_review_analysis.py

"""
This script takes a file containing reviews of the same movie.
It writes the results of the analysis to stdout.
"""