# Ingest Tweets using Twitter API, Kafka and Python

## Import Libraries

In [None]:
import tweepy
import time
from kafka import KafkaConsumer, KafkaProducer
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

### Twitter API Setup 
Get the API object using the authorization credentials from the Twitter developer website.

In [None]:
# Twitter credentials -- fill these in with the values from the Twitter developer website
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""

# Build the OAuth handler from the consumer credentials, then attach the access token pair
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# API object bound to the authenticated handler; used later to run searches
api = tweepy.API(auth)

In [None]:
class StdOutListener(StreamListener):
    """Stream listener that forwards each raw payload to Kafka and echoes it to stdout.

    Relies on the module-level ``producer`` (a KafkaProducer) being defined
    before any data arrives.
    """

    def on_data(self, data):
        """Publish ``data`` (UTF-8 encoded) to the "tweets-data" topic, print it, and return True."""
        # KafkaProducer publishes with send(topic, value); it has no
        # send_messages method, so the previous call failed on every message.
        producer.send("tweets-data", data.encode('utf-8'))
        print(data)
        return True

    def on_error(self, status):
        """Print the error status passed in by the stream."""
        print(status)

# Kafka producer used by StdOutListener.on_data, pointed at the broker on localhost:9092
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,10))
topic_name = 'tweets-data'
# listener instance handed to the Stream below
l = StdOutListener()
# NOTE(review): auth is rebuilt here from the same credentials as in the Twitter API setup cell
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# NOTE(review): the stream is only constructed here; no call that starts it is visible in this file
stream = Stream(auth, l)

The helper function `normalize_timestamp` converts the time a tweet was created (reported in GMT) to the local time of my system (GMT-5).

In [None]:
from datetime import datetime, timedelta

def normalize_timestamp(time, offset_hours=5):
    """Shift a GMT timestamp string back by ``offset_hours`` and return it re-formatted.

    Args:
        time: Timestamp in ``"%Y-%m-%d %H:%M:%S"`` form. (The parameter name
            shadows the ``time`` module inside this function only.)
        offset_hours: Hours to subtract from the input. Defaults to 5, because
            the tweets are timestamped in GMT while the author is in the -5
            timezone; pass a different value for another timezone.

    Returns:
        The shifted timestamp as a string in the same format as the input.

    Raises:
        ValueError: If ``time`` does not match the expected format.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    gmt_time = datetime.strptime(time, timestamp_format)
    local_time = gmt_time - timedelta(hours=offset_hours)
    return local_time.strftime(timestamp_format)

### Define the Kafka producer
specify the Kafka Broker

specify the topic name

In [None]:
# Kafka producer pointed at the broker on localhost:9092
# NOTE(review): this rebinds `producer` and `topic_name` to the same values already set next to StdOutListener
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,10))
# topic that get_twitter_data publishes its records to
topic_name = 'tweets-data'

### Produce and send records to the Kafka Broker
querying the Twitter API Object

extracting relevant information from the response

formatting and sending the data to proper topic on the Kafka Broker

Each resulting record has the following attributes:
id,
text,
created_at,
followers_count,
location,
favorite_count,
retweet_count

In [None]:
def get_twitter_data():
    """Search Twitter for "the" and publish one ';'-delimited record per result to Kafka.

    Field order: the author's user id, tweet text, normalized creation time,
    author's follower count, author's location, favorite count, retweet count.
    Every field is followed by ';', so each record ends with a trailing ';'.
    """
    for tweet in api.search("the"):
        author = tweet.user
        fields = (
            author.id_str,
            tweet.text,
            normalize_timestamp(str(tweet.created_at)),
            author.followers_count,
            author.location,
            tweet.favorite_count,
            tweet.retweet_count,
        )
        # Append the delimiter after every field (including the last one)
        line = ''.join(str(field) + ';' for field in fields)
        producer.send(topic_name, line.encode())

In [None]:
# one-off run: fetch the current search results and publish them to the Kafka topic
get_twitter_data()

### Deployment
perform the task every couple of minutes and wait in between

In [None]:
def periodic_work(interval):
    """Call get_twitter_data() repeatedly, sleeping ``interval`` seconds after each call.

    The loop has no exit condition, so this function does not return on its own.
    """
    while True:
        get_twitter_data()
        # interval is the number of seconds to wait; it is passed straight to time.sleep
        time.sleep(interval)

In [None]:
# NOTE(review): 60*0.1 is about 6 seconds between runs, not "a couple of minutes" as the section text says -- confirm the intended interval
periodic_work(60*0.1)  # get data roughly every 6 seconds