# Twitter Stream Processing using Apache Storm

In this tutorial, we will be building a simple data processing pipeline which accepts a stream of tweets and performs text analysis on them. Apache Storm allows us to build highly scalable applications that can process a lot of tweets in a short time. I will start out by listing all the requirements and what we will be using them for:

* **PyKafka (Apache Kafka):** This will be the producer of tweets in our application. It puts the tweets on a queue that Apache Storm reads from.
* **Streamparse (Apache Storm):** This will be the main workhorse of our application. It lets us run Python code against real-time streams of data via Apache Storm, and it will accept the tweets and process them. More on this later.
* **PyMongo (MongoDB):** This will be the backend of our application, which we will be using to store all the data and our processing results from Apache Storm.

And the best part? All of this can be done in Python!!



## Installation

All of the Python packages we need can be easily installed using pip. The requirements for this tutorial, along with links to installation instructions, are given below:
* **pyKafka**
* **[Apache Kafka](https://kafka.apache.org/quickstart)**
* **[Apache Storm](http://storm.apache.org/releases/current/Setting-up-development-environment.html)**
* **[Streamparse](http://streamparse.readthedocs.io/en/stable/quickstart.html)**
* **pyMongo**
* **[MongoDB](https://docs.mongodb.com/manual/installation/)**
* **tweepy**

The code given below is not meant to be run in the Jupyter Notebook. As Apache Storm, Apache Kafka and MongoDB need to have separate sessions running, it cannot be run straightaway. The code given is meant to support my explanation and give you an idea of how you might be able to create an application like this one and expand on it.

## PyKafka

This is probably the most important part of our application, as Kafka connects tweepy and Storm together. The tweepy stream gets the tweets and, with the help of Kafka, puts them in a queue which the Storm spout will consume. This can be easily understood with the help of the diagram given below.
<img src = diagram1.png>
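Kafka depends on a ZooKeeper server, so we start ZooKeeper first. With the scripts that ship with Kafka this is `bin/zookeeper-server-start.sh config/zookeeper.properties`, run from the Kafka directory.

We then start our Kafka broker with `bin/kafka-server-start.sh config/server.properties`.

Now we are ready to make a topic queue that our producer is going to produce tweets on. We're going to name it "test". The topic is registered with the `bin/kafka-topics.sh` script; its exact flags depend on your Kafka version, so copy the command from the quickstart linked below.

If you had trouble understanding the previous few lines, you can read up more about setting up Kafka queues here: https://kafka.apache.org/quickstart.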

Now, to create a producer, we will be using a Pythonic client for Apache Kafka called pykafka. We basically need to start the Kafka client and then register our producer to the particular "topic" queue we want. Then the tweepy library provides us with the tweets in a stream. After putting in our API keys we can start receiving tweets in JSON format and putting them on the topic queue.

In [None]:
import json

import tweepy
from pykafka.client import KafkaClient

# Note: tweepy.StreamListener is only available in tweepy versions before 4.0
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        # Connect to client
        client = KafkaClient(hosts='127.0.0.1:9092')
        # Get the topic we want to produce to
        topic = client.topics[b"test"]
        with topic.get_producer(delivery_reports=False) as producer:
            sentence = status._json
            if sentence is None:
                return
            # Serialize the tweet to UTF-8 encoded JSON bytes
            a = json.dumps(sentence).encode('utf-8')
            # Produce to queue
            producer.produce(a)

# Provide Access tokens
auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

# Initialize API and start streaming
api = tweepy.API(auth)
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)

# Stream tweets for keyword 'trump'
myStream.filter(track=['trump'])

We are now done with producing our tweets from the stream. If you had trouble understanding the previous part you can read up more about pykafka [here](https://github.com/Parsely/pykafka).
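
As a quick offline check of the serialization step, the sketch below shows the round trip a tweet makes: from a dictionary to UTF-8 bytes before it is produced to Kafka, and back to a dictionary on the consuming side. The `sample_tweet` payload and the two helper names are made up for illustration, and no Kafka broker is involved.

```python
import json

def encode_tweet(tweet):
    """Serialize a tweet dictionary to UTF-8 bytes, as the producer does."""
    return json.dumps(tweet).encode('utf-8')

def decode_tweet(payload):
    """Turn the bytes read from the queue back into a dictionary."""
    return json.loads(payload.decode('utf-8'))

# Hypothetical tweet payload, trimmed to a few fields
sample_tweet = {'id': 1, 'text': 'Hello Storm', 'entities': {'hashtags': []}}

payload = encode_tweet(sample_tweet)
restored = decode_tweet(payload)
```

Kafka only ever sees the bytes in `payload`, so anything that is not JSON serializable has to be converted before this step.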

## Streamparse

Streamparse is a very neat wrapper for Apache Storm that helps us write and deploy Storm topologies in Python. Apache Storm is a real-time distributed computation system that helps us get real-time analytics of our data. It has a very intuitive 'spout' and 'bolt' architecture. You can think of the spout as picking up the tweets from the queue and providing a stream of tweets to the worker 'bolts', which further process the data. We will be processing the tweets to find the top hashtags in tweets related to 'trump', as well as doing a sentiment analysis of those tweets. We will have a hashtag counter bolt that stores a count of all the hashtags, and a sentiment analyzer bolt that analyzes the sentiment of each tweet and assigns it probability values for positive, negative and neutral. We can then aggregate these tweets to analyze sentiments by location, etc.

We start by defining a topology which can be easily done via streamparse. A diagram to better understand the topology is given below along with the code for creating it. <img src = "diagram2.png">

Streamparse has a very intuitive way of defining a topology. We can simply define a bolt and then define its inputs with respect to the bolt we're receiving from. If a component has multiple streams we can simply name the different streams and access them as shown below for the hashtag_count_bolt and the sentiment_bolt.

In [None]:
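from streamparse import Topology

# TweetSpout and the bolt classes are defined in the sections below
class WordCount(Topology):
    tweet_spout = TweetSpout.spec()
    tweet_clean_bolt = TweetCleanBolt.spec(inputs=[tweet_spout])
    # Named streams must match the names declared in TweetCleanBolt.outputs
    hashtag_count_bolt = HashTagCounterBolt.spec(inputs=[tweet_clean_bolt['count']])
    sentiment_bolt = SentimentBolt.spec(inputs=[tweet_clean_bolt['sentiment']])
    storage_bolt = StorageBolt.spec(inputs=[sentiment_bolt])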

The basic structure of a spout in streamparse looks like this:-

In [None]:
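from streamparse import Spout

class MySpout(Spout):
    # Declare the field names of the default output stream
    outputs = ['field_name']

    def initialize(self, stormconf, context):
        # Do initialization stuff here
        pass

    def next_tuple(self):
        # Get next message (get_next_tuple is a placeholder for your own source)
        message = get_next_tuple()
        # Emit message
        self.emit([message])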

We start by declaring the spout's output in the `outputs` variable. A plain list of strings names the fields of the default stream; to send to several differently named streams we list `Stream` objects instead, as we do later in the cleaning bolt. Then we do some initialization for the spout object in `initialize`, which is called once per spout. We can initialize our connection to clients as well as open a cursor object for databases here. The `next_tuple` function is called repeatedly by Storm; each time, the spout gets the next message and emits it to the next bolt.

We will now look at the basic structure of bolts in streamparse:

In [None]:
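from streamparse import Bolt

class MyBolt(Bolt):
    # Declare the field names of the default output stream
    outputs = ['field_name']

    def initialize(self, conf, ctx):
        # Do initialization stuff here
        pass

    def process(self, tup):
        # Get value from the incoming tuple
        word = tup.values[0]
        # Process it (transform is a placeholder for your own logic)
        result = transform(word)
        # Emit for further processing
        self.emit([result])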

We can see that the basic structure of a bolt is the same as that of a spout. The only difference is that instead of `next_tuple` a bolt implements `process`, which Storm calls once for each incoming tuple; the bolt works on the tuple and emits the result to the next bolt.
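
To make the division of labour concrete, here is a toy model of the spout and bolt contract in plain Python. It is only a sketch: `ToySpout`, `ToyBolt` and `run` are invented names, not part of streamparse, and in a real topology it is Storm that calls `next_tuple` and `process` and moves tuples between workers.

```python
from collections import deque

class ToySpout:
    """Stands in for a Storm spout: hands out one message per call."""
    def __init__(self, messages):
        self.queue = deque(messages)

    def next_tuple(self):
        # Return None when there is nothing to emit right now
        return self.queue.popleft() if self.queue else None

class ToyBolt:
    """Stands in for a Storm bolt: transforms each tuple it receives."""
    def process(self, value):
        return value.upper()

def run(spout, bolt):
    """Drive the spout until it is empty, passing each tuple to the bolt."""
    results = []
    while True:
        value = spout.next_tuple()
        if value is None:
            break
        results.append(bolt.process(value))
    return results

output = run(ToySpout(['storm', 'kafka']), ToyBolt())
print(output)
```

The point of the split is that the spout only knows where data comes from and the bolt only knows what to do with one tuple, which is what lets Storm run many copies of each in parallel.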

## PyMongo

We will now talk about PyMongo which is a very neat wrapper to MongoDB in Python which helps us access all of MongoDB's functionality in Python. We will be using MongoDB to store hashtag counts as well as the sentiment values of the tweets. A basic tutorial of PyMongo is given below:

We start with initializing the MongoDB client like so:

In [None]:
from pymongo import MongoClient
client = MongoClient()

We then get the database we want by doing the following:

In [None]:
db = client.test_database

After getting our database, we get the collection we want from the database. You can think of a collection like a table in SQL.

In [None]:
collection = db.collection_name

We can then perform various functions like finding values in the collection or inserting/updating values in the collection. Each entry in the collection is defined as a dictionary of key-value pairs. 

In [None]:
val = {
    'col1' : 'val1',
    'col2' : 'val2'
}
#Insert value into collection
collection.insert_one(val)
#Find all values matching col1 == val1
collection.find({'col1': 'val1'})
#Find first value matching col1 == val1 and set col2 = val3 for that
collection.update_one({'col1': 'val1'},{'$set':{'col2' : 'val3'}})

One important thing to note is that MongoDB lazily creates databases and collections. So you will not see the database until you add a collection to it and you will not see a collection until you add a value to it. You should definitely read up more on PyMongo [here](https://api.mongodb.com/python/current/tutorial.html) and MongoDB [here](https://docs.mongodb.com/manual/mongo/) before proceeding. You should also check out [Studio 3T](https://studio3t.com/) which is a GUI environment for MongoDB that lets us look at the entries in our collection and analyze them visually.

## Creating our pipeline

### Putting the tweets in a stream

We start with 'consuming' the tweets from our Kafka queue and sending them in a stream to the first bolt to clean up the data. We define our TweetSpout as a Spout object whose default output stream has a single field, 'tweet'. We initialize the spout by connecting to the Kafka broker and then subscribing to the particular topic queue we want. The `next_tuple` function simply picks up the tweet JSON from the queue and sends it to the cleaning bolt for processing.

In [None]:
from streamparse import Spout
from pykafka import KafkaClient

class TweetSpout(Spout):
    # Field name of the default output stream
    outputs = ['tweet']

    def initialize(self, stormconf, context):
        # Connect to client, subscribe to topic and open a single consumer
        client = KafkaClient(hosts='127.0.0.1:9092')
        self.topic = client.topics[b"test"]
        self.consumer = self.topic.get_simple_consumer()

    def next_tuple(self):
        # Fetch at most one message without blocking the spout
        message = self.consumer.consume(block=False)
        if message is not None:
            # Decode the bytes so the tuple can be serialized as JSON
            self.emit([message.value.decode('utf-8')])


### Cleaning up the tweets

We will now start processing our tweets to get them into a form we want. To count the most popular hashtags related to 'trump' we will need to extract the hashtags from the JSON for each tweet and send them to the hashtag counter bolt. To get the sentiment of the tweet, we need to clean up our tweet so that it is in a form that can be processed by the sentiment analyzer bolt. As this bolt outputs to two different bolts, we need to define the outputs as a list of stream objects. We add the 'name' attribute to help us identify which stream we're sending to. You can read more about outputting to multiple streams [here](http://streamparse.readthedocs.io/en/stable/topologies.html#multiple-streams).  We also check if our language is English, otherwise we disregard the tuple. 

In [None]:
import json

# Assumption: detect() comes from the langdetect package
from langdetect import detect
from streamparse import Bolt, Stream

class TweetCleanBolt(Bolt):
    # Initialize output streams as we are sending to two bolts
    outputs = [Stream(fields=['tweet'], name='sentiment'),
               Stream(fields=['hashtags'], name='count')]

    def initialize(self, conf, ctx):
        pass

    def process(self, tup):
        # Load tweet from json
        tweet = json.loads(tup.values[0])
        # Get only relevant values
        tweet_id = tweet['id']
        sentence = tweet['text']
        hashtags = tweet['entities']['hashtags']
        # Check language and disregard anything that is not English
        if sentence and detect(sentence) == 'en':
            # Send the cleaned tweet to the sentiment bolt
            tweet = {'id': tweet_id, 'text': sentence}
            self.emit([tweet], stream='sentiment')
            # Send the hashtags to the hashtag counter bolt
            self.emit([hashtags], stream='count')
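
The cleaning logic itself can be tried out without Storm. The sketch below is a standalone function, not the bolt: `clean_tweet` and both sample payloads are hypothetical, and as a simplifying assumption it filters on the `lang` field that tweet objects carry rather than calling a language detector.

```python
import json

def clean_tweet(raw):
    """Return (tweet, hashtags) for an English tweet, or None to skip it."""
    data = json.loads(raw)
    text = data.get('text')
    # Assumption: rely on the tweet's own 'lang' field instead of detect()
    if not text or data.get('lang') != 'en':
        return None
    tweet = {'id': data['id'], 'text': text}
    # Keep only the hashtag text, dropping the position indices
    hashtags = [tag['text'] for tag in data.get('entities', {}).get('hashtags', [])]
    return tweet, hashtags

# Hypothetical payloads shaped like the JSON that tweepy delivers
raw_en = json.dumps({'id': 7, 'text': 'Big news #storm', 'lang': 'en',
                     'entities': {'hashtags': [{'text': 'storm', 'indices': [9, 15]}]}})
raw_fr = json.dumps({'id': 8, 'text': 'Bonjour', 'lang': 'fr',
                     'entities': {'hashtags': []}})

cleaned = clean_tweet(raw_en)
skipped = clean_tweet(raw_fr)
```

Returning `None` for a skipped tweet mirrors the bolt simply not emitting anything for tuples it wants to drop.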


### Counting Hashtags

Our hashtag counter bolt is responsible for getting all the hashtags in a tweet from the JSON and searching for them in our MongoDB collection. If a hashtag is already in the collection, its count is increased by 1; otherwise a new entry is created with that hashtag and its count initialized to 1.

In [None]:

from pymongo import MongoClient
from streamparse import Bolt

class HashTagCounterBolt(Bolt):
    # Empty as not outputting to anything
    outputs = []

    def initialize(self, conf, ctx):
        # Connect to MongoDB client and get hashtag collection
        client = MongoClient()
        db = client.test_database
        self.collection = db.hashtag_count_tb

    def process(self, tup):
        hashtags = tup.values[0]
        # For each hashtag in the tweet
        for elem in hashtags:
            text = elem['text']
            # Increase the count if the hashtag exists, otherwise
            # create a new entry with its count set to 1 (upsert)
            self.collection.update_one({'hashtag': text},
                                       {'$inc': {'count': 1}},
                                       upsert=True)
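
The counting rule described above (add 1 if the hashtag is known, start at 1 if it is not) can be checked offline with an in-memory counter. This is a sketch that swaps MongoDB for `collections.Counter`; `count_hashtags` and the sample batches are invented for the example.

```python
from collections import Counter

def count_hashtags(tweets_hashtags):
    """Count hashtags across tweets; each tweet is a list of {'text': ...} dicts."""
    counts = Counter()
    for hashtags in tweets_hashtags:
        for elem in hashtags:
            # Missing keys start at 0, so this covers both the
            # "already seen" and the "first time" case
            counts[elem['text']] += 1
    return counts

# Hypothetical hashtag lists for three tweets (the last has none)
batches = [[{'text': 'storm'}, {'text': 'kafka'}], [{'text': 'storm'}], []]

counts = count_hashtags(batches)
top = counts.most_common(1)
```

Unlike the in-memory counter, the MongoDB collection survives restarts and is shared by every instance of the bolt, which is why the pipeline stores the counts there.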

### Analyzing sentiments

This bolt analyzes the sentiment of each tweet. We will be using the [text-processing.com](http://text-processing.com) API to get the sentiment of the tweet via HTTP POST calls. The API returns a JSON object which has probability values for positive, negative and neutral, along with a label for the tweet. The JSON looks something like this:



In [None]:
"{'probability': {'neg': 0.6853693817009983, 'neutral': 0.9204427720461091, 'pos': 0.3146306182990018}, 'label': 'neutral'}"

Our API request will contain the sentence in a dictionary mapping the key 'text' to our tweet. We will then send the sentiment values to our tweet storage bolt which will store these values in the MongoDB database. 

In [None]:
import requests
from streamparse import Bolt

class SentimentBolt(Bolt):
    outputs = ['tweet']

    def initialize(self, conf, ctx):
        # API endpoint for sentiment analyzer
        self.endpoint = "http://text-processing.com/api/sentiment/"

    def process(self, tup):
        tweet = tup.values[0]
        if tweet is not None:
            sentence = tweet['text']
            # Put tweet in a dictionary to send to API
            data = {'text': sentence}
            # Send to API and get reply back
            r = requests.post(url=self.endpoint, data=data)
            # If successful
            if r.status_code == 200:
                # Load JSON from response
                c = r.json()
                self.logger.info("Sentiment of tweet [{}]".format(c['label']))
                # Send it for storage
                post = {'id': tweet['id'], 'text': sentence, 'vals': c}
                self.emit([post])
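
The step that turns an API reply into the dictionary sent on for storage can be sketched without making any network call. `build_post` is a hypothetical helper, and the response body below is a hand-written stand-in with rounded values in the same shape as the reply shown earlier.

```python
import json

def build_post(tweet, status_code, body):
    """Combine a tweet with the API's sentiment reply; None on failure."""
    if status_code != 200:
        # Skip the tweet rather than storing a record without sentiment
        return None
    sentiment = json.loads(body)
    return {'id': tweet['id'], 'text': tweet['text'], 'vals': sentiment}

# Hand-written stand-in for a successful API response body
body = '{"probability": {"neg": 0.69, "neutral": 0.92, "pos": 0.31}, "label": "neutral"}'
tweet = {'id': 42, 'text': 'sample tweet'}

post = build_post(tweet, 200, body)
failed = build_post(tweet, 503, '')
```

Keeping this logic in a plain function makes it easy to unit test the bolt's behaviour for failed requests.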

### Storing our tweets

This bolt will be the final part of our data processing pipeline and will handle storing all the tweets along with their sentiment values in the database. We simply initialize our connection and get all the values from the tuple we get. We then insert it into the sentiment collection using the insert_one function.

In [None]:
from pymongo import MongoClient
from streamparse import Bolt

class StorageBolt(Bolt):
    # No output stream
    outputs = []

    def initialize(self, conf, ctx):
        #Initialize client and get collection
        client = MongoClient()
        db = client.test_database
        self.collection = db.sentiment_tb

    def process(self, tup):
        tweet = tup.values[0]
        # Get all values from dictionary
        text = tweet['text']
        tweet_id = tweet['id']
        pos = tweet['vals']['probability']['pos']
        neg = tweet['vals']['probability']['neg']
        neu = tweet['vals']['probability']['neutral']
        label  = tweet['vals']['label']
        # Insert value in collection
        self.collection.insert_one({'id':tweet_id, 'tweet':text, 'pos':pos, 'neg' :neg , 'neutral':neu, 'label':label})

## Results

After running our pipeline for around 100 tweets, we can check out the values stored in our database using Studio 3T which is a MongoDB GUI client. After connecting to the client we can see our database in the tab. We open the specific collection we want and we can then look at the results using simple SQL queries. You can learn more about using Studio 3T [here](https://studio3t.com/getting-started/).

Querying our sentiment collection using "select * from sentiment_tb;" returns <img src = "diagram3.png">

We can also query our hashtag collection and get the top hashtags using "select * from hashtag_count_tb order by count desc;" which returns <img src = diagram4.png>

As you can see, I ran the application on a very small scale. Storm is built to go much further: the Apache Storm project cites a benchmark of over a million tuples processed per second per node, so the same topology can be moved to even a modest-sized cluster to handle a far larger stream.

## Looking ahead

Although we covered a very simple example of processing tweets, Apache Storm + Apache Kafka can be used to build extremely large scale applications to get real-time analytics of a stream. Instead of having one kafka producer, we can have multiple producers producing to the same queue to get data at a faster rate. We can also have multiple topics with each kafka producer producing to a particular topic to get simultaneous analysis of data. Apache Storm can also have extremely complex topologies and we can do all kinds of processing on streams from image analysis to creating dynamic wordclouds and much much more. You can check out some use cases of Apache Storm [here](https://www.quora.com/What-are-some-of-the-use-cases-of-Apache-Storm) and [here](https://hortonworks.com/apache/storm/). To actually learn how to run a large scale application for big data processing using Apache Storm on AWS check out [this](https://cloudacademy.com/blog/how-to-deploy-apache-storm-on-aws/) link.