In [51]:
from abc import ABC, abstractmethod
import json, os, time
import requests
import random
from datetime import datetime, timezone, timedelta
from pathlib import Path

DATA_START = datetime(2026, 1, 30)
DATA_END = datetime(2026, 2, 5)
DUMMY_PATH = 'dummy.json'
REAL_URL = "https://dummyjson.com/products"
KEY_FILE = 'key.json'

In [42]:
def random_time_window(start, end, window_days: int = 2):
    """
    Generate a random time window between of fixed length (window_days) between 'start' and 'end'.
    
    Parameters
    ----------
    start : The start of the overall time range.
    end : The end of the overall time range.
    window_days : The window of time the start and end range is.

    """
    total_seconds = int((end - start).total_seconds())
    window_seconds = window_days * 86400 # convert days to seconds
    
    if total_seconds < window_seconds:
        raise ValueError(f"Time range must be at least {windows_days} days long.")

    # pick random start point
    rand_start_seconds = random.randint(0, total_seconds - window_seconds)
    rand_start = start + timedelta(seconds=rand_start_seconds)
    rand_end = rand_start + timedelta(days=window_days)

    return (
        rand_start.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        rand_end.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    )

def random_comment_window(tweet_created_at, window_days: int = 2):
    """
    Create a random comment window starting after the tweet timestamp.
    """
    tweet_time = datetime.fromisoformat(tweet_created_at.replace("Z", "+00:00"))
    now = datetime.now(timezone.utc) - timedelta(seconds=15) # offsetting the time for acceptable api timing

    # max time you’re allowed to search comments
    max_end = min(tweet_time + timedelta(days=window_days), now)

    if max_end <= tweet_time:
        raise ValueError("Tweet is in the future or no valid comment window.")

    # pick a random start
    delta_seconds = int((max_end - tweet_time).total_seconds() - window_days * 86400)
    delta_seconds = max(delta_seconds, 0)
    
    rand_start_seconds = random.randint(0, delta_seconds) if delta_seconds > 0 else 0
    rand_start = tweet_time + timedelta(seconds=rand_start_seconds)

    rand_end = min(rand_start + timedelta(days = window_days), max_end)

    return (
        rand_start.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        rand_end.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    )
    
def save_json(data: dict, folder: str, filename: str):
    """
    Function to save dictionaries as jsons
    """
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, filename)
    with open(path, "w") as f:
        json.dump(data, f, indent=2)
    print(f"Saved → {path}")

In [43]:
# Unfinished Dummy API code

class BaseAPIClient(ABC):
    '''
    We only use this class to define a structure
    and then inherit it. Never use this class directly.
    '''

    @abstractmethod # allows you to define a function with no purpose
    def fetch_data(self) -> dict:
        pass # doesnt do anything
        
class DummyAPIClient(BaseAPIClient):
    '''
    Since this inherist the BaseAPIClient it must
    implement all of its abstract methods
    '''
    def __init__(self):
        print('running in dummy mode')
        return

    def fetch_data(self) -> dict:
        '''
        Load and return the local dummy data
        '''
        with open(DUMMY_PATH, 'r') as f:
            data = json.load(f)
        return data

# START of script

