In [None]:
import os
import re

import matplotlib.pyplot as plt
import pandas as pd
import pyspark.sql.functions as f
import tweepy
from pyspark.sql.types import StructType, IntegerType, TimestampType, StringType, DoubleType, LongType
from textblob import TextBlob
from tweepy import Client
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from wordcloud import WordCloud

### Fetch recent tweets matching a keyword from the Twitter API and load them into a PySpark DataFrame

In [None]:
# Twitter API v2 app-only bearer token, read from the environment so it is never
# committed with the notebook. The token that used to be hard-coded here is
# exposed in version history and should be revoked/regenerated.
bearer_token = os.environ.get('TWITTER_BEARER_TOKEN', '')
if not bearer_token:
    print('TWITTER_BEARER_TOKEN is not set; requests to the Twitter API will fail to authenticate.')

In [None]:
#getting recent tweets data
def getTweets(keyword='Lexus', num_tweets=100000):
    """Fetch recent English, non-retweet tweets matching ``keyword``.

    Args:
        keyword: search term inserted into the v2 query; defaults to 'Lexus'.
        num_tweets: upper bound on the number of tweets to collect. The API may
            cap or rate-limit results, so fewer tweets can be returned.

    Returns:
        list of tweepy ``Tweet`` objects requested with the id, created_at,
        public_metrics, text, source and geo fields.
    """
    query = f'{keyword} -is:retweet lang:en'
    # Authenticate with the app-only bearer token from the credentials cell.
    client = Client(bearer_token)

    # Paginator follows the result pages (up to 100 tweets per request) and
    # flatten() yields individual tweets until `num_tweets` is reached.
    paginator = tweepy.Paginator(client.search_recent_tweets,
                                 query=query,
                                 tweet_fields=['id', 'created_at', 'public_metrics', 'text', 'source', 'geo'],
                                 max_results=100)
    tweets = []
    try:
        for tweet in paginator.flatten(limit=num_tweets):
            tweets.append(tweet)
    except tweepy.TooManyRequests:
        # A 429 mid-pagination would otherwise discard every page already
        # fetched; keep the partial result instead.
        print(f'Rate limit reached; returning the {len(tweets)} tweets collected so far.')
    return tweets


#function to clean the tweets and load them into a DataFrame
def tweetsETL(tweets):
    """Clean tweepy ``Tweet`` objects and load them into a PySpark DataFrame.

    Args:
        tweets: iterable of tweepy ``Tweet`` objects, as returned by
            ``getTweets`` (requested with created_at, public_metrics, source
            and geo fields).

    Returns:
        Spark DataFrame with one row per tweet and the columns clean_tweet,
        created_at, id, lat, likes, long, replies, retweets, source, text.
        ``lat``/``long`` are null for tweets without point coordinates.
    """
    # Removes @mentions (handles may contain underscores), scheme://links and
    # every character that is not an ASCII letter, digit, space or tab. For a
    # hashtag only the '#' is removed; the word itself is kept.
    # Raw string: \w, \/ and \S are regex escapes but invalid Python escapes.
    cleaning_pattern = re.compile(r"(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)")

    def cleanTweets(text):
        # Replace each match with a space, then collapse runs of whitespace.
        return ' '.join(cleaning_pattern.sub(" ", text).split())

    def pointCoordinates(tweet):
        # geo.coordinates holds a point whose 'coordinates' list is
        # [longitude, latitude]. Returns (lat, lon), or (None, None) if absent.
        point = (tweet.geo or {}).get('coordinates') or {}
        coordinates = point.get('coordinates')
        if not coordinates:
            return None, None
        return float(coordinates[1]), float(coordinates[0])

    result = []
    for tweet in tweets:
        lat, lon = pointCoordinates(tweet)
        metrics = tweet.public_metrics
        result.append({'id': tweet.id,
                       'text': tweet.text,
                       'clean_tweet': cleanTweets(tweet.text),
                       'created_at': tweet.created_at,
                       'source': tweet.source,
                       'lat': lat,
                       'long': lon,
                       'retweets': metrics['retweet_count'],
                       'replies': metrics['reply_count'],
                       'likes': metrics['like_count']})

    # Explicit schema: with inference Spark cannot type lat/long when no tweet
    # in the batch is geo-tagged (all values None), and cannot infer anything
    # from an empty list. Fields are alphabetical, the order that schema
    # inference from dicts used to produce.
    schema = (StructType()
              .add('clean_tweet', StringType())
              .add('created_at', TimestampType())
              .add('id', LongType())
              .add('lat', DoubleType())
              .add('likes', LongType())
              .add('long', DoubleType())
              .add('replies', LongType())
              .add('retweets', LongType())
              .add('source', StringType())
              .add('text', StringType()))
    return spark.createDataFrame(result, schema=schema)

In [None]:
# Fetch tweets and load the cleaned rows into the working Spark DataFrame.
df = tweetsETL(tweets=getTweets())

### Use the VADER sentiment model to compute a compound sentiment score (from -1 to 1) from the `clean_tweet` column

