From 962978ce20602133d2af3068a9307f3eb394be1a Mon Sep 17 00:00:00 2001 From: giannisevagorou Date: Thu, 8 May 2025 13:34:55 +0300 Subject: [PATCH 1/3] Use sqlite for job_store.py --- validator_api/job_store.py | 113 ++++++++++++++++++++++++++++--------- 1 file changed, 87 insertions(+), 26 deletions(-) diff --git a/validator_api/job_store.py b/validator_api/job_store.py index 4c87069de..80fe053da 100644 --- a/validator_api/job_store.py +++ b/validator_api/job_store.py @@ -1,8 +1,10 @@ +import sqlite3 import uuid from datetime import datetime from enum import Enum from typing import Dict, List, Optional + from pydantic import BaseModel @@ -27,52 +29,111 @@ class JobResult(BaseModel): class JobStore: - """Store for background jobs.""" - - def __init__(self): - """Initialize the job store.""" - self._jobs: Dict[str, JobResult] = {} + """Store for background jobs using SQLite database.""" + + def __init__(self, db_path: str = "jobs.db"): + """Initialize the job store with SQLite database.""" + self.db_path = db_path + self._init_db() + + def _init_db(self) -> None: + """Initialize the database and create the jobs table if it doesn't exist.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS jobs ( + job_id TEXT PRIMARY KEY, + status TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + result TEXT, + error TEXT + ) + """) + conn.commit() def delete_job(self, job_id: str) -> None: """Delete a job by its ID.""" - if job_id in self._jobs: - del self._jobs[job_id] + with sqlite3.connect(self.db_path) as conn: + conn.execute("DELETE FROM jobs WHERE job_id = ?", (job_id,)) + conn.commit() def create_job(self) -> str: """Create a new job and return its ID.""" job_id = str(uuid.uuid4()) now = datetime.now() - self._jobs[job_id] = JobResult( - job_id=job_id, - status=JobStatus.PENDING, - created_at=now, - updated_at=now, - ) + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """ + INSERT INTO jobs (job_id, status, created_at, updated_at) + VALUES (?, ?, ?, ?) + """, + (job_id, JobStatus.PENDING, now, now), + ) + conn.commit() return job_id def get_job(self, job_id: str) -> Optional[JobResult]: """Get a job by its ID.""" - return self._jobs.get(job_id) + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + "SELECT * FROM jobs WHERE job_id = ?", (job_id,) + ) + row = cursor.fetchone() + + if row is None: + return None + + # Convert the result string to List[str] if it exists + result = eval(row['result']) if row['result'] is not None else None + + return JobResult( + job_id=row['job_id'], + status=JobStatus(row['status']), + created_at=datetime.fromisoformat(row['created_at']), + updated_at=datetime.fromisoformat(row['updated_at']), + result=result, + error=row['error'] + ) def update_job_status(self, job_id: str, status: JobStatus) -> None: """Update the status of a job.""" - if job_id in self._jobs: - self._jobs[job_id].status = status - self._jobs[job_id].updated_at = datetime.now() + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """ + UPDATE jobs + SET status = ?, updated_at = ? + WHERE job_id = ? + """, + (status, datetime.now(), job_id), + ) + conn.commit() def update_job_result(self, job_id: str, result: List[str]) -> None: """Update the result of a job.""" - if job_id in self._jobs: - self._jobs[job_id].result = result - self._jobs[job_id].status = JobStatus.COMPLETED - self._jobs[job_id].updated_at = datetime.now() + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """ + UPDATE jobs + SET result = ?, status = ?, updated_at = ? + WHERE job_id = ? + """, + (str(result), JobStatus.COMPLETED, datetime.now(), job_id), + ) + conn.commit() def update_job_error(self, job_id: str, error: str) -> None: """Update the error of a job.""" - if job_id in self._jobs: - self._jobs[job_id].error = error - self._jobs[job_id].status = JobStatus.FAILED - self._jobs[job_id].updated_at = datetime.now() + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """ + UPDATE jobs + SET error = ?, status = ?, updated_at = ? + WHERE job_id = ? + """, + (error, JobStatus.FAILED, datetime.now(), job_id), + ) + conn.commit() # Create a singleton instance @@ -93,4 +154,4 @@ async def process_chain_of_thought_job(job_id: str, orchestrator, messages: List job_store.update_job_result(job_id, chunks) except Exception as e: # Update the job with the error - job_store.update_job_error(job_id, str(e)) + job_store.update_job_error(job_id, str(e)) \ No newline at end of file From 4dde1e3b270194c43f959200f342b83a6ed83be4 Mon Sep 17 00:00:00 2001 From: "Lewis Sword (lew-sword)" Date: Thu, 8 May 2025 11:49:07 +0100 Subject: [PATCH 2/3] GEN-1220 add new line for formatting --- validator_api/job_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_api/job_store.py b/validator_api/job_store.py index 80fe053da..71c329dac 100644 --- a/validator_api/job_store.py +++ b/validator_api/job_store.py @@ -154,4 +154,4 @@ async def process_chain_of_thought_job(job_id: str, orchestrator, messages: List job_store.update_job_result(job_id, chunks) except Exception as e: # Update the job with the error - job_store.update_job_error(job_id, str(e)) \ No newline at end of file + job_store.update_job_error(job_id, str(e)) From d61e9819bc4a139ce1cdf7d9c2b912eb1f0a3a9e Mon Sep 17 00:00:00 2001 From: "Lewis Sword (lew-sword)" Date: Thu, 8 May 2025 11:57:02 +0100 Subject: [PATCH 3/3] GEN-1220 pre-commit run to correct formatting --- validator_api/job_store.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/validator_api/job_store.py b/validator_api/job_store.py index 71c329dac..d20905a51 100644 --- a/validator_api/job_store.py +++ b/validator_api/job_store.py @@ -4,7 +4,6 @@ from enum import Enum from typing import Dict, List, Optional - from pydantic import BaseModel @@ -39,7 +38,8 @@ def __init__(self, db_path: str = "jobs.db"): def _init_db(self) -> None: """Initialize the database and create the jobs table if it doesn't exist.""" with sqlite3.connect(self.db_path) as conn: - conn.execute(""" + conn.execute( + """ CREATE TABLE IF NOT EXISTS jobs ( job_id TEXT PRIMARY KEY, status TEXT NOT NULL, @@ -48,7 +48,8 @@ def _init_db(self) -> None: result TEXT, error TEXT ) - """) + """ + ) conn.commit() def delete_job(self, job_id: str) -> None: @@ -76,24 +77,22 @@ def get_job(self, job_id: str) -> Optional[JobResult]: """Get a job by its ID.""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row - cursor = conn.execute( - "SELECT * FROM jobs WHERE job_id = ?", (job_id,) - ) + cursor = conn.execute("SELECT * FROM jobs WHERE job_id = ?", (job_id,)) row = cursor.fetchone() - + if row is None: return None - + # Convert the result string to List[str] if it exists - result = eval(row['result']) if row['result'] is not None else None - + result = eval(row["result"]) if row["result"] is not None else None + return JobResult( - job_id=row['job_id'], - status=JobStatus(row['status']), - created_at=datetime.fromisoformat(row['created_at']), - updated_at=datetime.fromisoformat(row['updated_at']), + job_id=row["job_id"], + status=JobStatus(row["status"]), + created_at=datetime.fromisoformat(row["created_at"]), + updated_at=datetime.fromisoformat(row["updated_at"]), result=result, - error=row['error'] + error=row["error"], ) def update_job_status(self, job_id: str, status: JobStatus) -> None: @@ -101,7 +100,7 @@ def update_job_status(self, job_id: str, status: JobStatus) -> None: with sqlite3.connect(self.db_path) as conn: conn.execute( """ - UPDATE jobs + UPDATE jobs SET status = ?, updated_at = ? WHERE job_id = ? """, @@ -114,7 +113,7 @@ def update_job_result(self, job_id: str, result: List[str]) -> None: with sqlite3.connect(self.db_path) as conn: conn.execute( """ - UPDATE jobs + UPDATE jobs SET result = ?, status = ?, updated_at = ? WHERE job_id = ? """, @@ -127,7 +126,7 @@ def update_job_error(self, job_id: str, error: str) -> None: with sqlite3.connect(self.db_path) as conn: conn.execute( """ - UPDATE jobs + UPDATE jobs SET error = ?, status = ?, updated_at = ? WHERE job_id = ? """,