# URL -> Bluesky Post Finder (Microcosm Constellation)

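This notebook finds Bluesky posts that link to specific URLs. It queries the Microcosm Constellation `/links/all` and `/links` endpoints to locate matching posts, then fetches each post's content from the Bluesky API. Configure one or more URLs below and run the cells in order to collect matches.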

> Internet access is required to run the API requests. Nothing executes automatically when the notebook is created.


## 0) Setup

In [10]:

# Core imports for talking to Constellation and Bluesky, plus local environment loading
# (uncomment the pip line below if you run this in a clean environment)
# %pip install requests pandas python-dotenv

import json
import os
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote

import pandas as pd
import requests

# Load environment variables (e.g., CONSTELLATION_BASE, BSKY_HANDLE) from a local .env file if present.
# This step is best-effort: if python-dotenv is not installed, or loading fails for any
# other reason, the error is deliberately ignored and the notebook continues with whatever
# is already in the process environment.
try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    pass


## 1) Configuration

In [None]:

# Notebook configuration: Constellation base URL, the URLs to look up, and request limits.
# Values read from the environment fall back to the defaults shown here.
CONSTELLATION_BASE = os.environ.get(
    "CONSTELLATION_BASE",
    "https://constellation.microcosm.blue",
)

# URLs whose linking Bluesky posts should be collected
TARGET_URLS = ["https://arxiv.org/abs/2510.04871"]

MAX_RESULTS_PER_URL = 1000  # Adjust based on expected volume per URL
REQUEST_TIMEOUT = 20  # Seconds for Constellation and Bluesky API requests

# Optional Bluesky credentials (needed for private posts or higher rate limits);
# both stay None when the corresponding environment variable is unset
BSKY_HANDLE = os.environ.get("BSKY_HANDLE")
BSKY_APP_PASSWORD = os.environ.get("BSKY_APP_PASSWORD")

# Bluesky XRPC endpoints (overridable through the environment)
BSKY_SESSION_ENDPOINT = os.environ.get(
    "BSKY_SESSION_ENDPOINT",
    "https://bsky.social/xrpc/com.atproto.server.createSession",
)
BSKY_GET_POSTS_ENDPOINT = os.environ.get(
    "BSKY_GET_POSTS_ENDPOINT",
    "https://bsky.social/xrpc/app.bsky.feed.getPosts",
)

# Number of post URIs sent per getPosts call (API limit: up to 25 post URIs per call)
BSKY_GET_POSTS_BATCH_SIZE = 25


## 2) Constellation helpers

In [12]:

# A single requests.Session is shared by every API call in this notebook so that
# connections can be reused; all requests identify themselves with this User-Agent
session = requests.Session()
session.headers["User-Agent"] = "cosmik-url-notebook/0.1"

# (collection, json-path) pairs locating external URLs inside app.bsky.feed.post
# records: the external link embed, and link features inside rich-text facets
DEFAULT_POST_LINK_PATHS: List[Tuple[str, str]] = [
    (
        "app.bsky.feed.post",
        ".embed.external.uri",
    ),
    (
        "app.bsky.feed.post",
        ".facets[].features[app.bsky.richtext.facet#link].uri",
    ),
]

def constellation_endpoint(path: str) -> str:
    """Return the absolute Constellation URL for ``path``.

    Trailing slashes on CONSTELLATION_BASE are stripped and ``path`` is appended
    unchanged, so ``path`` is expected to start with '/'.
    """
    base_url = CONSTELLATION_BASE.rstrip('/')
    return base_url + path

def fetch_constellation_link_paths(url: str) -> List[Dict[str, Any]]:
    """Discover which collections and JSON paths link to ``url`` via /links/all.

    Args:
        url: Target URL to look up in Constellation.

    Returns:
        One dict per (collection, path) pair found under the response's
        ``links`` mapping, with keys ``collection``, ``path`` and ``stats``.
        Empty when the endpoint answers 404 or reports no links.

    Raises:
        RuntimeError: If the request fails at the transport level, the endpoint
            answers with any other status >= 400, or the body is not valid JSON.
    """
    # Wrap transport failures the same way fetch_constellation_links does, so
    # callers get one exception type with the target URL in the message.
    try:
        resp = session.get(
            constellation_endpoint('/links/all'),
            params={'target': url},
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        raise RuntimeError(f'Constellation /links/all request failed for {url}: {exc}') from exc

    if resp.status_code == 404:
        return []
    if resp.status_code >= 400:
        preview = (resp.text or '')[:300]
        raise RuntimeError(
            f'Constellation /links/all returned status {resp.status_code} for {url}. '
            f'Response preview: {preview}'
        )

    try:
        payload = resp.json() or {}
    except ValueError as exc:  # requests' JSON decode errors subclass ValueError
        preview = (resp.text or '')[:300]
        raise RuntimeError(
            f'Constellation /links/all returned a non-JSON body for {url}. '
            f'Response preview: {preview}'
        ) from exc

    # Flatten {collection: {path: stats}} into one row per (collection, path);
    # a null path map for a collection is treated as empty.
    links_payload = payload.get('links') or {}
    return [
        {'collection': collection, 'path': path, 'stats': stats}
        for collection, path_map in links_payload.items()
        for path, stats in (path_map or {}).items()
    ]

def fetch_constellation_links(url: str, collection: str, path: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Paginate through /links for a single (collection, path) combination.

    Args:
        url: Target URL whose linking records are requested.
        collection: Record collection to search (sent as ``collection``).
        path: JSON path inside the record (sent as ``path``).
        limit: Maximum number of records to return; ``None`` means keep
            following cursors until the API stops returning one. A limit of
            zero or less yields an empty list without any request.

    Returns:
        The link records in the order the API returned them, never more than
        ``limit`` when a limit is given.

    Raises:
        RuntimeError: If a request fails at the transport level, the endpoint
            answers with a status >= 400, or the body is not valid JSON.
    """
    collected: List[Dict[str, Any]] = []
    cursor: Optional[str] = None

    while limit is None or len(collected) < limit:
        params: Dict[str, Any] = {
            'collection': collection,
            'target': url,
            'path': path,
        }
        if cursor:
            params['cursor'] = cursor
        if limit is not None:
            # Ask only for what is still needed; API caps page size at 100
            params['limit'] = str(min(limit - len(collected), 100))

        try:
            resp = session.get(
                constellation_endpoint('/links'),
                params=params,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            raise RuntimeError(f'Constellation /links request failed for {url}: {exc}') from exc

        if resp.status_code >= 400:
            preview = (resp.text or '')[:300]
            raise RuntimeError(
                f'Constellation /links returned status {resp.status_code} for {url}. '
                f'Response preview: {preview}'
            )

        try:
            payload = resp.json() or {}
        except ValueError as exc:  # requests' JSON decode errors subclass ValueError
            preview = (resp.text or '')[:300]
            raise RuntimeError(
                f'Constellation /links returned a non-JSON body for {url}. '
                f'Response preview: {preview}'
            ) from exc

        # Records are read from 'links' when that key is present, otherwise
        # from 'linking_records'.
        link_records = payload.get('links')
        if link_records is None:
            link_records = payload.get('linking_records') or []

        if not link_records:
            break

        collected.extend(link_records)

        cursor = payload.get('cursor')
        if not cursor:
            break

    # Truncate on every exit path: a final page can overshoot the limit and
    # arrive without a cursor, which ends the loop before any re-check.
    return collected if limit is None else collected[:limit]

def build_post_uri(did: Optional[str], collection: Optional[str], rkey: Optional[str]) -> Optional[str]:
    """Assemble an at:// record URI from its DID, collection and record key.

    Returns None when any of the three components is missing or empty.
    """
    if all((did, collection, rkey)):
        return f'at://{did}/{collection}/{rkey}'
    return None

def build_bsky_app_url(did: Optional[str], rkey: Optional[str]) -> Optional[str]:
    """Build the bsky.app web URL for a post, or None if ``did`` or ``rkey`` is missing.

    The DID is percent-encoded with ':' left intact; the record key is inserted as-is.
    """
    if did and rkey:
        profile_segment = quote(did, safe=":")
        return f'https://bsky.app/profile/{profile_segment}/post/{rkey}'
    return None

def chunked(seq: List[str], size: int) -> List[List[str]]:
    """Split ``seq`` into consecutive sub-lists of at most ``size`` items.

    The result is a fully built list (not a generator); the last chunk may be
    shorter than ``size``, and an empty ``seq`` gives an empty list.

    Raises:
        ValueError: If ``size`` is less than 1. A negative step would otherwise
            produce no chunks at all and silently drop every item.
    """
    if size < 1:
        raise ValueError(f'chunk size must be a positive integer, got {size!r}')
    return [seq[i:i + size] for i in range(0, len(seq), size)]

# Access tokens already obtained in this process, keyed by handle
_BSKY_SESSION_CACHE: Dict[str, str] = {}

def bsky_bearer_token(handle: Optional[str], app_password: Optional[str]) -> Optional[str]:
    """Obtain (and cache) a Bluesky bearer token via createSession.

    Args:
        handle: Account identifier sent as ``identifier``.
        app_password: App password sent as ``password``.

    Returns:
        The ``accessJwt`` from the createSession response, or None when either
        credential is missing. A token is cached per handle and reused on later
        calls; the cache performs no expiry check.

    Raises:
        RuntimeError: If the request fails at the transport level, the endpoint
            answers with a status other than 200, the body is not valid JSON,
            or the response carries no ``accessJwt``.
    """
    if not handle or not app_password:
        return None
    cached = _BSKY_SESSION_CACHE.get(handle)
    if cached:
        return cached

    payload = {"identifier": handle, "password": app_password}
    # Surface transport failures as RuntimeError, matching the Constellation helpers
    try:
        resp = session.post(BSKY_SESSION_ENDPOINT, json=payload, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        raise RuntimeError(f'Bluesky createSession request failed: {exc}') from exc

    if resp.status_code != 200:
        preview = (resp.text or '')[:200]
        raise RuntimeError(
            f'Bluesky createSession failed with status {resp.status_code}. '
            f'Response preview: {preview}'
        )

    try:
        token = (resp.json() or {}).get('accessJwt')
    except ValueError as exc:  # requests' JSON decode errors subclass ValueError
        raise RuntimeError('Bluesky createSession returned a non-JSON body') from exc
    if not token:
        raise RuntimeError('Bluesky createSession response missing accessJwt')

    _BSKY_SESSION_CACHE[handle] = token
    return token

def bsky_auth_headers(handle: Optional[str], app_password: Optional[str]) -> Dict[str, str]:
    """Return a Bearer ``Authorization`` header for the given credentials.

    The dict is empty when bsky_bearer_token yields no token, so the result can
    be passed as request headers either way.
    """
    bearer = bsky_bearer_token(handle, app_password)
    return {"Authorization": f"Bearer {bearer}"} if bearer else {}

def fetch_bsky_posts(post_uris: List[str], handle: Optional[str], app_password: Optional[str]) -> Dict[str, Any]:
    """Fetch post bodies via app.bsky.feed.getPosts and return a mapping uri -> post.

    Args:
        post_uris: at:// URIs to look up; repeated URIs are requested only once.
        handle: Optional Bluesky handle used to build the Authorization header.
        app_password: Optional app password paired with ``handle``.

    Returns:
        A dict keyed by URI. Each value is either the post object returned by
        the API or ``{'error': <message>}`` when the chunk's request returned a
        status >= 400, its body was not valid JSON, or the URI was absent from
        the response. Empty when ``post_uris`` is empty.

    Raises:
        RuntimeError: If a getPosts request fails at the transport level, or if
            obtaining the bearer token fails.
    """
    if not post_uris:
        return {}

    headers = bsky_auth_headers(handle, app_password)
    # Drop repeated URIs (keeping first-seen order) so no post is requested twice
    unique_uris = list(dict.fromkeys(post_uris))

    posts_by_uri: Dict[str, Any] = {}
    for chunk in chunked(unique_uris, BSKY_GET_POSTS_BATCH_SIZE):
        # A list of ("uris", value) pairs makes requests repeat the query parameter
        params = [("uris", uri) for uri in chunk]
        try:
            resp = session.get(
                BSKY_GET_POSTS_ENDPOINT,
                params=params,
                headers=headers,
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            raise RuntimeError(f'Bluesky getPosts request failed: {exc}') from exc

        if resp.status_code >= 400:
            preview = (resp.text or '')[:200]
            for uri in chunk:
                posts_by_uri[uri] = {
                    'error': f'status {resp.status_code}: {preview}'
                }
            continue

        try:
            payload = resp.json() or {}
        except ValueError as exc:  # requests' JSON decode errors subclass ValueError
            # Treat an unparseable body like an HTTP error: mark this chunk and move on
            for uri in chunk:
                posts_by_uri[uri] = {'error': f'invalid JSON from getPosts: {exc}'}
            continue

        for post in payload.get('posts') or []:
            uri = post.get('uri')
            if uri:
                posts_by_uri[uri] = post

        # Flag any URIs that did not come back so the caller can see missing posts
        for uri in chunk:
            if uri not in posts_by_uri:
                posts_by_uri[uri] = {'error': 'not returned from getPosts'}

    return posts_by_uri


## 3) Fetch posts for configured URLs

In [None]:
# Collect every unique post that links to the configured URLs.
# Rows are de-duplicated on (post_uri, collection, path) across all URLs, and
# MAX_RESULTS_PER_URL caps the number of rows kept for each URL as a whole.
records: List[Dict[str, Any]] = []
seen_records = set()

for url in TARGET_URLS:
    normalized = url.strip()
    if not normalized:
        continue

    print(f"Inspecting {normalized} ...")

    # Path discovery is best-effort: any failure falls through to the default paths
    try:
        discovered_paths = fetch_constellation_link_paths(normalized)
    except Exception as exc:
        print(f"  Warning: failed to inspect link metadata: {exc}")
        discovered_paths = []

    post_paths = [p for p in discovered_paths if p.get('collection') == 'app.bsky.feed.post']
    if post_paths:
        print(f"  Found {len(post_paths)} post link path(s) via /links/all")
    else:
        print("  No post link paths found via /links/all; falling back to defaults")
        post_paths = [
            {'collection': collection, 'path': path, 'stats': None}
            for (collection, path) in DEFAULT_POST_LINK_PATHS
        ]

    # Rows recorded for this URL so far; the per-URL budget is shared by all of
    # its paths rather than being granted to each path separately.
    url_row_count = 0

    for path_info in post_paths:
        collection = path_info['collection']
        path = path_info['path']

        if MAX_RESULTS_PER_URL is None:
            path_limit = None  # no cap configured: fetch everything
        else:
            path_limit = MAX_RESULTS_PER_URL - url_row_count
            if path_limit <= 0:
                print(f"    Reached MAX_RESULTS_PER_URL ({MAX_RESULTS_PER_URL}); skipping remaining paths")
                break

        print(f"    Fetching path {path} ...")

        # A failure on one path is reported and skipped so other paths still run
        try:
            link_records = fetch_constellation_links(
                normalized,
                collection=collection,
                path=path,
                limit=path_limit,
            )
        except Exception as exc:
            print(f"    Warning: failed to fetch path {path}: {exc}")
            continue

        if not link_records:
            print(f"    No matches for path {path}")
            continue

        for link in link_records:
            post_uri = build_post_uri(link.get('did'), link.get('collection'), link.get('rkey'))
            dedupe_key = (post_uri, collection, path)
            if dedupe_key in seen_records:
                continue
            seen_records.add(dedupe_key)

            records.append({
                'input_url': normalized,
                'collection': collection,
                'path': path,
                'post_uri': post_uri,
                'author_did': link.get('did'),
                'record_rkey': link.get('rkey'),
                'bsky_app_url': build_bsky_app_url(link.get('did'), link.get('rkey')),
                # Raw record kept for debugging, truncated to 4000 characters
                'raw_link_json': json.dumps(link)[:4000],
            })
            url_row_count += 1

# Turn the collected rows into a DataFrame with a fixed column order
results_df = pd.DataFrame.from_records(
    records,
    columns=[
        'input_url', 'collection', 'path', 'post_uri',
        'author_did', 'record_rkey', 'bsky_app_url', 'raw_link_json',
    ],
)

if results_df.empty:
    print("No Constellation matches found for the provided URLs.")
else:
    results_df = results_df.sort_values(by=['input_url', 'collection', 'path'])
    results_df = results_df.reset_index(drop=True)

    # Each distinct post is looked up once, however many rows reference it
    unique_post_uris = list(results_df['post_uri'].dropna().unique())
    print(f"Fetching content for {len(unique_post_uris)} unique post(s) via Bluesky ...")
    try:
        posts_by_uri = fetch_bsky_posts(unique_post_uris, BSKY_HANDLE, BSKY_APP_PASSWORD)
    except Exception as exc:
        # Keep the Constellation rows and record the failure against every post
        print(f"  Warning: failed to fetch post bodies: {exc}")
        posts_by_uri = {uri: {'error': str(exc)} for uri in unique_post_uris}

    def extract_post_text(uri: Optional[str]) -> Optional[str]:
        """Return the post's record text, or None if the post is missing or errored."""
        post = posts_by_uri.get(uri) if uri else None
        if post and not post.get('error'):
            return (post.get('record') or {}).get('text')
        return None

    def extract_post_json(uri: Optional[str]) -> Optional[str]:
        """Return the post as JSON (first 4000 chars), its error message, or None."""
        post = posts_by_uri.get(uri) if uri else None
        if not post:
            return None
        failure = post.get('error')
        if failure:
            return failure
        return json.dumps(post)[:4000]

    def extract_post_error(uri: Optional[str]) -> Optional[str]:
        """Return the fetch error recorded for the post, or None when there is none."""
        post = posts_by_uri.get(uri) if uri else None
        if not post:
            return None
        return post.get('error') or None

    # Add one enrichment column per extractor, in this order
    for column_name, extractor in (
        ('post_text', extract_post_text),
        ('post_record_json', extract_post_json),
        ('post_fetch_error', extract_post_error),
    ):
        results_df[column_name] = results_df['post_uri'].map(extractor)

print(f"Total matches: {len(results_df)}")
results_df.head()

## 4) Optional: Save results

In [None]:
# Write the matches to a CSV file (relative path, no index column); skipped when there are no rows
if results_df.empty:
    print("No results to save. Run the previous cell after adding URL values.")
else:
    output_path = "constellation_url_matches.csv"
    results_df.to_csv(output_path, index=False)
    print(f"Saved {len(results_df)} rows to {output_path}")
