Skip to content

Commit

Permalink
fix: merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
pradeepranwa1 committed Jul 16, 2024
2 parents 953b686 + 11466e0 commit df0c4e7
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 121 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,4 @@ dmypy.json

.idea/
*.iml
.vscode
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Semantic Versioning Changelog

## [1.1.1](https://github.com/pycasbin/postgresql-watcher/compare/v1.1.0...v1.1.1) (2024-07-16)


### Bug Fixes

* fixed `should_reload` behaviour, close PostgreSQL connections, block until `PostgresqlWatcher` is ready, refactorings ([#29](https://github.com/pycasbin/postgresql-watcher/issues/29)) ([8382db4](https://github.com/pycasbin/postgresql-watcher/commit/8382db4d25825c4d2637dfd68a468dfc4828ae35))

# [1.1.0](https://github.com/pycasbin/postgresql-watcher/compare/v1.0.0...v1.1.0) (2024-07-03)


Expand Down
44 changes: 36 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pip install casbin-postgresql-watcher
```

## Basic Usage Example
### With Flask-authz

```python
from flask_authz import CasbinEnforcer
from postgresql_watcher import PostgresqlWatcher
Expand All @@ -25,25 +25,53 @@ from casbin.persist.adapters import FileAdapter

casbin_enforcer = CasbinEnforcer(app, adapter)
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME)
watcher.set_update_callback(casbin_enforcer.e.load_policy)
watcher.set_update_callback(casbin_enforcer.load_policy)
casbin_enforcer.set_watcher(watcher)

# Call should_reload before every call of enforce to make sure
# the policy is update to date
watcher.should_reload()
if casbin_enforcer.enforce("alice", "data1", "read"):
# permit alice to read data1
pass
else:
# deny the request, show an error
pass
```

## Basic Usage Example With SSL Enabled
alternatively, if you need more control

See [PostgresQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS) for full details of SSL parameters.

### With Flask-authz
```python
from flask_authz import CasbinEnforcer
from postgresql_watcher import PostgresqlWatcher
from flask import Flask
from casbin.persist.adapters import FileAdapter

casbin_enforcer = CasbinEnforcer(app, adapter)
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME, sslmode="verify_full", sslcert=SSLCERT, sslrootcert=SSLROOTCERT, sslkey=SSLKEY)
watcher.set_update_callback(casbin_enforcer.e.load_policy)
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME)
casbin_enforcer.set_watcher(watcher)

# Call should_reload before every call of enforce to make sure
# the policy is update to date
if watcher.should_reload():
casbin_enforcer.load_policy()

if casbin_enforcer.enforce("alice", "data1", "read"):
# permit alice to read data1
pass
else:
# deny the request, show an error
pass
```

## Basic Usage Example With SSL Enabled

See [PostgresQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS) for full details of SSL parameters.

```python
...
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME, sslmode="verify_full", sslcert=SSLCERT, sslrootcert=SSLROOTCERT, sslkey=SSLKEY)
...
```


Expand Down
2 changes: 1 addition & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
black==20.8b1
black==24.4.2
2 changes: 1 addition & 1 deletion postgresql_watcher/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .watcher import PostgresqlWatcher
from .watcher import PostgresqlWatcher, PostgresqlWatcherChannelSubscriptionTimeoutError
108 changes: 108 additions & 0 deletions postgresql_watcher/casbin_channel_subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from enum import IntEnum
from logging import Logger
from multiprocessing.connection import Connection
from select import select
from signal import signal, SIGINT, SIGTERM
from time import sleep
from typing import Optional

from psycopg2 import connect, extensions, InterfaceError


CASBIN_CHANNEL_SELECT_TIMEOUT = 1 # seconds


def casbin_channel_subscription(
process_conn: Connection,
logger: Logger,
host: str,
user: str,
password: str,
channel_name: str,
port: int = 5432,
dbname: str = "postgres",
delay: int = 2,
sslmode: Optional[str] = None,
sslrootcert: Optional[str] = None,
sslcert: Optional[str] = None,
sslkey: Optional[str] = None,
):
# delay connecting to postgresql (postgresql connection failure)
sleep(delay)
db_connection = connect(
host=host,
port=port,
user=user,
password=password,
dbname=dbname,
sslmode=sslmode,
sslrootcert=sslrootcert,
sslcert=sslcert,
sslkey=sslkey,
)
# Can only receive notifications when not in transaction, set this for easier usage
db_connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
db_cursor = db_connection.cursor()
context_manager = _ConnectionManager(db_connection, db_cursor)

with context_manager:
db_cursor.execute(f"LISTEN {channel_name};")
logger.debug("Waiting for casbin policy update")
process_conn.send(_ChannelSubscriptionMessage.IS_READY)

while not db_cursor.closed:
try:
select_result = select(
[db_connection],
[],
[],
CASBIN_CHANNEL_SELECT_TIMEOUT,
)
if select_result != ([], [], []):
logger.debug("Casbin policy update identified")
db_connection.poll()
while db_connection.notifies:
notify = db_connection.notifies.pop(0)
logger.debug(f"Notify: {notify.payload}")
process_conn.send(_ChannelSubscriptionMessage.RECEIVED_UPDATE)
except (InterfaceError, OSError) as e:
# Log an exception if these errors occurred without the context beeing closed
if not context_manager.connections_were_closed:
logger.critical(e, exc_info=True)
break


class _ChannelSubscriptionMessage(IntEnum):
IS_READY = 1
RECEIVED_UPDATE = 2


class _ConnectionManager:
"""
You can not use 'with' and a connection / cursor directly in this setup.
For more details see this issue: https://github.com/psycopg/psycopg2/issues/941#issuecomment-864025101.
As a workaround this connection manager / context manager class is used, that also handles SIGINT and SIGTERM and
closes the database connection.
"""

def __init__(self, connection, cursor) -> None:
self.connection = connection
self.cursor = cursor
self.connections_were_closed = False

def __enter__(self):
signal(SIGINT, self._close_connections)
signal(SIGTERM, self._close_connections)
return self

def _close_connections(self, *_):
if self.cursor is not None:
self.cursor.close()
self.cursor = None
if self.connection is not None:
self.connection.close()
self.connection = None
self.connections_were_closed = True

def __exit__(self, *_):
self._close_connections()
Loading

0 comments on commit df0c4e7

Please sign in to comment.