# Spark Streaming - Twitter

In [None]:
# Pode ser necessário instalar esses pacotes
!pip install requests_oauthlib
!pip install twython
!pip install nltk

In [1]:
# Módulos usados
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import requests
import time
import string
import ast
import json

In [2]:
# Pacote NLTK
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [3]:
# Frequência de update
INTERVALO_BATCH = 5

In [4]:
# Criando o StreamingContext
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Treinando o Classificador de Análise de Sentimento

1 para o sentimento positivo

0 para o sentimento negativo

In [5]:
# Lendo o arquivo texto e criando um RDD em memória com o Spark
arquivo = sc.textFile("data/dataset_analise_sentimento.csv")

In [6]:
# Removendo o cabeçalho
header = arquivo.take(1)[0]
dataset = arquivo.filter(lambda line: line != header)

In [7]:
type(dataset)

pyspark.rdd.PipelinedRDD

In [8]:
dataset.take(5)

['1,0,Sentiment140,                     is so sad for my APL friend.............',
 '2,0,Sentiment140,                   I missed the New Moon trailer...',
 '3,1,Sentiment140,              omg its already 7:30 :O',
 "4,0,Sentiment140,          .. Omgaga. Im sooo  im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...",
 '5,0,Sentiment140,         i think mi bf is cheating on me!!!       T_T']

In [9]:
# Função que separa as colunas em cada linha, cria uma tupla e remove a pontuação
def get_row(line):
    row = line.split(',')
    sentimento = row[1]
    tweet = row[3].strip()
    translator = str.maketrans({kev: None for kev in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []    
    for word in tweet:
        tweet_lower.append(word.lower())
    return (tweet_lower, sentimento)

In [10]:
# Aplicando a função a cada linha do dataset
dataset_treino = dataset.map(lambda line: get_row(line))

In [11]:
# Cria um objeto SentimentAnalyzer
sentiment_analyser = SentimentAnalyzer()

In [12]:
# Certifique-se de ter espaço em disco - Aproximadamente 5GB
# https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml
nltk.download()
nltk.download("stopwords")

showing info https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml


[nltk_data] Downloading package stopwords to /home/aline/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [14]:
# Obtém a lista de stopwords em Inglês
stopwords_all = []
for word in stopwords.words('english'):
    stopwords_all.append(word)
    stopwords_all.append(word + '_NEG')

In [15]:
# Obtém 10.000 tweets do dataset de treino e retorna todas as palavras que não são stopwords
dataset_treino_amostra = dataset_treino.take(10000)

In [16]:
all_words_neg = sentiment_analyser.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [17]:
# Cria um unigram e extrai as features
unigram_feats = sentiment_analyser.unigram_word_feats(all_words_neg_nostops, top_n = 200)
sentiment_analyser.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyser.apply_features(dataset_treino_amostra)

In [18]:
type(training_set)

nltk.collections.LazyMap

In [19]:
# Treinar o modelo
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyser.train(trainer, training_set)

Training classifier


In [20]:
# Testa o classificador em algumas sentenças
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
test_set = sentiment_analyser.apply_features(test_sentence1)
test_set2 = sentiment_analyser.apply_features(test_sentence2)
test_set3 = sentiment_analyser.apply_features(test_sentence3)

In [21]:
# Autenticação do Twitter - Passar os dados corretamente
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""

In [22]:
# Especifica a URL termo de busca
search_term = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term

In [23]:
# Criando o objeto de autenticação para o twitter
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [24]:
# Configurando o Stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default=rdd)

In [25]:
type(stream)

pyspark.streaming.dstream.DStream

In [26]:
# Total de tweets por update
NUM_TWEETS = 500

In [27]:
# Função que conecta ao Twitter e retorna um número específico de Tweets (NUM_TWEETS)
def tfunc (t, rdd):
    return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
    response = requests.get(filter_url, auth = auth, stream = True)
    print(filter_url, response)
    count = 0
    for line in response.iter_lines():
        try:
            if count > NUM_TWEETS:
                break
            post = json.loads(line.decode('utf-8'))
            contents = [post['text']]
            count += 1
            yield str(contents)
        except:
            result = False

In [28]:
stream = stream.transform(tfunc)

In [29]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [30]:
# Função para classificar os tweets, aplicando as features do modelo criado anteriormente
def classifica_tweet(tweet):
    sentence = [(tweet, '')]
    test_set = sentiment_analyser.apply_features(sentence)
    print(tweet, classifier.classify(test_set[0][0]))
    return (tweet, classifier.classify(test_set[0][0]))

In [31]:
# Função para retornar o texto do twitter
def get_tweet_text(rdd):
    for line in rdd:
        tweet = line.strip()
        translator = str.maketrans({key: None for key in string.punctuation})
        tweet = tweet.translate(translator)
        tweet = tweet.split(' ')
        tweet_lower = []
        for word in tweet:
            tweet_lower.append(word.lower())
        return(classifica_tweet(tweet_lower))

In [32]:
# Cria uma lista vazia para os resultados
resultados = []

In [33]:
# Função para salvar o resultado dos batchs de tweets junto com o timestamp
def output_rdd(rdd):
    global resultados
    pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
    counts = pairs.reduceByKey(add)
    output = []
    for count in counts.collect():
        output.append(count)
    result = [time.strftime("%I:%M:%S"), output]
    resultados.append(result)
    print(result)

In [34]:
# A função foreachRDD, aplica uma função a cada RDD to streaming de dados
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [35]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

In [36]:
cont = True
while cont:
    if len(resultados) > 5:
        cont = False

['11:44:27', []]
['11:44:49', [('0', 405), ('1', 96)]]
['11:45:05', [('0', 386), ('1', 115)]]
['11:45:21', [('1', 104), ('0', 397)]]
['11:45:36', [('0', 411), ('1', 90)]]
['11:45:50', [('0', 385), ('1', 116)]]


In [37]:
# Grava os resultados
rdd_save = 'result/r'+time.strftime("%I%M%S")
resultados_rdd = sc.parallelize(resultados)
resultados_rdd.saveAsTextFile(rdd_save)

In [38]:
# Visualiza os dados
resultados_rdd.collect()

[['11:44:27', []],
 ['11:44:49', [('0', 405), ('1', 96)]],
 ['11:45:05', [('0', 386), ('1', 115)]],
 ['11:45:21', [('1', 104), ('0', 397)]],
 ['11:45:36', [('0', 411), ('1', 90)]],
 ['11:45:50', [('0', 385), ('1', 116)]]]

In [39]:
# Finaliza o streaming
ssc.stop()

['11:46:05', [('1', 112), ('0', 389)]]
