# Data Streaming and Sentiment Analysis with Twitter API (using Python, Spark Streaming and NLP with NLTK)

# Process Description
This analysis was structured in the following steps:
1. Spark Streaming parametrization for Twitter
2. Generate text classifier for sentiment analysis
3. Twitter API authentication and real-time tweet analysis

**NOTE**: For this use case, a virtual machine was set up with **Ubuntu 20.04.3 LTS**, **Java JDK 1.8** and **Apache Spark 2.4.2**.

**NOTE²**: A Twitter developer account was requested for authentication. For privacy reasons, the credentials used are masked as 'XXXX'. Outputs are still recorded so that you can check the entire process end to end.

# 1. Spark Streaming parametrization for Twitter

In [1]:
```python
# Install the packages needed for OAuth authentication, the Twitter API and NLP processing
!pip install requests_oauthlib
!pip install twython
!pip install nltk
```

In [2]:
```python
# Importing modules
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import requests
import time
import string
import ast
import json
```

In [3]:
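```python
# NLTK library
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
# Import only the helpers used below instead of a wildcard import
from nltk.sentiment.util import mark_negation, extract_unigram_feats
```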

In [4]:
```python
# Set the batch interval of the StreamingContext to five seconds
BATCH_INTERVAL = 5
```

In [5]:
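```python
# Create the StreamingContext. In a PySpark notebook `sc` already exists;
# getOrCreate() returns it (or creates one when running as a plain script)
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, BATCH_INTERVAL)
```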

# 2. Generate Text Classifier for Sentiment Analysis

To build a text classifier for sentiment analysis, I imported the Kaggle dataset **UMICH SI650 - Sentiment Classification** (available at https://inclass.kaggle.com/c/si650winter11; the data is also available in .zip format in the 'data' folder). It contains over 1.5 million tweets, each already labeled as positive or negative, which are used here to train a machine learning model that predicts whether a message has positive or negative sentiment.
The dataset has the following columns:
1. **ItemID**: dataset index (auto-incremented);
2. **Sentiment**: binary label, **0 for negative and 1 for positive**;
3. **SentimentSource**: source of the tweet;
4. **SentimentText**: text of the evaluated tweet.
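As a quick illustration of these four columns, the sketch below parses one line into named fields in plain Python. The sample line and the `parse_line`/`Row` names are hypothetical (not taken from the dataset), and it assumes the text is the last, unquoted column. Splitting on at most three commas keeps commas inside **SentimentText** intact, whereas a plain `split(',')` cuts the text at its first comma.

```python
from typing import NamedTuple


class Row(NamedTuple):
    """One record of the sentiment dataset, using the column order listed above."""
    item_id: int
    sentiment: int          # 0 = negative, 1 = positive
    sentiment_source: str
    sentiment_text: str


def parse_line(line: str) -> Row:
    """Hypothetical helper: split a raw CSV line into the four dataset columns.

    maxsplit=3 means only the first three commas act as separators,
    so commas inside the tweet text (last column) are preserved.
    """
    item_id, sentiment, source, text = line.split(",", 3)
    return Row(int(item_id), int(sentiment), source, text.strip())


sample = "42,1,Sentiment140,   good morning, world"   # made-up example line
print(parse_line(sample).sentiment_text)   # good morning, world
print(sample.split(",")[3].strip())        # good morning  (naive split loses the rest)
```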

In [6]:
```python
# Read the CSV file as an RDD of text lines with Spark
arquivo = sc.textFile("/Users/casoto/Dropbox/DSA/dataset_analise_sentimento.csv")
```

In [7]:
```python
# Remove header row
header = arquivo.take(1)[0]
dataset = arquivo.filter(lambda line: line != header)
type(dataset)
```

In [9]:
```python
# Split a CSV line into columns, remove punctuation and return a (tokens, sentiment) tuple
def get_row(line):
  # maxsplit=3: commas inside the tweet text (4th column) must not split it
  row = line.split(',', 3)
  sentimento = row[1]
  tweet = row[3].strip()
  translator = str.maketrans({key: None for key in string.punctuation})
  tweet = tweet.translate(translator)
  tweet = tweet.split(' ')
  tweet_lower = []
  for word in tweet:
    tweet_lower.append(word.lower())
  return (tweet_lower, sentimento)
```

In [10]:
```python
# Apply function (lazy evaluation)
dataset_train = dataset.map(lambda line: get_row(line))
```

In [11]:
```python
# Create a SentimentAnalyzer object
sentiment_analyzer = SentimentAnalyzer()
```

In [12]:
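```python
# Download the NLTK stopwords corpus (index of available packages:
# https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml).
# Calling nltk.download() with no arguments opens the interactive downloader and blocks, so it is omitted.
nltk.download("stopwords")
```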

```
[nltk_data] Downloading package stopwords to /Users/dmpm/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
True
```

In [14]:
```python
# Build the English stopword list, including the '_NEG' variants produced by mark_negation
stopwords_all = []
for word in stopwords.words('english'):
  stopwords_all.append(word)
  stopwords_all.append(word + '_NEG')
```

In [15]:
```python
# Collect the first 10k rows to the driver as the training sample
dataset_train_sample = dataset_train.take(10000)
```

In [16]:
```python
# Mark negated words (suffix '_NEG'), collect all words of the sample and remove stopwords
all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_train_sample])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]
```
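`mark_negation` appends a `_NEG` suffix to words that follow a negation, which is why the stopword list also contains `word + '_NEG'` variants. The sketch below is a simplified re-implementation of that idea for illustration, not NLTK's code: the negation-word set is a small assumed subset, and `mark_negation_simple` is a hypothetical name. Because this notebook strips punctuation before this step, the negation scope in practice runs to the end of the tweet.

```python
# Assumed subset of negation words (NLTK's own list is longer)
NEGATION_WORDS = {"not", "no", "never", "dont", "cant", "didnt", "isnt", "wont"}
CLAUSE_PUNCT = {".", ":", ";", "!", "?"}


def mark_negation_simple(tokens):
    """Simplified sketch: suffix '_NEG' to tokens that follow a negation word,
    until clause-level punctuation closes the negation scope."""
    marked = []
    in_scope = False
    for tok in tokens:
        if tok in CLAUSE_PUNCT:
            in_scope = False          # punctuation ends the negated clause
            marked.append(tok)
        elif in_scope:
            marked.append(tok + "_NEG")
        else:
            marked.append(tok)
            if tok in NEGATION_WORDS or tok.endswith("n't"):
                in_scope = True       # words after this one are negated


    return marked


print(mark_negation_simple(["i", "dont", "like", "mondays", ".", "fun"]))
# ['i', 'dont', 'like_NEG', 'mondays_NEG', '.', 'fun']
```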

In [17]:
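```python
# Use the 200 most frequent unigrams (n-grams of size 1) as features
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)
# Register a feature extractor that marks which of those unigrams each document contains
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
# Convert the training sample into (featureset, label) pairs (evaluated lazily)
training_set = sentiment_analyzer.apply_features(dataset_train_sample)
```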
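The NLTK calls above select the most frequent words and turn each document into `contains(word)` booleans, the format visible in the printed training set. A rough plain-Python equivalent, with hypothetical helper names and simplified behavior (not NLTK's implementation), looks like this:

```python
from collections import Counter


def top_unigrams(words, top_n):
    """Return the top_n most frequent words (ties keep first-seen order)."""
    return [word for word, _ in Counter(words).most_common(top_n)]


def unigram_features(document, unigrams):
    """Map each selected unigram to a boolean 'contains(word)' feature."""
    present = set(document)
    return {f"contains({word})": word in present for word in unigrams}


all_words = ["love", "day", "sad", "love", "day", "love"]
vocab = top_unigrams(all_words, top_n=2)
print(vocab)                                    # ['love', 'day']
print(unigram_features(["sad", "day"], vocab))  # {'contains(love)': False, 'contains(day)': True}
```

Every document gets the same fixed set of keys, so the classifier always sees a vector of 200 booleans regardless of tweet length.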

In [18]:
```python
type(training_set)
```

```
nltk.collections.LazyMap
```

In [19]:
```python
# Inspect the unigram featuresets used for classification
print(training_set)
```

```
[({'contains()': False, 'contains(im)': False, 'contains(_NEG)': False, 'contains(followfriday)': False, 'contains(amp)': False, 'contains(dont)': False, 'contains(day)': False, 'contains(love)': False, 'contains(like)': False, 'contains(cant)': False, 'contains(good)': False, 'contains(get)': False, 'contains(go)': False, 'contains(today)': False, 'contains(got)': False, 'contains(want)': False, 'contains(time)': False, 'contains(going)': False, 'contains(back)': False, 'contains(one)': False, 'contains(sad)': True, 'contains(really)': False, 'contains(miss)': False, 'contains(u)': False, 'contains(work)': False, 'contains(new)': False, 'contains(2)': False, 'contains(last)': False, 'contains(still)': False, 'contains(twitter)': False, 'contains(night)': False, 'contains(great)': False, 'contains(lol)': False, 'contains(follow)': False, 'contains(need)': False, 'contains(see)': False, 'contains(much)': False, 'contains(myweakness)': False, 'contains(get_NEG)': False, 'contains(didnt)'
```

In [20]:
```python
# Train model with Naive Bayes Classifier
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)
```

```
Training classifier
```
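As a rough illustration of what `NaiveBayesClassifier.train` learns from these `contains(...)` featuresets, here is a minimal Bernoulli Naive Bayes in plain Python: a label prior times one smoothed probability per feature, compared in log space. It uses add-one (Laplace) smoothing, whereas NLTK defaults to an expected-likelihood estimate (`ELEProbDist`), so this is an approximation of the idea rather than NLTK's implementation; `train_nb`, `classify_nb` and the toy data are hypothetical.

```python
import math
from collections import Counter, defaultdict


def train_nb(training_set):
    """Count labels and, per label, how often each boolean feature is True."""
    label_counts = Counter(label for _, label in training_set)
    true_counts = defaultdict(Counter)  # true_counts[label][feature] -> documents with feature True
    for features, label in training_set:
        for name, value in features.items():
            if value:
                true_counts[label][name] += 1
    return label_counts, true_counts


def classify_nb(model, features):
    """Return the label with the highest log-posterior for a featureset."""
    label_counts, true_counts = model
    total = sum(label_counts.values())
    best_label, best_score = None, -math.inf
    for label, n in label_counts.items():
        score = math.log(n / total)  # log prior P(label)
        for name, value in features.items():
            p_true = (true_counts[label][name] + 1) / (n + 2)  # smoothed P(feature=True | label)
            score += math.log(p_true if value else 1 - p_true)
        if score > best_score:
            best_label, best_score = label, score
    return best_label


toy_train = [
    ({"contains(good)": True, "contains(sad)": False}, "1"),
    ({"contains(good)": True, "contains(sad)": False}, "1"),
    ({"contains(good)": False, "contains(sad)": True}, "0"),
    ({"contains(good)": False, "contains(sad)": True}, "0"),
]
model = train_nb(toy_train)
print(classify_nb(model, {"contains(good)": False, "contains(sad)": True}))  # 0
```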


In [21]:
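```python
# Test the classifier on sample sentences (labels are left empty)
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)
# Each element is a (featureset, label) pair; classify the featureset ('0' = negative, '1' = positive)
print(classifier.classify(test_set[0][0]))
print(classifier.classify(test_set2[0][0]))
print(classifier.classify(test_set3[0][0]))
```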

# 3. Twitter API authentication and real-time tweet analysis

In [22]:
```python
# Twitter API credentials (masked); paste your own keys here
consumer_key = "XXXX"
consumer_secret = "XXXX"
access_token = "XXXX"
access_token_secret = "XXXX"
```
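Rather than pasting keys into the notebook, the credentials can be read from environment variables, which keeps them out of the saved notebook (the reason they are masked as 'XXXX' here). This is a minimal sketch; the variable names are assumptions, not names Twitter defines, and `load_twitter_credentials` is a hypothetical helper.

```python
import os

# Hypothetical environment variable names; export whichever names you prefer
CREDENTIAL_VARS = ("TWITTER_CONSUMER_KEY", "TWITTER_CONSUMER_SECRET",
                   "TWITTER_ACCESS_TOKEN", "TWITTER_ACCESS_TOKEN_SECRET")


def load_twitter_credentials(env=os.environ):
    """Return the four OAuth 1.0a credentials; raise KeyError listing any missing variables."""
    missing = [name for name in CREDENTIAL_VARS if name not in env]
    if missing:
        raise KeyError(f"missing environment variable(s): {', '.join(missing)}")
    return tuple(env[name] for name in CREDENTIAL_VARS)


# Usage, after exporting the variables in your shell:
# consumer_key, consumer_secret, access_token, access_token_secret = load_twitter_credentials()
```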

In [23]:
```python
# Keyword to track: the filter endpoint streams public tweets that contain this term (e.g. 'Trump')
search_term = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term
```
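Concatenating `search_term` into the URL works for a plain word such as 'Trump', but characters such as `#`, spaces or commas need percent-encoding (an unencoded `#` starts a URL fragment, so the keyword would never reach the server). A sketch using the standard library's `urlencode`; `build_filter_url` is a hypothetical helper, and joining several keywords with commas assumes the `track` parameter's comma-separated keyword list.

```python
from urllib.parse import urlencode

STREAM_FILTER_URL = "https://stream.twitter.com/1.1/statuses/filter.json"


def build_filter_url(*keywords):
    """Build a filter URL whose 'track' parameter lists comma-separated keywords.

    urlencode percent-escapes characters such as '#', ',' and spaces
    that would otherwise break or truncate the query string.
    """
    return STREAM_FILTER_URL + "?" + urlencode({"track": ",".join(keywords)})


print(build_filter_url("Trump"))
# https://stream.twitter.com/1.1/statuses/filter.json?track=Trump
print(build_filter_url("#python", "data science"))
# https://stream.twitter.com/1.1/statuses/filter.json?track=%23python%2Cdata+science
```

Alternatively, passing `params={'track': search_term}` to `requests.get` lets requests handle the encoding.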

In [24]:
```python
# Create the OAuth 1.0a auth object for the Twitter developer account
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)
```

In [25]:
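```python
# Set up the input DStream: an empty queue with a one-element default RDD,
# so every batch interval yields an RDD that triggers the Twitter fetch below
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)
```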

In [26]:
```python
type(stream)
```

```
pyspark.streaming.dstream.DStream
```

In [27]:
```python
# Limit each batch to 500 tweets (constraint)
NUM_TWEETS = 500
```

In [28]:
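```python
# For every batch, replace the placeholder RDD with tweets fetched from the Twitter stream
def tfunc(t, rdd):
  return rdd.flatMap(lambda x: stream_twitter_data())

# Open a streaming connection and yield each tweet's text as the string form of a one-element list
def stream_twitter_data():
  response = requests.get(filter_url, auth = auth, stream = True)
  print(filter_url, response)
  count = 0
  for line in response.iter_lines():
    try:
      # '>' lets NUM_TWEETS + 1 tweets through, which is why the batch counts below sum to 501
      if count > NUM_TWEETS:
        break
      post = json.loads(line.decode('utf-8'))
      contents = [post['text']]
      count += 1
      yield str(contents)
    except (ValueError, KeyError):
      # Skip keep-alive newlines (not valid JSON) and messages without a 'text' field
      continue
  response.close()
```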
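The parsing logic above can be tested without a network connection by separating it from the HTTP request. The sketch below (a hypothetical `iter_tweet_texts`) consumes any iterable of raw lines, such as `response.iter_lines()`, skips keep-alive lines and non-tweet messages, and stops after exactly `limit` tweets. Checking the limit right after each `yield` also avoids waiting on the stream for one extra line.

```python
import json


def iter_tweet_texts(lines, limit):
    """Yield the 'text' field of at most `limit` tweets from raw JSON lines (bytes).

    Blank keep-alive lines and messages without a 'text' field (e.g. deletion
    notices) are skipped and do not count towards the limit.
    """
    if limit <= 0:
        return
    count = 0
    for line in lines:
        try:
            post = json.loads(line.decode("utf-8"))
            text = post["text"]
        except (ValueError, KeyError, TypeError):
            continue  # not valid JSON, or not a tweet object
        count += 1
        yield text
        if count >= limit:
            return  # stop without reading another line from the stream


fake_lines = [
    b'{"text": "first tweet"}',
    b"",                                     # keep-alive newline
    b'{"delete": {"status": {"id": 1}}}',    # no 'text' field
    b'{"text": "second tweet"}',
    b'{"text": "third tweet"}',
]
print(list(iter_tweet_texts(fake_lines, limit=2)))  # ['first tweet', 'second tweet']
```

Because this version yields plain strings rather than `str([text])`, a pipeline built on it would not need the `ast.literal_eval` round trip.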

In [29]:
```python
stream = stream.transform(tfunc)
```

In [30]:
```python
# Convert each string back into a Python list holding the tweet text
coord_stream = stream.map(lambda line: ast.literal_eval(line))
```

In [31]:
```python
# Apply the trained classifier to a tokenized tweet; returns (tokens, predicted label)
def classifica_tweet(tweet):
  sentence = [(tweet, '')]
  test_set = sentiment_analyzer.apply_features(sentence)
  label = classifier.classify(test_set[0][0])
  print(tweet, label)
  return (tweet, label)
```

In [32]:
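```python
# Clean a tweet (strip punctuation, lowercase, split into words) and classify it.
# The argument is the one-element list produced by ast.literal_eval, so only its first item is used;
# returns (tokens, predicted label)
def get_tweet_text(rdd):
  for line in rdd:
    tweet = line.strip()
    translator = str.maketrans({key: None for key in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
      tweet_lower.append(word.lower())
    return(classifica_tweet(tweet_lower))
```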

In [33]:
```python
# Output result list to store collected data
results = []
```

In [34]:
```python
# Count the predicted labels in each micro-batch and store them with a timestamp
def output_rdd(rdd):
  global results
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1], 1))
  counts = pairs.reduceByKey(add)
  output = []
  for count in counts.collect():
    output.append(count)
  result = [time.strftime("%I:%M:%S"), output]
  results.append(result)
  print(result)
```
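`reduceByKey(add)` sums the `1`s emitted for each predicted label within a micro-batch. The local stand-in below (a hypothetical `reduce_by_key`, plain Python, no Spark) shows the same fold on a made-up batch of labels. Spark makes no ordering guarantee for the keys returned by `collect()`, which is likely why `'0'` and `'1'` swap places between batches in the printed output.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Single-process stand-in for RDD.reduceByKey: fold the values of each key with func."""
    grouped = {}
    for key, value in pairs:
        grouped[key] = func(grouped[key], value) if key in grouped else value
    return list(grouped.items())


predicted = ["0", "1", "0", "0", "1"]         # made-up labels for one micro-batch
pairs = [(label, 1) for label in predicted]   # same shape as (get_tweet_text(x)[1], 1)
print(reduce_by_key(pairs, add))              # [('0', 3), ('1', 2)]
```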

In [35]:
```python
# foreachRDD() applies a function to each RDD of the DStream
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))
```

In [36]:
```python
# Start streaming; awaitTermination() would block the notebook, so it is left commented out
ssc.start()
# ssc.awaitTermination()
```

```
['10:40:22', []]
['10:40:54', [('0', 372), ('1', 129)]]
```


In [37]:
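```python
# Wait until more than 5 batches have been collected (the stream keeps running in the background)
while len(results) <= 5:
  time.sleep(1)  # poll once per second instead of busy-waiting
```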

```
['10:41:24', [('0', 389), ('1', 112)]]
['10:41:55', [('0', 371), ('1', 130)]]
['10:42:26', [('1', 131), ('0', 370)]]
['10:42:57', [('1', 138), ('0', 363)]]
['10:43:29', [('1', 127), ('0', 374)]]
['10:43:59', [('0', 375), ('1', 126)]]
['10:44:30', [('1', 122), ('0', 379)]]
['10:45:01', [('1', 131), ('0', 370)]]
['10:45:33', [('0', 368), ('1', 133)]]
['10:46:04', [('1', 124), ('0', 377)]]
['10:46:35', [('1', 120), ('0', 381)]]
['10:47:06', [('0', 387), ('1', 114)]]
```


In [38]:
```python
# Save the collected results to a timestamped folder
rdd_save = '/Users/casoto/Dropbox/r'+time.strftime("%I%M%S")
res_rdd = sc.parallelize(results)
res_rdd.saveAsTextFile(rdd_save)
```

```
['10:47:36', [('0', 386), ('1', 115)]]
```


In [39]:
```python
# Check the label counts (reduced by key) collected for each timestamp
res_rdd.collect()
```

```
[['10:40:22', []],
 ['10:40:54', [('0', 372), ('1', 129)]],
 ['10:41:24', [('0', 389), ('1', 112)]],
 ['10:41:55', [('0', 371), ('1', 130)]],
 ['10:42:26', [('1', 131), ('0', 370)]],
 ['10:42:57', [('1', 138), ('0', 363)]],
 ['10:43:29', [('1', 127), ('0', 374)]],
 ['10:43:59', [('0', 375), ('1', 126)]],
 ['10:44:30', [('1', 122), ('0', 379)]],
 ['10:45:01', [('1', 131), ('0', 370)]],
 ['10:45:33', [('0', 368), ('1', 133)]],
 ['10:46:04', [('1', 124), ('0', 377)]],
 ['10:46:35', [('1', 120), ('0', 381)]],
 ['10:47:06', [('0', 387), ('1', 114)]]]

['10:48:10', [('0', 375), ('1', 126)]]
['10:48:41', [('0', 378), ('1', 123)]]
['10:49:11', [('0', 391), ('1', 110)]]
['10:49:43', [('0', 385), ('1', 116)]]
```


In [None]:
```python
# Stop real-time streaming (by default this also stops the SparkContext)
ssc.stop()
```

# Conclusion

With the data gathered above, the Naive Bayes text classifier labels roughly 360 to 390 tweets in each batch of about 500 tweets mentioning "Trump" (about 75%) as negative, and the remaining 110 to 140 (about 25%) as positive.
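The per-batch figures can be pooled to check the overall proportion. This sketch only copies the counts from the `res_rdd.collect()` output above (no new data) and sums them by label in plain Python, outside the streaming job.

```python
# Per-batch (label, count) pairs copied from the res_rdd.collect() output above,
# skipping the first batch, which was empty
batches = [
    [('0', 372), ('1', 129)], [('0', 389), ('1', 112)], [('0', 371), ('1', 130)],
    [('1', 131), ('0', 370)], [('1', 138), ('0', 363)], [('1', 127), ('0', 374)],
    [('0', 375), ('1', 126)], [('1', 122), ('0', 379)], [('1', 131), ('0', 370)],
    [('0', 368), ('1', 133)], [('1', 124), ('0', 377)], [('1', 120), ('0', 381)],
    [('0', 387), ('1', 114)],
]

# Sum the counts of each label ('0' = negative, '1' = positive) across batches
totals = {'0': 0, '1': 0}
for batch in batches:
    for label, count in batch:
        totals[label] += count

n_tweets = sum(totals.values())
neg_share = totals['0'] / n_tweets
print(totals, n_tweets)               # {'0': 4876, '1': 1637} 6513
print(f"negative: {neg_share:.1%}")   # negative: 74.9%
```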