-
-
Notifications
You must be signed in to change notification settings - Fork 551
/
index.ts
277 lines (258 loc) · 8.08 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import { Channels, JupyterMessage } from "@nteract/messaging";
import * as moduleJMP from "jmp";
import { fromEvent, merge, Observable, Subject, Subscriber } from "rxjs";
import { FromEventTarget } from "rxjs/internal/observable/fromEvent";
import { map, publish, refCount } from "rxjs/operators";
import uuid from "uuid/v4";
/**
 * ZeroMQ socket type to use for each Jupyter channel on the frontend side.
 *
 * `createSocket` looks up `ZMQType.frontend[channel]` and passes the resulting
 * string as the socket type when constructing a `jmp.Socket`.
 */
export const ZMQType = {
frontend: {
// iopub is the only "sub" socket; every other channel uses a "dealer" socket
iopub: "sub",
stdin: "dealer",
shell: "dealer",
control: "dealer"
}
};
/** Names of the Jupyter channels this module can open a socket for. */
type ChannelName = "iopub" | "stdin" | "shell" | "control";
/**
 * Connection information describing how to reach a Jupyter kernel.
 *
 * The `<channel>_port` fields are looked up by name in `formConnectionString`
 * (as `${channel}_port`), so their names must match the channel names.
 */
export interface JupyterConnectionInfo {
// Not read by the code visible in this file
version: number;
iopub_port: number;
shell_port: number;
stdin_port: number;
control_port: number;
// `createSocket` strips the leading "hmac-" and passes the rest to jmp.Socket
signature_scheme: "hmac-sha256";
// Not read by the code visible in this file
hb_port: number;
// Host part of the connection string
ip: string;
// Passed to jmp.Socket together with the signature scheme
key: string;
// "tcp" joins ip and port with ":", anything else (i.e. "ipc") with "-"
transport: "tcp" | "ipc";
}
/**
 * Header fields that `createMainChannelFromSockets` spreads over the header of
 * every outgoing message, overriding any same-named fields the message
 * already carried.
 */
interface HeaderFiller {
session: string;
username: string;
}
/**
 * Builds the ZeroMQ endpoint string for one channel of a kernel connection,
 * hiding the difference between tcp and ipc endpoint formatting.
 *
 * @param config Jupyter connection information
 * @param channel Jupyter channel ("iopub", "shell", "control", "stdin")
 *
 * @returns The connection string, `<transport>://<ip>:<port>` for tcp and
 * `<transport>://<ip>-<port>` otherwise
 * @throws Error when `config` has no truthy `<channel>_port` entry
 */
export const formConnectionString = (
  config: JupyterConnectionInfo,
  channel: ChannelName
) => {
  const portKey = `${channel}_port` as keyof JupyterConnectionInfo;
  const port = config[portKey];
  if (port) {
    // tcp endpoints separate host and port with ":", ipc endpoints with "-"
    const separator = config.transport === "tcp" ? ":" : "-";
    return `${config.transport}://${config.ip}${separator}${port}`;
  }
  throw new Error(`Port not found for channel "${channel}"`);
};
/**
 * Opens a socket for a single Jupyter channel and connects it to the kernel.
 *
 * The ZMQ socket type comes from `ZMQType.frontend`, and the signing scheme is
 * the connection's `signature_scheme` with its "hmac-" prefix removed.
 *
 * @param channel Jupyter channel ("iopub", "shell", "control", "stdin")
 * @param identity UUID assigned to the socket as its identity
 * @param config Jupyter connection information
 * @param jmp A reference to the JMP Node module
 *
 * @returns A Promise for the socket, settled by `verifiedConnect`
 */
export const createSocket = (
  channel: ChannelName,
  identity: string,
  config: JupyterConnectionInfo,
  jmp = moduleJMP
): Promise<moduleJMP.Socket> => {
  // e.g. "hmac-sha256" -> "sha256"
  const hashName = config.signature_scheme.slice("hmac-".length);
  const sock = new jmp.Socket(ZMQType.frontend[channel], hashName, config.key);
  sock.identity = identity;
  return verifiedConnect(sock, formConnectionString(config, channel));
};
/**
 * Connects a socket and waits for it to report that the connection is up.
 *
 * A "connect" listener is registered first, then monitoring is switched on and
 * the connection is started. When "connect" fires, monitoring is switched off
 * again and the promise resolves.
 *
 * @param socket A 0MQ socket
 * @param url The connection string to connect the socket to
 *
 * @returns A Promise resolving to the same socket.
 */
export const verifiedConnect = (
  socket: moduleJMP.Socket,
  url: string
): Promise<moduleJMP.Socket> => {
  return new Promise(resolve => {
    const handleConnect = () => {
      // We are not ready until this happens for all the sockets
      socket.unmonitor();
      resolve(socket);
    };
    socket.on("connect", handleConnect);
    socket.monitor();
    socket.connect(url);
  });
};
/**
 * Picks a username from the environment.
 *
 * Checks LOGNAME, USER, LNAME and USERNAME in that order and returns the first
 * one with a non-empty value.
 *
 * @returns The username, or "username" when none of the variables is set
 */