In [None]:
sid_obj = SentimentIntensityAnalyzer()


def compound_score(text):
    """Return VADER's compound polarity score for ``text`` as a float."""
    return float(sid_obj.polarity_scores(text)['compound'])


# Declare DoubleType explicitly: f.udf defaults to StringType, which would
# store the score as text and break numeric comparisons downstream.
calculate_sentiment = f.udf(compound_score, DoubleType())
df = df.withColumn('sentiment', calculate_sentiment(df.clean_tweet))

### Map the sentiment score to the buckets 'Very negative' (≤ -0.5), 'Negative' (< 0), 'Neutral' (= 0), 'Positive' (> 0) and 'Very positive' (≥ 0.5)

In [None]:
@f.udf
def sentiment_to_discrete(sentiment):
    """Bucket a VADER compound score into one of five sentiment labels.

    Args:
        sentiment: compound score, numeric or numeric string (a ``f.udf``
            declared without ``returnType`` produces StringType values), or None.

    Returns:
        'Very positive' (>= 0.5), 'Positive' (> 0), 'Neutral' (== 0),
        'Negative' (< 0), 'Very negative' (<= -0.5), or None when the score
        is missing.
    """
    if sentiment is None:
        return None
    # Coerce first: comparing a str with a number raises TypeError in Python 3.
    score = float(sentiment)
    if score >= .5:
        return 'Very positive'
    if score > 0:
        return 'Positive'
    if score <= -.5:
        return 'Very negative'
    if score < 0:
        return 'Negative'
    return 'Neutral'

In [None]:
# Attach the five-level sentiment label derived from the numeric score.
df = df.withColumn('discrete_sentiment', sentiment_to_discrete(f.col('sentiment')))

### Create a word cloud for each sentiment group (negative, neutral, positive)

In [None]:
def creatWordCloud(df,clm_name):
    """Draw a word cloud from every text value in one column of a pandas DataFrame.

    Args:
        df: pandas DataFrame containing the text.
        clm_name: name of the text column to read.

    Returns:
        The ``matplotlib.pyplot`` module, whose current figure holds the word
        cloud when one was drawn.
    """
    # Skip missing values and blank strings (tweets made only of links,
    # mentions or emoji clean down to ""), so they neither break the join
    # nor count as content.
    text = " ".join(line for line in df[clm_name].dropna() if line.strip())
    if not text:
        # WordCloud.generate raises ValueError when given no words; an empty
        # sentiment bucket should not abort the whole notebook run.
        print('No text available to build a word cloud.')
        return plt
    # Create the wordcloud object
    wordcloud = WordCloud(width=980, height=580, margin=0, collocations=False,
                          background_color='white').generate(text)
    # Display the generated image:
    plt.figure(figsize=(12, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis("off")
    plt.margins(x=0, y=0)
    plt.show()
    return plt

def sentimentWordcloud(df):
    """Draw one word cloud per coarse sentiment group (negative, neutral, positive).

    Args:
        df: pandas DataFrame with ``discrete_sentiment`` and ``clean_tweet`` columns.
    """
    # Labels must match sentiment_to_discrete's output exactly: the strong
    # buckets are 'Very negative' / 'Very positive' (lower-case second word).
    groups = [
        ('Wordcloud for negative sentiment tweets : ', ['Negative', 'Very negative']),
        ('Wordcloud for neutral sentiment tweets : ', ['Neutral']),
        ('Wordcloud for positive sentiment tweets : ', ['Positive', 'Very positive']),
    ]
    print("We generate Wordclouds for each sentiment to see the words that appear most often for each one :")
    print("_________________________________________________________________________________________________")
    for title, labels in groups:
        print(title)
        creatWordCloud(df[df['discrete_sentiment'].isin(labels)], 'clean_tweet')

In [None]:
# PySpark's method is toPandas(); collect only the two columns the word clouds
# use so the driver does not pull the full tweet rows into memory.
sentimentWordcloud(df.select('clean_tweet', 'discrete_sentiment').toPandas())

In [None]:
%matplot plt

### Cast sentiment column to double type

In [None]:
# Ensure the score is stored as a double in the output.
df = df.withColumn('sentiment', f.col('sentiment').cast('double'))

In [None]:
# Keep lat and long under their own names: tweetsETL already stores
# coordinates[1] as lat and coordinates[0] as long, so re-aliasing
# lat->long / long->lat would swap the coordinates in the saved data.
# Output column order (long, likes, lat, ...) is unchanged.
df = df.select(f.col('clean_tweet'), f.col('created_at'), f.col('id'), f.col('long'),
               f.col('likes'), f.col('lat'), f.col('replies'), f.col('retweets'),
               f.col('source'), f.col('text'), f.col('sentiment'), f.col('discrete_sentiment'))

### Save to S3

In [None]:
# Parquet destination: bucket prefix joined with the object key.
s3_path = "/".join(["s3a://twitter-sentiment-analysis2", "lexus_tweets.parquet"])

In [None]:
# Replace any previous output at the S3 location with the current results.
df.write.parquet(s3_path, mode='overwrite')