## Importing packages

In [1]:
# !pip install nltk
import tweepy as tw
import numpy as np
import pandas as pd
import re
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\shwet\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

## <br>STEP 1 - Twitter API

In [2]:
# Accessing Twitter API credentials which are stored in a csv file
twitter_creds = pd.read_csv("Twitter Creds.csv", index_col=0)

access_token = twitter_creds.loc["ACCESS_TOKEN"][0]
access_secret = twitter_creds.loc["ACCESS_SECRET"][0]
consumer_key = twitter_creds.loc["CONSUMER_KEY"][0]
consumer_secret = twitter_creds.loc["CONSUMER_SECRET"][0]

auth = tw.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
api = tw.API(auth)

# New dataframe to store the tweets and username 
df = pd.DataFrame(columns = ['Tweets', 'User'])

## <br>STEP 2 - Apache Spark Streaming Application on Twitter

In [3]:
# Function for spark  stream to collect the data
def stream(data, file_name):
    i = 0
    for tweet in tw.Cursor(api.search, q=data, count=1000, lang='en').items():
        print(i+1, end='\r')
        df.loc[i, 'Tweets'] = tweet.text
        df.loc[i, 'User'] = tweet.user.name
        df.to_excel('{}.xlsx'.format(file_name))
        i+=1
        if i == 1000:
            break
        else:
            pass

In [4]:
stream(data = ['#hospital'], file_name = 'my_tweets')

1000

In [5]:
# Function to remove @tags and URL(s) in the tweets
def clean_tweet(tweet):
    return ' '.join(re.sub('(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)', ' ', tweet).split())

In [6]:
# Storing cleaned tweets in a new column in the Data Frame
df['clean_tweet'] = df['Tweets'].apply(lambda x: clean_tweet(x))
df['clean_tweet'] = df['clean_tweet'].str.lower()
df.head()

Unnamed: 0,Tweets,User,clean_tweet
0,#People who believe that #FakePresident #Donal...,🏳️‍🌈👬🏳️‍🌈 Dark_Marc 🏳️‍🌈🦄🏳️‍🌈,people who believe that fakepresident donaldtr...
1,RT @kelly_archives: #WiC #cardiotwitter Welcom...,CareMo,rt archives wic cardiotwitter welcome doctors ...
2,RT @kelly_archives: Enhancing Patient Experien...,CareMo,rt archives enhancing patient experience docto...
3,We were appointed as cost managers at the publ...,Turner & Townsend,we were appointed as cost managers at the publ...
4,Sad effect of #Covid19: as #Coronavirus cases ...,Naomi Fried,sad effect of covid19 as coronavirus cases ris...


## <br>STEP 3 - Sentiment Analysis

In [7]:
# Reading the negative-words text file
with open("negative-words.txt","r") as file:
        neg_list = file.readlines()

negative_words=[]

for i in range(len(neg_list)):
    negative_words.append(neg_list[i].split("\n")[0])
    

# Reading the positive-words text  file
with open("positive-words.txt","r") as file:
        pos_list = file.readlines()

positive_words=[]

for i in range(len(pos_list)):
    positive_words.append(pos_list[i].split("\n")[0])

In [8]:
# Function to get the sentiment score for a single tweet
def tweet_sentiment_value(tweet):

    # Initializing Sentiment value to zero
    sentiment_value=0
    
    # Using  nltk.toeknize to split the tweet into individual words
    for word in nltk.word_tokenize(tweet):
        
        # If word is in positive list increment sentiment value by 1
        if word in positive_words:
            sentiment_value+=1
        
        # If word is in Negative list decrease sentiment value by 1
        elif word in negative_words:
            sentiment_value-=1
    
    # Return the final sentiment score of the tweet
    return sentiment_value

## <br><br><center>Sentiment Analysis with SPARK dataframe</center> 

In [9]:
#Importing required packages and initiating a spark session
import findspark
import pyspark

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from collections import namedtuple
from pyspark.sql import SparkSession

findspark.init('C:\Spark\spark-3.0.1-bin-hadoop2.7')
spark = SparkSession.builder.getOrCreate()

In [10]:
# Creating a spark Dataframe
dfspark = spark.createDataFrame(df)
dfspark = dfspark.dropna()
dfspark.count()

1000

In [11]:
# Displaying first 10 rows of  Spark DataFrame
dfspark.show(10)

+--------------------+--------------------+--------------------+
|              Tweets|                User|         clean_tweet|
+--------------------+--------------------+--------------------+
|#People who belie...|🏳️‍🌈👬🏳️‍🌈 Da...|people who believ...|
|RT @kelly_archive...|              CareMo|rt archives wic c...|
|RT @kelly_archive...|              CareMo|rt archives enhan...|
|We were appointed...|   Turner & Townsend|we were appointed...|
|Sad effect of #Co...|         Naomi Fried|sad effect of cov...|
|Best Warrior Comp...|Martin Army Commu...|best warrior comp...|
|Enhancing Patient...|      Kelly Anderson|enhancing patient...|
|Two dead, a third...|   Saskia Steinhorst|two dead a third ...|
|#WiC #cardiotwitt...|      Kelly Anderson|wic cardiotwitter...|
|Under the rule, h...|      Orbit Products|under the rule ho...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [12]:
#List to score sentiment value of each tweet
sentiment_score = []

# Iterating "clean_tweet" column of Spark dataframe and passing each tweet thorugh "tweet_sentiment_value" function
for tweets in dfspark.select('clean_tweet').collect():
    #Storing the sentiment score in "sentiment_score" list 
    sentiment_score.append(tweet_sentiment_value(tweets[0]))
    
# Creating a  dataframe with original tweets and their corresponding sentiment score
df2 = pd.DataFrame({"Tweet_content":list(df["Tweets"]) , "Sentiment score":sentiment_score})
df2.head(5)

Unnamed: 0,Tweet_content,Sentiment score
0,#People who believe that #FakePresident #Donal...,0
1,RT @kelly_archives: #WiC #cardiotwitter Welcom...,1
2,RT @kelly_archives: Enhancing Patient Experien...,1
3,We were appointed as cost managers at the publ...,0
4,Sad effect of #Covid19: as #Coronavirus cases ...,0


## <br>Step 4 - Exporting the sentiment scores to a csv file

In [13]:
df2.to_csv("Final Output.csv",  index=False)