Skip to content

Commit

Permalink
Adding repo locks using notes field [RHELDST-14968]
Browse files Browse the repository at this point in the history
There are some operations in Pulp which should not be done in parallel.
Multiple instances of the same task can cause a race condition and
overwrite data.

This change uses a repo's notes field to implement a locking system.
This signals to other tasks that the repo is being worked on and should
not be modified until the lock is removed.
  • Loading branch information
amcmahon-rh committed Mar 22, 2023
1 parent 8bca412 commit 3bcf93b
Show file tree
Hide file tree
Showing 11 changed files with 747 additions and 1 deletion.
83 changes: 83 additions & 0 deletions examples/repo-lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python
from pubtools.pulplib import Client, FakeController, FileRepository
import logging
from argparse import ArgumentParser
from threading import Thread
import os
from time import sleep

# Module-level logger used by the functions of this example script.
log = logging.getLogger("repo_lock")


def set_maintenance(client, maintenance_on, msg):
    """Add or remove the example entry in the maintenance report.

    The whole download/modify/upload cycle runs while holding a lock on the
    "redhat-maintenance" repository, obtained via ``Repository.lock``.

    Args:
        client: Pulp client (real or fake) used for every request.
        maintenance_on: when truthy the example entry is added to the
            report, otherwise it is removed.
        msg: message logged right after the lock context is entered.
    """
    targets = ["pulplib-lock-example"]
    repo = client.get_repository("redhat-maintenance")

    with repo.lock("update maintenance repo", duration=600):
        log.info(msg)

        log.info("Downloading maintenance report")
        current = client.get_maintenance_report().result()

        log.info("Modifying maintenance report")
        if not maintenance_on:
            updated = current.remove(targets)
        else:
            updated = current.add(
                targets,
                owner="pulplib-examples",
                message="lock demonstration",
            )

        # Block until the modified report has been stored.
        client.set_maintenance(updated).result()


def make_client(args):
    """Create the Pulp client requested on the command line.

    Args:
        args: parsed arguments as returned by ``get_args``; the attributes
            ``fake``, ``username``, ``password``, ``url`` and ``insecure``
            are read.

    Returns:
        The client of a ``FakeController`` pre-populated with a
        "redhat-maintenance" file repository when ``args.fake`` is set,
        otherwise a ``Client`` for ``args.url``.
    """
    if args.fake:
        ctrl = FakeController()
        ctrl.insert_repository(FileRepository(id="redhat-maintenance"))
        return ctrl.client

    auth = None
    if args.username:
        # An explicit --password wins; otherwise fall back to the environment.
        password = args.password
        if password is None:
            password = os.environ.get("PULP_PASSWORD")
        if not password:
            log.warning("No password provided for %s", args.username)
        # Use the resolved password: building the tuple from args.password
        # would silently drop a password supplied through PULP_PASSWORD.
        auth = (args.username, password)

    return Client(args.url, auth=auth, verify=not args.insecure)


def get_args():
    """Parse the command line of the example and return the namespace.

    Recognised options are ``--url``, ``--username`` and ``--password``
    (each taking a value) plus the boolean switches ``--insecure`` and
    ``--fake``, which are False unless given.
    """
    cli = ArgumentParser(description="Demonstration of Repo Locking")

    # Options which take a value, registered in display order.
    valued_options = (
        ("--url", "Pulp server URL"),
        ("--username", "Pulp username"),
        ("--password", "Pulp password (or set PULP_PASSWORD in env)"),
    )
    for flag, help_text in valued_options:
        cli.add_argument(flag, help=help_text)

    # Plain on/off switches.
    for switch in ("--insecure", "--fake"):
        cli.add_argument(switch, action="store_true", default=False)

    return cli.parse_args()


def main():
    """Run the demonstration.

    Two threads call ``set_maintenance`` with the same client: one adds the
    example entry, the other removes it. Both are joined before returning.
    """
    logging.basicConfig(format="%(message)s", level=logging.INFO)
    client = make_client(get_args())

    adder = Thread(
        target=set_maintenance,
        args=(client, True, "Adding to redhat-maintenance"),
    )
    remover = Thread(
        target=set_maintenance,
        args=(client, False, "Removing from redhat-maintenance"),
    )

    adder.start()
    # Presumably gives the first thread time to take the lock before the
    # second one competes for it.
    sleep(1)
    remover.start()

    for worker in (adder, remover):
        worker.join()


