In [1]:
import pandas as pd
import numpy as np
import json
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

# spark = sparknlp.start()

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","6G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0")\
    .getOrCreate()

22/11/08 17:13:41 WARN Utils: Your hostname, zainab-ThinkPad-T560 resolves to a loopback address: 127.0.1.1; using 192.168.178.29 instead (on interface wlp4s0)
22/11/08 17:13:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/zainab/miniconda3/envs/sparknlp/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/zainab/.ivy2/cache
The jars for the packages stored in: /home/zainab/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cd08efa3-9240-401c-9f0e-3700a0b69f95;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;4.1.0 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.code.findbugs#annotations;3.0.1 in central
	found net.jcip#jcip-annotations;1.0 in local-m2-cache
	found com.google.code.findbugs#jsr305;3.0.1 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlom

In [98]:
df = spark.read.parquet('./tmp/news.parquet')
# df = spark.read.csv("oldnews.csv",header=True,inferSchema=True)
df.show()

+--------------------+-------+
|               title|subject|
+--------------------+-------+
|Delay in registra...|  crime|
|IHCRA fixes Rs1, ...| health|
|Pakistan reopens ...|  other|
|NA body anxious o...|  other|
|Timely, generous ...|  other|
|Inspiring STEM Ca...|  other|
|Putin says pipeli...|  other|
|All six babies bo...|  scary|
|Hadiqa Kiani to r...|  other|
|Pak squad leaves ...|  other|
|Terrorism is terr...|  crime|
|Will you be able ...|  other|
|Will you be able ...|  other|
|Bye-polls on 12 s...|  other|
|No electricity bi...|  other|
|LHC issues notice...|  other|
|Govt ‘using relig...|  other|
|Alvi stresses ‘gi...|  other|
|US ignores Indian...|  other|
|Taliban deny Jais...|  scary|
+--------------------+-------+
only showing top 20 rows



In [99]:
df.select('title','subject').show()
df.show(5)

+--------------------+-------+
|               title|subject|
+--------------------+-------+
|Delay in registra...|  crime|
|IHCRA fixes Rs1, ...| health|
|Pakistan reopens ...|  other|
|NA body anxious o...|  other|
|Timely, generous ...|  other|
|Inspiring STEM Ca...|  other|
|Putin says pipeli...|  other|
|All six babies bo...|  scary|
|Hadiqa Kiani to r...|  other|
|Pak squad leaves ...|  other|
|Terrorism is terr...|  crime|
|Will you be able ...|  other|
|Will you be able ...|  other|
|Bye-polls on 12 s...|  other|
|No electricity bi...|  other|
|LHC issues notice...|  other|
|Govt ‘using relig...|  other|
|Alvi stresses ‘gi...|  other|
|US ignores Indian...|  other|
|Taliban deny Jais...|  scary|
+--------------------+-------+
only showing top 20 rows

+--------------------+-------+
|               title|subject|
+--------------------+-------+
|Delay in registra...|  crime|
|IHCRA fixes Rs1, ...| health|
|Pakistan reopens ...|  other|
|NA body anxious o...|  other|
|Timely, gene

In [100]:
# Value Counts
df.groupBy('subject').count().show()

+-------+-----+
|subject|count|
+-------+-----+
|  scary|    3|
|  crime|  248|
|   null| 4175|
|  other|  931|
| health|   23|
+-------+-----+



In [101]:
# Value Counts via pandas
df.toPandas()['subject'].value_counts()

other     931
crime     248
health     23
scary       3
Name: subject, dtype: int64

In [102]:
# Check For Missing Values
df.toPandas()['subject'].isnull().sum()

4175

In [103]:
# Drop Missing Values
df = df.dropna(subset=('subject'))

In [104]:
import pyspark.ml.feature
# Load Our Transformer & Extractor Pkgs
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF
from pyspark.ml.feature import StringIndexer

In [105]:
# Stages For the Pipeline
tokenizer = Tokenizer(inputCol='title',outputCol='mytokens')
stopwords_remover = StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens')
vectorizer = CountVectorizer(inputCol='filtered_tokens',outputCol='rawFeatures')
idf = IDF(inputCol='rawFeatures',outputCol='vectorizedFeatures')

In [106]:
# LabelEncoding/LabelIndexing
labelEncoder = StringIndexer(inputCol='subject',outputCol='label').fit(df)

In [107]:
labelEncoder.transform(df).show(5)

+--------------------+-------+-----+
|               title|subject|label|
+--------------------+-------+-----+
|Delay in registra...|  crime|  1.0|
|IHCRA fixes Rs1, ...| health|  2.0|
|Pakistan reopens ...|  other|  0.0|
|NA body anxious o...|  other|  0.0|
|Timely, generous ...|  other|  0.0|
+--------------------+-------+-----+
only showing top 5 rows



In [108]:
# Dict of Labels
label_dict = {'other':0.0,
 'crime':1.0,
 'health':2.0,
 'scary':3.0}

In [109]:
df = labelEncoder.transform(df)

In [110]:
df.show(5)

+--------------------+-------+-----+
|               title|subject|label|
+--------------------+-------+-----+
|Delay in registra...|  crime|  1.0|
|IHCRA fixes Rs1, ...| health|  2.0|
|Pakistan reopens ...|  other|  0.0|
|NA body anxious o...|  other|  0.0|
|Timely, generous ...|  other|  0.0|
+--------------------+-------+-----+
only showing top 5 rows



