Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update psycopg to version 3 #3751

Merged
merged 1 commit into from May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/plugin_api/3792.feature
@@ -0,0 +1 @@
Bumped the database backend from psycopg2 to psycopg3.
4 changes: 2 additions & 2 deletions pulpcore/app/models/task.py
Expand Up @@ -192,15 +192,15 @@ def __str__(self):
def __enter__(self):
self.lock = _uuid_to_advisory_lock(self.pk.int)
with connection.cursor() as cursor:
cursor.execute("SELECT pg_try_advisory_lock(%s);", [self.lock])
cursor.execute("SELECT pg_try_advisory_lock(%s)", [self.lock])
acquired = cursor.fetchone()[0]
if not acquired:
raise AdvisoryLockError("Could not acquire lock.")
return self

def __exit__(self, exc_type, exc_value, traceback):
with connection.cursor() as cursor:
cursor.execute("SELECT pg_advisory_unlock(%s);", [self.lock])
cursor.execute("SELECT pg_advisory_unlock(%s)", [self.lock])
released = cursor.fetchone()[0]
if not released:
raise RuntimeError("Lock not held.")
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/tasks/analytics.py
Expand Up @@ -52,7 +52,7 @@ def get_analytics_posting_url():


def _get_postgresql_version_string():
return connection.cursor().connection.server_version
return connection.cursor().connection.info.server_version


async def _postgresql_version(analytics):
Expand Down
48 changes: 25 additions & 23 deletions pulpcore/tasking/pulpcore_worker.py
Expand Up @@ -73,23 +73,28 @@ def __init__(self, lock, lock_group=0):

def __enter__(self):
with connection.cursor() as cursor:
cursor.execute("SELECT pg_try_advisory_lock(%s, %s);", [self.lock_group, self.lock])
cursor.execute("SELECT pg_try_advisory_lock(%s, %s)", [self.lock_group, self.lock])
acquired = cursor.fetchone()[0]
if not acquired:
raise AdvisoryLockError("Could not acquire lock.")
return self

def __exit__(self, exc_type, exc_value, traceback):
with connection.cursor() as cursor:
cursor.execute("SELECT pg_advisory_unlock(%s, %s);", [self.lock_group, self.lock])
cursor.execute("SELECT pg_advisory_unlock(%s, %s)", [self.lock_group, self.lock])
released = cursor.fetchone()[0]
if not released:
raise RuntimeError("Lock not held.")


class NewPulpWorker:
def __init__(self):
# Notification states from several signal handlers
self.shutdown_requested = False
self.wakeup = False
self.cancel_task = False

self.task = None
self.name = f"{os.getpid()}@{socket.getfqdn()}"
self.heartbeat_period = settings.WORKER_TTL / 3
self.versions = {app.label: app.version for app in pulp_plugin_configs()}
Expand Down Expand Up @@ -119,6 +124,13 @@ def _signal_handler(self, thesignal, frame):
self.task_grace_timeout = TASK_GRACE_INTERVAL
self.shutdown_requested = True

def _pg_notify_handler(self, notification):
if notification.channel == "pulp_worker_wakeup":
self.wakeup = True
elif self.task and notification.channel == "pulp_worker_cancel":
if notification.payload == str(self.task.pk):
self.cancel_task = True

def handle_worker_heartbeat(self):
"""
Create or update worker heartbeat records.
Expand Down Expand Up @@ -273,32 +285,24 @@ def sleep(self):
"""Wait for signals on the wakeup channel while heart beating."""

_logger.debug(_("Worker %s entering sleep state."), self.name)
wakeup = False
while not self.shutdown_requested:
# Handle all notifications before sleeping in `select`
while connection.connection.notifies:
item = connection.connection.notifies.pop(0)
if item.channel == "pulp_worker_wakeup":
_logger.debug(_("Worker %s received wakeup call."), self.name)
wakeup = True
# ignore all other notifications
if wakeup:
break

while not self.shutdown_requested and not self.wakeup:
r, w, x = select.select(
[self.sentinel, connection.connection], [], [], self.heartbeat_period
)
self.beat()
if connection.connection in r:
connection.connection.poll()
connection.connection.execute("SELECT 1")
if self.sentinel in r:
os.read(self.sentinel, 256)
self.wakeup = False

def supervise_task(self, task):
"""Call and supervise the task process while heart beating.

This function must only be called while holding the lock for that task."""

self.cancel_task = False
self.task = task
task.worker = self.worker
task.save(update_fields=["worker"])
cancel_state = None
Expand All @@ -307,13 +311,6 @@ def supervise_task(self, task):
task_process = Process(target=_perform_task, args=(task.pk, task_working_dir_rel_path))
task_process.start()
while True:
# Handle all notifications before sleeping in `select`
while connection.connection.notifies:
item = connection.connection.notifies.pop(0)
if item.channel == "pulp_worker_cancel" and item.payload == str(task.pk):
_logger.info(_("Received signal to cancel current task %s."), task.pk)
cancel_state = TASK_STATES.CANCELED
# ignore all other notifications
if cancel_state:
if self.task_grace_timeout > 0:
_logger.info("Wait for canceled task to abort.")
Expand All @@ -330,7 +327,10 @@ def supervise_task(self, task):
)
self.beat()
if connection.connection in r:
connection.connection.poll()
connection.connection.execute("SELECT 1")
if self.cancel_task:
_logger.info(_("Received signal to cancel current task %s."), task.pk)
cancel_state = TASK_STATES.CANCELED
if task_process.sentinel in r:
if not task_process.is_alive():
break
Expand Down Expand Up @@ -365,12 +365,14 @@ def supervise_task(self, task):
self.cancel_abandoned_task(task, cancel_state, cancel_reason)
if task.reserved_resources_record:
self.notify_workers()
self.task = None

def run_forever(self):
with WorkerDirectory(self.name):
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
# Subscribe to pgsql channels
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_wakeup")
self.cursor.execute("LISTEN pulp_worker_cancel")
while not self.shutdown_requested:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -22,7 +22,7 @@ naya>=1.1.1,<=1.1.1
pulp-glue>=0.18.0,<0.20
protobuf>=4.21.1,<4.22.4
pygtrie>=2.5,<=2.5.0
psycopg2>=2.9.3,<2.9.7
psycopg[binary]>=3.1.8,<=3.1.9
PyYAML>=5.1.1,<=6.0.0
python-gnupg>=0.5,<=0.5.0
redis>=4.3,<4.5.5
Expand Down