In [None]:
import tweepy
import time
from pyspark.sql import SparkSession
import pandas as pd
import utilities as util

In [None]:
# Load the Twitter API credentials from the YAML config file.
config = util.read_yaml_config("./secrets/config.yaml")
twitter_config = config["twitter"]

# OAuth 2.0 bearer token.
bearer_token = twitter_config["oauth2.0"]["bearer_token"]

# OAuth 1.0a user-context credentials.
oauth1_config = twitter_config["oauth1.0a"]
consumer_key = oauth1_config["consumer_key"]
consumer_secret = oauth1_config["consumer_secret"]
access_token = oauth1_config["access_token"]
access_token_secret = oauth1_config["access_token_secret"]

# Build the client with both credential sets; wait_on_rate_limit=True is
# passed so the client waits instead of failing when a rate limit is hit.
client_kwargs = {
    "bearer_token": bearer_token,
    "consumer_key": consumer_key,
    "consumer_secret": consumer_secret,
    "access_token": access_token,
    "access_token_secret": access_token_secret,
    "wait_on_rate_limit": True,
}
tweepy_client = tweepy.Client(**client_kwargs)
tweepy_client

In [None]:
# Create (or reuse) a local Spark session that uses all available cores.
# The session time zone is pinned to UTC. The config key must be spelled
# exactly "spark.sql.session.timeZone": a misspelled key is just stored as an
# unknown setting and has no effect on the session time zone.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Test")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)
spark

In [None]:
# Expansions and fields for twitter api endpoint: tweets
def _csv(*names):
    """Join the given field names into one comma-separated string."""
    return ",".join(names)


# Related objects to expand in the response.
expansions = _csv(
    "attachments.poll_ids", "attachments.media_keys",
    "author_id",
    "entities.mentions.username",
    "geo.place_id",
    "in_reply_to_user_id",
    "referenced_tweets.id", "referenced_tweets.id.author_id",
)

# Fields requested for media objects.
media_fields = _csv(
    "duration_ms", "height", "media_key", "preview_image_url",
    "type", "url", "width",
    "public_metrics", "non_public_metrics", "organic_metrics", "promoted_metrics",
    "alt_text",
)

# Fields requested for place objects.
place_fields = _csv(
    "contained_within", "country", "country_code", "full_name",
    "geo", "id", "name", "place_type",
)

# Fields requested for poll objects.
poll_fields = _csv(
    "duration_minutes", "end_datetime", "id", "options", "voting_status",
)

# Fields requested for user objects.
user_fields = _csv(
    "created_at", "description", "entities", "id", "location", "name",
    "pinned_tweet_id", "profile_image_url", "protected", "public_metrics",
    "url", "username", "verified", "withheld",
)

# Fields requested for tweet objects.
tweet_fields = _csv(
    "attachments", "author_id", "context_annotations", "conversation_id",
    "created_at", "entities", "geo", "id", "in_reply_to_user_id", "lang",
    "possibly_sensitive", "public_metrics", "referenced_tweets",
    "reply_settings", "source", "text", "withheld",
)

In [None]:
# Hydrate every tweet id in the reCOVery id file through the tweets lookup
# endpoint and write each raw JSON response to ./data/tweets/<batch>.json.
start_time = time.time()

# Number of ids sent per request (the loop hits the endpoint 100 ids at a time).
BATCH_SIZE = 100

util.create_dir("./data/tweets")

# Read in file with all ids
spark.read.option("header", True).option("delimiter", ",").csv("./data/reCOVery/recovery-social-media-data.txt").createOrReplaceTempView("tweets")

# Add an ordering key to the rows. monotonically_increasing_id() is unique and
# increasing but NOT consecutive (values jump between partitions), so idx is
# only used for ordering below, never for range-based "pagination".
spark.sql("""
SELECT *, monotonically_increasing_id() AS idx
FROM tweets
""").createOrReplaceTempView("tweets2")

# Collect all ids once, in file order, instead of launching one Spark job per
# batch. Null ids are dropped because they cannot be joined into the request.
tweet_ids = spark.sql("""
    SELECT tweet_id
    FROM tweets2
    WHERE tweet_id IS NOT NULL
    ORDER BY idx
""").toPandas()["tweet_id"].tolist()

# Loop over the ids hitting twitter endpoint with BATCH_SIZE ids at a time.
# The number of batches follows from the data rather than a hard-coded row
# count; `count` is the zero-padded batch number used as the output file name.
count = 0
for start in range(0, len(tweet_ids), BATCH_SIZE):
    # get ids
    ids = ",".join(tweet_ids[start:start + BATCH_SIZE])

    response = tweepy_client.request(
        method="GET",
        route="/2/tweets",
        params={
            "ids": ids,
            "tweet.fields": tweet_fields,
            "expansions": expansions,
            "media.fields": media_fields,
            "place.fields": place_fields,
            "poll.fields": poll_fields,
            "user.fields": user_fields
        },
        user_auth=True
    )
    response_json = response.json()
    util.write_json_pretty_to_file(response_json, f"./data/tweets/{str(count).zfill(9)}.json")
    count += 1

print("--- %s seconds ---" % (time.time() - start_time))