In [None]:
# --- repo bootstrap ---------------------------------------------------------
from pathlib import Path
from dotenv import load_dotenv
import os, sys
import subprocess, sys, importlib, os, re
from datetime import datetime
import truthbrush as tb

def repo_root(start: Path) -> Path:
    """Locate the repository root by walking upward from ``start``.

    A directory qualifies when it contains a ``.env`` or a ``.git`` entry.
    The filesystem root itself is never inspected: the walk stops as soon as
    a directory is its own parent.

    Args:
        start: directory to begin the upward search from (resolved first).

    Returns:
        The first directory, starting at ``start``, that holds a marker.

    Raises:
        RuntimeError: if no directory on the way up carries a marker.
    """
    origin = start.resolve()
    for folder in (origin, *origin.parents):
        if folder == folder.parent:        # filesystem root – stop without checking it
            break
        if any((folder / marker).exists() for marker in (".env", ".git")):
            return folder
    raise RuntimeError("repo root not found")

# Resolve the repository root once; every path below is anchored to it.
ROOT = repo_root(Path.cwd())
load_dotenv(ROOT / ".env")             # loads secrets

# optional helpers – guarded so re-running the cell does not stack duplicate
# entries on sys.path
if str(ROOT / "src") not in sys.path:
    sys.path.append(str(ROOT / "src"))

DATA_DIR = ROOT / "data"
OUT_DIR  = ROOT / "outputs"
FIG_DIR  = OUT_DIR / "figs"
# parents=True: on a fresh checkout "outputs/" does not exist yet, and a plain
# mkdir(exist_ok=True) would raise FileNotFoundError for the nested "figs/".
FIG_DIR.mkdir(parents=True, exist_ok=True)

print("Repo root:", ROOT)

In [None]:
# ────────── UNIVERSAL PATCH CELL  (run once, very top of notebook) ──────────
import subprocess, sys, importlib, os, types
from pathlib import Path

# 1️⃣  make sure both python-dotenv and curl_cffi exist
def ensure(pkg, src=None):
    """Import ``pkg``; if it is missing, pip-install it into this kernel.

    Args:
        pkg: importable *module* name (what ``import`` accepts).  This can
            differ from the PyPI distribution name.
        src: what to hand to ``pip install`` when the module is missing
            (distribution name, URL, ...).  Defaults to ``pkg``.

    Raises:
        subprocess.CalledProcessError: if the pip invocation fails.
    """
    try:
        importlib.import_module(pkg)
    except ModuleNotFoundError:
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "--quiet", src or pkg]
        )
        # The import system caches directory listings; drop them so the
        # freshly installed package can be imported without a kernel restart.
        importlib.invalidate_caches()

# ensure() imports by *module* name.  "python-dotenv" is only the PyPI
# distribution name (the module is "dotenv"), so importing it always failed
# and pip was re-run on every execution.  Pass the module name to import and
# the distribution name as the install target.
ensure("dotenv", "python-dotenv")
ensure("curl_cffi")

# 2️⃣  reload .env (override=True guarantees fresh values)
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv(usecwd=True), override=True)

# 3️⃣  import truthbrush and inject curl_cffi so NameError can’t happen
import curl_cffi                     # noqa:  F401  (needed for side-effect)
import truthbrush.api as tb_api
tb_api.curl_cffi = curl_cffi         # hand it to truthbrush’s module scope

import truthbrush as tb
print("✔ Patch cell finished – environment refreshed, curl_cffi wired\n")


In [None]:
import truthbrush as tb
from datetime import datetime, timezone
import random, time

