In [None]:
import pandas as pd
import numpy as np
import pyspark 
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = SparkSession.builder.appName("NLP-classification").getOrCreate()

# NOTE(review): machine-specific absolute path; point it at your local copy of the CSV.
# (The original line used Scala's `val` keyword, which is a SyntaxError in Python.)
path = "/home/giangnt/Downloads/NLP_DL/nlp-labs/data/sentiments.csv"

# header=True takes the column names from the first row of the file. No schema
# inference is requested, so both columns come back as strings (see schema below).
df = spark.read.csv(path, header=True)

df.printSchema()

root
 |-- text: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [None]:
# Peek at the first five rows to sanity-check the parsed columns.
df.show(n=5)

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
|Kickers on my wat...|        1|
|user: AAP MOVIE. ...|        1|
|user I'd be afrai...|        1|
|     MNTA Over 12.00|        1|
|      OI  Over 21.37|        1|
+--------------------+---------+
only showing top 5 rows


In [None]:
# Count rows per sentiment value to check class balance and spot missing labels.
(
    df.groupBy("sentiment")
    .count()
    .show()
)

+---------+-----+
|sentiment|count|
+---------+-----+
|       -1| 2106|
|     NULL|    1|
|        1| 3685|
+---------+-----+



In [None]:
# Remove every row that has a null in any column, then recount per label to
# confirm that the NULL sentiment group has disappeared.
df = df.dropna()
(
    df.groupBy("sentiment")
    .count()
    .show()
)

+---------+-----+
|sentiment|count|
+---------+-----+
|       -1| 2106|
|        1| 3685|
+---------+-----+



In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, RegexTokenizer

# Encode the string `sentiment` column as a numeric `label` column for the classifier.
# The indexer is only configured here; it is fitted later as a pipeline stage.
string_indexer = StringIndexer(
    inputCol="sentiment",
    outputCol="label",
)
# Standalone usage, kept for reference:
# df = string_indexer.fit(df).transform(df)

In [None]:
# The schema is still just `text` and `sentiment`: the indexer was configured
# in the previous cell but has not been applied to `df`.
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [None]:
# tokenize
# Split `text` into `words`, treating runs of whitespace and the listed punctuation
# characters as delimiters between tokens.
# The pattern is a raw string so that the regex escapes `\s` and `\}` reach the regex
# engine untouched; in a normal string literal they are invalid Python escape
# sequences (SyntaxWarning on recent Python versions). The runtime value of the
# pattern is unchanged.
tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern=r"""[\s,.;:!?`'"(){\}_-]+""",
)
# Standalone usage, kept for reference:
# df = tokenizer.transform(df)
# df.select("text", "words").show(5, truncate=False)

In [None]:
# Bag-of-words features: turn each token list in `words` into a term-count vector.
# Only configured here; the vocabulary is learned when the pipeline is fitted.
vectorizer = CountVectorizer(
    inputCol="words",
    outputCol="vector",
)
# Standalone usage, kept for reference:
# df = vectorizer.fit(df).transform(df)
# df.select("words", "vector").show(5, truncate=False)

In [None]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression over the count vectors, predicting the indexed `label` column.
classifier = LogisticRegression(
    featuresCol="vector",
    labelCol="label",
)

In [None]:
# pipeline
from pyspark.ml import Pipeline

# Chain label indexing, tokenization, term counting and the classifier so that a
# single fit/transform call runs every stage in order.
pipeline = Pipeline(
    stages=[
        string_indexer,
        tokenizer,
        vectorizer,
        classifier,
    ]
)

# Fit on the full dataset and score the very same rows: this is a smoke test of the
# pipeline wiring, not a held-out evaluation.
model = pipeline.fit(df)
output = model.transform(df)

In [None]:
# Show the first five scored rows, including every column added by the pipeline stages.
output.show(n=5)

