**Twitter Sentiment Analysis using Spark Streaming**

- Twitter is one of the most dynamic social networks available today. It lets you collect, in real time, information about people's opinions and feelings on a wide range of topics. Users around the world post a very high volume of tweets, and each one can carry useful information.

- Streaming data generated by Twitter can feed analytical applications, allowing companies to understand, in real time, what customers, partners and suppliers are thinking (and writing) about their brand, product or service.

- In this project, we collect streaming data from Twitter, apply real-time analysis techniques (as the data is generated) and gain insights on a given subject. The project can be reproduced locally on your computer or on a cluster in a cloud environment.

- Technologies used:
    - Python/NLTK: building the sentiment analysis model
    - Spark Streaming: collection of streaming data
    - Twitter API: connection from our application

Project by: Marx Cerqueira

# Imports

In [3]:
# libs
from pyspark.streaming       import StreamingContext
from pyspark                 import SparkContext
from requests_oauthlib       import OAuth1Session
from operator                import add
from time                    import gmtime, strftime

import requests_oauthlib
import requests
import time
import string
import ast
import json
import os

# NLTK Package
import nltk

from nltk.classify       import NaiveBayesClassifier
from nltk.sentiment      import SentimentAnalyzer
from nltk.corpus         import subjectivity
from nltk.corpus         import stopwords
from nltk.sentiment.util import mark_negation, extract_unigram_feats

## Helper Variables

In [2]:
# Batch interval, in seconds
INTERVALO_BATCH = 5

In [3]:
# Creating StreamingContext
ssc = StreamingContext(sc, INTERVALO_BATCH)

In [4]:
# Get the Twitter access keys from environment variables
API_Key        = os.environ.get('API_Key')
API_Key_Secret = os.environ.get('API_Key_Secret')
Bearer_Token   = os.environ.get('Bearer_Token')
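
`os.environ.get` returns `None` for a variable that is not set, so a missing key only shows up later as a failed request. A minimal sketch of a fail-fast check, assuming the three variable names used above; the helper `missing_credentials` is hypothetical and not part of the original notebook:

```python
import os

# Names of the environment variables read in the cell above.
REQUIRED_VARS = ("API_Key", "API_Key_Secret", "Bearer_Token")


def missing_credentials(env, names=REQUIRED_VARS):
    """Return the names that are absent or empty in the mapping `env`."""
    return [name for name in names if not env.get(name)]


# Check the real environment before opening any connection.
missing = missing_credentials(os.environ)
if missing:
    print("Missing Twitter credentials:", ", ".join(missing))
else:
    print("All Twitter credentials are set.")
```

Running this check before creating the stream makes a configuration problem visible immediately instead of in the middle of a batch.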

# Loading Data

We use the training dataset provided by the University of Michigan for a Kaggle in-class competition as a basis:

- https://inclass.kaggle.com/c/si650winter11.

This dataset contains 1,578,627 labeled tweets, and each row is marked as:

    0 = negative sentiment
    1 = positive sentiment

In [87]:
# Load the training data into a Spark RDD
file = sc.textFile("/home/marxcerqueira/repos/Spark-Real-Time-Analytics/Apache-Spark/Twitter-Sentimental-Analysis/dataset_analise_sentimento.csv")

# Pre-Processing with Map and Reduce

## Data Cleaning

In [88]:
# removing header
header = file.take(1)[0]
dataset = file.filter(lambda line: line != header)

In [106]:
dataset.count()



1578627

In [90]:
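# Split each row into columns, strip punctuation from the tweet and
# return a (tokens, label) tuple
def get_row(line):
    # Note: a plain split also breaks on commas inside the tweet text,
    # so row[3] holds only the text up to the first comma
    row = line.split(',')
    sentimental = row[1]
    tweet = row[3].strip()
    # Translation table that deletes every punctuation character
    translator = str.maketrans('', '', string.punctuation)
    tweet = tweet.translate(translator)
    # Lower-case each space-separated token
    tweet_lower = [word.lower() for word in tweet.split(' ')]

    return (tweet_lower, sentimental)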
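
`line.split(',')` also splits on commas inside the tweet, so `row[3]` keeps only the text up to the first comma. A hedged sketch of one workaround, assuming the tweet text is the fourth and last column (as the indices in `get_row` suggest); `split_row` and the sample line are made up for illustration:

```python
def split_row(line):
    """Split a CSV line into (label, tweet), keeping commas inside the tweet.

    Assumes four columns with the tweet text last, so at most three
    splits are needed.
    """
    row = line.split(',', 3)
    return row[1], row[3].strip()


# Hypothetical sample line in the assumed column layout.
sample = "7,1,SomeSource,great movie, loved it"

print(sample.split(',')[3])   # naive split keeps only the first fragment
print(split_row(sample))      # limited split keeps the whole tweet
```

If the file quotes fields that contain commas, Python's `csv` module is likely the more robust choice.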

In [91]:
# Apply the function to each row of the dataset
train_dataset = dataset.map(get_row)

In [92]:
# Create the SentimentAnalyzer object used to extract features and train the classifier
sentiment_analyzer = SentimentAnalyzer()

## Removing Stop Words

Stop words are words that appear often but carry little information.

In [93]:
# Get the list of English stop words, plus their negated (_NEG) forms
stopwords_all = []

for word in stopwords.words('english'):
    stopwords_all.append(word)
    stopwords_all.append(word + '_NEG')
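
Negation marking appends `_NEG` to words that follow a negation, so a stop word can show up in two forms and both have to be filtered. A small sketch of that filtering step, using a made-up three-word stop list and hand-written tokens instead of NLTK's English list; a `set` is used here only because membership tests on it are fast:

```python
# Tiny hypothetical stop-word list; the notebook uses NLTK's English list.
base_stopwords = ["the", "is", "this"]

# Keep each stop word in both its plain and its negated form.
stopwords_all = set()
for word in base_stopwords:
    stopwords_all.add(word)
    stopwords_all.add(word + "_NEG")

# Hypothetical tokens as they might look after negation marking.
tokens = ["this", "movie", "is", "not", "the_NEG", "best_NEG"]
kept = [token for token in tokens if token not in stopwords_all]
print(kept)
```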

In [94]:
# Gets 10,000 tweets from the training dataset and returns all non-stopwords
train_dataset_sample = train_dataset.take(10000)

all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in train_dataset_sample])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [98]:
# Select the 200 most frequent unigrams and register them as features
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)

sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)

training_set = sentiment_analyzer.apply_features(train_dataset_sample)

In [99]:
type(training_set)

nltk.collections.LazyMap
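
The unigram step can be read as two operations: pick the most frequent words, then describe each tweet by which of those words it contains. The sketch below approximates that idea in plain Python; it is not NLTK's implementation, and `top_unigrams`, `unigram_features` and the toy corpus are illustrative names and data:

```python
from collections import Counter


def top_unigrams(words, top_n):
    """Return the `top_n` most frequent words, most frequent first."""
    return [word for word, _ in Counter(words).most_common(top_n)]


def unigram_features(document, unigrams):
    """Map each selected unigram to whether it occurs in `document`."""
    tokens = set(document)
    return {f"contains({word})": word in tokens for word in unigrams}


# Hypothetical toy corpus, already tokenized and lower-cased.
words = ["good", "movie", "bad", "movie", "good", "good"]
unigrams = top_unigrams(words, top_n=2)

print(unigrams)
print(unigram_features(["a", "good", "day"], unigrams))
```

Each tweet becomes a dictionary of boolean features, which is the kind of input a Naive Bayes classifier can be trained on.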

# Machine Learning Modeling

## Training ML Model

In [107]:
# Train the sentiment analysis classifier
# 1 positive
# 0 negative

# Model training
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier
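
To make the training step less of a black box, here is a toy Naive Bayes classifier over boolean features. It is a simplified sketch under stated assumptions (add-one smoothing, two possible values per feature, made-up training data); NLTK's `NaiveBayesClassifier` differs in details such as smoothing:

```python
import math
from collections import Counter, defaultdict


def train_naive_bayes(labeled_features):
    """Count labels and (feature, value) pairs per label."""
    label_counts = Counter()
    value_counts = defaultdict(Counter)
    for features, label in labeled_features:
        label_counts[label] += 1
        for name, value in features.items():
            value_counts[label][(name, value)] += 1
    return label_counts, value_counts


def classify(features, label_counts, value_counts):
    """Return the label with the highest log-probability score."""
    total = sum(label_counts.values())
    best_label, best_score = None, -math.inf
    for label, count in label_counts.items():
        score = math.log(count / total)
        for name, value in features.items():
            # Add-one smoothing over the two possible values (True/False).
            seen = value_counts[label][(name, value)]
            score += math.log((seen + 1) / (count + 2))
        if score > best_score:
            best_label, best_score = label, score
    return best_label


# Hypothetical training data: "1" = positive, "0" = negative.
training = [
    ({"contains(good)": True, "contains(bad)": False}, "1"),
    ({"contains(good)": True, "contains(bad)": False}, "1"),
    ({"contains(good)": False, "contains(bad)": True}, "0"),
    ({"contains(good)": False, "contains(bad)": True}, "0"),
]
label_counts, value_counts = train_naive_bayes(training)
print(classify({"contains(good)": True, "contains(bad)": False}, label_counts, value_counts))
```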


In [108]:
# Build feature sets for a few test sentences

test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]

test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

## Test ML Model

### Gathering Tweets from Twitter URL

In [110]:
# Specify the search term and the stream URLs
search_term = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term

In [111]:
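# Create the authentication object for Twitter
# Note: OAuth1 takes (client_key, client_secret, resource_owner_key, resource_owner_secret);
# a bearer token is not an OAuth1 access token, which may explain the 401 responses in the output
auth = requests_oauthlib.OAuth1(API_Key, API_Key_Secret, Bearer_Token)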

In [112]:
# Set up the stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)

In [113]:
# Number of tweets per update
NUM_TWEETS = 500  

In [114]:
# Connect to Twitter and return up to NUM_TWEETS tweets per call
def tfunc(t, rdd):
    return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
    response = requests.get(filter_url, auth = auth, stream = True)
    print(filter_url, response)
    count = 0
    for line in response.iter_lines():
        # Stop once NUM_TWEETS tweets have been yielded
        if count >= NUM_TWEETS:
            break

        try:
            post = json.loads(line.decode('utf-8'))
            contents = [post['text']]
        except (ValueError, KeyError, TypeError):
            # Skip blank lines, malformed JSON and messages without text
            continue

        count += 1
        yield str(contents)
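
The parsing part of the streaming function can be tested without a network connection. A minimal sketch, assuming the raw lines arrive as bytes, as `response.iter_lines()` yields them; `parse_tweet_lines` and the sample lines are hypothetical:

```python
import json


def parse_tweet_lines(lines, limit):
    """Yield the text of up to `limit` tweets from raw streaming lines."""
    count = 0
    for line in lines:
        if count >= limit:
            break
        if not line:
            # Blank line (for example a keep-alive): nothing to parse.
            continue
        try:
            post = json.loads(line.decode("utf-8"))
            text = post["text"]
        except (ValueError, KeyError, TypeError):
            # Malformed JSON or a message without a "text" field.
            continue
        count += 1
        yield text


# Hypothetical raw lines, shaped like the output of response.iter_lines().
raw = [
    b'{"text": "first tweet"}',
    b"",
    b"not json",
    b'{"delete": {"status": {}}}',
    b'{"text": "second tweet"}',
    b'{"text": "third tweet"}',
]
print(list(parse_tweet_lines(raw, limit=2)))
```

Catching only the expected exceptions keeps unrelated errors, such as a typo in a variable name, from being silently swallowed.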

In [115]:
stream = stream.transform(tfunc)

In [116]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [117]:
# Classify a tweet using the features of the model built earlier
def classifica_tweet(tweet):
    sentence = [(tweet, '')]
    test_set = sentiment_analyzer.apply_features(sentence)
    # Classify once and reuse the label
    label = classifier.classify(test_set[0][0])
    print(tweet, label)
    return (tweet, label)

In [118]:
# Clean the tweet text and classify it
def get_tweet_text(rdd):
    # `rdd` is a list holding the text of a single tweet, so the
    # function returns after the first item
    for line in rdd:
        tweet = line.strip()
        translator = str.maketrans('', '', string.punctuation)
        tweet = tweet.translate(translator)
        tweet_lower = [word.lower() for word in tweet.split(' ')]

        return classifica_tweet(tweet_lower)

In [119]:
# Create an empty list for the results
resultados = []

In [120]:
# Save the result of each batch of tweets together with a timestamp
def output_rdd(rdd):
    global resultados
    pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
    counts = pairs.reduceByKey(add)
    output = counts.collect()

    result = [time.strftime("%I:%M:%S"), output]
    resultados.append(result)
    print(result)
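
The `map` plus `reduceByKey(add)` pattern counts how many tweets received each label. The same logic on a plain Python list, as a sketch with made-up labels:

```python
from collections import Counter
from operator import add

# Hypothetical labels predicted for one batch of tweets.
labels = ["1", "0", "1", "1", "0"]

# Map step: one (label, 1) pair per tweet.
pairs = [(label, 1) for label in labels]

# Reduce step: combine the values that share a key with `add`.
counts = {}
for key, value in pairs:
    counts[key] = add(counts.get(key, 0), value)

print(sorted(counts.items()))
```

In Spark the reduce step runs per key across partitions, but the result per batch is the same kind of label-to-count mapping.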

In [121]:
# foreachRDD() applies a function to each RDD in the data stream
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [122]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

                                                                                

['12:48:33', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>
                                                                                

['12:48:36', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:48:40', []]


In [123]:
# Wait until more than 5 batches have been collected,
# sleeping between checks instead of spinning the CPU
while len(resultados) <= 5:
    time.sleep(1)
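
A wait loop like this never ends if no batch ever arrives. A hedged sketch of a variant with a timeout; `wait_for_results` and its parameters are hypothetical names, not part of the notebook:

```python
import time


def wait_for_results(results, minimum, timeout, poll=0.5):
    """Block until `results` has more than `minimum` items or `timeout` seconds pass.

    Returns True if enough results arrived, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while len(results) <= minimum:
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
    return True


print(wait_for_results([1, 2, 3], minimum=2, timeout=1))
print(wait_for_results([], minimum=5, timeout=0.2, poll=0.05))
```

In the notebook this would be called with the `resultados` list, which the streaming callback fills in the background.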

https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:48:46', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:48:51', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:48:57', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:49:00', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:49:05', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:49:10', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:49:15', []]


https://stream.twitter.com/1.1/statuses/filter.json?track=Trump <Response [401]>


['12:49:20', []]


In [1]:
pwd

'/home/marxcerqueira/repos/Spark-Real-Time-Analytics/Apache-Spark/Twitter-Sentimental-Analysis'

In [4]:
# Save the results
rdd_save = '/home/marxcerqueira/repos/Spark-Real-Time-Analytics/Apache-Spark/Twitter-Sentimental-Analysis/r'+time.strftime("%I%M%S")
resultados_rdd = sc.parallelize(resultados)
resultados_rdd.saveAsTextFile(rdd_save)

NameError: name 'resultados' is not defined
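
The output directory name above uses `%I`, the 12-hour clock, so a morning run and an afternoon run at the same clock time would get the same name. A small sketch of building the path with a date and a 24-hour timestamp instead; `results_path` and the `output` directory are illustrative assumptions:

```python
import os
import time


def results_path(base_dir, prefix="r", when=None):
    """Build an output path with a date and 24-hour timestamp suffix."""
    stamp = time.strftime("%Y%m%d-%H%M%S", when or time.localtime())
    return os.path.join(base_dir, prefix + stamp)


# Fixed time tuple so the example is reproducible.
example = time.struct_time((2021, 5, 17, 13, 4, 5, 0, 137, 0))
print(results_path("output", when=example))
```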

In [None]:
# View the results
resultados_rdd.collect()

In [None]:
# Stop streaming
ssc.stop()

In [None]:
API_Key        = os.environ.get('API_Key')
API_Key_Secret = os.environ.get('API_Key_Secret')
Bearer_Token   = os.environ.get('Bearer_Token')

In [None]:
# Twitter authentication
consumer_key = "xxx"
consumer_secret = "xxx"
access_token = "xxx"
access_token_secret = "xxx"
