Skip to content

Download as agent + podcast-index fallback + DBOS state-of-truth#109

Merged
rafacm merged 17 commits into
mainfrom
feature/download-agent-and-recovery-removal
Apr 29, 2026
Merged

Download as agent + podcast-index fallback + DBOS state-of-truth#109
rafacm merged 17 commits into
mainfrom
feature/download-agent-and-recovery-removal

Conversation

@rafacm
Copy link
Copy Markdown
Owner

@rafacm rafacm commented Apr 28, 2026

Closes #108.

Summary

Consolidates the pipeline's failure-recovery story into the Download step itself by turning it into a Pydantic AI agent (mirroring Fetch Details from PR #107), and eliminates the parallel Django state machine that duplicated DBOS's own workflow records.

  • Download is now an agent with a 3-tier cascade: cheap wget on a known audio_url → Pydantic AI agent (Playwright + lookup_podcast_index tool fanning out across fyyd.de and podcastindex.org) → structured DownloadFailed exception. Returns a typed DownloadResult on success. 30-request usage cap and RAGTIME_DOWNLOAD_AGENT_TIMEOUT (default 120 s) bound cost and latency per run.
  • Recovery layer deletedepisodes/recovery.py, the step_failed Django signal, RecoveryAttempt admin, and the RAGTIME_RECOVERY_* settings are gone. The download agent's job is "find the audio URL and download it" — i.e. it conceptually IS the post-failure recovery step, so inlining it is the natural collapse.
  • DBOS owns workflow state. Dropped ProcessingRun, ProcessingStep, PipelineEvent, RecoveryAttempt (migration 0023_drop_processing_pipeline_models). The DBOS workflow now declares one explicit @DBOS.step() per pipeline phase (fetch_details_step_, download_step, …, embed_step) — dbos workflow steps <id> mirrors PIPELINE_STEPS exactly, no bookkeeping noise. Successful steps return a typed StepOutput; failures propagate up and DBOS records the exception. Deterministic per-episode workflow IDs (episode-<id>-run-<n>) replace the dropped unique_running_run_per_episode partial index.
  • Episode.guid field added (CharField, blank). Extracted by the fetch_details agent (URN URIs, itunes:episodeGuid, <guid> tags, data-guid); used as a hint by lookup_podcast_index. REQUIRED_FIELDS for fetch-details relaxed to ("title",) only — the download agent now owns audio-URL discovery.
  • DBOS-backed admin — replaced ProcessingRunAdmin / PipelineEventAdmin / RecoveryAttemptAdmin with a single Episode-detail "View workflow steps" page reading DBOS.list_workflow_steps(). Falls back to an empty list when DBOS is offline so admin keeps loading.
  • Configuration — new env vars: RAGTIME_DOWNLOAD_AGENT_API_KEY / _MODEL / _TIMEOUT (with one-release env-var fallback to the deprecated RAGTIME_RECOVERY_AGENT_* keys), RAGTIME_PODCAST_INDEXES, RAGTIME_FYYD_API_KEY, RAGTIME_PODCASTINDEX_API_KEY / _SECRET. Wizard, .env.sample, README, doc/README.md, CHANGELOG all updated.
  • Diagrams — recovery Excalidraw + the "with-recovery" pipeline variant deleted; the canonical processing-pipeline diagram still reflects the 10-step flow accurately.

Pre-prod data is not preserved through this migration (per feedback_reembed_ok_preprod.md). Full Postgres regen + re-ingest is the supported upgrade path.

Commits

  1. 984a4f8Episode.guid field, fetch_details agent extension, REQUIRED_FIELDS = ("title",). Migration 0022_add_episode_guid.
  2. 9dc7a75episodes/podcast_indexes/ provider abstraction (PodcastIndex ABC, fyyd + podcastindex clients, fan-out factory). 11 tests.
  3. 239fcd6 — Convert Download to Pydantic AI agent; remove recovery layer (collapsed plan commits 4 + 5; rename agents/recovery*agents/download* IS the recovery removal — splitting required throwaway compat shims).
  4. 7f186ff — Drop ProcessingRun / ProcessingStep / PipelineEvent / RecoveryAttempt; restructure DBOS workflow with one step per pipeline phase. Migration 0023_drop_processing_pipeline_models.
  5. c9a55d5 — DBOS-backed Episode admin view (template + URL + change-form link).
  6. fc7664d.env.sample, configure wizard, README, doc/README, CHANGELOG, feature doc, implementation session transcript.
  7. 9a43f69 — Remove obsolete recovery diagrams; consolidate Fetch Details + Download per-step docs.

