Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

py: use subprocess to read with timeout #11

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions py/src/gnumake_tokenpool/read_byte.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# pipe one byte from stdin to stdout
#
# Exit status protocol (the caller compares against these literal values,
# so they are fixed numbers and NOT the platform's errno values):
#   11 = the read would block (nothing to read right now)
#    9 = stdin is a bad file descriptor
# Any other failure is re-raised, so the traceback ends up on stderr.

# raise OSError("test: asdf") # test exception
# import sys; sys.exit(123) # test error
# import time; time.sleep(999) # test timeout

import errno
import os
import sys

# fixed exit statuses, see the protocol description at the top of the file
_EXIT_WOULD_BLOCK = 11
_EXIT_BAD_FD = 9

try:
    _bytes = os.read(0, 1)
except BlockingIOError as e:
    # Compare against the errno constants instead of the literal 11:
    # EAGAIN is 11 on Linux but has a different value on other platforms.
    if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
        # Resource temporarily unavailable
        sys.exit(_EXIT_WOULD_BLOCK)
    raise
except OSError as e:
    if e.errno == errno.EBADF:
        # Bad file descriptor = pipe is closed
        sys.exit(_EXIT_BAD_FD)
    raise

os.write(1, _bytes)
156 changes: 89 additions & 67 deletions py/src/gnumake_tokenpool/tokenpool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import sys, os, stat, select, signal, time, re
import sys, os, stat, select, time, re, subprocess, _compat_pickle

from contextlib import contextmanager
from datetime import datetime
from typing import List, Any, Iterator, Never

Expand All @@ -15,6 +14,52 @@ class InvalidToken(Exception):
pass


def _parse_exception(_bytes):
#print("_bytes", repr(_bytes))
e_msg = _bytes.strip()
e_trace = b""
e_name = b""
parts = e_msg.rsplit(b"\n", 1)
#print("parts", repr(parts))
if len(parts) == 2:
e_trace, e_msg = parts
parts = e_msg.split(b": ", 1)
#print("parts", repr(parts))
if len(parts) == 2:
e_name, e_msg = parts
e_class = Exception
if re.fullmatch(rb"[A-Z][A-Za-z]+", e_name):
# eval is evil...
# e_class_2 = eval(e_name)
# if issubclass(e_class_2, BaseException):
# e_class = e_class_2
# also check e_name versus list of built-in exceptions
# https://docs.python.org/3/library/exceptions.html
e_name = e_name.decode("ascii")
if (
e_name in _compat_pickle.PYTHON2_EXCEPTIONS or
e_name in _compat_pickle.PYTHON3_OSERROR_EXCEPTIONS or
e_name in _compat_pickle.PYTHON3_IMPORTERROR_EXCEPTIONS or
e_name in _compat_pickle.MULTIPROCESSING_EXCEPTIONS or
e_name == "OSError" or
e_name == "ImportError"
):
e_class = eval(e_name)
try:
e_msg = e_msg.decode("utf8")
except UnicodeDecodeError:
pass
try:
e_trace = e_trace.decode("utf8")
except UnicodeDecodeError:
pass
try:
text = _bytes.decode("utf8").strip()
except UnicodeDecodeError:
text = repr(stderr)
return e_class, e_msg + "\n\nexception parsed from inner exception:\n\n" + text


class JobClient:
"jobclient for the gnumake jobserver"

Expand All @@ -27,11 +72,9 @@ def __init__(
max_load: int | None = None,
debug: bool | None = None,
debug2: bool | None = None,
use_cysignals: bool | None = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be easier for the use in Sage if we could accept (and ignore) use_cysignals for a while

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just help yourself : )
I don't use this, I'm just sharing my drafts.

):

