Skip to content

Commit

Permalink
Refactored locking implementation.
Browse files Browse the repository at this point in the history
We have refactored the locking implementation in the worker
as a self-contained package "packages/openlock". The package is
also available on github

https://github.com/vdbergh/openlock

and on PyPi

https://pypi.org/search/?q=openlock

This PR cleans up the worker code considerably.
  • Loading branch information
vdbergh committed Jun 26, 2024
1 parent 322eea2 commit c9f9f89
Show file tree
Hide file tree
Showing 9 changed files with 545 additions and 129 deletions.
21 changes: 21 additions & 0 deletions worker/packages/openlock/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 The Fishtest team

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
34 changes: 34 additions & 0 deletions worker/packages/openlock/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# openlock

A locking library not depending on inter-process locking primitives in the OS.

## API

- `FileLock(lock_file, timeout=None)`. Constructor. The optional `timeout` argument is the default for the corresponding argument of `acquire()` (see below). A `FileLock` object supports the context manager protocol.
- `FileLock.acquire(timeout=None)`. Attempts to acquire the lock. The optional `timeout` argument specifies the maximum waiting time in seconds before a `Timeout` exception is raised.
- `FileLock.release()`. Releases the lock. May raise an `InvalidRelease` exception.
- `FileLock.locked()`. Indicates if the lock is held by a process.
- `FileLock.getpid()`. The PID of the process that holds the lock, if any. Otherwise returns `None`.
- `openlock.set_defaults(**kw)`. Sets default values for the internal parameters. Currently `tries`, `retry_period`, `race_delay` and `slow_system_exception` with values of 2, 0.3s, 0.5s and False respectively.
- `openlock.get_defaults()`. Returns a dictionary with the default values for the internal parameters.

## How does it work

A valid lock file has two lines of text containing respectively:

- `pid`: the PID of the process holding the lock;
- `name`: the content of `argv[0]` of the process holding the lock.

A lock file is considered stale if the pair `(pid, name)` does not belong to a Python process in the process table.

A process that seeks to acquire a lock first looks for an existing valid lock file. If it exists then this means that the lock has already been acquired and the process will periodically retry to acquire it - subject to the `timeout` parameter. If there is no lock file, or if it is stale or unparsable, then the process atomically creates a new lock file with its own data. It sleeps 0.5 seconds (configurable) and then checks if the lock file has been overwritten by a different process. If not then it has acquired the lock.

## Issues

- The algorithm fails if a process needs more than 0.5 seconds to create a new lock file after detecting the absence of a valid one. The library will issue a warning if it thinks the system is too slow for the algorithm to work correctly and it will recommend to increase the value of the `race_delay` parameter. Note that the current value of 0.5 seconds is extremely conservative.

- Although it is very unlikely, it may be that the data `(pid, name)` matches a different Python process since PIDs are only unique over the lifetime of a process. In that case the algorithm fails to recognize the lock file as stale.

## History

This is a refactored version of the locking algorithm used by the worker for the Fishtest web application <https://tests.stockfishchess.org/tests>.
13 changes: 13 additions & 0 deletions worker/packages/openlock/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .openlock import ( # noqa: F401
FileLock,
InvalidLockFile,
InvalidOption,
InvalidRelease,
OpenLockException,
SlowSystem,
Timeout,
__version__,
get_defaults,
logger,
set_defaults,
)
29 changes: 29 additions & 0 deletions worker/packages/openlock/_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import sys
import time

from openlock import FileLock, Timeout


def other_process1(lock_file):
r = FileLock(lock_file)
try:
r.acquire(timeout=0)
except Timeout:
return 1
return 0


def other_process2(lock_file):
r = FileLock(lock_file)
r.acquire(timeout=0)
time.sleep(2)
return 2


if __name__ == "__main__":
lock_file = sys.argv[1]
cmd = sys.argv[2]
if cmd == "1":
print(other_process1(lock_file))
else:
print(other_process2(lock_file))
256 changes: 256 additions & 0 deletions worker/packages/openlock/openlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
import atexit
import copy
import logging
import os
import platform
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path

__version__ = "1.0.5"

logger = logging.getLogger(__name__)

IS_WINDOWS = "windows" in platform.system().lower()


def pid_valid_windows(pid, name):
cmdlet = (
"(Get-CimInstance Win32_Process " "-Filter 'ProcessId = {}').CommandLine"
).format(pid)
cmd = [
"powershell",
cmdlet,
]
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
bufsize=1,
close_fds=not IS_WINDOWS,
) as p:
for line in iter(p.stdout.readline, ""):
line = line.lower()
if name.lower() in line and "python" in line:
return True
return False


def pid_valid_posix(pid, name):
# for busybox these options are undocumented...
cmd = ["ps", "-f", "-A"]

