In [1]:
from pyspark.sql.session import SparkSession
from sklearn.metrics import confusion_matrix,classification_report
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
#from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, CountVectorizer
from pyspark.ml.feature import Tokenizer as pyTokenizer
#from pyspark.ml.feature import RegexTokenizer,StopWordsRemover,CountVectorizer,IDF, HashingTF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from  pyspark.ml.pipeline import PipelineModel  # For saving the model
from pyspark.sql.functions import col
import pyspark.sql.functions as F

# Johnsnow nlp library
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

import pyspark
import numpy as np

In [2]:
# Return spark data frame shape
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape

### Running spark

In [4]:
#!/Users/JoeKifle/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master spark://131.114.50.200:7077 kafka-script/tweet_consumer.py

In [5]:
#!/Users/JoeKifle/spark-3.2.1-bin-hadoop3.2/sbin/start-master.sh

In [6]:
#!/Users/JoeKifle/spark-3.2.1-bin-hadoop3.2/sbin/start-worker.sh spark://joetelila.local:7077

In [7]:
#!/Users/JoeKifle/spark-3.2.1-bin-hadoop3.2/sbin/stop-worker.sh

In [8]:
#!/Users/JoeKifle/spark-3.2.1-bin-hadoop3.2/sbin/stop-master.sh

### Spark Session

In [9]:
# dependency for spark-sql-kafka
conf = pyspark.SparkConf()
conf.set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
conf.set("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.4")
conf.set("spark.executor.memory", "6g")

#spark_master = "spark://131.114.50.200:7077"
#spark_master = "spark://joetelila.lan:7077"
spark_master = "spark://131.114.50.200:7079"
#spark_master = "spark://joetelila.local:7077"
#sc = pyspark.SparkContext(master=spark_master,appName="Hello Spark")
spark = SparkSession\
        .builder\
        .master(spark_master)\
        .appName("sentimentAnalysis")\
        .config(conf=conf)\
        .getOrCreate()
#spark._sc.setLogLevel("ERROR")



:: loading settings :: url = jar:file:/home/y.telila/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/y.telila/.ivy2/cache
The jars for the packages stored in: /home/y.telila/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c322de0d-2ce5-4edb-b847-86a395effb3b;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;3.4.4 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.5.3 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.603 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.code.findbugs#annotations;3.0.1 in central
	found net.jcip#jcip-annotations;1.0 in central
	found com.google.code.findbugs#jsr305;3.0.1 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#

In [10]:
spark._sc.setLogLevel("ERROR")

### Reading Data

In [11]:
# define the schema
my_schema = tp.StructType([
  tp.StructField(name= 'text',       dataType= tp.StringType(),   nullable= True),
  tp.StructField(name= 'polarity',    dataType= tp.IntegerType(),  nullable= True)
  ])

In [12]:
# read the dataset.
tweet_data = spark.read.csv('/home/y.telila/DEP/Sentiment-analysis-using-kafka-spark/data/tweets_dataset_may6_no_comma.csv',inferSchema=True, header=True)

                                                                                

In [13]:
tweet_data.shape()

(182521, 2)

In [14]:
tweet_data.show(5)

+--------------------+--------+
|                text|polarity|
+--------------------+--------+
|when modi promise...|       2|
|talk all the nons...|       0|
|what did just say...|       1|
|asking his suppor...|       1|
|answer who among ...|       1|
+--------------------+--------+
only showing top 5 rows



In [15]:
# Removing handles and links from the tweets.
tweet_data = tweet_data.withColumn('text', F.regexp_replace('text','@[A-Za-z0-9_]+',''))
tweet_data = tweet_data.withColumn('text', F.regexp_replace('text','https?://[^ ]+',''))
tweet_data = tweet_data.withColumn('text', F.regexp_replace('text','www.[^ ]+',''))

In [16]:
tweet_data.show(5)

+--------------------+--------+
|                text|polarity|
+--------------------+--------+
|when modi promise...|       2|
|talk all the nons...|       0|
|what did just say...|       1|
|asking his suppor...|       1|
|answer who among ...|       1|
+--------------------+--------+
only showing top 5 rows



In [17]:
# print the schema of the file
tweet_data.printSchema()

