# In [None]:
from atproto import Client
from datetime import datetime, timedelta, timezone, date
from collections import Counter
from typing import Dict, List, Iterable, Optional
import os, csv, time, requests

# ---------- CONFIG / LOGIN ----------
# Credentials are read from the environment so they never have to be written into
# this file: set BSKY_HANDLE (e.g. "name.bsky.social") and BSKY_APP_PASSWORD.
# Prefer a Bluesky app password over the main account password.
# The fallback values below are placeholders only, not real credentials.
client = Client()
client.login(
    os.environ.get("BSKY_HANDLE", "YOUR USERNAME.bsky.social"),
    os.environ.get("BSKY_APP_PASSWORD", "YOUR PASSWORD"),
)

# Where to save results: BSKY_SAVE_DIR if set, otherwise the current user's Desktop.
SAVE_DIR = os.environ.get("BSKY_SAVE_DIR", os.path.expanduser("~/Desktop"))

# HTTP fallback base (public AppView)
APPVIEW_BASE = "https://public.api.bsky.app"

# ---------- COMPANIES & ALIASES ----------
# Rank -> short company name; the short name is also the lookup key for aliases.
fortune_20 = dict(enumerate(
    [
        "Walmart", "Amazon", "Apple", "CVS", "Tesla",
        "Google", "Meta", "JPMorgan", "Costco", "Kroger",
        "Berkshire", "Walgreens", "Target", "UPS",
        "Centene", "Cigna", "Microsoft", "Verizon",
        "IBM", "UnitedHealth",
    ],
    start=1,
))
# Company names in rank order (1 first).
COMPANIES = [fortune_20[rank] for rank in fortune_20]

# Search terms per company: names, tickers (bare, $ and # forms) and a few closely
# associated people. Multi-word terms are wrapped in quotes by _normalize_term before
# searching. Each list should contain no duplicates: every entry costs one query per day.
# NOTE(review): several aliases are ordinary words ("Apple", "Amazon", "Meta", "Target",
# "Chase") and will likely also match posts unrelated to the company, which may
# inflate those counts — consider tightening if precision matters.
COMPANY_ALIASES = {
    "Walmart": [
        "Walmart", "#Walmart"
    ],
    "Amazon": [
        "Amazon", "AMZN", "$AMZN", "#AMZN", "Amazon.com", "Jeff Bezos"
    ],
    "Apple": [
        "Apple", "AAPL", "$AAPL", "#AAPL", "Apple Inc", "Tim Cook"
    ],
    "CVS": [
        # "CVS" previously appeared twice here, doubling its queries.
        "CVS", "CVS Health", "CVS Pharmacy", "CVS Health Corp", "CVS Health Corporation",
        "$CVS", "#CVS"
    ],
    "Tesla": [
        "Tesla", "TSLA", "$TSLA", "#TSLA", "Tesla Motors", "Elon Musk"
    ],
    "Google": [
        "Google", "Alphabet Inc", "GOOGL", "$GOOGL", "#GOOGL"
    ],
    "Meta": [
        "Meta", "Meta Platforms", "META", "$META", "#META",
        "Facebook", "Mark Zuckerberg"  # legacy but still used
    ],
    "JPMorgan": [
        "JPMorgan", "JP Morgan", "JPMorgan Chase", "JPMorgan Chase & Co.",
        "JPM", "$JPM", "#JPM", "Chase", "#Chase"
    ],
    "Costco": [
        "Costco", "Costco Wholesale"
    ],
    "Kroger": [
        "Kroger", "The Kroger Co."
    ],
    "Berkshire": [
        "Berkshire", "Berkshire Hathaway", "Warren Buffett"
    ],
    "Walgreens": [
        "Walgreens"
    ],
    "Target": [
        "Target"
    ],
    "UPS": [
        "UPS", "United Parcel Service", "$UPS", "#UPS"
    ],
    "Centene": [
        "Centene", "Centene Corp", "Centene Corporation"
    ],
    "Cigna": [
        "Cigna", "The Cigna Group"
    ],
    "Microsoft": [
        "Microsoft", "MSFT", "$MSFT", "#MSFT"
    ],
    "Verizon": [
        "Verizon", "Verizon Communications"
    ],
    "IBM": [
        "IBM", "International Business Machines", "$IBM", "#IBM"
    ],
    "UnitedHealth": [
        "UnitedHealth", "UnitedHealth Group", "United Healthcare", "UnitedHealthcare",
        "UNH", "$UNH", "#UNH"
    ],
}

