Skip to content

Commit d027884

Browse files
committed
feat(lifespan): wire JobStore + CleanupTask in App-Startup (Phase 2)
SQLiteJobStore aus async_session; CleanupTask läuft beim Start sofort und danach alle 24h. PrintQueue und PrintService erhalten store via DI für Recovery und Persistierung. recover_inflight_jobs() entfernt — PrintQueue.start() übernimmt Recovery mit korrekter QUEUED/PRINTING-Differenzierung (Spec R1-C1). Stub-Printer-Row für Mock-Backend / CI: wenn upsert_runtime_printer keinen Host-konfigurierten Drucker anlegt, wird eine Stub-Row mit eindeutigem slug=str(uuid) eingefügt damit jobs.printer_id (FK) nicht verletzt wird. Test test_phase6b_sse_with_batch angepasst (upsert statt blindes create, weil Stub-Row bereits existiert). Refs #93
1 parent ddbe0d4 commit d027884

3 files changed

Lines changed: 79 additions & 18 deletions

File tree

backend/app/main.py

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,22 @@
9292
from app.db.engine import async_session, engine
9393
from app.db.lifespan import (
9494
ensure_printer_state,
95-
recover_inflight_jobs,
9695
run_migrations,
9796
seed_templates,
9897
upsert_runtime_printer,
9998
verify_alembic_at_head,
10099
)
101100
from app.db.session import get_session
102101
from app.integrations.registry import IntegrationRegistry
102+
from app.models.printer import Printer as _Printer
103103
from app.printer_backends import BackendRegistry
104104
from app.printer_backends.exceptions import SnmpDiscoveryError
105105
from app.printer_backends.snmp_helper import query_model_pjl
106106
from app.printer_models.registry import ModelRegistry
107107
from app.schemas.readiness import ReadinessResponse
108+
from app.services.cleanup_task import CleanupTask
108109
from app.services.event_bus import EventBus
110+
from app.services.job_store_sqlite import SQLiteJobStore
109111
from app.services.label_renderer import LabelRenderer
110112
from app.services.lookup_service import AppLookupService
111113
from app.services.print_queue import PrintQueue
@@ -271,13 +273,25 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
271273

272274
# 4. DB-bound init — plugin registry and template cache are populated.
273275
async with async_session() as s:
274-
await recover_inflight_jobs(s)
276+
# Phase 2: recover_inflight_jobs() entfernt (Spec R1-C1) —
277+
# PrintQueue.start() übernimmt Recovery mit korrekter QUEUED/PRINTING-Differenzierung.
275278
await seed_templates(s, TemplateLoader)
276279
db_printer_id = await upsert_runtime_printer(s, settings)
277280
await ensure_printer_state(s)
278281
await s.commit()
279282
# -------------------------------------------------------------------------
280283

284+
# Phase 2: JobStore + CleanupTask
285+
# 'async_session' ist die async_sessionmaker aus app.db.engine (R2-M5)
286+
job_store = SQLiteJobStore(async_session)
287+
288+
cleanup_task = CleanupTask(
289+
store=job_store,
290+
retention_days=settings.job_retention_days,
291+
)
292+
await cleanup_task.start()
293+
app.state.cleanup_task = cleanup_task
294+
281295
discovery_host = settings.pt750w_host or ""
282296
if discovery_host and settings.printer_discover_via_snmp:
283297
model_id = await _resolve_model_id(settings, discovery_host)
@@ -297,15 +311,51 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
297311
tape_registry = TapeRegistry()
298312
printer = driver.make_queue_printer(tape_registry, printer_id=db_printer_id)
299313

314+
# Phase 2: Wenn kein Host konfiguriert ist (Mock-Backend / CI), legt
315+
# upsert_runtime_printer keine Printer-Row an. make_queue_printer generiert
316+
# dann eine frische uuid4. Damit jobs.printer_id (FK → printers.id) bei
317+
# save_queued nicht verletzt wird, legen wir hier eine Stub-Row an.
318+
if db_printer_id is None:
319+
# Wenn kein Host konfiguriert ist (Mock-Backend / CI), liefert
320+
# upsert_runtime_printer None zurück und fügt keine Printer-Row ein.
321+
# make_queue_printer erzeugt dann eine neue uuid4. Damit
322+
# jobs.printer_id (FK → printers.id) bei save_queued nicht verletzt
323+
# wird, legen wir hier eine Stub-Row an. slug wird auf str(id) gesetzt
324+
# (eindeutig durch UUID), damit der UNIQUE-Constraint nicht verletzt wird.
325+
_stub_slug = str(printer.id)
326+
async with async_session() as s:
327+
existing = await s.get(_Printer, printer.id)
328+
if existing is None:
329+
s.add(
330+
_Printer(
331+
id=printer.id,
332+
name=f"stub-{printer.id}",
333+
slug=_stub_slug,
334+
model=model_id.lower(),
335+
backend=settings.printer_backend,
336+
)
337+
)
338+
await s.commit()
339+
300340
# --- SSE EventBus ---
301341
event_bus = EventBus(queue_size=settings.sse_queue_size)
302342
app.state.event_bus = event_bus
303343
# ----- end SSE ------
304344

