<h1 align='center'> Twitter Sentiment Analysis <font color='#559E54'> </font> </h1>

## 1. Load Data

In [0]:
# Set AWS credentials
# The key pair is read from the separate `aws_config` module instead of being
# written into the notebook itself.
import aws_config
ACCESS_KEY = aws_config.access_key
SECRET_ACCESS_KEY = aws_config.secret_access_key
# S3 bucket to mount, and the folder name it is mounted under (/mnt/<mount_folder>).
bucket_name = "tweet2022"
mount_folder = "bc"

In [0]:
%sh
# Install spaCy and download its small English model with the Databricks-bundled Python.
# NOTE(review): presumably needed by locationtagger - verify. %sh commands run on the
# driver only, so confirm the package is also available to worker-side UDFs.
/databricks/python3/bin/pip install spacy 
/databricks/python3/bin/python3 -m spacy download en_core_web_sm

Collecting spacy
  Downloading spacy-3.4.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.6 MB)
Collecting wasabi<1.1.0,>=0.9.1
  Downloading wasabi-0.10.1-py3-none-any.whl (26 kB)
Collecting spacy-loggers<2.0.0,>=1.0.0
  Downloading spacy_loggers-1.0.3-py3-none-any.whl (9.3 kB)
Collecting pydantic!=1.8,!=1.8.1,<1.11.0,>=1.7.4
  Downloading pydantic-1.10.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.6 MB)
Collecting tqdm<5.0.0,>=4.38.0
  Downloading tqdm-4.64.1-py2.py3-none-any.whl (78 kB)
Collecting thinc<8.2.0,>=8.1.0
  Downloading thinc-8.1.5-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (819 kB)
Collecting srsly<3.0.0,>=2.4.3
  Downloading srsly-2.4.5-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (492 kB)
Collecting preshed<3.1.0,>=3.0.2
  Downloading preshed-3.0.8-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (130 kB)
Collecting pathy>=0.3.5
  Downloading pathy-0.10.0-py3-none-any

In [0]:
# Install the libraries used by the location-extraction and sentiment UDFs.
# NOTE(review): `!pip` installs on the driver only; Databricks recommends `%pip` for
# notebook-scoped libraries. `%pip` resets the Python state, so switching would also
# require moving this cell above the credentials cell - confirm before changing.
!pip install locationtagger
!pip install vaderSentiment

Collecting locationtagger
  Downloading locationtagger-0.0.1-py3-none-any.whl (1.6 MB)
[?25l[K     |▏                               | 10 kB 11.1 MB/s eta 0:00:01[K     |▍                               | 20 kB 9.1 MB/s eta 0:00:01[K     |▋                               | 30 kB 11.7 MB/s eta 0:00:01[K     |▉                               | 40 kB 2.9 MB/s eta 0:00:01[K     |█                               | 51 kB 3.5 MB/s eta 0:00:01[K     |█▎                              | 61 kB 4.0 MB/s eta 0:00:01[K     |█▍                              | 71 kB 4.5 MB/s eta 0:00:01[K     |█▋                              | 81 kB 5.0 MB/s eta 0:00:01[K     |█▉                              | 92 kB 5.4 MB/s eta 0:00:01[K     |██                              | 102 kB 4.5 MB/s eta 0:00:01[K     |██▎                             | 112 kB 4.5 MB/s eta 0:00:01[K     |██▌                             | 122 kB 4.5 MB/s eta 0:00:01[K     |██▋                             | 133 kB 4.5 MB/s 

In [0]:
import nltk
import spacy

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf, to_date, to_utc_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, TimestampType, FloatType

# Processing Time Data
from datetime import datetime
import pytz

# Processing Location Data 
import locationtagger

# Text Processing 
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [0]:
# Reuse the active Spark session when there is one, otherwise start a new one.
session_builder = SparkSession.builder.appName('Sentiment Analysis')
spark = session_builder.getOrCreate()
print('Session created')

# Handle to the SparkContext behind the session.
sc = spark.sparkContext

Session created


In [0]:
# Function for mounting Amazon S3 data 
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  """Mount an S3 bucket at /mnt/<mount_folder>, replacing any existing mount.

  Args:
    access_key: AWS access key id placed in the s3a URI.
    secret_key: AWS secret access key; "/" characters are percent-encoded.
    bucket_name: name of the S3 bucket to mount.
    mount_folder: folder name under /mnt/ to use as the mount point.

  Raises:
    Whatever dbutils.fs.mount raises when the mount itself fails. A failed
    unmount is only reported, because it normally means nothing was mounted.
  """
  # NOTE(review): the credentials end up embedded in the mount URI; consider an
  # instance profile or a secret scope instead - confirm what the workspace offers.
  # A "/" in the secret would otherwise be read as a URI path separator.
  encoded_secret_key = secret_key.replace("/", "%2F")
  mount_point = "/mnt/%s" % mount_folder

  print ("Mounting", bucket_name)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount(mount_point)
  except Exception:
    # If it fails to unmount it most likely wasn't mounted in the first place.
    # Only Exception is caught so that an interrupt still aborts the cell.
    print ("Directory not unmounted: ", mount_folder)

  # Mount the bucket only after the unmount attempt has been handled.
  dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, bucket_name), mount_point)
  print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")
    
# Mount the tweet bucket using the credentials configured at the top of the notebook.
mount_s3_bucket(
    access_key=ACCESS_KEY,
    secret_key=SECRET_ACCESS_KEY,
    bucket_name=bucket_name,
    mount_folder=mount_folder,
)

Mounting tweet2022
/mnt/bc has been unmounted.
The bucket tweet2022 was mounted to bc 



In [0]:
%fs ls /mnt/bc

path,name,size,modificationTime
dbfs:/mnt/bc/2022/,2022/,0,0


In [0]:
# Explicit schema for the raw tweet files: one nullable field per column, in file order.
dataSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("tweetid", LongType()),
        ("user_name", StringType()),
        ("screen_name", StringType()),
        ("tweet", StringType()),
        ("followers", IntegerType()),
        ("location", StringType()),
        ("geo", StringType()),
        ("created", StringType()),
    )
])

In [0]:
# Read every file under 2022/11/26 as header-less, tab-separated text using dataSchema.
tweet_reader = spark.read.option("header", "false").option("delimiter", "\t")
tweets = tweet_reader.schema(dataSchema).csv("/mnt/bc/2022/11/26/*")

In [0]:
# Quick sanity check: total number of rows, then a peek at the first five.
tweet_count = tweets.count()
print(tweet_count)
tweets.show(n=5)

429661
+-------------------+--------------------+---------------+--------------------+---------+-------------------+----+--------------------+
|            tweetid|           user_name|    screen_name|               tweet|followers|           location| geo|             created|
+-------------------+--------------------+---------------+--------------------+---------+-------------------+----+--------------------+
|1596521964608581632|       Gary Borrelli|GaryBorrelli320|RT @RealJamesWood...|       76|               None|None|Sat Nov 26 15:11:...|
|1596521965157851139|         Ironman0063|    ironman0063|@MikeSington @jon...|      173|               None|None|Sat Nov 26 15:11:...|
|1596521965400956929|                YUGE|      YUGESKOOK|@beinlibertarian ...|       13|               None|None|Sat Nov 26 15:11:...|
|1596521965854285827|AmericanUltraNucl...|  ClintonGilley|@DiedSuddenly_ Mo...|       75|      Planet Earth |None|Sat Nov 26 15:11:...|
|1596521966043033600|        Roxan Wetzel

## 2. Data Cleaning
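- Drop the redundant `user_name` column, remove duplicate rows and rows with a missing tweet, and convert the `created` string to a timestamp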

In [0]:
# One pass over the raw frame:
#   - drop 'user_name' (treated as redundant)
#   - remove exact duplicate rows
#   - remove rows whose 'tweet' value is missing
tweets = (
    tweets
    .drop("user_name")
    .dropDuplicates()
    .na.drop(subset=["tweet"])
)

# Cache the cleaned frame, since the following cells reuse it.
tweets.cache()

Out[11]: DataFrame[tweetid: bigint, screen_name: string, tweet: string, followers: int, location: string, geo: string, created: string]

In [0]:
def getDate(x):
    """Reformat a Twitter 'created' string as 'YYYY-MM-DD HH:MM:SS'.

    Args:
        x: text such as 'Sat Nov 26 15:11:30 +0000 2022', or None.

    Returns:
        The reformatted string, or None when x is None or does not match the
        expected layout. Returning None keeps one malformed row from raising
        inside the UDF and aborting the whole Spark job.
    """
    if x is None:
        return None
    try:
        # The pattern only accepts a literal '+0000' offset, so the parsed value is
        # already UTC; attaching a tzinfo would not change the formatted text.
        parsed = datetime.strptime(x, '%a %b %d %H:%M:%S +0000 %Y')
    except (ValueError, TypeError):
        return None
    return parsed.strftime("%Y-%m-%d %H:%M:%S")

# Wrap getDate as a string-returning UDF, then replace 'created' with the result of
# passing the reformatted text through to_utc_timestamp with the "UTC" zone.
date_udf = udf(getDate, returnType=StringType())
created_utc = to_utc_timestamp(date_udf(col("created")), "UTC")
tweets = tweets.withColumn("created", created_utc)

## 3. Text Cleaning

`pyspark.sql.functions.regexp_replace` is used to process the text

1. Remove URLs such as `http://cnn.com`
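2. Remove mentions, hashtags and HTML-entity fragments (e.g. `@user`, `#tag`, `&amp`), then every remaining non-letter character
3. Substitute multiple spaces with a single space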
4. Lowercase all text
5. Trim the leading/trailing whitespaces

In [0]:
# Normalise the tweet text, in this order:
#   1. URLs - removed first so that an '@', '#' or '&' inside a URL cannot split it
#      and leave part of the address behind as a "word"
#   2. @mentions, #hashtags and '&entity' fragments
#   3. every character that is not an ASCII letter ('A-Z', not 'A-z': the latter
#      range also matches '[', '\', ']', '^', '_' and '`', which would be kept)
#   4. runs of whitespace collapsed to a single space
#   5. lower-casing, then trimming of leading/trailing spaces
tweets_clean = (
    tweets
    .withColumn('tweet', F.regexp_replace('tweet', r"\w+:\/\/\S+", " "))
    .withColumn('tweet', F.regexp_replace('tweet', r"[@#&][A-Za-z0-9_-]+", " "))
    .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-Z]", " "))
    .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " "))
    .withColumn('tweet', F.lower('tweet'))
    .withColumn('tweet', F.trim('tweet'))
)

In [0]:
# Clean location 
# delete rows that contain "#" in location 
tweets_clean = tweets_clean.filter(~tweets_clean.location.contains('#'))

# Drop 'geo' since >95% rows do not have this value 
tweets_clean = tweets_clean.drop("geo") 

# Drop rows without a location. This is applied to tweets_clean, the frame the
# country extraction reads; filtering `tweets` here would have no effect because
# no later cell reads `tweets` before it is reassigned.
tweets_clean = tweets_clean.na.drop(subset=["location"])

# Extract locations with locationtagger 
def country_extract(string):
  """Return the first country locationtagger finds in a location string.

  Args:
    string: free-text location, or None.

  Returns:
    The first entry of the extractor's `countries` list, or None when the input
    is None/empty or no country is found.
  """
  # Nothing to analyse; also avoids handing None to the extractor.
  if not string:
    return None
  place_entity = locationtagger.find_locations(text = string)
  countries = place_entity.countries
  return countries[0] if countries else None

def city_extract(string):
  """Return the first city locationtagger finds in a location string, else None.

  Defined so that city_extract_udf wraps a function that exists; without it the
  UDF would raise NameError the first time it is evaluated.
  """
  if not string:
    return None
  cities = locationtagger.find_locations(text = string).cities
  return cities[0] if cities else None

# Register the extractors directly; wrapping them in a lambda adds nothing.
country_extract_udf = udf(country_extract, StringType())
city_extract_udf = udf(city_extract, StringType())
# Only the country is added here; city_extract_udf is registered but not applied.
tweets_geo = tweets_clean.withColumn("country",country_extract_udf(col("location")))

In [0]:
# Download the NLTK resources one by one.
# NOTE(review): presumably these are the models locationtagger relies on - verify.
for resource in ('maxent_ne_chunker', 'words', 'treebank',
                 'maxent_treebank_pos_tagger', 'punkt'):
    nltk.downloader.download(resource)
# Kept as the last expression so the cell still reports the final download's result.
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package maxent_ne_chunker to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping chunkers/maxent_ne_chunker.zip.
[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Unzipping corpora/words.zip.
[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Unzipping corpora/treebank.zip.
[nltk_data] Downloading package maxent_treebank_pos_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/maxent_treebank_pos_tagger.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
Out[16]: True

In [0]:
#Remove stopwords 
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")

def sjoin(tokens):
    """Join an array-of-strings column into one space-separated string column.

    Uses the built-in concat_ws so the join runs inside Spark rather than in a
    row-by-row Python UDF. Still called as sjoin(column) and still returns a Column.
    """
    return F.concat_ws(' ', tokens)

# tweet -> tokens -> tokens without stop words -> re-joined text in 'tweet_cleaned'
tweets_tokenized = tokenizer.transform(tweets_geo)
tweets_stopword = stopword_remover.transform(tweets_tokenized)
tweets = tweets_stopword.withColumn('tweet_cleaned', sjoin(tweets_stopword.filtered))
display(tweets.limit(5))

tweetid,screen_name,tweet,followers,location,created,country,tokens,filtered,tweet_cleaned
1596522037048401921,svguruuu,having covid sucks mannn my throat itches so much i wish nanami could coat it with his cum to soothe the itchiness,2502,20 | eng•ph•th | ic : nejmai2,2022-11-26T15:11:30.000+0000,,"List(having, covid, sucks, mannn, my, throat, itches, so, much, i, wish, nanami, could, coat, it, with, his, cum, to, soothe, the, itchiness)","List(covid, sucks, mannn, throat, itches, much, wish, nanami, coat, cum, soothe, itchiness)",covid sucks mannn throat itches much wish nanami coat cum soothe itchiness
1596522061832540161,Hanecdote,rt the fact we aren t even trying to make medical spaces safer we aren t even trying to protect people at highest risk wh,5209,North London,2022-11-26T15:11:36.000+0000,,"List(rt, the, fact, we, aren, t, even, trying, to, make, medical, spaces, safer, we, aren, t, even, trying, to, protect, people, at, highest, risk, wh)","List(rt, fact, aren, even, trying, make, medical, spaces, safer, aren, even, trying, protect, people, highest, risk, wh)",rt fact aren even trying make medical spaces safer aren even trying protect people highest risk wh
1596522064672063494,TheLawThunder,rt vaccine serial number search,2372,"Huntsville, AL",2022-11-26T15:11:36.000+0000,,"List(rt, vaccine, serial, number, search)","List(rt, vaccine, serial, number, search)",rt vaccine serial number search
1596522257526194176,NewbieKrypto,adolph is on the mend recovering from a short bout with covid in argentina he sends his regrets for,108,"Chicago, IL",2022-11-26T15:12:22.000+0000,,"List(adolph, is, on, the, mend, recovering, from, a, short, bout, with, covid, in, argentina, he, sends, his, regrets, for)","List(adolph, mend, recovering, short, bout, covid, argentina, sends, regrets)",adolph mend recovering short bout covid argentina sends regrets
1596522409586294786,KSchmidt1969,rt the death last year of a year old irish teen weeks after he received pfizer s covid vaccine has sparked a consid,39,,2022-11-26T15:12:59.000+0000,,"List(rt, the, death, last, year, of, a, year, old, irish, teen, weeks, after, he, received, pfizer, s, covid, vaccine, has, sparked, a, consid)","List(rt, death, last, year, year, old, irish, teen, weeks, received, pfizer, covid, vaccine, sparked, consid)",rt death last year year old irish teen weeks received pfizer covid vaccine sparked consid


## 4. Sentiment Analysis

In [0]:
# Analyzer reused by every call made through the same function object, so it is
# built on first use instead of once per row.
_SENTIMENT_ANALYZER = None

def getSentimentScore(tweetText):
    """Return the VADER compound sentiment score of a text.

    Args:
        tweetText: text to score, or None.

    Returns:
        float: the 'compound' entry of polarity_scores, or None when tweetText is None.
    """
    # NOTE(review): the input is lower-cased text with punctuation and stop words
    # removed; VADER may rely on some of those cues - confirm this is intended.
    global _SENTIMENT_ANALYZER
    if tweetText is None:
        return None
    if _SENTIMENT_ANALYZER is None:
        _SENTIMENT_ANALYZER = SentimentIntensityAnalyzer()
    vs = _SENTIMENT_ANALYZER.polarity_scores(tweetText)
    return float(vs['compound'])

# Expose the scorer as a float-returning UDF and score the stop-word-free text.
getSentimentScore_udf = udf(getSentimentScore, returnType=FloatType())
sentiment_score = getSentimentScore_udf(col('tweet_cleaned'))
tweets_sentiment = tweets.withColumn('tweet_sentiment_score', sentiment_score)

In [0]:
# Preview five scored rows to check the new 'tweet_sentiment_score' column.
display(tweets_sentiment.limit(5))

## 5. Data Export

In [0]:
# Columns kept for the dashboard export.
dashboard_columns = [
    'tweetid',
    'screen_name',
    'tweet_cleaned',
    'followers',
    'created',
    'country',
    'tweet_sentiment_score',
]
# Keep only those columns, then discard every row that has a null in any of them.
tweets_for_dash = tweets_sentiment.select(*dashboard_columns).na.drop()
tweets_for_dash.printSchema()

root
 |-- tweetid: long (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- tweet_cleaned: string (nullable = true)
 |-- followers: integer (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- tweet_sentiment_score: float (nullable = true)



In [0]:
# Save the file
# Full export, currently disabled:
# tweets_for_dash.write.option("delimiter", "\t").option("header", "false").mode("overwrite").csv('/mnt/bc/out')

# Write a sample of at most 5000 rows as header-less, tab-separated files,
# overwriting whatever is already at the target path.
test = tweets_for_dash.limit(5000)
sample_writer = (
    test.write
    .option("delimiter", "\t")
    .option("header", "false")
    .mode("overwrite")
)
sample_writer.csv('/mnt/bc/out2')