if __name__ == "__main__":
    main()
23 changes: 23 additions & 0 deletions pubtools/pulplib/_impl/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from more_executors.futures import f_map, f_flat_map, f_return, f_proxy, f_sequence
from io import StringIO

from ..model.repository.repo_lock import LOCK_CLAIM_STR
from ..page import Page
from ..criteria import Criteria
from ..model import (
Expand Down Expand Up @@ -805,6 +806,28 @@ def _do_unassociate(self, repo_id, criteria=None):
self._do_request, method="POST", url=url, json=body
)

def _get_repo_lock_data(self, repo_id):
    """Fetch the lock-related notes of a repository.

    Issues a GET for the repository ``repo_id`` and returns a future
    resolving to a dict holding only those entries of the repository's
    ``notes`` whose key contains ``LOCK_CLAIM_STR``. Missing or empty
    notes yield an empty dict.
    """
    repo_url = os.path.join(self._url, "pulp/api/v2/repositories/%s/" % repo_id)
    raw_f = self._request_executor.submit(
        self._do_request, url=repo_url, method="GET"
    )

    def lock_notes(data):
        # "notes" may be absent or None in the response.
        notes = data.get("notes") or {}
        return {
            name: value for name, value in notes.items() if LOCK_CLAIM_STR in name
        }

    return f_map(raw_f, lock_notes)

def _update_repo_lock_data(self, repo_id, note_delta, await_result=False):
    """Submit a change to the notes of repository ``repo_id``.

    Args:
        repo_id: ID of the repository to update.
        note_delta: mapping sent to Pulp as the ``notes`` delta.
        await_result: when truthy, block until the request has completed
            (propagating any error); otherwise return right after the
            request has been submitted.

    Returns:
        None in both cases.
    """
    repo_url = os.path.join(self._url, "pulp/api/v2/repositories/%s/" % repo_id)
    payload = {"delta": {"notes": note_delta}}
    pending = self._request_executor.submit(
        self._do_request, url=repo_url, method="PUT", json=payload
    )
    if not await_result:
        return
    pending.result()

def _compile_notes(self, repo):
# Given a repo we're about to publish, calculate and set certain notes
# derived from the repo contents.
Expand Down
44 changes: 44 additions & 0 deletions pubtools/pulplib/_impl/fake/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
Publish = namedtuple("Publish", ["repository", "tasks"])
Upload = namedtuple("Upload", ["repository", "tasks", "name", "sha256"])
Sync = namedtuple("Sync", ["repository", "tasks", "sync_config"])
RepoLockRecord = namedtuple("RepoLockRecord", ["repository", "action"])


class FakeClient(object): # pylint:disable = too-many-instance-attributes
Expand Down Expand Up @@ -447,6 +448,49 @@ def _do_unassociate(self, repo_id, criteria=None):

return f_return([task])

def _get_repo_lock_data(self, repo_id):
    """Return a future with the lock claims stored for ``repo_id``.

    The future carries a ``PulpException`` unless exactly one repository
    matches ``repo_id``. A repository without recorded claims yields an
    empty dict.
    """
    self._ensure_alive()

    matches = self.search_repository(Criteria.with_id(repo_id)).result().data
    if len(matches) != 1:
        return f_return_error(PulpException("Repository id=%s not found" % repo_id))

    with self._state.lock:
        claims = self._state.repo_locks.get(repo_id, {})
    return f_return(claims)