(Plus the leading plan/transcript commit 8d33162.)

336 tests passing.

Documentation

Test plan

  • uv run python manage.py test — 336 passing.

⚠️ Do NOT run docker compose down -v. The -v flag wipes the pgdata volume, which contains both the ragtime database and the musicbrainz database (per AGENTS.md, MB defaults to the same Postgres container). Re-importing MusicBrainz is a multi-GB / ~30+ minute job. Use manage.py dbreset instead — it drops only the ragtime database + the Qdrant collection, leaving MB untouched.

One-time prerequisites

uv sync
uv run playwright install chromium                     # Required for the download agent's browser path
# .env populated via `uv run python manage.py configure` — must include
#   RAGTIME_DOWNLOAD_AGENT_API_KEY (or the legacy RAGTIME_RECOVERY_AGENT_API_KEY fallback),
#   RAGTIME_PODCAST_INDEXES=podcastindex,fyyd,
#   RAGTIME_PODCASTINDEX_API_KEY, RAGTIME_PODCASTINDEX_API_SECRET.
# MusicBrainz database imported via musicbrainz-database-setup (see doc/README.md#musicbrainz-database).

Reset and bootstrap

  • docker compose up -d — Postgres + Qdrant (no -v anywhere).
  • uv run python manage.py dbreset --yes — drops ragtime DB + Qdrant collection only; MB untouched.
  • uv run python manage.py migrate
  • uv run python manage.py load_entity_types — seed initial EntityType rows (resolver depends on these).
  • uv run python manage.py createsuperuser — for the admin "View workflow steps" check.

Run a worker

submit_episode only enqueues; a live worker must be running to consume the queue.

  • In a separate terminal: uv run uvicorn ragtime.asgi:application --host 127.0.0.1 --port 8000.

Submit episodes

  • Cheap path — submit a clean RSS-fed publisher whose audio_url is in the HTML:
    uv run python manage.py submit_episode "<URL with audio in HTML>"
    Expect: status reaches READY. dbos workflow steps shows download_step completed with no agent invocation (StepOutput payload only).
  • Index path — submit the ARD Sounds episode:
    uv run python manage.py submit_episode "https://www.ardsounds.de/episode/urn:ard:episode:fdcf93eef8395b35/"
    Expect: agent's first tool call is lookup_podcast_index, returns an Akamai enclosure URL from fyyd, download_file succeeds.
  • Agent-browse path — submit a JS-only player URL not indexed by fyyd/podcastindex:
    uv run python manage.py submit_episode "<JS-only player URL not indexed>"
    Expect: agent falls through to navigate_to_url + find_audio_links / intercept_audio_requests. On full failure, episode goes to FAILED with DownloadFailed(...) recorded as the download_step exception in DBOS.

Verify the DBOS trace mirrors PIPELINE_STEPS

  • uv run dbos workflow list — find the workflow ID (episode-<id>-run-<n>).
  • uv run dbos workflow steps <workflow_id> — expect 9 entries: _bootstrap_status, fetch_details_step_, download_step, transcribe_step, summarize_step, chunk_step, extract_step, resolve_step, embed_step. No bookkeeping noise (no create_run_step, is_run_still_active, did_step_complete, mark_run_failed, get_pending_resume_step, mark_queued).

Verify the admin view

🤖 Generated with Claude Code

rafacm and others added 8 commits April 28, 2026 22:21
Plan covers consolidating the recovery layer into the Download step as a
Pydantic AI agent, adding fyyd/podcastindex.org as a deterministic
fallback, and dropping ProcessingRun/ProcessingStep/PipelineEvent/
RecoveryAttempt in favour of DBOS as the single source of truth for
workflow state. See #108 for the tracking issue and full acceptance
criteria.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ELDS

* Episode.guid (CharField, blank=True) — captures the RSS-feed-style
  identifier when the publisher exposes one. Used as a hint by the
  upcoming download agent's lookup_podcast_index tool.
* fetch_details agent extracts guid (system prompt + EpisodeDetails
  schema). Returns null when no stable identifier is visible.
* fetch_details_step REQUIRED_FIELDS = ("title",) only. audio_url is
  no longer mandatory: the download step (becoming an agent next)
  owns audio-URL discovery for cases where fetch-details could only
  recover a title.
* Migration 0022_add_episode_guid.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* episodes/podcast_indexes/ — new package
  * base.py: PodcastIndex ABC + EpisodeCandidate dataclass
  * fyyd.py: open API, term search (no GUID search supported)
  * podcastindex.py: GUID lookup → term-search fallback, X-Auth-Key
    /X-Auth-Date/Authorization sha1 header trio
  * factory.py: builds provider chain from RAGTIME_PODCAST_INDEXES,
    fans out queries, dedupes results by audio_url
