In [None]:
import datetime
import os
import sqlite3
import time

import pandas as pd
import tweepy
from tweepy.auth import OAuthHandler
import json

# Show full tweet text in DataFrame output: `None` disables column truncation.
# (The former `-1` sentinel has been deprecated since pandas 1.0.)
pd.set_option('display.max_colwidth', None)

## Set up databases, credentials, and query

We create two `sqlite3` databases to store the tweets.

We use the Search API to go back in time and save tweets to `histtweets.db`, and the Streaming API to capture tweets in real time into `livetweets.db`.

In [None]:
# Create the first database (live tweets) and its table.
# IF NOT EXISTS makes this cell safe to re-run, e.g. on Restart & Run All.

conn = sqlite3.connect("livetweets.db")
c = conn.cursor()
c.execute("""CREATE TABLE IF NOT EXISTS tweets (
id TEXT,
created_at TEXT,
author TEXT,
author_location TEXT,
author_followers INT,
author_friends INT,
hashtags TEXT,
tweet TEXT,
in_reply_to TEXT,
lang TEXT,
method TEXT,
UNIQUE(id))
""")
# Later cells open their own connections, so release this one now.
conn.close()

print("Database created.")

In [None]:
# Create the second database (historical tweets) and its table.
# IF NOT EXISTS makes this cell safe to re-run, e.g. on Restart & Run All.

conn = sqlite3.connect("histtweets.db")
c = conn.cursor()
c.execute("""CREATE TABLE IF NOT EXISTS tweets (
id TEXT,
created_at TEXT,
author TEXT,
author_location TEXT,
author_followers INT,
author_friends INT,
hashtags TEXT,
tweet TEXT,
in_reply_to TEXT,
lang TEXT,
method TEXT,
UNIQUE(id))
""")
# Later cells open their own connections, so release this one now.
conn.close()

print("Database created.")

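Set the Twitter API credentials and authorise with the API. Keep the keys out of the notebook itself (for example, in environment variables) so they are not shared or committed by accident.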

In [None]:
# Twitter API credentials, read from environment variables so secrets never
# live in the notebook file. An unset variable falls back to an empty string.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "")
access_secret = os.environ.get("TWITTER_ACCESS_SECRET", "")

In [None]:
# Fail fast with a clear message instead of an opaque authentication error
# several cells later when the credentials were never filled in.
if not all((consumer_key, consumer_secret, access_token, access_secret)):
    raise ValueError("Twitter API credentials are missing: set consumer_key, "
                     "consumer_secret, access_token and access_secret.")

# `auth` is used directly by the stream below; `api` is the client for the Search API.
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
api = tweepy.API(auth)

Both searches and streams can be filtered in many ways (for example, by user account). Here we focus on searching and streaming for a set of **keywords**.

In [None]:
# Keywords to look for -- edit this Python list by hand.
keywords = ['fire', 'power']

# Search API: a single query string with the keywords OR-ed together.
searchquery = " OR ".join(keywords)
# Streaming API: the `track` filter takes the keyword list itself.
streamingquery = keywords

for query in (searchquery, streamingquery):
    print(query)

For the historical download, we go 2 days back in time. This means getting tweets from today (up until the point of launching the search), yesterday (1 day back), and the day before yesterday (2 days back). Because `since` is a date rather than a timestamp, the API returns tweets from all hours of those two earlier days, regardless of what time today the search is started.

In [None]:
# How many calendar days back the historical search should reach.
DAYS_BACK = 2
# ISO date string (YYYY-MM-DD), passed to the Search API as `since`.
start = (datetime.date.today() - datetime.timedelta(days=DAYS_BACK)).isoformat()

## First, search backwards

This is a linear notebook version of this method. In practice, it is better to run the backwards and forwards search at the same time, to get as many tweets as possible. For the sake of illustration, we run them here one at a time. First, the backwards search. It is probably good to re-run this search an extra time before moving to the next step, to get any new tweets that have been posted while the search was running. 

The code below connects to the Search API and starts retrieving tweets.

Tweets will be downloaded in reverse order, starting from now and going back in time. This means that in many cases we may not want the full 2 days back, but can interrupt the kernel manually once we have reached the point in time from which we want data.

If there are many results, the script will pause from time to time due to rate limits.

In [None]:
conn = sqlite3.connect('histtweets.db')

# Lazily page through the Search API results; `c` yields one tweet at a time.
# The rate-limit options are handed to tweepy together with the query.
search_params = {
    'q': searchquery,
    'since': start,
    'wait_on_rate_limit': True,
    'wait_on_rate_limit_notify': True,
}
c = tweepy.Cursor(api.search, **search_params).items()

In [None]:
starttime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("Searching backwards since " + str(starttime))

# Store every search result in histtweets.db until the cursor is exhausted or
# the kernel is interrupted (the intended way to stop early).
insert_sql = ('INSERT INTO tweets (id, created_at, author, author_location, author_followers, '
              'author_friends, hashtags, tweet, in_reply_to, lang, method) '
              'VALUES (?,?,?,?,?,?,?,?,?,?,?)')
# Keep a running row count instead of re-reading the whole table after each insert.
n_rows = conn.execute("SELECT COUNT(*) FROM tweets").fetchone()[0]

try:
    while True:
        try:
            tweet = next(c)
        except StopIteration:
            break
        except IOError:
            # Network problem while fetching: wait five minutes, then resume.
            time.sleep(60 * 5)
            continue

        json_tweet = tweet._json
        try:
            user = json_tweet['user']
            tweet_hashtags = ",".join("#" + str(tag['text'])
                                      for tag in json_tweet['entities']['hashtags'])
            row = (
                json_tweet['id_str'],
                json_tweet['created_at'],
                user['screen_name'],
                user['location'],
                user['followers_count'] or 0,  # store 0 instead of NULL
                user['friends_count'] or 0,
                tweet_hashtags,
                json_tweet['text'],
                json_tweet['in_reply_to_screen_name'],
                json_tweet['lang'],
                "SearchAPI",  # these rows come from the Search API, not the stream
            )
            conn.execute(insert_sql, row)
            conn.commit()
            n_rows += 1
            print("\rTweet from " + str(tweet.created_at) + " (" + str(n_rows) + ")              ", end='')
        except KeyError:
            print("Key Error")
        except sqlite3.IntegrityError:
            # UNIQUE(id) violation: this tweet is already stored.
            print("\rAlready in database, continuing...", end="")
except KeyboardInterrupt:
    print("\nSearch interrupted by user.")
finally:
    # Close even on interrupt; every insert was already committed.
    conn.close()

print("Done!")

In [None]:
# Number of entries in the historical database.
# The search cell closes `conn` when it finishes, so open a fresh connection here.
hist_conn = sqlite3.connect('histtweets.db')
try:
    hist = str(hist_conn.execute("SELECT COUNT(*) FROM tweets").fetchone()[0])
finally:
    hist_conn.close()
print("Historical tweets: " + hist)

## Second, stream forward



Now, collect data in real time by starting a listener for the Streaming API.

In [None]:
# Open the live-tweets database; the stream listener below writes through this `conn`.
conn = sqlite3.connect(database='livetweets.db')

In [None]:
# Create a twitter listener

class MyStreamListener(tweepy.StreamListener):
    """Stream listener that stores each incoming tweet in the `tweets` table.

    Writes through the module-level sqlite3 connection `conn`, which is opened
    on `livetweets.db` in the previous cell.
    """

    INSERT_SQL = ('INSERT INTO tweets (id, created_at, author, author_location, '
                  'author_followers, author_friends, hashtags, tweet, in_reply_to, '
                  'lang, method) VALUES (?,?,?,?,?,?,?,?,?,?,?)')

    def on_data(self, data):
        """Parse one raw JSON message from the stream and insert it as a row.

        Messages that cannot be stored (missing fields, duplicate ids) are
        skipped here instead of raising. If they raised, the exception would
        escape filter() and force a full reconnect of the stream.

        Returns:
            True, so the stream keeps running.
        """
        data = json.loads(data)
        # NOTE(review): messages without a 'user' object are presumably stream
        # control notices (e.g. limit/delete), not tweets -- skip them quietly.
        if 'user' not in data:
            return True
        try:
            user = data['user']
            tweet_created_at = data['created_at']
            tweet_hashtags = ",".join("#" + str(tag['text'])
                                      for tag in data['entities']['hashtags'])
            row = (
                data['id_str'],  # same id representation as the search cell
                tweet_created_at,
                user['screen_name'],
                user['location'],
                user['followers_count'] or 0,  # store 0 instead of NULL
                user['friends_count'] or 0,
                tweet_hashtags,
                data['text'],
                data['in_reply_to_screen_name'],
                data['lang'],
                "StreamingAPI",
            )
        except KeyError:
            print("Key Error")
            return True
        try:
            conn.execute(self.INSERT_SQL, row)
        except sqlite3.IntegrityError:
            # UNIQUE(id) violation: tweet already stored, keep streaming.
            return True
        conn.commit()
        (row_count,) = conn.execute("SELECT COUNT(*) FROM tweets").fetchone()
        # [:-10] drops the trailing offset and year, assuming Twitter's
        # "Wed Oct 10 20:19:24 +0000 2018" timestamp format.
        print("\rTweet from " + str(tweet_created_at[:-10]) + " (" + str(row_count) + ")              ", end='')
        return True
            
# Create a stream that hands every incoming message to the listener
twitter_stream = tweepy.Stream(auth, MyStreamListener())

# Start streaming and check for errors
starttime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("Running since " + str(starttime))

# Stream until the kernel is interrupted, which is the intended way to stop.
try:
    while True:
        try:
            twitter_stream.filter(track=streamingquery)
        except (KeyError, sqlite3.IntegrityError):
            # Malformed message or duplicate tweet id escaped the listener and
            # dropped the connection: reconnect right away.
            continue
        # filter() returned without an exception, so the stream disconnected
        # (presumably after an HTTP error). Pause before reconnecting so a
        # persistent error does not turn into a tight retry loop.
        print("\nStream disconnected, reconnecting in 60 s...")
        time.sleep(60)
except KeyboardInterrupt:
    twitter_stream.disconnect()
    print("\nStreaming stopped.")
finally:
    # Every insert was committed by the listener; release the database file.
    conn.close()

## Merge the two databases

In [None]:
# Copy the historical tweets into the live database, then keep a single tweets.db.
db_a = sqlite3.connect('livetweets.db')
db_b = sqlite3.connect('histtweets.db')
try:
    # Both tables were created with the same column order, so rows copy 1:1.
    output = db_b.execute('SELECT * FROM tweets').fetchall()
    # OR IGNORE skips rows whose id is already present (UNIQUE(id)), i.e.
    # tweets captured by both the search and the stream.
    db_a.executemany('INSERT OR IGNORE INTO tweets VALUES (?,?,?,?,?,?,?,?,?,?,?)', output)
    db_a.commit()
finally:
    # Close both files before renaming/removing them (required on Windows).
    db_a.close()
    db_b.close()

# Rename the merged db, and delete the other
os.rename('livetweets.db', 'tweets.db')
os.remove('histtweets.db')

## Look at the data

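It is worth noting that the `in_reply_to` column holds the screen name of the account a tweet replies to, and is empty (`None`) for tweets that are not replies.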

In [None]:
# Read sqlite query results into a pandas DataFrame
conn = sqlite3.connect("tweets.db")
try:
    tweets_raw = pd.read_sql_query("SELECT * from tweets", conn)
finally:
    conn.close()

tweets_df = (
    tweets_raw
    # Replace line breaks, tabs and carriage returns with spaces in the text columns
    .replace({r'[\n\t\r]': ' '}, regex=True)
    # created_at is stored as text (assumed Twitter format, e.g.
    # "Wed Oct 10 20:19:24 +0000 2018"); parse it so the sort below is
    # chronological rather than alphabetical by weekday name.
    .assign(created_at=lambda d: pd.to_datetime(d['created_at'], format='%a %b %d %H:%M:%S %z %Y'))
    .sort_values(by="created_at")
)
tweets_df.head()

In [None]:
# Show the last five rows of the DataFrame
tweets_df.tail(n=5)