# ---------------------------------------------------------------------------
# install SINGLE wrapper around _get   (skip if it already exists)
# ---------------------------------------------------------------------------
# `_get_base` doubles as the "already patched" marker: it only exists on the
# class after this cell has run once, so re-running never wraps the wrapper.
if not hasattr(tb.api.Api, "_get_base"):
    tb.api.Api._get_base = tb.api.Api._get          # save original

    def _polite_get(self, url, params=None):
        """Call Truth Social, then pause a polite, *adaptive* amount of time.

        Delegates to the saved original (``_get_base``) and returns its
        response unchanged.  After the call it sleeps either

        * until ``self.ratelimit_reset`` plus 1–2 s of jitter, when
          ``self.ratelimit_remaining`` is known and at most 10, or
        * a random 0.6–1.2 s otherwise.
        """
        resp = self._get_base(url, params)

        # ---------- adaptive back-off if we're about to hit the hard limit -----
        near_limit = (self.ratelimit_remaining is not None
                      and self.ratelimit_remaining <= 10
                      and self.ratelimit_reset)
        if near_limit:
            # datetime.now(timezone.utc) yields the same aware-UTC "now" as the
            # deprecated datetime.utcnow().replace(tzinfo=timezone.utc).
            # assumes self.ratelimit_reset is a timezone-aware datetime — TODO confirm
            seconds_to_reset = (
                self.ratelimit_reset - datetime.now(timezone.utc)
            ).total_seconds()
            wait = max(0, seconds_to_reset) + random.uniform(1, 2)   # 1–2 s jitter
            print(f"📉 near limit – sleeping {wait:.1f}s")
            time.sleep(wait)

        # ---------- normal gentle delay (you may tune this) --------------------
        else:
            time.sleep(random.uniform(0.6, 1.2))    # was 1.5–3.0

        return resp

    tb.api.Api._get = _polite_get
    print("✓ polite-delay wrapper installed (0.6–1.2 s baseline)")
else:
    print("✓ wrapper already present – no re-patch")


In [None]:
# The bearer token is a secret: read it from the environment (populated from
# .env by load_dotenv) instead of embedding it in the notebook.  Any token that
# has ever been saved inside a notebook or commit should be treated as leaked
# and rotated.
import os

TOKEN = os.environ.get("TRUTHSOCIAL_TOKEN")
if not TOKEN:
    raise RuntimeError(
        "TRUTHSOCIAL_TOKEN is not set – add it to .env (or the environment) "
        "and re-run the environment cells above."
    )
api   = tb.Api(token=TOKEN)
api.auth_id = api.auth_id or ""
print("client ready")

# Smoke test: one cheap lookup to confirm the token is accepted.
print("lookup test:")
try:
    print(api.lookup("realDonaldTrump")["id"][:8], "… lookup OK")
except Exception as e:
    print("lookup failed:", e)


In [None]:
# ───────────── Truth Social scrape  →  CSV (auto-flush every 100 hits) ──────────
import os, re, csv, json, random, time, pathlib, sys
from datetime import datetime, timezone
from dateutil import parser as dt_parse
from tqdm.auto import tqdm

SAVE_EVERY   = 100                               # rows buffered before each CSV append
CSV_BASENAME = "new_truth_scrape_matches.csv"    # file name inside outputs/

# Lower-case search terms; keyword_hit() lower-cases the text before comparing.
KEYWORDS = [
    "russia", "russian", "ukraine", "ukrainian", "ru-uk", "putin",
    "zelensky", "zelenskyy", "kremlin", "kyiv", "crimea", "donbas",
    "mariupol", "kherson", "luhansk", "dnipro", "odessa", "invasion", "war",
]
# Account hints that get resolved to canonical handles before scraping.
SEED_HANDLES = [
    "realDonaldTrump",
    "TeamTrump",
    "TrumpWarRoom",
    "WhiteHouse",
    "PressSec",
]

# ----------------------------------------------------------------------------- helpers
def keyword_hit(html:str)->bool:
    """Return True when any KEYWORDS entry occurs in ``html``, ignoring case.

    This is a plain substring test, so short terms also fire inside longer
    words (e.g. "war" matches "award").
    """
    haystack = html.lower()
    for term in KEYWORDS:
        if term in haystack:
            return True
    return False