* Settings: RAGTIME_PODCAST_INDEXES (ordered comma-separated list,
  empty disables), RAGTIME_FYYD_API_KEY (optional, raises rate
  limits), RAGTIME_PODCASTINDEX_API_KEY / _SECRET (required when
  podcastindex is configured).
* 11 tests covering candidate parsing, GUID-first behaviour,
  factory selection, missing-credentials skip, dedup, exception
  containment.

Used by the upcoming download agent's lookup_podcast_index tool.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Combines plan commits 4 + 5 — splitting them across two commits would
require throwaway compatibility shims (the rename of agents/recovery*
to agents/download* IS the recovery removal).

* episodes/downloader.py — three-tier cascade:
  1. wget on episode.audio_url (cheap path)
  2. Pydantic AI download agent (with podcast-index lookup +
     Playwright tools)
  3. DownloadFailed (structured fields: episode_id, sources_tried,
     wget_error, agent_message)
  Returns a typed DownloadResult on success.
* episodes/agents/download{,_browser,_deps,_tools}.py — renamed
  from recovery*. New tool: lookup_podcast_index fans out across
  RAGTIME_PODCAST_INDEXES via the podcast_indexes package.
  DownloadDeps replaces RecoveryDeps (carries title, show_name,
  guid for index lookups; drops recovery-specific error_type/
  http_status fields).
* Deleted: episodes/recovery.py (AgentStrategy / HumanEscalation /
  handle_step_failure), episodes/agents/recovery_resume.py,
  episodes/tests/test_recovery.py, episodes/tests/test_agent_resume.py.
  RecoveryAttemptAdmin downgraded to a read-only audit view (model
  drops in the next commit).
* episodes/workflows.py — drop run_agent_recovery workflow + helpers.
* episodes/apps.py — remove step_failed signal connection.
* Settings: RAGTIME_DOWNLOAD_AGENT_API_KEY/MODEL/TIMEOUT (with env
  fallback to the legacy RAGTIME_RECOVERY_AGENT_* keys during the
  transition). RAGTIME_RECOVERY_* settings removed.
* Tests: test_agent_tools.py renamed deps; test_admin.py drops
  retry_agent_recovery / RunAgentRecoveryTask suites.

339 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tructure DBOS workflow

DBOS owns workflow state; the Episode row carries the user-facing
status / error_message. ``dbos workflow steps <id>`` mirrors
PIPELINE_STEPS exactly.

* episodes/models.py — drop ProcessingRun, ProcessingStep,
  PipelineEvent, RecoveryAttempt. Migration
  0023_drop_processing_pipeline_models.
* episodes/workflows.py — rewrite ``process_episode``: one
  explicit ``@DBOS.step()`` per pipeline phase
  (``fetch_details_step_``, ``download_step``,
  ``transcribe_step``, ``summarize_step``, ``chunk_step``,
  ``extract_step``, ``resolve_step``, ``embed_step``) plus a
  single ``_bootstrap_status`` step. No more dynamic-dispatch
  importlib lookup; no more bookkeeping steps
  (``create_run_step``, ``is_run_still_active``,
  ``did_step_complete``, ``mark_run_failed``,
  ``get_pending_resume_step``, ``mark_queued``). Successful
  steps return a typed ``StepOutput``; failures propagate up
  and DBOS records the exception. Adds ``workflow_id_for()``
  helper for the deterministic per-episode workflow ID
  (``episode-<id>-run-<n>``) that replaces the dropped
  ``unique_running_run_per_episode`` partial index.
* episodes/processing.py — reduced to no-op shims so step
  modules don't have to be rewritten in this commit. Future
  cleanup will inline & delete.
* episodes/signals.py — drop ``step_completed`` /
  ``step_failed`` Signal objects (no remaining listeners).
* episodes/events.py — slim to dataclasses + ``classify_error``;
  drop persistence helpers that referenced PipelineEvent.
* episodes/admin.py — remove ProcessingRunAdmin /
  PipelineEventAdmin / RecoveryAttemptAdmin /
  ``NeedsHumanActionFilter``. Reprocess action no longer
  resolves AWAITING_HUMAN attempts or skips by RUNNING run
  (DBOS rejects duplicate workflow IDs). FAILED episodes
  remain editable; everything else is read-only.
* episodes/telemetry.py — switch session ID from
  ``processing-run-<id>-…`` to per-episode
  ``episode-<id>``.
