<a href="https://colab.research.google.com/github/ralsouza/apache_spark_real_time_analytics/blob/master/notebooks/spark_streaming_twitter/01_spark_streaming_twitter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Setup

In [None]:
# Refresh the apt package index before the Java install in the next cell
!apt-get update

In [2]:
# Install the dependencies: a headless Java 8 JDK, the Spark 2.4.4 / Hadoop 2.7
# binary distribution (downloaded and unpacked into the working directory),
# and findspark, which is used below to make the unpacked PySpark importable
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [3]:
# Environment variables: where Java and the unpacked Spark distribution live
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# Make pyspark importable. findspark.init() writes the path it is given back
# into SPARK_HOME, so pass the absolute path set above instead of a relative
# one that only resolves while the working directory is /content.
import findspark
findspark.init(os.environ["SPARK_HOME"])

In [4]:
# Libraries and Context Setup
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Spark configuration: serve the Spark web UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# Create the SparkContext, or reuse the one already running in this kernel.
# Calling the SparkContext constructor a second time raises an error, so the
# previous form of this cell could not be re-run without restarting.
sc = SparkContext.getOrCreate(conf=conf)

# Get the SparkSession. A SparkContext already exists at this point, so
# getOrCreate() attaches to it rather than starting a new one.
spark = SparkSession.builder.master('local').appName('My-SparkSQL').getOrCreate()

# Create the SQL Context on top of the same SparkContext
sqlContext = pyspark.SQLContext(sc)

In [None]:
# sc.stop()

In [5]:
# Check context: print the SparkContext summary (master and application name)
print(sc)

<SparkContext master=local[*] appName=pyspark-shell>


## Other packages for streaming - Twitter

In [None]:
# Extra packages for the Twitter part: requests_oauthlib (builds the OAuth1
# object used for the streaming request) and nltk (sentiment model).
# NOTE(review): twython is installed but not imported by any cell shown here.
!pip install requests_oauthlib
!pip install twython
!pip install nltk

## Import Modules

In [7]:
from pyspark.streaming import StreamingContext
import requests_oauthlib
from operator import add
from time import gmtime, strftime
import requests
import time
import string 
import ast

## Import NLTK modules

In [8]:
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [9]:
# Update frequency: passed to the StreamingContext as batchDuration
# (PySpark interprets batchDuration in seconds)
BATCH_INTERVAL = 5

In [10]:
# Build the StreamingContext on top of the existing SparkContext, using
# BATCH_INTERVAL as the batch duration
ssc = StreamingContext(sc,batchDuration=BATCH_INTERVAL)

An essential part of building a sentiment analysis algorithm, as with any data mining algorithm, is having a comprehensive dataset or "corpus" to learn from, as well as a dataset to test it against and ensure it meets the requirements.

This lets you tune the algorithm to infer better (more accurate) natural-language features that can be extracted from the text and that contribute to the sentiment classification, instead of relying on a generic approach.

As a starting point we will use a training dataset provided by the University of Michigan for a Kaggle competition - https://inclass.kaggle.com/c/si650winter11.

This dataset contains 1,578,627 classified tweets, and each row is labeled as:
* 1 for positive sentiment
* 0 for negative sentiment

In [11]:
# Load the labelled tweets CSV as an RDD of text lines.
# NOTE(review): the path lives under /content/drive, but no cell shown in this
# notebook mounts Google Drive - confirm it is mounted before running.
rdd_sent = sc.textFile('/content/drive/My Drive/Colab Notebooks/08-apache-spark/data/dataset_analise_sentimento.csv')

In [12]:
# Peek at the first five raw lines (the first one is the CSV header)
rdd_sent.take(5)

['ItemID,Sentiment,SentimentSource,SentimentText',
 '1,0,Sentiment140,                     is so sad for my APL friend.............',
 '2,0,Sentiment140,                   I missed the New Moon trailer...',
 '3,1,Sentiment140,              omg its already 7:30 :O',
 "4,0,Sentiment140,          .. Omgaga. Im sooo  im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)..."]

In [13]:
# Drop the CSV header: read the first line of the file, then keep only the
# lines that are not identical to it
header = rdd_sent.take(1)[0]
dataset = rdd_sent.filter(lambda line: line != header)

In [14]:
# Show the header line that is being filtered out
header

'ItemID,Sentiment,SentimentSource,SentimentText'

