-
-
Notifications
You must be signed in to change notification settings - Fork 643
/
dispatcher.ts
78 lines (69 loc) · 2.76 KB
/
dispatcher.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import {Actor, MessageHandler} from './actor';
import {getGlobalWorkerPool} from './global_worker_pool';
import {GLOBAL_DISPATCHER_ID, makeRequest} from './ajax';
import type {WorkerPool} from './worker_pool';
import type {WorkerSource} from '../source/worker_source'; /* eslint-disable-line */ // this is used for the docs' import
import type {RequestResponseMessageMap} from './actor_messages';
import {MessageType} from './actor_messages';
/**
* Responsible for sending messages from a {@link Source} to an associated
* {@link WorkerSource}.
*/
export class Dispatcher {
workerPool: WorkerPool;
actors: Array<Actor>;
currentActor: number;
id: string | number;
constructor(workerPool: WorkerPool, mapId: string | number) {
this.workerPool = workerPool;
this.actors = [];
this.currentActor = 0;
this.id = mapId;
const workers = this.workerPool.acquire(mapId);
for (let i = 0; i < workers.length; i++) {
const worker = workers[i];
const actor = new Actor(worker, mapId);
actor.name = `Worker ${i}`;
this.actors.push(actor);
}
if (!this.actors.length) throw new Error('No actors found');
}
/**
* Broadcast a message to all Workers.
*/
broadcast<T extends MessageType>(type: T, data: RequestResponseMessageMap[T][0]): Promise<RequestResponseMessageMap[T][1][]> {
const promises: Promise<RequestResponseMessageMap[T][1]>[] = [];
for (const actor of this.actors) {
promises.push(actor.sendAsync({type, data}));
}
return Promise.all(promises);
}
/**
* Acquires an actor to dispatch messages to. The actors are distributed in round-robin fashion.
* @returns An actor object backed by a web worker for processing messages.
*/
getActor(): Actor {
this.currentActor = (this.currentActor + 1) % this.actors.length;
return this.actors[this.currentActor];
}
remove(mapRemoved: boolean = true) {
this.actors.forEach((actor) => { actor.remove(); });
this.actors = [];
if (mapRemoved) this.workerPool.release(this.id);
}
public registerMessageHandler<T extends MessageType>(type: T, handler: MessageHandler<T>) {
for (const actor of this.actors) {
actor.registerMessageHandler(type, handler);
}
}
}
let globalDispatcher: Dispatcher;
export function getGlobalDispatcher(): Dispatcher {
if (!globalDispatcher) {
globalDispatcher = new Dispatcher(getGlobalWorkerPool(), GLOBAL_DISPATCHER_ID);
globalDispatcher.registerMessageHandler(MessageType.getResource, (_mapId, params, abortController) => {
return makeRequest(params, abortController);
});
}
return globalDispatcher;
}