* Tests: drop test_events.py persistence tests, drop the
  recovery-aware regression test in
  test_fetch_details_step.py, drop the
  ``test_reprocess_skips_episode_with_active_run`` admin
  test (no longer applicable). 334 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the deleted ProcessingRun / PipelineEvent / RecoveryAttempt
admin pages with a single DBOS-backed view.

* episodes/admin.py — register a custom admin URL
  ``/<id>/dbos-steps/`` that calls ``DBOS.list_workflows`` (filters
  by workflow ID prefix ``episode-<id>-``), picks the most recent
  run, and pulls per-step rows via ``DBOS.list_workflow_steps``.
  Renders function name + step ID + output + error so admins see
  exactly what each pipeline phase produced. Falls back to an
  empty list (and a friendly "No DBOS records" message) when DBOS
  isn't running or anything raises — admin must keep loading even
  when the queue is offline.
* episodes/templates/admin/episodes/episode/dbos_steps.html — the
  rendering template.
* Episode change form gains a ``dbos_steps_link`` readonly field
  (link in the main fieldset) so admins can jump from the episode
  page to its workflow trace.
* Tests: 2 new tests covering the empty-state and populated
  rendering.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* .env.sample — drop RAGTIME_RECOVERY_*, add the new
  RAGTIME_DOWNLOAD_AGENT_* and RAGTIME_PODCAST_INDEXES /
  RAGTIME_FYYD_API_KEY / RAGTIME_PODCASTINDEX_API_KEY|_SECRET keys.
* core/management/commands/_configure_helpers.py — replace the
  "Recovery" wizard section with two sections: "Download Agent"
  (Convention B) and "Podcast Indexes". Tests in
  core/tests/test_configure.py updated accordingly.
* README.md — refreshed Download row in the pipeline table,
  swapped the recovery bullet for an "agent-driven download"
  bullet, updated tech-stack mention.
* doc/README.md — new "Download cascade" section replaces the
  old "Recovery" section; step-3 description points at it; the
  workflow narrative explains DBOS owns audit state and
  DownloadFailed carries structured fields. Recovery Excalidraw
  diagram flagged as stale.
* CHANGELOG.md — entry under 2026-04-28 with Added / Changed /
  Removed sections, all marked **BREAKING**, links to plan,
  feature, and both session transcripts.
* doc/features/2026-04-28-download-agent-and-recovery-removal.md
  — feature doc.
* doc/sessions/2026-04-28-download-agent-and-recovery-removal-implementation-session.md
  — implementation transcript.

336 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* Delete obsolete Excalidraw diagrams + their SVG renders:
  - doc/architecture/ragtime-recovery.{excalidraw,svg}
  - doc/architecture/ragtime-processing-pipeline-with-recovery.{excalidraw,svg}
  The recovery layer no longer exists; the canonical pipeline diagram
  (ragtime-processing-pipeline) covers the current 10-step flow.
* doc/README.md — fold the standalone "Download cascade" section into
  the step-3 "Download" description so the per-step doc is the single
  source of truth. Expand the step-2 "Fetch Details" doc to spell out
  the EpisodeDetails schema, the orchestrator's responsibilities, and
  why audio_url is no longer required. Drop the stale-diagram callout.
* AGENTS.md — replace the dropped unique_running_run_per_episode
  constraint description with the new deterministic DBOS workflow ID
  approach; update the tech-choices line to list fetch_details and
  download agents instead of "recovery agent".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rafacm rafacm marked this pull request as ready for review April 28, 2026 21:07
The download cascade now invokes the Pydantic AI agent on wget failure,
which tries to launch a Playwright browser. CI has no chromium installed
(`playwright install chromium` is a manual one-time prereq), so the
agent crashed before reaching the DownloadFailed return — the test then
saw a Playwright error string in episode.error_message instead of the
wget reference it was asserting on.

* test_wget_error_sets_failed → test_wget_error_falls_through_to_agent:
  mocks run_download_agent to return success=False, asserts both 'wget'
  and 'agent' show up in the DownloadFailed payload (i.e. sources_tried
  records both tiers).
* New test_agent_recovers_after_wget_fails covering the
  wget-fails-but-agent-succeeds path the old test didn't reach: stages
  a fake downloaded file, confirms the orchestrator attaches it and
  advances the episode to TRANSCRIBING.

337 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Owner Author