In [111]:
### Split Dataset
(trainDF,testDF) = df.randomSplit((0.7,0.3),seed=42)

In [112]:
### Estimator
from pyspark.ml.classification import LogisticRegression

In [113]:
lr = LogisticRegression(featuresCol='vectorizedFeatures',labelCol='label')

In [114]:
from pyspark.ml import Pipeline

In [115]:
pipeline = Pipeline(stages=[tokenizer,stopwords_remover,vectorizer,idf,lr])

In [116]:
# Building MOdel
lr_model = pipeline.fit(trainDF)

In [193]:
lr_model.save("./trained_pipelines/classifier_pipeline")
# loadedPipeline = PipelineModel.load("./sentimentdl_pipeline")
# loadedPipeline.transform(YOUR_DATAFRAME)

                                                                                

In [117]:
# Predictions on our Test Dataset
predictions = lr_model.transform(testDF)

In [118]:
predictions.show()

+--------------------+-------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|               title|subject|label|            mytokens|     filtered_tokens|         rawFeatures|  vectorizedFeatures|       rawPrediction|         probability|prediction|
+--------------------+-------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|$3 billion on the...|  other|  0.0|[$3, billion, on,...|  [$3, billion, way]|(2853,[348,2335],...|(2853,[348,2335],...|[13.7350531307451...|[0.99999995935182...|       0.0|
|10 killed, two in...|  crime|  1.0|[10, killed,, two...|[10, killed,, two...|(2853,[11,34,69,2...|(2853,[11,34,69,2...|[-13.196458622153...|[3.95848358977358...|       1.0|
|13 people die in ...|  crime|  1.0|[13, people, die,...|[13, people, die,...|(2853,[13,14,16,7...|(2853,[13,14,16,7...|[12.015551

In [119]:
# Select Columns
predictions.columns

['title',
 'subject',
 'label',
 'mytokens',
 'filtered_tokens',
 'rawFeatures',
 'vectorizedFeatures',
 'rawPrediction',
 'probability',
 'prediction']

In [120]:
predictions.select('rawPrediction','probability','subject','label','prediction').show(10)

+--------------------+--------------------+-------+-----+----------+
|       rawPrediction|         probability|subject|label|prediction|
+--------------------+--------------------+-------+-----+----------+
|[13.7350531307451...|[0.99999995935182...|  other|  0.0|       0.0|
|[-13.196458622153...|[3.95848358977358...|  crime|  1.0|       1.0|
|[12.0155517729986...|[0.99999951612974...|  crime|  1.0|       0.0|
|[14.0373767044725...|[0.99999996875660...|  other|  0.0|       0.0|
|[-11.282700149276...|[5.60690821966456...|  crime|  1.0|       1.0|
|[-14.091361953763...|[3.55489500490563...|  crime|  1.0|       1.0|
|[-17.633725493880...|[3.10264921021428...|  crime|  1.0|       1.0|
|[-7.8119036039660...|[3.67069971798460...|  other|  0.0|       1.0|
|[-1.1116842890233...|[6.67181321974157...|  crime|  1.0|       1.0|
|[14.5499833379537...|[0.99999998420740...|  other|  0.0|       0.0|
+--------------------+--------------------+-------+-----+----------+
only showing top 10 rows



In [122]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
accuracy

0.8789808917197452

In [123]:
from pyspark.sql.types import StringType

In [188]:
ex1 = spark.createDataFrame([
    ("Three people died in Motorway collision near Attock",StringType())
],
# Column Name
["title"]

)

ex1.show(truncate=False)

+---------------------------------------------------+---+
|title                                              |_2 |
+---------------------------------------------------+---+
|Three people died in Motorway collision near Attock|{} |
+---------------------------------------------------+---+



In [189]:
pred_ex1 = lr_model.transform(ex1)

In [190]:
pred_ex1.show()

+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|               title| _2|            mytokens|     filtered_tokens|         rawFeatures|  vectorizedFeatures|       rawPrediction|         probability|prediction|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Three people died...| {}|[three, people, d...|[three, people, d...|(2853,[16,38,317,...|(2853,[16,38,317,...|[-3.9249638210230...|[2.29244688501167...|       1.0|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+



In [191]:
pred_ex1.select('title','rawPrediction','probability','prediction').show()

+--------------------+--------------------+--------------------+----------+
|               title|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|Three people died...|[-3.9249638210230...|[2.29244688501167...|       1.0|
+--------------------+--------------------+--------------------+----------+



In [192]:
label_dict

{'other': 0.0, 'crime': 1.0, 'health': 2.0, 'scary': 3.0}

# Naive Bayes

In [129]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1, featuresCol='vectorizedFeatures',labelCol='label')
pipeline2 = Pipeline(stages=[tokenizer,stopwords_remover,vectorizer,idf,nb])
modelNB = pipeline2.fit(trainDF)
predictionsNB = modelNB.transform(testDF)


In [130]:
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
accuracy = evaluator.evaluate(predictionsNB)
accuracy

0.8248407643312102

In [131]:
pred_ex12 = modelNB.transform(ex1)
pred_ex12.select('title','rawPrediction','probability','prediction').show()

+--------------------+--------------------+--------------------+----------+
|               title|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|Husband murdered ...|[-197.11185115063...|[2.12448689874153...|       1.0|
+--------------------+--------------------+--------------------+----------+

