# Tweet Thread Augmentation

Many tweets are part of **threads**, which consist of multiple tweets in a linked-list sequence of replies to one another. Since some of the tweets may not have contained the original coronavirus keywords, this step pulls tweets in threads for which at least one tweet is in the dataset. This consists of three steps:

1. Extract **upstream tweets**, which are explicitly linked to in the `in_reply_to_status_id_str` field of the tweet. We can do this using a simple hydrate command with Twarc.
2. Extract **downstream tweets**, which are not explicitly linked. Instead, we look for tweets in each user's timeline within a two-day window on either side of their tweets in the dataset, and recursively find the tweets that link back to the original dataset.
3. Join these two sets together with the original tweet dataset, and assign each tweet a **thread ID**.

In [1]:
import datetime
import json
import os
import shutil
import time
from collections import deque

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import twarc

import utils

### Paths

Set the path to your Twarc credentials file and the input, intermediate, and output directories below.

In [2]:
# Path to Twarc credentials file (by default ~/.twarc, expanded to an absolute path)
credentials_path = os.path.expanduser("~/.twarc")

# Path to CSV file batches
input_dir = "/path/to/directory/containing/csvs"

# Path to scratch directory for intermediate results.
# makedirs also creates missing parent directories (e.g. ../data) and is a no-op if the
# directory already exists.
intermediate_dir = "../data/intermediates"
os.makedirs(intermediate_dir, exist_ok=True)

# Path to output directory
output_dir = "../data/tweets"
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Load input tweets from every visible CSV batch in input_dir. Filenames are sorted so the
# row order of df does not depend on os.listdir's arbitrary ordering.
filenames = sorted(os.path.join(input_dir, path) for path in os.listdir(input_dir)
                   if path.endswith(".csv") and not path.startswith("."))
if not filenames:
    raise FileNotFoundError("No CSV files found in {}".format(input_dir))

df = pd.concat([pd.read_csv(filename, dtype=utils.dtype_spec, lineterminator='\n')
                for filename in filenames])
# Drop index columns left behind by earlier to_csv calls (e.g. "Unnamed: 0").
df = df.loc[:, ~df.columns.str.contains('^Unnamed')].reset_index(drop=True)

print(len(df), "tweets")
df.head()

In [5]:
# Snapshot the combined input tweets so a later run can reload them (see the commented-out
# read_csv of initial_doctor_tweets.csv further down).
# NOTE(review): pandas 1.5 renamed to_csv's `line_terminator` to `lineterminator` and pandas 2.0
# removed the old name; update this call if running on pandas >= 2.0.
initial_tweets_path = os.path.join(intermediate_dir, "initial_doctor_tweets.csv")
df.to_csv(initial_tweets_path, line_terminator='\n')

In [6]:
# Load Twarc object
# `t` is the client used below for t.hydrate (upstream tweets) and t.timeline (downstream tweets).
# utils.load_twarc presumably reads the credentials file at credentials_path — see utils to confirm.
t = utils.load_twarc(credentials_path)

# Upstream Tweets

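We want to extract every tweet that a tweet in the dataset replies to, restricted to self-replies (where a user replies to one of their own tweets), and to follow those reply chains upward recursively. Tweets that reply *to* a tweet in the dataset are collected in the Downstream Tweets step below.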


In [None]:
# Parent IDs of self-replies: tweets that reply to a tweet written by the same user.
self_reply_mask = df.reply_to_id.notna() & df.reply_to_user.eq(df.user_id)
reply_ids = df.loc[self_reply_mask, "reply_to_id"].unique().tolist()
print("{} reply IDs".format(len(reply_ids)))

In [None]:
# Recursively extract replies: hydrate the parents of self-replies, then the parents of those
# parents, and so on until no new self-reply parents remain.
seen_ids = set()
reply_ids = list(set(reply_ids))
hydrated_replies = []
i = 0

while reply_ids:
    print("Round {}, {} tweets to hydrate".format(i, len(reply_ids)))
    # Mark every requested ID as seen up front, so IDs that fail to hydrate (e.g. deleted or
    # protected tweets) are not requested again in later rounds.
    seen_ids.update(reply_ids)
    new_replies = list(t.hydrate(reply_ids))
    hydrated_replies += new_replies
    seen_ids |= {tweet["id_str"] for tweet in new_replies}
    # Mark tweets that are in reply to a message by the same user for the next round.
    # A set comprehension dedupes parents that received several replies in this round.
    reply_ids = sorted({tweet["in_reply_to_status_id_str"] for tweet in new_replies
                        if tweet["in_reply_to_status_id_str"] is not None and
                        tweet["in_reply_to_status_id_str"] not in seen_ids and
                        tweet["in_reply_to_user_id_str"] == tweet["user"]["id_str"]})
    i += 1

# Write upstream tweets as JSON (one object per line) and as a flattened CSV
print("Writing JSON...")
upstream_tweets = []
with open(os.path.join(intermediate_dir, "all_upstream_tweets.json"), "w") as file:
    for item in hydrated_replies:
        file.write(json.dumps(item) + "\n")
        upstream_tweets.append(utils.json_to_tweet(item))

print("Writing CSV...")
upstream_df = pd.DataFrame(upstream_tweets)
upstream_df.to_csv(os.path.join(intermediate_dir, "all_upstream_tweets.csv"),
                   line_terminator='\n')
print("Wrote {} upstream tweets.".format(len(hydrated_replies)))

# Downstream Tweets

Next, we use user timelines to find tweets that reply to tweets in the dataset. To do this efficiently, we collect the unique set of users who replied to their own tweets in the dataset and compute a single date window per user in which to search their timeline. We assume that replies occur within two days of the original tweet.

In [None]:
# First establish a list of reference IDs for each date
def get_date(tweet, day_delta=0):
    """Return the calendar day (YYYY-MM-DD) of ``tweet['created_at']``, shifted by ``day_delta`` days.

    ``tweet`` is anything indexable by 'created_at' (here, DataFrame rows) holding a
    Twitter-style timestamp such as 'Tue Feb 04 12:00:00 +0000 2020'.
    """
    created = datetime.datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
    shifted = created + datetime.timedelta(days=day_delta)
    return shifted.strftime('%Y-%m-%d')

# For every calendar day present in df, record the smallest and largest tweet ID posted that
# day. These anchor the ID estimates used to bound the timeline searches below.
min_ids = {}
max_ids = {}
for i, tweet in df.iterrows():
    if i % 100000 == 0:
        print(i)
    id_num = int(tweet["id"])
    date = get_date(tweet)
    if date not in min_ids:
        min_ids[date] = id_num
        max_ids[date] = id_num
    else:
        min_ids[date] = min(min_ids[date], id_num)
        max_ids[date] = max(max_ids[date], id_num)

print(sorted(min_ids.items())[-5:], sorted(max_ids.items())[-5:])

In [None]:
# Now get a set of users and the required search dates. Each user's window starts two days
# before their earliest self-reply and ends two days after their latest one, so a single
# timeline query covers every self-reply of theirs in the dataset.
user_dates = {}

tweets_with_replies = df[~pd.isna(df.reply_to_id) & (df.reply_to_user == df.user_id)]
for i, tweet in tweets_with_replies.iterrows():
    user = tweet["user_id"]
    min_date = get_date(tweet, -2)
    max_date = get_date(tweet, 2)
    if user in user_dates:
        # 'YYYY-MM-DD' strings order chronologically, so min/max widen the window.
        # The upper bound must use max: taking min would cut off later tweets' replies.
        user_dates[user] = (min(min_date, user_dates[user][0]),
                            max(max_date, user_dates[user][1]))
    else:
        user_dates[user] = (min_date, max_date)

print("{} users".format(len(user_dates)))

In [None]:
# Fill in reference IDs for dates that aren't in the set. To do this, we'll create two
# rough, conservative linear models for tweet IDs over time by estimating the increment
# in the minimum and maximum tweet IDs per day.

# Note that we're going to give each tweet a two-day interval on either side, so the
# exactness of this estimate isn't important except to improve the performance of the
# tweet scraper.

available_days = sorted(min_ids.keys())
series = [available_days[0]]
current = series[-1]

# (day offset from available_days[0], tweet ID) pairs for every observed day. The first day
# is included at offset 0, which is also the anchor get_min_id/get_max_id extrapolate from.
min_id_items = [(0, min_ids[available_days[0]])]
max_id_items = [(0, max_ids[available_days[0]])]

date_index = 0
while current != available_days[-1]:
    date = datetime.datetime.strptime(current, '%Y-%m-%d')
    current = datetime.date.strftime(date + datetime.timedelta(days=1), '%Y-%m-%d')
    date_index += 1
    if current in min_ids:
        min_id_items.append((date_index, min_ids[current]))
    if current in max_ids:
        max_id_items.append((date_index, max_ids[current]))
    series.append(current)


def _id_increment_per_day(items):
    """Return the mean tweet-ID change per day between the first and last (offset, id) pairs.

    Returns 0 when only one day was observed, since no slope can be estimated from it.
    """
    (first_day, first_id), (last_day, last_id) = items[0], items[-1]
    if last_day == first_day:
        return 0
    return (last_id - first_id) / (last_day - first_day)


if len(available_days) == 1:
    print("Warning: only one day of tweets; estimated IDs for other days equal that day's IDs.")
min_inc_per_day = _id_increment_per_day(min_id_items)
max_inc_per_day = _id_increment_per_day(max_id_items)
earliest_date = datetime.datetime.strptime(available_days[0], '%Y-%m-%d')

def get_min_id(date_str):
    """Estimate the smallest tweet ID posted on ``date_str`` (YYYY-MM-DD).

    Extrapolates linearly from the earliest observed day's minimum ID using ``min_inc_per_day``.
    """
    offset_days = (datetime.datetime.strptime(date_str, '%Y-%m-%d') - earliest_date).days
    anchor_id = min_ids[available_days[0]]
    return anchor_id + offset_days * min_inc_per_day

def get_max_id(date_str):
    """Estimate the largest tweet ID posted on ``date_str`` (YYYY-MM-DD).

    Extrapolates linearly from the earliest observed day's maximum ID using ``max_inc_per_day``.
    """
    offset_days = (datetime.datetime.strptime(date_str, '%Y-%m-%d') - earliest_date).days
    anchor_id = max_ids[available_days[0]]
    return anchor_id + offset_days * max_inc_per_day

# Spot-check the linear model against a day that is actually present in the dataset
# (the middle available day) rather than hard-coded dates, which may not exist.
sample_day = available_days[len(available_days) // 2]
print("Estimate:", get_min_id(sample_day), "Actual:", min_ids[sample_day])
print("Estimate:", get_max_id(sample_day), "Actual:", max_ids[sample_day])

In [None]:
def write_timeline_batch(batch_idx, json_tweets, csv_rows):
    """Write one batch of timeline tweets into intermediate_dir.

    Raw tweet JSON goes to timeline_tweets_<batch_idx>.json (one object per line) and the
    flattened rows go to timeline_tweets_<batch_idx>.csv.
    """
    json_path = os.path.join(intermediate_dir, "timeline_tweets_{}.json".format(batch_idx))
    with open(json_path, "w") as file:
        for item in json_tweets:
            file.write(json.dumps(item) + "\n")

    csv_path = os.path.join(intermediate_dir, "timeline_tweets_{}.csv".format(batch_idx))
    pd.DataFrame(csv_rows).to_csv(csv_path, line_terminator="\n")


current_batch = []
current_csv = []
batch_idx = 0

user_items = sorted(user_dates.items())

for i, (user_id, (min_date, max_date)) in enumerate(user_items):
    if i % 100 == 0:
        # Periodically sleep to appease the Twitter rate limiting gods
        print(i)
        time.sleep(20)

    # Compute the boundary tweet IDs needed to search the Twitter timeline for this user
    min_id = int(get_min_id(min_date))
    max_id = int(get_max_id(max_date))
    for tweet in t.timeline(user_id=user_id, max_id=max_id, since_id=min_id):
        current_batch.append(tweet)
        current_csv.append(utils.json_to_tweet(tweet))

    # Flush a batch after every 1000 users processed (i is 0-based, hence i + 1).
    if (i + 1) % 1000 == 0:
        print("Writing")
        write_timeline_batch(batch_idx, current_batch, current_csv)
        batch_idx += 1
        current_batch = []
        current_csv = []

# Write out the stragglers. This also runs when the batch is empty, so at least
# timeline_tweets_0.csv always exists for the reading step.
print("Writing last batch")
write_timeline_batch(batch_idx, current_batch, current_csv)
batch_idx += 1
current_batch = []
current_csv = []

# Putting It All Together

We want to read all the timeline tweets, as well as the original tweet sets and upstream tweets, and put together a directed acyclic graph of tweet replies. This DAG will contain many individual components that are disconnected from each other, corresponding to tweet groups by individual users. We find all tweets in reply graphs that contain at least one tweet in the original dataset, and label those as belonging to a single "thread."

In [None]:
# Read every timeline batch (timeline_tweets_0.csv, timeline_tweets_1.csv, ...) until the first
# missing index, then concatenate once instead of re-concatenating on every iteration.
timeline_frames = []
batch_idx = 0
path = os.path.join(intermediate_dir, "timeline_tweets_{}.csv".format(batch_idx))
while os.path.exists(path):
    print("Reading {}...".format(os.path.basename(path)))
    timeline_frames.append(pd.read_csv(path, dtype=utils.dtype_spec, lineterminator='\n'))
    batch_idx += 1
    path = os.path.join(intermediate_dir, "timeline_tweets_{}.csv".format(batch_idx))

if not timeline_frames:
    raise FileNotFoundError("No timeline_tweets_*.csv files found in {}".format(intermediate_dir))

timelines = pd.concat(timeline_frames)
# Drop index columns written by to_csv (e.g. "Unnamed: 0").
timelines = timelines.loc[:, ~timelines.columns.str.contains('^Unnamed')].reset_index(drop=True)

timelines.head()

In [None]:
# Let's build a set of all the threaded tweets we know about.

# Uncomment to load initial tweets from a previous run
# df = pd.read_csv(os.path.join(intermediate_dir, "initial_doctor_tweets.csv"), dtype=utils.dtype_spec, index_col=0, lineterminator='\n')

# Stack timeline tweets, the upstream tweets saved to disk, and the original dataset.
# Overlaps between the three sources are expected; they are de-duplicated on "id" afterwards.
upstream_csv_path = os.path.join(intermediate_dir, "all_upstream_tweets.csv")
upstream_from_csv = pd.read_csv(upstream_csv_path, dtype=utils.dtype_spec, index_col=0, lineterminator='\n')
all_threaded_tweets = pd.concat([timelines, upstream_from_csv, df])

print("{} tweets total".format(len(all_threaded_tweets)))

In [None]:
# 6/22/20: The old approach to threads may have been incorrect because some tweets have multiple
# replies, so the graph is more of a DAG than a linked list. We need to find the correct tweets
# to concatenate in these cases.
#
# Builds two adjacency maps keyed by row position in dedup_tweets (sorted newest first):
#   downstream_replies[parent] -> set of rows that self-reply to that parent
#   upstream_replies[child]    -> set holding the row the child self-replies to
# Only self-replies (reply_to_user == user_id) create edges. Self-replies whose parent is not
# among the collected tweets are counted as missing.

downstream_replies = {}
upstream_replies = {}

# .copy() makes the de-duplicated frame independent, so adding id_num does not write into a
# slice of all_threaded_tweets (pandas SettingWithCopyWarning).
dedup_tweets = all_threaded_tweets.drop_duplicates("id").copy()
dedup_tweets["id_num"] = dedup_tweets["id"].astype(int)
dedup_tweets = dedup_tweets.sort_values("id_num", ascending=False).reset_index()
print("Removed {} tweets".format(len(all_threaded_tweets) - len(dedup_tweets)))

# Tweet ID -> row position (the index is 0..n-1 after reset_index).
tweet_ids = dict(zip(dedup_tweets["id"], dedup_tweets.index))

missing_tweets = 0
for i, row in dedup_tweets.iterrows():
    if i % 100000 == 0:
        print(i, len(downstream_replies), len(upstream_replies))
    # pd.notna instead of truthiness: a missing reply_to_id is NaN, and NaN is truthy.
    if pd.notna(row["reply_to_id"]) and row["reply_to_user"] == row["user_id"]:
        parent = tweet_ids.get(row["reply_to_id"])
        if parent is None:
            missing_tweets += 1
            continue
        downstream_replies.setdefault(parent, set()).add(i)
        upstream_replies.setdefault(i, set()).add(parent)
print("Missing tweets: {}".format(missing_tweets))

In [None]:
# 6/22/20: Combine tweet groups containing self-replies
# For each reply chain, climb to its root and collect every tweet reachable downstream.
# A group is "nonlinear" when some tweet in it has more than one self-reply.
tweet_groups = []
non_linear_tweet_groups = []
seen_tweets = set()

# `parents` (not `upstream_tweets`) so the upstream-step list of that name is not overwritten.
for base_id, parents in upstream_replies.items():
    if base_id in seen_tweets:
        continue

    # Travel upward to the root of this tweet thread (each tweet has exactly one parent)
    assert len(parents) == 1
    root = next(iter(parents))
    while root in upstream_replies:
        up = upstream_replies[root]
        assert len(up) == 1
        root = next(iter(up))

    # Grab everything downstream of this root, breadth-first (deque gives O(1) left pops)
    group = set()
    queue = deque([root])
    nonlinear = False
    while queue:
        curr = queue.popleft()
        assert curr not in group
        assert curr not in seen_tweets
        group.add(curr)
        seen_tweets.add(curr)
        children = downstream_replies.get(curr, set())
        queue.extend(children)
        if len(children) > 1:
            nonlinear = True

    tweet_groups.append(group)
    if nonlinear:
        non_linear_tweet_groups.append(group)
    if len(tweet_groups) % 10000 == 0:
        print("{} tweet groups so far ({} nonlinear)".format(len(tweet_groups), len(non_linear_tweet_groups)))

In [None]:
# Debug: pick a tweet group and look at its structure

def print_group(group):
    """Print every tweet of a reply group, breadth-first from the group's root.

    ``group`` holds row positions into ``dedup_tweets``. Each tweet is printed with its ID,
    author and timestamp, and the 0-based print position of the tweet it replies to. That
    position is -1 when the parent was not printed (e.g. for the root). "NONLINEAR" marks
    tweets with more than one self-reply.
    """
    # Climb parent links from an arbitrary member until reaching the root.
    root = next(iter(group))
    while root in upstream_replies:
        parents = upstream_replies[root]
        assert len(parents) == 1
        root = next(iter(parents))

    # Breadth-first traversal using a growing list and a read cursor.
    pending = [root]
    cursor = 0
    print_ids = {}
    while cursor < len(pending):
        node = pending[cursor]
        cursor += 1
        tweet = dedup_tweets.iloc[node]
        print("{}: {} said at {} in reply to {}:".format(tweet["id"], tweet["screen_name"], tweet["created_at"], print_ids.get(tweet["reply_to_id"], -1)))
        print(tweet["full_text"])
        children = downstream_replies.get(node, set())
        if len(children) > 1:
            print("NONLINEAR")
        print("=" * 40)
        print_ids[tweet["id"]] = len(print_ids)
        pending.extend(children)
    
# Print one randomly chosen branching (nonlinear) group. np.random.choice(0) raises,
# so guard against datasets in which no group branches.
if non_linear_tweet_groups:
    sample_group = non_linear_tweet_groups[np.random.choice(len(non_linear_tweet_groups))]
    print_group(sample_group)
else:
    print("No nonlinear tweet groups to sample.")

In [12]:
# Optionally filter threaded tweets by those posted within this time interval of the tweet in the dataset.
# The thread-labeling step below applies this window whenever time_period is truthy. It is currently
# enabled with a one-day window; set time_period = None to disable the cutoff and keep whole threads.
# NOTE(review): an earlier comment said this interval was not used, which contradicts the value
# below. Confirm which behavior is intended.

time_period = datetime.timedelta(days=1)

In [None]:
# For each tweet in the original dataframe (oldest first) that has no thread ID yet, start a new
# thread and give that ID to the tweet, its self-reply ancestors (upstream) and its self-reply
# descendants (downstream). When time_period is set, only tweets posted within time_period of
# that original tweet are included. The upstream walk stops at the first tweet outside the
# window; a downstream tweet outside the window is skipped along with its replies.

thread_ids = {}
current_thread = 0

df["id_num"] = df["id"].astype(int)
sorted_original_tweets = df.sort_values(by="id_num", ascending=True).reset_index()

for i, row in sorted_original_tweets.iterrows():
    if i % 100000 == 0:
        print(i)
    if row["id"] in thread_ids:
        continue
    thread_ids[row["id"]] = current_thread

    if time_period:
        timestamp = datetime.datetime.strptime(row.created_at, "%a %b %d %H:%M:%S +0000 %Y")
        lower_time_bound = timestamp - time_period
        upper_time_bound = timestamp + time_period

    start = tweet_ids[row["id"]]

    # Grab upstream tweets
    curr = start
    while curr in upstream_replies:
        curr = next(iter(upstream_replies[curr]))
        tweet = dedup_tweets.iloc[curr]

        if time_period:
            curr_ts = datetime.datetime.strptime(tweet.created_at, "%a %b %d %H:%M:%S +0000 %Y")
            if curr_ts < lower_time_bound or curr_ts > upper_time_bound:
                break

        thread_ids[tweet["id"]] = current_thread

    # Downstream tweets, breadth-first from the original tweet's replies. Every reached reply is
    # labeled, including leaf tweets with no replies of their own. Previously those were skipped,
    # which dropped the last tweet of each thread.
    queue = deque(downstream_replies.get(start, ()))
    while queue:
        curr = queue.popleft()
        tweet = dedup_tweets.iloc[curr]

        if time_period:
            curr_ts = datetime.datetime.strptime(tweet.created_at, "%a %b %d %H:%M:%S +0000 %Y")
            if curr_ts < lower_time_bound or curr_ts > upper_time_bound:
                continue

        thread_ids[tweet["id"]] = current_thread
        queue.extend(downstream_replies.get(curr, ()))

    current_thread += 1

print("{} thread IDs".format(len(thread_ids)))

In [None]:
# Keep only tweets that received a thread ID and attach that ID. .copy() avoids assigning
# into a filtered slice of dedup_tweets (pandas SettingWithCopyWarning).
present_tweets = dedup_tweets[dedup_tweets.id.isin(thread_ids)].copy()
present_tweets["thread_id"] = present_tweets.id.map(thread_ids)
# Sort on the integer id_num column: string IDs of different lengths do not sort numerically.
present_tweets = present_tweets.sort_values(by="id_num", ascending=True)
present_tweets = present_tweets.loc[:, ~present_tweets.columns.str.contains('^Unnamed|^index$|^level_')].reset_index(drop=True)
present_tweets.head()

In [16]:
# Persist the thread-annotated tweets to the output directory.
# NOTE(review): pandas 1.5 renamed to_csv's `line_terminator` to `lineterminator` and pandas 2.0
# removed the old name; update this call if running on pandas >= 2.0.
thread_annotated_path = os.path.join(output_dir, "thread_annotated_tweets.csv")
present_tweets.to_csv(thread_annotated_path, line_terminator="\n")

# Sanity Checks

In [None]:
# Find some example threads by screen name
# Account whose thread-annotated tweets are printed below, grouped by thread_id; edit to inspect another account.
test_screen_name = "AdamJKucharski"

def print_thread_groupby(thread):
    """Print each tweet of one thread as '<id> <full_text>', then a '===' separator and a blank line."""
    for tweet_id, text in zip(thread["id"], thread["full_text"]):
        print(tweet_id, text)
    print("===")
    print()

screen_name_tweets = present_tweets[present_tweets.screen_name == test_screen_name]
# Iterate the groups explicitly. GroupBy.apply is meant for computing results: here it would
# return an empty frame into the notebook output, and on pandas >= 2.2 it warns about
# operating on the grouping column.
for _, thread in screen_name_tweets.groupby("thread_id"):
    print_thread_groupby(thread)

In [None]:
# Sanity check for if the threads are being extracted correctly:
# Many tweet threads contain pagination markers, like 1/5, 2/5, etc.
# Here we extract tweets that contain the ending pagination marker, i.e. the numerator and denominator
# are the same value, and look at the threads that contain those tweets. We should see the full numerical
# range of indexes from 1 to n for each thread.
# The leading \b stops matches inside longer numbers (e.g. the date "12/2" matching as "2/2"), and
# na=False treats tweets with missing text as non-matches instead of putting NaN in the mask.

thread_flags = present_tweets[present_tweets.full_text.str.contains(r"\b(\d+)/\1\b", na=False)]
for i, start_tweet in thread_flags.iloc[50:60].iterrows():
    thread_group = present_tweets[present_tweets.thread_id == start_tweet.thread_id]
    for j, tweet in thread_group.iterrows():
        print("{}: {} said at {} in reply to {}:".format(tweet["id"], tweet["screen_name"], tweet["created_at"], tweet["reply_to_id"]))
        print(tweet["full_text"])
        print("=" * 40)
    print("\n\n")