def canonical_handle(hint:str)->str|None:
    """Resolve an account hint to its ``acct`` value, or None if that fails.

    First tries ``api.lookup`` with any leading "@" stripped.  If that raises
    for any reason, falls back to the first page of an account search and
    takes the first account listed.  Every failure in the fallback (including
    an empty search) yields None instead of an exception.
    """
    try:
        # fast path: direct lookup
        return api.lookup(user_handle=hint.lstrip("@")).get("acct")
    except Exception:
        pass

    try:
        # slow path: account search, first result only
        first_page = next(api.search("accounts", hint, limit=1))
        if first_page["accounts"]:
            return first_page["accounts"][0]["acct"]
        return None
    except Exception:
        return None

def prepend_first(item, iterator):
    """Yield ``item`` first, then every element of ``iterator`` in order."""
    yield item
    for following in iterator:
        yield following

# ----------------------------------------------------------------------------- resolve
# Resolve every seed hint; hints that cannot be resolved are dropped.
handles = [h for h in (canonical_handle(h) for h in SEED_HANDLES) if h]
if not handles:
    raise RuntimeError("Could not resolve any seed handles – check token/login.")
print("Scanning:", handles)

# ----------------------------------------------------------------------------- choose output path
# Walk up from the working directory to the first folder holding .git or .env.
ROOT = pathlib.Path.cwd()
while ROOT.parent != ROOT and not (ROOT/".git").exists() and not (ROOT/".env").exists():
    ROOT = ROOT.parent
if not ((ROOT/".git").exists() or (ROOT/".env").exists()):
    # No marker anywhere up the tree: stay in the working directory rather
    # than silently writing to "<filesystem root>/outputs".
    ROOT = pathlib.Path.cwd()
out_path = ROOT / "outputs" / CSV_BASENAME
out_path.parent.mkdir(parents=True, exist_ok=True)

# Opening with "w" below truncates the file.  If an earlier run already saved
# data rows (anything beyond the header line), set that file aside first so
# re-running the cell never destroys scraped results.
if out_path.exists() and len(out_path.read_text(encoding="utf-8").splitlines()) > 1:
    stamp       = datetime.now().strftime("%Y%m%d-%H%M%S")
    backup_path = out_path.with_name(f"{out_path.stem}.{stamp}.bak{out_path.suffix}")
    out_path.replace(backup_path)
    print("↪ previous results kept as", backup_path.name)

# write header once
with out_path.open("w", newline="", encoding="utf-8") as f:
    csv.DictWriter(f, fieldnames=["created_at","account","id","text"]).writeheader()

def flush(buf:list[dict]):
    """Append the buffered rows to the CSV at ``out_path``, then empty ``buf``.

    Does nothing for an empty buffer.  The list is cleared in place, so the
    caller's reference is empty afterwards.
    """
    if buf:
        columns = ["created_at","account","id","text"]
        with out_path.open("a", newline="", encoding="utf-8") as handle:
            csv.DictWriter(handle, fieldnames=columns).writerows(buf)
        buf.clear()

