In [1]:
from configspark import create_session, read_data
import pyspark.sql.functions as f 
import sparknlp
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer, IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

## Goal
* The objective is to classify reviews based on their ratings. To be honest, you would probably want to run a chi-squared test first to see how unique the words are per class,
  but that doesn't stop the fun: this project is meant to show how to pipeline the data and do some basic cleaning, not to get the best model.
* The notebooks are separated into parts because I want to examine what happens inside the pipelines.

##### Configspark
* I was being lazy, so I put the Spark config function and the data-reading function together in configspark.py; at least I added a schema.

In [None]:
# Build the Spark session with the project helper imported from configspark.
spark = create_session()

In [3]:
# Load the reviews through the configspark helper; the resulting DataFrame
# exposes the string columns `review` and `rating`.
df = read_data(spark)

##### Split the data to train and validate 
* When you fit a pipeline, I like to transform a different dataset than the one I have.

##### Split the data to train and validate 
* When you fit a pipeline, I like to transform a different dataset than the one I have.

In [4]:
# Share of rows assigned to each split (70% / 15% / 15%).
train_ratio, test_ratio, validation_ratio = 0.7, 0.15, 0.15

# Weights are passed in the same order as the DataFrames are unpacked below.
split_weights = [train_ratio, test_ratio, validation_ratio]

# A fixed seed keeps the random partitioning reproducible across runs.
train_data, test_data, validation_data = df.randomSplit(split_weights, seed=45)


In [5]:
# Bare expression: displays the DataFrame repr (column names and types).
train_data

DataFrame[review: string, rating: string]

#### Pipelining
* Like the name says, a pipeline is a chain: each transformation is linked to the next one, and the order of execution is maintained.

In [6]:
# --- Spark NLP stages -------------------------------------------------------

# Entry point of every Spark NLP pipeline: wraps the raw `review` string
# (String or Array[String] column) into a `document` annotation.
documentAssembler = DocumentAssembler().setInputCol("review").setOutputCol("document")

# Splits each document into token annotations.
tokenizer = Tokenizer().setInputCols("document").setOutputCol("token")

# Cleans the tokens: lowercases them and strips every character that is
# neither a word character nor whitespace (i.e. punctuation and symbols).
# Raw string literal so that `\w` / `\s` are not treated as (invalid) Python
# escape sequences.
normalizer = (
    Normalizer()
    .setInputCols("token")
    .setOutputCol("normalized")
    .setLowercase(True)
    .setCleanupPatterns([r"[^\w\s]"])
)

# Removes years, i.e. four-digit sequences of the form 19XX or 20XX.
# NOTE(review): the pattern is unanchored, so it presumably also matches
# inside longer digit runs - confirm this is acceptable.
removeYear = (
    Normalizer()
    .setInputCols(["normalized"])
    .setOutputCol("remove_year")
    .setCleanupPatterns([r"(?:(?:19|20)\d\d)"])
)

# Maps each token to its lemma (base dictionary form) using the default
# pretrained model, which is downloaded on first use.
lemmatizer = LemmatizerModel.pretrained().setInputCols("remove_year").setOutputCol("lemmatized")

# Turns the lemmatized tokens (annotatorType TOKEN) into n-grams
# (annotatorType CHUNK). With n=3 and cumulative mode enabled, 1-, 2- and
# 3-grams are all emitted; the words of an n-gram are joined with "_".
ngrammer = (
    NGramGenerator()
    .setInputCols(["lemmatized"])
    .setOutputCol("ngrams")
    .setN(3)
    .setEnableCumulative(True)
    .setDelimiter("_")
)

# Converts the Spark NLP annotations into a plain array of strings (`final`)
# so that regular Spark ML stages can consume them.
finisher = Finisher().setInputCols(["ngrams"]).setOutputCols(["final"])

# --- Spark ML stages --------------------------------------------------------

