In [None]:
# Inspiration:
# Syed Zainullah Qazi, "Mastodon Data Extraction – Research and Learning Purpose"
# https://medium.com/@syedzainullahqazi/mastodon-data-extraction-research-and-learning-purpose-1eec53068b15
# The pagination and rate-limit handling logic was adapted and extended.


import time
import random
import requests
import pandas as pd
from bs4 import BeautifulSoup

# -------------------------
# Config
# -------------------------
# Instance to query and the hashtag whose timeline endpoint is requested.
BASE_URL = "https://mastodon.social"
HASHTAG = "ai"  # <-- change for a different tag
ENDPOINT = "/api/v1/timelines/tag/" + HASHTAG

DAYS_BACK = 14       # only toots newer than now minus this many days are kept
LIMIT = 40           # maximum items per request
MAX_PAGES = 20000    # safety stop to avoid infinite loops
DEBUG = True         # print throttle / error / stop diagnostics

# If you have a token, use it (recommended). Otherwise leave as None.
ACCESS_TOKEN = None  # "PASTE_HERE"

# Headers sent with every request; the Authorization entry is added only
# when a token has been configured above.
headers = {"User-Agent": "mastodon-research-scraper/1.0"}
if ACCESS_TOKEN:
    headers.update({"Authorization": f"Bearer {ACCESS_TOKEN}"})

def clean_text(html: str) -> str:
    """
    Return the plain text of a Mastodon HTML ``content`` fragment.

    A falsy input (``None`` or an empty string) is parsed as an empty
    document. Text is extracted with a single space as the separator and
    with stripping enabled.
    """
    markup = html if html else ""
    soup = BeautifulSoup(markup, "html.parser")
    return soup.get_text(separator=" ", strip=True)

def parse_reset_ts(reset_value: str):
    """
    Convert an ``X-RateLimit-Reset`` header value to Unix epoch seconds.

    Two formats are accepted:

    - a numeric string, interpreted as a Unix timestamp in seconds;
    - an ISO 8601 timestamp such as ``2023-11-14T22:13:20.000000Z``, which
      is the format Mastodon's API documentation describes for this header.
      A value without a UTC offset is assumed to be UTC.

    Returns:
        The reset time as a float of epoch seconds, or ``None`` when the
        value is missing/empty or matches neither format.
    """
    from datetime import datetime, timezone

    if not reset_value:
        return None

    # Numeric epoch seconds (the only form the original code understood).
    try:
        return float(reset_value)
    except ValueError:
        pass

    # ISO 8601. Older Python versions' fromisoformat() rejects a trailing
    # "Z", so normalise it to an explicit UTC offset first.
    text = str(reset_value).strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()

def smart_throttle(resp: requests.Response, min_pause=(0.3, 0.8)):
    """
    Pause between requests based on the rate-limit headers of ``resp``.

    - Always sleeps for a random duration drawn uniformly from ``min_pause``
      (a ``(low, high)`` pair in seconds).
    - If ``X-RateLimit-Remaining`` parses to 5 or fewer and
      ``X-RateLimit-Reset`` can be parsed, additionally sleeps until the
      reset time plus one second.

    Missing or unparseable headers only skip the second step.
    """
    # Small jitter pause
    time.sleep(random.uniform(*min_pause))

    remaining_h = resp.headers.get("X-RateLimit-Remaining")
    try:
        remaining = int(remaining_h) if remaining_h is not None else None
    except ValueError:
        remaining = None

    # Plenty of budget left, or no usable information: nothing more to do.
    if remaining is None or remaining > 5:
        return

    reset_ts = parse_reset_ts(resp.headers.get("X-RateLimit-Reset"))
    if reset_ts is None:
        return

    # Close to the rate limit: wait until reset (+1 second safety).
    wait = max(0, reset_ts - time.time()) + 1.0
    if DEBUG:
        print(f"[THROTTLE] remaining={remaining}. Sleeping {wait:.1f}s until reset.")
    time.sleep(wait)

def safe_get_json(session: requests.Session, url: str, params: dict):
    """
    Issue one GET request and return ``(json_or_None, response)``.

    The first element is the decoded JSON body on a 200 response whose body
    parses as JSON, and ``None`` otherwise. The second element is always the
    response object.

    On HTTP 429 the function sleeps before returning ``(None, response)``;
    it does not re-issue the request itself. The wait is, in order of
    preference: ``Retry-After`` + 1s (10s if that header is not a number),
    time until ``X-RateLimit-Reset`` + 1s, or a flat 30s.
    """
    response = session.get(url, params=params, headers=headers, timeout=30)
    status = response.status_code

    if status == 429:
        retry_after = response.headers.get("Retry-After")
        reset_at = parse_reset_ts(response.headers.get("X-RateLimit-Reset"))

        if retry_after is None:
            if reset_at is None:
                wait = 30.0
            else:
                wait = max(0, reset_at - time.time()) + 1.0
        else:
            # Respect Retry-After when the server sent one.
            try:
                wait = float(retry_after) + 1.0
            except ValueError:
                wait = 10.0

        if DEBUG:
            print(f"[429] Rate limited. Sleeping {wait:.1f}s and retrying...")
        time.sleep(wait)
        return None, response

    if status != 200:
        if DEBUG:
            print(f"[HTTP {status}] {response.text[:200]}")
        return None, response

    try:
        payload = response.json()
    except ValueError:
        if DEBUG:
            print("[ERROR] Response is not JSON:", response.text[:200])
        return None, response

    return payload, response

