In [1]:
# Import all the requirements
import configparser
import tweepy
from kafka import SimpleProducer, KafkaClient
import json
from datetime import datetime

In [2]:
class TwitterStreamListener(tweepy.StreamListener):
    """Read the live tweet stream and push each tweet to a Kafka topic.

    Every status is serialised as UTF-8 JSON bytes of the form
    ``{"text": ..., "timestamp": ...}`` (timestamp as a naive ISO-8601 UTC
    string with second resolution) and queued on an asynchronous
    ``SimpleProducer``.

    Args:
        api: The authenticated ``tweepy.API`` instance; stored as ``self.api``.
        topic: Kafka topic the tweets are published to (default ``'got'``).
        kafka_hosts: Broker address handed to ``KafkaClient``
            (default ``"localhost:9092"``).
    """

    def __init__(self, api, topic='got', kafka_hosts="localhost:9092"):
        # Initialise tweepy.StreamListener itself. Naming the parent class in
        # super() (super(tweepy.StreamListener, self)) would skip its __init__
        # and start the MRO lookup after it.
        super(TwitterStreamListener, self).__init__(api)
        self.api = api
        self.topic = topic
        client = KafkaClient(kafka_hosts)
        # `async` is a reserved word since Python 3.7 and cannot be used as a
        # keyword argument; kafka-python (>= 1.4.3) names the option `async_send`.
        # Batching: send after 1000 queued messages or after 10 seconds.
        self.producer = SimpleProducer(client, async_send=True,
                                       batch_send_every_n=1000,
                                       batch_send_every_t=10)

    def on_status(self, status):
        """Serialise one tweet and queue it on the Kafka producer.

        Called by tweepy whenever new data arrives from the live stream.

        Returns:
            True to keep the stream running, or False if the producer raised
            while queueing the message (the error is printed first).
        """
        # timestamp_ms is epoch milliseconds; keep whole seconds only.
        timestamp = int(status.timestamp_ms) // 1000
        isoformat_date = datetime.utcfromtimestamp(timestamp).isoformat()
        # NOTE(review): status.text appears truncated for long tweets/retweets
        # (sample output ends in an ellipsis); consider extended_tweet's
        # full_text if the complete text is needed — TODO confirm.
        msg = json.dumps({"text": status.text, "timestamp": isoformat_date}).encode()
        print(msg)
        try:
            self.producer.send_messages(self.topic, msg)
        except Exception as e:
            print('--', e)
            return False
        return True

    def on_error(self, status_code):
        """Log an error reported by the Twitter stream and keep the stream alive."""
        print("Error received from the Twitter stream:", status_code)
        return True  # Don't kill the stream

    def on_timeout(self):
        """Keep the stream alive after a timeout."""
        return True  # Don't kill the stream

In [None]:
# Read the access credentials.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did read, so check it here instead of failing later with a
# confusing KeyError on the first credential lookup.
CREDENTIALS_FILE = 'twitter.txt'
config = configparser.ConfigParser()
if not config.read(CREDENTIALS_FILE):
    raise FileNotFoundError(
        "Twitter credentials file {!r} not found or unreadable".format(CREDENTIALS_FILE))
consumer_key = config['DEFAULT']['consumerKey']
consumer_secret = config['DEFAULT']['consumerSecret']
access_key = config['DEFAULT']['accessToken']
access_secret = config['DEFAULT']['accessTokenSecret']

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)

# Keywords we're following (Game of Thrones hashtags); English tweets only.
TRACKED_KEYWORDS = ["#got", "#gameofthrones", "#winteriscoming", "#winterishere",
                    "#winterfell", "#gotseason", "#asongoficeandfire"]

# Start extracting from the stream. In the saved run this cell never finished
# (see the In [None] marker): filter() keeps consuming while connected.
stream = tweepy.Stream(auth, listener=TwitterStreamListener(api))
stream.filter(track=TRACKED_KEYWORDS, languages=['en'])

b'{"text": "RT @thronesfacts: #GameofThrones S8 E3\'s the Battle of Winterfell took 55 nights just to film the fight scenes \\u2694\\ufe0f\\n\\nThe previous record was\\u2026", "timestamp": "2019-04-27T18:49:06"}'
b'{"text": "RT @Lupita_Nyongo: Waiting for #gameofthrones tomorrow like... https://t.co/2rhFSU1Fgm", "timestamp": "2019-04-27T18:49:07"}'
b'{"text": "@HBO ruining everyone\\u2019s weekends \\ud83d\\ude02 start the show already! \\ud83d\\udc3a", "timestamp": "2019-04-27T18:49:08"}'
b'{"text": "RT @Lupita_Nyongo: Waiting for #gameofthrones tomorrow like... https://t.co/2rhFSU1Fgm", "timestamp": "2019-04-27T18:49:10"}'
b'{"text": "RT @irresistewart: I got through #AvengersEndgame (barely - not entirely - ok, I\'m still grieving, I\'ll never get over it)\\n\\nSo\\nI can also g\\u2026", "timestamp": "2019-04-27T18:49:11"}'
b'{"text": "RT @Mets: Send a raven (or just RT) for your chance to win a @Noahsyndergaard #GameofThrones bobblehead! https://t.co/OXvoZkr7Bt", "timestamp": "201