Implement fallback for when failing to open control channel
martinRenou committed Nov 10, 2021
1 parent 694ed40 commit 5a184b9
Showing 2 changed files with 113 additions and 3 deletions.
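The change makes the widget manager try the fast path first: a single jupyter.widget.control comm that returns the state of every widget in one round trip. If opening that channel fails (kernels running ipywidgets <=7.6 do not register the comm target), the manager now falls back to fetching each widget's state over its own comm, throttled so the kernel is not flooded. A minimal sketch of the resulting control flow, with simplified names (the real methods are _build_models and _build_models_slow in the diff below):

// Illustrative sketch only; the real logic lives in WidgetManager below.
async function buildModels(
  fastPath: () => Promise<Record<string, unknown>>, // one control-comm round trip
  slowPath: () => Promise<Record<string, unknown>> // one comm per widget, throttled
): Promise<Record<string, unknown>> {
  try {
    return await fastPath();
  } catch {
    // Old kernels (ipywidgets <= 7.6) don't implement jupyter.widget.control.
    console.warn('Control channel unavailable, falling back to per-widget fetching.');
    return await slowPath();
  }
}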
81 changes: 78 additions & 3 deletions packages/voila/src/manager.ts
@@ -40,6 +40,8 @@ import { Widget } from '@lumino/widgets';

import { requireLoader } from './loader';

import { batchRateMap } from './utils';

if (typeof window !== 'undefined' && typeof window.define !== 'undefined') {
window.define('@jupyter-widgets/base', base);
window.define('@jupyter-widgets/controls', controls);
@@ -192,6 +194,10 @@ export class WidgetManager extends JupyterLabManager {
});
}

/**
* Build the widget models using the jupyter.widget.control comm channel.
*/
async _build_models(): Promise<{ [key: string]: base.WidgetModel }> {
const models: { [key: string]: base.WidgetModel } = {};
const commId = base.uuid();
@@ -234,9 +240,8 @@
}
catch {
console.warn('Failed to open "jupyter.widget.control" comm channel, falling back to slow fetching of widgets.');
// TODO Fallback to the old implementation for old ipywidgets versions
// return this._build_models_slow();
return {};
// Fall back to the old implementation for old ipywidgets versions (<=7.6)
return this._build_models_slow();
}

initComm.close();
@@ -296,5 +301,75 @@
return models;
}

/**
* The old implementation of building widget models.
* We keep it around to support old ipywidgets versions (<=7.6).
*/
async _build_models_slow(): Promise<{ [key: string]: base.WidgetModel }> {
const comm_ids = await this._get_comm_info();
const models: { [key: string]: base.WidgetModel } = {};
/**
* For the classic notebook, iopub_msg_rate_limit=1000 (default).
* For ZMQ, we are affected by the default ZMQ_SNDHWM setting of 1000.
* See https://github.com/voila-dashboards/voila/issues/534 for a discussion
*/
const maxMessagesInTransit = 100; // really safe limit compared to ZMQ_SNDHWM
const maxMessagesPerSecond = 500; // let's be on the safe side, in case the kernel sends more messages
const widgets_info = await Promise.all(
batchRateMap(
Object.keys(comm_ids),
async comm_id => {
const comm = await this._create_comm(this.comm_target_name, comm_id);
return this._update_comm(comm);
},
{ room: maxMessagesInTransit, rate: maxMessagesPerSecond }
)
);

await Promise.all(
widgets_info.map(async widget_info => {
const state = (widget_info as any).msg.content.data.state;
try {
const modelPromise = this.new_model(
{
model_name: state._model_name,
model_module: state._model_module,
model_module_version: state._model_module_version,
comm: (widget_info as any).comm
},
state
);
const model = await modelPromise;
models[model.model_id] = model;
} catch (error) {
// Failed to create a widget model, we continue creating other models so that
// other widgets can render
console.error(error);
}
})
);
return models;
}

async _update_comm(
comm: base.IClassicComm
): Promise<{ comm: base.IClassicComm; msg: any }> {
return new Promise((resolve, reject) => {
comm.on_msg(async msg => {
if (msg.content.data.buffer_paths) {
base.put_buffers(
msg.content.data.state,
msg.content.data.buffer_paths,
msg.buffers
);
}
if (msg.content.data.method === 'update') {
resolve({ comm: comm, msg: msg });
}
});
comm.send({ method: 'request_state' }, {});
});
}

private _loader: (name: any, version: any) => Promise<any>;
}
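A back-of-the-envelope check of the throttling parameters chosen in _build_models_slow above (the constants are copied from the diff; the sketch itself is not part of the commit):

// Sanity check of the throttling parameters used in _build_models_slow.
const room = 100; // maxMessagesInTransit
const rate = 500; // maxMessagesPerSecond

// batchRateMap forces each job to take at least room/rate seconds, so at
// most `room` messages are in flight and, in steady state, at most `rate`
// calls are issued per second:
const minSecondsPerJob = room / rate; // 100 / 500 = 0.2 s
const steadyStateCallsPerSecond = room / minSecondsPerJob; // 500

console.log({ minSecondsPerJob, steadyStateCallsPerSecond });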
35 changes: 35 additions & 0 deletions packages/voila/src/utils.ts
@@ -0,0 +1,35 @@
import pLimit from 'p-limit';

const delay = (sec: number) =>
new Promise(resolve => setTimeout(resolve, sec * 1000));

/**
* Map a function onto a list, calling fn at a limit of 'rate' calls per second
* and at most 'room' parallel calls.
* Note that the minimum window over which the rate is respected is room/rate seconds.
*/
export const batchRateMap = (
list: string[],
fn: (...args: any[]) => Promise<any>,
{ room, rate }: { room: number; rate: number }
): Promise<any>[] => {
const limit = pLimit(room);
return list.map(async value => {
return new Promise((valueResolve, reject) => {
limit(() => {
// We may not want to start the next job immediately; we want to respect
// the throttling rate, e.g.:
// If we have room for 10 parallel jobs, at a max rate of 100/second, each job
// should take at least 10/100=0.1 seconds.
// If we have room for 100 parallel jobs, and a max rate of 10/second, each
// job should take 100/10=10 seconds. But it will have a burst of 100 calls.
const throttlePromise = delay(room / rate);
// If the job is done, resolve the promise immediately; don't wait for the throttle promise.
// This means that if we have room for 10 parallel jobs and just 9 jobs,
// we will never have to wait for the throttlePromise.
const resultPromise = fn(value).then(valueResolve);
return Promise.all([resultPromise, throttlePromise]);
});
});
});
};
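
A hypothetical usage of batchRateMap, mirroring how _build_models_slow drives it (fetchState is an assumed callback standing in for the per-widget comm round trip):

import { batchRateMap } from './utils';

// Hypothetical helper: map a state-fetching callback over many comm ids
// without flooding the kernel.
async function fetchAllStates(
  commIds: string[],
  fetchState: (commId: string) => Promise<any>
): Promise<any[]> {
  // At most 100 requests in flight, at most 500 issued per second.
  return Promise.all(batchRateMap(commIds, fetchState, { room: 100, rate: 500 }));
}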
