In [None]:
from dotenv import load_dotenv
import os
import json
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

In [None]:
# Load settings via python-dotenv before reading the environment below.
load_dotenv()

# Twitter API credentials, looked up by name in the process environment.
# Each value is None when the corresponding variable is not set.
CONSUMER_KEY = os.environ.get('CONSUMER_KEY')
CONSUMER_SECRET = os.environ.get('CONSUMER_SECRET')
ACCESS_TOKEN = os.environ.get('ACCESS_TOKEN')
ACCESS_TOKEN_SECRET = os.environ.get('ACCESS_TOKEN_SECRET')

In [None]:
# Kafka topic that the tweet text is published to.
topic_name = "tweet-stream"
# Terms passed to the Twitter stream filter as its `track` argument.
words_to_track = ["#crypto"]

In [None]:
def create_topic(topic_name=None, num_partitions=1, replication_factor=1,
                 bootstrap_servers="kafka-server:9092"):
    """Create a Kafka topic through the admin client.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Number of partitions requested for the topic.
        replication_factor: Replication factor requested for the topic.
        bootstrap_servers: Broker address(es) passed to ``KafkaAdminClient``.

    Returns:
        True when the creation request completed without raising, False when
        any step raised. Errors are printed, not propagated, so a notebook
        run can continue (e.g. when the topic already exists).
    """
    admin_client = None
    try:
        admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='test'
        )

        new_topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        admin_client.create_topics(new_topics=[new_topic], validate_only=False)

        print(f"{topic_name} created!")
        return True

    except Exception as err:

        print(f"Request for topic creation is failing due to {err}")
        return False

    finally:
        # Release the admin connection on both the success and failure paths.
        if admin_client is not None:
            admin_client.close()

In [None]:
def create_producer(bootstrap_servers='kafka-server:9092'):
    """Build a Kafka producer whose values are JSON-encoded.

    Args:
        bootstrap_servers: Broker address(es) passed to ``KafkaProducer``.
            Defaults to the address this notebook has always used.

    Returns:
        The ``KafkaProducer`` instance. Every value given to ``send`` is
        serialized with ``json.dumps`` and encoded as UTF-8 bytes.
    """
    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,  # Kafka server
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # JSON serializer
    )

In [None]:
# Create a handler for the streaming data
class StdOutListener(StreamListener):
    """Stream listener that forwards the text of each message to a Kafka topic.

    Args:
        topic_name: Kafka topic the text is sent to.
        producer: Object exposing ``send(topic, value)``; it is stored on the
            instance and used for every message.
    """

    def __init__(self, topic_name, producer):
        # Run the base-class initializer so whatever state StreamListener
        # sets up for itself exists on this instance too.
        super().__init__()
        self.topic_name = topic_name
        self.producer = producer

    def on_data(self, data):
        """Parse one raw JSON payload and publish its ``text`` field.

        Payloads that are not JSON objects, or that carry no ``text`` key
        (presumably non-tweet notices; confirm against the API), are skipped.
        Any other failure is printed rather than raised. Always returns True
        so the stream keeps running.
        """
        try:
            msg = json.loads(data)
            text = msg.get('text') if isinstance(msg, dict) else None
            if text is None:
                return True

            # Send msg to topic through the producer this listener was given,
            # not through a notebook-level global.
            self.producer.send(self.topic_name, str(text))

        # Exception, not BaseException: a kernel interrupt must still be able
        # to stop the stream.
        except Exception as e:
            print("Error on_data: %s" % str(e))

        return True

    def on_error(self, status):
        """Report the status code handed over by the stream and keep listening."""
        print('Got an error with status code: ' + str(status))
        return True # To continue listening

    def on_timeout(self):
        """Report a timeout and keep listening."""
        print('Timeout...')
        return True # To continue listening


In [None]:
class TwitterStreamer():
    """
    Class for streaming and processing live tweets.
    """
    def __init__(self):
        pass

    def stream_tweets(self, topic_name, producer, words_to_track):
        """Authenticate against Twitter and stream matching tweets.

        Args:
            topic_name: Kafka topic name handed to the listener.
            producer: Producer object handed to the listener.
            words_to_track: Keywords passed to the stream filter as ``track``.

        Raises:
            ValueError: If any of the four module-level credential values is
                missing or empty. Checked before any connection is attempted.
        """
        # The credentials are module-level values that may be None when the
        # environment is not configured; fail early and name what is missing.
        credentials = {
            'CONSUMER_KEY': CONSUMER_KEY,
            'CONSUMER_SECRET': CONSUMER_SECRET,
            'ACCESS_TOKEN': ACCESS_TOKEN,
            'ACCESS_TOKEN_SECRET': ACCESS_TOKEN_SECRET,
        }
        missing = [name for name, value in credentials.items() if not value]
        if missing:
            raise ValueError("Missing Twitter credentials: " + ", ".join(missing))

        # This handles Twitter authentication and the connection to the Twitter Streaming API
        listener = StdOutListener(topic_name, producer)
        auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
        stream = Stream(auth, listener)

        # This line filters the Twitter stream to capture data by the keywords
        stream.filter(track=words_to_track)

In [None]:
if __name__ == '__main__':

    # Topic creation reports failure by printing; streaming is attempted
    # either way (the topic may already exist).
    create_topic(topic_name)

    # Keep the module-level name `producer`: code in other cells may look it
    # up globally.
    producer = create_producer()

    try:
        twitter_streamer = TwitterStreamer()
        # Blocks for as long as the stream stays connected.
        twitter_streamer.stream_tweets(topic_name, producer, words_to_track)
    except KeyboardInterrupt:
        # Interrupting the kernel is the normal way to stop the stream.
        print('Streaming interrupted, shutting down...')
    finally:
        # Push out any buffered messages and release the broker connection.
        producer.flush()
        producer.close()