If the process crashes while we are waiting for Anthropic batch jobs to complete, this notebook can download the results once they are ready and store them in the cache.

In [4]:
from repsheet_backend.genai import anthropic, genai_cache
import asyncio

In [None]:
# Text file holding one already-processed batch id per line.
SAVED_BATCH_IDS_FILE = "repsheet_backend/data/saved_batch_ids.txt"

# Load the ids of batches whose results were already copied into the cache.
# The file is only created when the ids are written back at the end of a run,
# so on a fresh checkout it may not exist yet; treat that as "nothing saved".
saved_batch_ids: list[str]
try:
    with open(SAVED_BATCH_IDS_FILE, "r") as f:
        saved_batch_ids = f.read().splitlines()
except FileNotFoundError:
    saved_batch_ids = []

async def save_batch_to_cache(batch_id: str) -> None:
    """Copy the results of one Anthropic message batch into the genai cache.

    Does nothing if ``batch_id`` is already listed in ``saved_batch_ids``.
    Otherwise the batch results are fetched (in a worker thread) and, for each
    result whose ``custom_id`` is longer than 10 characters, the text of the
    first content block is stored in ``genai_cache`` under that ``custom_id``
    unless the key is already present. Finally ``batch_id`` is appended to
    ``saved_batch_ids``.

    Results that did not succeed are skipped: per the Message Batches API only
    a ``succeeded`` result carries a ``message``; errored, canceled and expired
    results do not, and reading ``.message`` on them would raise.

    Args:
        batch_id: Id of the message batch to fetch.
    """
    if batch_id in saved_batch_ids:
        return
    results = await asyncio.to_thread(anthropic.messages.batches.results, batch_id)
    # Counts every eligible result, including ones already present in the cache.
    saved_count = 0
    skipped_count = 0
    cache_set_jobs = []
    for result in results:
        cache_key = result.custom_id
        # there are some old jobs with numeric custom_ids
        if len(cache_key) <= 10:
            continue
        # Only succeeded requests have a message to cache.
        if result.result.type != "succeeded":
            skipped_count += 1
            continue
        if not await genai_cache.has(cache_key):
            cache_set_jobs.append(genai_cache.set(cache_key, result.result.message.content[0].text))  # type: ignore
        saved_count += 1
    # Run all pending cache writes together.
    await asyncio.gather(*cache_set_jobs)
    print(f"Saved {saved_count} results for {batch_id}")
    if skipped_count:
        print(f"Skipped {skipped_count} non-succeeded results for {batch_id}")
    # The batch has ended, so its failed requests will not change on a re-fetch;
    # mark the batch as handled either way.
    saved_batch_ids.append(batch_id)

In [None]:
await asyncio.gather(*[
    save_batch_to_cache(message.id)
    for message in anthropic.messages.batches.list()
    if message.processing_status == "ended"
])

for message in anthropic.messages.batches.list():
    if message.processing_status != "ended":
        print(f"Batch {message.id} is still {message.processing_status}")

In [None]:
# Persist the processed batch ids, one per line, overwriting the previous file.
serialized_batch_ids = "\n".join(saved_batch_ids)
with open(SAVED_BATCH_IDS_FILE, "w") as ids_file:
    ids_file.write(serialized_batch_ids)