Skip to content

Commit c80bda5

Browse files
committed
feat(repo): jobs_repo Helper für Phase 2 JobStore
- mark_printing_as_failed_restart(printer_id) — nur PRINTING affected - list_active(printer_id=None) — optionaler Filter - evict_terminal_older_than(age) — Cleanup-Helper Refs #93
1 parent 261fc85 commit c80bda5

4 files changed

Lines changed: 193 additions & 7 deletions

File tree

backend/app/repositories/jobs.py

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
from __future__ import annotations
44

5-
from datetime import UTC, datetime
5+
from datetime import UTC, datetime, timedelta
66
from typing import Any
77
from uuid import UUID
88

9-
from sqlalchemy import select, update
9+
from sqlalchemy import delete, select, update
1010
from sqlalchemy.ext.asyncio import AsyncSession
1111
from sqlmodel import col
1212

@@ -160,12 +160,82 @@ async def mark_inflight_as_failed_restart(session: AsyncSession) -> int:
160160
return int(result.rowcount) # type: ignore[attr-defined] # rowcount on UPDATE result
161161

162162

163-
async def list_active(session: AsyncSession) -> list[Job]:
164-
"""Return all jobs in QUEUED or PRINTING state (covered by ix_jobs_state)."""
163+
async def list_active(
164+
session: AsyncSession,
165+
*,
166+
printer_id: UUID | None = None,
167+
) -> list[Job]:
168+
"""Return all jobs in QUEUED or PRINTING state (covered by ix_jobs_state).
169+
170+
Phase 2: optional printer_id filter for PrintQueue.start() recovery.
171+
"""
165172
inflight = (JobState.QUEUED.value, JobState.PRINTING.value)
166-
result = await session.execute(
173+
stmt = (
167174
select(Job)
168-
.where(col(Job.state).in_(inflight)) # col() gives proper Column typing for .in_()
169-
.order_by(col(Job.created_at)) # col() gives proper Column typing
175+
.where(col(Job.state).in_(inflight))
176+
.order_by(col(Job.created_at))
170177
)
178+
if printer_id is not None:
179+
stmt = stmt.where(col(Job.printer_id) == printer_id)
180+
result = await session.execute(stmt)
171181
return list(result.scalars())
182+
183+
184+
async def mark_printing_as_failed_restart(
185+
session: AsyncSession,
186+
printer_id: UUID,
187+
) -> int:
188+
"""Phase 2: UPDATE only PRINTING jobs for a specific printer to
189+
FAILED_RESTART with error='printer_interrupted'.
190+
191+
Used at PrintQueue.start() — QUEUED jobs are NOT affected because
192+
they will be re-enqueued cleanly. Only PRINTING jobs are ambiguous
193+
(printer may have completed before crash but Hub couldn't update DB).
194+
195+
Returns the count of affected rows.
196+
"""
197+
stmt = (
198+
update(Job)
199+
.where(
200+
col(Job.printer_id) == printer_id,
201+
col(Job.state) == JobState.PRINTING.value,
202+
)
203+
.values(
204+
state=JobState.FAILED_RESTART.value,
205+
error="printer_interrupted",
206+
finished_at=datetime.now(UTC),
207+
)
208+
.execution_options(synchronize_session="fetch")
209+
)
210+
result = await session.execute(stmt)
211+
await session.commit()
212+
return int(result.rowcount) # type: ignore[attr-defined]
213+
214+
215+
async def evict_terminal_older_than(
216+
session: AsyncSession,
217+
age: timedelta,
218+
) -> int:
219+
"""Phase 2 cleanup: DELETE terminal jobs older than age.
220+
221+
Terminal = DONE | FAILED | FAILED_RESTART | CANCELLED.
222+
Comparison is on finished_at (set whenever a job leaves a non-terminal state).
223+
224+
Returns the count of deleted rows.
225+
"""
226+
terminal = (
227+
JobState.DONE.value,
228+
JobState.FAILED.value,
229+
JobState.FAILED_RESTART.value,
230+
JobState.CANCELLED.value,
231+
)
232+
cutoff = datetime.now(UTC) - age
233+
stmt = (
234+
delete(Job)
235+
.where(col(Job.state).in_(terminal))
236+
.where(col(Job.finished_at) < cutoff)
237+
.execution_options(synchronize_session="fetch")
238+
)
239+
result = await session.execute(stmt)
240+
await session.commit()
241+
return int(result.rowcount) # type: ignore[attr-defined]

backend/tests/unit/repositories/__init__.py

Whitespace-only changes.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Fixtures für Unit-Tests der Repository-Schicht.
2+
3+
Stellt eine per-Test in-memory SQLite DB mit db_session-Fixture bereit.
4+
FK-Enforcement bewusst NICHT aktiviert — Phase-2-Tests nutzen uuid4()
5+
als printer_id ohne echte Printer-Rows anzulegen (Unit-Scope).
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import app.models # noqa: F401 — registriert alle Models mit SQLModel.metadata
11+
import pytest_asyncio
12+
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
13+
from sqlmodel import SQLModel
14+
15+
16+
@pytest_asyncio.fixture
17+
async def _engine():
18+
eng = create_async_engine("sqlite+aiosqlite:///:memory:")
19+
async with eng.begin() as conn:
20+
await conn.run_sync(SQLModel.metadata.create_all)
21+
yield eng
22+
await eng.dispose()
23+
24+
25+
@pytest_asyncio.fixture
26+
async def db_session(_engine):
27+
factory = async_sessionmaker(_engine, expire_on_commit=False)
28+
async with factory() as s:
29+
yield s
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""Phase 2: neue jobs_repo Helper fuer JobStore Adapter."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import UTC, datetime, timedelta
6+
from uuid import uuid4
7+
8+
import pytest
9+
from app.models.job import Job, JobState
10+
from app.repositories import jobs as jobs_repo
11+
12+
13+
@pytest.mark.asyncio
14+
async def test_mark_printing_as_failed_restart_only_printing(db_session):
15+
"""mark_printing_as_failed_restart darf QUEUED-Jobs NICHT aendern."""
16+
printer_id = uuid4()
17+
other_printer_id = uuid4()
18+
19+
queued = await jobs_repo.create_queued(
20+
db_session, printer_id=printer_id,
21+
template_key="t", payload={"k": "v"},
22+
)
23+
printing = await jobs_repo.create_queued(
24+
db_session, printer_id=printer_id,
25+
template_key="t", payload={"k": "v"},
26+
)
27+
await jobs_repo.mark_printing(db_session, printing.id)
28+
29+
other_printing = await jobs_repo.create_queued(
30+
db_session, printer_id=other_printer_id,
31+
template_key="t", payload={"k": "v"},
32+
)
33+
await jobs_repo.mark_printing(db_session, other_printing.id)
34+
35+
affected = await jobs_repo.mark_printing_as_failed_restart(
36+
db_session, printer_id,
37+
)
38+
assert affected == 1 # nur das eine PRINTING auf unserem printer
39+
40+
await db_session.refresh(queued)
41+
await db_session.refresh(printing)
42+
await db_session.refresh(other_printing)
43+
44+
assert queued.state == JobState.QUEUED.value
45+
assert printing.state == JobState.FAILED_RESTART.value
46+
assert printing.error == "printer_interrupted"
47+
assert printing.finished_at is not None
48+
assert other_printing.state == JobState.PRINTING.value # anderer printer unangetastet
49+
50+
51+
@pytest.mark.asyncio
52+
async def test_list_active_filterable_by_printer(db_session):
53+
"""list_active(printer_id=...) liefert nur Jobs des Druckers."""
54+
p1, p2 = uuid4(), uuid4()
55+
j1 = await jobs_repo.create_queued(db_session, printer_id=p1, template_key="t", payload={})
56+
j2 = await jobs_repo.create_queued(db_session, printer_id=p2, template_key="t", payload={})
57+
58+
all_active = await jobs_repo.list_active(db_session)
59+
assert {j.id for j in all_active} == {j1.id, j2.id}
60+
61+
p1_only = await jobs_repo.list_active(db_session, printer_id=p1)
62+
assert {j.id for j in p1_only} == {j1.id}
63+
64+
65+
@pytest.mark.asyncio
66+
async def test_evict_terminal_older_than(db_session):
67+
"""evict loescht DONE/FAILED/CANCELLED/FAILED_RESTART aelter als age."""
68+
printer_id = uuid4()
69+
old_done = await jobs_repo.create_queued(db_session, printer_id=printer_id, template_key="t", payload={})
70+
await jobs_repo.mark_printing(db_session, old_done.id)
71+
await jobs_repo.mark_done(db_session, old_done.id, result={})
72+
# backdate finished_at by hand for test
73+
old_done.finished_at = datetime.now(UTC) - timedelta(days=35)
74+
await db_session.commit()
75+
76+
young_done = await jobs_repo.create_queued(db_session, printer_id=printer_id, template_key="t", payload={})
77+
await jobs_repo.mark_printing(db_session, young_done.id)
78+
await jobs_repo.mark_done(db_session, young_done.id, result={}) # finished_at is now()
79+
80+
queued = await jobs_repo.create_queued(db_session, printer_id=printer_id, template_key="t", payload={}) # not terminal
81+
82+
deleted = await jobs_repo.evict_terminal_older_than(db_session, age=timedelta(days=30))
83+
assert deleted == 1
84+
85+
assert await jobs_repo.get(db_session, old_done.id) is None
86+
assert await jobs_repo.get(db_session, young_done.id) is not None
87+
assert await jobs_repo.get(db_session, queued.id) is not None

0 commit comments

Comments
 (0)