Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/60242.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Exit gracefully on ctrl+c.
110 changes: 58 additions & 52 deletions salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,62 +663,68 @@ def __init__(self, socket_path, io_loop=None):
@salt.ext.tornado.gen.coroutine
def _read(self, timeout, callback=None):
try:
yield self._read_in_progress.acquire(timeout=0.00000001)
except salt.ext.tornado.gen.TimeoutError:
raise salt.ext.tornado.gen.Return(None)

exc_to_raise = None
ret = None
try:
while True:
if self._read_stream_future is None:
self._read_stream_future = self.stream.read_bytes(
4096, partial=True
)

if timeout is None:
wire_bytes = yield self._read_stream_future
else:
wire_bytes = yield FutureWithTimeout(
self.io_loop, self._read_stream_future, timeout
)
self._read_stream_future = None
try:
yield self._read_in_progress.acquire(timeout=0.00000001)
except salt.ext.tornado.gen.TimeoutError:
raise salt.ext.tornado.gen.Return(None)

# Remove the timeout once we get some data or an exception
# occurs. We will assume that the rest of the data is already
# there or is coming soon if an exception doesn't occur.
timeout = None

self.unpacker.feed(wire_bytes)
first_sync_msg = True
for framed_msg in self.unpacker:
if callback:
self.io_loop.spawn_callback(callback, framed_msg["body"])
elif first_sync_msg:
ret = framed_msg["body"]
first_sync_msg = False
else:
self._saved_data.append(framed_msg["body"])
if not first_sync_msg:
# We read at least one piece of data and we're on sync run
break
except TornadoTimeoutError:
# In the timeout case, just return None.
# Keep 'self._read_stream_future' alive.
exc_to_raise = None
ret = None
except StreamClosedError as exc:
log.trace("Subscriber disconnected from IPC %s", self.socket_path)
self._read_stream_future = None
except Exception as exc: # pylint: disable=broad-except
log.error("Exception occurred in Subscriber while handling stream: %s", exc)
self._read_stream_future = None
exc_to_raise = exc
try:
while True:
if self._read_stream_future is None:
self._read_stream_future = self.stream.read_bytes(
4096, partial=True
)

if timeout is None:
wire_bytes = yield self._read_stream_future
else:
wire_bytes = yield FutureWithTimeout(
self.io_loop, self._read_stream_future, timeout
)
self._read_stream_future = None

# Remove the timeout once we get some data or an exception
# occurs. We will assume that the rest of the data is already
# there or is coming soon if an exception doesn't occur.
timeout = None

self.unpacker.feed(wire_bytes)
first_sync_msg = True
for framed_msg in self.unpacker:
if callback:
self.io_loop.spawn_callback(callback, framed_msg["body"])
elif first_sync_msg:
ret = framed_msg["body"]
first_sync_msg = False
else:
self._saved_data.append(framed_msg["body"])
if not first_sync_msg:
# We read at least one piece of data and we're on sync run
break
except TornadoTimeoutError:
# In the timeout case, just return None.
# Keep 'self._read_stream_future' alive.
ret = None
except StreamClosedError as exc:
log.trace("Subscriber disconnected from IPC %s", self.socket_path)
self._read_stream_future = None
except Exception as exc: # pylint: disable=broad-except
log.error(
"Exception occurred in Subscriber while handling stream: %s", exc
)
self._read_stream_future = None
exc_to_raise = exc

self._read_in_progress.release()
self._read_in_progress.release()

if exc_to_raise is not None:
raise exc_to_raise # pylint: disable=E0702
raise salt.ext.tornado.gen.Return(ret)
if exc_to_raise is not None:
raise exc_to_raise # pylint: disable=E0702
raise salt.ext.tornado.gen.Return(ret)
# Handle ctrl+c gracefully
except TypeError:
pass

@salt.ext.tornado.gen.coroutine
def read(self, timeout):
Expand Down
3 changes: 3 additions & 0 deletions tests/pytests/integration/cli/test_salt.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,8 @@ def test_interrupt_on_long_running_job(salt_cli, salt_master, salt_minion):
), "The command wasn't actually terminated. Took {} seconds.".format(
round(stop - start, 2)
)

# Make sure the ctrl+c exited gracefully
assert "Exiting gracefully on Ctrl-c" in ret.stderr
assert "Exception ignored in" not in ret.stderr
assert "This job's jid is" in ret.stderr