@rafacm rafacm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking issues from review:

  1. DBOS records failed pipeline steps as successful and continues advancing. The wrappers in episodes/workflows.py always return StepOutput after calling the legacy step functions, but those functions still fail by setting Episode.status = FAILED and returning. Please make failures raise into DBOS, either in the step functions or by having each wrapper refresh the episode and raise when status is FAILED.

  2. Deterministic workflow IDs are declared but never applied. workflow_id_for() exists and the docs say duplicates are rejected, but episodes/signals.py and admin reprocess enqueue directly, so DBOS generates UUIDs. Please wrap enqueue sites with SetWorkflowID(workflow_id_for(...)) or use DBOS deduplication explicitly. This also fixes the admin trace view that filters by the episode-<id>- prefix.

  3. Agent-discovered audio URLs should replace stale failed URLs. In episodes/downloader.py, the agent URL is saved only when episode.audio_url is empty. If wget failed for an existing URL and the agent found a better enclosure, leaving the stale URL causes future reprocesses to retry the bad URL first. Please save agent_result.audio_url whenever present and different from the current value.

…stale audio_url

Three blocking issues from the review on this branch.

1. Step wrappers in workflows.py always returned StepOutput after the
   legacy step function returned, but those functions handle failure
   by setting Episode.status = FAILED and returning normally — they
   never raise. DBOS therefore recorded failed runs as successful and
   the workflow advanced past them. Added a typed StepFailed exception
   hierarchy (FetchDetailsFailed, DownloadStepFailed, TranscribeFailed,
   …, EmbedFailed) and a `_raise_if_failed(episode_id, exc_cls)` helper
   that refreshes the row after each step body returns and raises the
   matching subclass when status is FAILED. DBOS now records the
   exception class + message verbatim.
