Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,7 @@

# Default embedding vector dimension for the sentence transformer model
DEFAULT_EMBEDDING_DIMENSION = 768

# quota limiters constants
USER_QUOTA_LIMITER = "user_limiter"
CLUSTER_QUOTA_LIMITER = "cluster_limiter"
3 changes: 3 additions & 0 deletions src/lightspeed_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from configuration import configuration
from llama_stack_configuration import generate_configuration
from runners.uvicorn import start_uvicorn
from runners.quota_scheduler import start_quota_scheduler

FORMAT = "%(message)s"
logging.basicConfig(
Expand Down Expand Up @@ -120,6 +121,8 @@ def main() -> None:
# (step is needed because process context isn’t shared).
os.environ["LIGHTSPEED_STACK_CONFIG_PATH"] = args.config_file

# start the runners
start_quota_scheduler(configuration.configuration)
# if every previous steps don't fail, start the service on specified port
start_uvicorn(configuration.service_configuration)
logger.info("Lightspeed Core Stack finished")
Expand Down
303 changes: 303 additions & 0 deletions src/runners/quota_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
"""User and cluster quota scheduler runner."""

from typing import Any
from threading import Thread
from time import sleep

import sqlite3
import psycopg2

import constants
from log import get_logger
Comment on lines +10 to +11
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use absolute imports for internal modules.

The coding guidelines require absolute imports for internal modules (e.g., from src import constants).

As per coding guidelines.

Apply this diff:

-import constants
-from log import get_logger
+from src import constants
+from src.log import get_logger
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import constants
from log import get_logger
import constants
from log import get_logger
Suggested change
import constants
from log import get_logger
from src import constants
from src.log import get_logger
🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 10-11, the imports use
relative/internal short names; update them to absolute imports per guidelines:
replace "import constants" with "from src import constants" and replace "from
log import get_logger" with "from src.log import get_logger" (or the correct
module path under src) so internal modules are imported absolutely.

from models.config import (
Configuration,
QuotaHandlersConfiguration,
QuotaLimiterConfiguration,
PostgreSQLDatabaseConfiguration,
SQLiteDatabaseConfiguration,
)

logger = get_logger(__name__)


CREATE_QUOTA_TABLE = """
CREATE TABLE IF NOT EXISTS quota_limits (
id text NOT NULL,
subject char(1) NOT NULL,
quota_limit int NOT NULL,
available int,
updated_at timestamp with time zone,
revoked_at timestamp with time zone,
PRIMARY KEY(id, subject)
);
"""


INCREASE_QUOTA_STATEMENT_PG = """
UPDATE quota_limits
SET available=available+%s, revoked_at=NOW()
WHERE subject=%s
AND revoked_at < NOW() - INTERVAL %s ;
"""
Comment on lines +36 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Data corruption risk: UPDATEs miss the primary key and may update all rows. Also cap to quota_limit, handle NULLs, and keep timestamps.

Current UPDATEs filter only by subject; they must also filter by id. Additionally:

  • Use LEAST/MIN to not exceed quota_limit.
  • COALESCE available to 0.
  • Maintain updated_at; allow first-time updates when revoked_at is NULL.
  • For PostgreSQL, avoid INTERVAL %s parsing issues by casting.

Also pass limiter.name (id) to increase/reset and make the two operations mutually exclusive per limiter.

@@
-INCREASE_QUOTA_STATEMENT_PG = """
-    UPDATE quota_limits
-       SET available=available+%s, revoked_at=NOW()
-     WHERE subject=%s
-       AND revoked_at < NOW() - INTERVAL %s ;
-    """
+INCREASE_QUOTA_STATEMENT_PG = """
+    UPDATE quota_limits
+       SET available = LEAST(quota_limit, COALESCE(available, 0) + %s),
+           updated_at = NOW(),
+           revoked_at = NOW()
+     WHERE id=%s
+       AND subject=%s
+       AND (revoked_at IS NULL OR revoked_at < NOW() - CAST(%s AS INTERVAL));
+    """
@@
-INCREASE_QUOTA_STATEMENT_SQLITE = """
-    UPDATE quota_limits
-       SET available=available+?, revoked_at=datetime('now')
-     WHERE subject=?
-       AND revoked_at < datetime('now', ?);
-    """
+INCREASE_QUOTA_STATEMENT_SQLITE = """
+    UPDATE quota_limits
+       SET available = MIN(quota_limit, COALESCE(available, 0) + ?),
+           updated_at = datetime('now'),
+           revoked_at = datetime('now')
+     WHERE id=?
+       AND subject=?
+       AND (revoked_at IS NULL OR revoked_at < datetime('now', ?));
+    """
@@
-RESET_QUOTA_STATEMENT_PG = """
-    UPDATE quota_limits
-       SET available=%s, revoked_at=NOW()
-     WHERE subject=%s
-       AND revoked_at < NOW() - INTERVAL %s ;
-    """
+RESET_QUOTA_STATEMENT_PG = """
+    UPDATE quota_limits
+       SET available = LEAST(quota_limit, %s),
+           updated_at = NOW(),
+           revoked_at = NOW()
+     WHERE id=%s
+       AND subject=%s
+       AND (revoked_at IS NULL OR revoked_at < NOW() - CAST(%s AS INTERVAL));
+    """
@@
-RESET_QUOTA_STATEMENT_SQLITE = """
-    UPDATE quota_limits
-       SET available=?, revoked_at=datetime('now')
-     WHERE subject=?
-       AND revoked_at < datetime('now', ?);
-    """
+RESET_QUOTA_STATEMENT_SQLITE = """
+    UPDATE quota_limits
+       SET available = MIN(quota_limit, ?),
+           updated_at = datetime('now'),
+           revoked_at = datetime('now')
+     WHERE id=?
+       AND subject=?
+       AND (revoked_at IS NULL OR revoked_at < datetime('now', ?));
+    """
@@
-    if quota_limiter.quota_increase is not None:
-        increase_quota(
-            connection,
-            increase_quota_statement,
-            subject_id,
-            quota_limiter.quota_increase,
-            quota_limiter.period,
-        )
-
-    if quota_limiter.initial_quota is not None and quota_limiter.initial_quota > 0:
-        reset_quota(
-            connection,
-            reset_quota_statement,
-            subject_id,
-            quota_limiter.initial_quota,
-            quota_limiter.period,
-        )
+    if (
+        quota_limiter.quota_increase is not None
+        and quota_limiter.initial_quota is not None
+        and quota_limiter.initial_quota > 0
+    ):
+        raise ValueError(
+            f"Limiter '{quota_limiter.name}' sets both quota_increase and initial_quota; choose one."
+        )
+    if quota_limiter.quota_increase is not None:
+        increase_quota(
+            connection,
+            increase_quota_statement,
+            quota_limiter.name,  # limiter id
+            subject_id,
+            quota_limiter.quota_increase,
+            quota_limiter.period,
+        )
+    elif quota_limiter.initial_quota is not None and quota_limiter.initial_quota > 0:
+        reset_quota(
+            connection,
+            reset_quota_statement,
+            quota_limiter.name,  # limiter id
+            subject_id,
+            quota_limiter.initial_quota,
+            quota_limiter.period,
+        )
@@
-def increase_quota(
-    connection: Any,
-    update_statement: str,
-    subject_id: str,
-    increase_by: int,
-    period: str,
-) -> None:
+def increase_quota(
+    connection: Any,
+    update_statement: str,
+    limiter_id: str,
+    subject_id: str,
+    increase_by: int,
+    period: str,
+) -> None:
@@
-    cursor.execute(
-        update_statement,
-        (
-            increase_by,
-            subject_id,
-            period,
-        ),
-    )
-    cursor.close()
-    connection.commit()
-    logger.info("Changed %d rows in database", cursor.rowcount)
+    # Format SQLite interval as a negative modifier (e.g., "-1 hour")
+    period_param = period if not isinstance(connection, sqlite3.Connection) else (
+        period if str(period).lstrip().startswith("-") else f"-{period}"
+    )
+    cursor.execute(update_statement, (increase_by, limiter_id, subject_id, period_param))
+    affected = cursor.rowcount
+    cursor.close()
+    connection.commit()
+    logger.info("Changed %d rows in database", affected)
@@
-def reset_quota(
-    connection: Any,
-    update_statement: str,
-    subject_id: str,
-    reset_to: int,
-    period: str,
-) -> None:
+def reset_quota(
+    connection: Any,
+    update_statement: str,
+    limiter_id: str,
+    subject_id: str,
+    reset_to: int,
+    period: str,
+) -> None:
@@
-    cursor.execute(
-        update_statement,
-        (
-            reset_to,
-            subject_id,
-            period,
-        ),
-    )
-    cursor.close()
-    connection.commit()
-    logger.info("Changed %d rows in database", cursor.rowcount)
+    period_param = period if not isinstance(connection, sqlite3.Connection) else (
+        period if str(period).lstrip().startswith("-") else f"-{period}"
+    )
+    cursor.execute(update_statement, (reset_to, limiter_id, subject_id, period_param))
+    affected = cursor.rowcount
+    cursor.close()
+    connection.commit()
+    logger.info("Changed %d rows in database", affected)

Also applies to: 44-49, 52-57, 60-65, 149-166, 168-174, 186-193, 199-205, 217-224

🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 36-41 (and also apply the same
pattern to the other ranges noted: 44-49, 52-57, 60-65, 149-166, 168-174,
186-193, 199-205, 217-224), the UPDATE statements only filter by subject and
risk updating all rows; update them to also filter by the limiter primary key
(id) passed as limiter.name, ensure available is computed safely and capped at
quota_limit, preserve updated_at, handle NULLs for available and revoked_at, and
avoid INTERVAL parsing issues: change the WHERE to include AND id = %s, compute
available = LEAST(COALESCE(available,0) + %s, COALESCE(quota_limit, %s)), set
updated_at = NOW() (don’t overwrite revoked_at unless intended), and replace
INTERVAL %s with a safe cast such as (NOW() - ($3)::interval) or use INTERVAL
'%s'::interval and allow revoked_at IS NULL by using (revoked_at IS NULL OR
revoked_at < (NOW() - ($3)::interval)); finally pass limiter.name as the id
parameter to increase/reset calls and serialize operations per limiter (make
operations mutually exclusive) by selecting the row FOR UPDATE (e.g., SELECT ...
FOR UPDATE) or by using a WHERE id=%s with an appropriate advisory lock per
limiter so concurrent increase/reset cannot both run for the same limiter.



INCREASE_QUOTA_STATEMENT_SQLITE = """
UPDATE quota_limits
SET available=available+?, revoked_at=datetime('now')
WHERE subject=?
AND revoked_at < datetime('now', ?);
"""


RESET_QUOTA_STATEMENT_PG = """
UPDATE quota_limits
SET available=%s, revoked_at=NOW()
WHERE subject=%s
AND revoked_at < NOW() - INTERVAL %s ;
"""


RESET_QUOTA_STATEMENT_SQLITE = """
UPDATE quota_limits
SET available=?, revoked_at=datetime('now')
WHERE subject=?
AND revoked_at < datetime('now', ?);
"""


def quota_scheduler(config: QuotaHandlersConfiguration) -> bool:
"""Quota scheduler task."""
if config is None:
logger.warning("Quota limiters are not configured, skipping")
return False

if config.sqlite is None and config.postgres is None:
logger.warning("Storage for quota limiter is not set, skipping")
return False

if len(config.limiters) == 0:
logger.warning("No limiters are setup, skipping")
return False

connection = connect(config)
if connection is None:
logger.warning("Can not connect to database, skipping")
return False

init_tables(connection)
period = config.scheduler.period

increase_quota_statement = get_increase_quota_statement(config)
reset_quota_statement = get_reset_quota_statement(config)

Comment on lines +90 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Pick SQL dialect by the actual connection, not by config flags.

If both postgres and sqlite are configured, you may connect to PG but build SQLite statements. Detect by connection type.

-    increase_quota_statement = get_increase_quota_statement(config)
-    reset_quota_statement = get_reset_quota_statement(config)
+    increase_quota_statement = get_increase_quota_statement(connection)
+    reset_quota_statement = get_reset_quota_statement(connection)
@@
-def get_increase_quota_statement(config: QuotaHandlersConfiguration) -> str:
+def get_increase_quota_statement(connection: Any) -> str:
     """Get the SQL statement to increase quota."""
