In [None]:
# pip install textblob

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import pandas as pd
import sklearn
from textblob import TextBlob

from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [0]:
spark = SparkSession \
        .builder \
        .appName('Twitter Sentiment Analysis') \
        .getOrCreate()
print('Session created')

sc = spark.sparkContext

Session created


## 1. Import data

In [0]:
Schema = StructType([
  StructField("id", StringType(), True),
  StructField("name", StringType(), True),
  StructField("screen_name", StringType(), True),
    StructField("text", StringType(), True),
    StructField("followers_count", StringType(), True),
    StructField("location", StringType(), True),
    StructField("geo", StringType(), True),
    StructField("created_at", StringType(), True)
])

In [0]:
tweets = spark.read.option('delimiter', '\t').schema(Schema).csv('/mnt/project_twitter/ClimateChange/2022/08/*/*/*')

In [0]:
tweets.count()

Out[5]: 69304

In [0]:
# cache the dataframe for faster iteration
tweets.cache() 

# run the count action to materialize the cache
tweets.count()

Out[38]: 69304

In [0]:
# tweets.unpersist()

In [0]:
display(tweets.take(5))

id,name,screen_name,text,followers_count,location,geo,created_at
1562194217530757120,Steve McEllistrem,SteveMcEllis,We don’t really need to make any sacrifices because of climate change. We can just leave all the suffering to our descendants.,113547,,,Tue Aug 23 21:45:00 +0000 2022
1562194217685893120,Solaris House Music,SolarisHouseM,"""Island in the Sky"" https://t.co/5l9ZVDo3e0 via @YouTube You want to see climate change... turn your eyes to our L… https://t.co/3l8SNq2QGc",62,,,Tue Aug 23 21:45:00 +0000 2022
1562194221968154626,Cletus Anderson,CletusAnderson6,"@GOP Look at the bill people before you think it's not going to help tackle climate change and help with inflation,… https://t.co/o7mMI1Wd0W",142,Upstate New York,,Tue Aug 23 21:45:01 +0000 2022
1562194223386234881,MicroMonde,MondeMicro,RT @TheRealKeean: These are part of the schematic drawings. They detail Trudeau's plan to build an armoury for his new Climate Enforcement…,1873,Canada,,Tue Aug 23 21:45:01 +0000 2022
1562194223398473733,Consersative Guy49,WalterMatthew46,RT @Felicity2024: Could any of you name a bigger scam than climate change?,1282,Somewhere In The USA...,,Tue Aug 23 21:45:01 +0000 2022


In [None]:
tweets_clean = tweets.withColumn('text', F.regexp_replace('text', r"http\S+", "")) \
                     .withColumn('text', F.regexp_replace('text', r"[^ 'a-zA-Z0-9,.?!@#a]", " ")) \
                     .withColumn('text', F.regexp_replace('text', r"\s+", " ")) \
                     .withColumn('text', F.lower('text')) \
                     .withColumn('text', F.trim('text'))
display(tweets_clean)

In [None]:
split_text = F.split(tweets_clean['text'], 'rt')
tweets_clean = tweets_clean.select('*', split_text.getItem(0).alias('tweet'),split_text.getItem(1).alias('retweet'))   
display(tweets_clean)

In [None]:
# spark split column: https://sparkbyexamples.com/pyspark/pyspark-split-dataframe-column-into-multiple-columns/

split_date = F.split(tweets_clean['created_at'], ' ')
tweets_clean = tweets_clean.select('*', 
                                   split_date.getItem(1).alias('month'),
                                   split_date.getItem(2).alias('day'),
                                   split_date.getItem(3).alias('time'),
                                   split_date.getItem(5).alias('year'))   
display(tweets_clean)

In [None]:
split_time = F.split(tweets_clean['time'], ':')
tweets_clean = tweets_clean.select('*', 
                                   split_time.getItem(0).alias('hour'),
                                   split_time.getItem(1).alias('minute'))   
display(tweets_clean)

## Sentiment Labelling

In [0]:
pandasDF = tweets_clean.toPandas()

In [0]:
pandasDF = pandasDF[pandasDF['text'].notna()]

In [0]:
pandasDF.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 69276 entries, 0 to 69303
Data columns (total 10 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   id               69276 non-null  object
 1   name             69273 non-null  object
 2   screen_name      69276 non-null  object
 3   text             69276 non-null  object
 4   followers_count  69142 non-null  object
 5   location         69142 non-null  object
 6   geo              69118 non-null  object
 7   created_at       69118 non-null  object
 8   tweet            69276 non-null  object
 9   retweet          54017 non-null  object
dtypes: object(10)
memory usage: 5.8+ MB


In [0]:
pandasDF['id'].nunique()

Out[11]: 69276

In [0]:
pandasDF.groupby('geo').nunique()

Unnamed: 0_level_0,id,name,screen_name,text,followers_count,location,created_at,tweet,retweet
geo,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,69116,49607,51782,23251,9844,15257,43362,16597,9058
"{'type': 'Point', 'coordinates': [40.0, -74.5]}",1,1,1,1,1,1,1,1,0
"{'type': 'Point', 'coordinates': [40.73, -74.08]}",1,1,1,1,1,1,1,1,0


In [0]:
def sentiment_analysis(tweet):
    def getSubjectivity(text):
        return TextBlob(text).sentiment.subjectivity
  
 #Create a function to get the polarity
    def getPolarity(text):
        return TextBlob(text).sentiment.polarity
  
 #Create two new columns ‘Subjectivity’ & ‘Polarity’
    tweet['subjectivity'] = tweet['text'].apply(getSubjectivity)
    tweet['polarity'] = tweet['text'].apply(getPolarity)
    def getAnalysis(score):
        if score < 0:
            return 'Negative'
        elif score == 0:
            return 'Neutral'
        else:
            return 'Positive'
    tweet['sentiment'] = tweet['polarity'].apply(getAnalysis )
    return tweet

In [0]:
sentiment_analysis(pandasDF)

Unnamed: 0,id,name,screen_name,text,followers_count,location,geo,created_at,tweet,retweet,month,day,time,year,hour,minute,subjectivity,polarity,sentiment
0,1562194217530757120,Steve McEllistrem,SteveMcEllis,we don t really need to make any sacrifices be...,113547,,,Tue Aug 23 21:45:00 +0000 2022,we don t really need to make any sacrifices be...,,Aug,23,21:45:00,2022,21,45,0.200000,0.200000,Positive
1,1562194217685893120,Solaris House Music,SolarisHouseM,island in the sky via @youtube you want to see...,62,,,Tue Aug 23 21:45:00 +0000 2022,island in the sky via @youtube you want to see...,,Aug,23,21:45:00,2022,21,45,0.000000,0.000000,Neutral
2,1562194221968154626,Cletus Anderson,CletusAnderson6,@gop look at the bill people before you think ...,142,Upstate New York,,Tue Aug 23 21:45:01 +0000 2022,@gop look at the bill people before you think ...,,Aug,23,21:45:01,2022,21,45,0.000000,0.000000,Neutral
3,1562194223386234881,MicroMonde,MondeMicro,rt @therealkeean these are part of the schemat...,1873,Canada,,Tue Aug 23 21:45:01 +0000 2022,,@therealkeean these are pa,Aug,23,21:45:01,2022,21,45,0.454545,0.136364,Positive
4,1562194223398473733,Consersative Guy49,WalterMatthew46,rt @felicity2024 could any of you name a bigge...,1282,Somewhere In The USA...,,Tue Aug 23 21:45:01 +0000 2022,,@felicity2024 could any of you name a bigger ...,Aug,23,21:45:01,2022,21,45,0.500000,0.000000,Neutral
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
69299,1562366262336000000,Shanar Tabrizi,Schwanar,rt @menrywy i'm happy to post my climate chang...,342,"Copenhagen, Denmark",,Wed Aug 24 09:08:38 +0000 2022,,@menrywy i'm happy to post my climate change ...,Aug,24,09:08:38,2022,09,08,0.507071,0.312121,Positive
69300,1562366264588308480,Cyril Payen,payenc,rt @afp #breaking macron warns france 'sacrifi...,6176,"Paris, France",,Wed Aug 24 09:08:39 +0000 2022,,@afp #breaking macron warns france 'sacrifice...,Aug,24,09:08:39,2022,09,08,0.427273,0.068182,Positive
69301,1562366274927296513,Paul Williams,PaulWil40857916,rt @pgdynes let s be clear no carbon capture b...,64,,,Wed Aug 24 09:08:41 +0000 2022,,@pgdynes let s be clear no carbon capture bus...,Aug,24,09:08:41,2022,09,08,0.383333,0.100000,Positive
69302,1562366307047297024,Freedom’s Ghost 🇬🇧,Grimysapien,rt @incrementallog1 @et sharing @michaelpbreto...,1162,Everywhere,,Wed Aug 24 09:08:49 +0000 2022,,@incrementallog1 @et sharing @michaelpbreton ...,Aug,24,09:08:49,2022,09,08,0.000000,0.000000,Neutral


In [0]:
labelled_tweets = spark.createDataFrame(pandasDF)

In [None]:
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")
encoder_model = label_encoder.fit(labelled_tweets)
labelled_tweets = encoder_model.transform(labelled_tweets)

display(labelled_tweets)

In [0]:
display(labelled_tweets.take(5))

id,name,screen_name,text,followers_count,location,geo,created_at,tweet,retweet,month,day,time,year,hour,minute,subjectivity,polarity,sentiment
1562194217530757120,Steve McEllistrem,SteveMcEllis,we don t really need to make any sacrifices because of climate change. we can just leave all the suffering to our descendants.,113547,,,Tue Aug 23 21:45:00 +0000 2022,we don t really need to make any sacrifices because of climate change. we can just leave all the suffering to our descendants.,,Aug,23,21:45:00,2022,21,45,0.2,0.2,Positive
1562194217685893120,Solaris House Music,SolarisHouseM,island in the sky via @youtube you want to see climate change... turn your eyes to our l,62,,,Tue Aug 23 21:45:00 +0000 2022,island in the sky via @youtube you want to see climate change... turn your eyes to our l,,Aug,23,21:45:00,2022,21,45,0.0,0.0,Neutral
1562194221968154626,Cletus Anderson,CletusAnderson6,"@gop look at the bill people before you think it's not going to help tackle climate change and help with inflation,",142,Upstate New York,,Tue Aug 23 21:45:01 +0000 2022,"@gop look at the bill people before you think it's not going to help tackle climate change and help with inflation,",,Aug,23,21:45:01,2022,21,45,0.0,0.0,Neutral
1562194223386234881,MicroMonde,MondeMicro,rt @therealkeean these are part of the schematic drawings. they detail trudeau's plan to build an armoury for his new climate enforcement,1873,Canada,,Tue Aug 23 21:45:01 +0000 2022,,@therealkeean these are pa,Aug,23,21:45:01,2022,21,45,0.4545454545454545,0.1363636363636363,Positive
1562194223398473733,Consersative Guy49,WalterMatthew46,rt @felicity2024 could any of you name a bigger scam than climate change?,1282,Somewhere In The USA...,,Tue Aug 23 21:45:01 +0000 2022,,@felicity2024 could any of you name a bigger scam than climate change?,Aug,23,21:45:01,2022,21,45,0.5,0.0,Neutral


In [0]:
labelled_tweets.write.option('header', False).option('delimiter', '\t').mode('overwrite').csv('/mnt/project/labelled_tweets')

In [None]:
display(labelled_tweets)

## 3. Feature Transformer: Tokenizer

In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tweets_tokenized = tokenizer.transform(labelled_tweets)

display(tweets_tokenized)

## 3. Feature Transformer: Stopword Removal

In [None]:
#remove stopwords from the review(list of words)

stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
tweets_stopword = stopword_remover.transform(tweets_tokenized)

display(tweets_stopword)

## 4. Feature Transformer: CountVectorizer (TF - Term Frequency)

In [None]:
# vocabSize: number of unique words in the whole dataset;
# in output, length = vocabSize; indices: indices of the words; values: term frequency within the tweet
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
cv_model = cv.fit(tweets_stopword)
tweets_cv = cv_model.transform(tweets_stopword)

display(tweets_cv)

## 5. Feature Transformer: TF-IDF Vectorization


TF-IDF: Term Frequency - Inverse Document Frequency

In [None]:
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
idf_model = idf.fit(tweets_cv)
tweets_idf = idf_model.transform(tweets_cv)

display(tweets_idf)

In [0]:
# Use 90% cases for training, 10% cases for testing
# train, test = tweets_clean.randomSplit([0.9, 0.1], seed=20200819)
train, test = labelled_tweets.sampleBy("label", fraction = [0:0.42, 1: 0.37，2: 0.21], seed = 2022)
train_positive = train[train['sentiment'] == 'Positive']

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms

pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf])

pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

In [None]:
train = labelled_tweets.sampleBy("label", fractions = {0:0.8, 1: 0.8, 2: 0.8}, seed = 2022)
test = labelled_tweets.subtract(train)
train_positive = train[train['sentiment'] == 'Positive']

display(train_positive)

In [0]:
train.groupby('sentiment').count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
| Positive|23555|
|  Neutral|20396|
| Negative|11518|
+---------+-----+



In [0]:
test.groupby('sentiment').count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
| Positive| 5886|
|  Neutral| 5086|
| Negative| 2835|
+---------+-----+



In [0]:
total_counts = tweets_idf.select('features').rdd\
                    .map(lambda row: row['features'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

vocabList = cv_model.vocabulary
d = {'vocabList':vocabList,'counts':total_counts}

spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+-------------+------------------+
|    vocabList|            counts|
+-------------+------------------+
|           rt| 15663.33937652337|
|      climate|22398.070295254707|
|       change|25738.839080503945|
|@therealkeean|16753.516922000766|
|          new|16265.373069663437|
|       people|14432.421659922304|
|          amp|14691.495364352908|
|        plans|17885.133932930752|
|      trudeau|13444.527356677936|
|         many|11481.758576548906|
|      change?| 10826.95126003317|
|      change,|10722.910713361787|
|       police| 10636.80202249655|
|   government|10279.912831724461|
|      counter| 10091.76519041085|
|         leak| 9579.149591005958|
|    exclusive|  9577.03494558327|
|  @drelidavid|  9316.57554949759|
|    mortality| 8993.820106364019|
|            h|  8984.99041552474|
+-------------+------------------+
only showing top 20 rows



In [0]:
sentenceData = spark.createDataFrame([
    (0, "Python python Spark Spark"),
    (1, "Python SQL")],
 ["document", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer  = CountVectorizer(inputCol="words", outputCol="rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])

model = pipeline.fit(sentenceData)

In [0]:
def wordCloud(model, data):
    total_counts = model.transform(data)\
                        .select('rawFeatures').rdd\
                        .map(lambda row: row['rawFeatures'].toArray())\
                        .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

    idf_score = model.transform(data)\
                        .select('features').rdd\
                        .map(lambda row: row['features'].toArray())\
                        .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

    vocabList = model.stages[1].vocabulary
    d = {'vocabList':vocabList,'counts':total_counts, 'idf': idf_score}

    return spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys()))

In [0]:
wordCloud(model, sentenceData).show()

+---------+------+------------------+
|vocabList|counts|               idf|
+---------+------+------------------+
|   python|   3.0|               0.0|
|    spark|   2.0|0.8109302162163288|
|      sql|   1.0|0.4054651081081644|
+---------+------+------------------+



In [0]:
total_counts = model.transform(sentenceData)\
                    .select('rawFeatures').rdd\
                    .map(lambda row: row['rawFeatures'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

idf_score = model.transform(sentenceData)\
                    .select('features').rdd\
                    .map(lambda row: row['features'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

vocabList = model.stages[1].vocabulary
d = {'vocabList':vocabList,'counts':total_counts, 'idf': idf_score}

spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+---------+------+------------------+
|vocabList|counts|               idf|
+---------+------+------------------+
|   python|   3.0|               0.0|
|    spark|   2.0|0.8109302162163288|
|      sql|   1.0|0.4054651081081644|
+---------+------+------------------+



In [None]:
total_counts = model.transform(sentenceData)\
                    .select('rawFeatures').rdd\
                    .map(lambda row: row['rawFeatures'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

vocabList = model.stages[1].vocabulary
d = {'vocabList':vocabList,'counts':total_counts}

spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

cv_model.vocabulary

## 6. Label Encoder

In [None]:
label_encoder = StringIndexer(inputCol = "sentiment", outputCol = "label") 
# NOTE: sentiment and labeles maybe different
le_model = label_encoder.fit(tweets_idf)
tweets_label = le_model.transform(tweets_idf)

display(tweets_label)

## 7. Model Training: Logistic Regression Classifier

In [None]:
lr = LogisticRegression(maxIter=100)

lr_model = lr.fit(tweets_label)

predictions = lr_model.transform(tweets_label) # when using transform, will give label and prob; if use predict, will only get labels

display(predictions)

In [0]:
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Use 90% cases for training, 10% cases for testing
train, test = tweets_clean.randomSplit([0.9, 0.1], seed=20200819)
# Stratified tweets_clean.sampleBy("label", fraction = [0:0.9, 1: 0.1], seed = 2022)

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])

pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))