Skip to content

Commit d39c795

Browse files
committed
feat(services): JobStore Protocol + MemoryJobStore (Phase 2)
JobStore ist die Persistierungs-Boundary die PrintQueue nutzt um Lifecycle-Transitionen zu speichern. MemoryJobStore ist die Test-Impl mit gleicher Semantik wie späterer SQLiteJobStore. Refs #93
1 parent 1b1cc1e commit d39c795

2 files changed

Lines changed: 229 additions & 0 deletions

File tree

backend/app/services/job_store.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
"""Phase 2: JobStore Protocol + MemoryJobStore in-memory Implementation.
2+
3+
JobStore ist die Persistierungs-Boundary die PrintQueue nutzt um Job-State-Transitionen
4+
zu speichern. SQLiteJobStore (Produktion) implementiert dieses Protocol durch Delegation
5+
an jobs_repo. MemoryJobStore ist die Test/Migration-Impl.
6+
7+
Klärung (R2-C1): Alle Store-Methoden arbeiten auf app.models.job.Job
8+
(SQLModel, UUID-id). Der Worker-Code in print_queue.py verwendet
9+
app.services.job_lifecycle.Job (Dataclass, str-id). Bridge:
10+
Worker ruft self._store.mark_printing(UUID(job.id))
11+
Store arbeitet intern auf UUID-Schlüsseln.
12+
"""
13+
14+
from __future__ import annotations
15+
16+
from datetime import UTC, datetime, timedelta
17+
from typing import Protocol, runtime_checkable
18+
from uuid import UUID
19+
20+
from app.models.job import Job, JobState
21+
22+
_NON_TERMINAL = (JobState.QUEUED.value, JobState.PRINTING.value)
23+
_TERMINAL = (
24+
JobState.DONE.value,
25+
JobState.FAILED.value,
26+
JobState.FAILED_RESTART.value,
27+
JobState.CANCELLED.value,
28+
)
29+
30+
31+
@runtime_checkable
32+
class JobStore(Protocol):
33+
"""Persistente Backing-Store für Jobs.
34+
35+
Alle Methoden sind async und können I/O durchführen. Implementierungen müssen
36+
sicher für gleichzeitige Aufrufe aus mehreren asyncio-Tasks sein.
37+
"""
38+
39+
async def save_queued(self, job: Job) -> None:
40+
"""Persist a newly-created QUEUED job (insert).
41+
42+
Called from PrintService.submit_print_job BEFORE handing off
43+
to the queue. After this returns, the job is durable.
44+
"""
45+
46+
async def get(self, job_id: UUID) -> Job | None:
47+
"""Load a job by ID. None if not found."""
48+
49+
async def mark_printing(self, job_id: UUID) -> None:
50+
"""Transition QUEUED -> PRINTING. Called by worker when it picks up the job."""
51+
52+
async def mark_done(self, job_id: UUID) -> None:
53+
"""Transition PRINTING -> DONE. Called by worker after successful print."""
54+
55+
async def mark_failed(self, job_id: UUID, error: str) -> None:
56+
"""Transition any non-terminal -> FAILED with given error message."""
57+
58+
async def mark_interrupted(self, printer_id: UUID) -> int:
59+
"""Recovery: set all PRINTING jobs of this printer to FAILED_RESTART
60+
with error='printer_interrupted'.
61+
62+
Called from PrintQueue.start() BEFORE list_pending.
63+
64+
Returns the count of affected rows.
65+
"""
66+
67+
async def list_pending(self, printer_id: UUID) -> list[Job]:
68+
"""Return all non-terminal jobs for this printer, sorted by created_at (FIFO).
69+
70+
Called from PrintQueue.start() AFTER mark_interrupted to find
71+
QUEUED jobs that need to be re-enqueued.
72+
"""
73+
74+
async def evict_terminal_older_than(self, age: timedelta) -> int:
75+
"""Delete terminal jobs (DONE/FAILED/FAILED_RESTART/CANCELLED) with
76+
finished_at older than `age` ago. Used by CleanupTask.
77+
78+
Returns the count of deleted rows.
79+
"""
80+
81+
82+
class MemoryJobStore:
83+
"""In-Memory JobStore für Tests und PrintService Boot-Phase.
84+
85+
Hält Job-Objekte in einem Dict mit id als Schlüssel. Nicht thread-safe, aber
86+
sicher für asyncio Single-Event-Loop-Nutzung.
87+
"""
88+
89+
def __init__(self) -> None:
90+
self._jobs: dict[UUID, Job] = {}
91+
92+
async def save_queued(self, job: Job) -> None:
93+
self._jobs[job.id] = job
94+
95+
async def get(self, job_id: UUID) -> Job | None:
96+
return self._jobs.get(job_id)
97+
98+
async def mark_printing(self, job_id: UUID) -> None:
99+
job = self._jobs.get(job_id)
100+
if job is None:
101+
return
102+
job.state = JobState.PRINTING.value
103+
job.started_at = datetime.now(UTC)
104+
105+
async def mark_done(self, job_id: UUID) -> None:
106+
job = self._jobs.get(job_id)
107+
if job is None:
108+
return
109+
job.state = JobState.DONE.value
110+
job.finished_at = datetime.now(UTC)
111+
112+
async def mark_failed(self, job_id: UUID, error: str) -> None:
113+
job = self._jobs.get(job_id)
114+
if job is None:
115+
return
116+
job.state = JobState.FAILED.value
117+
job.error = error
118+
job.finished_at = datetime.now(UTC)
119+
120+
async def mark_interrupted(self, printer_id: UUID) -> int:
121+
count = 0
122+
for job in self._jobs.values():
123+
if job.printer_id == printer_id and job.state == JobState.PRINTING.value:
124+
job.state = JobState.FAILED_RESTART.value
125+
job.error = "printer_interrupted"
126+
job.finished_at = datetime.now(UTC)
127+
count += 1
128+
return count
129+
130+
async def list_pending(self, printer_id: UUID) -> list[Job]:
131+
items = [
132+
j for j in self._jobs.values()
133+
if j.printer_id == printer_id and j.state in _NON_TERMINAL
134+
]
135+
return sorted(items, key=lambda j: j.created_at)
136+
137+
async def evict_terminal_older_than(self, age: timedelta) -> int:
138+
cutoff = datetime.now(UTC) - age
139+
to_delete = [
140+
jid for jid, j in self._jobs.items()
141+
if j.state in _TERMINAL and j.finished_at is not None and j.finished_at < cutoff
142+
]
143+
for jid in to_delete:
144+
del self._jobs[jid]
145+
return len(to_delete)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""MemoryJobStore Protocol-Conformance Tests."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import UTC, datetime, timedelta
6+
from uuid import uuid4
7+
8+
import pytest
9+
from app.models.job import Job, JobState
10+
from app.services.job_store import JobStore, MemoryJobStore
11+
12+
13+
def _make_job(printer_id, state=JobState.QUEUED, finished_at=None):
14+
return Job(
15+
printer_id=printer_id,
16+
template_key="t",
17+
payload={},
18+
state=state.value,
19+
finished_at=finished_at,
20+
)
21+
22+
23+
@pytest.mark.asyncio
24+
async def test_memory_store_save_and_get_round_trip():
25+
store = MemoryJobStore()
26+
job = _make_job(uuid4())
27+
await store.save_queued(job)
28+
fetched = await store.get(job.id)
29+
assert fetched is job
30+
31+
32+
@pytest.mark.asyncio
33+
async def test_memory_store_implements_protocol():
34+
store = MemoryJobStore()
35+
assert isinstance(store, JobStore)
36+
37+
38+
@pytest.mark.asyncio
39+
async def test_memory_store_mark_interrupted_only_printing():
40+
store = MemoryJobStore()
41+
p1 = uuid4()
42+
queued = _make_job(p1, state=JobState.QUEUED)
43+
printing = _make_job(p1, state=JobState.PRINTING)
44+
await store.save_queued(queued)
45+
await store.save_queued(printing)
46+
47+
affected = await store.mark_interrupted(p1)
48+
assert affected == 1
49+
assert (await store.get(queued.id)).state == JobState.QUEUED.value
50+
interrupted = await store.get(printing.id)
51+
assert interrupted.state == JobState.FAILED_RESTART.value
52+
assert interrupted.error == "printer_interrupted"
53+
assert interrupted.finished_at is not None
54+
55+
56+
@pytest.mark.asyncio
57+
async def test_memory_store_list_pending_returns_queued_and_paused_not_terminal():
58+
store = MemoryJobStore()
59+
p1, p2 = uuid4(), uuid4()
60+
q1 = _make_job(p1, state=JobState.QUEUED)
61+
pr1 = _make_job(p1, state=JobState.PRINTING)
62+
d1 = _make_job(p1, state=JobState.DONE)
63+
q2 = _make_job(p2, state=JobState.QUEUED)
64+
for j in (q1, pr1, d1, q2):
65+
await store.save_queued(j)
66+
67+
p1_pending = await store.list_pending(p1)
68+
assert {j.id for j in p1_pending} == {q1.id, pr1.id}
69+
70+
71+
@pytest.mark.asyncio
72+
async def test_memory_store_evict_terminal_older_than():
73+
store = MemoryJobStore()
74+
old = _make_job(uuid4(), state=JobState.DONE, finished_at=datetime.now(UTC) - timedelta(days=40))
75+
young = _make_job(uuid4(), state=JobState.DONE, finished_at=datetime.now(UTC) - timedelta(days=5))
76+
queued = _make_job(uuid4(), state=JobState.QUEUED)
77+
for j in (old, young, queued):
78+
await store.save_queued(j)
79+
80+
deleted = await store.evict_terminal_older_than(timedelta(days=30))
81+
assert deleted == 1
82+
assert await store.get(old.id) is None
83+
assert await store.get(young.id) is not None
84+
assert await store.get(queued.id) is not None

0 commit comments

Comments
 (0)