export const getUsername = () => {
  const candidates = ["LOGNAME", "USER", "LNAME", "USERNAME"];
  for (const variable of candidates) {
    const value = process.env[variable];
    if (value) {
      return value;
    }
  }
  // This is the fallback that the classic notebook uses
  return "username";
};
/**
 * Connects to a kernel and returns all of its channels multiplexed into one.
 *
 * Opens the sockets with `createSockets`, then wraps them with
 * `createMainChannelFromSockets`.
 *
 * @param config Jupyter connection information
 * @param subscription subscribed topic for the iopub channel; defaults to all
 * @param identity UUID shared by the created sockets; defaults to a fresh one
 * @param header The session and username to place in kernel message headers
 * @param jmp A reference to the JMP Node module
 *
 * @returns A Promise for the multiplexed channels
 */
export const createMainChannel = async (
  config: JupyterConnectionInfo,
  subscription: string = "",
  identity: string = uuid(),
  header: HeaderFiller = {
    session: uuid(),
    username: getUsername()
  },
  jmp = moduleJMP
): Promise<Channels> =>
  createMainChannelFromSockets(
    await createSockets(config, subscription, identity, jmp),
    header,
    jmp
  );
/**
 * Opens one socket per Jupyter channel (shell, control, stdin, iopub), all
 * sharing the same identity, and waits until every one of them is connected.
 *
 * @param config Jupyter connection information
 * @param subscription The topic to filter the subscription to the iopub channel on
 * @param identity UUID
 * @param jmp A reference to the JMP Node module
 *
 * @returns Sockets for each Jupyter channel, keyed by channel name
 */
export const createSockets = async (
  config: JupyterConnectionInfo,
  subscription: string = "",
  identity = uuid(),
  jmp = moduleJMP
) => {
  const open = (channel: ChannelName) =>
    createSocket(channel, identity, config, jmp);

  const [shell, control, stdin, iopub] = await Promise.all([
    open("shell"),
    open("control"),
    open("stdin"),
    open("iopub")
  ]);

  // NOTE: ZMQ PUB/SUB subscription (not an Rx subscription)
  iopub.subscribe(subscription);

  return { shell, control, stdin, iopub };
};
/**
 * Wraps a set of channel sockets in a single multiplexed Subject.
 *
 * Messages pushed into the result are sent on the socket named by their
 * `channel` field; messages arriving on any socket are emitted from the result
 * tagged with the name of the socket they came from. Completing the result
 * removes all listeners from every socket and closes it.
 *
 * @param sockets An object containing associations between channel types and 0MQ sockets
 * @param header The session and username to place in kernel message headers
 * @param jmp A reference to the JMP Node module
 *
 * @returns A Subject that allows us to send and receive messages on every
 * channel through the Jupyter protocol.
 */
export const createMainChannelFromSockets = (
  sockets: {
    [name: string]: moduleJMP.Socket;
  },
  header: HeaderFiller = {
    session: uuid(),
    username: getUsername()
  },
  jmp = moduleJMP
): Channels => {
  // Sends one outgoing message on the socket that matches its channel
  const dispatch = (message?: JupyterMessage) => {
    // There's always a chance that a bad message is sent, we'll ignore it
    // instead of consuming it
    if (!message || !message.channel) {
      console.warn("message sent without a channel", message);
      return;
    }
    const target = sockets[message.channel];
    if (!target) {
      // A message for a channel we hold no socket for is reported but must
      // not bomb the stream
      console.warn("channel not understood for message", message);
      return;
    }
    try {
      target.send(
        new jmp.Message({
          // Fold in the setup header to ease usage of messages on channels
          header: { ...message.header, ...header },
          parent_header: message.parent_header,
          content: message.content,
          metadata: message.metadata,
          buffers: message.buffers
        })
      );
    } catch (err) {
      console.error("Error sending message", err, message);
    }
  };

  // When the subject is completed / disposed, drop every event listener and
  // shut the sockets down
  const teardown = () => {
    for (const name of Object.keys(sockets)) {
      const open = sockets[name];
      open.removeAllListeners();
      if (open.close) {
        open.close();
      }
    }
  };

  // The observer half of the mega subject; errors are deliberately not
  // forwarded anywhere
  const outgoing = Subscriber.create<JupyterMessage>(
    dispatch,
    undefined,
    teardown
  );

  // One shared Observable of "message" events per socket
  const perChannel = Object.keys(sockets).map(name =>
    fromEvent(
      // fromEvent typings are broken. Pending a refactor around jmp, this
      // lets us treat the socket as a normal event emitter
      (sockets[name] as unknown) as FromEventTarget<JupyterMessage>,
      "message"
    ).pipe(
      map(
        (body: JupyterMessage): JupyterMessage => {
          // Route the message for the frontend by setting the channel
          const routed = { ...body, channel: name };
          // Conform to same message format as notebook websockets
          // See https://github.com/n-riesco/jmp/issues/10
          delete (routed as any).idents;
          return routed;
        }
      ),
      publish(),
      refCount()
    )
  );

  // Messages from kernel on the sockets, merged into one shared stream
  const incoming: Observable<JupyterMessage> = merge(...perChannel).pipe(
    publish(),
    refCount()
  );

  const multiplexed: Subject<JupyterMessage> = Subject.create(
    outgoing,
    incoming
  );
  return multiplexed;
};