In [1]:
from pyspark.sql import SparkSession

# Spark session & context
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext

# Explore dataset

In [2]:
# Read training data from HDFS
train = spark.read.csv('hdfs://namenode:9000//user/root/data/twitter_training.csv', header=False, inferSchema=True)
train = train.toDF('id', 'entity', 'sentiment', 'text')

In [3]:
train.show(5)

+----+-----------+---------+--------------------+
|  id|     entity|sentiment|                text|
+----+-----------+---------+--------------------+
|2401|Borderlands| Positive|im getting on bor...|
|2401|Borderlands| Positive|I am coming to th...|
|2401|Borderlands| Positive|im getting on bor...|
|2401|Borderlands| Positive|im coming on bord...|
|2401|Borderlands| Positive|im getting on bor...|
+----+-----------+---------+--------------------+
only showing top 5 rows



In [4]:
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- entity: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- text: string (nullable = true)



In [5]:
from pyspark.sql.functions import col, sum

# Check nulls in each column
null_counts = train.select([sum(col(c).isNull().cast('int')).alias(c) for c in train.columns])
null_counts.show()

+---+------+---------+----+
| id|entity|sentiment|text|
+---+------+---------+----+
|  0|     0|        0| 686|
+---+------+---------+----+



In [6]:
train = train.na.drop(subset=['text'])

In [7]:
train.groupBy('entity').count().show()

+--------------------+-----+
|              entity|count|
+--------------------+-----+
|       Cyberpunk2077| 2262|
|         Borderlands| 2280|
|       Xbox(Xseries)| 2283|
|   PlayStation5(PS5)| 2291|
|                FIFA| 2324|
|           Overwatch| 2316|
|             Verizon| 2365|
|        WorldOfCraft| 2357|
|      AssassinsCreed| 2234|
|PlayerUnknownsBat...| 2234|
|               CS-GO| 2284|
|         Battlefield| 2316|
| GrandTheftAuto(GTA)| 2293|
|           HomeDepot| 2292|
|               NBA2K| 2343|
|              Google| 2274|
|               Dota2| 2359|
|RedDeadRedemption...| 2249|
|CallOfDutyBlackop...| 2343|
|     LeagueOfLegends| 2377|
+--------------------+-----+
only showing top 20 rows



In [8]:
train.groupBy('sentiment').count().show()

+----------+-----+
| sentiment|count|
+----------+-----+
|Irrelevant|12875|
|  Positive|20655|
|   Neutral|18108|
|  Negative|22358|
+----------+-----+



# Create pipeline

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, StringIndexer

In [10]:
# Configure an ML pipeline
indexer = StringIndexer(inputCol='sentiment', outputCol="label")
tokenizer = Tokenizer(inputCol="text", outputCol="words")
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="meanWords")
hashingTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
lr = LogisticRegression(labelCol="label", featuresCol="features",
                        maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[indexer, tokenizer, swr, hashingTF, lr])

In [11]:
# Train the model
model = pipeline.fit(train)

In [12]:
trainingSummary = model.stages[-1].summary

In [13]:
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

objectiveHistory:
1.3665632586088805
0.6021721834184313
0.41003009531871454
0.2914972083155612
0.24115410758471267
0.239130120070332
0.23577946649623435
0.2156194228609918
0.21480625311407978
0.213760456761295
0.2084295087072241


In [14]:
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Accuracy: 0.9536326287907454
FPR: 0.018070421400974597
TPR: 0.9536326287907455
F-measure: 0.9538545495344515
Precision: 0.9555558822310862
Recall: 0.9536326287907455


# Make predictions

In [15]:
# Read test data from HDFS
test = spark.read.csv('hdfs://namenode:9000//user/root/data/twitter_validation.csv', header=False, inferSchema=True)
test = test.toDF('id', 'entity', 'sentiment', 'text')

test.show(5)

+----+---------+----------+--------------------+
|  id|   entity| sentiment|                text|
+----+---------+----------+--------------------+
|3364| Facebook|Irrelevant|I mentioned on Fa...|
| 352|   Amazon|   Neutral|BBC News - Amazon...|
|8312|Microsoft|  Negative|@Microsoft Why do...|
|4371|    CS-GO|  Negative|CSGO matchmaking ...|
|4433|   Google|   Neutral|Now the President...|
+----+---------+----------+--------------------+
only showing top 5 rows



In [16]:
# Check nulls in each column
null_counts = test.select([sum(col(c).isNull().cast('int')).alias(c) for c in test.columns])
null_counts.show()

+---+------+---------+----+
| id|entity|sentiment|text|
+---+------+---------+----+
|  0|   467|      495| 510|
+---+------+---------+----+



In [17]:
test = test.na.drop()

In [18]:
test.groupBy('sentiment').count().show()

+----------+-----+
| sentiment|count|
+----------+-----+
|Irrelevant|  172|
|   Neutral|  285|
|  Positive|  277|
|  Negative|  266|
+----------+-----+



In [19]:
prediction = model.transform(test)

In [20]:
final_prediction = prediction.select("prediction", "label")
final_prediction.show(n=10)

+----------+-----+
|prediction|label|
+----------+-----+
|       3.0|  3.0|
|       2.0|  2.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       2.0|  2.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
+----------+-----+
only showing top 10 rows



In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()

evaluator.evaluate(prediction)

0.906319890000543

In [22]:
accuracy = evaluator.evaluate(prediction, {evaluator.metricName: "accuracy"})
falsePositiveRate = evaluator.evaluate(prediction, {evaluator.metricName: "weightedFalsePositiveRate"})
truePositiveRate = evaluator.evaluate(prediction, {evaluator.metricName: "weightedTruePositiveRate"})
fMeasure = evaluator.evaluate(prediction, {evaluator.metricName: "weightedFMeasure"})
precision = evaluator.evaluate(prediction, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(prediction, {evaluator.metricName: "weightedRecall"})
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Accuracy: 0.906
FPR: 0.03455194950148892
TPR: 0.906
F-measure: 0.906319890000543
Precision: 0.9106597786107368
Recall: 0.906


# Save the model in HDFS

In [24]:
model.write().overwrite().save("hdfs://namenode:9000//user/root/model/twitter_lg")