diff --git a/tests/unit/quota/__init__.py b/tests/unit/quota/__init__.py new file mode 100644 index 00000000..496c96a5 --- /dev/null +++ b/tests/unit/quota/__init__.py @@ -0,0 +1 @@ +"""Unit tests for quota limiters.""" diff --git a/tests/unit/quota/test_cluster_quota_limiter.py b/tests/unit/quota/test_cluster_quota_limiter.py new file mode 100644 index 00000000..f842c396 --- /dev/null +++ b/tests/unit/quota/test_cluster_quota_limiter.py @@ -0,0 +1,173 @@ +"""Unit tests for ClusterQuotaLimiter class.""" + +import pytest + +from models.config import ( + QuotaLimiterConfiguration, + SQLiteDatabaseConfiguration, + QuotaHandlersConfiguration, +) +from quota.cluster_quota_limiter import ClusterQuotaLimiter +from quota.quota_exceed_error import QuotaExceedError + +# pylint: disable=protected-access + + +def create_quota_limiter( + name: str, initial_quota: int, quota_limit: int +) -> ClusterQuotaLimiter: + """Create new quota limiter instance.""" + configuration = QuotaHandlersConfiguration() + configuration.sqlite = SQLiteDatabaseConfiguration( + db_path=":memory:", + ) + configuration.limiters = [ + QuotaLimiterConfiguration( + type="cluster_limiter", + name=name, + initial_quota=quota_limit, + quota_increase=1, + period="5 days", + ), + ] + quota_limiter = ClusterQuotaLimiter(configuration, initial_quota, 1) + assert quota_limiter is not None + return quota_limiter + + +def test_connected(): + """Test the connected method.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + assert quota_limiter.connected() + + +def test_init_quota(): + """Test the init quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + assert ( + str(quota_limiter) == "ClusterQuotaLimiter: initial quota: 1000 increase by: 1" + ) + + +def test_available_quota(): + """Test the available quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + +def test_consume_tokens(): + """Test the consume tokens operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + quota_limiter.consume_tokens(0, 1, "foo") + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 1 + + quota_limiter.consume_tokens(1, 0, "foo") + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 2 + + quota_limiter.consume_tokens(1, 1, "foo") + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 4 + + +def test_increase_quota(): + """Test the increase_quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + quota_limiter.consume_tokens(1, 1, "foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 2 + + quota_limiter.increase_quota("foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 1 + + +def test_ensure_available_quota(): + """Test the ensure_available_quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + quota_limiter.ensure_available_quota("foo") + + +def test_ensure_available_quota_no_quota(): + """Test the ensure_available_quota operation.""" + initial_quota = 0 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + with pytest.raises(QuotaExceedError, match="Cluster has no available tokens"): + quota_limiter.ensure_available_quota("foo") + + +def test_revoke_quota(): + """Test the revoke_quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + quota_limiter.consume_tokens(1, 1, "foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 2 + + quota_limiter.revoke_quota("foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota diff --git a/tests/unit/quota/test_connect_pg.py b/tests/unit/quota/test_connect_pg.py new file mode 100644 index 00000000..3d6fb7a9 --- /dev/null +++ b/tests/unit/quota/test_connect_pg.py @@ -0,0 +1,38 @@ +"""Unit tests for PostgreSQL connection handler.""" + +import pytest +from pytest_mock import MockerFixture + +from psycopg2 import OperationalError + +from quota.connect_pg import connect_pg +from models.config import PostgreSQLDatabaseConfiguration + + +def test_connect_pg_when_connection_established(mocker: MockerFixture): + """Test the connection to PostgreSQL database.""" + # any correct PostgreSQL configuration can be used + configuration = PostgreSQLDatabaseConfiguration( + db="db", user="user", password="password" + ) + + # do not use connection to real PostgreSQL instance + mocker.patch("psycopg2.connect") + + # connection should be established + connection = connect_pg(configuration) + assert connection is not None + + +def test_connect_pg_when_connection_error(mocker: MockerFixture): + """Test the connection to PostgreSQL database.""" + # any correct PostgreSQL configuration can be used + configuration = PostgreSQLDatabaseConfiguration( + host="foo", db="db", user="user", password="password" + ) + + # do not use connection to real PostgreSQL instance + mocker.patch("psycopg2.connect", side_effect=OperationalError("ERROR")) + with pytest.raises(OperationalError, match="ERROR"): + # connection should not be established + _ = connect_pg(configuration) diff --git a/tests/unit/quota/test_connect_sqlite.py b/tests/unit/quota/test_connect_sqlite.py new file mode 100644 index 00000000..2fa21e3f --- /dev/null +++ b/tests/unit/quota/test_connect_sqlite.py @@ -0,0 +1,26 @@ +"""Unit tests for SQLite connection handler.""" + +from sqlite3 import OperationalError + +import pytest + +from quota.connect_sqlite import connect_sqlite +from models.config import SQLiteDatabaseConfiguration + + +def test_connect_sqlite_when_connection_established(): + """Test the connection to SQLite database residing in memory.""" + configuration = SQLiteDatabaseConfiguration(db_path=":memory:") + + # connection should be established + connection = connect_sqlite(configuration) + assert connection is not None + + +def test_connect_sqlite_when_connection_error(): + """Test the connection to SQLite database.""" + configuration = SQLiteDatabaseConfiguration(db_path="/") + + # connection should not be established + with pytest.raises(OperationalError, match="unable to open database file"): + _ = connect_sqlite(configuration) diff --git a/tests/unit/quota/test_quota_limiter_factory.py b/tests/unit/quota/test_quota_limiter_factory.py index 3fc4bd6f..d00b344d 100644 --- a/tests/unit/quota/test_quota_limiter_factory.py +++ b/tests/unit/quota/test_quota_limiter_factory.py @@ -21,7 +21,7 @@ def test_quota_limiters_no_storage(): configuration.postgres = None configuration.limiters = [] limiters = QuotaLimiterFactory.quota_limiters(configuration) - assert limiters == [] + assert not limiters def test_quota_limiters_no_limiters_pg_storage(): @@ -32,7 +32,7 @@ def test_quota_limiters_no_limiters_pg_storage(): ) configuration.limiters = None limiters = QuotaLimiterFactory.quota_limiters(configuration) - assert limiters == [] + assert not limiters def test_quota_limiters_no_limiters_sqlite_storage(): @@ -43,7 +43,7 @@ def test_quota_limiters_no_limiters_sqlite_storage(): ) configuration.limiters = None limiters = QuotaLimiterFactory.quota_limiters(configuration) - assert limiters == [] + assert not limiters def test_quota_limiters_empty_limiters_pg_storage(): @@ -54,7 +54,7 @@ def test_quota_limiters_empty_limiters_pg_storage(): ) configuration.limiters = [] limiters = QuotaLimiterFactory.quota_limiters(configuration) - assert limiters == [] + assert not limiters def test_quota_limiters_empty_limiters_sqlite_storage(): @@ -65,7 +65,7 @@ def test_quota_limiters_empty_limiters_sqlite_storage(): ) configuration.limiters = [] limiters = QuotaLimiterFactory.quota_limiters(configuration) - assert limiters == [] + assert not limiters def test_quota_limiters_user_quota_limiter_postgres_storage(mocker: MockerFixture): @@ -90,7 +90,7 @@ def test_quota_limiters_user_quota_limiter_postgres_storage(mocker: MockerFixtur assert isinstance(limiters[0], UserQuotaLimiter) -def test_quota_limiters_user_quota_limiter_sqlite_storage(mocker: MockerFixture): +def test_quota_limiters_user_quota_limiter_sqlite_storage(): """Test the quota limiters creating when one limiter is specified.""" configuration = QuotaHandlersConfiguration() configuration.sqlite = SQLiteDatabaseConfiguration( @@ -132,7 +132,7 @@ def test_quota_limiters_cluster_quota_limiter_postgres_storage(mocker: MockerFix assert isinstance(limiters[0], ClusterQuotaLimiter) -def test_quota_limiters_cluster_quota_limiter_sqlite_storage(mocker: MockerFixture): +def test_quota_limiters_cluster_quota_limiter_sqlite_storage(): """Test the quota limiters creating when one limiter is specified.""" configuration = QuotaHandlersConfiguration() configuration.sqlite = SQLiteDatabaseConfiguration( diff --git a/tests/unit/quota/test_user_quota_limiter.py b/tests/unit/quota/test_user_quota_limiter.py new file mode 100644 index 00000000..3f17d6c5 --- /dev/null +++ b/tests/unit/quota/test_user_quota_limiter.py @@ -0,0 +1,171 @@ +"""Unit tests for UserQuotaLimiter class.""" + +import pytest + +from models.config import ( + QuotaLimiterConfiguration, + SQLiteDatabaseConfiguration, + QuotaHandlersConfiguration, +) +from quota.user_quota_limiter import UserQuotaLimiter +from quota.quota_exceed_error import QuotaExceedError + +# pylint: disable=protected-access + + +def create_quota_limiter( + name: str, initial_quota: int, quota_limit: int +) -> UserQuotaLimiter: + """Create new quota limiter instance.""" + configuration = QuotaHandlersConfiguration() + configuration.sqlite = SQLiteDatabaseConfiguration( + db_path=":memory:", + ) + configuration.limiters = [ + QuotaLimiterConfiguration( + type="user_limiter", + name=name, + initial_quota=quota_limit, + quota_increase=1, + period="5 days", + ), + ] + quota_limiter = UserQuotaLimiter(configuration, initial_quota, 1) + assert quota_limiter is not None + return quota_limiter + + +def test_connected(): + """Test the connected method.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + assert quota_limiter.connected() + + +def test_init_quota(): + """Test the init quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given user + quota_limiter._init_quota() + + assert str(quota_limiter) == "UserQuotaLimiter: initial quota: 1000 increase by: 1" + + +def test_available_quota(): + """Test the available quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + +def test_consume_tokens(): + """Test the consume tokens operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + quota_limiter.consume_tokens(0, 1, "foo") + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 1 + + quota_limiter.consume_tokens(1, 0, "foo") + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 2 + + quota_limiter.consume_tokens(1, 1, "foo") + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 4 + + +def test_increase_quota(): + """Test the increase_quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + quota_limiter.consume_tokens(1, 1, "foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 2 + + quota_limiter.increase_quota("foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 1 + + +def test_ensure_available_quota(): + """Test the ensure_available_quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + quota_limiter.ensure_available_quota("foo") + + +def test_ensure_available_quota_no_quota(): + """Test the ensure_available_quota operation.""" + initial_quota = 0 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + with pytest.raises(QuotaExceedError, match="User foo has no available tokens"): + quota_limiter.ensure_available_quota("foo") + + +def test_revoke_quota(): + """Test the revoke_quota operation.""" + initial_quota = 1000 + quota_limit = 100 + + quota_limiter = create_quota_limiter("foo", initial_quota, quota_limit) + + # init quota for given cluster + quota_limiter._init_quota() + + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota + + quota_limiter.consume_tokens(1, 1, "foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota - 2 + + quota_limiter.revoke_quota("foo") + available_quota = quota_limiter.available_quota("foo") + assert available_quota == initial_quota