class RealAPIClient(BaseAPIClient):
    '''
    Since this inherist the BaseAPIClient it must
    implement all of its abstract methods
    '''
    def __init__(self, key, bearer):
        self.key = key
        self.bearer = bearer
        return
        
    def fetch_data(self) -> dict:
        """Placeholder to satisfy abstract method requirement."""
        return {}

    def fetch_posts(self,
                    politician_handle: str,
                    randomize_time=True,
                    target_count=10
                   ):
        politician_folder = f"data/posts/{politician_handle}"
        os.makedirs(politician_folder, exist_ok=True)

        posts_path = f"{politician_folder}/{politician_handle}_posts.json"

        existing_posts = []
        collected_post_ids = set()

        # Load posts into exisitng_posts if they exist
        if os.path.exists(posts_path):
            with open(posts_path, "r") as f:
                existing_posts = json.load(f).get("data", [])
                collected_post_ids = {tweet["id"] for tweet in existing_posts}

        # if we have enough posts, STOP
        if len(existing_posts) >= target_count:
            print(f"Already have {len(existing_posts)} posts for {politician_handle}")
            return {"data": existing_posts[:target_count]}, [p["id"] for p in existing_posts[:target_count]]

        collected_posts = existing_posts[:]

        url = "https://api.twitter.com/2/tweets/search/recent"
        headers = {"Authorization": f"Bearer {self.bearer}"}
        
        attempts = 0
        while len(collected_posts) < target_count and attempts <5: # ensures that we only collect 10 posts in each folder
            attempts += 1
            added_this_round = 0 # number of posts added to the folder
            # Step 1: Set or randomize timeframe
            if randomize_time:
                start, end = random_time_window(DATA_START, DATA_END)
            else:
                # manually set date window here
                start = "2025-12-16T19:00:00Z"
                end   = "2025-12-16T21:00:00Z"

            print(f"Fetching tweets for {politician_handle} between {start} and {end}")

            params = {
                "query": f"from:{politician_handle} -is:retweet", # no retweets!
                "start_time": start,
                "end_time": end,
                "max_results": 10,
                "tweet.fields": "id,text,created_at"
            }
    
            # Get response
            response = requests.get(url, headers=headers, params=params)

            if response.status_code == 200:
                print("SUCCESS")
                data = response.json().get("data", [])
                for tweet in data:
                    pid = tweet["id"]
                    if pid not in collected_post_ids:
                        #print(f"Skipping post {pid} — already saved")
                        collected_posts.append(tweet)
                        collected_post_ids.add(pid)
                        added_this_round += 1
                    # only collect 10 posts in the foslder
                    if len(collected_posts) >= target_count:
                        break
                if added_this_round == 0:
                    print("No new posts found - stopping early")
                    continue
                        
            elif response.status_code == 429:
                reset = response.headers.get("x-rate-limit-reset")
                wait = int(reset) - int(time.time()) if reset else 60
                print(f"→ RATE LIMIT HIT. Sleeping {wait} seconds…")
                time.sleep(wait)
            else:
                print("Error:", response.text)
                response.raise_for_status()

        # SAVE FINAL POSTS
        if collected_posts:
            save_json(
                {"data": collected_posts[:target_count]},
                folder=politician_folder,
                filename=f"{politician_handle}_posts.json"
            )
        return {"data": collected_posts[:target_count]}, [p["id"] for p in collected_posts[:target_count]]

    def fetch_comments(self,
                       politician_handle: str,
                       post_id: str,
                       tweet_created_at: str,
                       target_count=10,
                       randomize_time: bool = True
                      ):
        post_folder = f"data/posts/{politician_handle}/{post_id}"
        os.makedirs(post_folder, exist_ok=True)

        comments_path = os.path.join(post_folder, "comments.json")
        
        existing_comments = []
        existing_comment_ids = set()
    
        # LOAD EXISTING COMMENTS
        if os.path.exists(comments_path):
            with open(comments_path, "r") as f:
                existing_comments = json.load(f).get("data", [])
                existing_comment_ids = {c["id"] for c in existing_comments}
        else:
            existing_comments = []
    
        if len(existing_comments) >= target_count:
            print(f"Already have {len(existing_comments)} comments for {post_id}")
            return existing_comments[:target_count]
    
        collected_comments = existing_comments[:]

        url = "https://api.twitter.com/2/tweets/search/recent"
        headers = {"Authorization": f"Bearer {self.bearer}"}

        attempts = 0
        while len(collected_comments) < target_count and attempts < 10:
            attempts += 1
            added_this_round = 0
            
            # Define Timeframe
            start, end = random_comment_window(tweet_created_at, window_days=2)
            print(f"Fetching comments for {post_id} between {start} and {end}")
            
            params = {"query": f"conversation_id:{post_id}",
                      "start_time": start,
                      "end_time": end,
                      "max_results": 10,
                     "tweet.fields": "id,text,created_at"}
    
            response = requests.get(url, headers=headers, params=params)
            
            if response.status_code == 200:
                print("SUCCESS")
                data = response.json().get("data", [])

                for tweet in data:
                    cid = tweet['id']
                    if cid not in existing_comment_ids:
                        collected_comments.append(tweet)
                        existing_comment_ids.add(cid)
                        added_this_round += 1

                    if len(collected_comments) >= target_count:
                        break
                if added_this_round == 0:
                    print("No new comments found - stopping early")
                    break
                
            elif response.status_code == 429:
                reset = response.headers.get("x-rate-limit-reset")
                wait = int(reset) - int(time.time()) if reset else 60
                print(f"→ RATE LIMIT HIT. Sleeping {wait} seconds…")
                time.sleep(wait)
            else: #other Errors
                print("→ ERROR:", response.text)
                response.raise_for_status()

        save_json({"data": collected_comments[:target_count]},
                  folder=post_folder,
                  filename="comments.json")

        return collected_comments[:target_count]

class ToxicityApp:
    def __init__(self):
        if self.load_key():
            self.api = RealAPIClient(self.key, self.bearer)
        else:
            self.api = DummyAPIClient()

    def load_key(self):
        try:
            with open("key.json") as f:
                data = json.load(f)
                self.key = data["X_API_KEY"]
                self.bearer = data["bearer_token"]
            return True
        except:
            print("Could not load key.")
            return False

    def collect_politician_comments(self, 
                                    handles: list[str], 
                                    randomize_time: bool = True
                                   ):
        """
        Handes a list of twitter handles, returns json of posts and comments
        """
        results = {}

        for handle in handles:
            print(f"Collecting posts for {handle}")
            
            posts_json, post_ids = self.api.fetch_posts(handle)
            tweets = posts_json.get("data", [])

            if not post_ids:
                print(f"No posts found for {handle}.")
                results[handle] = {"posts": posts_json, "comments": {}}
                continue
                
            comments_for_this_handle = {}

            # Iterate posts
            for tweet in tweets:
                post_id = tweet["id"]
                created_at = tweet["created_at"]

                comment_data = self.api.fetch_comments(
                    politician_handle=handle,
                    post_id=post_id,
                    tweet_created_at = created_at,
                    target_count=10,
                    randomize_time=randomize_time)
                comments_for_this_handle[post_id] = comment_data
        
            # Store inside the results dictionary
            results[handle] = {
                "posts" : posts_json,
                "comments": comments_for_this_handle
            }

        return results

In [50]:
app = ToxicityApp()

results = app.collect_politician_comments(
    ["SenTuberville",
     "SenKatieBritt",
     "lisamurkowski",
     "SenDanSullivan",
     "SenRubenGallego",
     "SenMarkKelly",
     "JohnBoozman",
     "SenTomCotton",
     "AlexPadilla4CA",
     "AdamSchiff",
     "MichaelBennet"
    ],
    randomize_time=True
)

Could not load key.
running in dummy mode
Collecting posts for SenTuberville


AttributeError: 'DummyAPIClient' object has no attribute 'fetch_posts'