In [15]:
# First five data rows, to confirm the header is gone
dataset.take(5)

['1,0,Sentiment140,                     is so sad for my APL friend.............',
 '2,0,Sentiment140,                   I missed the New Moon trailer...',
 '3,1,Sentiment140,              omg its already 7:30 :O',
 "4,0,Sentiment140,          .. Omgaga. Im sooo  im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...",
 '5,0,Sentiment140,         i think mi bf is cheating on me!!!       T_T']

In [17]:
# Check what kind of object the filter produced
type(dataset)

pyspark.rdd.PipelinedRDD

In [18]:
# This function splits the columns of one CSV row and returns a
# (tokens, sentiment) tuple, with punctuation removed from the tweet text
def get_row(row):
  """Parse one CSV line of the training file into a (tokens, sentiment) tuple.

  The line layout is ItemID,Sentiment,SentimentSource,SentimentText. Only the
  first three commas are treated as field separators, so commas that belong
  to the tweet text no longer cut the tweet short.

  Args:
    row: one line of the CSV file (header excluded).

  Returns:
    (tokens, sentiment). tokens is the tweet text stripped of surrounding
    whitespace and of ASCII punctuation, split on single spaces and
    lower-cased; consecutive spaces therefore produce empty-string tokens.
    sentiment is the second field, unchanged (a string).

  Raises:
    IndexError: if the line has fewer than four comma-separated fields.
  """
  # maxsplit=3 keeps everything after the third comma together as the tweet
  fields = row.split(",", 3)
  sentiment = fields[1]
  tweet = fields[3].strip()
  # Translation table that deletes every ASCII punctuation character
  translator = str.maketrans('', '', string.punctuation)
  tweet = tweet.translate(translator)
  tweet_lower = [word.lower() for word in tweet.split(' ')]
  return (tweet_lower, sentiment)

In [20]:
# Parse every row of the dataset into a (tokens, sentiment) tuple
ds_train = dataset.map(get_row)

In [21]:
# Create the NLTK SentimentAnalyzer that the feature extraction, training and
# classification cells below all share
sentiment_analyzer = SentimentAnalyzer()

In [22]:
# Download only the NLTK 'stopwords' corpus.
# A full download is nltk.download() with no arguments; the "approximately 5GB
# on disk" figure presumably refers to that full download - confirm.
# Package index: https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [23]:
# Show the screenshot stored on Drive. `filename=` makes IPython read the file
# and embed its bytes in the output; `url=` only emits an <img> link, which
# the browser cannot resolve for a path on the notebook machine.
from IPython.display import Image
Image(filename='/content/drive/My Drive/Colab Notebooks/08-apache-spark/images/ntlkdata.png')

In [24]:
# English stopwords, each immediately followed by its negation-marked
# "_NEG" variant
stopwords_all = [
  token
  for word in stopwords.words('english')
  for token in (word, word+'_NEG')
]

In [25]:
# Take the first 10,000 parsed tweets of the training dataset as a local list
# (stopwords are filtered out in a later cell, not here)
ds_train_sample = ds_train.take(10000)

In [26]:
# Mark negated words with the _NEG suffix and collect every word of the sample
all_neg_words = sentiment_analyzer.all_words([mark_negation(tweet) for tweet in ds_train_sample])

# Keep only the non-stopwords. Membership is tested against a set: same result
# as checking the stopwords_all list, without scanning that list for each word.
stopwords_lookup = set(stopwords_all)
all_words_neg_nonstops = [x for x in all_neg_words if x not in stopwords_lookup]

In [27]:
# Build the unigram features from the negation-marked, non-stopword tokens,
# limited to top_n=20 of them
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nonstops,top_n=20)
# Register the extractor that reports, per tweet, whether each of those
# unigrams is present ('contains(<word>)': True/False)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
# Turn the sampled tweets into (features, label) pairs for training
train_set = sentiment_analyzer.apply_features(ds_train_sample)

In [28]:
# Check the data type of the training set returned by apply_features
type(train_set)

nltk.collections.LazyMap

In [29]:
# Print the training set: a sequence of (feature dict, label) pairs
print(train_set)