def _update_repo_lock_data(self, repo_id, note_delta, await_result=None):
    """Apply ``note_delta`` to the lock claims stored for ``repo_id``.

    Entries of ``note_delta`` overwrite existing claims; claims whose
    value ends up as None are dropped. Each call appends one
    ``RepoLockRecord`` to the lock history. ``await_result`` is accepted
    but not used here.

    Returns a future carrying a ``PulpException`` unless exactly one
    repository matches ``repo_id``; otherwise returns None.
    """
    self._ensure_alive()

    found = self.search_repository(Criteria.with_id(repo_id)).result().data
    if len(found) != 1:
        return f_return_error(PulpException("Repository id=%s not found" % repo_id))

    with self._state.lock:
        claims = self._state.repo_locks.get(repo_id, {})
        for lock_id, value in note_delta.items():
            claims[lock_id] = value

        # A None value marks a claim as removed.
        self._state.repo_locks[repo_id] = {
            lock_id: value for lock_id, value in claims.items() if value is not None
        }

        if len(note_delta) != 1:
            # The only time more than one change should be made is when
            # removing multiple expired locks.
            action = "multi-unlock"
        else:
            (single_value,) = note_delta.values()
            action = "lock" if single_value else "unlock"

        self._state.repo_lock_history.append(RepoLockRecord(repo_id, action))

def _request_upload(self, name): # pylint: disable=unused-argument
# Note: old versions had a bug where this function would always
# consume *two* request IDs. We keep that side-effect so that test
Expand Down
8 changes: 8 additions & 0 deletions pubtools/pulplib/_impl/fake/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,11 @@ def insert_task(self, task):
"""
with self._state.lock:
self._state.tasks.append(task)

@property
def repo_lock_history(self):
    """
    The list of lock actions recorded in the fake's state.
    Possible actions are: lock, unlock, multi-unlock
    """
    state = self._state
    return state.repo_lock_history
4 changes: 4 additions & 0 deletions pubtools/pulplib/_impl/fake/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ def __init__(self):
self.uuidgen = random.Random()
self.uuidgen.seed(0)
self.unitmaker = units.UnitMaker(self.seen_unit_ids)
# map of repo id => lock claims.
self.repo_locks = {}
# list of RepoLockRecord entries, one per lock state change
self.repo_lock_history = []

def insert_repo_units(self, repo_id, units_to_add):
# Insert an iterable of units into a specific repo.
Expand Down
39 changes: 39 additions & 0 deletions pubtools/pulplib/_impl/model/repository/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import logging
import warnings
import json
import os
from functools import partial

from attr import validators, asdict
from frozenlist2 import frozenlist
from more_executors.futures import f_proxy, f_map, f_flat_map

from .repo_lock import RepoLock
from ..attr import pulp_attrib, PULP2_FIELD, PULP2_MUTABLE
from ..common import PulpObject, Deletable, DetachedException
from ..convert import frozenlist_or_none_converter
Expand Down Expand Up @@ -595,6 +597,43 @@ def sync(self, options=None):
)
)

def lock(self, context, duration=None):
    """
    Obtain an exclusive advisory lock on this repository.

    The returned object is a context manager representing the lock and is
    meant to be used in a `with` statement. Entering the context waits
    until the lock can be acquired, or raises an exception if it can't be
    acquired.

    At any given time only a single :class:`~pubtools.pulplib.Client` can
    hold the lock on a repository. The lock is advisory only: it does not
    prevent modifications to the repo with the Pulp API, and it has no
    effect on other Pulp client implementations or on instances of
    :class:`~pubtools.pulplib.Client` which do not use the `lock` method.

    Args:
        context:
            A short description of the task being carried out with the
            lock. This value will be added to the lock in the repo and
            may be used for debugging.
        duration:
            Maximum duration of the lock, in seconds.

            This value is used only if this client fails to release the
            lock (for example, because the current process is killed).
            In this case, the duration will be used by other clients in
            order to detect and release stale locks, avoiding a deadlock.

            There is no way to extend the duration of an acquired lock,
            so the caller should always ensure they request a `duration`
            high enough to cover the entire expected lifetime of the lock.
    """
    repo_lock = RepoLock(self.id, self._client, context, duration)
    return repo_lock

def remove_content(self, criteria=None, **kwargs):
"""Remove all content of requested types from this repository.
Expand Down

0 comments on commit 3bcf93b

Please sign in to comment.