Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/unit/quota/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Unit tests for quota limiters."""
173 changes: 173 additions & 0 deletions tests/unit/quota/test_cluster_quota_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Unit tests for ClusterQuotaLimiter class."""

import pytest

from models.config import (
QuotaLimiterConfiguration,
SQLiteDatabaseConfiguration,
QuotaHandlersConfiguration,
)
from quota.cluster_quota_limiter import ClusterQuotaLimiter
from quota.quota_exceed_error import QuotaExceedError

# pylint: disable=protected-access


def create_quota_limiter(
name: str, initial_quota: int, quota_limit: int
) -> ClusterQuotaLimiter:
"""Create new quota limiter instance."""
configuration = QuotaHandlersConfiguration()
configuration.sqlite = SQLiteDatabaseConfiguration(
db_path=":memory:",
)
configuration.limiters = [
QuotaLimiterConfiguration(
type="cluster_limiter",
name=name,
initial_quota=quota_limit,
quota_increase=1,
period="5 days",
),
]
quota_limiter = ClusterQuotaLimiter(configuration, initial_quota, 1)
assert quota_limiter is not None
return quota_limiter


def test_connected():
"""Test the connected method."""
initial_quota = 1000
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)
assert quota_limiter.connected()


def test_init_quota():
"""Test the init quota operation."""
initial_quota = 1000
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)

# init quota for given cluster
quota_limiter._init_quota()

assert (
str(quota_limiter) == "ClusterQuotaLimiter: initial quota: 1000 increase by: 1"
)


def test_available_quota():
"""Test the available quota operation."""
initial_quota = 1000
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)

# init quota for given cluster
quota_limiter._init_quota()

available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota


def test_consume_tokens():
"""Test the consume tokens operation."""
initial_quota = 1000
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)

# init quota for given cluster
quota_limiter._init_quota()

available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota

quota_limiter.consume_tokens(0, 1, "foo")

available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota - 1

quota_limiter.consume_tokens(1, 0, "foo")

available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota - 2

quota_limiter.consume_tokens(1, 1, "foo")

available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota - 4


def test_increase_quota():
"""Test the increase_quota operation."""
initial_quota = 1000
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)

# init quota for given cluster
quota_limiter._init_quota()

available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota

quota_limiter.consume_tokens(1, 1, "foo")
available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota - 2

quota_limiter.increase_quota("foo")
available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota - 1


def test_ensure_available_quota():
"""Test the ensure_available_quota operation."""
initial_quota = 1000
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)

# init quota for given cluster
quota_limiter._init_quota()

quota_limiter.ensure_available_quota("foo")


def test_ensure_available_quota_no_quota():
"""Test the ensure_available_quota operation."""
initial_quota = 0
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)

# init quota for given cluster
quota_limiter._init_quota()

with pytest.raises(QuotaExceedError, match="Cluster has no available tokens"):
quota_limiter.ensure_available_quota("foo")


def test_revoke_quota():
"""Test the revoke_quota operation."""
initial_quota = 1000
quota_limit = 100

quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit)

# init quota for given cluster
quota_limiter._init_quota()

available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota

quota_limiter.consume_tokens(1, 1, "foo")
available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota - 2

quota_limiter.revoke_quota("foo")
available_quota = quota_limiter.available_quota("foo")
assert available_quota == initial_quota
38 changes: 38 additions & 0 deletions tests/unit/quota/test_connect_pg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Unit tests for PostgreSQL connection handler."""

import pytest
from pytest_mock import MockerFixture

from psycopg2 import OperationalError

from quota.connect_pg import connect_pg
from models.config import PostgreSQLDatabaseConfiguration


def test_connect_pg_when_connection_established(mocker: MockerFixture):
"""Test the connection to PostgreSQL database."""
# any correct PostgreSQL configuration can be used
configuration = PostgreSQLDatabaseConfiguration(
db="db", user="user", password="password"
)

# do not use connection to real PostgreSQL instance
mocker.patch("psycopg2.connect")

# connection should be established
connection = connect_pg(configuration)
assert connection is not None


def test_connect_pg_when_connection_error(mocker: MockerFixture):
"""Test the connection to PostgreSQL database."""
# any correct PostgreSQL configuration can be used
configuration = PostgreSQLDatabaseConfiguration(
host="foo", db="db", user="user", password="password"
)

# do not use connection to real PostgreSQL instance
mocker.patch("psycopg2.connect", side_effect=OperationalError("ERROR"))
with pytest.raises(OperationalError, match="ERROR"):
# connection should not be established
_ = connect_pg(configuration)
26 changes: 26 additions & 0 deletions tests/unit/quota/test_connect_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Unit tests for SQLite connection handler."""

