Skip to content

Commit

Permalink
Add a timeout to queuefile to ensure it doesn't block forever
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Schorr committed Jan 9, 2020
1 parent a41dcdd commit 9a05ca9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 13 deletions.
39 changes: 32 additions & 7 deletions endpoints/verbs/__init__.py
Expand Up @@ -57,6 +57,7 @@


LAYER_MIMETYPE = "binary/octet-stream"
QUEUE_FILE_TIMEOUT = 15 # seconds


class VerbReporter(TarLayerFormatterReporter):
Expand Down Expand Up @@ -128,13 +129,23 @@ def _sign_derived_image(verb, derived_image, queue_file):
registry_model.set_derived_image_signature(derived_image, signer.name, signature)


def _write_derived_image_to_storage(verb, derived_image, queue_file):
def _write_derived_image_to_storage(
verb, derived_image, queue_file, namespace, repository, tag_name
):
""" Read from the generated stream and write it back to the storage engine. This method runs in a
separate process.
"""

def handle_exception(ex):
logger.debug("Exception when building %s derived image %s: %s", verb, derived_image, ex)
logger.debug(
"Exception when building %s derived image %s (%s/%s:%s): %s",
verb,
derived_image,
namespace,
repository,
tag_name,
ex,
)

with database.UseThenDisconnect(app.config):
registry_model.delete_derived_image(derived_image)
Expand All @@ -149,7 +160,15 @@ def handle_exception(ex):
derived_image.blob.placements, derived_image.blob.storage_path, queue_file
)
except IOError as ex:
logger.debug("Exception when writing %s derived image %s: %s", verb, derived_image, ex)
logger.error(
"Exception when writing %s derived image %s (%s/%s:%s): %s",
verb,
derived_image,
namespace,
repository,
tag_name,
ex,
)

with database.UseThenDisconnect(app.config):
registry_model.delete_derived_image(derived_image)
Expand Down Expand Up @@ -453,22 +472,28 @@ def _store_metadata_and_cleanup():
finished=_store_metadata_and_cleanup,
)

client_queue_file = QueueFile(queue_process.create_queue(), "client")
client_queue_file = QueueFile(
queue_process.create_queue(), "client", timeout=QUEUE_FILE_TIMEOUT
)

if not is_readonly:
storage_queue_file = QueueFile(queue_process.create_queue(), "storage")
storage_queue_file = QueueFile(
queue_process.create_queue(), "storage", timeout=QUEUE_FILE_TIMEOUT
)

# If signing is required, add a QueueFile for signing the image as we stream it out.
signing_queue_file = None
if sign and signer.name:
signing_queue_file = QueueFile(queue_process.create_queue(), "signing")
signing_queue_file = QueueFile(
queue_process.create_queue(), "signing", timeout=QUEUE_FILE_TIMEOUT
)

# Start building.
queue_process.run()

# Start the storage saving.
if not is_readonly:
storage_args = (verb, derived_image, storage_queue_file)
storage_args = (verb, derived_image, storage_queue_file, namespace, repository, tag_name)
QueueProcess.run_process(_write_derived_image_to_storage, storage_args, finished=_cleanup)

if sign and signer.name:
Expand Down
19 changes: 14 additions & 5 deletions util/registry/queuefile.py
@@ -1,9 +1,12 @@
from multiprocessing.queues import Empty, Queue


class QueueFile(object):
""" Class which implements a file-like interface and reads QueueResult's from a blocking
multiprocessing queue.
"""

def __init__(self, queue, name=None):
def __init__(self, queue, name=None, timeout=None):
self._queue = queue
self._closed = False
self._done = False
Expand All @@ -12,6 +15,7 @@ def __init__(self, queue, name=None):
self._name = name
self.raised_exception = False
self._exception_handlers = []
self._timeout = timeout

def add_exception_handler(self, handler):
self._exception_handlers.append(handler)
Expand All @@ -30,24 +34,29 @@ def read(self, size=-1):

# Loop until we reach the requested data size (or forever if all data was requested).
while (len(self._buffer) < size) or (size == -1):
result = self._queue.get(block=True)
exception = None
try:
result = self._queue.get(block=True, timeout=self._timeout)
exception = result.exception
except Empty as em:
exception = em

# Check for any exceptions raised by the queue process.
if result.exception is not None:
if exception is not None:
self._closed = True
self.raised_exception = True

# Fire off the exception to any registered handlers. If no handlers were registered,
# then raise the exception locally.
handled = False
for handler in self._exception_handlers:
handler(result.exception)
handler(exception)
handled = True

if handled:
return ""
else:
raise result.exception
raise exception

# Check for no further data. If the QueueProcess has finished producing data, then break
# out of the loop to return the data already acquired.
Expand Down
2 changes: 1 addition & 1 deletion util/registry/test/test_queuefile.py
Expand Up @@ -10,7 +10,7 @@ class FakeQueue(object):
def __init__(self):
self.items = []

def get(self, block):
def get(self, block, timeout=None):
return self.items.pop(0)

def put(self, data):
Expand Down

0 comments on commit 9a05ca9

Please sign in to comment.