# ---------- HELPERS ----------
def _parse_iso_created_at(maybe_iso: str) -> Optional[datetime]:
    if not maybe_iso:
        return None
    try:
        return datetime.fromisoformat(maybe_iso.replace("Z", "+00:00"))
    except Exception:
        return None

def _post_created_at(post) -> Optional[datetime]:
    try:
        rec = getattr(post, "record", None)
        if rec:
            for name in ("created_at", "createdAt"):
                val = getattr(rec, name, None)
                if val:
                    dt = _parse_iso_created_at(val)
                    if dt:
                        return dt
    except Exception:
        pass
    for name in ("created_at", "createdAt"):
        val = getattr(post, name, None)
        if val:
            dt = _parse_iso_created_at(val)
            if dt:
                return dt
    return None

def _get_post_uri(obj) -> Optional[str]:
    """Return a stable unique id for dedupe (SDK: .uri, HTTP: ['uri'])."""
    if isinstance(obj, dict):
        return obj.get("uri")
    return getattr(obj, "uri", None)

def _date_range_last_n_days(days: int, today: Optional[date] = None) -> List[date]:
    if today is None:
        today = datetime.now(timezone.utc).date()
    return [(today - timedelta(days=i)) for i in range(days, 0, -1)]

def _normalize_term(term: str) -> str:
    """Wrap multi-word terms in quotes if not already."""
    t = term.strip()
    if " " in t and not (t.startswith('"') and t.endswith('"')):
        t = f'"{t}"'
    return t

def _q_for_day(base_q: str, day_utc: date) -> str:
    """Inject UTC window into q so the API only returns that day."""
    day1 = day_utc.isoformat()
    day2 = (day_utc + timedelta(days=1)).isoformat()
    return f'{base_q} since:{day1} until:{day2}'

# ---------- RESILIENT SEARCH ----------
_RETRYABLE_HTTP = {429, 500, 502, 503, 504}

def _sdk_search(q: str, limit: int, cursor: Optional[str]) -> object:
    """
    Fetch one page of app.bsky.feed.searchPosts through the module-level SDK client.

    ``cursor`` is sent only when truthy. The SDK response object is returned as-is.
    """
    request = dict(q=q, limit=limit)
    if cursor:
        request.update(cursor=cursor)
    return client.app.bsky.feed.search_posts(request)

def _http_search(q: str, limit: int, cursor: Optional[str]) -> dict:
    """
    Call app.bsky.feed.searchPosts on the public AppView over plain HTTP (no auth).

    Args:
        q: search query (may include since:/until: operators).
        limit: page size requested from the API.
        cursor: pagination cursor from a previous page; sent only when truthy.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: for any 4xx/5xx status (via ``raise_for_status``);
            callers read ``e.response.status_code`` to decide whether to retry.
        requests.RequestException: for network-level failures such as timeouts.
    """
    params = {"q": q, "limit": limit}
    if cursor:
        params["cursor"] = cursor
    r = requests.get(f"{APPVIEW_BASE}/xrpc/app.bsky.feed.searchPosts", params=params, timeout=30)
    # A single check covers retryable and fatal statuses alike; the caller classifies them.
    r.raise_for_status()
    return r.json()

