# API

In [None]:
# Twitter/X API credentials.
# Each value is read from an environment variable so that real secrets do not
# have to be written into (and shared with) this notebook. When a variable is
# not set, the original 'your own' placeholder is used, so replacing the
# placeholders by hand keeps working as before.
import os

consumer_key = os.environ.get('TWITTER_CONSUMER_KEY', 'your own')
consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET', 'your own')
access_token = os.environ.get('TWITTER_ACCESS_TOKEN', 'your own')
access_secret = os.environ.get('TWITTER_ACCESS_SECRET', 'your own')
# The bearer token is the only credential the requests below actually send.
bearer_token = os.environ.get('TWITTER_BEARER_TOKEN', 'your own')

import requests
import json
import time
import random
import os
import pandas as pd
# Recent-search endpoint (version 2 of the API, as the URL path shows).
endpoint_url = "https://api.twitter.com/2/tweets/search/recent"

# One search rule per topic: "value" is the query string sent to the API and
# "tag" is the short label used when naming the per-rule pickle file.
rules = [
    {"value": search_query, "tag": label}
    for search_query, label in (
        ('("Election2024" OR "USElection2024") -is:retweet lang:en', "Election2024"),
        ('("Trump" OR "Trump2024") -is:retweet lang:en', "Trump2024"),
        ('("Harris" OR "Kamala Harris") -is:retweet lang:en', "KamalaHarris"),
    )
]

# Parameters shared by every request; the rule-specific "query" entry is
# filled in later, once per rule.
query_parameters = {
    # Fields requested for each tweet
    "tweet.fields": "id,text,author_id,created_at,public_metrics,referenced_tweets",
    # Fields requested for each expanded author
    "user.fields": "id,name,username,created_at,description,location,verified,public_metrics",
    # Ask the API to include the author objects alongside the tweets
    "expansions": "author_id",
    "max_results": 100,
}
def request_headers(bearer_token: str) -> dict:
    """
    Build the HTTP headers that authenticate a request.

    Returns a dictionary with a single "Authorization" entry that
    carries the given bearer token.
    """
    authorization = f"Bearer {bearer_token}"
    return {"Authorization": authorization}
# Build the authentication headers once; every request below reuses them.
headers = request_headers(bearer_token)

def connect_to_endpoint(endpoint_url: str, headers: dict, parameters: dict) -> dict:
    """
    Send a GET request to the endpoint and return the decoded JSON body.

    The request is repeated until it succeeds:

    - HTTP 200: the parsed JSON response is returned.
    - HTTP 429 (rate limit reached): sleeps until the time announced in
      the ``x-rate-limit-reset`` response header (or a random 5-60
      seconds when that header is missing), then tries again.
    - any other 4xx: raises ``Exception``, because the request itself is
      at fault and repeating it cannot help.
    - every other status: sleeps a random 5-60 seconds, then tries again.
    """
    # A loop instead of recursion: a long outage cannot exhaust the call stack.
    while True:
        response = requests.request(
            "GET", url=endpoint_url, headers=headers, params=parameters
        )
        response_status_code = response.status_code
        if response_status_code == 200:
            return response.json()

        # 429 sits in the 4xx range but is temporary, so it must not be fatal.
        rate_limited = response_status_code == 429
        if 400 <= response_status_code < 500 and not rate_limited:
            raise Exception(
                "Cannot get data, the program will stop!\nHTTP {}: {}".format(
                    response_status_code, response.text
                )
            )

        sleep_seconds = random.randint(5, 60)
        if rate_limited:
            # The header holds the epoch second at which the window resets.
            reset_at = response.headers.get("x-rate-limit-reset", "")
            if reset_at.isdigit():
                # +1 so the retry lands just after the reset, never before it.
                sleep_seconds = max(int(reset_at) - int(time.time()), 0) + 1

        print(
            "Cannot get data, your program will sleep for {} seconds...\nHTTP {}: {}".format(
                sleep_seconds, response_status_code, response.text
            )
        )
        time.sleep(sleep_seconds)

def process_twitter_data(
    json_response: json,
    query_tag: str,
    tweets_data: pd.DataFrame,
) -> pd.DataFrame:
    """
    Append one API response to the accumulated table and checkpoint it.

    A response without a "data" entry leaves the table untouched and
    nothing is written. Otherwise the response is flattened with
    pd.json_normalize, appended to the table, and the combined table is
    pickled to "tweets_<query_tag>.pkl".

    Returns the (possibly extended) table.
    """
    # Guard clause: nothing to add, nothing to save
    if "data" not in json_response.keys():
        return tweets_data

    flattened = pd.json_normalize(json_response)
    combined = pd.concat([tweets_data, flattened])

    # Checkpoint after every page that carried data
    pickle_path = "tweets_" + query_tag + ".pkl"
    combined.to_pickle(pickle_path)
    return combined

# Accumulator for every API response collected across all rules
tweets_data = pd.DataFrame()

