Skip to content

Commit

Permalink
feat: Using a queue for sync scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
mnasyrov committed Oct 24, 2023
1 parent f84c95b commit e8e1882
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 22 deletions.
33 changes: 19 additions & 14 deletions src/core/schedulers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { dump } from '../test/dump';

import { Queue } from './utils';

export type TaskScheduler<T> = Readonly<{
isEmpty(): boolean;
schedule(entry: T): void;
Expand All @@ -11,6 +13,15 @@ export function defaultRunnableAction(task: Runnable): void {
task.run();
}

const queueMicrotaskPolyfill = (task: () => void): unknown =>
Promise.resolve().then(task);

// Polyfill is faster, the benchmark needs to be reviewed.
// const queueMicrotask =
// 'queueMicrotask' in global ? global.queueMicrotask : queueMicrotaskPolyfill;

const queueMicrotask = queueMicrotaskPolyfill;

export class MicrotaskScheduler<T> implements TaskScheduler<T> {
private queue: T[] = [];
private isActive = false;
Expand All @@ -27,7 +38,7 @@ export class MicrotaskScheduler<T> implements TaskScheduler<T> {

// if (prevSize === 0 && !this.isActive) {
if (prevSize === 0) {
Promise.resolve().then(() => this.execute());
queueMicrotask(this.execute);
}
};

Expand Down Expand Up @@ -59,33 +70,27 @@ export class MicrotaskScheduler<T> implements TaskScheduler<T> {
}

export class SyncTaskScheduler<T> implements TaskScheduler<T> {
private queue: T[] = [];
private queue = new Queue<T>();
private isActive = false;

constructor(private readonly action: (entry: T) => void) {}

isEmpty = (): boolean => this.queue.length === 0;
isEmpty = (): boolean => !this.queue.head;

schedule = (entry: T): void => {
this.queue.push(entry);
this.execute();
if (!this.isActive) this.execute();
};

execute = (): void => {
if (this.isActive) {
return;
}
if (this.isActive) return;

this.isActive = true;

try {
while (this.queue.length > 0) {
const list = this.queue;
this.queue = [];

for (const entry of list) {
this.action(entry);
}
let entry;
while ((entry = this.queue.get())) {
this.action(entry);
}
} finally {
this.isActive = false;
Expand Down
32 changes: 32 additions & 0 deletions src/core/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,35 @@ export function createLatch<T = void>(): Latch<T> {

return result;
}

export type QueueEntry<T> = { item: T; next?: QueueEntry<T> };

export class Queue<T> {
head: QueueEntry<T> | undefined;
tail: QueueEntry<T> | undefined;

push(item: T) {
const entry = { item };

if (this.tail) {
this.tail.next = entry;
} else {
this.head = entry;
}

this.tail = entry;
}

get(): T | undefined {
const entry = this.head;

if (entry) {
const next = (this.head = entry.next);
if (!next) this.tail = undefined;

return entry.item;
}

return undefined;
}
}
17 changes: 9 additions & 8 deletions src/test/dump.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export function dump(message: string, value?: any): void {
const clone =
value === undefined ? undefined : JSON.parse(JSON.stringify(value));

if (clone === undefined) {
console.log(`!!! ${message}`);
} else {
console.log(`!!! ${message}`, clone);
}
// const clone =
// value === undefined ? undefined : JSON.parse(JSON.stringify(value));
//
// if (clone === undefined) {
// console.log(`!!! ${message}`);
// } else {
// console.log(`!!! ${message}`, clone);
// }
}

0 comments on commit e8e1882

Please sign in to comment.