In [None]:
#Import the libraries

import tweepy #py lib for twitter API 
from tweepy import OAuthHandler  #authentication for twitter API
from tweepy import Stream
from tweepy.streaming import StreamListener
import json
import configparser
from kafka import KafkaProducer
from kafka.errors import KafkaError

In [None]:
#Credentials for accessing Twitter API
#Read config data

config = configparser.ConfigParser()
config.read("../resources/config.ini")

consumer_key = config['auth']['consumer_key']
consumer_secret = config['auth']['consumer_secret']
access_token = config['auth']['access_token']
access_secret = config['auth']['access_secret']

filter_key = config['stream']['filter']

broker = config['kafka']['producer']
topic = config['kafka']['topic']

In [None]:
class TweetsListener(StreamListener) :
    
    #Print twitter-stream to stdout
    def on_data(self, data):
        try:
            msg = json.loads(data)
            if msg['truncated'] is True:
                tweet = msg['extended_tweet']['full_text'] 
            else:
                tweet = msg['text']
            print(tweet)
            send_kafka(tweet)
            return True
        except BaseException as e:
            print("Error : %s" %str(e))
        return True
    
    #Deal with errors - printing on console 
    def on_error(self, status):
        print(status)
        return True

#With valid credentials, start a filtered stream
def sendData():
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    twitter_stream = Stream(auth, TweetsListener(), tweet_mode='extended') #call to on_data
    twitter_stream.filter(track=[filter_key])  #add filter
    
#Send data to broker
def send_kafka(tweet):
    producer = KafkaProducer(bootstrap_servers=[broker])
    producer.send(topic, tweet.encode())
    producer.flush()

In [None]:
if __name__ == "__main__":
    
    #Generate Twitter Stream 
    sendData()