Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 85 additions & 25 deletions validator_api/job_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sqlite3
import uuid
from datetime import datetime
from enum import Enum
Expand Down Expand Up @@ -27,52 +28,111 @@ class JobResult(BaseModel):


class JobStore:
"""Store for background jobs."""

def __init__(self):
"""Initialize the job store."""
self._jobs: Dict[str, JobResult] = {}
"""Store for background jobs using SQLite database."""

def __init__(self, db_path: str = "jobs.db"):
"""Initialize the job store with SQLite database."""
self.db_path = db_path
self._init_db()

def _init_db(self) -> None:
"""Initialize the database and create the jobs table if it doesn't exist."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS jobs (
job_id TEXT PRIMARY KEY,
status TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
result TEXT,
error TEXT
)
"""
)
conn.commit()

def delete_job(self, job_id: str) -> None:
"""Delete a job by its ID."""
if job_id in self._jobs:
del self._jobs[job_id]
with sqlite3.connect(self.db_path) as conn:
conn.execute("DELETE FROM jobs WHERE job_id = ?", (job_id,))
conn.commit()

def create_job(self) -> str:
"""Create a new job and return its ID."""
job_id = str(uuid.uuid4())
now = datetime.now()
self._jobs[job_id] = JobResult(
job_id=job_id,
status=JobStatus.PENDING,
created_at=now,
updated_at=now,
)
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO jobs (job_id, status, created_at, updated_at)
VALUES (?, ?, ?, ?)
""",
(job_id, JobStatus.PENDING, now, now),
)
conn.commit()
return job_id

def get_job(self, job_id: str) -> Optional[JobResult]:
"""Get a job by its ID."""
return self._jobs.get(job_id)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("SELECT * FROM jobs WHERE job_id = ?", (job_id,))
row = cursor.fetchone()

if row is None:
return None

# Convert the result string to List[str] if it exists
result = eval(row["result"]) if row["result"] is not None else None

return JobResult(
job_id=row["job_id"],
status=JobStatus(row["status"]),
created_at=datetime.fromisoformat(row["created_at"]),
updated_at=datetime.fromisoformat(row["updated_at"]),
result=result,
error=row["error"],
)

def update_job_status(self, job_id: str, status: JobStatus) -> None:
"""Update the status of a job."""
if job_id in self._jobs:
self._jobs[job_id].status = status
self._jobs[job_id].updated_at = datetime.now()
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
UPDATE jobs
SET status = ?, updated_at = ?
WHERE job_id = ?
""",
(status, datetime.now(), job_id),
)
conn.commit()

def update_job_result(self, job_id: str, result: List[str]) -> None:
"""Update the result of a job."""
if job_id in self._jobs:
self._jobs[job_id].result = result
self._jobs[job_id].status = JobStatus.COMPLETED
self._jobs[job_id].updated_at = datetime.now()
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
UPDATE jobs
SET result = ?, status = ?, updated_at = ?
WHERE job_id = ?
""",
(str(result), JobStatus.COMPLETED, datetime.now(), job_id),
)
conn.commit()

def update_job_error(self, job_id: str, error: str) -> None:
"""Update the error of a job."""
if job_id in self._jobs:
self._jobs[job_id].error = error
self._jobs[job_id].status = JobStatus.FAILED
self._jobs[job_id].updated_at = datetime.now()
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
UPDATE jobs
SET error = ?, status = ?, updated_at = ?
WHERE job_id = ?
""",
(error, JobStatus.FAILED, datetime.now(), job_id),
)
conn.commit()


# Create a singleton instance
Expand Down