def _search_resilient(q: str, limit: int, cursor: Optional[str],
                      sdk_tries: int = 3, http_tries: int = 3,
                      backoff_base: float = 1.5):
    """
    Fetch one page of searchPosts results, preferring the SDK and falling back to HTTP.

    Each transport is attempted up to its ``*_tries`` times. Transient failures
    (HTTP 429/5xx, "UpstreamFailure", and network errors on the HTTP path) are
    retried after ``min(backoff_base ** attempt, 30)`` seconds. No sleep is taken
    after a transport's final attempt. A non-transient SDK error skips straight to
    the HTTP fallback.

    Returns:
        (posts, next_cursor, used_http). Posts are SDK objects when ``used_http``
        is False and plain dicts when it is True.

    Raises:
        requests.HTTPError: the HTTP fallback returned a non-retryable status.
        RuntimeError: every attempt failed; chained to the last error seen.
    """
    last_error: Optional[BaseException] = None

    def _pause(attempt: int, tries: int) -> None:
        # Back off only when another attempt on this transport will follow.
        if attempt < tries:
            time.sleep(min(backoff_base ** attempt, 30))

    # SDK
    for attempt in range(1, sdk_tries + 1):
        try:
            resp = _sdk_search(q, limit, cursor)
        except Exception as e:
            last_error = e
            # Use a structured status code if the error exposes one
            # (NOTE(review): assumes SDK errors may carry .response.status_code),
            # otherwise fall back to matching status markers in the message text.
            status = getattr(getattr(e, "response", None), "status_code", None)
            msg = str(e)
            transient = status in _RETRYABLE_HTTP or any(
                marker in msg
                for marker in ("429", "500", "502", "503", "504", "UpstreamFailure")
            )
            if not transient:
                break
            _pause(attempt, sdk_tries)
            continue
        posts = getattr(resp, "posts", []) or []
        next_cur = getattr(resp, "cursor", None)
        return posts, next_cur, False

    # HTTP
    for attempt in range(1, http_tries + 1):
        try:
            resp = _http_search(q, limit, cursor)
        except requests.HTTPError as e:
            status = getattr(e.response, "status_code", None)
            if status not in _RETRYABLE_HTTP:
                raise
            last_error = e
        except requests.RequestException as e:
            # Connection errors, timeouts, etc. are treated as transient.
            last_error = e
        else:
            body = resp or {}
            return body.get("posts") or [], body.get("cursor"), True
        _pause(attempt, http_tries)

    raise RuntimeError("Search failed after SDK and HTTP retries") from last_error

# ---------- CORE ----------
def _created_at_any(post) -> Optional[datetime]:
    """
    Return the creation time of a post from either search transport, or None.

    HTTP results are dicts (``post["record"]["createdAt"]``); SDK results are objects
    handled by ``_post_created_at``. A timestamp without an offset is assumed to be
    UTC so it can be compared with the timezone-aware day window.
    """
    if isinstance(post, dict):
        rec = post.get("record") or {}
        dt = _parse_iso_created_at(rec.get("createdAt") or rec.get("created_at"))
    else:
        dt = _post_created_at(post)
    if dt is not None and dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt


def _count_posts_for_day(
    terms: List[str],
    day: date,
    max_pages: int,
    per_request_limit: int,
) -> int:
    """
    Count unique posts created on ``day`` (UTC) that match any of ``terms``.

    Each term is queried with a since:/until: window for the day and paginated up
    to ``max_pages`` pages. Posts are deduped by URI across terms and pages, and a
    post is counted only when its createdAt falls in [day 00:00, next day 00:00) UTC.
    A search failure abandons the current term (with a logged warning) but keeps
    whatever was already counted.
    """
    import logging

    start_dt = datetime.combine(day, datetime.min.time(), tzinfo=timezone.utc)
    end_dt = start_dt + timedelta(days=1)
    seen_uris: set = set()

    for term in terms:
        q_day = _q_for_day(term, day)
        cursor: Optional[str] = None
        for _ in range(max_pages):
            try:
                posts, next_cursor, _used_http = _search_resilient(q_day, per_request_limit, cursor)
            except Exception as exc:
                # Best effort: move on to the next term, but make the gap visible
                # instead of silently undercounting.
                logging.getLogger(__name__).warning(
                    "search failed for %r on %s; count may be incomplete: %s",
                    term, day.isoformat(), exc,
                )
                break
            if not posts:
                break
            for post in posts:
                uri = _get_post_uri(post)
                if not uri or uri in seen_uris:
                    continue
                dt = _created_at_any(post)
                if dt is not None and start_dt <= dt < end_dt:
                    seen_uris.add(uri)
            # Stop when there is no next page, or when the API hands back the same
            # cursor (which would otherwise refetch the same page until max_pages).
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor

    return len(seen_uris)


