-
-
Notifications
You must be signed in to change notification settings - Fork 553
/
comm.js
103 lines (92 loc) · 3.22 KB
/
comm.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// @flow
import { merge } from "rxjs/observable/merge";
import { map, retry, switchMap } from "rxjs/operators";
import { ofType } from "redux-observable";
import { commOpenAction, commMessageAction } from "../actions";
import { createMessage, ofMessageType } from "@nteract/messaging";
import type { ActionsObservable } from "redux-observable";
import type { NewKernelAction } from "../actionTypes";
import { LAUNCH_KERNEL_SUCCESSFUL } from "../actionTypes";
/**
* creates a comm open message
* @param {string} comm_id uuid
* @param {string} target_name comm handler
* @param {any} data up to the target handler
* @param {string} target_module [Optional] used to select a module that is responsible for handling the target_name
* @return {jmp.Message} Message ready to send on the shell channel
*/
export function createCommOpenMessage(
comm_id: string,
target_name: string,
data: any = {},
target_module: string
) {
const msg = createMessage("comm_open", {
content: { comm_id, target_name, data }
});
if (target_module) {
msg.content.target_module = target_module;
}
return msg;
}
/**
* creates a comm message for sending to a kernel
* @param {string} comm_id unique identifier for the comm
* @param {Object} data any data to send for the comm
* @param {Uint8Array} buffers arbitrary binary data to send on the comm
* @return {jmp.Message} jupyter message for comm_msg
*/
export function createCommMessage(
comm_id: string,
data: any = {},
buffers: Uint8Array = new Uint8Array([])
) {
return createMessage("comm_msg", { content: { comm_id, data }, buffers });
}
/**
* creates a comm close message for sending to a kernel
* @param {Object} parent_header header from a parent jupyter message
* @param {string} comm_id unique identifier for the comm
* @param {Object} data any data to send for the comm
* @return {jmp.Message} jupyter message for comm_msg
*/
export function createCommCloseMessage(
parent_header: any,
comm_id: string,
data: any = {}
) {
return createMessage("comm_close", {
content: { comm_id, data },
parent_header
});
}
/**
* creates all comm related actions given a new kernel action
* @param {Object} launchKernelAction a LAUNCH_KERNEL_SUCCESSFUL action
* @return {ActionsObservable} all actions resulting from comm messages on this kernel
*/
export function commActionObservable(action: NewKernelAction) {
const {
payload: { kernel }
} = action;
const commOpenAction$ = kernel.channels.pipe(
ofMessageType("comm_open"),
map(commOpenAction)
);
const commMessageAction$ = kernel.channels.pipe(
ofMessageType("comm_msg"),
map(commMessageAction)
);
return merge(commOpenAction$, commMessageAction$).pipe(retry());
}
/**
* An epic that emits comm actions from the backend kernel
* @param {ActionsObservable} action$ Action Observable from redux-observable
* @param {redux.Store} store the redux store
* @return {ActionsObservable} Comm actions
*/
export const commListenEpic = (action$: ActionsObservable<*>) =>
action$.pipe(
ofType(LAUNCH_KERNEL_SUCCESSFUL),
switchMap(commActionObservable)
); // We have a new channel