# -------------------------
# Main
# -------------------------
# Pages backwards through the tag timeline (newest first, using max_id)
# until a toot older than the DAYS_BACK window is seen, then builds `df`.
url = f"{BASE_URL}{ENDPOINT}"
# Timestamp.utcnow() is deprecated in recent pandas releases;
# Timestamp.now(tz="UTC") yields the same tz-aware "now".
since = pd.Timestamp.now(tz="UTC") - pd.Timedelta(days=DAYS_BACK)

session = requests.Session()
results = []

params = {"limit": LIMIT}
page = 0

# Give up after this many failed requests in a row (429s excluded), so a
# permanent error such as 404/401 is not retried MAX_PAGES times.
MAX_CONSECUTIVE_FAILURES = 5
failures = 0

# Column order of the final DataFrame; also guarantees the columns exist
# when no rows were collected.
RESULT_COLUMNS = [
    "timestamp",
    "content",
    "language",
    "username",
    "acct",
    "display_name",
    "followers_count",
    "following_count",
    "statuses_count",
    "visibility",
    "toot_id",
    "toot_url",
    "replies_count",
    "reblogs_count",
    "favourites_count",
]

while True:
    page += 1
    if page > MAX_PAGES:
        print("[STOP] MAX_PAGES reached.")
        break

    try:
        data, resp = safe_get_json(session, url, params)
    except requests.RequestException as exc:
        # Connection errors / timeouts: treat as a failed attempt instead of
        # aborting and losing everything collected so far.
        if DEBUG:
            print(f"[ERROR] Request failed: {exc}")
        data, resp = None, None

    if resp is not None:
        smart_throttle(resp)

    if data is None:
        # safe_get_json has already slept for a 429; just retry.
        if resp is not None and resp.status_code == 429:
            continue

        failures += 1
        if failures >= MAX_CONSECUTIVE_FAILURES:
            print(f"[STOP] {failures} consecutive failed requests.")
            break
        # Exponential backoff before retrying the same page, capped at 60s.
        time.sleep(min(60.0, 2.0 ** failures))
        continue

    failures = 0

    if not isinstance(data, list) or len(data) == 0:
        if DEBUG:
            print("[STOP] Empty list.")
        break

    stop = False
    last_id = None

    for toot in data:
        # Track the last id seen; it becomes the next page's max_id.
        last_id = toot.get("id")

        created_at = toot.get("created_at")
        if not created_at:
            continue

        ts = pd.to_datetime(created_at, utc=True)
        if ts < since:
            # First toot outside the window ends the whole collection.
            stop = True
            break

        acc = toot.get("account") or {}
        content = clean_text(toot.get("content"))

        results.append({
            "timestamp": ts,
            "content": content,
            "language": toot.get("language"),
            "username": acc.get("username"),
            "acct": acc.get("acct"),
            "display_name": acc.get("display_name"),
            "followers_count": acc.get("followers_count"),
            "following_count": acc.get("following_count"),
            "statuses_count": acc.get("statuses_count"),
            "visibility": toot.get("visibility"),
            "toot_id": toot.get("id"),
            "toot_url": toot.get("url"),
            "replies_count": toot.get("replies_count"),
            "reblogs_count": toot.get("reblogs_count"),
            "favourites_count": toot.get("favourites_count"),
        })

    if stop:
        if DEBUG:
            print("[STOP] Reached older than DAYS_BACK window.")
        break

    if not last_id:
        if DEBUG:
            print("[STOP] No last_id for pagination.")
        break

    # Pagination backward in time
    params["max_id"] = last_id

df = pd.DataFrame(results, columns=RESULT_COLUMNS)
# Force a datetime dtype so sorting and the .dt accessor below also work
# when no rows were collected.
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
df = df.sort_values("timestamp").reset_index(drop=True)
print("Rows collected:", len(df))

# Derived features for weekday/hour analysis
df["hour"] = df["timestamp"].dt.hour
df["weekday"] = df["timestamp"].dt.day_name()



[STOP] Reached older than DAYS_BACK window.
Rows collected: 2402


### Multi-Tag Data Collection

The same extraction pipeline was executed separately for each hashtag so that the resulting datasets are comparable.

To control for temporal bias, data was collected within the same time window (`DAYS_BACK`) for each tag.  
The resulting datasets were then exported to CSV files, effectively creating fixed snapshots for downstream comparative analysis.

In this project, we analyzed:
- `tech`
- `cooking`

These represent distinct topical communities, enabling meaningful statistical comparison.

In [None]:
# Name the snapshot after the scraped tag so that runs for different
# hashtags do not overwrite each other's CSV file.
df.to_csv(f"mastodon_dataset_{HASHTAG}.csv", index=False)