2. workflow_id_for() existed but no caller used it: signals.py and
   admin reprocess called episode_queue.enqueue() bare, so DBOS
   generated UUIDs and the admin's "View workflow steps" page (which
   filters by ``episode-<id>-`` prefix) returned nothing. Introduced
   `enqueue_episode(episode_id, from_step="")` that wraps the enqueue
   in `SetWorkflowID(workflow_id_for(episode_id, next_attempt(...)))`.
   `next_attempt()` walks DBOS.list_workflows() to compute the next
   `run-<n>` suffix (returns 1 when no prior runs exist or DBOS
   isn't running). signals.queue_next_step and the admin reprocess
   action now route through enqueue_episode.
3. downloader.py only saved agent_result.audio_url when episode.audio_url
   was empty, so a stale URL that wget had failed on was never
   overwritten by an agent-discovered enclosure. Next reprocess would
   waste a wget hop on the bad URL. Save whenever the agent URL is
   present and different from the current value.

Tests:
* New episodes/tests/test_workflows.py — 8 tests covering
  workflow_id_for, next_attempt (DBOS-unavailable / no-prior-runs /
  highest-run + 1 with unrelated workflows ignored), _raise_if_failed
  no-op vs typed-raise paths, and the StepFailed subclass step_name
  attribute.
* test_signals.py — assert deterministic workflow ID via
  SetWorkflowID, update enqueue assertion (now always carries
  from_step="").
* test_download.py — assert the agent-discovered URL overwrites the
  stale one in the post-cascade audio_url field.

346 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rafacm
Copy link
Copy Markdown
Owner Author

rafacm commented Apr 29, 2026

Addressed all three blocking issues from the review in 4833851.

1. DBOS records failed pipeline steps as successful → ✅ fixed

Added a typed StepFailed exception hierarchy in episodes/workflows.py (FetchDetailsFailed, DownloadStepFailed, TranscribeFailed, SummarizeFailed, ChunkFailed, ExtractFailed, ResolveFailed, EmbedFailed) and a _raise_if_failed(episode_id, exc_cls) helper. Each step wrapper refreshes the episode after the legacy step body returns and raises the matching subclass when status == FAILED, so DBOS records the exception class + error_message instead of treating the failed run as success. Coverage in episodes/tests/test_workflows.py:RaiseIfFailedTests.

2. Deterministic workflow IDs declared but never applied → ✅ fixed

Introduced enqueue_episode(episode_id, from_step="") in episodes/workflows.py that wraps episode_queue.enqueue(...) in SetWorkflowID(workflow_id_for(episode_id, next_attempt(episode_id))). next_attempt() walks DBOS.list_workflows() to find the highest existing episode-<id>-run-<n> suffix and returns n + 1 (defaults to 1 when DBOS isn't running or no prior runs exist). Both enqueue sites — signals.queue_next_step and EpisodeAdmin._execute_reprocess — now route through enqueue_episode. The admin "View workflow steps" page's episode-<id>- prefix filter now resolves to the freshly-enqueued workflow. Coverage in test_workflows.py:WorkflowIdHelpersTests and test_signals.py:test_creating_episode_uses_deterministic_workflow_id.

3. Agent-discovered audio URLs should replace stale failed URLs → ✅ fixed

Changed the episodes/downloader.py guard from if agent_result.audio_url and not episode.audio_url: to if agent_result.audio_url and agent_result.audio_url != episode.audio_url:. A stale URL that wget failed on now gets overwritten by the agent-discovered enclosure, so a future reprocess starts the cascade against the working URL. Coverage in test_download.py:test_agent_recovers_after_wget_fails.

Test suite: 346 passing (up from 337 — 9 new regression tests across the three fixes).

@rafacm
Copy link
Copy Markdown
Owner Author

rafacm commented Apr 29, 2026

Reviewed the fixes in 4833851. The three previously reported issues are addressed:

  1. DBOS failure propagation now raises typed StepFailed subclasses after legacy step functions mark an episode failed.
  2. Episode pipeline enqueue now uses deterministic SetWorkflowID(workflow_id_for(...)) via enqueue_episode() from both signal and admin paths.
  3. Agent-discovered audio URLs now replace stale failed audio_url values.

I also ran the focused regression slice:

env UV_CACHE_DIR=/tmp/uv-cache uv run python manage.py test episodes.tests.test_workflows episodes.tests.test_signals episodes.tests.test_download episodes.tests.test_admin

Result: 41 tests passing.

There are no GitHub review threads to mark resolved because the original review was posted as a top-level review body, not inline comments.

DBOS's Python SDK returns ``list_workflows()`` and ``list_workflow_steps()``
rows as TypedDicts, not objects with attributes. The previous helper used
``getattr(record, ...)`` which always returned the default for dicts —
the prefix filter never matched, the helper returned [] before reaching
``list_workflow_steps``, and the admin page rendered the empty-state
message even when the workflow existed.

* episodes/admin.py — new ``_dbos_field(record, name, default=None)``
  helper that uses dict ``.get()`` for TypedDicts and falls back to
  ``getattr`` for forward compatibility (in case a future DBOS version
  returns dataclasses or attrs classes). Both prefix-filter and step-
  unwrapping paths route through it.
* episodes/tests/test_admin.py — regression test covering the dict
  path: mocks ``dbos.DBOS`` to return TypedDict workflows + steps,
  asserts the helper picks the most-recent matching workflow and
  unwraps the step rows.

347 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
rafacm and others added 3 commits April 29, 2026 06:04
…format

DBOS persists per-step ``output`` and ``error`` as base64-encoded
pickle bytes. ``DBOS.list_workflow_steps()`` returns those raw
strings rather than reconstructing the objects, so the admin's
"View workflow steps" page was rendering the b64 blob in the
ERROR column instead of the human-readable message.

Two-part fix:

* episodes/workflows.py — add ``StepFailed.__reduce__`` returning
  ``(cls, (episode_id, error_message))`` so pickle round-trips
  cleanly. The default ``Exception.__reduce__`` puts only the
  formatted message in ``self.args`` and unpickling tries
  ``cls(message)`` which raises ``TypeError`` against the
  two-arg ``__init__``.
* episodes/admin.py — new ``_decode_dbos_payload(value)`` helper:
  when the value looks like a base64-encoded pickle stream
  (starts with ``gAS`` — the protocol-4 magic), base64-decode +
  ``pickle.loads`` and return ``str(obj)``. Falls back to the
  raw value on decode failure, so legacy rows pickled before
  the ``__reduce__`` fix still render (as wire format) without
  crashing the page. Both ``output`` and ``error`` columns route
  through it.

Tests: 4 new (StepFailed pickle round-trip via the helper,
passthrough for empty / plain / pickle-look-alike-but-not-valid
values, plus the wire-format → readable assertion in the admin
helper).

Note: the standalone ``dbos`` CLI still can't unpickle these
objects because ``episodes.workflows`` isn't on its import path —
that's tracked separately in #110, which proposes moving the
exception classes to a Django-free module.

349 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous helper ``_dbos_workflow_steps`` sorted by ``created_at``
and returned only the latest run's steps. After a reprocess, earlier
runs disappeared from the view — a regression vs. the old
``ProcessingRunInlineForEpisode`` + ``ProcessingStepInline`` UX which
listed every ``ProcessingRun`` with its steps.

* episodes/admin.py — new ``_dbos_workflow_runs(episode_id)`` returns
  every ``episode-<id>-run-<n>`` workflow recorded for the episode,
  newest first, each carrying its own ``steps`` list looked up via
  the new ``_step_rows_for_workflow`` helper. ``_dbos_workflow_steps``
  is kept as a thin shim returning the latest run's steps for any
  caller that still wants the single-run shape. New
  ``_epoch_ms_to_datetime`` helper turns DBOS's epoch-ms timestamps
  into ``datetime`` for display.
* dbos_steps_view passes ``runs`` instead of ``steps`` to the
  template. The change-form link relabel: "View workflow runs"
  (was "View workflow steps").
* dbos_steps.html — restructure to render one section per run with
  the workflow ID, a colour-coded status pill (SUCCESS green,
  ERROR red, PENDING/ENQUEUED yellow, CANCELLED grey), started /
  updated / queue metadata, and the per-step table underneath.
  Plural-aware run count in the header.
* Tests updated and added — replace the single-run rendering test
  with a multi-run one (asserting both ``run-1`` SUCCESS and
  ``run-2`` ERROR sections render with their own steps), and the
  helper-level test now exercises ``_dbos_workflow_runs`` with a
  mocked ``DBOS.list_workflow_steps`` side_effect that returns
  different steps per workflow_id.

349 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…s types

The advisory-lock guard in ``_acquire_name_locks`` only sorted names
**within** a single entity type. Lock acquisition was driven by the
per-type loop in ``resolve_entities``, so two resolvers running on
different episodes could interleave the loop in opposite orders:

  Episode A: takes (musician, "Miles Davis"), waits on (album, "Kind of Blue")
  Episode B: takes (album, "Kind of Blue"), waits on (musician, "Miles Davis")

Postgres' deadlock detector kicked one out:
  "deadlock detected … waits for ExclusiveLock on advisory lock …"

Fix: hoist all advisory-lock acquisitions out of the per-type loop
into a single batched call at the start of the transaction.

* ``_acquire_name_locks`` now takes an iterable of ``(entity_type_id,
  name)`` tuples — the full set across every type the resolve
  touches — and sorts the lock keys globally before acquiring. This
  gives every resolver the same total acquisition order, eliminating
  the cross-type cycle.
* ``resolve_entities`` pre-resolves every ``EntityType`` referenced by
  the aggregated chunks before opening the transaction (so unknown
  keys still log a single warning each), builds the global lock-pair
  list inside the transaction, takes the batch lock, then runs the
  per-type DB work without re-locking.

349 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
rafacm and others added 2 commits April 29, 2026 07:48
Calling ``wikidata_queue.enqueue(...)`` from inside the resolver was a
silent no-op past the first call: DBOS keys its operation log on
``(parent_workflow_id, parent_workflow_fid, target_func_name)``. Inside
a ``@DBOS.step()`` body every enqueue call shares the step's
``function_id``, so only the first enqueue per ``(step, target_func)``
pair ever runs and subsequent calls return the first call's handle
without dispatching anything. Multiple resolve runs over time then
saw the same recorded result and never enqueued anything new.

The wikidata_enrichment queue therefore stayed permanently empty even
with uvicorn running — every enqueue from inside ``resolve_step`` was
either the original first one (already terminal) or a dedup hit.

Fix: hoist the enqueue into ``process_episode`` (workflow context).

* ``episodes/resolver.py:resolve_entities`` now returns the list of
  entity IDs that need background enrichment (``[]`` on every
  early-exit / failure path) instead of trying to enqueue them itself.
  ``_enqueue_enrichment`` deleted.
* ``episodes/workflows.py`` — new ``ResolveStepOutput(StepOutput)``
  carrying ``entity_ids_to_enrich``. ``resolve_step`` returns it;
  ``process_episode`` captures the value during dispatch and runs
  the enqueue loop after the pipeline completes, from workflow
  context where DBOS allows ``Queue.enqueue`` properly.
* Tests: setUp in ``test_resolve.py`` switches from patching the
  deleted ``_enqueue_enrichment`` to capturing ``resolve_entities``'s
  return value via ``patch.object`` + ``side_effect``. Three call
  sites updated to read the captured list. New ``test_workflows.py``
  cases pin the ``ResolveStepOutput`` contract: a non-empty resolve
  returns its entity IDs, an empty resolve returns ``()``.

351 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…l_work_url

MusicBrainz names entity-relationship tables ``l_<a>_<b>`` with the
two entity types in alphabetical order, and the matching ``entity0``
/ ``entity1`` columns follow the same alphabetical ordering. For
most types ``<entity> < url``, so the table is ``l_<entity>_url``
with ``entity0=<entity>``, ``entity1=url`` — what the existing query
assumed.

But ``url < work`` alphabetically. MB stores work-URL relationships
in ``l_url_work`` with ``entity0=url``, ``entity1=work``. The hard-
coded ``l_work_url`` reference in ``_TABLE_CONFIG`` raised
``UndefinedTable`` when ``enrich_entity_wikidata`` hit a work
entity, leaving 76 entities resolved + 151 stuck pending in the
user's pipeline.

* ``_TABLE_CONFIG`` — every entry now declares ``link_main_col`` /
  ``link_url_col`` explicitly. The ``work`` entry uses
  ``l_url_work`` with ``link_main_col="entity1"``,
  ``link_url_col="entity0"`` (everyone else stays
  ``entity0`` / ``entity1``).
* ``get_wikidata_qid`` reads those columns from the config and
  threads them into the JOIN, defaulting to ``entity0`` /
  ``entity1`` for forward compatibility if a future entry omits
  them.

Verified against the live MB schema:
  to_regclass('musicbrainz.l_url_work') -> musicbrainz.l_url_work
  to_regclass('musicbrainz.l_work_url') -> NULL

351 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…sting rounds

Per AGENTS.md, the implementation transcript captures all interactions
"up to and including PR review feedback and any follow-up changes made
after the PR is created." The transcript stopped at commit 8 — appended
the rounds that followed:

* /pr-create + push + ready-for-review
* CI failure (test_download.py mocking)
* Verification block / test plan rewrite for issue #108 + PR #109
* CI test reporting (deferred)
* PR self-review with three blocking issues (4833851)
* Recovery diagrams removed + Fetch Details / Download docs consolidated
* Manual-testing diagnostics: admin DBOS view returning empty list
  (TypedDict bug), pickle wire format rendering, only-one-run shown
* Resolver deadlock fix
* The big one: Wikidata enrichment never running (enqueue from workflow,
  not step) + the l_url_work MB schema fix
* Five follow-up issues filed (#110/#111/#112/#113 plus the ARD candidate
  matching one)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rafacm rafacm merged commit dab3033 into main Apr 29, 2026
1 check passed
rafacm added a commit that referenced this pull request Apr 29, 2026
…stale audio_url

Three blocking issues from the review on this branch.

1. Step wrappers in workflows.py always returned StepOutput after the
   legacy step function returned, but those functions handle failure
   by setting Episode.status = FAILED and returning normally — they
   never raise. DBOS therefore recorded failed runs as successful and
   the workflow advanced past them. Added a typed StepFailed exception
   hierarchy (FetchDetailsFailed, DownloadStepFailed, TranscribeFailed,
   …, EmbedFailed) and a `_raise_if_failed(episode_id, exc_cls)` helper
   that refreshes the row after each step body returns and raises the
   matching subclass when status is FAILED. DBOS now records the
   exception class + message verbatim.
2. workflow_id_for() existed but no caller used it: signals.py and
   admin reprocess called episode_queue.enqueue() bare, so DBOS
   generated UUIDs and the admin's "View workflow steps" page (which
   filters by ``episode-<id>-`` prefix) returned nothing. Introduced
   `enqueue_episode(episode_id, from_step="")` that wraps the enqueue
   in `SetWorkflowID(workflow_id_for(episode_id, next_attempt(...)))`.
   `next_attempt()` walks DBOS.list_workflows() to compute the next
   `run-<n>` suffix (returns 1 when no prior runs exist or DBOS
   isn't running). signals.queue_next_step and the admin reprocess
   action now route through enqueue_episode.
3. downloader.py only saved agent_result.audio_url when episode.audio_url
   was empty, so a stale URL that wget had failed on was never
   overwritten by an agent-discovered enclosure. Next reprocess would
   waste a wget hop on the bad URL. Save whenever the agent URL is
   present and different from the current value.

Tests:
* New episodes/tests/test_workflows.py — 8 tests covering
  workflow_id_for, next_attempt (DBOS-unavailable / no-prior-runs /
  highest-run + 1 with unrelated workflows ignored), _raise_if_failed
  no-op vs typed-raise paths, and the StepFailed subclass step_name
  attribute.
* test_signals.py — assert deterministic workflow ID via
  SetWorkflowID, update enqueue assertion (now always carries
  from_step="").
* test_download.py — assert the agent-discovered URL overwrites the
  stale one in the post-cascade audio_url field.

346 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rafacm rafacm deleted the feature/download-agent-and-recovery-removal branch April 29, 2026 06:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Download as agent + podcast-index fallback + DBOS state-of-truth

1 participant