# ----------------------------------------------------------------------------- scrape
# Scrape each resolved handle in turn; keyword matches are buffered and
# appended to the CSV via flush() every SAVE_EVERY rows.
total_hits = 0
_NO_POST   = object()          # sentinel: the generator produced nothing at all
for h in handles:
    print(f"\n↳ pulling @{h}")
    # Per-handle state is created *before* the try block so the except branch
    # can always flush – previously a failure in pull_statuses()/next() for
    # the first handle hit an undefined `buf` (NameError).
    matched = 0
    buf     = []
    try:
        gen   = api.pull_statuses(username=h, replies=False, verbose=False)
        first = next(gen, _NO_POST)                  # heartbeat
        if first is _NO_POST:
            # Empty timeline: report it plainly instead of surfacing a bare
            # StopIteration as an "error" with an empty message.
            print("  ∅ no posts returned – skipping")
            continue
        print("  ✓ first post received")
        gen = prepend_first(first, gen)

        # Context manager closes the progress bar even if the pull fails midway.
        with tqdm(gen, unit="post", desc=f"{h}", leave=True) as pbar:
            for post in pbar:
                if post and post.get("content") and keyword_hit(post["content"]):
                    matched += 1; total_hits += 1
                    buf.append({
                        "created_at": post["created_at"],
                        "account"   : h,
                        "id"        : post["id"],
                        "text"      : re.sub(r"<[^>]+>","",post["content"]).strip(),
                    })
                    if len(buf) >= SAVE_EVERY:
                        flush(buf)
                    # refresh the label only when the count changes
                    if matched % 25 == 0:
                        pbar.set_description(f"{h}  hits:{matched}")

        flush(buf)
        print(f"✓ @{h}: {matched} matches saved")

    except Exception as e:
        flush(buf)             # keep whatever was matched before the failure
        if "401" in str(e) or "unauthorized" in str(e).lower():
            # This cell has no resume logic (no since_id / max_id is passed),
            # so the hint must not promise that a re-run continues where it
            # stopped.
            print(f"⚠️  token expired while scraping @{h}.  "
                  f"Matches so far are safely saved to CSV.\n"
                  "➡️  Refresh bearer token, then copy the CSV aside before re-running – "
                  "this cell restarts the scrape from scratch.")
            break
        else:
            print(f"⚠️  error on @{h}: {e} – continuing …")

print(f"\n✅ scrape complete · total rows on disk: {total_hits}")
print("📄 CSV location:", out_path.relative_to(ROOT))


In [None]:
# ----------------- 1.  resume Trump with tighter filter --------------------
import re, csv, pathlib, time, random, json
from datetime import datetime, timezone
from dateutil import parser as dt_parse
from tqdm.auto import tqdm
import pandas as pd
import matplotlib.pyplot as plt

# ── parameters
SAVE_EVERY   = 100                                  # rows buffered before each CSV append
OUTFILE      = "new_truth_scrape_matches_v2.csv"    # file name inside outputs/
TRUMP_HANDLE = "realDonaldTrump"

KEYWORDS = [
    "russia", "russian", "ukraine", "ukrainian", "ru-uk", "putin",
    "zelensky", "zelenskyy", "kremlin", "kyiv", "crimea", "donbas",
    "mariupol", "kherson", "luhansk", "dnipro", "odessa", "invasion", "war",
]
# One case-insensitive alternation of all keywords, anchored on word
# boundaries so e.g. "war" no longer matches inside "award".
WORD_RE = re.compile(rf'\b({"|".join(KEYWORDS)})\b', re.IGNORECASE)

def keyword_hit(text:str)->bool:
    """Return True when ``text`` contains a keyword as a whole word.

    Falsy input (None, empty string) counts as "no hit".
    """
    if not text:
        return False
    return WORD_RE.search(text) is not None

# ── find last Trump ID already scraped (if any)
# Climb from the working directory until a folder holds .git or .env, or the
# filesystem root is reached.
ROOT = pathlib.Path.cwd()
while not (ROOT.parent == ROOT or (ROOT/".git").exists() or (ROOT/".env").exists()):
    ROOT = ROOT.parent

# Resume point: the largest Trump id found in the first scrape's CSV, if any.
# NOTE(review): presumably pull_statuses(since_id=...) yields only posts
# *newer* than this id; if the earlier run stopped part-way through the older
# history, those older posts would not be revisited — confirm against truthbrush.
already  = ROOT / "outputs" / "new_truth_scrape_matches.csv"
since_id = None
if already.exists():
    df_existing = pd.read_csv(already)
    trump_rows  = df_existing.loc[df_existing["account"] == TRUMP_HANDLE]
    if len(trump_rows) > 0:
        since_id = str(trump_rows["id"].max())     # resume *after* this id
        print("⏩ resuming after id", since_id)

# ── output file
# Created with just a header row when missing; an existing file is left as is
# and later flushes append to it.
out_path = ROOT / "outputs" / OUTFILE
out_path.parent.mkdir(exist_ok=True, parents=True)
if not out_path.exists():
    with out_path.open("w", newline="", encoding="utf-8") as f:
        header_writer = csv.DictWriter(f, fieldnames=["created_at","account","id","text"])
        header_writer.writeheader()