from sqlite3 import OperationalError

import pytest

from quota.connect_sqlite import connect_sqlite
from models.config import SQLiteDatabaseConfiguration


def test_connect_sqlite_when_connection_established():
"""Test the connection to SQLite database residing in memory."""
configuration = SQLiteDatabaseConfiguration(db_path=":memory:")

# connection should be established
connection = connect_sqlite(configuration)
assert connection is not None


def test_connect_sqlite_when_connection_error():
"""Test the connection to SQLite database."""
configuration = SQLiteDatabaseConfiguration(db_path="/")

# connection should not be established
with pytest.raises(OperationalError, match="unable to open database file"):
_ = connect_sqlite(configuration)
14 changes: 7 additions & 7 deletions tests/unit/quota/test_quota_limiter_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_quota_limiters_no_storage():
configuration.postgres = None
configuration.limiters = []
limiters = QuotaLimiterFactory.quota_limiters(configuration)
assert limiters == []
assert not limiters


def test_quota_limiters_no_limiters_pg_storage():
Expand All @@ -32,7 +32,7 @@ def test_quota_limiters_no_limiters_pg_storage():
)
configuration.limiters = None
limiters = QuotaLimiterFactory.quota_limiters(configuration)
assert limiters == []
assert not limiters


def test_quota_limiters_no_limiters_sqlite_storage():
Expand All @@ -43,7 +43,7 @@ def test_quota_limiters_no_limiters_sqlite_storage():
)
configuration.limiters = None
limiters = QuotaLimiterFactory.quota_limiters(configuration)
assert limiters == []
assert not limiters


def test_quota_limiters_empty_limiters_pg_storage():
Expand All @@ -54,7 +54,7 @@ def test_quota_limiters_empty_limiters_pg_storage():
)
configuration.limiters = []
limiters = QuotaLimiterFactory.quota_limiters(configuration)
assert limiters == []
assert not limiters


def test_quota_limiters_empty_limiters_sqlite_storage():
Expand All @@ -65,7 +65,7 @@ def test_quota_limiters_empty_limiters_sqlite_storage():
)
configuration.limiters = []
limiters = QuotaLimiterFactory.quota_limiters(configuration)
assert limiters == []
assert not limiters


def test_quota_limiters_user_quota_limiter_postgres_storage(mocker: MockerFixture):
Expand All @@ -90,7 +90,7 @@ def test_quota_limiters_user_quota_limiter_postgres_storage(mocker: MockerFixtur
assert isinstance(limiters[0], UserQuotaLimiter)


def test_quota_limiters_user_quota_limiter_sqlite_storage(mocker: MockerFixture):
def test_quota_limiters_user_quota_limiter_sqlite_storage():
"""Test the quota limiters creating when one limiter is specified."""
configuration = QuotaHandlersConfiguration()
configuration.sqlite = SQLiteDatabaseConfiguration(
Expand Down Expand Up @@ -132,7 +132,7 @@ def test_quota_limiters_cluster_quota_limiter_postgres_storage(mocker: MockerFix
assert isinstance(limiters[0], ClusterQuotaLimiter)


def test_quota_limiters_cluster_quota_limiter_sqlite_storage(mocker: MockerFixture):
def test_quota_limiters_cluster_quota_limiter_sqlite_storage():
"""Test the quota limiters creating when one limiter is specified."""
configuration = QuotaHandlersConfiguration()
configuration.sqlite = SQLiteDatabaseConfiguration(
Expand Down
Loading
Loading