From c8d5c856bed2ce4f9d01c19c2c3332172e96ab83 Mon Sep 17 00:00:00 2001 From: Pavel Tisnovsky Date: Tue, 21 Oct 2025 12:59:56 +0200 Subject: [PATCH] LCORE-741: quota limiter implementation for SQLite --- src/quota/quota_limiter.py | 14 ++- src/quota/revokable_quota_limiter.py | 139 +++++++++++++++++++++------ src/quota/sql.py | 29 +++++- 3 files changed, 144 insertions(+), 38 deletions(-) diff --git a/src/quota/quota_limiter.py b/src/quota/quota_limiter.py index f0ad6fba..ace76ba3 100644 --- a/src/quota/quota_limiter.py +++ b/src/quota/quota_limiter.py @@ -4,6 +4,7 @@ from typing import Optional +import sqlite3 import psycopg2 from log import get_logger @@ -75,11 +76,18 @@ def connected(self) -> bool: if self.connection is None: logger.warning("Not connected, need to reconnect later") return False + cursor = None try: - with self.connection.cursor() as cursor: - cursor.execute("SELECT 1") + cursor = self.connection.cursor() + cursor.execute("SELECT 1") logger.info("Connection to storage is ok") return True - except psycopg2.OperationalError as e: + except (psycopg2.OperationalError, sqlite3.Error) as e: logger.error("Disconnected from storage: %s", e) return False + finally: + if cursor is not None: + try: + cursor.close() + except Exception: # pylint: disable=broad-exception-caught + logger.warning("Unable to close cursor") diff --git a/src/quota/revokable_quota_limiter.py b/src/quota/revokable_quota_limiter.py index 127c15b8..42065b3e 100644 --- a/src/quota/revokable_quota_limiter.py +++ b/src/quota/revokable_quota_limiter.py @@ -10,9 +10,13 @@ from quota.sql import ( CREATE_QUOTA_TABLE, UPDATE_AVAILABLE_QUOTA_PG, + UPDATE_AVAILABLE_QUOTA_SQLITE, SELECT_QUOTA_PG, + SELECT_QUOTA_SQLITE, SET_AVAILABLE_QUOTA_PG, + SET_AVAILABLE_QUOTA_SQLITE, INIT_QUOTA_PG, + INIT_QUOTA_SQLITE, ) logger = get_logger(__name__) @@ -40,46 +44,80 @@ def available_quota(self, subject_id: str = "") -> int: """Retrieve available quota for given subject.""" if self.subject_type == "c": subject_id = "" - with self.connection.cursor() as cursor: - cursor.execute( - SELECT_QUOTA_PG, - (subject_id, self.subject_type), - ) - value = cursor.fetchone() - if value is None: - self._init_quota(subject_id) - return self.initial_quota - return value[0] + if self.sqlite_connection_config is not None: + return self._read_available_quota(SELECT_QUOTA_SQLITE, subject_id) + if self.postgres_connection_config is not None: + return self._read_available_quota(SELECT_QUOTA_PG, subject_id) + # default value is used only if quota limiter database is not setup + return 0 + + def _read_available_quota(self, query_statement: str, subject_id: str) -> int: + """Read available quota from selected database.""" + # it is not possible to use context manager there, because SQLite does + # not support it + cursor = self.connection.cursor() + cursor.execute( + query_statement, + (subject_id, self.subject_type), + ) + value = cursor.fetchone() + if value is None: + self._init_quota(subject_id) + return self.initial_quota + cursor.close() + return value[0] @connection def revoke_quota(self, subject_id: str = "") -> None: """Revoke quota for given subject.""" if self.subject_type == "c": subject_id = "" + + if self.postgres_connection_config is not None: + self._revoke_quota(SET_AVAILABLE_QUOTA_PG, subject_id) + return + if self.sqlite_connection_config is not None: + self._revoke_quota(SET_AVAILABLE_QUOTA_SQLITE, subject_id) + return + + def _revoke_quota(self, set_statement: str, subject_id: str) -> None: + """Revoke quota in given database.""" # timestamp to be used revoked_at = datetime.now() - with self.connection.cursor() as cursor: - cursor.execute( - SET_AVAILABLE_QUOTA_PG, - (self.initial_quota, revoked_at, subject_id, self.subject_type), - ) - self.connection.commit() + cursor = self.connection.cursor() + cursor.execute( + set_statement, + (self.initial_quota, revoked_at, subject_id, self.subject_type), + ) + self.connection.commit() + cursor.close() @connection def increase_quota(self, subject_id: str = "") -> None: """Increase quota for given subject.""" if self.subject_type == "c": subject_id = "" + + if self.postgres_connection_config is not None: + self._increase_quota(UPDATE_AVAILABLE_QUOTA_PG, subject_id) + return + + if self.sqlite_connection_config is not None: + self._increase_quota(UPDATE_AVAILABLE_QUOTA_SQLITE, subject_id) + return + + def _increase_quota(self, set_statement: str, subject_id: str) -> None: + """Increase quota in given database.""" # timestamp to be used updated_at = datetime.now() - with self.connection.cursor() as cursor: - cursor.execute( - UPDATE_AVAILABLE_QUOTA_PG, - (self.increase_by, updated_at, subject_id, self.subject_type), - ) - self.connection.commit() + cursor = self.connection.cursor() + cursor.execute( + set_statement, + (self.increase_by, updated_at, subject_id, self.subject_type), + ) + self.connection.commit() def ensure_available_quota(self, subject_id: str = "") -> None: """Ensure that there's avaiable quota left.""" @@ -109,17 +147,39 @@ def consume_tokens( output_tokens, subject_id, ) - to_be_consumed = input_tokens + output_tokens - with self.connection.cursor() as cursor: - # timestamp to be used - updated_at = datetime.now() + if self.sqlite_connection_config is not None: + self._consume_tokens( + UPDATE_AVAILABLE_QUOTA_SQLITE, input_tokens, output_tokens, subject_id + ) + return - cursor.execute( - UPDATE_AVAILABLE_QUOTA_PG, - (-to_be_consumed, updated_at, subject_id, self.subject_type), + if self.postgres_connection_config is not None: + self._consume_tokens( + UPDATE_AVAILABLE_QUOTA_PG, input_tokens, output_tokens, subject_id ) - self.connection.commit() + return + + def _consume_tokens( + self, + update_statement: str, + input_tokens: int, + output_tokens: int, + subject_id: str, + ) -> None: + """Consume tokens from selected database.""" + # timestamp to be used + updated_at = datetime.now() + + to_be_consumed = input_tokens + output_tokens + + cursor = self.connection.cursor() + cursor.execute( + update_statement, + (-to_be_consumed, updated_at, subject_id, self.subject_type), + ) + self.connection.commit() + cursor.close() def _initialize_tables(self) -> None: """Initialize tables used by quota limiter.""" @@ -134,9 +194,10 @@ def _init_quota(self, subject_id: str = "") -> None: # timestamp to be used revoked_at = datetime.now() - with self.connection.cursor() as cursor: + if self.sqlite_connection_config is not None: + cursor = self.connection.cursor() cursor.execute( - INIT_QUOTA_PG, + INIT_QUOTA_SQLITE, ( subject_id, self.subject_type, @@ -145,4 +206,18 @@ def _init_quota(self, subject_id: str = "") -> None: revoked_at, ), ) + cursor.close() self.connection.commit() + if self.postgres_connection_config is not None: + with self.connection.cursor() as cursor: + cursor.execute( + INIT_QUOTA_PG, + ( + subject_id, + self.subject_type, + self.initial_quota, + self.initial_quota, + revoked_at, + ), + ) + self.connection.commit() diff --git a/src/quota/sql.py b/src/quota/sql.py index 2fdce5e0..93a97099 100644 --- a/src/quota/sql.py +++ b/src/quota/sql.py @@ -49,20 +49,43 @@ VALUES (%s, %s, %s, %s, %s) """ +INIT_QUOTA_SQLITE = """ + INSERT INTO quota_limits (id, subject, quota_limit, available, revoked_at) + VALUES (?, ?, ?, ?, ?) + """ + SELECT_QUOTA_PG = """ SELECT available FROM quota_limits - WHERE id=%s and subject=%s LIMIT 1 + WHERE id=%s AND subject=%s LIMIT 1 + """ + +SELECT_QUOTA_SQLITE = """ + SELECT available + FROM quota_limits + WHERE id=? AND subject=? LIMIT 1 """ SET_AVAILABLE_QUOTA_PG = """ UPDATE quota_limits SET available=%s, revoked_at=%s - WHERE id=%s and subject=%s + WHERE id=%s AND subject=%s + """ + +SET_AVAILABLE_QUOTA_SQLITE = """ + UPDATE quota_limits + SET available=?, revoked_at=? + WHERE id=? AND subject=? """ UPDATE_AVAILABLE_QUOTA_PG = """ UPDATE quota_limits SET available=available+%s, updated_at=%s - WHERE id=%s and subject=%s + WHERE id=%s AND subject=%s + """ + +UPDATE_AVAILABLE_QUOTA_SQLITE = """ + UPDATE quota_limits + SET available=available+?, updated_at=? + WHERE id=? AND subject=? """