Skip to content

Commit ef8fefd

Browse files
authored
Merge pull request #94 from strausmann/feat/phase-2-job-persistence
feat(jobs): Phase 2 Job Persistence (DB-backed + Restart Recovery + Cleanup)
2 parents f6f25e4 + 77e4450 commit ef8fefd

27 files changed

Lines changed: 5374 additions & 83 deletions

backend/app/api/routes/batches.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Phase 2: GET /api/batches/{id} — Snapshot für Hangar Result-Page Initial-Render."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Annotated
6+
from uuid import UUID
7+
8+
from fastapi import APIRouter, Depends, HTTPException
9+
from sqlalchemy.ext.asyncio import AsyncSession
10+
11+
from app.auth.dependencies import AuthContext
12+
from app.auth.scope_deps import require_read
13+
from app.db.session import get_session
14+
from app.models.job import JobState
15+
from app.repositories import jobs as jobs_repo
16+
from app.repositories import print_batches as batches_repo
17+
from app.schemas.batch_read import BatchRead, BatchSummary
18+
from app.schemas.job import JobRead
19+
20+
router = APIRouter(prefix="/api/batches", tags=["batches"])
21+
22+
SessionDep = Annotated[AsyncSession, Depends(get_session)]
23+
ReadAuthDep = Annotated[AuthContext, Depends(require_read)]
24+
25+
26+
@router.get("/{batch_id}", response_model=BatchRead)
async def get_batch(
    batch_id: UUID,
    session: SessionDep,
    _auth: ReadAuthDep,
) -> BatchRead:
    """Return a snapshot of a batch together with the current state of its jobs.

    Used by Hangar's /admin/print/result/{batch_id} page for its initial
    render. When summary.all_terminal is False, Hangar should open an SSE
    stream on /api/events?batch_id=... to receive live updates.

    Jobs are returned in the order of ``batch.job_ids``. Ids that no longer
    resolve to a job row (e.g. removed by cleanup) are skipped, so
    ``summary.total`` counts only the jobs that are still present.

    Raises:
        HTTPException: 404 when no batch with ``batch_id`` exists.
    """
    batch = await batches_repo.get(session, batch_id)
    if batch is None:
        raise HTTPException(status_code=404, detail="Batch not found")

    # batch.job_ids holds the ids as strings. Parse each one exactly once and
    # match on UUID values instead of on their string form: str(UUID) always
    # yields the lowercase hyphenated spelling, so comparing it with the stored
    # text would silently drop a job whose id was persisted in any other
    # spelling that UUID() accepts (upper case, without hyphens, ...).
    job_uuids = [UUID(jid) for jid in batch.job_ids]
    fetched_jobs = await jobs_repo.list_by_ids(session, job_uuids)
    # UUID(str(...)) normalises the key whether the model exposes the id as a
    # UUID object or as text.
    jobs_by_id = {UUID(str(job.id)): job for job in fetched_jobs}

    # Preserve the batch's own ordering; ids without a matching row are skipped.
    ordered = [jobs_by_id[job_id] for job_id in job_uuids if job_id in jobs_by_id]

    def count_in(*states: JobState) -> int:
        """Count the ordered jobs whose state equals one of ``states``."""
        wanted = tuple(state.value for state in states)
        return sum(1 for job in ordered if job.state in wanted)

    summary = BatchSummary(
        total=len(ordered),
        queued=count_in(JobState.QUEUED),
        printing=count_in(JobState.PRINTING),
        done=count_in(JobState.DONE),
        # Both failure flavours are reported under a single "failed" counter.
        failed=count_in(JobState.FAILED, JobState.FAILED_RESTART),
        cancelled=count_in(JobState.CANCELLED),
    )

    return BatchRead(
        id=batch.id,
        printer_id=batch.printer_id,
        created_by=batch.created_by,
        created_at=batch.created_at,
        jobs=[JobRead.model_validate(job) for job in ordered],
        summary=summary,
    )

backend/app/api/routes/print.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ async def create_print_job(
7979
"loaded_mm": exc.loaded_mm,
8080
}
8181
return JSONResponse(status_code=http_status, content=body)
82-
return PrintJobResponse(job_id=job_id, status="queued")
82+
# Phase 2: submit_print_job gibt jetzt UUID zurück; Response-Schema erwartet str.
83+
return PrintJobResponse(job_id=str(job_id), status="queued")
8384