# Maps the sequence of terms to term frequencies using the hashing trick
# (default vector size, 262144 buckets).
tfizer = HashingTF(inputCol="final", outputCol="tf_features")

# Inverse document frequency:
#   idf = log((m + 1) / (d(t) + 1))
# where m is the total number of documents (rows the IDF is fitted on, i.e.
# reviews - not classes) and d(t) is the number of documents containing term t.
# Terms that appear in fewer than `minDocFreq` documents get a weight of 0.
idfizer = IDF(inputCol="tf_features", outputCol="features", minDocFreq=2)


lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
Download done! Loading the resource.
[OK!]


In [7]:
# Assemble the stages in execution order: Spark NLP text cleaning first,
# then the Finisher bridge, then the Spark ML TF-IDF featurizers.
TF_IDF_pipeline = Pipeline(
    stages=[
        documentAssembler,
        tokenizer,
        normalizer,
        removeYear,
        lemmatizer,
        ngrammer,
        finisher,
        tfizer,
        idfizer,
    ]
)

In [8]:
# Fit the pipeline on the training split only; this is where the IDF stage
# learns its document frequencies.
# NOTE(review): "convetor" is presumably a typo for "converter"; the name is
# kept because later cells reference it.
TF_IDF_convetor = TF_IDF_pipeline.fit(train_data)

                                                                                

In [9]:
# Apply the fitted pipeline to the training split, adding the `final`,
# `tf_features` and `features` columns.
TF_IDF_df = TF_IDF_convetor.transform(train_data)

##### Measure of importance similar to the chi-squared
* It's all packaged together: the tokens and then the term frequencies.

In [10]:
# Preview the transformed rows; show() prints the first 20 rows by default
# and truncates long cell values.
TF_IDF_df.show()

+--------------------+------+--------------------+--------------------+--------------------+
|              review|rating|               final|         tf_features|            features|
+--------------------+------+--------------------+--------------------+--------------------+
|"5 Stars - ""Very...|   5.0|[5, star, very, h...|(262144,[44441,12...|(262144,[44441,12...|
|"ALMOST everythin...|   3.0|[almost, everythi...|(262144,[146,7036...|(262144,[146,7036...|
|"After a few week...|   4.0|[after, a, few, w...|(262144,[268,1110...|(262144,[268,1110...|
|"After my old Ham...|   1.0|[after, i, old, h...|(262144,[1147,268...|(262144,[1147,268...|
|"After years of u...|   5.0|[after, year, of,...|(262144,[150,2437...|(262144,[150,2437...|
|"As a coffee novi...|   5.0|[as, a, coffee, n...|(262144,[276,3353...|(262144,[276,3353...|
|"At first, I like...|   2.0|[at, first, i, li...|(262144,[221,752,...|(262144,[221,752,...|
|"Best coffee pot ...|   5.0|[good, coffee, po...|(262144,[351,660,...

23/09/13 00:32:42 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [11]:
# Inspect the TF-IDF sparse vectors alone, showing up to 130 characters per cell.
TF_IDF_df.select("features").show(truncate=130)

+----------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                          features|
+----------------------------------------------------------------------------------------------------------------------------------+
|(262144,[44441,125422,144403,183411,190647,202791,210040,213314,245949],[6.795985378139127,6.795985378139127,0.0,3.291930611037...|
|(262144,[146,7036,12797,14541,20146,20345,20360,20833,21823,24056,25203,26465,27576,30546,30950,32890,33917,34996,35008,36271,4...|
|(262144,[268,1110,1900,3011,3924,3928,5655,6033,6661,7549,8898,9056,9420,9884,10151,12472,12524,14002,15025,16898,17634,18575,1...|
|(262144,[1147,2687,2761,4263,4291,4900,5330,5464,7625,8619,8972,9420,16029,16620,16898,18587,19036,19321,20524,20567,21549,2310...|
|(262144,[150,2437,2522,3824,4390,4689,5923,6455,6561,6620,6951,8282,

23/09/13 00:32:42 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
