Add Postgres backend (#162)
* Add docker-compose test

* Update docker-compose for testing

* update docker-compose

* Add psycopg2 as optional backend

* Add PostgresClock

* Update postgres bucket

* Update action

* Update

* limit pg-pool

* disable PostgresClock

* up

* Update readme & bump

* update readme

* up
vutran1710 committed Mar 16, 2024
1 parent 8555053 commit 6ca9da3
Showing 12 changed files with 465 additions and 114 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/poetry-package.yml
@@ -28,6 +28,13 @@ jobs:
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
      pyrate_postgres:
        image: bitnami/postgresql
        env:
          ALLOW_EMPTY_PASSWORD: yes
          POSTGRESQL_PASSWORD: postgres
        ports:
          - 5432:5432
    strategy:
      fail-fast: true
      matrix:
25 changes: 22 additions & 3 deletions README.md
@@ -35,6 +35,7 @@ Full project documentation can be found at [pyratelimiter.readthedocs.io](https:
- [InMemoryBucket](#inmemorybucket)
- [SQLiteBucket](#sqlitebucket)
- [RedisBucket](#redisbucket)
- [PostgresBucket](#postgresbucket)
- [Decorator](#decorator)
- [Advanced Usage](#advanced-usage)
- [Component-level Diagram](#component-level-diagram)
@@ -437,9 +438,10 @@ for _ in range(4):

A few different bucket backends are available:

- InMemoryBucket using python built-in list as bucket
- RedisBucket, using err... redis, with both async/sync support
- SQLite, using sqlite3
- **InMemoryBucket**: using python built-in list as bucket
- **RedisBucket**, using err... redis, with both async/sync support
- **PostgresBucket**, using `psycopg2`
- **SQLiteBucket**, using sqlite3

#### InMemoryBucket

@@ -504,6 +506,23 @@ table = "my-bucket-table"
bucket = SQLiteBucket(rates, conn, table)
```

#### PostgresBucket

Postgres is supported, but you have to install `psycopg2` or `asyncpg` either as an extra or as a separate package.

You can use Postgres's built-in **CURRENT_TIMESTAMP** as the time source with `PostgresClock`, or use an external custom time source.

```python
from pyrate_limiter import PostgresBucket, Rate, PostgresClock
from psycopg2.pool import ThreadedConnectionPool

connection_pool = ThreadedConnectionPool(5, 10, 'postgresql://postgres:postgres@localhost:5432')

clock = PostgresClock(connection_pool)
rates = [Rate(3, 1000), Rate(4, 1500)]
bucket = PostgresBucket(connection_pool, "my-bucket-table", rates)
```

### Decorator

Limiter can be used as a decorator, but you have to provide a `mapping` function that maps the wrapped function's arguments to `limiter.try_acquire`'s arguments. The mapping function must return either a tuple of `(str, int)` or just a `str`.
38 changes: 38 additions & 0 deletions docker-compose.yaml
@@ -0,0 +1,38 @@
version: "3.7"

services:
  redis-master:
    image: bitnami/redis:latest
    ports:
      - "6379:6379"
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_REPLICATION_MODE=master
      - REDIS_REPLICA_PASSWORD=""
    networks:
      - pyrate-bay

  redis-slave:
    image: bitnami/redis:latest
    ports:
      - "6380:6379"
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_MASTER_HOST=redis-master
      - REDIS_REPLICATION_MODE=slave
      - REDIS_MASTER_PASSWORD=""
    networks:
      - pyrate-bay

  postgres:
    image: bitnami/postgresql
    ports:
      - "5432:5432"
    environment:
      - POSTGRESQL_PASSWORD=postgres
    networks:
      - pyrate-bay

networks:
  pyrate-bay:
    driver: bridge
2 changes: 1 addition & 1 deletion noxfile.py
@@ -4,7 +4,7 @@
# Reuse virtualenv created by poetry instead of creating new ones
nox.options.reuse_existing_virtualenvs = True

PYTEST_ARGS = ["--verbose", "-s", "--full-trace", "--maxfail=1", "--numprocesses=auto"]
PYTEST_ARGS = ["--verbose", "--maxfail=1", "--numprocesses=auto"]
COVERAGE_ARGS = ["--cov", "--cov-report=term", "--cov-report=xml", "--cov-report=html"]


255 changes: 146 additions & 109 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyrate-limiter"
version = "3.4.1"
version = "3.5.0"
description = "Python Rate-Limiter using Leaky-Bucket Algorithm"
authors = ["vutr <me@vutr.io>"]
license = "MIT"
@@ -29,6 +29,7 @@ python = "^3.8"
# Optional backend dependencies
filelock = {optional=true, version=">=3.0"}
redis = {optional=true, version="^5.0.0"}
psycopg2 = {version = "^2.9.9", optional = true}

# Documentation dependencies needed for Readthedocs builds
furo = {optional=true, version="^2022.3.4"}
@@ -57,6 +58,7 @@ coverage = "6"
[tool.poetry.group.dev.dependencies]
pytest = "^8.1.1"
pytest-asyncio = "^0.23.5.post1"
psycopg2 = "^2.9.9"

[tool.black]
line-length = 120
2 changes: 2 additions & 0 deletions pyrate_limiter/buckets/__init__.py
@@ -2,6 +2,8 @@
"""Concrete bucket implementations
"""
from .in_memory_bucket import InMemoryBucket
from .postgres import PostgresBucket
from .postgres import Queries as PgQueries
from .redis_bucket import RedisBucket
from .sqlite_bucket import Queries as SQLiteQueries
from .sqlite_bucket import SQLiteBucket
168 changes: 168 additions & 0 deletions pyrate_limiter/buckets/postgres.py
@@ -0,0 +1,168 @@
"""A bucket using PostgreSQL as backend
"""
from __future__ import annotations

from contextlib import contextmanager
from typing import Awaitable
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union

from ..abstracts import AbstractBucket
from ..abstracts import Rate
from ..abstracts import RateItem

if TYPE_CHECKING:
    from psycopg2.pool import AbstractConnectionPool


class Queries:
    CREATE_BUCKET_TABLE = """
    CREATE TABLE IF NOT EXISTS {table} (
        name VARCHAR,
        weight SMALLINT,
        item_timestamp TIMESTAMP
    )
    """
    CREATE_INDEX_ON_TIMESTAMP = """
    CREATE INDEX IF NOT EXISTS {index} ON {table} (item_timestamp)
    """
    COUNT = """
    SELECT COUNT(*) FROM {table}
    """
    PUT = """
    INSERT INTO {table} (name, weight, item_timestamp) VALUES (%s, %s, TO_TIMESTAMP(%s))
    """
    FLUSH = """
    DELETE FROM {table}
    """
    PEEK = """
    SELECT name, weight, (extract(EPOCH FROM item_timestamp) * 1000) as item_timestamp
    FROM {table}
    ORDER BY item_timestamp DESC
    LIMIT 1
    OFFSET {offset}
    """
    LEAK = """
    DELETE FROM {table} WHERE item_timestamp < TO_TIMESTAMP({timestamp})
    """
    LEAK_COUNT = """
    SELECT COUNT(*) FROM {table} WHERE item_timestamp < TO_TIMESTAMP({timestamp})
    """


class PostgresBucket(AbstractBucket):
    table: str
    pool: AbstractConnectionPool

    def __init__(self, pool: AbstractConnectionPool, table: str, rates: List[Rate]):
        self.table = table.lower()
        self.pool = pool
        assert rates
        self.rates = rates
        self._full_tbl = f'ratelimit___{self.table}'
        self._create_table()

    @contextmanager
    def _get_conn(self, autocommit=False):
        with self.pool._getconn() as conn:
            with conn.cursor() as cur:
                yield cur

            if autocommit:
                conn.commit()

        self.pool._putconn(conn)

    def _create_table(self):
        with self._get_conn(autocommit=True) as cur:
            cur.execute(Queries.CREATE_BUCKET_TABLE.format(table=self._full_tbl))
            index_name = f'timestampIndex_{self.table}'
            cur.execute(Queries.CREATE_INDEX_ON_TIMESTAMP.format(table=self._full_tbl, index=index_name))

    def put(self, item: RateItem) -> Union[bool, Awaitable[bool]]:
        """Put an item (typically the current time) in the bucket
        return true if successful, otherwise false
        """
        if item.weight == 0:
            return True

        with self._get_conn(autocommit=True) as cur:
            for rate in self.rates:
                bound = f"SELECT TO_TIMESTAMP({item.timestamp / 1000}) - INTERVAL '{rate.interval} milliseconds'"
                query = f'SELECT COUNT(*) FROM {self._full_tbl} WHERE item_timestamp >= ({bound})'
                cur.execute(query)
                count = int(cur.fetchone()[0])

                if rate.limit - count < item.weight:
                    self.failing_rate = rate
                    return False

            self.failing_rate = None

            query = Queries.PUT.format(table=self._full_tbl)
            arguments = [(item.name, item.weight, item.timestamp / 1000)] * item.weight
            cur.executemany(query, tuple(arguments))

        return True

    def leak(
        self,
        current_timestamp: Optional[int] = None,
    ) -> Union[int, Awaitable[int]]:
        """leaking bucket - removing items that are outdated"""
        assert current_timestamp is not None, "current-time must be passed on for leak"
        lower_bound = current_timestamp - self.rates[-1].interval

        if lower_bound <= 0:
            return 0

        count = 0

        with self._get_conn(autocommit=True) as cur:
            cur.execute(Queries.LEAK_COUNT.format(table=self._full_tbl, timestamp=lower_bound / 1000))
            result = cur.fetchone()

            if result:
                cur.execute(Queries.LEAK.format(table=self._full_tbl, timestamp=lower_bound / 1000))
                count = int(result[0])

        return count

    def flush(self) -> Union[None, Awaitable[None]]:
        """Flush the whole bucket
        - Must remove `failing-rate` after flushing
        """
        with self._get_conn(autocommit=True) as cur:
            cur.execute(Queries.FLUSH.format(table=self._full_tbl))
            self.failing_rate = None

        return None

    def count(self) -> Union[int, Awaitable[int]]:
        """Count number of items in the bucket"""
        count = 0
        with self._get_conn() as cur:
            cur.execute(Queries.COUNT.format(table=self._full_tbl))
            result = cur.fetchone()
            assert result
            count = int(result[0])

        return count

    def peek(self, index: int) -> Union[Optional[RateItem], Awaitable[Optional[RateItem]]]:
        """Peek at the rate-item at a specific index in latest-to-earliest order
        NOTE: The reason we cannot peek from the start of the queue(earliest-to-latest) is
        we can't really tell how many outdated items are still in the queue
        """
        item = None

        with self._get_conn() as cur:
            cur.execute(Queries.PEEK.format(table=self._full_tbl, offset=index))
            result = cur.fetchone()
            if result:
                name, weight, timestamp = result[0], int(result[1]), int(result[2])
                item = RateItem(name=name, weight=weight, timestamp=timestamp)

        return item
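Two details of the new bucket are easy to miss: timestamps travel as epoch *milliseconds* in Python while Postgres's `TO_TIMESTAMP` takes *seconds* (hence the `/ 1000` on the way in and the `EXTRACT(EPOCH ...) * 1000` on the way out), and `put` admits an item only if every rate's window still has room for its weight. A plain-Python sketch of both rules (no database involved; function names are illustrative, not from this commit):

```python
def fits(limit: int, count_in_window: int, weight: int) -> bool:
    # Mirrors put()'s per-rate check, which fails when limit - count < weight
    return limit - count_in_window >= weight

def ms_to_pg_seconds(ts_ms: int) -> float:
    # The value put() binds into TO_TIMESTAMP(%s)
    return ts_ms / 1000

def pg_epoch_to_ms(epoch_seconds: float) -> int:
    # What PEEK's EXTRACT(EPOCH ...) * 1000 yields, truncated like int()
    return int(epoch_seconds * 1000)

assert fits(3, 2, 1)       # a Rate(3, ...) window holding 2 items admits weight 1
assert not fits(3, 3, 1)   # window already full
assert pg_epoch_to_ms(ms_to_pg_seconds(1_710_000_123_500)) == 1_710_000_123_500
```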
31 changes: 31 additions & 0 deletions pyrate_limiter/clocks.py
@@ -1,12 +1,19 @@
"""Clock implementation using different backend
"""
from __future__ import annotations

import sqlite3
from time import monotonic
from time import time
from typing import TYPE_CHECKING

from .abstracts import AbstractClock
from .exceptions import PyrateClockException
from .utils import dedicated_sqlite_clock_connection

if TYPE_CHECKING:
    from psycopg2.pool import AbstractConnectionPool


class MonotonicClock(AbstractClock):
    def __init__(self):
@@ -45,3 +52,27 @@ def default(cls):
    def now(self) -> int:
        now = self.conn.execute(self.time_query).fetchone()[0]
        return int(now)


class PostgresClock(AbstractClock):
    """Get timestamp using Postgres as remote clock backend"""

    def __init__(self, pool: 'AbstractConnectionPool'):
        self.pool = pool

    def now(self) -> int:
        value = 0

        with self.pool._getconn() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT EXTRACT(epoch FROM current_timestamp) * 1000")
                result = cur.fetchone()

                if not result:
                    raise PyrateClockException(self, detail=f"invalid result from query current-timestamp: {result}")

                value = int(result[0])

        self.pool._putconn(conn)

        return value
6 changes: 6 additions & 0 deletions pyrate_limiter/exceptions.py
@@ -33,3 +33,9 @@ def __init__(self, item: RateItem, rate: Rate, actual_delay: int, max_delay: int
            "actual_delay": actual_delay,
        }
        super().__init__(error)


class PyrateClockException(Exception):
    def __init__(self, clock: object, detail=None):
        error = f"Clock({repr(clock)}) is failing: {detail}"
        super().__init__(error)