# Run each search rule in turn and follow its pagination until it is exhausted
for rule in rules:
    query_parameters["query"] = rule["value"]  # Query string for the current rule
    query_tag = rule["tag"]  # Tag of the current rule, used for file naming

    # A pagination token only belongs to the query that produced it. Drop the
    # token left over from the previous rule so this rule starts on its first
    # page instead of sending a stale token with a different query.
    query_parameters.pop("next_token", None)

    while True:
        # Fetch one page of results for the current rule
        json_response = connect_to_endpoint(endpoint_url, headers, query_parameters)

        # Add the page to the accumulated data
        tweets_data = process_twitter_data(
            json_response, query_tag, tweets_data
        )

        time.sleep(5)  # Pause for 5 seconds to comply with API rate limits

        # No token in the response metadata means this was the last page
        next_token = json_response.get("meta", {}).get("next_token")
        if not next_token:
            break
        query_parameters["next_token"] = next_token

# Save everything that was collected as a JSON Lines file (one record per line)
tweets_data.to_json("tweets_data.json", orient="records", lines=True, force_ascii=False)

print("Data has been successfully saved as a JSON file.")

In [None]:
import json
import pandas as pd

def _flatten_tweet(tweet: dict) -> dict:
    """Return one flat row (a plain dict) for a single raw tweet object."""
    # `or {}` covers both a missing key and an explicit null value
    public_metrics = tweet.get('public_metrics') or {}
    referenced_tweets = tweet.get('referenced_tweets')

    if isinstance(referenced_tweets, list):
        referenced_tweet_types = [
            t.get('type') for t in referenced_tweets if t.get('type') is not None
        ]
    else:
        # None or any other non-list value: the tweet references nothing
        referenced_tweet_types = []

    return {
        "edit_history_tweet_ids": tweet.get('edit_history_tweet_ids'),
        "created_at": tweet.get('created_at'),
        "id": tweet.get('id'),
        "author_id": tweet.get('author_id'),
        "text": tweet.get('text'),
        "retweet_count": public_metrics.get('retweet_count', 0),
        "reply_count": public_metrics.get('reply_count', 0),
        "like_count": public_metrics.get('like_count', 0),
        "quote_count": public_metrics.get('quote_count', 0),
        "bookmark_count": public_metrics.get('bookmark_count', 0),
        "impression_count": public_metrics.get('impression_count', 0),
        "referenced_tweet_types": referenced_tweet_types
    }


# Collect the tweet objects from the 'data' section of every saved response
data_list = []

# Read the JSON file line by line (one saved response per line)
with open('tweets_data.json', 'r', encoding='utf-8') as file:
    for line in file:
        if not line.strip():  # Skip empty lines
            continue
        json_line = json.loads(line)
        # A missing or null 'data' section contributes no tweets
        data_list.extend(json_line.get('data') or [])

# Raw tweets as a DataFrame (not needed for the flattening below; kept so the
# name stays available to anything that inspects it afterwards)
df1 = pd.DataFrame(data_list)

# Flatten the raw dicts directly. Going through DataFrame rows would turn a
# missing 'public_metrics' into NaN, and calling .get on NaN fails.
flat_data_list = [_flatten_tweet(tweet) for tweet in data_list]

# Convert to DataFrame
df = pd.DataFrame(flat_data_list)

# Print the DataFrame
print("DataFrame:")
print(df)

# Save the DataFrame as a CSV file
df.to_csv('output_tweets.csv', index=False, encoding='utf-8-sig')

In [None]:
import json
import pandas as pd

def _flatten_user(user: dict) -> dict:
    """Return one flat row (a plain dict) for a single raw user object."""
    # Looked up once; `or {}` covers both a missing key and an explicit null
    public_metrics = user.get('public_metrics') or {}

    return {
        "name": user.get('name'),
        "username": user.get('username'),
        "description": user.get('description'),
        "followers_count": public_metrics.get('followers_count', 0),
        "following_count": public_metrics.get('following_count', 0),
        "tweet_count": public_metrics.get('tweet_count', 0),
        "listed_count": public_metrics.get('listed_count', 0),
        "like_count": public_metrics.get('like_count', 0),
        "verified": user.get('verified', False),
        "created_at": user.get('created_at'),
        "location": user.get('location'),
        "id": user.get('id'),
        "withheld": user.get('withheld')
    }


# Collect the user objects from the 'includes.users' section of every saved response
data_list = []

# Read the JSON file line by line (one saved response per line)
with open('tweets_data.json', 'r', encoding='utf-8') as file:
    for line in file:
        if not line.strip():  # Skip empty lines
            continue
        json_line = json.loads(line)
        # A missing or null 'includes.users' section contributes no users
        data_list.extend(json_line.get('includes.users') or [])

# Raw users as a DataFrame (not needed for the flattening below; kept so the
# name stays available to anything that inspects it afterwards)
user_data_list = pd.DataFrame(data_list)

# Flatten the raw dicts directly. Going through DataFrame rows would turn a
# missing 'public_metrics' into NaN, and calling .get on NaN fails.
flat_data_list = [_flatten_user(user) for user in data_list]

# Convert to DataFrame
df = pd.DataFrame(flat_data_list)

# Save the DataFrame as a CSV file
df.to_csv('output_users.csv', index=False, encoding='utf-8-sig')