# In [3]: (notebook cell prompt, kept as a comment so the file parses as Python)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import os

import tweepy
from kafka import KafkaProducer
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

# Declare a Python class (MyListener) that inherits from tweepy.StreamListener.
# We override the on_status method, which is called automatically each time
# Twitter "sends" us a new tweet.
class MyListener(StreamListener):
    """Stream listener that forwards every received tweet to a producer topic.

    Each status is reduced to a small dict (date, user id, user name, screen
    name, text, hashtags) and handed to ``producer.send``.
    """

    def __init__(self, producer, producer_topic):
        """Remember where tweets must be published.

        Args:
            producer: Object exposing ``send(topic=..., value=...)``.
            producer_topic: Topic name passed to ``producer.send``.
        """
        super().__init__()
        self.producer = producer
        self.producer_topic = producer_topic

    def on_status(self, status):
        """Build the tweet payload for ``status`` and send it to the topic.

        When the raw payload carries an ``extended_tweet`` object, the full
        text is read from it; otherwise ``status.text`` is used.

        Args:
            status: Status object delivered by the stream. It must expose
                ``_json``, ``created_at``, ``user``, ``text`` and ``entities``.
        """
        extended = status._json.get("extended_tweet")
        if extended:
            text = extended["full_text"]
            # The root-level entities describe only the truncated text (see
            # Twitter's extended-tweet payload docs), so hashtags located past
            # the truncation point are only listed under extended_tweet.
            # Fall back to the root-level entities if that key is absent.
            entities = extended.get("entities") or status.entities
        else:
            text = status.text
            entities = status.entities

        # A missing or None hashtag list yields an empty list in the payload.
        hashtags = entities.get("hashtags") or []

        tweet = {
            'date': str(status.created_at),
            'user_id': str(status.user.id),
            'username': status.user.name,
            'screen_name': status.user.screen_name,
            'text': text,
            'hashtags': [h.get('text') for h in hashtags],
        }
        self.producer.send(topic=self.producer_topic, value=tweet)
    
# Set consumer key, consumer secret, access token and access token secret.
# Read them from your Twitter dev page and export them as environment
# variables before running this script, so that no secret is stored in the
# source file. A missing variable raises KeyError here.
consumer_key = os.environ["TWITTER_CONSUMER_KEY"]
consumer_secret = os.environ["TWITTER_CONSUMER_SECRET"]
access_token = os.environ["TWITTER_ACCESS_TOKEN"]
access_token_secret = os.environ["TWITTER_ACCESS_TOKEN_SECRET"]

# Authenticate with the OAuth protocol
# See: https://en.wikipedia.org/wiki/OAuth
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# Get the API object passing the authentication object
# See http://docs.tweepy.org/en/latest/api.html to discover the available methods
api = tweepy.API(auth)

# Values handed to the producer are serialized as UTF-8 encoded JSON.
producer = KafkaProducer(
  bootstrap_servers=["kafka:9092"],
  value_serializer=lambda v: json.dumps(v).encode("utf-8"))

listener = MyListener(producer=producer, producer_topic="OscarsActors")
# We need an api to stream, so we reuse the api object we created before.
# Thus, we create a Stream object.
stream = Stream(auth = api.auth, listener=listener, tweet_mode='extended')

# A number of twitter streams are available through Tweepy.
# For more information on the capabilities and limitations of the different
# streams see Twitter Streaming API Documentation.

# Here we use filter to stream English tweets matching any of the phrases
# below (one phrase per Oscars nominee). The track parameter is an array of
# search terms; tweepy joins the entries with commas, so this list produces
# the same request as a single comma-separated string.
track_phrases = [
  "rizwanahmed oscars",
  "chadwickboseman oscars",
  "AnthonyHopkins oscars",
  "Gary Oldman oscars",
  "steveyeun oscars",
  "violadavis oscars",
  "AndraDayMusic oscars",
  "VanessaKirby oscars",
  "Frances Mcdormand oscars",
  "Carey Mulligan oscars",
  "SachaBaronCohen oscars",
  "Daniel Kaluuya oscars",
  "leslieodomjr oscars",
  "Paul Raci oscars",
  "Lakeith Stanfield oscars",
  "Maria Bakalova oscars",
  "Glenn Close oscars",
  "Olivia Colman oscars",
  "AmandaSeyfried oscars",
  "Yuh-Jung Youn oscars",
]

try:
  stream.filter(track=track_phrases, languages=["en"])
finally:
  # Runs on any exit, including a manual interrupt (which still propagates):
  # stop the stream and deliver any tweets still buffered in the producer.
  stream.disconnect()
  producer.flush()
  producer.close()

# (notebook cell output) KeyboardInterrupt: