
Conversation


@tisnik tisnik commented Oct 17, 2025

Description

LCORE-741: quota limiter scheduler

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement

Related Tickets & Documents

  • Related Issue #LCORE-741

Summary by CodeRabbit

  • New Features
    • Quota limiting system added to manage user and cluster resource usage.
    • A background quota scheduler now starts automatically at application startup to enforce limits.
    • Persistent quota storage introduced with support for PostgreSQL and SQLite backends.


coderabbitai bot commented Oct 17, 2025

Walkthrough

A background quota scheduler was added: two new limiter constants, startup integration to launch the scheduler before the app runs, and a new module that manages quota revocation and updates using PostgreSQL or SQLite backends.

Changes

Cohort / File(s) Summary
Quota Constants
src/constants.py
Added USER_QUOTA_LIMITER = "user_limiter" and CLUSTER_QUOTA_LIMITER = "cluster_limiter".
Startup Integration
src/lightspeed_stack.py
Imported start_quota_scheduler and invoked start_quota_scheduler(configuration.configuration) at application startup before launching uvicorn.
Quota Scheduler Module
src/runners/quota_scheduler.py
New module implementing DB connections (Postgres/SQLite), table creation, SQL statements for increase/reset, quota_revocation orchestration, helpers (increase_quota, reset_quota, get_subject_id, connect*, init_tables), and start_quota_scheduler to spawn a daemon thread. Introduces psycopg2 and sqlite3 usage.
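
The startup flow described above can be sketched with a minimal stand-in (function names mirror the PR, but the body here is a placeholder loop, not the actual implementation, which connects to the database and runs `quota_revocation()` per limiter):

```python
import threading
import time

def quota_scheduler(period: float, ticks: list) -> None:
    """Stand-in for the real scheduler loop, which connects to the
    database and calls quota_revocation() for each configured limiter."""
    for i in range(3):  # the real loop runs forever (or until shutdown)
        ticks.append(i)
        time.sleep(period)

def start_quota_scheduler(period: float, ticks: list) -> threading.Thread:
    """Spawn the scheduler as a daemon thread so it never blocks exit."""
    thread = threading.Thread(
        target=quota_scheduler, args=(period, ticks), daemon=True
    )
    thread.start()
    return thread

ticks: list = []
worker = start_quota_scheduler(0.01, ticks)
worker.join(timeout=2.0)  # in the app, startup continues on to uvicorn instead
```

The daemon flag is the load-bearing detail: the main process can exit without waiting for the scheduler, which is also why the review below flags graceful shutdown as a concern.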

Sequence Diagram(s)

sequenceDiagram
    participant App as Application Startup
    participant Starter as start_quota_scheduler()
    participant Thread as Daemon Thread (quota_scheduler)
    participant DB as Database
    participant Revoker as quota_revocation()

    App->>Starter: start_quota_scheduler(config)
    Starter->>Thread: spawn daemon (background)
    Thread->>DB: connect(config)
    DB-->>Thread: connection
    Thread->>DB: init_tables(connection)
    DB-->>Thread: tables ensured

    loop periodic loop
        Thread->>Thread: select limiter config
        Thread->>Revoker: quota_revocation(connection, limiter, statements)
        Revoker->>Revoker: get_subject_id(limiter.type)
        opt increase configured
            Revoker->>DB: execute INCREASE statement
        end
        opt reset configured
            Revoker->>DB: execute RESET statement
        end
        DB-->>Revoker: rows affected
        Revoker-->>Thread: done
        Thread->>Thread: sleep(period)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Poem

🐰 I hopped in with constants bright,
A daemon hums through day and night,
Tables ready, connections keen,
Quotas nudged behind the scene,
Tiny rabbit, scheduler sprite.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title "LCORE-741: quota limiter scheduler" is fully related to the main changes in the changeset. The pull request implements a new quota limiter scheduler feature, which is reflected across three files: new constants in src/constants.py, integration into the startup flow in src/lightspeed_stack.py, and the core scheduler implementation in src/runners/quota_scheduler.py. The title is clear, specific, and concise—it directly describes the feature being added without vague terminology or extraneous details. The inclusion of the ticket reference (LCORE-741) aligns with the PR's stated objective to implement this feature.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

@tisnik tisnik force-pushed the lcore-741-quota-limiter-scheduler branch from 31ee9a8 to a2587b0 Compare October 17, 2025 09:08
@tisnik tisnik force-pushed the lcore-741-quota-limiter-scheduler branch from a2587b0 to 1cbaf7f Compare October 17, 2025 09:13
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (7)
src/runners/quota_scheduler.py (6)

100-104: Unnecessary validation checks for required fields.

Lines 100 and 103 check if quota_limiter.type and quota_limiter.period are None, but according to the QuotaLimiterConfiguration model (lines 567-574 in src/models/config.py), both fields are required and cannot be None. Pydantic validates these at configuration load time.

These defensive checks can be removed:

-    if quota_limiter.type is None:
-        raise ValueError("Limiter type not set, skipping revocation")
-
-    if quota_limiter.period is None:
-        raise ValueError("Limiter period not set, skipping revocation")
-
     subject_id = get_subject_id(quota_limiter.type)

119-141: Improve type annotations and docstrings.

Per coding guidelines, functions should have:

  1. Complete type annotations (connection is typed as Any)
  2. Google-style docstrings with Args, Returns, and Raises sections


Example improvement:

def increase_quota(
    connection: sqlite3.Connection | psycopg2.extensions.connection,
    subject_id: str,
    increase_by: int,
    period: str,
) -> None:
    """Increase quota by specified amount.
    
    Args:
        connection: Database connection (PostgreSQL or SQLite).
        subject_id: Subject identifier ('u' for user, 'c' for cluster).
        increase_by: Amount to increase quota by.
        period: Time period string (e.g., '1 day', '1 hour').
    
    Raises:
        DatabaseError: If the update operation fails.
    """

Apply similar improvements to reset_quota and other functions.


167-175: Consider raising exception for unknown limiter types.

The function returns "?" for unknown limiter types (line 175), which could lead to silent failures. Consider raising a ValueError instead to fail fast when an invalid type is encountered.

 def get_subject_id(limiter_type: str) -> str:
     """Get subject ID based on quota limiter type."""
     match limiter_type:
         case constants.USER_QUOTA_LIMITER:
             return "u"
         case constants.CLUSTER_QUOTA_LIMITER:
             return "c"
         case _:
-            return "?"
+            raise ValueError(f"Unknown limiter type: {limiter_type}")

198-198: Remove or enable commented sslrootcert configuration.

Line 198 has a commented-out sslrootcert parameter. If SSL certificate validation is not needed, remove the comment. If it should be enabled, uncomment and verify the certificate path configuration.

Either remove:

-        # sslrootcert=config.ca_cert_path,

Or enable with proper handling:

-        # sslrootcert=config.ca_cert_path,
+        **({"sslrootcert": config.ca_cert_path} if config.ca_cert_path else {}),

206-220: Verify SQLite connection error handling logic.

Lines 215-216 check if connection is not None inside the exception handler, but connection is initialized to None on line 211. If sqlite3.connect() raises an exception on line 213, connection will still be None, making the check and subsequent close() call unnecessary.

The pattern works but is confusing. Consider simplifying or adding a comment explaining the defensive check.

def connect_sqlite(config: SQLiteDatabaseConfiguration) -> Any:
    """Initialize connection to SQLite database."""
    logger.info("Connecting to SQLite storage")
    try:
        connection = sqlite3.connect(database=config.db_path)
        connection.autocommit = True
        return connection
    except sqlite3.Error as e:
        logger.exception("Error initializing SQLite cache:\n%s", e)
        return None

Also note the same autocommit concern as PostgreSQL (prevents atomic multi-limiter updates).


223-229: Inconsistent cursor management pattern.

This function manually creates and closes the cursor (lines 226-228), while increase_quota and reset_quota use the context manager pattern (with connection.cursor() as cursor:). Use a consistent approach throughout.

def init_tables(connection: Any) -> None:
    """Initialize tables used by quota limiter."""
    logger.info("Initializing tables for quota limiter")
    with connection.cursor() as cursor:
        cursor.execute(CREATE_QUOTA_TABLE)
    connection.commit()

Note: If autocommit=True is set on the connection, the explicit commit() is redundant.

src/lightspeed_stack.py (1)

124-125: Consider adding error handling for quota scheduler startup.

The quota scheduler starts in a daemon thread that returns immediately (line 125). While the scheduler internally handles validation and errors, consider wrapping the startup call in try/except to catch and log any immediate failures during thread creation.

# start the runners
try:
    start_quota_scheduler(configuration.configuration)
except Exception as e:
    logger.error("Failed to start quota scheduler: %s", e)
    # Decide: continue without scheduler or fail fast
    # For now, log and continue since it's a background feature

# if all previous steps succeed, start the service on the specified port
start_uvicorn(configuration.service_configuration)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6feb4e3 and 1cbaf7f.

📒 Files selected for processing (3)
  • src/constants.py (1 hunks)
  • src/lightspeed_stack.py (2 hunks)
  • src/runners/quota_scheduler.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use absolute imports for internal modules (e.g., from auth import get_auth_dependency)

Files:

  • src/runners/quota_scheduler.py
  • src/lightspeed_stack.py
  • src/constants.py
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: All modules start with descriptive module-level docstrings explaining purpose
Use logger = logging.getLogger(__name__) for module logging after import logging
Define type aliases at module level for clarity
All functions require docstrings with brief descriptions
Provide complete type annotations for all function parameters and return types
Use typing_extensions.Self in model validators where appropriate
Use modern union syntax (str | int) and Optional[T] or T | None consistently
Function names use snake_case with descriptive, action-oriented prefixes (get_, validate_, check_)
Avoid in-place parameter modification; return new data structures instead of mutating arguments
Use appropriate logging levels: debug, info, warning, error with clear messages
All classes require descriptive docstrings explaining purpose
Class names use PascalCase with conventional suffixes (Configuration, Error/Exception, Resolver, Interface)
Abstract base classes should use abc.ABC and @abstractmethod for interfaces
Provide complete type annotations for all class attributes
Follow Google Python docstring style for modules, classes, and functions, including Args, Returns, Raises, Attributes sections as needed

Files:

  • src/runners/quota_scheduler.py
  • src/lightspeed_stack.py
  • src/constants.py
src/constants.py

📄 CodeRabbit inference engine (CLAUDE.md)

Keep shared constants in a central src/constants.py with descriptive comments

Files:

  • src/constants.py
🧬 Code graph analysis (2)
src/runners/quota_scheduler.py (2)
src/log.py (1)
  • get_logger (7-13)
src/models/config.py (6)
  • config (140-146)
  • Configuration (596-622)
  • QuotaHandlersConfiguration (584-593)
  • QuotaLimiterConfiguration (568-575)
  • PostgreSQLDatabaseConfiguration (87-105)
  • SQLiteDatabaseConfiguration (75-78)
src/lightspeed_stack.py (2)
src/runners/quota_scheduler.py (2)
  • quota_scheduler (51-89)
  • start_quota_scheduler (232-240)
src/configuration.py (1)
  • configuration (65-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: e2e_tests (ci)
  • GitHub Check: e2e_tests (azure)
🔇 Additional comments (3)
src/runners/quota_scheduler.py (1)

232-240: Quota scheduler startup looks good.

The daemon thread approach is appropriate for a background scheduler. The thread will automatically terminate when the main process exits.

Optional: Consider logging thread status after start:

thread.start()
logger.info("Quota scheduler thread started (daemon=%s)", thread.daemon)
src/constants.py (1)

148-150: LGTM!

The new quota limiter constants follow proper naming conventions and are used consistently in the quota scheduler module.

src/lightspeed_stack.py (1)

17-17: Use absolute import for internal module.

Per coding guidelines, use absolute imports for internal modules.


Apply this diff:

-from runners.quota_scheduler import start_quota_scheduler
+from src.runners.quota_scheduler import start_quota_scheduler
⛔ Skipped due to learnings
Learnt from: CR
PR: lightspeed-core/lightspeed-stack#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T16:46:33.353Z
Learning: Applies to src/**/*.py : Use absolute imports for internal modules (e.g., from auth import get_auth_dependency)

Comment on lines +10 to +11
import constants
from log import get_logger

🛠️ Refactor suggestion | 🟠 Major

Use absolute imports for internal modules.

The coding guidelines require absolute imports for internal modules (e.g., from src import constants).


Apply this diff:

-import constants
-from log import get_logger
+from src import constants
+from src.log import get_logger
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure it accurately replaces the highlighted code, contains no missing lines, and has no indentation issues.

Suggested change
-import constants
-from log import get_logger
+from src import constants
+from src.log import get_logger
🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 10-11, the imports use
relative/internal short names; update them to absolute imports per guidelines:
replace "import constants" with "from src import constants" and replace "from
log import get_logger" with "from src.log import get_logger" (or the correct
module path under src) so internal modules are imported absolutely.

Comment on lines 78 to 89
while True:
logger.info("Quota scheduler sync started")
for limiter in config.limiters:
try:
quota_revocation(connection, limiter)
except Exception as e: # pylint: disable=broad-exception-caught)
logger.error("Quota revoke error: %s", e)
logger.info("Quota scheduler sync finished")
sleep(period)
# unreachable code
connection.close()
return True

⚠️ Potential issue | 🟠 Major

Infinite loop prevents graceful shutdown and connection cleanup.

The infinite while True loop means:

  1. Line 88 connection.close() is unreachable
  2. No mechanism exists for graceful shutdown
  3. Database connection remains open until process termination

Since this runs as a daemon thread, it will be killed abruptly when the main process exits, potentially leaving uncommitted transactions or locks.

Consider implementing a shutdown event mechanism:

import threading

_shutdown_event = threading.Event()

def quota_scheduler(config: QuotaHandlersConfiguration) -> bool:
    """Quota scheduler task."""
    # ... validation code ...
    
    logger.info("Quota scheduler started...")
    
    while not _shutdown_event.is_set():
        logger.info("Quota scheduler sync started")
        for limiter in config.limiters:
            try:
                quota_revocation(connection, limiter)
            except Exception as e:  # pylint: disable=broad-exception-caught
                logger.error("Quota revoke error: %s", e)
        logger.info("Quota scheduler sync finished")
        _shutdown_event.wait(timeout=period)
    
    connection.close()
    logger.info("Quota scheduler stopped")
    return True

def stop_quota_scheduler() -> None:
    """Signal the quota scheduler to stop."""
    _shutdown_event.set()
🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 78 to 89 the infinite while True
loop prevents graceful shutdown and leaves the DB connection.close()
unreachable; replace the loop with a stop-able pattern using a module-level
threading.Event (e.g. _shutdown_event), change the loop to while not
_shutdown_event.is_set(), replace the sleep(period) with
_shutdown_event.wait(timeout=period) so the loop can exit promptly on shutdown,
ensure connection.close() and a final logger.info("Quota scheduler stopped") run
after the loop, and add a stop_quota_scheduler() function that calls
_shutdown_event.set() to signal termination.

Comment on lines +188 to +203
def connect_pg(config: PostgreSQLDatabaseConfiguration) -> Any:
"""Initialize connection to PostgreSQL database."""
logger.info("Connecting to PostgreSQL storage")
connection = psycopg2.connect(
host=config.host,
port=config.port,
user=config.user,
password=config.password.get_secret_value(),
dbname=config.db,
sslmode=config.ssl_mode,
# sslrootcert=config.ca_cert_path,
gssencmode=config.gss_encmode,
)
if connection is not None:
connection.autocommit = True
return connection

⚠️ Potential issue | 🟠 Major

Add error handling and reconsider autocommit mode.

Two concerns:

  1. Missing error handling: No try/except block to catch psycopg2.OperationalError or psycopg2.DatabaseError during connection
  2. Autocommit mode: Setting autocommit = True (line 202) means each UPDATE statement commits immediately, preventing atomic updates across multiple limiters in a single sync cycle

Consider:

def connect_pg(config: PostgreSQLDatabaseConfiguration) -> Any:
    """Initialize connection to PostgreSQL database."""
    logger.info("Connecting to PostgreSQL storage")
    try:
        connection = psycopg2.connect(
            host=config.host,
            port=config.port,
            user=config.user,
            password=config.password.get_secret_value(),
            dbname=config.db,
            sslmode=config.ssl_mode,
            gssencmode=config.gss_encmode,
        )
        # Consider removing autocommit to allow transactional updates
        # connection.autocommit = True
        return connection
    except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
        logger.exception("Failed to connect to PostgreSQL: %s", e)
        return None

If autocommit is removed, add explicit connection.commit() calls after each limiter update or at the end of each sync cycle.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
src/runners/quota_scheduler.py (3)

10-21: Use absolute imports and standard logger per guidelines.

Switch to absolute imports under src and use logging.getLogger(__name__). This also avoids depending on a custom logger factory. As per coding guidelines.

+import logging
 from typing import Any
-from threading import Thread
+from threading import Thread, Event
-from time import sleep
+from time import sleep
@@
-import constants
-from log import get_logger
-from models.config import (
+from src import constants
+from src.models.config import (
     Configuration,
     QuotaHandlersConfiguration,
     QuotaLimiterConfiguration,
     PostgreSQLDatabaseConfiguration,
     SQLiteDatabaseConfiguration,
 )
@@
-logger = get_logger(__name__)
+logger = logging.getLogger(__name__)

98-111: Infinite loop prevents shutdown; make it stoppable and close the connection.

Replace while True + sleep with an Event-driven loop. Add stop_quota_scheduler() and return the thread from start_quota_scheduler(). As per coding guidelines.

-from threading import Thread
+from threading import Thread, Event
@@
-logger = get_logger(__name__)
+logger = logging.getLogger(__name__)
+
+_shutdown_event: Event = Event()
@@
-    while True:
+    while not _shutdown_event.is_set():
@@
-        sleep(period)
-    # unreachable code
-    connection.close()
-    return True
+        _shutdown_event.wait(timeout=period)
+    connection.close()
+    logger.info("Quota scheduler stopped")
+    return True
@@
-def start_quota_scheduler(configuration: Configuration) -> None:
-    """Start user and cluster quota scheduler in separate thread."""
+def start_quota_scheduler(configuration: Configuration) -> Thread:
+    """Start user and cluster quota scheduler in separate thread."""
@@
-    thread.start()
+    thread.start()
+    return thread
+
+def stop_quota_scheduler() -> None:
+    """Signal the quota scheduler to stop."""
+    _shutdown_event.set()

Also applies to: 4-6, 20-22, 295-303


251-267: Add PG connection error handling; autocommit choice.

Wrap psycopg2.connect in try/except and log OperationalError/DatabaseError. Keep autocommit if desired, but consider transactional batching later.

 def connect_pg(config: PostgreSQLDatabaseConfiguration) -> Any:
     """Initialize connection to PostgreSQL database."""
     logger.info("Connecting to PostgreSQL storage")
-    connection = psycopg2.connect(
-        host=config.host,
-        port=config.port,
-        user=config.user,
-        password=config.password.get_secret_value(),
-        dbname=config.db,
-        sslmode=config.ssl_mode,
-        # sslrootcert=config.ca_cert_path,
-        gssencmode=config.gss_encmode,
-    )
-    if connection is not None:
-        connection.autocommit = True
-    return connection
+    try:
+        connection = psycopg2.connect(
+            host=config.host,
+            port=config.port,
+            user=config.user,
+            password=config.password.get_secret_value(),
+            dbname=config.db,
+            sslmode=config.ssl_mode,
+            # sslrootcert=config.ca_cert_path,
+            gssencmode=config.gss_encmode,
+        )
+    except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
+        logger.exception("Failed to connect to PostgreSQL: %s", e)
+        return None
+    connection.autocommit = True
+    return connection
🧹 Nitpick comments (3)
src/runners/quota_scheduler.py (3)

186-197: Access rowcount before closing cursor.

Capture rowcount before closing to avoid driver-specific surprises.

-    cursor.close()
-    connection.commit()
-    logger.info("Changed %d rows in database", cursor.rowcount)
+    affected = cursor.rowcount
+    cursor.close()
+    connection.commit()
+    logger.info("Changed %d rows in database", affected)

Also applies to: 217-227


230-239: Fail fast on unknown limiter type.

Returning "?" hides config errors; raise instead.

 def get_subject_id(limiter_type: str) -> str:
     """Get subject ID based on quota limiter type."""
     match limiter_type:
         case constants.USER_QUOTA_LIMITER:
             return "u"
         case constants.CLUSTER_QUOTA_LIMITER:
             return "c"
         case _:
-            return "?"
+            raise ValueError(f"Unknown limiter type: {limiter_type!r}")

23-33: Consider backend-specific CREATE TABLE for better portability and defaults.

SQLite accepts arbitrary type names, but using TEXT/INTEGER and default timestamps improves clarity; also consider NOT NULL + sensible defaults and an index on (subject, revoked_at) for the WHERE pattern.
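
A sketch of what such a backend-specific SQLite schema could look like (column names are inferred from the SQL statements quoted elsewhere in this review, not taken from the PR itself):

```python
import sqlite3

# Assumed columns based on the UPDATE statements discussed in this review:
# explicit TEXT/INTEGER affinities, NOT NULL with defaults, and an index
# matching the WHERE (subject, revoked_at) access pattern.
SCHEMA = """
CREATE TABLE IF NOT EXISTS quota_limits (
    id          TEXT    NOT NULL,
    subject     TEXT    NOT NULL,
    quota_limit INTEGER NOT NULL DEFAULT 0,
    available   INTEGER NOT NULL DEFAULT 0,
    updated_at  TEXT    NOT NULL DEFAULT (datetime('now')),
    revoked_at  TEXT,
    PRIMARY KEY (id, subject)
);
CREATE INDEX IF NOT EXISTS idx_quota_subject_revoked
    ON quota_limits (subject, revoked_at);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
names = {row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type IN ('table', 'index')"
)}
conn.close()
```

The composite primary key and the secondary index are illustrative choices under the assumptions above; the actual PR schema may differ.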

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1cbaf7f and cb0ebdf.

📒 Files selected for processing (1)
  • src/runners/quota_scheduler.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use absolute imports for internal modules (e.g., from auth import get_auth_dependency)

Files:

  • src/runners/quota_scheduler.py
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: All modules start with descriptive module-level docstrings explaining purpose
Use logger = logging.getLogger(__name__) for module logging after import logging
Define type aliases at module level for clarity
All functions require docstrings with brief descriptions
Provide complete type annotations for all function parameters and return types
Use typing_extensions.Self in model validators where appropriate
Use modern union syntax (str | int) and Optional[T] or T | None consistently
Function names use snake_case with descriptive, action-oriented prefixes (get_, validate_, check_)
Avoid in-place parameter modification; return new data structures instead of mutating arguments
Use appropriate logging levels: debug, info, warning, error with clear messages
All classes require descriptive docstrings explaining purpose
Class names use PascalCase with conventional suffixes (Configuration, Error/Exception, Resolver, Interface)
Abstract base classes should use abc.ABC and @abstractmethod for interfaces
Provide complete type annotations for all class attributes
Follow Google Python docstring style for modules, classes, and functions, including Args, Returns, Raises, Attributes sections as needed

Files:

  • src/runners/quota_scheduler.py
🧠 Learnings (1)
📚 Learning: 2025-09-18T16:46:33.353Z
Learnt from: CR
PR: lightspeed-core/lightspeed-stack#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-18T16:46:33.353Z
Learning: Applies to src/**/*.py : Use absolute imports for internal modules (e.g., from auth import get_auth_dependency)

Applied to files:

  • src/runners/quota_scheduler.py
🧬 Code graph analysis (1)
src/runners/quota_scheduler.py (2)
src/log.py (1)
  • get_logger (7-13)
src/models/config.py (6)
  • config (140-146)
  • Configuration (596-622)
  • QuotaHandlersConfiguration (584-593)
  • QuotaLimiterConfiguration (568-575)
  • PostgreSQLDatabaseConfiguration (87-105)
  • SQLiteDatabaseConfiguration (75-78)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: e2e_tests (azure)
  • GitHub Check: e2e_tests (ci)
  • GitHub Check: build-pr

Comment on lines +36 to +41
INCREASE_QUOTA_STATEMENT_PG = """
UPDATE quota_limits
SET available=available+%s, revoked_at=NOW()
WHERE subject=%s
AND revoked_at < NOW() - INTERVAL %s ;
"""

⚠️ Potential issue | 🔴 Critical

Data corruption risk: UPDATEs miss the primary key and may update all rows. Also cap to quota_limit, handle NULLs, and keep timestamps.

Current UPDATEs filter only by subject; they must also filter by id. Additionally:

  • Use LEAST/MIN to not exceed quota_limit.
  • COALESCE available to 0.
  • Maintain updated_at; allow first-time updates when revoked_at is NULL.
  • For PostgreSQL, avoid INTERVAL %s parsing issues by casting.

Also pass limiter.name (id) to increase/reset and make the two operations mutually exclusive per limiter.

@@
-INCREASE_QUOTA_STATEMENT_PG = """
-    UPDATE quota_limits
-       SET available=available+%s, revoked_at=NOW()
-     WHERE subject=%s
-       AND revoked_at < NOW() - INTERVAL %s ;
-    """
+INCREASE_QUOTA_STATEMENT_PG = """
+    UPDATE quota_limits
+       SET available = LEAST(quota_limit, COALESCE(available, 0) + %s),
+           updated_at = NOW(),
+           revoked_at = NOW()
+     WHERE id=%s
+       AND subject=%s
+       AND (revoked_at IS NULL OR revoked_at < NOW() - CAST(%s AS INTERVAL));
+    """
@@
-INCREASE_QUOTA_STATEMENT_SQLITE = """
-    UPDATE quota_limits
-       SET available=available+?, revoked_at=datetime('now')
-     WHERE subject=?
-       AND revoked_at < datetime('now', ?);
-    """
+INCREASE_QUOTA_STATEMENT_SQLITE = """
+    UPDATE quota_limits
+       SET available = MIN(quota_limit, COALESCE(available, 0) + ?),
+           updated_at = datetime('now'),
+           revoked_at = datetime('now')
+     WHERE id=?
+       AND subject=?
+       AND (revoked_at IS NULL OR revoked_at < datetime('now', ?));
+    """
@@
-RESET_QUOTA_STATEMENT_PG = """
-    UPDATE quota_limits
-       SET available=%s, revoked_at=NOW()
-     WHERE subject=%s
-       AND revoked_at < NOW() - INTERVAL %s ;
-    """
+RESET_QUOTA_STATEMENT_PG = """
+    UPDATE quota_limits
+       SET available = LEAST(quota_limit, %s),
+           updated_at = NOW(),
+           revoked_at = NOW()
+     WHERE id=%s
+       AND subject=%s
+       AND (revoked_at IS NULL OR revoked_at < NOW() - CAST(%s AS INTERVAL));
+    """
@@
-RESET_QUOTA_STATEMENT_SQLITE = """
-    UPDATE quota_limits
-       SET available=?, revoked_at=datetime('now')
-     WHERE subject=?
-       AND revoked_at < datetime('now', ?);
-    """
+RESET_QUOTA_STATEMENT_SQLITE = """
+    UPDATE quota_limits
+       SET available = MIN(quota_limit, ?),
+           updated_at = datetime('now'),
+           revoked_at = datetime('now')
+     WHERE id=?
+       AND subject=?
+       AND (revoked_at IS NULL OR revoked_at < datetime('now', ?));
+    """
@@
-    if quota_limiter.quota_increase is not None:
-        increase_quota(
-            connection,
-            increase_quota_statement,
-            subject_id,
-            quota_limiter.quota_increase,
-            quota_limiter.period,
-        )
-
-    if quota_limiter.initial_quota is not None and quota_limiter.initial_quota > 0:
-        reset_quota(
-            connection,
-            reset_quota_statement,
-            subject_id,
-            quota_limiter.initial_quota,
-            quota_limiter.period,
-        )
+    if (
+        quota_limiter.quota_increase is not None
+        and quota_limiter.initial_quota is not None
+        and quota_limiter.initial_quota > 0
+    ):
+        raise ValueError(
+            f"Limiter '{quota_limiter.name}' sets both quota_increase and initial_quota; choose one."
+        )
+    if quota_limiter.quota_increase is not None:
+        increase_quota(
+            connection,
+            increase_quota_statement,
+            quota_limiter.name,  # limiter id
+            subject_id,
+            quota_limiter.quota_increase,
+            quota_limiter.period,
+        )
+    elif quota_limiter.initial_quota is not None and quota_limiter.initial_quota > 0:
+        reset_quota(
+            connection,
+            reset_quota_statement,
+            quota_limiter.name,  # limiter id
+            subject_id,
+            quota_limiter.initial_quota,
+            quota_limiter.period,
+        )
@@
-def increase_quota(
-    connection: Any,
-    update_statement: str,
-    subject_id: str,
-    increase_by: int,
-    period: str,
-) -> None:
+def increase_quota(
+    connection: Any,
+    update_statement: str,
+    limiter_id: str,
+    subject_id: str,
+    increase_by: int,
+    period: str,
+) -> None:
@@
-    cursor.execute(
-        update_statement,
-        (
-            increase_by,
-            subject_id,
-            period,
-        ),
-    )
-    cursor.close()
-    connection.commit()
-    logger.info("Changed %d rows in database", cursor.rowcount)
+    # Format SQLite interval as a negative modifier (e.g., "-1 hour")
+    period_param = period if not isinstance(connection, sqlite3.Connection) else (
+        period if str(period).lstrip().startswith("-") else f"-{period}"
+    )
+    cursor.execute(update_statement, (increase_by, limiter_id, subject_id, period_param))
+    affected = cursor.rowcount
+    cursor.close()
+    connection.commit()
+    logger.info("Changed %d rows in database", affected)
@@
-def reset_quota(
-    connection: Any,
-    update_statement: str,
-    subject_id: str,
-    reset_to: int,
-    period: str,
-) -> None:
+def reset_quota(
+    connection: Any,
+    update_statement: str,
+    limiter_id: str,
+    subject_id: str,
+    reset_to: int,
+    period: str,
+) -> None:
@@
-    cursor.execute(
-        update_statement,
-        (
-            reset_to,
-            subject_id,
-            period,
-        ),
-    )
-    cursor.close()
-    connection.commit()
-    logger.info("Changed %d rows in database", cursor.rowcount)
+    period_param = period if not isinstance(connection, sqlite3.Connection) else (
+        period if str(period).lstrip().startswith("-") else f"-{period}"
+    )
+    cursor.execute(update_statement, (reset_to, limiter_id, subject_id, period_param))
+    affected = cursor.rowcount
+    cursor.close()
+    connection.commit()
+    logger.info("Changed %d rows in database", affected)

Also applies to: 44-49, 52-57, 60-65, 149-166, 168-174, 186-193, 199-205, 217-224

🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 36-41 (and apply the same pattern
to the other ranges noted: 44-49, 52-57, 60-65, 149-166, 168-174, 186-193,
199-205, 217-224), the UPDATE statements only filter by subject and risk
updating all rows. Update them to also filter by the limiter primary key (id),
passed as limiter.name; compute available safely and cap it at quota_limit;
preserve updated_at; handle NULLs for available and revoked_at; and avoid
INTERVAL parsing issues. Concretely: add AND id = %s to the WHERE clause;
compute available = LEAST(COALESCE(available, 0) + %s, COALESCE(quota_limit,
%s)); set updated_at = NOW() without overwriting revoked_at unless intended;
replace INTERVAL %s with a safe cast such as (NOW() - ($3)::interval) or use
INTERVAL '%s'::interval; and allow revoked_at IS NULL via (revoked_at IS NULL
OR revoked_at < (NOW() - ($3)::interval)). Finally, pass limiter.name as the id
parameter to the increase/reset calls and serialize operations per limiter
(make them mutually exclusive) by selecting the row FOR UPDATE, or by using
WHERE id=%s together with an advisory lock per limiter, so concurrent increase
and reset cannot both run for the same limiter.
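The capped, id-scoped reset suggested above can be sketched for the SQLite backend. The table schema below is an assumption inferred from the column names in the proposed statement, not the project's actual DDL:

```python
import sqlite3

# Mirrors the suggested RESET_QUOTA_STATEMENT_SQLITE: scoped to (id, subject),
# capped at quota_limit, and tolerant of a NULL revoked_at.
RESET_QUOTA_STATEMENT_SQLITE = """
    UPDATE quota_limits
       SET available = MIN(quota_limit, ?),
           updated_at = datetime('now'),
           revoked_at = datetime('now')
     WHERE id=?
       AND subject=?
       AND (revoked_at IS NULL OR revoked_at < datetime('now', ?));
"""

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE quota_limits ("
    "id TEXT, subject TEXT, quota_limit INTEGER, available INTEGER, "
    "updated_at TEXT, revoked_at TEXT)"
)
# One subject whose quota was last revoked two hours ago.
connection.execute(
    "INSERT INTO quota_limits VALUES "
    "('user_limiter', 'alice', 100, 3, datetime('now'), datetime('now', '-2 hours'))"
)

# Reset to 500, capped at quota_limit (100); the period is passed as a
# negative SQLite datetime modifier, as in the suggested diff.
cursor = connection.execute(
    RESET_QUOTA_STATEMENT_SQLITE, (500, "user_limiter", "alice", "-1 hour")
)
connection.commit()
print(cursor.rowcount)  # 1: revoked_at is older than one hour, so the row matches
available = connection.execute(
    "SELECT available FROM quota_limits WHERE subject='alice'"
).fetchone()[0]
print(available)  # 100: MIN(quota_limit, 500) caps the reset at the limit
```

Rows reset within the last period are skipped by the `revoked_at` predicate, so rerunning the scheduler early is a no-op for them.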

Comment on lines +90 to +92
increase_quota_statement = get_increase_quota_statement(config)
reset_quota_statement = get_reset_quota_statement(config)


⚠️ Potential issue | 🟠 Major

Pick SQL dialect by the actual connection, not by config flags.

If both postgres and sqlite are configured, you may connect to PG but build SQLite statements. Detect by connection type.

-    increase_quota_statement = get_increase_quota_statement(config)
-    reset_quota_statement = get_reset_quota_statement(config)
+    increase_quota_statement = get_increase_quota_statement(connection)
+    reset_quota_statement = get_reset_quota_statement(connection)
@@
-def get_increase_quota_statement(config: QuotaHandlersConfiguration) -> str:
+def get_increase_quota_statement(connection: Any) -> str:
     """Get the SQL statement to increase quota."""
-    if config.sqlite is not None:
+    if isinstance(connection, sqlite3.Connection):
         return INCREASE_QUOTA_STATEMENT_SQLITE
     return INCREASE_QUOTA_STATEMENT_PG
@@
-def get_reset_quota_statement(config: QuotaHandlersConfiguration) -> str:
+def get_reset_quota_statement(connection: Any) -> str:
     """Get the SQL statement to reset quota."""
-    if config.sqlite is not None:
+    if isinstance(connection, sqlite3.Connection):
         return RESET_QUOTA_STATEMENT_SQLITE
     return RESET_QUOTA_STATEMENT_PG

Also applies to: 114-126

🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 90-92 (and similarly at 114-126),
the code selects SQL dialect based on config flags which can mismatch the actual
DB connection; instead detect the dialect from the active DB connection (e.g.,
inspect connection.dialect.name or the engine/connection dialect type) and
branch on that value to call get_increase_quota_statement and
get_reset_quota_statement so the generated SQL matches the real backend
(postgresql vs sqlite). Ensure you obtain a live connection or engine object
before selecting the dialect and fall back to a sensible default or raise an
error if the dialect is unsupported.
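The connection-based selection proposed in the diff can be exercised in isolation; the statement bodies here are placeholders standing in for the module's real SQL:

```python
import sqlite3
from typing import Any

# Placeholder statements; the real module defines full UPDATE statements.
INCREASE_QUOTA_STATEMENT_SQLITE = "UPDATE quota_limits SET available = available + ? ..."
INCREASE_QUOTA_STATEMENT_PG = "UPDATE quota_limits SET available = available + %s ..."


def get_increase_quota_statement(connection: Any) -> str:
    """Pick the SQL dialect from the live connection, not from config flags."""
    if isinstance(connection, sqlite3.Connection):
        return INCREASE_QUOTA_STATEMENT_SQLITE
    return INCREASE_QUOTA_STATEMENT_PG


connection = sqlite3.connect(":memory:")
statement = get_increase_quota_statement(connection)
print(statement is INCREASE_QUOTA_STATEMENT_SQLITE)  # True for a SQLite connection
```

Because the check runs against the object actually returned by the connect step, a configuration that lists both backends can no longer pair a PostgreSQL connection with SQLite placeholders.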

Comment on lines +269 to +283
def connect_sqlite(config: SQLiteDatabaseConfiguration) -> Any:
"""Initialize connection to database."""
logger.info("Connecting to SQLite storage")
# make sure the connection will have known state
# even if SQLite is not alive
connection = None
try:
connection = sqlite3.connect(database=config.db_path)
except sqlite3.Error as e:
if connection is not None:
connection.close()
logger.exception("Error initializing SQLite cache:\n%s", e)
raise
connection.autocommit = True
return connection
⚠️ Potential issue | 🟠 Major

SQLite autocommit handling and error message.

Use isolation_level=None for autocommit (portable across Python versions) and fix the log message wording.

     try:
         connection = sqlite3.connect(database=config.db_path)
     except sqlite3.Error as e:
         if connection is not None:
             connection.close()
-        logger.exception("Error initializing SQLite cache:\n%s", e)
+        logger.exception("Error initializing SQLite quota storage:\n%s", e)
         raise
-    connection.autocommit = True
+    connection.isolation_level = None  # autocommit
     return connection
🤖 Prompt for AI Agents
In src/runners/quota_scheduler.py around lines 269 to 283, the code sets
connection.autocommit = True and uses a slightly awkward log message; replace
the autocommit approach with the portable Python pattern by setting
connection.isolation_level = None immediately after a successful sqlite3.connect
call (remove the connection.autocommit assignment), change the initial info log
to something like "Connecting to SQLite database" and update the exception log
text to "Error initializing SQLite connection" while keeping logger.exception to
capture the error and ensure the connection is closed on exception as currently
implemented.
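The portable autocommit pattern the prompt describes can be verified directly: setting `isolation_level = None` works on all supported Python versions, whereas the `Connection.autocommit` attribute only became writable in Python 3.12.

```python
import sqlite3


def connect_sqlite(db_path: str) -> sqlite3.Connection:
    """Sketch of the suggested helper: open a connection in autocommit mode."""
    connection = sqlite3.connect(database=db_path)
    connection.isolation_level = None  # autocommit, portable across versions
    return connection


conn = connect_sqlite(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")
# No explicit commit needed: the connection never opens a transaction.
print(conn.in_transaction)  # False in autocommit mode
print(conn.execute("SELECT COUNT(*) FROM t").fetchone()[0])  # 1
```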

@tisnik tisnik merged commit c0e0d6c into lightspeed-core:main Oct 17, 2025
18 of 20 checks passed