def count_mentions_last_n_days_for_terms(
    terms: List[str],
    days: int = 365,
    max_pages: int = 50,
    per_request_limit: int = 100,
) -> Counter:
    """
    Count unique posts per UTC day, over the last ``days`` full days, for search terms.

    - Days covered are those from ``_date_range_last_n_days(days, today)``, i.e.
      ``today - days`` through ``today - 1`` (UTC).
    - Multi-word terms are quoted via ``_normalize_term``. Terms that normalize to
      the same query are searched only once.
    - Each day is queried separately (since:/until: in q). Posts are deduped by
      URI across terms and pages.

    Returns a Counter keyed by ``datetime.date`` with an entry (possibly 0) for every day.
    """
    today = datetime.now(timezone.utc).date()
    day_list = _date_range_last_n_days(days, today)

    # Normalize and drop duplicates while preserving order: a repeated term would
    # only refetch posts that the URI dedupe discards anyway.
    unique_terms = list(dict.fromkeys(_normalize_term(t) for t in terms))

    counts = Counter()
    for d in day_list:
        counts[d] = _count_posts_for_day(unique_terms, d, max_pages, per_request_limit)
    return counts

def counts_for_companies(
    companies: Iterable[str],
    days: int = 365
) -> Dict[str, Counter]:
    """
    Build per-company daily mention counts.

    Each company is searched with its alias list from COMPANY_ALIASES, or with
    just its own name when it has no entry there. The result maps
    company -> Counter(date -> count), in the order the companies were given.
    """
    def aliases_for(company: str) -> List[str]:
        return COMPANY_ALIASES.get(company, [company])

    return {
        company: count_mentions_last_n_days_for_terms(aliases_for(company), days=days)
        for company in companies
    }

def write_company_date_matrix_csv(
    company_to_counts: Dict[str, Counter],
    days: int,
    save_dir: str,
    filename_prefix: str = "bluesky_counts_matrix",
    today: Optional[date] = None,
) -> str:
    """
    Write a company x date matrix of post counts to CSV and return the file path.

    The header is ``company`` followed by the ``days`` ISO dates from
    ``_date_range_last_n_days(days, today)``. There is one row per company in
    ``company_to_counts`` order, with 0 for dates missing from that company's Counter.

    Args:
        company_to_counts: company name -> Counter keyed by ``datetime.date``.
        days: number of date columns.
        save_dir: output directory (created if missing).
        filename_prefix: the file is named
            ``{prefix}_{YYYYMMDD of today - days}-{YYYYMMDD of today}.csv``;
            the end date is exclusive (today itself is not a column).
        today: exclusive end of the date range. When None, it is inferred as the day
            after the latest date key in ``company_to_counts``, so the columns line up
            with the counted days even if UTC midnight passed while counting. If there
            are no date keys, the current UTC date is used.
    """
    if today is None:
        # A full run issues at least one request per term per day and can take a
        # long time; re-reading the clock here could shift every column by a day
        # relative to the Counter keys, so anchor on the data instead.
        known_days = [
            d
            for cnt in company_to_counts.values()
            for d in cnt
            if isinstance(d, date) and not isinstance(d, datetime)
        ]
        if known_days:
            today = max(known_days) + timedelta(days=1)
        else:
            today = datetime.now(timezone.utc).date()

    os.makedirs(save_dir, exist_ok=True)
    start_day = today - timedelta(days=days)

    dates = _date_range_last_n_days(days, today)
    header = ["company"] + [d.isoformat() for d in dates]

    filename = f"{filename_prefix}_{start_day.strftime('%Y%m%d')}-{today.strftime('%Y%m%d')}.csv"
    path = os.path.join(save_dir, filename)

    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for company, cnt in company_to_counts.items():
            writer.writerow([company] + [cnt.get(d, 0) for d in dates])

    return path

# ---------- RUN ----------
if __name__ == "__main__":
    DAYS = 365

    company_counts = counts_for_companies(COMPANIES, days=DAYS)
    csv_path = write_company_date_matrix_csv(company_counts, DAYS, SAVE_DIR)

    print(f"Saved matrix to: {csv_path}\n")

    # Print the days that were actually counted. Recomputing the range from the
    # clock after a long run could shift it by a day if UTC midnight passed meanwhile.
    dates = sorted({d for cnt in company_counts.values() for d in cnt})
    if not dates:
        dates = _date_range_last_n_days(DAYS)
    print("company,", ", ".join(d.isoformat() for d in dates))
    for c in COMPANIES:
        cnt = company_counts.get(c, Counter())
        row = [str(cnt.get(d, 0)) for d in dates]
        print(f"{c}, " + ", ".join(row))
