From cec7fd07fc07c1137a37437bdfb8840dfad37c97 Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Fri, 25 Jan 2019 07:56:10 -0800 Subject: [PATCH 1/7] feat(context): introduce context listener for bind/unbind events --- packages/context/src/context-listener.ts | 49 ++++++++ packages/context/src/context.ts | 140 ++++++++++++++++++++- packages/context/src/index.ts | 1 + packages/context/test/unit/context.unit.ts | 111 +++++++++++++++- 4 files changed, 295 insertions(+), 6 deletions(-) create mode 100644 packages/context/src/context-listener.ts diff --git a/packages/context/src/context-listener.ts b/packages/context/src/context-listener.ts new file mode 100644 index 000000000000..d26a60b871f9 --- /dev/null +++ b/packages/context/src/context-listener.ts @@ -0,0 +1,49 @@ +// Copyright IBM Corp. 2018. All Rights Reserved. +// Node module: @loopback/context +// This file is licensed under the MIT License. +// License text available at https://opensource.org/licenses/MIT + +import {Binding} from './binding'; +import {BindingFilter} from './binding-filter'; +import {ValueOrPromise} from './value-promise'; + +/** + * Context event types. We support `bind` and `unbind` for now but + * keep it open for new types + */ +export type ContextEventType = 'bind' | 'unbind' | string; + +/** + * Listeners of context bind/unbind events + */ +export interface ContextEventListener { + /** + * A filter function to match bindings + */ + filter?: BindingFilter; + + /** + * Listen on `bind`, `unbind`, or other events + * @param eventType Context event type + * @param binding The binding as event source + */ + listen( + eventType: ContextEventType, + binding: Readonly>, + ): ValueOrPromise; +} + +/** + * Subscription of context events. It's modeled after + * https://github.com/tc39/proposal-observable. + */ +export interface Subscription { + /** + * unsubscribe + */ + unsubscribe(): void; + /** + * Is the subscription closed? + */ + closed: boolean; +} diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index aa862b015886..3e4d0615fa78 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -7,10 +7,15 @@ import * as debugModule from 'debug'; import {v1 as uuidv1} from 'uuid'; import {ValueOrPromise} from '.'; import {Binding, BindingTag} from './binding'; +import {BindingFilter, filterByKey, filterByTag} from './binding-filter'; import {BindingAddress, BindingKey} from './binding-key'; +import { + ContextEventListener, + ContextEventType, + Subscription, +} from './context-listener'; import {ResolutionOptions, ResolutionSession} from './resolution-session'; import {BoundValue, getDeepProperty, isPromiseLike} from './value-promise'; -import {BindingFilter, filterByKey, filterByTag} from './binding-filter'; const debug = debugModule('loopback:context'); @@ -34,8 +39,29 @@ export class Context { protected _parent?: Context; /** - * Create a new context + * A list of registered context listeners + */ + protected readonly listeners: Set = new Set(); + + /** + * Create a new context. For example, + * ```ts + * // Create a new root context, let the framework to create a unique name + * const rootCtx = new Context(); + * + * // Create a new child context inheriting bindings from `rootCtx` + * const childCtx = new Context(rootCtx); + * + * // Create another root context called "application" + * const appCtx = new Context('application'); + * + * // Create a new child context called "request" and inheriting bindings + * // from `appCtx` + * const reqCtx = new Context(appCtx, 'request'); + * ``` * @param _parent The optional parent context + * @param name Name of the context, if not provided, a `uuid` will be + * generated as the name */ constructor(_parent?: Context | string, name?: string) { if (typeof _parent === 'string') { @@ -72,14 +98,21 @@ export class Context { debug('Adding binding: %s', key); } + let existingBinding: Binding | undefined; const keyExists = this.registry.has(key); if (keyExists) { - const existingBinding = this.registry.get(key); + existingBinding = this.registry.get(key); const bindingIsLocked = existingBinding && existingBinding.isLocked; if (bindingIsLocked) throw new Error(`Cannot rebind key "${key}" to a locked binding`); } this.registry.set(key, binding); + if (existingBinding !== binding) { + if (existingBinding != null) { + this.notifyListeners('unbind', existingBinding); + } + this.notifyListeners('bind', binding); + } return this; } @@ -96,10 +129,75 @@ export class Context { unbind(key: BindingAddress): boolean { key = BindingKey.validate(key); const binding = this.registry.get(key); + // If not found, return `false` if (binding == null) return false; if (binding && binding.isLocked) throw new Error(`Cannot unbind key "${key}" of a locked binding`); - return this.registry.delete(key); + this.registry.delete(key); + this.notifyListeners('unbind', binding); + return true; + } + + /** + * Add the context listener as an event listener to the context chain, + * including its ancestors + * @param listener Context listener + */ + subscribe(listener: ContextEventListener): Subscription { + let ctx: Context | undefined = this; + while (ctx != null) { + ctx.listeners.add(listener); + ctx = ctx._parent; + } + return new ContextSubscription(this, listener); + } + + /** + * Remove the context listener from the context chain + * @param listener Context listener + */ + unsubscribe(listener: ContextEventListener) { + let ctx: Context | undefined = this; + while (ctx != null) { + ctx.listeners.delete(listener); + ctx = ctx._parent; + } + } + + /** + * Check if a listener is subscribed to this context + * @param listener Context listener + */ + isSubscribed(listener: ContextEventListener) { + return this.listeners.has(listener); + } + + /** + * Publish an event to the registered listeners. Please note the + * notification happens using `process.nextTick` so that we allow fluent APIs + * such as `ctx.bind('key').to(...).tag(...);` and give listeners the fully + * populated binding + * + * @param event Event names: `bind` or `unbind` + * @param binding Binding bound or unbound + */ + protected notifyListeners( + event: ContextEventType, + binding: Readonly>, + ) { + // Notify listeners in the next tick + process.nextTick(async () => { + for (const listener of this.listeners) { + if (!listener.filter || listener.filter(binding)) { + try { + await listener.listen(event, binding); + } catch (err) { + debug('Error thrown by a listener is ignored', err, event, binding); + // Ignore the error + } + } + } + }); } /** @@ -150,6 +248,19 @@ export class Context { * - return `true` to include the binding in the results * - return `false` to exclude it. */ + find( + pattern?: string | RegExp, + ): Readonly>[]; + + /** + * Find bindings using a filter function + * @param filter A function to test on the binding. It returns `true` to + * include the binding or `false` to exclude the binding. + */ + find( + filter: BindingFilter, + ): Readonly>[]; + find( pattern?: string | RegExp | BindingFilter, ): Readonly>[] { @@ -451,3 +562,24 @@ export class Context { return json; } } + +/** + * An implementation of `Subscription` interface for context events + */ +class ContextSubscription implements Subscription { + constructor( + protected context: Context, + protected listener: ContextEventListener, + ) {} + + private _closed = false; + + unsubscribe() { + this.context.unsubscribe(this.listener); + this._closed = true; + } + + get closed() { + return this._closed; + } +} diff --git a/packages/context/src/index.ts b/packages/context/src/index.ts index 4a83b7c580ef..a8695222ea30 100644 --- a/packages/context/src/index.ts +++ b/packages/context/src/index.ts @@ -10,6 +10,7 @@ export * from './binding-inspector'; export * from './binding-key'; export * from './binding-filter'; export * from './context'; +export * from './context-listener'; export * from './inject'; export * from './keys'; export * from './provider'; diff --git a/packages/context/test/unit/context.unit.ts b/packages/context/test/unit/context.unit.ts index e0896a8c9265..4c8d99c97b7b 100644 --- a/packages/context/test/unit/context.unit.ts +++ b/packages/context/test/unit/context.unit.ts @@ -5,14 +5,18 @@ import {expect} from '@loopback/testlab'; import { - Context, Binding, + BindingKey, BindingScope, BindingType, + Context, + ContextEventListener, isPromiseLike, - BindingKey, } from '../..'; +import {promisify} from 'util'; +const nextTick = promisify(process.nextTick); + /** * Create a subclass of context so that we can access parents and registry * for assertions @@ -681,6 +685,109 @@ describe('Context', () => { }); }); + describe('listener subscription', () => { + let nonMatchingListener: ContextEventListener; + + beforeEach(givenNonMatchingListener); + + it('subscribes listeners', () => { + ctx.subscribe(nonMatchingListener); + expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + }); + + it('unsubscribes listeners', () => { + ctx.subscribe(nonMatchingListener); + expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + ctx.unsubscribe(nonMatchingListener); + expect(ctx.isSubscribed(nonMatchingListener)).to.false(); + }); + + it('allows subscription.unsubscribe()', () => { + const subscription = ctx.subscribe(nonMatchingListener); + expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + subscription.unsubscribe(); + expect(ctx.isSubscribed(nonMatchingListener)).to.false(); + expect(subscription.closed).to.be.true(); + }); + + it('registers listeners on context chain', () => { + const childCtx = new Context(ctx, 'child'); + childCtx.subscribe(nonMatchingListener); + expect(childCtx.isSubscribed(nonMatchingListener)).to.true(); + expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + }); + + it('un-registers listeners on context chain', () => { + const childCtx = new Context(ctx, 'child'); + childCtx.subscribe(nonMatchingListener); + expect(childCtx.isSubscribed(nonMatchingListener)).to.true(); + expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + childCtx.unsubscribe(nonMatchingListener); + expect(childCtx.isSubscribed(nonMatchingListener)).to.false(); + expect(ctx.isSubscribed(nonMatchingListener)).to.false(); + }); + + function givenNonMatchingListener() { + nonMatchingListener = { + filter: binding => false, + listen: (event, binding) => {}, + }; + } + }); + + describe('event notification', () => { + let matchingListener: ContextEventListener; + let nonMatchingListener: ContextEventListener; + const events: string[] = []; + let nonMatchingListenerCalled = false; + + beforeEach(givenListeners); + + it('emits bind event to matching listeners', async () => { + ctx.bind('foo').to('foo'); + await nextTick(); + expect(events).to.eql(['foo:bind']); + expect(nonMatchingListenerCalled).to.be.false(); + }); + + it('emits unbind event to matching listeners', async () => { + ctx.bind('foo').to('foo'); + await nextTick(); + ctx.unbind('foo'); + await nextTick(); + expect(events).to.eql(['foo:bind', 'foo:unbind']); + expect(nonMatchingListenerCalled).to.be.false(); + }); + + it('does not trigger listeners if affected binding is the same', async () => { + const binding = ctx.bind('foo').to('foo'); + await nextTick(); + expect(events).to.eql(['foo:bind']); + ctx.add(binding); + await nextTick(); + expect(events).to.eql(['foo:bind']); + }); + + function givenListeners() { + nonMatchingListenerCalled = false; + events.splice(0, events.length); + nonMatchingListener = { + filter: binding => false, + listen: (event, binding) => { + nonMatchingListenerCalled = true; + }, + }; + matchingListener = { + filter: binding => true, + listen: (event, binding) => { + events.push(`${binding.key}:${event}`); + }, + }; + ctx.subscribe(nonMatchingListener); + ctx.subscribe(matchingListener); + } + }); + function createContext() { ctx = new Context(); } From c2b3218e873f8a5361387e028819ac4d989afd12 Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Mon, 28 Jan 2019 10:43:36 -0800 Subject: [PATCH 2/7] chore(context): use queue to handle event notifications --- packages/context/package.json | 1 + packages/context/src/context.ts | 69 ++++++++-------- packages/context/test/unit/context.unit.ts | 92 +++++++++++++++++----- 3 files changed, 111 insertions(+), 51 deletions(-) diff --git a/packages/context/package.json b/packages/context/package.json index 5df62eefa0fb..4d0db7153473 100644 --- a/packages/context/package.json +++ b/packages/context/package.json @@ -21,6 +21,7 @@ "dependencies": { "@loopback/metadata": "^1.0.5", "debug": "^4.0.1", + "queue": "^5.0.0", "uuid": "^3.2.1" }, "devDependencies": { diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index 3e4d0615fa78..6cc6ff677852 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -4,6 +4,7 @@ // License text available at https://opensource.org/licenses/MIT import * as debugModule from 'debug'; +import Queue from 'queue'; import {v1 as uuidv1} from 'uuid'; import {ValueOrPromise} from '.'; import {Binding, BindingTag} from './binding'; @@ -43,6 +44,11 @@ export class Context { */ protected readonly listeners: Set = new Set(); + /** + * Queue for context event notifications + */ + protected readonly eventQueue = new Queue({concurrency: 1, autostart: true}); + /** * Create a new context. For example, * ```ts @@ -174,30 +180,44 @@ export class Context { /** * Publish an event to the registered listeners. Please note the - * notification happens using `process.nextTick` so that we allow fluent APIs - * such as `ctx.bind('key').to(...).tag(...);` and give listeners the fully - * populated binding + * notification is queued and performed asynchronously so that we allow fluent + * APIs such as `ctx.bind('key').to(...).tag(...);` and give listeners the + * fully populated binding. * - * @param event Event names: `bind` or `unbind` + * @param eventType Event names: `bind` or `unbind` * @param binding Binding bound or unbound */ protected notifyListeners( - event: ContextEventType, + eventType: ContextEventType, binding: Readonly>, ) { - // Notify listeners in the next tick - process.nextTick(async () => { - for (const listener of this.listeners) { - if (!listener.filter || listener.filter(binding)) { - try { - await listener.listen(event, binding); - } catch (err) { - debug('Error thrown by a listener is ignored', err, event, binding); - // Ignore the error + if (this.listeners.size === 0) return; + // Schedule the notification task into the event queue + const task = () => { + return new Promise((resolve, reject) => { + // Run notifications in nextTick so that the binding is fully populated + process.nextTick(async () => { + for (const listener of this.listeners) { + if (!listener.filter || listener.filter(binding)) { + try { + await listener.listen(eventType, binding); + } catch (err) { + debug( + 'Error thrown by a listener is ignored', + err, + eventType, + binding, + ); + reject(err); + return; + } + } } - } - } - }); + resolve(); + }); + }); + }; + this.eventQueue.push(task); } /** @@ -235,7 +255,7 @@ export class Context { } /** - * Find bindings using the key pattern + * Find bindings using a key pattern or filter function * @param pattern A filter function, a regexp or a wildcard pattern with * optional `*` and `?`. Find returns such bindings where the key matches * the provided pattern. @@ -248,19 +268,6 @@ export class Context { * - return `true` to include the binding in the results * - return `false` to exclude it. */ - find( - pattern?: string | RegExp, - ): Readonly>[]; - - /** - * Find bindings using a filter function - * @param filter A function to test on the binding. It returns `true` to - * include the binding or `false` to exclude the binding. - */ - find( - filter: BindingFilter, - ): Readonly>[]; - find( pattern?: string | RegExp | BindingFilter, ): Readonly>[] { diff --git a/packages/context/test/unit/context.unit.ts b/packages/context/test/unit/context.unit.ts index 4c8d99c97b7b..950381c352f8 100644 --- a/packages/context/test/unit/context.unit.ts +++ b/packages/context/test/unit/context.unit.ts @@ -15,7 +15,7 @@ import { } from '../..'; import {promisify} from 'util'; -const nextTick = promisify(process.nextTick); +const setImmediatePromise = promisify(setImmediate); /** * Create a subclass of context so that we can access parents and registry @@ -29,6 +29,24 @@ class TestContext extends Context { const map = new Map(this.registry); return map; } + /** + * Wait until the context event queue is empty + */ + waitUntilEventsProcessed() { + return new Promise((resolve, reject) => { + if (this.eventQueue.length === 0) { + resolve(); + return; + } + this.eventQueue.on('end', err => { + if (err) reject(err); + else resolve(); + }); + this.eventQueue.on('error', err => { + reject(err); + }); + }); + } } describe('Context constructor', () => { @@ -65,7 +83,7 @@ describe('Context constructor', () => { }); describe('Context', () => { - let ctx: Context; + let ctx: TestContext; beforeEach('given a context', createContext); describe('bind', () => { @@ -736,59 +754,93 @@ describe('Context', () => { }); describe('event notification', () => { - let matchingListener: ContextEventListener; - let nonMatchingListener: ContextEventListener; const events: string[] = []; let nonMatchingListenerCalled = false; beforeEach(givenListeners); it('emits bind event to matching listeners', async () => { - ctx.bind('foo').to('foo'); - await nextTick(); - expect(events).to.eql(['foo:bind']); + ctx.bind('foo').to('foo-value'); + await ctx.waitUntilEventsProcessed(); + expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); expect(nonMatchingListenerCalled).to.be.false(); }); it('emits unbind event to matching listeners', async () => { - ctx.bind('foo').to('foo'); - await nextTick(); + ctx.bind('foo').to('foo-value'); + await ctx.waitUntilEventsProcessed(); ctx.unbind('foo'); - await nextTick(); - expect(events).to.eql(['foo:bind', 'foo:unbind']); + await ctx.waitUntilEventsProcessed(); + expect(events).to.eql([ + '1:foo:foo-value:bind', + '2:foo:foo-value:bind', + '1:foo:foo-value:unbind', + '2:foo:foo-value:unbind', + ]); expect(nonMatchingListenerCalled).to.be.false(); }); it('does not trigger listeners if affected binding is the same', async () => { - const binding = ctx.bind('foo').to('foo'); - await nextTick(); - expect(events).to.eql(['foo:bind']); + const binding = ctx.bind('foo').to('foo-value'); + await ctx.waitUntilEventsProcessed(); + expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); ctx.add(binding); - await nextTick(); - expect(events).to.eql(['foo:bind']); + await ctx.waitUntilEventsProcessed(); + expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); + }); + + it('reports error if a listener fails', () => { + ctx.bind('bar').to('bar-value'); + return expect(ctx.waitUntilEventsProcessed()).to.be.rejectedWith( + 'something wrong', + ); }); function givenListeners() { nonMatchingListenerCalled = false; events.splice(0, events.length); - nonMatchingListener = { + // A listener does not match the criteria + const nonMatchingListener: ContextEventListener = { filter: binding => false, listen: (event, binding) => { nonMatchingListenerCalled = true; }, }; - matchingListener = { + // A sync listener matches the criteria + const matchingListener: ContextEventListener = { filter: binding => true, listen: (event, binding) => { - events.push(`${binding.key}:${event}`); + // Make sure the binding is configured with value + // when the listener is notified + const val = binding.getValue(ctx); + events.push(`1:${binding.key}:${val}:${event}`); + }, + }; + // An async listener matches the criteria + const matchingAsyncListener: ContextEventListener = { + filter: binding => true, + listen: async (event, binding) => { + await setImmediatePromise(); + const val = binding.getValue(ctx); + events.push(`2:${binding.key}:${val}:${event}`); + }, + }; + // An async listener matches the criteria that throws an error + const matchingAsyncListenerWithError: ContextEventListener = { + filter: binding => binding.key === 'bar', + listen: async (event, binding) => { + await setImmediatePromise(); + throw new Error('something wrong'); }, }; ctx.subscribe(nonMatchingListener); ctx.subscribe(matchingListener); + ctx.subscribe(matchingAsyncListener); + ctx.subscribe(matchingAsyncListenerWithError); } }); function createContext() { - ctx = new Context(); + ctx = new TestContext(); } }); From 2d9209321af31f1b78bac7abdcee436f6da3d4c1 Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Tue, 29 Jan 2019 08:14:32 -0800 Subject: [PATCH 3/7] chore(context): address review comments --- packages/context/src/context-listener.ts | 5 ++- packages/context/src/context.ts | 18 ++++------- packages/context/test/unit/context.unit.ts | 37 ++++++++++++++-------- 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/packages/context/src/context-listener.ts b/packages/context/src/context-listener.ts index d26a60b871f9..26949468670f 100644 --- a/packages/context/src/context-listener.ts +++ b/packages/context/src/context-listener.ts @@ -6,6 +6,7 @@ import {Binding} from './binding'; import {BindingFilter} from './binding-filter'; import {ValueOrPromise} from './value-promise'; +import {Context} from './context'; /** * Context event types. We support `bind` and `unbind` for now but @@ -18,7 +19,8 @@ export type ContextEventType = 'bind' | 'unbind' | string; */ export interface ContextEventListener { /** - * A filter function to match bindings + * An optional filter function to match bindings. If not present, the listener + * will be notified of all binding events. */ filter?: BindingFilter; @@ -30,6 +32,7 @@ export interface ContextEventListener { listen( eventType: ContextEventType, binding: Readonly>, + context: Context, ): ValueOrPromise; } diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index 6cc6ff677852..8e5c9d40898b 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -145,9 +145,8 @@ export class Context { } /** - * Add the context listener as an event listener to the context chain, - * including its ancestors - * @param listener Context listener + * Add a context event listener to the context chain, including its ancestors + * @param listener Context event listener */ subscribe(listener: ContextEventListener): Subscription { let ctx: Context | undefined = this; @@ -159,8 +158,8 @@ export class Context { } /** - * Remove the context listener from the context chain - * @param listener Context listener + * Remove the context event listener from the context chain + * @param listener Context event listener */ unsubscribe(listener: ContextEventListener) { let ctx: Context | undefined = this; @@ -200,14 +199,9 @@ export class Context { for (const listener of this.listeners) { if (!listener.filter || listener.filter(binding)) { try { - await listener.listen(eventType, binding); + await listener.listen(eventType, binding, this); } catch (err) { - debug( - 'Error thrown by a listener is ignored', - err, - eventType, - binding, - ); + debug(err, eventType, binding); reject(err); return; } diff --git a/packages/context/test/unit/context.unit.ts b/packages/context/test/unit/context.unit.ts index 950381c352f8..f80091ee9f8e 100644 --- a/packages/context/test/unit/context.unit.ts +++ b/packages/context/test/unit/context.unit.ts @@ -15,7 +15,7 @@ import { } from '../..'; import {promisify} from 'util'; -const setImmediatePromise = promisify(setImmediate); +const setImmediateAsync = promisify(setImmediate); /** * Create a subclass of context so that we can access parents and registry @@ -30,7 +30,7 @@ class TestContext extends Context { return map; } /** - * Wait until the context event queue is empty + * Wait until the context event queue is empty or an error is thrown */ waitUntilEventsProcessed() { return new Promise((resolve, reject) => { @@ -38,11 +38,11 @@ class TestContext extends Context { resolve(); return; } - this.eventQueue.on('end', err => { + this.eventQueue.once('end', err => { if (err) reject(err); else resolve(); }); - this.eventQueue.on('error', err => { + this.eventQueue.once('error', err => { reject(err); }); }); @@ -759,13 +759,25 @@ describe('Context', () => { beforeEach(givenListeners); - it('emits bind event to matching listeners', async () => { + it('emits one bind event to matching listeners', async () => { ctx.bind('foo').to('foo-value'); await ctx.waitUntilEventsProcessed(); expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); expect(nonMatchingListenerCalled).to.be.false(); }); + it('emits multiple bind events to matching listeners', async () => { + ctx.bind('foo').to('foo-value'); + ctx.bind('xyz').to('xyz-value'); + await ctx.waitUntilEventsProcessed(); + expect(events).to.eql([ + '1:foo:foo-value:bind', + '2:foo:foo-value:bind', + '1:xyz:xyz-value:bind', + '2:xyz:xyz-value:bind', + ]); + }); + it('emits unbind event to matching listeners', async () => { ctx.bind('foo').to('foo-value'); await ctx.waitUntilEventsProcessed(); @@ -808,28 +820,27 @@ describe('Context', () => { }; // A sync listener matches the criteria const matchingListener: ContextEventListener = { - filter: binding => true, - listen: (event, binding) => { + listen: (event, binding, context) => { // Make sure the binding is configured with value // when the listener is notified - const val = binding.getValue(ctx); + const val = binding.getValue(context); events.push(`1:${binding.key}:${val}:${event}`); }, }; // An async listener matches the criteria const matchingAsyncListener: ContextEventListener = { filter: binding => true, - listen: async (event, binding) => { - await setImmediatePromise(); - const val = binding.getValue(ctx); + listen: async (event, binding, context) => { + await setImmediateAsync(); + const val = binding.getValue(context); events.push(`2:${binding.key}:${val}:${event}`); }, }; // An async listener matches the criteria that throws an error const matchingAsyncListenerWithError: ContextEventListener = { filter: binding => binding.key === 'bar', - listen: async (event, binding) => { - await setImmediatePromise(); + listen: async () => { + await setImmediateAsync(); throw new Error('something wrong'); }, }; From 27aa83e9631ee2668a95f30730065c34ce96ac57 Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Tue, 29 Jan 2019 15:18:26 -0800 Subject: [PATCH 4/7] chore(context): make Context extend from EventEmitter --- packages/context/src/context.ts | 45 +++++++++++++++++----- packages/context/test/unit/context.unit.ts | 4 +- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index 8e5c9d40898b..d47c9d38947b 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -4,6 +4,7 @@ // License text available at https://opensource.org/licenses/MIT import * as debugModule from 'debug'; +import {EventEmitter} from 'events'; import Queue from 'queue'; import {v1 as uuidv1} from 'uuid'; import {ValueOrPromise} from '.'; @@ -23,7 +24,7 @@ const debug = debugModule('loopback:context'); /** * Context provides an implementation of Inversion of Control (IoC) container */ -export class Context { +export class Context extends EventEmitter { /** * Name of the context */ @@ -42,7 +43,7 @@ export class Context { /** * A list of registered context listeners */ - protected readonly listeners: Set = new Set(); + protected readonly observers: Set = new Set(); /** * Queue for context event notifications @@ -70,12 +71,36 @@ export class Context { * generated as the name */ constructor(_parent?: Context | string, name?: string) { + super(); if (typeof _parent === 'string') { name = _parent; _parent = undefined; } this._parent = _parent; this.name = name || uuidv1(); + + this.setupEventHandlers(); + } + + /** + * Set up an internal listener to notify registered observers asynchronously + * upon `bind` and `unbind` events + */ + private setupEventHandlers() { + for (const event of ['bind', 'unbind']) { + this.on(event, (binding: Readonly>) => { + this.notifyListeners(event, binding); + }); + } + this.eventQueue.on('error', err => { + this.emit('error', err); + }); + this.eventQueue.on('end', err => { + if (err) this.emit('error', err); + else { + this.emit('observersNotified'); + } + }); } /** @@ -115,9 +140,9 @@ export class Context { this.registry.set(key, binding); if (existingBinding !== binding) { if (existingBinding != null) { - this.notifyListeners('unbind', existingBinding); + this.emit('unbind', existingBinding); } - this.notifyListeners('bind', binding); + this.emit('bind', binding); } return this; } @@ -140,7 +165,7 @@ export class Context { if (binding && binding.isLocked) throw new Error(`Cannot unbind key "${key}" of a locked binding`); this.registry.delete(key); - this.notifyListeners('unbind', binding); + this.emit('unbind', binding); return true; } @@ -151,7 +176,7 @@ export class Context { subscribe(listener: ContextEventListener): Subscription { let ctx: Context | undefined = this; while (ctx != null) { - ctx.listeners.add(listener); + ctx.observers.add(listener); ctx = ctx._parent; } return new ContextSubscription(this, listener); @@ -164,7 +189,7 @@ export class Context { unsubscribe(listener: ContextEventListener) { let ctx: Context | undefined = this; while (ctx != null) { - ctx.listeners.delete(listener); + ctx.observers.delete(listener); ctx = ctx._parent; } } @@ -174,7 +199,7 @@ export class Context { * @param listener Context listener */ isSubscribed(listener: ContextEventListener) { - return this.listeners.has(listener); + return this.observers.has(listener); } /** @@ -190,13 +215,13 @@ export class Context { eventType: ContextEventType, binding: Readonly>, ) { - if (this.listeners.size === 0) return; + if (this.observers.size === 0) return; // Schedule the notification task into the event queue const task = () => { return new Promise((resolve, reject) => { // Run notifications in nextTick so that the binding is fully populated process.nextTick(async () => { - for (const listener of this.listeners) { + for (const listener of this.observers) { if (!listener.filter || listener.filter(binding)) { try { await listener.listen(eventType, binding, this); diff --git a/packages/context/test/unit/context.unit.ts b/packages/context/test/unit/context.unit.ts index f80091ee9f8e..f472ca3816f0 100644 --- a/packages/context/test/unit/context.unit.ts +++ b/packages/context/test/unit/context.unit.ts @@ -38,11 +38,11 @@ class TestContext extends Context { resolve(); return; } - this.eventQueue.once('end', err => { + this.once('observersNotified', err => { if (err) reject(err); else resolve(); }); - this.eventQueue.once('error', err => { + this.once('error', err => { reject(err); }); }); From c48722b514e8e7fa0edc3349288652269740308d Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Wed, 30 Jan 2019 08:22:31 -0800 Subject: [PATCH 5/7] chore(context): rename ContextEventListener to ContextObserver --- ...ontext-listener.ts => context-observer.ts} | 6 +- packages/context/src/context.ts | 54 +++---- packages/context/src/index.ts | 2 +- packages/context/test/unit/context.unit.ts | 132 +++++++++--------- 4 files changed, 98 insertions(+), 96 deletions(-) rename packages/context/src/{context-listener.ts => context-observer.ts} (93%) diff --git a/packages/context/src/context-listener.ts b/packages/context/src/context-observer.ts similarity index 93% rename from packages/context/src/context-listener.ts rename to packages/context/src/context-observer.ts index 26949468670f..c53b39d273d1 100644 --- a/packages/context/src/context-listener.ts +++ b/packages/context/src/context-observer.ts @@ -15,9 +15,9 @@ import {Context} from './context'; export type ContextEventType = 'bind' | 'unbind' | string; /** - * Listeners of context bind/unbind events + * Observers of context bind/unbind events */ -export interface ContextEventListener { +export interface ContextObserver { /** * An optional filter function to match bindings. If not present, the listener * will be notified of all binding events. @@ -29,7 +29,7 @@ export interface ContextEventListener { * @param eventType Context event type * @param binding The binding as event source */ - listen( + observe( eventType: ContextEventType, binding: Readonly>, context: Context, diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index d47c9d38947b..53d3d411c2e9 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -12,10 +12,10 @@ import {Binding, BindingTag} from './binding'; import {BindingFilter, filterByKey, filterByTag} from './binding-filter'; import {BindingAddress, BindingKey} from './binding-key'; import { - ContextEventListener, + ContextObserver, ContextEventType, Subscription, -} from './context-listener'; +} from './context-observer'; import {ResolutionOptions, ResolutionSession} from './resolution-session'; import {BoundValue, getDeepProperty, isPromiseLike} from './value-promise'; @@ -41,9 +41,9 @@ export class Context extends EventEmitter { protected _parent?: Context; /** - * A list of registered context listeners + * A list of registered context observers */ - protected readonly observers: Set = new Set(); + protected readonly observers: Set = new Set(); /** * Queue for context event notifications @@ -88,10 +88,12 @@ export class Context extends EventEmitter { */ private setupEventHandlers() { for (const event of ['bind', 'unbind']) { + // Listen on events and notify observers this.on(event, (binding: Readonly>) => { - this.notifyListeners(event, binding); + this.notifyObservers(event, binding); }); } + // Relay events from the event queue this.eventQueue.on('error', err => { this.emit('error', err); }); @@ -170,48 +172,48 @@ export class Context extends EventEmitter { } /** - * Add a context event listener to the context chain, including its ancestors - * @param listener Context event listener + * Add a context event observer to the context chain, including its ancestors + * @param observer Context event observer */ - subscribe(listener: ContextEventListener): Subscription { + subscribe(observer: ContextObserver): Subscription { let ctx: Context | undefined = this; while (ctx != null) { - ctx.observers.add(listener); + ctx.observers.add(observer); ctx = ctx._parent; } - return new ContextSubscription(this, listener); + return new ContextSubscription(this, observer); } /** - * Remove the context event listener from the context chain - * @param listener Context event listener + * Remove the context event observer from the context chain + * @param observer Context event observer */ - unsubscribe(listener: ContextEventListener) { + unsubscribe(observer: ContextObserver) { let ctx: Context | undefined = this; while (ctx != null) { - ctx.observers.delete(listener); + ctx.observers.delete(observer); ctx = ctx._parent; } } /** - * Check if a listener is subscribed to this context - * @param listener Context listener + * Check if an observer is subscribed to this context + * @param observer Context observer */ - isSubscribed(listener: ContextEventListener) { - return this.observers.has(listener); + isSubscribed(observer: ContextObserver) { + return this.observers.has(observer); } /** - * Publish an event to the registered listeners. Please note the + * Publish an event to the registered observers. Please note the * notification is queued and performed asynchronously so that we allow fluent - * APIs such as `ctx.bind('key').to(...).tag(...);` and give listeners the + * APIs such as `ctx.bind('key').to(...).tag(...);` and give observers the * fully populated binding. * * @param eventType Event names: `bind` or `unbind` * @param binding Binding bound or unbound */ - protected notifyListeners( + protected notifyObservers( eventType: ContextEventType, binding: Readonly>, ) { @@ -221,10 +223,10 @@ export class Context extends EventEmitter { return new Promise((resolve, reject) => { // Run notifications in nextTick so that the binding is fully populated process.nextTick(async () => { - for (const listener of this.observers) { - if (!listener.filter || listener.filter(binding)) { + for (const observer of this.observers) { + if (!observer.filter || observer.filter(binding)) { try { - await listener.listen(eventType, binding, this); + await observer.observe(eventType, binding, this); } catch (err) { debug(err, eventType, binding); reject(err); @@ -595,13 +597,13 @@ export class Context extends EventEmitter { class ContextSubscription implements Subscription { constructor( protected context: Context, - protected listener: ContextEventListener, + protected observer: ContextObserver, ) {} private _closed = false; unsubscribe() { - this.context.unsubscribe(this.listener); + this.context.unsubscribe(this.observer); this._closed = true; } diff --git a/packages/context/src/index.ts b/packages/context/src/index.ts index a8695222ea30..0b204e662562 100644 --- a/packages/context/src/index.ts +++ b/packages/context/src/index.ts @@ -10,7 +10,7 @@ export * from './binding-inspector'; export * from './binding-key'; export * from './binding-filter'; export * from './context'; -export * from './context-listener'; +export * from './context-observer'; export * from './inject'; export * from './keys'; export * from './provider'; diff --git a/packages/context/test/unit/context.unit.ts b/packages/context/test/unit/context.unit.ts index f472ca3816f0..53838a3225c9 100644 --- a/packages/context/test/unit/context.unit.ts +++ b/packages/context/test/unit/context.unit.ts @@ -10,7 +10,7 @@ import { BindingScope, BindingType, Context, - ContextEventListener, + ContextObserver, isPromiseLike, } from '../..'; @@ -32,7 +32,7 @@ class TestContext extends Context { /** * Wait until the context event queue is empty or an error is thrown */ - waitUntilEventsProcessed() { + waitUntilObserversNotified() { return new Promise((resolve, reject) => { if (this.eventQueue.length === 0) { resolve(); @@ -703,73 +703,73 @@ describe('Context', () => { }); }); - describe('listener subscription', () => { - let nonMatchingListener: ContextEventListener; + describe('observer subscription', () => { + let nonMatchingObserver: ContextObserver; - beforeEach(givenNonMatchingListener); + beforeEach(givenNonMatchingObserver); - it('subscribes listeners', () => { - ctx.subscribe(nonMatchingListener); - expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + it('subscribes observers', () => { + ctx.subscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); }); - it('unsubscribes listeners', () => { - ctx.subscribe(nonMatchingListener); - expect(ctx.isSubscribed(nonMatchingListener)).to.true(); - ctx.unsubscribe(nonMatchingListener); - expect(ctx.isSubscribed(nonMatchingListener)).to.false(); + it('unsubscribes observers', () => { + ctx.subscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); + ctx.unsubscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); }); it('allows subscription.unsubscribe()', () => { - const subscription = ctx.subscribe(nonMatchingListener); - expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + const subscription = ctx.subscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); subscription.unsubscribe(); - expect(ctx.isSubscribed(nonMatchingListener)).to.false(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); expect(subscription.closed).to.be.true(); }); - it('registers listeners on context chain', () => { + it('registers observers on context chain', () => { const childCtx = new Context(ctx, 'child'); - childCtx.subscribe(nonMatchingListener); - expect(childCtx.isSubscribed(nonMatchingListener)).to.true(); - expect(ctx.isSubscribed(nonMatchingListener)).to.true(); + childCtx.subscribe(nonMatchingObserver); + expect(childCtx.isSubscribed(nonMatchingObserver)).to.true(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); }); - it('un-registers listeners on context chain', () => { + it('un-registers observers on context chain', () => { const childCtx = new Context(ctx, 'child'); - childCtx.subscribe(nonMatchingListener); - expect(childCtx.isSubscribed(nonMatchingListener)).to.true(); - expect(ctx.isSubscribed(nonMatchingListener)).to.true(); - childCtx.unsubscribe(nonMatchingListener); - expect(childCtx.isSubscribed(nonMatchingListener)).to.false(); - expect(ctx.isSubscribed(nonMatchingListener)).to.false(); + childCtx.subscribe(nonMatchingObserver); + expect(childCtx.isSubscribed(nonMatchingObserver)).to.true(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); + childCtx.unsubscribe(nonMatchingObserver); + expect(childCtx.isSubscribed(nonMatchingObserver)).to.false(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); }); - function givenNonMatchingListener() { - nonMatchingListener = { + function givenNonMatchingObserver() { + nonMatchingObserver = { filter: binding => false, - listen: (event, binding) => {}, + observe: (event, binding) => {}, }; } }); describe('event notification', () => { const events: string[] = []; - let nonMatchingListenerCalled = false; + let nonMatchingObserverCalled = false; - beforeEach(givenListeners); + beforeEach(givenObservers); - it('emits one bind event to matching listeners', async () => { + it('emits one bind event to matching observers', async () => { ctx.bind('foo').to('foo-value'); - await ctx.waitUntilEventsProcessed(); + await ctx.waitUntilObserversNotified(); expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); - expect(nonMatchingListenerCalled).to.be.false(); + expect(nonMatchingObserverCalled).to.be.false(); }); - it('emits multiple bind events to matching listeners', async () => { + it('emits multiple bind events to matching observers', async () => { ctx.bind('foo').to('foo-value'); ctx.bind('xyz').to('xyz-value'); - await ctx.waitUntilEventsProcessed(); + await ctx.waitUntilObserversNotified(); expect(events).to.eql([ '1:foo:foo-value:bind', '2:foo:foo-value:bind', @@ -778,76 +778,76 @@ describe('Context', () => { ]); }); - it('emits unbind event to matching listeners', async () => { + it('emits unbind event to matching observers', async () => { ctx.bind('foo').to('foo-value'); - await ctx.waitUntilEventsProcessed(); + await ctx.waitUntilObserversNotified(); ctx.unbind('foo'); - await ctx.waitUntilEventsProcessed(); + await ctx.waitUntilObserversNotified(); expect(events).to.eql([ '1:foo:foo-value:bind', '2:foo:foo-value:bind', '1:foo:foo-value:unbind', '2:foo:foo-value:unbind', ]); - expect(nonMatchingListenerCalled).to.be.false(); + expect(nonMatchingObserverCalled).to.be.false(); }); - it('does not trigger listeners if affected binding is the same', async () => { + it('does not trigger observers if affected binding is the same', async () => { const binding = ctx.bind('foo').to('foo-value'); - await ctx.waitUntilEventsProcessed(); + await ctx.waitUntilObserversNotified(); expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); ctx.add(binding); - await ctx.waitUntilEventsProcessed(); + await ctx.waitUntilObserversNotified(); expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); }); - it('reports error if a listener fails', () => { + it('reports error if an observer fails', () => { ctx.bind('bar').to('bar-value'); - return expect(ctx.waitUntilEventsProcessed()).to.be.rejectedWith( + return expect(ctx.waitUntilObserversNotified()).to.be.rejectedWith( 'something wrong', ); }); - function givenListeners() { - nonMatchingListenerCalled = false; + function givenObservers() { + nonMatchingObserverCalled = false; events.splice(0, events.length); - // A listener does not match the criteria - const nonMatchingListener: ContextEventListener = { + // An observer does not match the criteria + const nonMatchingObserver: ContextObserver = { filter: binding => false, - listen: (event, binding) => { - nonMatchingListenerCalled = true; + observe: (event, binding) => { + nonMatchingObserverCalled = true; }, }; - // A sync listener matches the criteria - const matchingListener: ContextEventListener = { - listen: (event, binding, context) => { + // A sync observer matches the criteria + const matchingObserver: ContextObserver = { + observe: (event, binding, context) => { // Make sure the binding is configured with value - // when the listener is notified + // when the observer is notified const val = binding.getValue(context); events.push(`1:${binding.key}:${val}:${event}`); }, }; - // An async listener matches the criteria - const matchingAsyncListener: ContextEventListener = { + // An async observer matches the criteria + const matchingAsyncObserver: ContextObserver = { filter: binding => true, - listen: async (event, binding, context) => { + observe: async (event, binding, context) => { await setImmediateAsync(); const val = binding.getValue(context); events.push(`2:${binding.key}:${val}:${event}`); }, }; - // An async listener matches the criteria that throws an error - const matchingAsyncListenerWithError: ContextEventListener = { + // An async observer matches the criteria that throws an error + const matchingAsyncObserverWithError: ContextObserver = { filter: binding => binding.key === 'bar', - listen: async () => { + observe: async () => { await setImmediateAsync(); throw new Error('something wrong'); }, }; - ctx.subscribe(nonMatchingListener); - ctx.subscribe(matchingListener); - ctx.subscribe(matchingAsyncListener); - ctx.subscribe(matchingAsyncListenerWithError); + ctx.subscribe(nonMatchingObserver); + ctx.subscribe(matchingObserver); + ctx.subscribe(matchingAsyncObserver); + ctx.subscribe(matchingAsyncObserverWithError); } }); From 985c090c785fd919cece6e635972d9253c3b605d Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Thu, 31 Jan 2019 09:48:09 -0800 Subject: [PATCH 6/7] chore(context): use p-event as the notification queue --- packages/context/package.json | 2 +- packages/context/src/context.ts | 89 ++++++++++++---------- packages/context/test/unit/context.unit.ts | 89 ++++++++++++++++++---- 3 files changed, 124 insertions(+), 56 deletions(-) diff --git a/packages/context/package.json b/packages/context/package.json index 4d0db7153473..accfc379c8d7 100644 --- a/packages/context/package.json +++ b/packages/context/package.json @@ -21,7 +21,7 @@ "dependencies": { "@loopback/metadata": "^1.0.5", "debug": "^4.0.1", - "queue": "^5.0.0", + "p-event": "^2.2.0", "uuid": "^3.2.1" }, "devDependencies": { diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index 53d3d411c2e9..dc39cdff69e7 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -5,20 +5,22 @@ import * as debugModule from 'debug'; import {EventEmitter} from 'events'; -import Queue from 'queue'; import {v1 as uuidv1} from 'uuid'; import {ValueOrPromise} from '.'; import {Binding, BindingTag} from './binding'; import {BindingFilter, filterByKey, filterByTag} from './binding-filter'; import {BindingAddress, BindingKey} from './binding-key'; import { - ContextObserver, ContextEventType, + ContextObserver, Subscription, } from './context-observer'; import {ResolutionOptions, ResolutionSession} from './resolution-session'; import {BoundValue, getDeepProperty, isPromiseLike} from './value-promise'; +// FIXME: `@types/p-event` is out of date against `p-event@2.2.0` +const pEvent = require('p-event'); + const debug = debugModule('loopback:context'); /** @@ -46,9 +48,9 @@ export class Context extends EventEmitter { protected readonly observers: Set = new Set(); /** - * Queue for context event notifications + * Internal counter for pending events which observers have not processed yet */ - protected readonly eventQueue = new Queue({concurrency: 1, autostart: true}); + private pendingEvents = 0; /** * Create a new context. For example, @@ -87,22 +89,44 @@ export class Context extends EventEmitter { * upon `bind` and `unbind` events */ private setupEventHandlers() { - for (const event of ['bind', 'unbind']) { - // Listen on events and notify observers - this.on(event, (binding: Readonly>) => { - this.notifyObservers(event, binding); - }); - } - // Relay events from the event queue - this.eventQueue.on('error', err => { - this.emit('error', err); + // Ideally p-event should allow multiple event types in an iterator + this.observeEvent('bind'); + this.observeEvent('unbind'); + } + + /** + * Listen on context events and notify observers + * @param eventType Context event type + */ + private async observeEvent(eventType: ContextEventType) { + this.on(eventType, () => { + // Track pending events + this.pendingEvents++; }); - this.eventQueue.on('end', err => { - if (err) this.emit('error', err); - else { - this.emit('observersNotified'); + // Create an async iterator from the given event type + const bindings: AsyncIterable>> = pEvent.iterator( + this, + eventType, + ); + for await (const binding of bindings) { + try { + await this.notifyObservers(eventType, binding); + this.pendingEvents--; + this.emit('idle'); + } catch (err) { + this.pendingEvents--; + this.emit('error', err); } - }); + } + } + + /** + * Wait until event notification is idle + */ + protected async waitForIdle() { + const count = this.pendingEvents; + if (count === 0) return; + await pEvent.multiple(this, 'idle', {count}); } /** @@ -213,32 +237,17 @@ export class Context extends EventEmitter { * @param eventType Event names: `bind` or `unbind` * @param binding Binding bound or unbound */ - protected notifyObservers( + protected async notifyObservers( eventType: ContextEventType, binding: Readonly>, ) { if (this.observers.size === 0) return; - // Schedule the notification task into the event queue - const task = () => { - return new Promise((resolve, reject) => { - // Run notifications in nextTick so that the binding is fully populated - process.nextTick(async () => { - for (const observer of this.observers) { - if (!observer.filter || observer.filter(binding)) { - try { - await observer.observe(eventType, binding, this); - } catch (err) { - debug(err, eventType, binding); - reject(err); - return; - } - } - } - resolve(); - }); - }); - }; - this.eventQueue.push(task); + + for (const observer of this.observers) { + if (!observer.filter || observer.filter(binding)) { + await observer.observe(eventType, binding, this); + } + } } /** diff --git a/packages/context/test/unit/context.unit.ts b/packages/context/test/unit/context.unit.ts index 53838a3225c9..c6621f3187ec 100644 --- a/packages/context/test/unit/context.unit.ts +++ b/packages/context/test/unit/context.unit.ts @@ -4,17 +4,19 @@ // License text available at https://opensource.org/licenses/MIT import {expect} from '@loopback/testlab'; +import {promisify} from 'util'; import { Binding, BindingKey, BindingScope, BindingType, Context, + ContextEventType, ContextObserver, + filterByTag, isPromiseLike, } from '../..'; -import {promisify} from 'util'; const setImmediateAsync = promisify(setImmediate); /** @@ -29,23 +31,12 @@ class TestContext extends Context { const map = new Map(this.registry); return map; } + /** * Wait until the context event queue is empty or an error is thrown */ - waitUntilObserversNotified() { - return new Promise((resolve, reject) => { - if (this.eventQueue.length === 0) { - resolve(); - return; - } - this.once('observersNotified', err => { - if (err) reject(err); - else resolve(); - }); - this.once('error', err => { - reject(err); - }); - }); + waitUntilObserversNotified(): Promise { + return this.waitForIdle(); } } @@ -851,6 +842,74 @@ describe('Context', () => { } }); + describe('event notification for context chain', () => { + let app: Context; + let server: Context; + + let contextListener: MyObserverForControllers; + beforeEach(givenControllerListener); + + it('receives notifications of matching binding events', async () => { + const controllers = await getControllers(); + // We have server: 1, app: 2 + // NOTE: The controllers are not guaranteed to be ['1', '2'] as the events + // are emitted by two context objects and they are processed asynchronously + expect(controllers).to.containEql('1'); + expect(controllers).to.containEql('2'); + server.unbind('controllers.1'); + // Now we have app: 2 + expect(await getControllers()).to.eql(['2']); + app.unbind('controllers.2'); + // All controllers are gone from the context chain + expect(await getControllers()).to.eql([]); + // Add a new controller - server: 3 + givenController(server, '3'); + expect(await getControllers()).to.eql(['3']); + }); + + class MyObserverForControllers implements ContextObserver { + controllers: Set = new Set(); + filter = filterByTag('controller'); + observe(event: ContextEventType, binding: Readonly>) { + if (event === 'bind') { + this.controllers.add(binding.tagMap.name); + } else if (event === 'unbind') { + this.controllers.delete(binding.tagMap.name); + } + } + } + + function givenControllerListener() { + givenServerWithinAnApp(); + contextListener = new MyObserverForControllers(); + server.subscribe(contextListener); + givenController(server, '1'); + givenController(app, '2'); + } + + function givenController(context: Context, controllerName: string) { + class MyController { + name = controllerName; + } + context + .bind(`controllers.${controllerName}`) + .toClass(MyController) + .tag('controller', {name: controllerName}); + } + + async function getControllers() { + return new Promise(resolve => { + // Wrap it inside `setImmediate` to make the events are triggered + setImmediate(() => resolve(Array.from(contextListener.controllers))); + }); + } + + function givenServerWithinAnApp() { + app = new Context('app'); + server = new Context(app, 'server'); + } + }); + function createContext() { ctx = new TestContext(); } From fd3464d79c3ff051e54f674156a527524f0cbabb Mon Sep 17 00:00:00 2001 From: Raymond Feng Date: Fri, 1 Feb 2019 09:12:51 -0800 Subject: [PATCH 7/7] chore(context): address review comments --- packages/context/src/context.ts | 45 +++++++++--- packages/context/test/unit/context.unit.ts | 85 ++++++++++++++++++---- 2 files changed, 104 insertions(+), 26 deletions(-) diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index dc39cdff69e7..61a1950baf1a 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -89,44 +89,63 @@ export class Context extends EventEmitter { * upon `bind` and `unbind` events */ private setupEventHandlers() { - // Ideally p-event should allow multiple event types in an iterator - this.observeEvent('bind'); - this.observeEvent('unbind'); + // The following are two async functions. Returned promises are ignored as + // they are long-running background tasks. + this.startNotificationTask('bind').catch(err => { + // Catch error to avoid lint violations + this.emit('error', err); + }); + this.startNotificationTask('unbind').catch(err => { + // Catch error to avoid lint violations + this.emit('error', err); + }); } /** - * Listen on context events and notify observers + * Start a background task to listen on context events and notify observers * @param eventType Context event type */ - private async observeEvent(eventType: ContextEventType) { + private async startNotificationTask(eventType: ContextEventType) { + let currentObservers = this.observers; this.on(eventType, () => { // Track pending events this.pendingEvents++; + // Take a snapshot of current observers to ensure notifications of this + // event will only be sent to current ones + currentObservers = new Set(this.observers); }); + // FIXME(rfeng): p-event should allow multiple event types in an iterator. // Create an async iterator from the given event type const bindings: AsyncIterable>> = pEvent.iterator( this, eventType, ); for await (const binding of bindings) { + // The loop will happen asynchronously upon events try { - await this.notifyObservers(eventType, binding); + // The execution of observers happen in the Promise micro-task queue + await this.notifyObservers(eventType, binding, currentObservers); this.pendingEvents--; - this.emit('idle'); + this.emit('observersNotified'); } catch (err) { this.pendingEvents--; + // Errors caught from observers. Emit it to the current context. + // If no error listeners are registered, crash the process. this.emit('error', err); } } } /** - * Wait until event notification is idle + * Wait until observers are notified for all of currently pending events. + * + * This method is for test only to perform assertions after observers are + * notified for relevant events. */ - protected async waitForIdle() { + protected async waitForObserversNotifiedForPendingEvents() { const count = this.pendingEvents; if (count === 0) return; - await pEvent.multiple(this, 'idle', {count}); + await pEvent.multiple(this, 'observersNotified', {count}); } /** @@ -236,14 +255,16 @@ export class Context extends EventEmitter { * * @param eventType Event names: `bind` or `unbind` * @param binding Binding bound or unbound + * @param observers Current set of context observers */ protected async notifyObservers( eventType: ContextEventType, binding: Readonly>, + observers = this.observers, ) { - if (this.observers.size === 0) return; + if (observers.size === 0) return; - for (const observer of this.observers) { + for (const observer of observers) { if (!observer.filter || observer.filter(binding)) { await observer.observe(eventType, binding, this); } diff --git a/packages/context/test/unit/context.unit.ts b/packages/context/test/unit/context.unit.ts index c6621f3187ec..ddc7a8adfc4b 100644 --- a/packages/context/test/unit/context.unit.ts +++ b/packages/context/test/unit/context.unit.ts @@ -36,7 +36,7 @@ class TestContext extends Context { * Wait until the context event queue is empty or an error is thrown */ waitUntilObserversNotified(): Promise { - return this.waitForIdle(); + return this.waitForObserversNotifiedForPendingEvents(); } } @@ -753,7 +753,10 @@ describe('Context', () => { it('emits one bind event to matching observers', async () => { ctx.bind('foo').to('foo-value'); await ctx.waitUntilObserversNotified(); - expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); expect(nonMatchingObserverCalled).to.be.false(); }); @@ -762,10 +765,10 @@ describe('Context', () => { ctx.bind('xyz').to('xyz-value'); await ctx.waitUntilObserversNotified(); expect(events).to.eql([ - '1:foo:foo-value:bind', - '2:foo:foo-value:bind', - '1:xyz:xyz-value:bind', - '2:xyz:xyz-value:bind', + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:xyz:xyz-value:bind', + 'ASYNC:xyz:xyz-value:bind', ]); }); @@ -775,10 +778,26 @@ describe('Context', () => { ctx.unbind('foo'); await ctx.waitUntilObserversNotified(); expect(events).to.eql([ - '1:foo:foo-value:bind', - '2:foo:foo-value:bind', - '1:foo:foo-value:unbind', - '2:foo:foo-value:unbind', + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:foo:foo-value:unbind', + 'ASYNC:foo:foo-value:unbind', + ]); + expect(nonMatchingObserverCalled).to.be.false(); + }); + + it('emits bind/unbind events for rebind to matching observers', async () => { + ctx.bind('foo').to('foo-value'); + await ctx.waitUntilObserversNotified(); + ctx.bind('foo').to('new-foo-value'); + await ctx.waitUntilObserversNotified(); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:foo:foo-value:unbind', + 'SYNC:foo:new-foo-value:bind', + 'ASYNC:foo:foo-value:unbind', + 'ASYNC:foo:new-foo-value:bind', ]); expect(nonMatchingObserverCalled).to.be.false(); }); @@ -786,10 +805,16 @@ describe('Context', () => { it('does not trigger observers if affected binding is the same', async () => { const binding = ctx.bind('foo').to('foo-value'); await ctx.waitUntilObserversNotified(); - expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); ctx.add(binding); await ctx.waitUntilObserversNotified(); - expect(events).to.eql(['1:foo:foo-value:bind', '2:foo:foo-value:bind']); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); }); it('reports error if an observer fails', () => { @@ -799,6 +824,38 @@ describe('Context', () => { ); }); + it('does not trigger observers registered after an event', async () => { + ctx.bind('foo').to('foo-value'); + process.nextTick(() => { + // Register a new observer after 1st event + const anotherObserver: ContextObserver = { + observe: (event, binding, context) => { + const val = binding.getValue(context); + events.push(`LATE:${binding.key}:${val}:${event}`); + }, + }; + ctx.subscribe(anotherObserver); + }); + + await ctx.waitUntilObserversNotified(); + // The late observer is not triggered + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); + // Emit another event + ctx.bind('xyz').to('xyz-value'); + await ctx.waitUntilObserversNotified(); + // Now the late observer is triggered + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:xyz:xyz-value:bind', + 'ASYNC:xyz:xyz-value:bind', + 'LATE:xyz:xyz-value:bind', + ]); + }); + function givenObservers() { nonMatchingObserverCalled = false; events.splice(0, events.length); @@ -815,7 +872,7 @@ describe('Context', () => { // Make sure the binding is configured with value // when the observer is notified const val = binding.getValue(context); - events.push(`1:${binding.key}:${val}:${event}`); + events.push(`SYNC:${binding.key}:${val}:${event}`); }, }; // An async observer matches the criteria @@ -824,7 +881,7 @@ describe('Context', () => { observe: async (event, binding, context) => { await setImmediateAsync(); const val = binding.getValue(context); - events.push(`2:${binding.key}:${val}:${event}`); + events.push(`ASYNC:${binding.key}:${val}:${event}`); }, }; // An async observer matches the criteria that throws an error