8485

8586
@router.get(

backend/app/config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,16 @@ class Settings(BaseSettings):
106106
# Set to False during transition to avoid surprising existing automation.
107107
pangolin_bypass_scope_downgrade: bool = False
108108

109+
# Phase 2: Job-Retention für CleanupTask
110+
job_retention_days: int = Field(
111+
default=30,
112+
ge=1,
113+
description=(
114+
"Terminal Jobs (DONE/FAILED/FAILED_RESTART/CANCELLED) werden nach diesem Zeitraum "
115+
"vom CleanupTask gelöscht"
116+
),
117+
)
118+
109119
@field_validator("webhook_api_key")
110120
@classmethod
111121
def validate_api_key_length(cls, v: SecretStr) -> SecretStr:

backend/app/main.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
from app import __version__
7777
from app.api.error_handlers import register_error_handlers
7878
from app.api.routes import batch as batch_routes
79+
from app.api.routes import batches as batches_routes
7980
from app.api.routes import events as events_routes
8081
from app.api.routes import jobs as jobs_routes
8182
from app.api.routes import lookup as lookup_routes
@@ -91,20 +92,22 @@
9192
from app.db.engine import async_session, engine
9293
from app.db.lifespan import (
9394
ensure_printer_state,
94-
recover_inflight_jobs,
9595
run_migrations,
9696
seed_templates,
9797
upsert_runtime_printer,
9898
verify_alembic_at_head,
9999
)
100100
from app.db.session import get_session
101101
from app.integrations.registry import IntegrationRegistry
102+
from app.models.printer import Printer as _Printer
102103
from app.printer_backends import BackendRegistry
103104
from app.printer_backends.exceptions import SnmpDiscoveryError
104105
from app.printer_backends.snmp_helper import query_model_pjl
105106
from app.printer_models.registry import ModelRegistry
106107
from app.schemas.readiness import ReadinessResponse
108+
from app.services.cleanup_task import CleanupTask
107109
from app.services.event_bus import EventBus
110+
from app.services.job_store_sqlite import SQLiteJobStore
108111
from app.services.label_renderer import LabelRenderer
109112
from app.services.lookup_service import AppLookupService
110113
from app.services.print_queue import PrintQueue
@@ -270,13 +273,25 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
270273

271274
# 4. DB-bound init — plugin registry and template cache are populated.
272275
async with async_session() as s:
273-
await recover_inflight_jobs(s)
276+
# Phase 2: recover_inflight_jobs() entfernt (Spec R1-C1) —
277+
# PrintQueue.start() übernimmt Recovery mit korrekter QUEUED/PRINTING-Differenzierung.
274278
await seed_templates(s, TemplateLoader)
275279
db_printer_id = await upsert_runtime_printer(s, settings)
276280
await ensure_printer_state(s)
277281
await s.commit()
278282
# -------------------------------------------------------------------------
279283

284+
# Phase 2: JobStore + CleanupTask
285+
# 'async_session' ist die async_sessionmaker aus app.db.engine (R2-M5)
286+
job_store = SQLiteJobStore(async_session)
287+
288+
cleanup_task = CleanupTask(
289+
store=job_store,
290+
retention_days=settings.job_retention_days,
291+
)
292+
await cleanup_task.start()
293+
app.state.cleanup_task = cleanup_task
294+
280295
discovery_host = settings.pt750w_host or ""
281296
if discovery_host and settings.printer_discover_via_snmp:
282297
model_id = await _resolve_model_id(settings, discovery_host)
@@ -296,15 +311,51 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
296311
tape_registry = TapeRegistry()
297312
printer = driver.make_queue_printer(tape_registry, printer_id=db_printer_id)
298313

314+
if db_printer_id is None:
315+
# Wenn kein Host konfiguriert ist (Mock-Backend / CI), liefert
316+
# upsert_runtime_printer None zurück und fügt keine Printer-Row ein.
317+
# make_queue_printer erzeugt dann eine neue uuid4. Damit
318+
# jobs.printer_id (FK → printers.id) bei save_queued nicht verletzt
319+
# wird, legen wir hier eine Stub-Row an. slug wird auf str(id) gesetzt
320+
# (eindeutig durch UUID), damit der UNIQUE-Constraint nicht verletzt wird.
321+
_stub_slug = str(printer.id)
322+
async with async_session() as s:
323+
# Defensive idempotency: in non-mock production paths the printer_id
324+
# is explicit and would be reused, so the existing check matters
325+
# there. In mock paths (printer_id=None), a fresh uuid4 means
326+
# existing is always None.
327+
existing = await s.get(_Printer, printer.id)
328+
if existing is None:
329+
s.add(
330+
_Printer(
331+
id=printer.id,
332+
name=f"stub-{printer.id}",
333+
slug=_stub_slug,
334+
model=model_id.lower(),
335+
backend=settings.printer_backend,
336+
)
337+
)
338+
await s.commit()
339+
299340
# --- SSE EventBus ---
300341
event_bus = EventBus(queue_size=settings.sse_queue_size)
301342
app.state.event_bus = event_bus
302343
# ----- end SSE ------
303344

345+
# Shared LabelRenderer reused by PrintService, the preview endpoint and
346+
# PrintQueue Recovery. Constructing it once avoids repeated font-loading
347+
# overhead on every POST /api/render/preview request.
348+
# Moved before PrintQueue construction so Recovery in queue.start() can use it.
349+
shared_renderer = LabelRenderer()
350+
app.state.label_renderer = shared_renderer
351+
304352
pq_producer = PrintQueueProducer(bus=event_bus)
305353
queue = PrintQueue(
306354
printers=[printer],
307355
on_state_change=pq_producer.handle_transition,
356+
store=job_store,
357+
renderer=shared_renderer,
358+
loader=TemplateLoader,
308359
)
309360
await queue.start()
310361

@@ -329,18 +380,14 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
329380
app.state.printer_id = printer.id
330381
app.state.printer_host = discovery_host
331382
app.state.printer_snmp_community = settings.printer_snmp_community
332-
# Shared LabelRenderer reused by both PrintService and the preview endpoint.
333-
# Constructing it once avoids repeated font-loading overhead on every
334-
# POST /api/render/preview request.
335-
shared_renderer = LabelRenderer()
336-
app.state.label_renderer = shared_renderer
337383
app.state.print_service = PrintService(
338384
template_loader=TemplateLoader,
339385
renderer=shared_renderer,
340386
print_queue=queue,
341387
lookup_service=AppLookupService(),
342388
printer_id=printer.id,
343389
backend=backend,
390+
store=job_store,
344391
)
345392

346393
try:
@@ -349,6 +396,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
349396
if status_producer is not None:
350397
await status_producer.stop()
351398
await queue.stop(timeout_s=settings.printer_queue_timeout_s)
399+
await cleanup_task.stop()
352400
await engine.dispose()
353401
# Close shared HTTP clients held by integration plugins that support it.
354402
# Plugins that pre-date connection pooling may not have aclose(); skip them.
@@ -597,6 +645,7 @@ async def readiness(
597645
register_error_handlers(app)
598646
app.include_router(print_router)
599647
app.include_router(batch_routes.router)
648+
app.include_router(batches_routes.router)
600649
app.include_router(events_routes.router)
601650
app.include_router(printers_routes.router)
602651
app.include_router(templates_routes.router)

backend/app/repositories/jobs.py

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
from __future__ import annotations
44

5-
from datetime import UTC, datetime
5+
from datetime import UTC, datetime, timedelta
66
from typing import Any
77
from uuid import UUID
88

9-
from sqlalchemy import select, update
9+
from sqlalchemy import delete, select, update
1010
from sqlalchemy.ext.asyncio import AsyncSession
1111
from sqlmodel import col
1212

@@ -160,12 +160,98 @@ async def mark_inflight_as_failed_restart(session: AsyncSession) -> int:
160160
return int(result.rowcount) # type: ignore[attr-defined] # rowcount on UPDATE result
161161

162162

163-
async def list_active(session: AsyncSession) -> list[Job]:
164-
"""Return all jobs in QUEUED or PRINTING state (covered by ix_jobs_state)."""
163+
async def list_active(
    session: AsyncSession,
    *,
    printer_id: UUID | None = None,
) -> list[Job]:
    """Fetch every job that is still QUEUED or PRINTING, oldest first.

    The state filter is meant to be served by the ix_jobs_state index.

    Phase 2: pass ``printer_id`` to restrict the result to a single printer;
    this is what PrintQueue.start() recovery uses. With the default ``None``
    the jobs of all printers are returned.
    """
    # Collect the WHERE criteria first, then assemble the statement in one go.
    criteria = [
        col(Job.state).in_((JobState.QUEUED.value, JobState.PRINTING.value)),
    ]
    if printer_id is not None:
        criteria.append(col(Job.printer_id) == printer_id)

    query = select(Job).where(*criteria).order_by(col(Job.created_at))
    rows = await session.execute(query)
    return list(rows.scalars())
179+
180+
181+
async def mark_printing_as_failed_restart(
    session: AsyncSession,
    printer_id: UUID,
) -> int:
    """Phase 2: flag one printer's PRINTING jobs as FAILED_RESTART.

    Every job of ``printer_id`` that is currently PRINTING is updated to
    FAILED_RESTART with error='printer_interrupted' and a fresh finished_at
    timestamp. The change is committed before returning.

    Intended for PrintQueue.start(): QUEUED jobs are deliberately left alone
    because they are re-enqueued cleanly, whereas a PRINTING job is ambiguous
    (the printer may have finished before the crash without the Hub being
    able to record it).

    Returns:
        The number of rows that were updated.
    """
    interrupted_on_printer = (
        col(Job.printer_id) == printer_id,
        col(Job.state) == JobState.PRINTING.value,
    )
    failure_fields = {
        "state": JobState.FAILED_RESTART.value,
        "error": "printer_interrupted",
        "finished_at": datetime.now(UTC),
    }

    statement = update(Job).where(*interrupted_on_printer).values(**failure_fields)
    statement = statement.execution_options(synchronize_session="fetch")

    outcome = await session.execute(statement)
    await session.commit()
    return int(outcome.rowcount)  # type: ignore[attr-defined]
210+
211+
212+
async def list_by_ids(
    session: AsyncSession,
    job_ids: list[UUID],
) -> list[Job]:
    """Load all jobs whose id is contained in ``job_ids`` with one query.

    The rows come back in no particular order; callers that need the input
    order must re-sort the result themselves. An empty ``job_ids`` returns
    an empty list without touching the database.

    Phase 2: used by GET /api/batches/{id} to resolve the ids stored in
    PrintBatch.job_ids.
    """
    if job_ids:
        query = select(Job).where(col(Job.id).in_(job_ids))
        rows = await session.execute(query)
        return list(rows.scalars())
    return []
225+
226+
227+
async def evict_terminal_older_than(
    session: AsyncSession,
    age: timedelta,
) -> int:
    """Phase 2 cleanup: delete terminal jobs that finished more than ``age`` ago.

    A job is terminal when its state is DONE, FAILED, FAILED_RESTART or
    CANCELLED. The age check compares finished_at with ``now(UTC) - age``.

    Rows whose finished_at is NULL are never deleted: ``NULL < cutoff``
    evaluates to SQL UNKNOWN, which does not satisfy WHERE. This is
    intentional and protects pre-Phase-2 rows that may lack finished_at.

    The deletion is committed before returning.

    Returns:
        The number of rows that were deleted.
    """
    cutoff = datetime.now(UTC) - age

    is_terminal = col(Job.state).in_(
        (
            JobState.DONE.value,
            JobState.FAILED.value,
            JobState.FAILED_RESTART.value,
            JobState.CANCELLED.value,
        )
    )
    is_expired = col(Job.finished_at) < cutoff

    purge = delete(Job).where(is_terminal, is_expired)
    purge = purge.execution_options(synchronize_session="fetch")

    outcome = await session.execute(purge)
    await session.commit()
    return int(outcome.rowcount)  # type: ignore[attr-defined]

0 commit comments

Comments
 (0)