root
 |-- text: string (nullable = true)
 |-- polarity: integer (nullable = true)



In [18]:
# dropping null columns
tweet_data=tweet_data.na.drop()

In [19]:
# Show distribution of the polarity
tweet_data.groupBy("polarity") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()



+--------+-----+
|polarity|count|
+--------+-----+
|       1|75482|
|       0|59111|
|       2|47736|
+--------+-----+



                                                                                

In [20]:
tweet_data.shape()

                                                                                

(182329, 2)

### 1. Logistic regression Model

In [21]:
# Stages For the Pipeline
tokenizer = pyTokenizer(inputCol='text',outputCol='mytokens')
stopwords_remover = StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens')
word_2_vec = Word2Vec(inputCol= 'filtered_tokens', outputCol= 'w2v', vectorSize=200) #, vectorSize= 300)

In [22]:
#model = LogisticRegression(featuresCol='vector',labelCol='polarity')
model = LogisticRegression(featuresCol= 'w2v',labelCol= 'polarity') # regParam=0.008, maxIter=10000

In [23]:
# setup the pipeline
pipeline = Pipeline(stages= [tokenizer, stopwords_remover, word_2_vec, model])

#### Training model

In [25]:
### Split Dataset and train
(train_tweet,test_tweet) = tweet_data.randomSplit((0.85,0.15),seed=42)
#pipelineFit = pipeline.fit(train_tweet)

In [22]:
#### Hyper-parameter search
# ref: https://spark.apache.org/docs/2.3.0/ml-tuning.html

In [32]:
# Run pipeline with ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [24]:
### 3 x 3 = 9 times training occurs (it takes a long time)
# The elastic net is a regularized regression method that # linearly combines the L1 and L2 penalties of the lasso and ridge methods.
#  np.logspace(-4, 4, 4)
paramGrid = ParamGridBuilder() \
 .addGrid(model.tol,[0.001, 0.0004]) \
 .addGrid(model.threshold, [0.4,0.5,0.6]) \
 .addGrid(model.regParam, [0.008,0.0008,0.00008]) \
 .addGrid(model.maxIter, [1000,5000,10000]) \
 .build()

In [25]:
tvs = TrainValidationSplit(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol='polarity',predictionCol='prediction',metricName='f1'),
                          # 80% of the data will be used for training, 20% for validation.
                          trainRatio=0.8)

In [26]:
%%time
# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train_tweet)

                                                                                

CPU times: user 8.17 s, sys: 1.74 s, total: 9.9 s
Wall time: 1h 1min 32s


In [30]:
###### View all results (accuracy) by each params
list(zip(model.validationMetrics, model.getEstimatorParamMaps()))