+--------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|                text|sentiment|label|               words|              vector|       rawPrediction|         probability|prediction|
+--------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|Kickers on my wat...|        1|  0.0|[kickers, on, my,...|(10580,[3,41,44,4...|[20.4240630582554...|[0.99999999865121...|       0.0|
|user: AAP MOVIE. ...|        1|  0.0|[user, aap, movie...|(10580,[0,6,7,12,...|[27.9766446916048...|[0.99999999999929...|       0.0|
|user I'd be afrai...|        1|  0.0|[user, i, d, be, ...|(10580,[1,2,4,8,1...|[21.4963954038287...|[0.99999999953843...|       0.0|
|     MNTA Over 12.00|        1|  0.0|[mnta, over, 12, 00]|(10580,[22,121,14...|[37.5158690792516...|           [1.0,0.0]|       0.0|
|      OI  Over 21.37|        1|  0.0|  [oi, over, 21, 37]|(10

In [None]:
# Inspect the first scored row in full (un-truncated), as a Row object.
output.head()

Row(text='Kickers on my watchlist XIDE TIT SOQ PNK CPW BPZ AJ  trade method 1 or method 2, see prev posts', sentiment='1', label=0.0, words=['kickers', 'on', 'my', 'watchlist', 'xide', 'tit', 'soq', 'pnk', 'cpw', 'bpz', 'aj', 'trade', 'method', '1', 'or', 'method', '2', 'see', 'prev', 'posts'], vector=SparseVector(10580, {3: 1.0, 41: 1.0, 44: 1.0, 46: 1.0, 77: 1.0, 85: 1.0, 86: 1.0, 445: 1.0, 1536: 1.0, 2527: 1.0, 3158: 1.0, 3740: 1.0, 3999: 1.0, 4574: 2.0, 4787: 1.0, 4900: 1.0, 5652: 1.0, 7445: 1.0, 8432: 1.0}), rawPrediction=DenseVector([20.4241, -20.4241]), probability=DenseVector([1.0, 0.0]), prediction=0.0)

In [None]:
# Compare true labels with predictions side by side for the first ten rows.
(
    output
    .select("label", "prediction")
    .show(n=10)
)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 10 rows


In [None]:
import pyspark.ml.evaluation 
# Area-under-ROC evaluator for the binary sentiment task.
# It must read the continuous scores in `rawPrediction`, not the thresholded 0/1
# `prediction` column: with hard predictions the ROC curve collapses to a single
# operating point and the reported AUC no longer measures ranking quality.
# Note that the metric is AUC, not accuracy.
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
# Standalone usage, kept for reference:
# auc = evaluator.evaluate(output)
# print("AUC =", auc)


In [None]:
# split
# 80/20 random partition of the cleaned data; the fixed seed keeps the split repeatable.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=1)

# Report the size of each partition.
print("Train count:", train.count())
print("Test count:", test.count())

Train count: 4607
Test count: 1184


In [None]:
# Refit the pipeline on the training partition only (this rebinds `model`), then
# score the held-out test partition.
model = pipeline.fit(dataset=train)
test_output = model.transform(dataset=test)

In [None]:
# Held-out score using the evaluator's configured metric
# (areaUnderROC is the BinaryClassificationEvaluator default).
evaluator.evaluate(dataset=test_output)

0.7323536924348284

In [None]:
from pyspark.ml.feature import Word2Vec

# Alternative features: a 100-dimensional Word2Vec embedding of each token list,
# written to `vector_w2v`.
w2vec = Word2Vec(inputCol="words", outputCol="vector_w2v", vectorSize=100)

# The shared `classifier` reads its features from "vector" (the CountVectorizer
# output), a column this pipeline never produces, so fitting with it fails on the
# missing column. Use a dedicated classifier wired to the Word2Vec output instead.
w2v_classifier = LogisticRegression(featuresCol="vector_w2v", labelCol="label")
new_pipeline = Pipeline(stages=[string_indexer, tokenizer, w2vec, w2v_classifier])

# External dataset load, unrelated to the pipeline above. It is done last so that a
# missing `datasets` package or a network failure does not prevent the pipeline
# from being defined.
from datasets import load_dataset

ds = load_dataset("zeroshot/twitter-financial-news-sentiment")


In [None]:
# Train the Word2Vec-based pipeline on the same training partition and score the same
# held-out partition, so the result is comparable with the count-vector pipeline.
new_model = new_pipeline.fit(dataset=train)
new_test_output = new_model.transform(dataset=test)
evaluator.evaluate(dataset=new_test_output)

In [None]:
# Load the "zeroshot/twitter-financial-news-sentiment" dataset via the `datasets` library.
# NOTE(review): the same load also appears in the Word2Vec cell earlier in the notebook;
# presumably only one of the two is needed - confirm and remove the duplicate.
from datasets import load_dataset

ds = load_dataset("zeroshot/twitter-financial-news-sentiment")