Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/quota/quota_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from typing import Optional

import sqlite3
import psycopg2

from log import get_logger
Expand Down Expand Up @@ -75,11 +76,18 @@ def connected(self) -> bool:
if self.connection is None:
logger.warning("Not connected, need to reconnect later")
return False
cursor = None
try:
with self.connection.cursor() as cursor:
cursor.execute("SELECT 1")
cursor = self.connection.cursor()
cursor.execute("SELECT 1")
logger.info("Connection to storage is ok")
return True
except psycopg2.OperationalError as e:
except (psycopg2.OperationalError, sqlite3.Error) as e:
logger.error("Disconnected from storage: %s", e)
return False
finally:
if cursor is not None:
try:
cursor.close()
except Exception: # pylint: disable=broad-exception-caught
logger.warning("Unable to close cursor")
139 changes: 107 additions & 32 deletions src/quota/revokable_quota_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
from quota.sql import (
CREATE_QUOTA_TABLE,
UPDATE_AVAILABLE_QUOTA_PG,
UPDATE_AVAILABLE_QUOTA_SQLITE,
SELECT_QUOTA_PG,
SELECT_QUOTA_SQLITE,
SET_AVAILABLE_QUOTA_PG,
SET_AVAILABLE_QUOTA_SQLITE,
INIT_QUOTA_PG,
INIT_QUOTA_SQLITE,
)

logger = get_logger(__name__)
Expand Down Expand Up @@ -40,46 +44,80 @@ def available_quota(self, subject_id: str = "") -> int:
"""Retrieve available quota for given subject."""
if self.subject_type == "c":
subject_id = ""
with self.connection.cursor() as cursor:
cursor.execute(
SELECT_QUOTA_PG,
(subject_id, self.subject_type),
)
value = cursor.fetchone()
if value is None:
self._init_quota(subject_id)
return self.initial_quota
return value[0]
if self.sqlite_connection_config is not None:
return self._read_available_quota(SELECT_QUOTA_SQLITE, subject_id)
if self.postgres_connection_config is not None:
return self._read_available_quota(SELECT_QUOTA_PG, subject_id)
# default value is used only if quota limiter database is not setup
return 0

def _read_available_quota(self, query_statement: str, subject_id: str) -> int:
"""Read available quota from selected database."""
# it is not possible to use context manager there, because SQLite does
# not support it
cursor = self.connection.cursor()
cursor.execute(
query_statement,
(subject_id, self.subject_type),
)
value = cursor.fetchone()
if value is None:
self._init_quota(subject_id)
return self.initial_quota
cursor.close()
return value[0]

@connection
def revoke_quota(self, subject_id: str = "") -> None:
"""Revoke quota for given subject."""
if self.subject_type == "c":
subject_id = ""

if self.postgres_connection_config is not None:
self._revoke_quota(SET_AVAILABLE_QUOTA_PG, subject_id)
return
if self.sqlite_connection_config is not None:
self._revoke_quota(SET_AVAILABLE_QUOTA_SQLITE, subject_id)
return

def _revoke_quota(self, set_statement: str, subject_id: str) -> None:
"""Revoke quota in given database."""
# timestamp to be used
revoked_at = datetime.now()

with self.connection.cursor() as cursor:
cursor.execute(
SET_AVAILABLE_QUOTA_PG,
(self.initial_quota, revoked_at, subject_id, self.subject_type),
)
self.connection.commit()
cursor = self.connection.cursor()
cursor.execute(
set_statement,
(self.initial_quota, revoked_at, subject_id, self.subject_type),
)
self.connection.commit()
cursor.close()

@connection
def increase_quota(self, subject_id: str = "") -> None:
"""Increase quota for given subject."""
if self.subject_type == "c":
subject_id = ""

if self.postgres_connection_config is not None:
self._increase_quota(UPDATE_AVAILABLE_QUOTA_PG, subject_id)
return

if self.sqlite_connection_config is not None:
self._increase_quota(UPDATE_AVAILABLE_QUOTA_SQLITE, subject_id)
return

def _increase_quota(self, set_statement: str, subject_id: str) -> None:
"""Increase quota in given database."""
# timestamp to be used
updated_at = datetime.now()