def flush(buf):
    """Append the buffered rows to the CSV at ``out_path`` and empty ``buf``.

    Does nothing for an empty buffer.  After a successful write the list is
    cleared in place and a timestamped "saved N rows" line is printed.

    Clearing here (rather than relying on every caller to do it) means a
    second flush of the same list – e.g. from an exception handler – can
    never write the same rows twice.  Callers that still call ``buf.clear()``
    afterwards are unaffected.
    """
    if not buf: return
    with out_path.open("a", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["created_at","account","id","text"])
        w.writerows(buf)
    saved = len(buf)           # capture before clearing for the log line
    buf.clear()
    ts = datetime.now().strftime('%H:%M:%S')
    print(f"💾  [{ts}] saved {saved} rows")

# ── scrape loop (Trump only) ------------------------------------------------
# Pull the Trump timeline (bounded by since_id when one was found) and append
# keyword matches to the v2 CSV in batches of SAVE_EVERY.
buf=[]; matched=0
print(f"↳ pulling @{TRUMP_HANDLE} …")
try:
    gen = api.pull_statuses(username=TRUMP_HANDLE,
                            replies=False,
                            verbose=False,
                            since_id=since_id)
    for post in tqdm(gen, unit="post"):
        # tolerate a falsy post or a missing/None "content" field
        content = (post or {}).get("content") or ""
        if keyword_hit(content):
            matched += 1
            buf.append({
                "created_at": post["created_at"],
                "account"   : TRUMP_HANDLE,
                "id"        : post["id"],
                "text"      : re.sub(r"<[^>]+>","",content).strip()
            })
            if len(buf) >= SAVE_EVERY:
                flush(buf); buf.clear()
except KeyboardInterrupt:
    flush(buf); buf.clear()
    print("⏹ interrupted – everything so far is saved")
finally:
    # Runs on success AND on any other exception (expired token, network
    # error, ...), so buffered matches are written before the error
    # propagates.  After the KeyboardInterrupt branch the buffer is already
    # empty and this is a no-op.
    flush(buf); buf.clear()

print(f"✓ finished Trump resume – {matched} new matches")

# ----------------- 2.  quick sanity-check / viz ----------------------------
df = pd.read_csv(out_path)          # v2 file (just Trump resume)
print("rows in v2:", len(df))

# -- posts per month
df["month"] = pd.to_datetime(df["created_at"]).dt.to_period("M")
counts = df.groupby("month").size()

# A bar plot of an empty Series raises, and the v2 file holds only its header
# row when the resume found no new matches – so skip the chart in that case.
if counts.empty:
    print("no rows to chart – skipping posts-per-month plot")
else:
    fig, ax = plt.subplots(figsize=(8,3))
    counts.plot(kind="bar", ax=ax)
    ax.set_title("RU/UA-related Trump Truths per month")
    fig.tight_layout()
    plt.show()

# -- top keyword hits
def which_keyword(text):
    """Return the first keyword matched in ``text`` (lower-cased), or None.

    Non-string input yields None: pandas reads an empty CSV cell back as NaN
    (a float), which ``re.search`` would reject with a TypeError.
    """
    if not isinstance(text, str):
        return None
    m = WORD_RE.search(text)
    return m.group(0).lower() if m else None
# Tag each row with the first keyword found in its text, then chart the ten
# most frequent ones.
df["kw"] = df["text"].apply(which_keyword)
topk = df["kw"].value_counts().head(10)

# Nothing to draw when no row carries a keyword (e.g. an empty v2 file);
# a bar plot of an empty Series would raise.
if topk.empty:
    print("no keyword hits to chart – skipping top-keyword plot")
else:
    plt.figure(figsize=(5,3))
    topk.plot(kind="bar")
    plt.title("Top keyword hits")
    plt.tight_layout()
    plt.show()