with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
bufsize=1,
close_fds=not IS_WINDOWS,
) as p:
for line in iter(p.stdout.readline, ""):
line = line.lower()
line_ = line.split()
if len(line_) == 0:
continue
if "pid" in line_:
# header
index = line_.index("pid")
continue
try:
pid_ = int(line_[index])
except ValueError:
continue
if name.lower() in line and "python" in line and pid == pid_:
return True
return False


def pid_valid(pid, name):
if IS_WINDOWS:
return pid_valid_windows(pid, name)
else:
return pid_valid_posix(pid, name)


class OpenLockException(Exception):
pass


class Timeout(OpenLockException):
pass


class InvalidRelease(OpenLockException):
pass


class InvalidLockFile(OpenLockException):
pass


class SlowSystem(OpenLockException):
pass


class InvalidOption(OpenLockException):
pass


_defaults = {
"race_delay": 0.5,
"tries": 2,
"retry_period": 0.3,
"slow_system_exception": False,
}


def get_defaults():
return copy.copy(_defaults)


def set_defaults(**kw):
if not set(kw.keys()).issubset(set(_defaults.keys())):
raise InvalidOption()
_defaults.update(kw)


class FileLock:
def __init__(
self,
lock_file,
timeout=None,
):
self.__lock_file = Path(lock_file)
self.__timeout = timeout
self.__lock = threading.Lock()
self.__acquired = False
self.__retry_period = _defaults["retry_period"]
self.__race_delay = _defaults["race_delay"]
self.__tries = _defaults["tries"]
self.__slow_system_exception = _defaults["slow_system_exception"]
logger.debug(f"{self} created")

def __lock_state(self):
try:
with open(self.__lock_file) as f:
s = f.readlines()
except FileNotFoundError:
return {"state": "unlocked", "reason": "file not found"}
except Exception as e:
logger.exception(f"Error accessing '{self.__lock_file}': {str(e)}")
raise
try:
pid = int(s[0])
name = s[1].strip()
except (ValueError, IndexError):
return {"state": "unlocked", "reason": "invalid lock file"}

if not pid_valid(pid, name):
return {
"state": "unlocked",
"reason": "pid invalid",
"pid": pid,
"name": name,
}

return {"state": "locked", "pid": pid, "name": name}

def __remove_lock_file(self):
try:
os.remove(self.__lock_file)
logger.debug(f"Lock file '{self.__lock_file}' removed")
except OSError:
pass

def __write_lock_file(self, pid, name):
temp_file = tempfile.NamedTemporaryFile(
dir=os.path.dirname(self.__lock_file), delete=False
)
temp_file.write(f"{os.getpid()}\n{name}\n".encode())
temp_file.close()
os.replace(temp_file.name, self.__lock_file)

def __acquire_once(self):
t = time.time()
lock_state = self.__lock_state()
logger.debug(f"{self}: {lock_state}")
for _ in range(0, self.__tries):
if lock_state["state"] == "locked":
return
self.__write_lock_file(os.getpid(), sys.argv[0])
tt = time.time()
if tt - t >= (2 / 3) * self.__race_delay:
message = (
"Slow system detected!! Consider increasing the "
"'race_delay' parameter "
f"(current value: {self.__race_delay:2f}, used: {tt-t:2f})."
)
logger.warning(message)
if self.__slow_system_exception:
raise SlowSystem(message)
time.sleep(self.__race_delay)
t = time.time()
lock_state = self.__lock_state()
logger.debug(f"{self}: {lock_state}")
if lock_state["state"] == "locked":
if lock_state["pid"] == os.getpid():
logger.debug(f"{self} acquired")
self.__acquired = True
atexit.register(self.__remove_lock_file)
return
raise InvalidLockFile("Unable to obtain a valid lock file")

def acquire(self, timeout=None):
if timeout is None:
timeout = self.__timeout
start_time = time.time()
with self.__lock:
while True:
if not self.__acquired:
self.__acquire_once()
if self.__acquired:
break
now = time.time()
if timeout is not None and now - start_time >= timeout:
raise Timeout(f"Unable to acquire {self}")
time.sleep(self.__retry_period)

def release(self):
with self.__lock:
if not self.__acquired:
raise InvalidRelease(f"Attempt at releasing {self} which we do not own")
self.__acquired = False
self.__remove_lock_file()
atexit.unregister(self.__remove_lock_file)
logger.debug(f"{self} released")

def locked(self):
with self.__lock:
return self.__lock_state()["state"] == "locked"

def getpid(self):
with self.__lock:
if self.__acquired:
return os.getpid()
lock_state = self.__lock_state()
if lock_state["state"] == "locked":
return lock_state["pid"]
else:
return None

def __enter__(self):
self.acquire()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.release()

def __str__(self):
return f"FileLock('{self.__lock_file}')"

__repr__ = __str__
Loading

0 comments on commit c9f9f89

Please sign in to comment.