with self.connection.cursor() as cursor:
cursor.execute(
UPDATE_AVAILABLE_QUOTA_PG,
(self.increase_by, updated_at, subject_id, self.subject_type),
)
self.connection.commit()
cursor = self.connection.cursor()
cursor.execute(
set_statement,
(self.increase_by, updated_at, subject_id, self.subject_type),
)
self.connection.commit()

def ensure_available_quota(self, subject_id: str = "") -> None:
"""Ensure that there's avaiable quota left."""
Expand Down Expand Up @@ -109,17 +147,39 @@ def consume_tokens(
output_tokens,
subject_id,
)
to_be_consumed = input_tokens + output_tokens

with self.connection.cursor() as cursor:
# timestamp to be used
updated_at = datetime.now()
if self.sqlite_connection_config is not None:
self._consume_tokens(
UPDATE_AVAILABLE_QUOTA_SQLITE, input_tokens, output_tokens, subject_id
)
return

cursor.execute(
UPDATE_AVAILABLE_QUOTA_PG,
(-to_be_consumed, updated_at, subject_id, self.subject_type),
if self.postgres_connection_config is not None:
self._consume_tokens(
UPDATE_AVAILABLE_QUOTA_PG, input_tokens, output_tokens, subject_id
)
self.connection.commit()
return

def _consume_tokens(
self,
update_statement: str,
input_tokens: int,
output_tokens: int,
subject_id: str,
) -> None:
"""Consume tokens from selected database."""
# timestamp to be used
updated_at = datetime.now()

to_be_consumed = input_tokens + output_tokens

cursor = self.connection.cursor()
cursor.execute(
update_statement,
(-to_be_consumed, updated_at, subject_id, self.subject_type),
)
self.connection.commit()
cursor.close()

def _initialize_tables(self) -> None:
"""Initialize tables used by quota limiter."""
Expand All @@ -134,9 +194,10 @@ def _init_quota(self, subject_id: str = "") -> None:
# timestamp to be used
revoked_at = datetime.now()

with self.connection.cursor() as cursor:
if self.sqlite_connection_config is not None:
cursor = self.connection.cursor()
cursor.execute(
INIT_QUOTA_PG,
INIT_QUOTA_SQLITE,
(
subject_id,
self.subject_type,
Expand All @@ -145,4 +206,18 @@ def _init_quota(self, subject_id: str = "") -> None:
revoked_at,
),
)
cursor.close()
self.connection.commit()
if self.postgres_connection_config is not None:
with self.connection.cursor() as cursor:
cursor.execute(
INIT_QUOTA_PG,
(
subject_id,
self.subject_type,
self.initial_quota,
self.initial_quota,
revoked_at,
),
)
self.connection.commit()
29 changes: 26 additions & 3 deletions src/quota/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,43 @@
VALUES (%s, %s, %s, %s, %s)
"""

INIT_QUOTA_SQLITE = """
INSERT INTO quota_limits (id, subject, quota_limit, available, revoked_at)
VALUES (?, ?, ?, ?, ?)
"""

SELECT_QUOTA_PG = """
SELECT available
FROM quota_limits
WHERE id=%s and subject=%s LIMIT 1
WHERE id=%s AND subject=%s LIMIT 1
"""

SELECT_QUOTA_SQLITE = """
SELECT available
FROM quota_limits
WHERE id=? AND subject=? LIMIT 1
"""

SET_AVAILABLE_QUOTA_PG = """
UPDATE quota_limits
SET available=%s, revoked_at=%s
WHERE id=%s and subject=%s
WHERE id=%s AND subject=%s
"""

SET_AVAILABLE_QUOTA_SQLITE = """
UPDATE quota_limits
SET available=?, revoked_at=?
WHERE id=? AND subject=?
"""

UPDATE_AVAILABLE_QUOTA_PG = """
UPDATE quota_limits
SET available=available+%s, updated_at=%s
WHERE id=%s and subject=%s
WHERE id=%s AND subject=%s
"""

UPDATE_AVAILABLE_QUOTA_SQLITE = """
UPDATE quota_limits
SET available=available+?, updated_at=?
WHERE id=? AND subject=?
"""
Loading