[(0.5937511821095822,
  {Param(parent='LogisticRegression_150ed0f15804', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 0.001,
   Param(parent='LogisticRegression_150ed0f15804', name='threshold', doc='Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match.e.g. if threshold is p, then thresholds must be equal to [1-p, p].'): 0.4,
   Param(parent='LogisticRegression_150ed0f15804', name='regParam', doc='regularization parameter (>= 0).'): 0.008,
   Param(parent='LogisticRegression_150ed0f15804', name='maxIter', doc='max number of iterations (>= 0).'): 1000}),
 (0.5888227537054588,
  {Param(parent='LogisticRegression_150ed0f15804', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 0.001,
   Param(parent='LogisticRegression_150ed0f15804', name='threshold', doc='Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set,

In [26]:
# setup the pipeline for the best parameter
best_model = LogisticRegression(featuresCol= 'w2v',labelCol= 'polarity',regParam=0.00008,tol=0.0004,threshold=0.4, maxIter=10000) # 
pipeline = Pipeline(stages= [tokenizer, stopwords_remover, word_2_vec, best_model])
optModel = pipeline.fit(train_tweet)

                                                                                

#### Evaluating model

In [27]:
# Predictions on our - Test Dataset.
predictions = optModel.transform(test_tweet)

evaluator = MulticlassClassificationEvaluator(labelCol='polarity',predictionCol='prediction',metricName='f1')
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)



Accuracy: 0.6216805172120891


                                                                                

In [30]:
y_true = predictions.select('polarity')
y_true = y_true.toPandas()
y_pred = predictions.select('prediction')
y_pred = y_pred.toPandas()

                                                                                

In [32]:
# Classication Report
print(classification_report(y_true,y_pred))

              precision    recall  f1-score   support

           0       0.64      0.61      0.62      9032
           1       0.63      0.73      0.68     11262
           2       0.59      0.48      0.53      7001

    accuracy                           0.63     27295
   macro avg       0.62      0.61      0.61     27295
weighted avg       0.62      0.63      0.62     27295



In [41]:
# persisting the model
optModel.write().overwrite().save('pipeline_lr_model')

                                                                                

### 2. Logistric regression with transformation from Johnsnow nlp

#### Building pipeline

In [33]:
#import sparknlp
#from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer

document_assembler = DocumentAssembler() \
      .setInputCol("text") \
      .setOutputCol("document")
tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")
normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)
stemmer = Stemmer() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCol("stem")
finisher = Finisher() \
      .setInputCols(["stem"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)
countVectors = CountVectorizer(inputCol="token_features", outputCol="features", vocabSize=10000, minDF=5)

In [34]:
# Create the pipeline
nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            countVectors
            ])

In [35]:
nlp_model = nlp_pipeline.fit(tweet_data)
processed = nlp_model.transform(tweet_data)

                                                                                

In [36]:
processed.printSchema()

root
 |-- text: string (nullable = true)
 |-- polarity: integer (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- v

In [39]:
processed.select('text','polarity').show(truncate=50)

+--------------------------------------------------+--------+
|                                              text|polarity|
+--------------------------------------------------+--------+
|when modi promised “minimum government maximum ...|       2|
|talk all the nonsense and continue all the dram...|       0|
|what did just say vote for modi  welcome bjp to...|       1|
|asking his supporters prefix chowkidar their na...|       1|
|answer who among these the most powerful world ...|       1|
|           kiya tho refresh maarkefir comment karo|       0|
|surat women perform yagna seeks divine grace fo...|       0|
|this comes from cabinet which has scholars like...|       0|
|with upcoming election india saga going importa...|       1|
|                          gandhi was gay does modi|       1|
|things like demonetisation gst goods and servic...|       1|
|hope tuthukudi people would prefer honest well ...|       1|
|                  calm waters wheres the modi wave|       1|
|one vot

In [41]:
processed.select('text','features','polarity').show()

+--------------------+--------------------+--------+
|                text|            features|polarity|
+--------------------+--------------------+--------+
|when modi promise...|(10000,[0,13,14,2...|       2|
|talk all the nons...|(10000,[0,7,87,36...|       0|
|what did just say...|(10000,[0,3,7,15,...|       1|
|asking his suppor...|(10000,[0,24,30,3...|       1|
|answer who among ...|(10000,[0,33,42,6...|       1|
|kiya tho refresh ...|(10000,[434,1060,...|       0|
|surat women perfo...|(10000,[0,8,85,43...|       0|
|this comes from c...|(10000,[0,4,12,30...|       0|
|with upcoming ele...|(10000,[0,1,5,9,8...|       1|
|gandhi was gay do...|(10000,[0,35,3035...|       1|
|things like demon...|(10000,[0,4,12,15...|       1|
|hope tuthukudi pe...|(10000,[0,2,7,37,...|       1|
|calm waters where...|(10000,[0,402,877...|       1|
|one vote can make...|(10000,[0,5,7,11,...|       0|
|one vote can make...|(10000,[0,5,7,11,...|       0|
|vote such party a...|(10000,[0,3,7,8,2...|   

#### Training the model

In [37]:
### Split Dataset and train
(trainingData, testData) = processed.randomSplit((0.85,0.15),seed=42)

In [30]:
#model = LogisticRegression(featuresCol='vector',labelCol='polarity')
model = LogisticRegression(labelCol= 'polarity') # regParam=0.008, maxIter=10000
#lr = LogisticRegression(regParam=0.008, maxIter=10000, labelCol="polarity")
#lrModel = lr.fit(trainingData)

In [33]:
# 2 x 2 x 3 x 3 = 36 times training occurs (it takes a long time)
# The elastic net is a regularized regression method that # linearly combines the L1 and L2 penalties of the lasso and ridge methods.
paramGrid = ParamGridBuilder() \
 .addGrid(model.tol,[0.001, 0.0004]) \
 .addGrid(model.threshold, [0.4,0.5,0.6]) \
 .addGrid(model.regParam, [0.008,0.0008,0.00008]) \
 .addGrid(model.maxIter, [1000,5000,10000]) \
 .build()

In [34]:
tvs_js = TrainValidationSplit(estimator=model,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol='polarity',predictionCol='prediction',metricName='f1'),
                          # 80% of the data will be used for training, 20% for validation.
                          trainRatio=0.8)

In [35]:
model_js = tvs_js.fit(trainingData)

                                                                                

In [38]:
# View all results (accuracy) by each params
list(zip(model_js.validationMetrics, model_js.getEstimatorParamMaps()))

[(0.8073168208187893,
  {Param(parent='LogisticRegression_b8e126b2e705', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 0.001,
   Param(parent='LogisticRegression_b8e126b2e705', name='threshold', doc='Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match.e.g. if threshold is p, then thresholds must be equal to [1-p, p].'): 0.4,
   Param(parent='LogisticRegression_b8e126b2e705', name='regParam', doc='regularization parameter (>= 0).'): 0.008,
   Param(parent='LogisticRegression_b8e126b2e705', name='maxIter', doc='max number of iterations (>= 0).'): 1000}),
 (0.8073168208187893,
  {Param(parent='LogisticRegression_b8e126b2e705', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 0.001,
   Param(parent='LogisticRegression_b8e126b2e705', name='threshold', doc='Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set,

In [38]:
# set seed for reproducibility
#(trainingData, testData) = processed.randomSplit((0.8,0.2),seed=42)
#print("Training Dataset Count: " + str(trainingData.count()))
#print("Test Dataset Count: " + str(testData.count()))

#lr = LogisticRegression(regParam=0.008, maxIter=10000, labelCol="polarity")
#lrModel = lr.fit(trainingData)

In [39]:
### Split Dataset and train
(trainingData, testData) = tweet_data.randomSplit((0.85,0.15),seed=42)

In [40]:
# setup the pipeline for the best parameter
best_model_js = LogisticRegression(labelCol= 'polarity',regParam=0.008,tol=0.001,threshold=0.4, maxIter=1000) #
# Create the pipeline
pipeline_js = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            countVectors,
            best_model_js
            ])
optModel_js = pipeline_js.fit(trainingData)

                                                                                

#### Evaluating model

In [41]:
predictions = optModel_js.transform(testData)
# show polarity and prediction columns.
predictions.select('polarity','prediction').show(10,truncate=50)

[Stage 242:>                                                        (0 + 1) / 1]

+--------+----------+
|polarity|prediction|
+--------+----------+
|       0|       0.0|
|       1|       1.0|
|       0|       1.0|
|       2|       1.0|
|       1|       1.0|
|       0|       2.0|
|       1|       1.0|
|       0|       0.0|
|       2|       2.0|
|       0|       0.0|
+--------+----------+
only showing top 10 rows



                                                                                

In [42]:
# Predictions on our - Test Dataset.
#predictions = pipelineFit.transform(test_tweet)
evaluator = MulticlassClassificationEvaluator(labelCol='polarity',predictionCol='prediction',metricName='f1')
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)



Accuracy: 0.8146702169458271


                                                                                

In [43]:
predictions.printSchema()

root
 |-- text: string (nullable = true)
 |-- polarity: integer (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- v

In [44]:
y_true = predictions.select('polarity')
y_true = y_true.toPandas()
y_pred = predictions.select('prediction')
y_pred = y_pred.toPandas()

                                                                                

In [45]:
# Classication Report
print(classification_report(y_true,y_pred))

              precision    recall  f1-score   support

           0       0.80      0.87      0.83      9032
           1       0.85      0.82      0.83     11262
           2       0.79      0.74      0.76      7001

    accuracy                           0.82     27295
   macro avg       0.81      0.81      0.81     27295
weighted avg       0.82      0.82      0.81     27295



In [22]:
# persisting the model
optModel_js.write().overwrite().save('pipeline_lr_js_model')

                                                                                

In [23]:
spark.stop()