This repository has been archived by the owner on Jun 3, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 236
/
comms.py
85 lines (64 loc) · 2.53 KB
/
comms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import IPython
from ipykernel.comm import Comm
import time
import sys
_jupyter_config = {}
_dash_comm = Comm(target_name='jupyter_dash')
def _send_jupyter_config_comm_request():
# If running in an ipython kernel,
# request that the front end extension send us the notebook server base URL
if IPython.get_ipython() is not None:
if _dash_comm.kernel is not None:
_dash_comm.send({
'type': 'base_url_request'
})
@_dash_comm.on_msg
def _receive_message(msg):
msg_data = msg.get('content').get('data')
msg_type = msg_data.get('type', None)
if msg_type == 'base_url_response':
_jupyter_config.update(msg_data)
def _jupyter_comm_response_received():
return bool(_jupyter_config)
def _request_jupyter_config(timeout=2):
# Heavily inspired by implementation of CaptureExecution in the
if _dash_comm.kernel is None:
# Not in jupyter setting
return
_send_jupyter_config_comm_request()
# Get shell and kernel
shell = IPython.get_ipython()
kernel = shell.kernel
# Start capturing shell events to replay later
captured_events = []
def capture_event(stream, ident, parent):
captured_events.append((stream, ident, parent))
kernel.shell_handlers['execute_request'] = capture_event
# increment execution count to avoid collision error
shell.execution_count += 1
# Allow kernel to execute comms until we receive the jupyter configuration comm
# response
t0 = time.time()
while True:
if (time.time() - t0) > timeout:
# give up
raise EnvironmentError(
"Unable to communicate with the jupyter_dash notebook or JupyterLab \n"
"extension required to infer Jupyter configuration."
)
if _jupyter_comm_response_received():
break
kernel.do_one_iteration()
# Stop capturing events, revert the kernel shell handler to the default
# execute_request behavior
kernel.shell_handlers['execute_request'] = kernel.execute_request
# Replay captured events
# need to flush before replaying so messages show up in current cell not
# replay cells
sys.stdout.flush()
sys.stderr.flush()
for stream, ident, parent in captured_events:
# Using kernel.set_parent is the key to getting the output of the replayed
# events to show up in the cells that were captured instead of the current cell
kernel.set_parent(ident, parent)
kernel.execute_request(stream, ident, parent)