345+
# Shared LabelRenderer reused by both PrintService, preview endpoint and
346+
# PrintQueue Recovery. Constructing it once avoids repeated font-loading
347+
# overhead on every POST /api/render/preview request.
348+
# Moved before PrintQueue construction so Recovery in queue.start() can use it.
349+
shared_renderer = LabelRenderer()
350+
app.state.label_renderer = shared_renderer
351+
305352
pq_producer = PrintQueueProducer(bus=event_bus)
306353
queue = PrintQueue(
307354
printers=[printer],
308355
on_state_change=pq_producer.handle_transition,
356+
store=job_store,
357+
renderer=shared_renderer,
358+
loader=TemplateLoader,
309359
)
310360
await queue.start()
311361

@@ -330,25 +380,22 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
330380
app.state.printer_id = printer.id
331381
app.state.printer_host = discovery_host
332382
app.state.printer_snmp_community = settings.printer_snmp_community
333-
# Shared LabelRenderer reused by both PrintService and the preview endpoint.
334-
# Constructing it once avoids repeated font-loading overhead on every
335-
# POST /api/render/preview request.
336-
shared_renderer = LabelRenderer()
337-
app.state.label_renderer = shared_renderer
338383
app.state.print_service = PrintService(
339384
template_loader=TemplateLoader,
340385
renderer=shared_renderer,
341386
print_queue=queue,
342387
lookup_service=AppLookupService(),
343388
printer_id=printer.id,
344389
backend=backend,
390+
store=job_store,
345391
)
346392

347393
try:
348394
yield
349395
finally:
350396
if status_producer is not None:
351397
await status_producer.stop()
398+
await cleanup_task.stop()
352399
await queue.stop(timeout_s=settings.printer_queue_timeout_s)
353400
await engine.dispose()
354401
# Close shared HTTP clients held by integration plugins that support it.

backend/app/services/job_store.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,15 +141,17 @@ async def mark_interrupted(self, printer_id: UUID) -> int:
141141

142142
async def list_pending(self, printer_id: UUID) -> list[Job]:
143143
items = [
144-
j for j in self._jobs.values()
144+
j
145+
for j in self._jobs.values()
145146
if j.printer_id == printer_id and j.state in _NON_TERMINAL
146147
]
147148
return sorted(items, key=lambda j: j.created_at)
148149

149150
async def evict_terminal_older_than(self, age: timedelta) -> int:
150151
cutoff = datetime.now(UTC) - age
151152
to_delete = [
152-
jid for jid, j in self._jobs.items()
153+
jid
154+
for jid, j in self._jobs.items()
153155
if j.state in _TERMINAL and j.finished_at is not None and j.finished_at < cutoff
154156
]
155157
for jid in to_delete:

backend/tests/integration/test_phase6b_sse_with_batch.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,29 @@ async def test_sse_contains_batch_job_events(
142142
# f"printer:{printer_id}:queue"
143143
app_printer_id: uuid.UUID = inner.state.printer_id
144144

145-
# 3. Printer-Row mit ID=app_printer_id in die DB schreiben.
145+
# 3. Printer-Row mit ID=app_printer_id sicherstellen.
146+
# Phase 2: Lifespan legt bei Mock-Backend (kein Host) bereits eine Stub-Row
147+
# an. Wir holen die existierende Row und aktualisieren Name/Slug falls nötig,
148+
# statt blind create() aufzurufen (würde UNIQUE-Constraint verletzten).
146149
# batch.py prüft printer.id == app.state.printer_id — durch die identische
147150
# ID passt der Check ohne dass PrintQueue-Interna umgebaut werden müssen.
148-
p = Printer(
149-
id=app_printer_id,
150-
name="Brother PT-P750W",
151-
slug="brother-p750w",
152-
model="PT-P750W",
153-
backend="mock",
154-
)
155-
await printers_repo.create(sse_batch_db_session, p)
151+
p = await sse_batch_db_session.get(Printer, app_printer_id)
152+
if p is None:
153+
p = Printer(
154+
id=app_printer_id,
155+
name="Brother PT-P750W",
156+
slug="brother-p750w",
157+
model="PT-P750W",
158+
backend="mock",
159+
)
160+
await printers_repo.create(sse_batch_db_session, p)
161+
else:
162+
p.name = "Brother PT-P750W"
163+
p.slug = "brother-p750w"
164+
p.model = "pt-p750w"
165+
p.backend = "mock"
166+
await sse_batch_db_session.commit()
167+
await sse_batch_db_session.refresh(p)
156168

157169
channels = [
158170
f"printer:{app_printer_id}:queue",

0 commit comments

Comments
 (0)