Skip to content

Commit

Permalink
Handle all threads stopped correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanMabille committed Feb 1, 2022
1 parent 51ab163 commit d482dbf
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 19 deletions.
3 changes: 2 additions & 1 deletion ipykernel/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
class ControlThread(Thread):

def __init__(self, **kwargs):
Thread.__init__(self, **kwargs)
Thread.__init__(self, name="Control", **kwargs)
self.io_loop = IOLoop(make_current=False)
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True

def run(self):
self.name="Control"
self.io_loop.make_current()
try:
self.io_loop.start()
Expand Down
54 changes: 39 additions & 15 deletions ipykernel/debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session):
self.session = session
self.is_started = False
self.event_callback = event_callback
self.stopped_queue = Queue()

self.started_debug_handlers = {}
for msg_type in Debugger.started_debug_msg_types:
Expand All @@ -300,22 +301,19 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session):

def _handle_event(self, msg):
if msg['event'] == 'stopped':
self.stopped_threads.add(msg['body']['threadId'])
if msg['body']['allThreadsStopped']:
self.stopped_queue.put_nowait(msg)
# Do not forward the event now, will be done in the handle_stopped_event
return
else:
self.stopped_threads.add(msg['body']['threadId'])
self.event_callback(msg)
elif msg['event'] == 'continued':
try:
if msg['allThreadsContinued']:
self.stopped_threads = set()
else:
self.stopped_threads.remove(msg['body']['threadId'])
except Exception:
# Workaround for debugpy/pydev not setting the correct threadId
# after a next request. Does not work if a the code executed on
# the shell spawns additional threads
if len(self.stopped_threads) == 1:
self.stopped_threads = set()
else:
raise Exception('threadId from continued event not in stopped threads set')
self.event_callback(msg)
if msg['body']['allThreadsContinued']:
self.stopped_threads = set()
else:
self.stopped_threads.remove(msg['body']['threadId'])
self.event_callback(msg)

async def _forward_message(self, msg):
return await self.debugpy_client.send_dap_request(msg)
Expand All @@ -334,6 +332,32 @@ def _build_variables_response(self, request, variables):
}
return reply

def _accept_stopped_thread(self, thread_name):
# TODO: identify Thread-2, Thread-3 and Thread-4. These are NOT
# Control, IOPub or Heartbeat threads
forbid_list = [
'IPythonHistorySavingThread',
'Thread-2',
'Thread-3',
'Thread-4'
]
return thread_name not in forbid_list

async def handle_stopped_event(self):
# Wait for a stopped event message in the stopped queue
# This message is used for triggering the 'threads' request
event = await self.stopped_queue.get()
req = {
'seq': event['seq'] + 1,
'type': 'request',
'command': 'threads'
}
rep = await self._forward_message(req)
for t in rep['body']['threads']:
if self._accept_stopped_thread(t['name']):
self.stopped_threads.add(t['id'])
self.event_callback(event)

@property
def tcp_client(self):
return self.debugpy_client
Expand Down
4 changes: 3 additions & 1 deletion ipykernel/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Heartbeat(Thread):
def __init__(self, context, addr=None):
if addr is None:
addr = ('tcp', localhost(), 0)
Thread.__init__(self)
Thread.__init__(self, name="Heartbeat")
self.context = context
self.transport, self.ip, self.port = addr
self.original_port = self.port
Expand All @@ -42,6 +42,7 @@ def __init__(self, context, addr=None):
self.daemon = True
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True
self.name="Heartbeat"

def pick_port(self):
if self.transport == 'tcp':
Expand Down Expand Up @@ -89,6 +90,7 @@ def _bind_socket(self):
return

def run(self):
self.name="Heartbeat"
self.socket = self.context.socket(zmq.ROUTER)
self.socket.linger = 1000
try:
Expand Down
4 changes: 3 additions & 1 deletion ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ def __init__(self, socket, pipe=False):
self._events = deque()
self._event_pipes = WeakSet()
self._setup_event_pipe()
self.thread = threading.Thread(target=self._thread_main)
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
self.thread.pydev_do_not_trace = True
self.thread.is_pydev_daemon_thread = True
self.thread.name="IOPub"

def _thread_main(self):
"""The inner loop that's actually run in a thread"""
Expand Down Expand Up @@ -176,6 +177,7 @@ def _check_mp_mode(self):

def start(self):
"""Start the IOPub thread"""
self.thread.name="IOPub"
self.thread.start()
# make sure we don't prevent process exit
# I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
Expand Down
6 changes: 6 additions & 0 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,19 @@ def dispatch_debugpy(self, msg):
def banner(self):
return self.shell.banner

async def poll_stopped_queue(self):
while True:
await self.debugger.handle_stopped_event()

def start(self):
self.shell.exit_now = False
if self.debugpy_stream is None:
self.log.warning("debugpy_stream undefined, debugging will not be enabled")
else:
self.debugpy_stream.on_recv(self.dispatch_debugpy, copy=False)
super().start()
if self.debugpy_stream:
asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop.asyncio_loop)

def set_parent(self, ident, parent, channel='shell'):
"""Overridden from parent to tell the display hook and output streams
Expand Down
8 changes: 7 additions & 1 deletion ipykernel/tests/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def test_rich_inspect_at_breakpoint(kernel_with_debug):
f(2, 3)"""


r = wait_for_debug_request(kernel_with_debug, "dumpCell", {"code": code})
source = r["body"]["sourcePath"]

Expand All @@ -246,6 +247,11 @@ def test_rich_inspect_at_breakpoint(kernel_with_debug):

kernel_with_debug.execute(code)

# Wait for stop on breakpoint
msg = {"msg_type": "", "content": {}}
while msg.get('msg_type') != 'debug_event' or msg["content"].get("event") != "stopped":
msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT)

stacks = wait_for_debug_request(kernel_with_debug, "stackTrace", {"threadId": 1})[
"body"
]["stackFrames"]
Expand Down Expand Up @@ -276,4 +282,4 @@ def test_rich_inspect_at_breakpoint(kernel_with_debug):
def test_convert_to_long_pathname():
if sys.platform == 'win32':
from ipykernel.compiler import _convert_to_long_pathname
_convert_to_long_pathname(__file__)
_convert_to_long_pathname(__file__)

0 comments on commit d482dbf

Please sign in to comment.