[({'contains()': False, 'contains(im)': False, 'contains(_NEG)': False, 'contains(followfriday)': False, 'contains(amp)': False, 'contains(dont)': False, 'contains(day)': False, 'contains(love)': False, 'contains(like)': False, 'contains(cant)': False, 'contains(good)': False, 'contains(get)': False, 'contains(go)': False, 'contains(today)': False, 'contains(got)': False, 'contains(want)': False, 'contains(time)': False, 'contains(going)': False, 'contains(back)': False, 'contains(one)': False}, '0'), ({'contains()': False, 'contains(im)': False, 'contains(_NEG)': False, 'contains(followfriday)': False, 'contains(amp)': False, 'contains(dont)': False, 'contains(day)': False, 'contains(love)': False, 'contains(like)': False, 'contains(cant)': False, 'contains(good)': False, 'contains(get)': False, 'contains(go)': False, 'contains(today)': False, 'contains(got)': False, 'contains(want)': False, 'contains(time)': False, 'contains(going)': False, 'contains(back)': False, 'contains(one)': F

In [30]:
# Training model: hand NLTK's Naive Bayes training function to the analyzer,
# which fits it on train_set and returns the trained classifier
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer,train_set)

Training classifier


In [31]:
# Testing the classifier on three hand-written, already tokenised sentences
# (the empty string stands in for the unknown label)
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]

# Extract the same features that were used for training
test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

# Actually run the classifier: each test set holds one (features, label) pair,
# so [0][0] is the feature dict. The cell displays the three predicted labels.
[classifier.classify(feats[0][0]) for feats in (test_set, test_set2, test_set3)]

# Get Twitter's Data

In [33]:
# Twitter Authentication.
# Credentials are read from environment variables so real secrets never have
# to be typed into (and saved with) the notebook. When a variable is not set,
# the original 'xxx' placeholder is used.
consumer_key = os.environ.get('TWITTER_CONSUMER_KEY', 'xxx')
consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET', 'xxx')
access_token = os.environ.get('TWITTER_ACCESS_TOKEN', 'xxx')
access_token_secret = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET', 'xxx')

In [34]:
# Specify the search term and build the streaming endpoint URLs
from urllib.parse import quote

search_item = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
# Percent-encode the term so that spaces, '#', '&' and similar characters
# cannot break the query string; plain words such as 'Trump' are unchanged
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track=' + quote(search_item, safe='')

In [35]:
# Create the OAuth1 authentication object for Twitter from the four
# credentials above; it is passed to requests.get() when the stream is opened
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [80]:
# Configuring the stream: a queue stream with no queued RDDs and a one-element
# default RDD. Presumably every batch then falls back to that default RDD, so
# the transform applied later runs once per batch - confirm.
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([],default = rdd)

In [81]:
# Check the type of the stream object
type(stream)

pyspark.streaming.dstream.DStream

In [94]:
# Number of tweets to fetch per update; stream_twitter_data() uses it as the
# cut-off for its read loop
NUM_TWEETS = 50

In [95]:
# Per-batch transform: every element of the incoming RDD is replaced by the
# tweets that stream_twitter_data() fetches from Twitter
def tfunc(t, rdd):
  """Flat-map `rdd` so each of its elements becomes a fresh pull of tweets.

  `t` is accepted but not used; the elements of `rdd` are ignored as well and
  only determine how many times stream_twitter_data() is invoked.
  """
  return rdd.flatMap(lambda _: stream_twitter_data())

def stream_twitter_data():
  """Yield the text of at most NUM_TWEETS tweets from the Twitter filter stream.

  Opens a streaming GET on `filter_url`, authenticated with `auth`, and reads
  the response line by line. Every line that parses as JSON and carries a
  'text' field is yielded as `str([text])`, the string form of a one-element
  list. Lines that cannot be decoded, are not valid JSON (including blank
  lines), or have no 'text' field are skipped.

  The HTTP response is closed when the generator finishes or is closed.

  Yields:
    str: the repr of a one-element list holding the tweet text.
  """
  # Imported explicitly so the function does not depend on a wildcard import
  # elsewhere in the notebook happening to expose `json`
  import json

  response = requests.get(filter_url, auth = auth, stream = True)
  print(filter_url, response)
  count = 0
  try:
    for line in response.iter_lines():
      # >= (not >) so that exactly NUM_TWEETS tweets are produced
      if count >= NUM_TWEETS:
        break
      try:
        post = json.loads(line.decode('utf-8'))
        contents = [post['text']]
      except (ValueError, KeyError, TypeError):
        # ValueError: undecodable bytes or invalid/empty JSON;
        # KeyError/TypeError: a message without a 'text' field.
        # Anything else (e.g. a programming error) is no longer hidden.
        continue
      count += 1
      yield str(contents)
  finally:
    # Release the streaming connection instead of leaving it open
    response.close()

In [101]:
# Route every batch through tfunc so it is filled with freshly fetched tweets.
# Note: `stream` is rebound to its own transform, so re-running this cell
# wraps the stream in one more tfunc layer.
stream = stream.transform(tfunc)

In [109]:
# Parse each stringified list produced by stream_twitter_data() back into a
# Python list
coord_stream = stream.map(ast.literal_eval)

In [110]:
# Check the type of the parsed stream
type(coord_stream)

pyspark.streaming.dstream.TransformedDStream

In [104]:
# This function classifies a tweet, applying the features of the model
# created previously
def classifica_tweet(tweet):
  """Classify one tokenised tweet and return it together with its label.

  Args:
    tweet: the tweet as a list of tokens.

  Returns:
    (tweet, label), where label is what `classifier.classify` returns for the
    features extracted from the tweet.

  The tweet and its label are also printed.
  """
  # apply_features expects (tokens, label) documents; the label is unknown
  sentence = [(tweet, '')]
  # One document in, one (features, label) pair out: [0][0] is the feature dict
  features = sentiment_analyzer.apply_features(sentence)[0][0]
  # Classify once and reuse the label for both the print and the return value
  label = classifier.classify(features)
  print(tweet, label)
  return (tweet, label)

In [105]:
# Normalise a tweet's text into lower-case tokens and classify it
def get_tweet_text(rdd):
  """Tokenise and classify the first text found in `rdd`.

  `rdd` is simply iterated; each item must be a string. The function returns
  from inside the loop, so only the first item is ever processed: it is
  stripped, cleared of ASCII punctuation, split on single spaces, lower-cased
  and handed to classifica_tweet(), whose result is returned. An empty
  iterable yields None.
  """
  strip_punctuation = str.maketrans({key: None for key in string.punctuation})
  for line in rdd:
    words = line.strip().translate(strip_punctuation).split(' ')
    return classifica_tweet([word.lower() for word in words])

In [106]:
# Create an empty list that will hold the per-batch results
results = []

In [107]:
# This function stores the result of each batch together with a timestamp
def output_rdd(rdd):
  """Count the predicted labels of one batch and record them in `results`.

  Every element of `rdd` is classified through get_tweet_text(); the labels
  are counted with reduceByKey and collected. The entry
  [timestamp, [(label, count), ...]] is appended to the notebook-level
  `results` list and printed. The timestamp is formatted as "%I:%M:%S".

  Args:
    rdd: the RDD of one streaming batch.
  """
  # The list is named `results` where it is created and where it is saved;
  # the previous name used here was never defined.
  global results
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
  counts = pairs.reduceByKey(add)
  output = list(counts.collect())
  result = [time.strftime("%I:%M:%S"), output]
  results.append(result)
  print(result)

In [111]:
# foreachRDD() applies a function to each RDD of the data stream; here every
# batch is handed to output_rdd (the first lambda argument is ignored).
# NOTE(review): the Py4JJavaError recorded below is presumably because this
# cell ran (In [111]) after ssc.start() had already run (In [91]) - confirm
# that the cell succeeds when the notebook is executed top to bottom.
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

Py4JJavaError: ignored

In [91]:
# Start streaming: begin executing the stream defined above
ssc.start()

In [None]:
# Wait for the streaming job to terminate.
# NOTE(review): the cells below do not run until this call returns -
# presumably it has to be interrupted by hand; confirm the intended workflow.
ssc.awaitTermination()

In [None]:
# Save results: turn the collected list into an RDD and write it as text under
# a path on Drive whose last component is the current time ("%I%M%S", i.e.
# 12-hour clock hour, minute, second)
rdd_save = '/content/drive/My Drive/Colab Notebooks/08-apache-spark/data/'+time.strftime("%I%M%S")
results_rdd = sc.parallelize(results)
results_rdd.saveAsTextFile(rdd_save)

In [None]:
# Visualizing results: bring the saved entries back to the driver and display them
results_rdd.collect()

In [None]:
# Stop the streaming context.
# NOTE(review): called without arguments this presumably stops the underlying
# SparkContext as well - confirm before reusing `sc` afterwards.
ssc.stop()