-    if config.sqlite is not None:
+    if isinstance(connection, sqlite3.Connection):
         return INCREASE_QUOTA_STATEMENT_SQLITE
     return INCREASE_QUOTA_STATEMENT_PG
@@
-def get_reset_quota_statement(config: QuotaHandlersConfiguration) -> str:
+def get_reset_quota_statement(connection: Any) -> str:
     """Get the SQL statement to reset quota."""
-    if config.sqlite is not None:
+    if isinstance(connection, sqlite3.Connection):
         return RESET_QUOTA_STATEMENT_SQLITE
     return RESET_QUOTA_STATEMENT_PG

Also applies to: 114-126

🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 90-92 (and similarly at 114-126),
the code selects SQL dialect based on config flags which can mismatch the actual
DB connection; instead detect the dialect from the active DB connection (e.g.,
inspect connection.dialect.name or the engine/connection dialect type) and
branch on that value to call get_increase_quota_statement and
get_reset_quota_statement so the generated SQL matches the real backend
(postgresql vs sqlite). Ensure you obtain a live connection or engine object
before selecting the dialect and fall back to a sensible default or raise an
error if the dialect is unsupported.

logger.info(
"Quota scheduler started in separated thread with period set to %d seconds",
period,
)

while True:
logger.info("Quota scheduler sync started")
for limiter in config.limiters:
try:
quota_revocation(
connection, limiter, increase_quota_statement, reset_quota_statement
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Quota revoke error: %s", e)
logger.info("Quota scheduler sync finished")
sleep(period)
# unreachable code
connection.close()
return True


def get_increase_quota_statement(config: QuotaHandlersConfiguration) -> str:
"""Get the SQL statement to increase quota."""
if config.sqlite is not None:
return INCREASE_QUOTA_STATEMENT_SQLITE
return INCREASE_QUOTA_STATEMENT_PG


def get_reset_quota_statement(config: QuotaHandlersConfiguration) -> str:
"""Get the SQL statement to reset quota."""
if config.sqlite is not None:
return RESET_QUOTA_STATEMENT_SQLITE
return RESET_QUOTA_STATEMENT_PG


def quota_revocation(
connection: Any,
quota_limiter: QuotaLimiterConfiguration,
increase_quota_statement: str,
reset_quota_statement: str,
) -> None:
"""Quota revocation mechanism."""
logger.info(
"Quota revocation mechanism for limiter '%s' of type '%s'",
quota_limiter.name,
quota_limiter.type,
)

if quota_limiter.type is None:
raise ValueError("Limiter type not set, skipping revocation")

if quota_limiter.period is None:
raise ValueError("Limiter period not set, skipping revocation")

subject_id = get_subject_id(quota_limiter.type)

if quota_limiter.quota_increase is not None:
increase_quota(
connection,
increase_quota_statement,
subject_id,
quota_limiter.quota_increase,
quota_limiter.period,
)

if quota_limiter.initial_quota is not None and quota_limiter.initial_quota > 0:
reset_quota(
connection,
reset_quota_statement,
subject_id,
quota_limiter.initial_quota,
quota_limiter.period,
)


def increase_quota(
connection: Any,
update_statement: str,
subject_id: str,
increase_by: int,
period: str,
) -> None:
"""Increase quota by specified amount."""
logger.info(
"Increasing quota for subject '%s' by %d when period %s is reached",
subject_id,
increase_by,
period,
)

# for compatibility with SQLite it is not possible to use context manager
# there
cursor = connection.cursor()
cursor.execute(
update_statement,
(
increase_by,
subject_id,
period,
),
)
cursor.close()
connection.commit()
logger.info("Changed %d rows in database", cursor.rowcount)


def reset_quota(
connection: Any,
update_statement: str,
subject_id: str,
reset_to: int,
period: str,
) -> None:
"""Reset quota to specified amount."""
logger.info(
"Resetting quota for subject '%s' to %d when period %s is reached",
subject_id,
reset_to,
period,
)

# for compatibility with SQLite it is not possible to use context manager
# there
cursor = connection.cursor()
cursor.execute(
update_statement,
(
reset_to,
subject_id,
period,
),
)
cursor.close()
connection.commit()
logger.info("Changed %d rows in database", cursor.rowcount)


def get_subject_id(limiter_type: str) -> str:
"""Get subject ID based on quota limiter type."""
match limiter_type:
case constants.USER_QUOTA_LIMITER:
return "u"
case constants.CLUSTER_QUOTA_LIMITER:
return "c"
case _:
return "?"


def connect(config: QuotaHandlersConfiguration) -> Any:
"""Initialize connection to database."""
logger.info("Initializing connection to quota limiter database")
if config.postgres is not None:
return connect_pg(config.postgres)
if config.sqlite is not None:
return connect_sqlite(config.sqlite)
return None


def connect_pg(config: PostgreSQLDatabaseConfiguration) -> Any:
"""Initialize connection to PostgreSQL database."""
logger.info("Connecting to PostgreSQL storage")
connection = psycopg2.connect(
host=config.host,
port=config.port,
user=config.user,
password=config.password.get_secret_value(),
dbname=config.db,
sslmode=config.ssl_mode,
# sslrootcert=config.ca_cert_path,
gssencmode=config.gss_encmode,
)
if connection is not None:
connection.autocommit = True
return connection
Comment on lines +251 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add error handling and reconsider autocommit mode.

Two concerns:

  1. Missing error handling: No try/except block to catch psycopg2.OperationalError or psycopg2.DatabaseError during connection
  2. Autocommit mode: Setting autocommit = True (line 202) means each UPDATE statement commits immediately, preventing atomic updates across multiple limiters in a single sync cycle

Consider:

def connect_pg(config: PostgreSQLDatabaseConfiguration) -> Any:
    """Initialize connection to PostgreSQL database."""
    logger.info("Connecting to PostgreSQL storage")
    try:
        connection = psycopg2.connect(
            host=config.host,
            port=config.port,
            user=config.user,
            password=config.password.get_secret_value(),
            dbname=config.db,
            sslmode=config.ssl_mode,
            gssencmode=config.gss_encmode,
        )
        # Consider removing autocommit to allow transactional updates
        # connection.autocommit = True
        return connection
    except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
        logger.exception("Failed to connect to PostgreSQL: %s", e)
        return None

If autocommit is removed, add explicit connection.commit() calls after each limiter update or at the end of each sync cycle.



def connect_sqlite(config: SQLiteDatabaseConfiguration) -> Any:
"""Initialize connection to database."""
logger.info("Connecting to SQLite storage")
# make sure the connection will have known state
# even if SQLite is not alive
connection = None
try:
connection = sqlite3.connect(database=config.db_path)
except sqlite3.Error as e:
if connection is not None:
connection.close()
logger.exception("Error initializing SQLite cache:\n%s", e)
raise
connection.autocommit = True
return connection
Comment on lines +269 to +283
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

SQLite autocommit handling and error message.

Use isolation_level=None for autocommit (portable across Python versions) and fix the log message wording.

     try:
         connection = sqlite3.connect(database=config.db_path)
     except sqlite3.Error as e:
         if connection is not None:
             connection.close()
-        logger.exception("Error initializing SQLite cache:\n%s", e)
+        logger.exception("Error initializing SQLite quota storage:\n%s", e)
         raise
-    connection.autocommit = True
+    connection.isolation_level = None  # autocommit
     return connection
🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 269 to 283, the code sets
connection.autocommit = True and uses a slightly awkward log message; replace
the autocommit approach with the portable Python pattern by setting
connection.isolation_level = None immediately after a successful sqlite3.connect
call (remove the connection.autocommit assignment), change the initial info log
to something like "Connecting to SQLite database" and update the exception log
text to "Error initializing SQLite connection" while keeping logger.exception to
capture the error and ensure the connection is closed on exception as currently
implemented.



def init_tables(connection: Any) -> None:
"""Initialize tables used by quota limiter."""
logger.info("Initializing tables for quota limiter")
cursor = connection.cursor()
cursor.execute(CREATE_QUOTA_TABLE)
cursor.close()
connection.commit()


def start_quota_scheduler(configuration: Configuration) -> None:
"""Start user and cluster quota scheduler in separate thread."""
logger.info("Starting quota scheduler")
thread = Thread(
target=quota_scheduler,
daemon=True,
args=(configuration.quota_handlers,),
)
thread.start()
Loading