<a href="https://colab.research.google.com/github/satsaras/deploying-machine-learning-models/blob/master/Amazon_Reviews_Classification_using_Pyspark_libraries.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# This notebook uses PySpark libraries to perform text classification on the Amazon reviews corpus. The training set has 3.6M reviews, evenly split between the negative and positive classes.

Experiments tried:

1. TF-IDF features with Logistic Regression and Random Forest
2. Doc2Vec-style features with Logistic Regression
3. Doc2Vec-style features with a deep learning model built with Keras + Elephas (a Keras extension for running models on Spark)
4. Doc2Vec-style features trained on more than 1M reviews, with Logistic Regression
5. BERT sentence embeddings from Spark NLP with a deep learning model
6. Universal Sentence Encoder sentence embeddings from Spark NLP with a deep learning model





In [None]:
!pip install kaggle



In [None]:
from google.colab import files

In [None]:
files.upload()

{}

In [None]:
!rm -rf ~/.kaggle
!mkdir -p ~/.kaggle
!mv ./kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Download the data from Kaggle

In [None]:
!kaggle datasets download -d kritanjalijain/amazon-reviews

Downloading amazon-reviews.zip to /content
 99% 1.29G/1.29G [00:17<00:00, 115MB/s]
100% 1.29G/1.29G [00:17<00:00, 80.8MB/s]


In [None]:
! unzip /content/amazon-reviews.zip -d /content/

Archive:  /content/amazon-reviews.zip
  inflating: /content/amazon_review_polarity_csv.tgz  
  inflating: /content/test.csv       
  inflating: /content/train.csv      


In [None]:
!pip install pyspark
!pip install elephas

In [None]:
!pip install h5py==2.10.0



In [None]:
!pip install tensorflow==2.5.0

In [None]:
import pyspark
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,CountVectorizer
from pyspark.ml.feature import Word2Vec
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import when,concat_ws,col,rand
from pyspark.sql.types import *
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras import utils
from tensorflow.keras import layers
from tensorflow.keras import optimizers, regularizers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import optimizers

from elephas.ml_model import ElephasEstimator 
from elephas.spark_model import SparkModel


In [None]:
from sklearn.metrics import accuracy_score

Start a SparkSession with a few config overrides, such as raising the memory per executor from the 1 GB default to 6 GB

In [None]:
spark=SparkSession.builder\
.config('spark.driver.memory',"6g")\
.config('spark.executor.cores','6')\
.config('spark.executor.memory','6g')\
.config('spark.master','local[*]')\
.appName('test')\
.getOrCreate()

Read data

In [None]:
df=spark.read.csv('/content/train.csv')

In [None]:
df.explain()

== Physical Plan ==
FileScan csv [_c0#16,_c1#17,_c2#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string>




Rename columns

In [None]:
df=df.withColumnRenamed('_c0','Polarity')
df=df.withColumnRenamed('_c1','Title')
df=df.withColumnRenamed('_c2','Review')

Filter out very short reviews, as they will not provide enough signal. The review title is not used, since it may contain high-polarity words like 'Amazing' or 'Worst'.

In [None]:
df=df.withColumn("len_Review", F.length("Review"))
df=df.filter(df['len_Review']>10)

Add custom stop words (words that are strong indicators of polarity) to improve the model's ability to generalize. The model should predict from the whole context of the text rather than from a few obvious words.

In [None]:
custom_stopwords=frozenset(['good','great','bad','hate','like','better','recommend','best','disappointed','perfect','return','easy','hard','ha','wa','love','problem','worst','boring','wrong','loved','wonderful','amazing'])

In [None]:
generic_stopwords=frozenset(['all', 'six', 'just', 'less', 'being', 'indeed', 'over', 'move', 'anyway', 'four', 'not', 'own', 'through',
    'using', 'fifty', 'where', 'mill', 'only', 'find', 'before', 'one', 'whose', 'system', 'how', 'somewhere',
    'much', 'thick', 'show', 'had', 'enough', 'should', 'to', 'must', 'whom', 'seeming', 'yourselves', 'under',
    'ours', 'two', 'has', 'might', 'thereafter', 'latterly', 'do', 'them', 'his', 'around', 'than', 'get', 'very',
    'de', 'none', 'cannot', 'every', 'un', 'they', 'front', 'during', 'thus', 'now', 'him', 'nor', 'name', 'regarding',
    'several', 'hereafter', 'did', 'always', 'who', 'didn', 'whither', 'this', 'someone', 'either', 'each', 'become',
    'thereupon', 'sometime', 'side', 'towards', 'therein', 'twelve', 'because', 'often', 'ten', 'our', 'doing', 'km',
    'eg', 'some', 'back', 'used', 'up', 'go', 'namely', 'computer', 'are', 'further', 'beyond', 'ourselves', 'yet',
    'out', 'even', 'will', 'what', 'still', 'for', 'bottom', 'mine', 'since', 'please', 'forty', 'per', 'its',
    'everything', 'behind', 'does', 'various', 'above', 'between', 'it', 'neither', 'seemed', 'ever', 'across', 'she',
    'somehow', 'be', 'we', 'full', 'never', 'sixty', 'however', 'here', 'otherwise', 'were', 'whereupon', 'nowhere',
    'although', 'found', 'alone', 're', 'along', 'quite', 'fifteen', 'by', 'both', 'about', 'last', 'would',
    'anything', 'via', 'many', 'could', 'thence', 'put', 'against', 'keep', 'etc', 'amount', 'became', 'ltd', 'hence',
    'onto', 'or', 'con', 'among', 'already', 'co', 'afterwards', 'formerly', 'within', 'seems', 'into', 'others',
    'while', 'whatever', 'except', 'down', 'hers', 'everyone', 'done', 'least', 'another', 'whoever', 'moreover',
    'couldnt', 'throughout', 'anyhow', 'yourself', 'three', 'from', 'her', 'few', 'together', 'top', 'there', 'due',
    'been', 'next', 'anyone', 'eleven', 'cry', 'call', 'therefore', 'interest', 'then', 'thru', 'themselves',
    'hundred', 'really', 'sincere', 'empty', 'more', 'himself', 'elsewhere', 'mostly', 'on', 'fire', 'am', 'becoming',
    'hereby', 'amongst', 'else', 'part', 'everywhere', 'too', 'kg', 'herself', 'former', 'those', 'he', 'me', 'myself',
    'made', 'twenty', 'these', 'was', 'bill', 'cant', 'us', 'until', 'besides', 'nevertheless', 'below', 'anywhere',
    'nine', 'can', 'whether', 'of', 'your', 'toward', 'my', 'say', 'something', 'and', 'whereafter', 'whenever',
    'give', 'almost', 'wherever', 'is', 'describe', 'beforehand', 'herein', 'doesn', 'an', 'as', 'itself', 'at',
    'have', 'in', 'seem', 'whence', 'ie', 'any', 'fill', 'again', 'hasnt', 'inc', 'thereby', 'thin', 'no', 'perhaps',
    'latter', 'meanwhile', 'when', 'detail', 'same', 'wherein', 'beside', 'also', 'that', 'other', 'take', 'which',
    'becomes', 'you', 'if', 'nobody', 'unless', 'whereas', 'see', 'though', 'may', 'after', 'upon', 'most', 'hereupon',
    'eight', 'but', 'serious', 'nothing', 'such', 'why', 'off', 'a', 'don', 'whereby', 'third', 'i', 'whole', 'noone',
    'sometimes', 'well', 'amoungst', 'yours', 'their', 'rather', 'without', 'so', 'five', 'the', 'first', 'with',
    'make', 'once','this'])

In [None]:
custom_stopwords=custom_stopwords.union(generic_stopwords)

In [None]:
df_sample=df.sample(withReplacement=False, fraction=0.02, seed=0)

In [None]:
df_sample.select("*").show()

+--------+--------------------+--------------------+----------+
|Polarity|               Title|              Review|len_Review|
+--------+--------------------+--------------------+----------+
|       2|Whispers of the W...|This was a easy t...|       270|
|       2|Henry has come ho...|I had Henry back ...|       323|
|       2| keeps his attention|My little boy is ...|       264|
|       2|Grandson enjoyed ...|Fine. Boys and th...|       104|
|       1|More than Thomas,...|I bought this thi...|       644|
|       2|Simply excellent ...|I am a system ana...|       510|
|       2|"James Moody take...|"James Moody, and...|       490|
|       2|       Great Reading|Christine Feehan ...|       131|
|       1|profanity the mov...|i cannot believe ...|       820|
|       1|Pretty pathetic p...|"The new ""stars ...|        99|
|       2|   Simply brilliant!|"This is a great ...|       334|
|       1|            Horrible|This movie was ho...|       302|
|       1|I'd rate it 0 sta...|"Very few

Helper functions for data cleaning

In [None]:
from pyspark.sql.types import StringType
from textblob import TextBlob, Word
import re,string
from gensim import utils

In [None]:
def remove_stopwords(s):
    s = utils.to_unicode(s)
    return " ".join(w for w in s.split() if w not in custom_stopwords)

In [None]:
def remove_puntuations(s):
    RE_PUNCT = re.compile(r'([%s])+' % re.escape(string.punctuation), re.UNICODE)
    s = utils.to_unicode(s)
    return RE_PUNCT.sub(" ", s)

In [None]:
def remove_tags(s):
    RE_TAGS = re.compile(r"<([^>]+)>", re.UNICODE)
    s = utils.to_unicode(s)
    return RE_TAGS.sub(" ", s)

In [None]:
def remove_multiplewhitespaces(s):
    RE_WHITESPACE = re.compile(r"(\s)+", re.UNICODE)
    #s = utils.to_unicode(s)
    return RE_WHITESPACE.sub(" ", s)

In [None]:
def to_lower(s):
    s=utils.to_unicode(s)
    return " ".join(i.lower() for i in s.split())

In [None]:
def remove_smallwords(s,l=3):
    s=utils.to_unicode(s)
    return " ".join(i for i in s.split() if len(i)>=l)


In [None]:
def remove_nonalpha(s):
    RE_NONALPHA = re.compile(r"\W", re.UNICODE)
    s = utils.to_unicode(s)
    return RE_NONALPHA.sub(" ", s)

In [None]:
def remove_numerics(s):
    RE_NUMERIC = re.compile(r"[0-9]+", re.UNICODE)
    s = utils.to_unicode(s)
    return RE_NUMERIC.sub(" ", s)

In [None]:
def lemmetizer(s):
    s=TextBlob(s)
    return " ". join([w.lemmatize() for w in s.words])

In [None]:
Pre_filters=[remove_tags,
        remove_puntuations,
        remove_multiplewhitespaces,
        remove_numerics,
        remove_smallwords,
        to_lower,
        remove_nonalpha,
        lemmetizer,
        remove_stopwords]

In [None]:
def pre_process(x):
    # Access fields by name so this works regardless of column order
    review=x['Review']
    review=review.lower()
    for f in Pre_filters:
        review=f(review)
    return(x['Polarity'],review)
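
To see what the filter chain does to a single review, here is a minimal standalone sketch that uses only the standard library. It mirrors the regex-based filters above but leaves out the TextBlob lemmatizer and uses a tiny, hypothetical stop-word set (`demo_stopwords`), so its output only approximates what `pre_process` would produce.

```python
import re
import string

# Hypothetical, tiny stop-word set for illustration only
demo_stopwords = frozenset(['this', 'was', 'the', 'and', 'great'])

RE_TAGS = re.compile(r"<([^>]+)>")
RE_PUNCT = re.compile(r"([%s])+" % re.escape(string.punctuation))
RE_NUMERIC = re.compile(r"[0-9]+")
RE_WHITESPACE = re.compile(r"\s+")

def demo_clean(text, min_len=3):
    text = text.lower()
    text = RE_TAGS.sub(" ", text)        # drop HTML-like tags
    text = RE_PUNCT.sub(" ", text)       # drop punctuation
    text = RE_NUMERIC.sub(" ", text)     # drop digits
    text = RE_WHITESPACE.sub(" ", text)  # collapse repeated whitespace
    # Drop short words and stop words
    words = [w for w in text.split() if len(w) >= min_len and w not in demo_stopwords]
    return " ".join(words)

cleaned = demo_clean("This was <b>GREAT</b>!!! Bought 2 of the chargers, and both work.")
print(cleaned)  # bought chargers both work
```

Removing tags before punctuation matters here, because `<` and `>` are themselves punctuation characters and would otherwise be stripped first, leaving the tag names behind.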

In [None]:
import nltk
nltk.download('punkt')
!python -m textblob.download_corpora

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Unzipping corpora/brown.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package conll2000 to /root/nltk_data...
[nltk_data]   Unzipping corpora/conll2000.zip.
[nltk_data] Downloading package movie_reviews to /root/nltk_data...
[nltk_data]   Unzipping corpora/movie_reviews.zip.
Finished.


Clean Review Text

In [None]:
def clean_text(df):
    df=df.rdd.map(lambda x: pre_process(x))
    df = df.toDF(['Polarity','Review'])
    return(df)

In [None]:
df_sample=clean_text(df_sample)

In [None]:
df_sample.select("*").show()

+--------+--------------------+
|Polarity|              Review|
+--------+--------------------+
|       2|read book want re...|
|       1|bought charger in...|
|       1|star depends look...|
|       2|gift old daughter...|
|       2|wow album track r...|
|       2|completely satisf...|
|       2|henry early origi...|
|       2|henry dog lost ri...|
|       2|little boy gettin...|
|       2|whale naturalist ...|
|       1|mask maker cinema...|
|       1|formulation produ...|
|       2|product hair curl...|
|       1|buy game becuase ...|
|       1|reviewing gizmo s...|
|       2|fine boy toy trai...|
|       1|bought thinking p...|
|       1|search super save...|
|       1|guerriula warfare...|
|       2|analyst ibm backg...|
+--------+--------------------+
only showing top 20 rows



Get the most frequent words to see whether any of them are obvious polarity indicators (like good, bad, etc.), and add those to the custom stop words

In [None]:
df_sample.withColumn('word', F.explode(F.split(F.col('Review'), ' '))) \
  .groupBy('word') \
  .count() \
  .sort('count', ascending=False) \
  .take(200)

[Row(word='book', count=104021),
 Row(word='time', count=40300),
 Row(word='movie', count=33085),
 Row(word='read', count=32982),
 Row(word='work', count=26661),
 Row(word='year', count=23087),
 Row(word='product', count=22652),
 Row(word='use', count=20336),
 Row(word='story', count=19954),
 Row(word='buy', count=19599),
 Row(word='bought', count=19586),
 Row(word='album', count=17948),
 Row(word='thing', count=17894),
 Row(word='way', count=17842),
 Row(word='new', count=16855),
 Row(word='song', count=16732),
 Row(word='know', count=16440),
 Row(word='think', count=16254),
 Row(word='little', count=16213),
 Row(word='doe', count=15805),
 Row(word='got', count=15588),
 Row(word='want', count=14878),
 Row(word='game', count=14689),
 Row(word='music', count=14563),
 Row(word='old', count=14184),
 Row(word='day', count=13987),
 Row(word='money', count=13825),
 Row(word='people', count=13794),
 Row(word='character', count=13729),
 Row(word='dvd', count=13078),
 Row(word='look', count=127
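
For intuition, the same explode, group, and count logic can be sketched on a few in-memory strings with `collections.Counter`. The `reviews` list is made-up sample data, not taken from the corpus.

```python
from collections import Counter

# Made-up cleaned reviews for illustration
reviews = ["book read book", "movie time book", "read time"]

# Equivalent of explode(split(Review, ' ')) followed by groupBy('word').count()
counts = Counter(word for review in reviews for word in review.split(' '))

# Equivalent of sort('count', ascending=False).take(2)
top_words = counts.most_common(2)
print(top_words)
```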

In [None]:
def Tokenize(df):
    tokenizer = Tokenizer(inputCol="Review", outputCol="Review_Words")
    wordsData = tokenizer.transform(df)
    return(wordsData)


In [None]:
def TFIDF(df):
    wordsData=Tokenize(df)
    hashingTF = HashingTF(inputCol="Review_Words", outputCol="rawFeatures", numFeatures=512)
    featurizedData = hashingTF.transform(wordsData)

    idf = IDF(inputCol="rawFeatures", outputCol="Review_CV")
    IDFModel = idf.fit(featurizedData)
    IDF_Data = IDFModel.transform(featurizedData)
    return(IDF_Data)
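
The hashing-plus-IDF step can be illustrated without Spark. The sketch below is a toy under stated assumptions: it uses `zlib.crc32` as a stand-in hash (Spark's `HashingTF` uses MurmurHash3, so bucket indices will differ) and a hypothetical `num_features` of 8. The IDF weight follows the smoothed formula documented for Spark's `IDF`, log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents that contain the term.

```python
import math
import zlib
from collections import Counter

def hashing_tf(tokens, num_features):
    """Map each token to a bucket and count occurrences per bucket."""
    # zlib.crc32 is a stand-in; Spark's HashingTF uses MurmurHash3
    return Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens)

def fit_idf(tf_vectors):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    doc_freq = Counter(bucket for tf in tf_vectors for bucket in tf)
    return {bucket: math.log((m + 1) / (df + 1)) for bucket, df in doc_freq.items()}

# Made-up tokenized documents
docs = [["book", "read", "book"], ["movie", "time"], ["book", "time"]]
num_features = 8  # hypothetical; the notebook uses 512

tf_vectors = [hashing_tf(doc, num_features) for doc in docs]
idf = fit_idf(tf_vectors)
tfidf_vectors = [{b: count * idf[b] for b, count in tf.items()} for tf in tf_vectors]
print(tfidf_vectors[0])
```

Because distinct words can land in the same bucket, a larger `numFeatures` reduces collisions at the cost of wider feature vectors.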


Get TFIDF features for review

In [None]:
IDF_Data=TFIDF(df_sample)

Define a function for Logistic Regression with a small grid search and cross-validation

In [None]:
def LogictiveRegression_CV(df_train,df_val):
    lr=LogisticRegression(labelCol='label',featuresCol='Review_CV')
    pipeline = Pipeline(stages=[lr])
    paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0,0.2,0.7]) \
    .addGrid(lr.regParam, [0.1, 0.01,0.05]) \
    .build()
    # Default metric is areaUnderROC, so the value printed below is an AUC, not an accuracy
    evaluator=BinaryClassificationEvaluator(labelCol='label')
    crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,numFolds=2)
    cv=crossval.fit(df_train)
    best_model=cv.bestModel.stages[0]
    prediction=best_model.transform(df_val)
    acc=evaluator.evaluate(prediction)
    print('The accuracy is %g' %acc)
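
The grid above is the Cartesian product of the two parameter lists, and `CrossValidator` fits every combination once per fold. A quick standard-library sketch of that bookkeeping follows; the variable names are illustrative and not part of the Spark API.

```python
from itertools import product

elastic_net_values = [0, 0.2, 0.7]
reg_values = [0.1, 0.01, 0.05]
num_folds = 2

# One parameter map per combination, as ParamGridBuilder.build() would produce
param_grid = [{"elasticNetParam": e, "regParam": r}
              for e, r in product(elastic_net_values, reg_values)]

# Each combination is trained once per fold during cross-validation
cv_fits = len(param_grid) * num_folds
print(len(param_grid), cv_fits)  # 9 18
```

On top of these fits, the best combination is refit once on the full training set, so widening the grid or raising `numFolds` increases training time roughly in proportion.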

Split the data into train and test sets. MLlib expects the target column to be named 'label' and to contain only 0/1.

In [None]:
def train_test_Split(df):
    df = df.withColumnRenamed('Polarity','label')
    df = df.withColumn("label", df["label"].cast(IntegerType()))
    # Polarity 2 (positive) becomes 0; polarity 1 (negative) stays 1
    df = df.withColumn("label",when(df["label"] == 2, 0).otherwise(df["label"]))
    df = df.withColumn("label", df["label"].cast(IntegerType()))
    df_train,df_test=df.randomSplit([0.8,0.2])
    return(df_train,df_test)


In [None]:
df_train,df_test=train_test_Split(IDF_Data)

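Repartition the DataFrame for faster processing. Because this runs after the split, it does not change the partitioning of df_train and df_test.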

In [None]:
IDF_Data=IDF_Data.repartition(64)

Logistic Regression on TFIDF features

In [None]:
LogictiveRegression_CV(df_train,df_test)

The accuracy is 0.74282
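
`BinaryClassificationEvaluator` defaults to `metricName='areaUnderROC'`, so the figure printed above is an AUC rather than a classification accuracy. The two can differ noticeably, as this small standalone sketch with made-up scores shows. The helper names `auc` and `accuracy` are illustrative.

```python
def auc(labels, scores):
    """Area under the ROC curve as the fraction of correctly ranked (pos, neg) pairs."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

def accuracy(labels, scores, threshold=0.5):
    """Share of examples whose thresholded score matches the label."""
    preds = [1 if s > threshold else 0 for s in scores]
    return sum(int(p == y) for p, y in zip(preds, labels)) / len(labels)

# Made-up labels and predicted probabilities of class 1
labels = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.6, 0.4, 0.45, 0.3, 0.2]

auc_value = auc(labels, scores)
acc_value = accuracy(labels, scores)
print(auc_value, acc_value)
```

To report accuracy from Spark itself, one option is `MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')`, which also works for two classes.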


In [None]:
def RandomForest_CV(df_train,df_val):
    rf=RandomForestClassifier(labelCol='label',featuresCol='Review_CV')
    pipeline = Pipeline(stages=[rf])
    paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [10]) \
    .addGrid(rf.numTrees, [100]) \
    .addGrid(rf.impurity ,['gini','entropy']) \
    .addGrid(rf.maxBins,[20]) \
    .build()
    # Default metric is areaUnderROC, so the value printed below is an AUC, not an accuracy
    evaluator=BinaryClassificationEvaluator(labelCol='label')
    crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,numFolds=2)
    cv=crossval.fit(df_train)
    best_model=cv.bestModel.stages[0]
    prediction=best_model.transform(df_val)
    acc=evaluator.evaluate(prediction)
    print('The accuracy is %g' %acc)


In [None]:
RandomForest_CV(df_train,df_test)

The accuracy is 0.809151


Although it is named Word2Vec in MLlib, the fitted model produces Doc2Vec-style features: each review is represented by the average of the word2vec vectors of its words

In [None]:
def wor2vec(df):
    wordsData=Tokenize(df)
    w2v = Word2Vec(vectorSize=100, minCount=0, inputCol="Review_Words", outputCol="Review_CV")
    w2v_model = w2v.fit(wordsData)
    word2vec_TF = w2v_model.transform(wordsData)
    return(word2vec_TF)
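
The averaging step can be sketched without Spark. The example below uses made-up 3-dimensional word vectors (the notebook uses 100 dimensions). It divides the summed vectors by the total number of tokens, which is an assumption about how Spark treats words missing from the vocabulary.

```python
# Made-up word vectors for illustration
word_vectors = {
    "book": [1.0, 0.0, 2.0],
    "read": [3.0, 2.0, 0.0],
    "time": [2.0, 4.0, 1.0],
}

def document_vector(tokens, vectors, size=3):
    """Sum the vectors of known tokens and divide by the token count."""
    if not tokens:
        return [0.0] * size  # empty document: zero vector
    total = [0.0] * size
    for tok in tokens:
        vec = vectors.get(tok)
        if vec is not None:  # unknown words contribute nothing to the sum
            total = [a + b for a, b in zip(total, vec)]
    return [v / len(tokens) for v in total]

doc_vec = document_vector(["book", "read"], word_vectors)
print(doc_vec)  # [2.0, 1.0, 1.0]
```

Averaging discards word order, so two reviews with the same words in a different order get the same vector.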

In [None]:
word2vec_TF=wor2vec(df_sample)

In [None]:
# write.save() defaults to Parquet, so this path holds Parquet files despite the .csv suffix
word2vec_TF.select("*").write.save('/content/word2vec_features.csv')

In [None]:
#word2vec_TF=spark.read.parquet('/content/word2vec_features.csv')

In [None]:
word2vec_TF.show()

+--------+--------------------+--------------------+--------------------+
|Polarity|              Review|        Review_Words|           Review_CV|
+--------+--------------------+--------------------+--------------------+
|       1|book look visuall...|[book, look, visu...|[0.00886286601710...|
|       2|wanted write rewi...|[wanted, write, r...|[-0.0183582048843...|
|       2|heard zucchero wa...|[heard, zucchero,...|[-0.0343871394354...|
|       2|archer mayor book...|[archer, mayor, b...|[0.01174846675712...|
|       2|enjoy series deal...|[enjoy, series, d...|[0.03125806953758...|
|       1|gift set look big...|[gift, set, look,...|[0.01814348445197...|
|       2|canon lens favori...|[canon, lens, fav...|[0.03245711780327...|
|       2|book cute collage...|[book, cute, coll...|[-0.0104236687766...|
|       1|blank dvd sale co...|[blank, dvd, sale...|[-0.0186463112272...|
|       1|looking book fund...|[looking, book, f...|[-0.0114724895327...|
|       1|buffy fan expecte...|[buffy,

In [None]:
word2vec_TF = word2vec_TF.orderBy(rand())

In [None]:
word2vec_TF=word2vec_TF.withColumnRenamed('w2v_Features','Review_CV')

In [None]:
df_train,df_test=train_test_Split(word2vec_TF)

Perform Logistic Regression on Doc2Vec features

In [None]:
LogictiveRegression_CV(df_train,df_test)

The accuracy is 0.827134


Train Word2Vec on a bigger sample of the text data, then build a classification model on the previously sampled data

In [None]:
def wor2vec(df):
    wordsData=Tokenize(df)
    w2v = Word2Vec(vectorSize=100, minCount=2, inputCol="Review_Words", outputCol="Review_CV",numPartitions=64)
    w2v_model = w2v.fit(wordsData)
    return(w2v_model)

In [None]:
df=df.repartition(12)

Take 30-50% sample from original corpus

In [None]:
df_sample2=df.sample(False,fraction=.3,seed=42)

In [None]:
df_sample2.select('*').count()

1079838

In [None]:
df_sample2=clean_text(df_sample2)

In [None]:
word2vec_full=wor2vec(df_sample2)

In [None]:
word2vec_full.save('/content/word2vec_model_50')

In [None]:
from google.colab import drive
drive.mount('/content/drive')
# Copy the saved model directory to Drive so it outlives the Colab session
!cp -r /content/word2vec_model_50 /content/drive/MyDrive/

df_sample is the previously sampled corpus of ~70k reviews, with an almost equal distribution of the two classes

In [None]:
df_sample=Tokenize(df_sample)
df_sample=word2vec_full.transform(df_sample)

In [None]:
df_train,df_test=train_test_Split(df_sample)

Word2Vec trained on a bigger sample (~1M reviews) gave a boost of about 4% in accuracy

In [None]:
LogictiveRegression_CV(df_train,df_test)

The accuracy is 0.86459


Word2Vec trained on a bigger sample (~1.5M reviews) gave a boost of about 5% in accuracy

In [None]:
LogictiveRegression_CV(df_train,df_test)

The accuracy is 0.875109


Create a deep learning model with Keras and use Elephas to train it with Spark's distributed computing

In [None]:
df_train=df_train.filter(df_train['Review']!="")

In [None]:
df_train.groupby('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|28756|
|    0|28824|
+-----+-----+



In [None]:
#print(len(df_train.select("Review_CV").first()[0]))
input_dim=100

In [None]:
def build_classifier_model():
    model = tf.keras.Sequential([
      layers.Dense(128,activation='relu',input_shape=(input_dim,),activity_regularizer=regularizers.l2(0.02)),
      layers.Dropout(.5),
      layers.BatchNormalization(),
      layers.Dense(64,activation='relu'),
      layers.Dropout(.4),
      layers.BatchNormalization(),
      layers.Dense(16,activation='relu'),
      layers.Dropout(.4),
      layers.Dense(2,activation='softmax')])
    return(model)

In [None]:
classifier_model=build_classifier_model()
classifier_model.compile(optimizer='Adam',loss='binary_crossentropy',metrics=['accuracy'])

In [None]:

optimizer_conf = optimizers.Adam()
opt_conf = optimizers.serialize(optimizer_conf)

# Initialize SparkML Estimator
estimator = ElephasEstimator()
estimator.setFeaturesCol("Review_CV")
estimator.setLabelCol("label")
estimator.set_keras_model_config(classifier_model.to_json())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(2)
estimator.set_num_workers(16)
estimator.set_epochs(20) 
estimator.set_batch_size(128)
estimator.set_verbosity(4)
estimator.set_validation_split(0.20)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("binary_crossentropy")
estimator.set_metrics(['acc'])

ElephasEstimator_e3f24cf6708d

In [None]:
dl_pipeline = Pipeline(stages=[estimator])
fit_dl_pipeline = dl_pipeline.fit(df_train)

>>> Fit model
>>> Synchronous training complete.


In [None]:
pred = fit_dl_pipeline.transform(df_test)

In [None]:
pred_filter=pred.select("label", "prediction")


In [None]:
pred_filter=pred_filter.repartition(64)

In [None]:
test_label_pred = pred_filter.toPandas()

In [None]:
import numpy as np

In [None]:
test_label_pred['prediction']=test_label_pred['prediction'].apply(lambda x: np.argmax(x))
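
With `set_categorical_labels(True)` and a two-unit softmax output, each entry of the `prediction` column is expected to hold one score per class, and the predicted label is the index of the largest score. That output shape is an assumption about Elephas here. A plain-Python sketch of the decoding step with made-up scores:

```python
# Made-up per-class scores, one row per review: [score_class_0, score_class_1]
raw_predictions = [[0.8, 0.2], [0.3, 0.7], [0.55, 0.45]]

def argmax(scores):
    """Index of the largest score (the first one wins on ties)."""
    return max(range(len(scores)), key=lambda i: scores[i])

predicted_labels = [argmax(row) for row in raw_predictions]
print(predicted_labels)  # [0, 1, 0]
```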

In [None]:
#test_label_pred["prediction"]=test_label_pred["prediction"].apply(lambda x:1 if x>0.5 else 0)

In [None]:
accuracy_score(test_label_pred['label'],test_label_pred['prediction'])

0.7317876155785934

# Try a Spark NLP sentence BERT model and build a DNN classifier on top of it in Spark

In [None]:
!pip install spark-nlp
!pip install pyspark

In [None]:
import sparknlp

A word of caution: if another SparkSession is already running, the DocumentAssembler below will raise an error

In [None]:
spark = sparknlp.start() 

In [None]:
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd

In [None]:
document = DocumentAssembler()\
    .setInputCol("Review")\
    .setOutputCol("Review_ann")

Try a BERT model (small, uncased)

In [None]:
Bert_pretrained = BertSentenceEmbeddings.pretrained('sent_small_bert_L2_768')\
 .setInputCols(["Review_ann"])\
 .setOutputCol("sentence_embeddings")

sent_small_bert_L2_768 download started this may take some time.
Approximate size to download 139.6 MB
[OK!]


In [None]:
classsifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("label")\
  .setMaxEpochs(5)
bert_clf_pipeline = Pipeline(
    stages = [
        document,
        Bert_pretrained,
        classsifierdl
    ])

In [None]:
Pre_filters=[remove_tags,
        remove_puntuations,
        remove_multiplewhitespaces,
        remove_numerics,
        #remove_smallwords,
        #to_lower,
        remove_nonalpha
        #lemmetizer,
        #remove_stopwords
        ]
## Skip some of the preprocessing steps when cleaning text for the BERT-based model

In [None]:
df_sample=clean_text(df_sample)

In [None]:
from pyspark.sql.functions import trim
df_sample=df_sample.withColumn("Review",trim(df_sample.Review))

In [None]:
df_train,df_test=train_test_Split(df_sample) # text cleaning done based on previous model

In [None]:
bert_pipelineModel = bert_clf_pipeline.fit(df_train)

In [None]:
bert_pipelineModel.save('/content/bertsentence/')

In [None]:
# Keep df_test as a Spark DataFrame so it can be reused for the next model
bert_pred=bert_pipelineModel.transform(df_test).select('Review','label','class.result')


In [None]:
bert_pred=bert_pred.toPandas()
bert_pred['result']=bert_pred['result'].apply(lambda x: x[0])

In [None]:
bert_pred.result=pd.to_numeric(bert_pred.result)

In [None]:
print(accuracy_score(bert_pred.label,bert_pred.result))

0.8137492219378933


Try Universal Sentence Encoder

In [None]:
USE_model = UniversalSentenceEncoder.pretrained()\
 .setInputCols(["Review_ann"])\
 .setOutputCol("sentence_embeddings")

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]


In [None]:
classsifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("label")\
  .setMaxEpochs(5)
use_clf_pipeline = Pipeline(
    stages = [
        document,
        USE_model,
        classsifierdl
    ])

In [None]:
use_pipelineModel = use_clf_pipeline.fit(df_train) ## bert model training took an hour, USE took 4 min

In [None]:
df_test=use_pipelineModel.transform(df_test).select('Review','label','class.result')


In [None]:
df_test=df_test.toPandas()
df_test['result']=df_test['result'].apply(lambda x: x[0])
df_test.result=pd.to_numeric(df_test.result)

In [None]:
print(accuracy_score(df_test.label,df_test.result))

0.8401590734668248
