Skip to content

Commit

Permalink
boxplot on worker
Browse files Browse the repository at this point in the history
  • Loading branch information
sgratzl committed Dec 17, 2018
1 parent ad8f37e commit 841c69e
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 62 deletions.
47 changes: 32 additions & 15 deletions src/internal/math.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@ export function getNumberOfBins(length: number) {
return Math.ceil(Math.log(length) / Math.LN2) + 1;
}

export interface IHistGenerator {
bins: INumberBin[];
toBin(value: number): number;
}

export function min(values: number[]): number;
export function min<T>(values: T[], acc?: (v: T) => number): number;
export function min<T>(values: T[], acc?: (v: T) => number) {
Expand Down Expand Up @@ -663,6 +658,14 @@ export interface IWorkerMessage {
uid: number;
}

export interface IStatsWorkerMessage extends IWorkerMessage {
refIndices: string | null;
indices?: UIntTypedArray;

refData: string;
data?: UIntTypedArray | Float32Array | Int32Array;
}


export interface ISortMessageRequest {
type: 'sort';
Expand Down Expand Up @@ -789,7 +792,7 @@ export function dateValueCache2Value(v: number) {
return v === MISSING_DATE ? null : new Date(v);
}

export function categoricalValueCacheBuilder(length: number, categories: {name: string}[]) {
export function categoricalValueCacheBuilder(length: number, categories: {name: string}[]) {
const vs = createIndexArray(length, categories.length + 1);
const name2index = new Map<string, number>();
for (let i = 0; i < categories.length; ++i) {
Expand All @@ -815,7 +818,6 @@ function sortWorkerMain(self: IPoorManWorkerScope) {
sortComplex(r.indices, r.sortOrders);
}
const order = r.indices;
refs.set(r.ref, order.slice());

self.postMessage(<ISortMessageResponse>{
type: r.type,
Expand Down Expand Up @@ -846,9 +848,20 @@ function sortWorkerMain(self: IPoorManWorkerScope) {
}
};

const dateStats = (r: IDateStatsMessageRequest) => {
const data = r.data ? r.data : <UIntTypedArray>refs.get(r.refData)!;
const resolveRefs = <T extends UIntTypedArray | Float32Array | Int32Array>(r: IStatsWorkerMessage) => {
const data: T = r.data ? <T><unknown>r.data : <T><unknown>refs.get(r.refData)!;
const indices = r.indices ? r.indices : (r.refIndices ? <UIntTypedArray>refs.get(r.refIndices)! : undefined);
if (r.refData) {
refs.set(r.refData, data);
}
if (r.refIndices) {
refs.set(r.refIndices, indices!);
}
return {data, indices};
};

const dateStats = (r: IDateStatsMessageRequest) => {
const {data, indices} = resolveRefs<Int32Array>(r);

const b = dateStatsBuilder(r.template);
if (indices) {
Expand All @@ -871,8 +884,12 @@ function sortWorkerMain(self: IPoorManWorkerScope) {
};

const categoricalStats = (r: ICategoricalStatsMessageRequest) => {
const data = r.data ? r.data : <UIntTypedArray>refs.get(r.refData)!;
const indices = r.indices ? r.indices : (r.refIndices ? <UIntTypedArray>refs.get(r.refIndices)! : undefined);
const {data, indices} = resolveRefs<UIntTypedArray>(r);

refs.set(r.refData, data);
if (r.refIndices) {
refs.set(r.refIndices, indices!);
}

const cats = r.categories.map((name) => ({name}));
const b = categoricalStatsBuilder(cats);
Expand All @@ -896,8 +913,7 @@ function sortWorkerMain(self: IPoorManWorkerScope) {
};

const numberStats = (r: INumberStatsMessageRequest) => {
const data = r.data ? r.data : <Float32Array>refs.get(r.refData)!;
const indices = r.indices ? r.indices : (r.refIndices ? <UIntTypedArray>refs.get(r.refIndices)! : undefined);
const {data, indices} = resolveRefs<Float32Array>(r);

const b = normalizedStatsBuilder(r.numberOfBins);

Expand All @@ -921,8 +937,7 @@ function sortWorkerMain(self: IPoorManWorkerScope) {
};

const boxplotStats = (r: IBoxPlotStatsMessageRequest) => {
const data = r.data ? r.data : <Float32Array>refs.get(r.refData)!;
const indices = r.indices ? r.indices : (r.refIndices ? <UIntTypedArray>refs.get(r.refIndices)! : undefined);
const {data, indices} = resolveRefs<Float32Array>(r);

const b = boxplotBuilder(indices ? indices.length : undefined);

Expand Down Expand Up @@ -967,6 +982,8 @@ function sortWorkerMain(self: IPoorManWorkerScope) {
}

export const WORKER_BLOB = createWorkerCodeBlob([
pushAll.toString(),
quantile.toString(),
normalizedStatsBuilder.toString(),
boxplotBuilder.toString(),
computeGranularity.toString(),
Expand Down
115 changes: 94 additions & 21 deletions src/internal/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {IWorkerMessage} from './math';
import {IWorkerMessage, IStatsWorkerMessage, INumberStatsMessageRequest, IAdvancedBoxPlotData, ICategoricalStatistics, IDateStatistics, IStatistics, ICategoricalStatsMessageRequest, IDateStatsMessageRequest} from './math';
import {UIntTypedArray, IndicesArray} from '../model';

export interface IPoorManWorkerScopeEventMap {
message: MessageEvent;
Expand Down Expand Up @@ -41,16 +42,15 @@ const MIN_WORKER_THREADS = 1;
const THREAD_CLEANUP_TIME = 10000; // 10s

export class WorkerTaskScheduler {
private readonly workers: {worker: Worker, tasks: Set<number>}[] = [];
private readonly workers: {worker: Worker, tasks: Set<number>, refs: Set<string>}[] = [];
private cleanUpWorkerTimer: number = -1;
private workerTaskCounter = 0;

constructor(private readonly blob: string, private readonly initWorker: (push: (type: string, msg: any) => void) => void) {
constructor(private readonly blob: string) {
// start with two worker
for (let i = 0; i < MIN_WORKER_THREADS; ++i) {
const w = new Worker(blob);
initWorker(this.sendMessageTo(w));
this.workers.push({worker: w, tasks: new Set()});
this.workers.push({worker: w, tasks: new Set(), refs: new Set()});
}
}

Expand All @@ -76,15 +76,15 @@ export class WorkerTaskScheduler {

if (this.workers.length >= MAX_WORKER_THREADS) {
// find the one with the fewest tasks
return this.workers.reduce((a, b) => a == null || a.tasks.size > b.tasks.size ? b : a, <{worker: Worker, tasks: Set<number>} | null>null)!;
return this.workers.reduce((a, b) => a == null || a.tasks.size > b.tasks.size ? b : a, <{worker: Worker, tasks: Set<number>, refs: Set<string>} | null>null)!;
}

// create new one
const r = {
worker: new Worker(this.blob),
tasks: new Set<number>()
tasks: new Set<number>(),
refs: new Set()
};
this.initWorker(this.sendMessageTo(r.worker));
this.workers.push(r);
return r;
}
Expand All @@ -95,6 +95,56 @@ export class WorkerTaskScheduler {
}
}

pushStats(type: 'numberStats', args: Exclude<INumberStatsMessageRequest, IStatsWorkerMessage>, refData: string, data: Float32Array, refIndices?: string, indices?: UIntTypedArray): Promise<IStatistics>;
pushStats(type: 'boxplotStats', args: {}, refData: string, data: Float32Array, refIndices?: string, indices?: UIntTypedArray): Promise<IAdvancedBoxPlotData>;
pushStats(type: 'categoricalStats', args: Exclude<ICategoricalStatsMessageRequest, IStatsWorkerMessage>, refData: string, data: UIntTypedArray, refIndices?: string, indices?: UIntTypedArray): Promise<ICategoricalStatistics>;
pushStats(type: 'dateStats', args: Exclude<IDateStatsMessageRequest, IStatsWorkerMessage>, refData: string, data: Int32Array, refIndices?: string, indices?: UIntTypedArray): Promise<IDateStatistics>;
pushStats(type: 'numberStats' | 'boxplotStats' | 'categoricalStats' | 'dateStats', args: any, refData: string, data: Float32Array | UIntTypedArray | Int32Array, refIndices?: string, indices?: UIntTypedArray) {
return new Promise((resolve) => {
const uid = this.workerTaskCounter++;
const {worker, tasks, refs} = this.checkOutWorker();

const receiver = (msg: MessageEvent) => {
const r = <IWorkerMessage>msg.data;
if (r.uid !== uid || r.type !== type) {
return;
}
worker.removeEventListener('message', receiver);
tasks.delete(uid);
this.finshedTask();
resolve((<any>r).stats);
};

worker.addEventListener('message', receiver);
tasks.add(uid);

const msg: any = Object.assign({
type,
uid,
refData,
refIndices: refIndices || null
}, args);

if (!refData || !refs.has(refData)) {
// need to transfer
msg.data = data;
if (refData) {
refs.add(refData);
}
}
if (indices && (!refIndices || !refs.has(refIndices))) {
// need to transfer
msg.indices = indices!;
if (refIndices) {
refs.add(refIndices);
}
}
console.log(msg);

worker.postMessage(msg);
});
}

push<M, R, T>(type: string, args: Exclude<M, IWorkerMessage>, transferAbles: ArrayBuffer[], toResult: (r: R) => T) {
return new Promise<T>((resolve) => {
const uid = this.workerTaskCounter++;
Expand All @@ -113,13 +163,47 @@ export class WorkerTaskScheduler {

worker.addEventListener('message', receiver);
tasks.add(uid);
worker.postMessage(Object.assign({
const msg = Object.assign({
type,
uid
}, args), transferAbles);
}, args);
console.log(msg);
worker.postMessage(msg, transferAbles);
});
}

setRef(ref: string, data: Float32Array | UIntTypedArray | Int32Array | IndicesArray) {
for (const w of this.workers) {
w.refs.add(ref);
}
this.broadCast('setRef', {
ref,
data
});
}

deleteRef(ref: string, startsWith = false) {
const uid = this.workerTaskCounter++;
const msg = {
type: 'deleteRef',
uid,
ref,
startsWith
};
for (const w of this.workers) {
w.worker.postMessage(msg);
if (startsWith) {
w.refs.delete(ref);
continue;
}
for (const r of Array.from(w.refs)) {
if (r.startsWith(ref)) {
w.refs.delete(r);
}
}
}
}

broadCast<T>(type: string, msg: T) {
const uid = this.workerTaskCounter++;
// don't store in tasks queue since there is no response
Expand All @@ -131,15 +215,4 @@ export class WorkerTaskScheduler {
}
}

private sendMessageTo(worker: Worker) {
return <T>(type: string, msg: T) => {
const uid = this.workerTaskCounter++;
// don't store in tasks queue since there is no response
worker.postMessage(Object.assign({
type,
uid
}, msg));
};
}

}
Loading

0 comments on commit 841c69e

Please sign in to comment.