self._fdRead: int | None = None
self._fdReadDup: int | None = None
self._fdWrite: int | None = None
self._fifoPath = None
self._fdFifo = None
Expand All @@ -40,6 +83,8 @@ def __init__(
self._fileRead = None
self._fileWrite = None

self._read_byte_py_path = os.path.dirname(__file__) + "/read_byte.py"

self._debug = bool(os.environ.get("DEBUG_JOBCLIENT"))
self._debug2 = bool(os.environ.get("DEBUG_JOBCLIENT_2")) # more verbose

Expand All @@ -51,16 +96,6 @@ def __init__(
self._log = self._get_log(self._debug)
self._log2 = self._get_log(self._debug2)

if use_cysignals is not False:
try:
from cysignals.pysignals import changesignal
except ImportError:
if use_cysignals:
raise
else:
self._log("init: using cysignals.pysignals.changesignal")
self._changesignal = changesignal # type: ignore

makeFlags = os.environ.get("MAKEFLAGS", "")
if makeFlags:
self._log(f"init: MAKEFLAGS: {makeFlags}")
Expand Down Expand Up @@ -181,7 +216,11 @@ def maxLoad(self) -> int | None:
return self._maxLoad


def acquire(self) -> int | None:
def acquire(
self,
# a successful read takes about 0.1 to 0.2 seconds
timeout=0.5,
) -> int | None:
# http://make.mad-scientist.net/papers/jobserver-implementation/

# check if fdRead is readable
Expand All @@ -190,50 +229,43 @@ def acquire(self) -> int | None:
self._log2(f"acquire failed: fd is empty")
return None

# handle race condition:
# between select and read, another process can read from the pipe.
# when the pipe is empty, read can block forever.
# by closing fdReadDup, we interrupt read
if self._fdRead and not self._fdReadDup:
self._fdReadDup = os.dup(self._fdRead)

if not self._fdReadDup:
self._log(f"acquire: failed to duplicate fd")
return None
args = [
sys.executable, # python
self._read_byte_py_path,
]

def read_timeout_handler(_signum: int, _frame: Any) -> None:
self._log(f"acquire: read with timeout {timeout} ...")
buffer = b""
t1 = time.time()
try:
proc = subprocess.run(
args,
capture_output=True,
timeout=timeout,
stdin=self._fdRead,
#check=True, # raise CalledProcessError
)
buffer = proc.stdout
t2 = time.time()
self._log(f"acquire: read done after {t2 - t1} seconds")
except subprocess.TimeoutExpired:
self._log(f"acquire: read timeout")
assert self._fdReadDup
os.close(self._fdReadDup)

# SIGALRM = timer has fired = read timeout
with self._changesignal(signal.SIGALRM, read_timeout_handler):
try:
# Set SA_RESTART to limit EINTR occurrences.
# by default, signal.signal clears the SA_RESTART flag.
# TODO is this necessary?
signal.siginterrupt(signal.SIGALRM, False)

read_timeout = 0.1
signal.setitimer(signal.ITIMER_REAL, read_timeout) # set timer for SIGALRM. unix only

# blocking read
self._log(f"acquire: read with timeout {read_timeout} ...")
buffer = b""
try:
buffer = os.read(self._fdReadDup, 1)
except BlockingIOError as e:
if e.errno == 11: # Resource temporarily unavailable
self._log2(f"acquire failed: fd is empty 2")
return None # jobserver is full, try again later
raise e # unexpected error
except OSError as e:
if e.errno == 9: # EBADF: Bad file descriptor = pipe is closed
self._log(f"acquire: read failed: {e}")
return None # jobserver is full, try again later
raise e # unexpected error
finally:
signal.setitimer(signal.ITIMER_REAL, 0) # clear timer. unix only
return None
#raise TimeoutError
#except subprocess.CalledProcessError as proc:
if proc.returncode != 0:
if proc.returncode == 11: # Resource temporarily unavailable
self._log2(f"acquire failed: fd is empty 2")
return None # jobserver is full, try again later
if proc.returncode == 9: # EBADF: Bad file descriptor = pipe is closed
#self._log(f"acquire: read failed: {e}")
self._log(f"acquire: read failed: Bad file descriptor")
return None # jobserver is full, try again later
if proc.returncode == 1:
e_class, e_msg = _parse_exception(proc.stderr)
raise e_class(e_msg) # unexpected error
e_msg = f"read_byte_py process returned {proc.returncode}. stdout={repr(proc.stdout)}. stderr={repr(proc.stderr)}"
raise Exception(e_msg) # unexpected error

#if len(buffer) == 0:
# return None
Expand Down Expand Up @@ -315,13 +347,3 @@ def _get_stat(self, fd: int) -> os.stat_result:
self._log(f"init failed: fd {fd} stat failed: {e}")
raise NoJobServer()
raise e # unexpected error

@staticmethod
@contextmanager
def _changesignal(sig: int, action: Any) -> Iterator[None]:
old_sig_handler = signal.signal(sig, action)
try:
yield
finally:
# clear signal handler
signal.signal(sig, old_sig_handler)