diff --git a/docs/site/Context.md b/docs/site/Context.md index b038f747ef41..8e7001592ea7 100644 --- a/docs/site/Context.md +++ b/docs/site/Context.md @@ -229,3 +229,195 @@ class HelloController { These "sugar" decorators allow you to quickly build up your application without having to code up all the additional logic by simply giving LoopBack hints (in the form of metadata) to your intent. + +## Context events + +The `Context` emits the following events: + +- `bind`: Emitted when a new binding is added to the context. + - binding: the newly added binding object + - context: Owner context of the binding object +- `unbind`: Emitted when an existing binding is removed from the context + - binding: the newly removed binding object + - context: Owner context of the binding object +- `error`: Emitted when an observer throws an error during the notification + process + - err: the error object thrown + +When an existing binding key is replaced with a new one, an `unbind` event is +emitted for the existing binding followed by a `bind` event for the new binding. + +If a context has a parent, binding events from the parent are re-emitted on the +context when the binding key does not exist within the current context. + +## Context observers + +Bindings can be added or removed to a context object. With emitted context +events, we can add listeners to a context object to be invoked when bindings +come and go. There are a few caveats associated with that: + +1. The binding object might not be fully configured when a `bind` event is + emitted. + + For example: + + ```ts + const ctx = new Context(); + ctx + .bind('foo') + .to('foo-value') + .tag('foo-tag'); + ctx.on('bind', binding => { + console.log(binding.tagNames); // returns an empty array `[]` + }); + ``` + + The context object emits a `bind` event when `ctx.bind` method is called. It + does not control the fluent apis `.to('foo-value').tag('foo-tag')`, which + happens on the newly created binding object. As a result, the `bind` event + listener receives a binding object which only has the binding key populated. + + A workaround is to create the binding first before add it to a context: + + ```ts + const ctx = new Context(); + const binding = Binding.create('foo') + .to('foo-value') + .tag('foo-tag'); + ctx.add(binding); + ctx.on('bind', binding => { + console.log(binding.tagMap); // returns `['foo-tag']` + }); + ``` + +2. It's hard for event listeners to perform asynchronous operations. + +To make it easy to support asynchronous event processing, we introduce +`ContextObserver` and corresponding APIs on `Context`: + +1. `ContextObserverFn` type and `ContextObserver` interface + +```ts +/** + * Listen on `bind`, `unbind`, or other events + * @param eventType Context event type + * @param binding The binding as event source + * @param context Context object for the binding event + */ +export type ContextObserverFn = ( + eventType: ContextEventType, + binding: Readonly>, + context: Context, +) => ValueOrPromise; + +/** + * Observers of context bind/unbind events + */ +export interface ContextObserver { + /** + * An optional filter function to match bindings. If not present, the listener + * will be notified of all binding events. + */ + filter?: BindingFilter; + + /** + * Listen on `bind`, `unbind`, or other events + * @param eventType Context event type + * @param binding The binding as event source + */ + observe: ContextObserverFn; +} + +/** + * Context event observer type - An instance of `ContextObserver` or a function + */ +export type ContextEventObserver = ContextObserver | ContextObserverFn; +``` + +If `filter` is not required, we can simply use `ContextObserverFn`. + +2. Context APIs + +- `subscribe(observer: ContextEventObserver)` + + Add a context event observer to the context chain, including its ancestors + +- `unsubscribe(observer: ContextEventObserver)` + + Remove the context event observer from the context chain + +- `close()` + + Close the context and release references to other objects in the context + chain. Please note a child context registers event listeners with its parent + context. As a result, the `close` method must be called to avoid memory leak + if the child context is to be recycled. + +To react on context events asynchronously, we need to implement the +`ContextObserver` interface or provide a `ContextObserverFn` and register it +with the context. + +For example: + +```ts +const app = new Context('app'); +server = new Context(app, 'server'); + +const observer: ContextObserver = { + // Only interested in bindings tagged with `foo` + filter: binding => binding.tagMap.foo != null, + + observe(event: ContextEventType, binding: Readonly>) { + if (event === 'bind') { + console.log('bind: %s', binding.key); + // ... perform async operation + } else if (event === 'unbind') { + console.log('unbind: %s', binding.key); + // ... perform async operation + } + }, +}; + +server.subscribe(observer); +server + .bind('foo-server') + .to('foo-value') + .tag('foo'); +app + .bind('foo-app') + .to('foo-value') + .tag('foo'); + +// The following messages will be printed: +// bind: foo-server +// bind: foo-app +``` + +Please note when an observer subscribes to a context, it will be registered with +all contexts on the chain. In the example above, the observer is added to both +`server` and `app` contexts so that it can be notified when bindings are added +or removed from any of the context on the chain. + +- Observers are called in the next turn of + [Promise micro-task queue](https://jsblog.insiderattack.net/promises-next-ticks-and-immediates-nodejs-event-loop-part-3-9226cbe7a6aa) + +- When there are multiple async observers registered, they are notified in + series for an event. + +- When multiple binding events are emitted in the same event loop tick and there + are async observers registered, such events are queued and observers are + notified by the order of events. + +### Observer error handling + +It's recommended that `ContextEventObserver` implementations should not throw +errors in their code. Errors thrown by context event observers are reported as +follows over the context chain. + +1. Check if the current context object has `error` listeners, if yes, emit an + `error` event on the context and we're done. if not, try its parent context + by repeating step 1. + +2. If no context object of the chain has `error` listeners, emit an `error` + event on the current context. As a result, the process exits abnormally. See + https://nodejs.org/api/events.html#events_error_events for more details. diff --git a/packages/context/package.json b/packages/context/package.json index 96a2d286d989..9742286d00a5 100644 --- a/packages/context/package.json +++ b/packages/context/package.json @@ -21,6 +21,7 @@ "dependencies": { "@loopback/metadata": "^1.0.6", "debug": "^4.0.1", + "p-event": "^2.3.1", "uuid": "^3.2.1" }, "devDependencies": { diff --git a/packages/context/src/__tests__/unit/context-observer.unit.ts b/packages/context/src/__tests__/unit/context-observer.unit.ts new file mode 100644 index 000000000000..2f1571d8d2f0 --- /dev/null +++ b/packages/context/src/__tests__/unit/context-observer.unit.ts @@ -0,0 +1,365 @@ +// Copyright IBM Corp. 2018. All Rights Reserved. +// Node module: @loopback/context +// This file is licensed under the MIT License. +// License text available at https://opensource.org/licenses/MIT + +import {expect} from '@loopback/testlab'; +import {promisify} from 'util'; +import { + Binding, + Context, + ContextEventType, + ContextObserver, + filterByTag, +} from '../..'; + +const pEvent = require('p-event'); +const setImmediateAsync = promisify(setImmediate); + +/** + * Create a subclass of context so that we can access parents and registry + * for assertions + */ +class TestContext extends Context { + get parentEventListeners() { + return this._parentEventListeners; + } + /** + * Wait until the context event queue is empty or an error is thrown + */ + waitUntilObserversNotified(): Promise { + return this.waitUntilPendingNotificationsDone(100); + } +} + +describe('Context', () => { + let ctx: TestContext; + beforeEach('given a context', createContext); + + describe('observer subscription', () => { + let nonMatchingObserver: ContextObserver; + + beforeEach(givenNonMatchingObserver); + + it('subscribes observers', () => { + ctx.subscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); + }); + + it('unsubscribes observers', () => { + ctx.subscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); + ctx.unsubscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); + }); + + it('allows subscription.unsubscribe()', () => { + const subscription = ctx.subscribe(nonMatchingObserver); + expect(ctx.isSubscribed(nonMatchingObserver)).to.true(); + subscription.unsubscribe(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); + expect(subscription.closed).to.be.true(); + }); + + it('registers observers on context with parent', () => { + const childCtx = new TestContext(ctx, 'child'); + expect(childCtx.parentEventListeners).to.be.undefined(); + childCtx.subscribe(nonMatchingObserver); + expect(childCtx.parentEventListeners!.has('bind')).to.be.true(); + expect(childCtx.parentEventListeners!.has('unbind')).to.be.true(); + expect(childCtx.isSubscribed(nonMatchingObserver)).to.true(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); + }); + + it('un-registers observers on context chain', () => { + const childCtx = new Context(ctx, 'child'); + childCtx.subscribe(nonMatchingObserver); + expect(childCtx.isSubscribed(nonMatchingObserver)).to.true(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); + childCtx.unsubscribe(nonMatchingObserver); + expect(childCtx.isSubscribed(nonMatchingObserver)).to.false(); + expect(ctx.isSubscribed(nonMatchingObserver)).to.false(); + }); + + it('un-registers observers on context chain during close', () => { + const childCtx = new TestContext(ctx, 'child'); + childCtx.subscribe(nonMatchingObserver); + const parentEventListeners = new Map(childCtx.parentEventListeners!); + childCtx.close(); + for (const [event, listener] of parentEventListeners) { + expect(ctx.listeners(event)).to.not.containEql(listener); + } + expect(childCtx.parentEventListeners).to.be.undefined(); + expect(childCtx.isSubscribed(nonMatchingObserver)).to.false(); + }); + + function givenNonMatchingObserver() { + nonMatchingObserver = { + filter: binding => false, + observe: () => {}, + }; + } + }); + + describe('event notification', () => { + const events: string[] = []; + let nonMatchingObserverCalled = false; + + beforeEach(givenObservers); + + it('emits one bind event to matching observers', async () => { + ctx.bind('foo').to('foo-value'); + await ctx.waitUntilObserversNotified(); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); + expect(nonMatchingObserverCalled).to.be.false(); + }); + + it('emits multiple bind events to matching observers', async () => { + ctx.bind('foo').to('foo-value'); + ctx.bind('xyz').to('xyz-value'); + await ctx.waitUntilObserversNotified(); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:xyz:xyz-value:bind', + 'ASYNC:xyz:xyz-value:bind', + ]); + }); + + it('emits unbind event to matching observers', async () => { + ctx.bind('foo').to('foo-value'); + await ctx.waitUntilObserversNotified(); + ctx.unbind('foo'); + await ctx.waitUntilObserversNotified(); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:foo:foo-value:unbind', + 'ASYNC:foo:foo-value:unbind', + ]); + expect(nonMatchingObserverCalled).to.be.false(); + }); + + it('emits bind/unbind events for rebind to matching observers', async () => { + ctx.bind('foo').to('foo-value'); + await ctx.waitUntilObserversNotified(); + ctx.bind('foo').to('new-foo-value'); + await ctx.waitUntilObserversNotified(); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:foo:foo-value:unbind', + 'ASYNC:foo:foo-value:unbind', + 'SYNC:foo:new-foo-value:bind', + 'ASYNC:foo:new-foo-value:bind', + ]); + expect(nonMatchingObserverCalled).to.be.false(); + }); + + it('does not trigger observers if affected binding is the same', async () => { + const binding = ctx.bind('foo').to('foo-value'); + await ctx.waitUntilObserversNotified(); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); + ctx.add(binding); + await ctx.waitUntilObserversNotified(); + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); + }); + + it('reports error if an observer fails', () => { + ctx.bind('bar').to('bar-value'); + return expect(ctx.waitUntilObserversNotified()).to.be.rejectedWith( + 'something wrong', + ); + }); + + it('does not trigger observers registered after an event', async () => { + ctx.bind('foo').to('foo-value'); + process.nextTick(() => { + // Register a new observer after 1st event + ctx.subscribe((event, binding, context) => { + const val = binding.getValue(context); + events.push(`LATE:${binding.key}:${val}:${event}`); + }); + }); + + await ctx.waitUntilObserversNotified(); + // The late observer is not triggered + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + ]); + // Emit another event + ctx.bind('xyz').to('xyz-value'); + await ctx.waitUntilObserversNotified(); + // Now the late observer is triggered + expect(events).to.eql([ + 'SYNC:foo:foo-value:bind', + 'ASYNC:foo:foo-value:bind', + 'SYNC:xyz:xyz-value:bind', + 'ASYNC:xyz:xyz-value:bind', + 'LATE:xyz:xyz-value:bind', + ]); + }); + + function givenObservers() { + nonMatchingObserverCalled = false; + events.splice(0, events.length); + // An observer does not match the criteria + const nonMatchingObserver: ContextObserver = { + filter: binding => false, + observe: () => { + nonMatchingObserverCalled = true; + }, + }; + // A sync observer matches the criteria + const matchingObserver: ContextObserver = { + observe: (event, binding, context) => { + // Make sure the binding is configured with value + // when the observer is notified + const val = binding.getValue(context); + events.push(`SYNC:${binding.key}:${val}:${event}`); + }, + }; + // An async observer matches the criteria + const matchingAsyncObserver: ContextObserver = { + filter: binding => true, + observe: async (event, binding, context) => { + await setImmediateAsync(); + const val = binding.getValue(context); + events.push(`ASYNC:${binding.key}:${val}:${event}`); + }, + }; + // An async observer matches the criteria that throws an error + const matchingAsyncObserverWithError: ContextObserver = { + filter: binding => binding.key === 'bar', + observe: async () => { + await setImmediateAsync(); + throw new Error('something wrong'); + }, + }; + ctx.subscribe(nonMatchingObserver); + ctx.subscribe(matchingObserver); + ctx.subscribe(matchingAsyncObserver); + ctx.subscribe(matchingAsyncObserverWithError); + } + }); + + describe('event notification for context chain', () => { + let app: Context; + let server: Context; + + let contextObserver: MyObserverForControllers; + beforeEach(givenControllerObserver); + + it('receives notifications of matching binding events', async () => { + const controllers = await getControllers(); + // We have server: 1, app: 2 + // NOTE: The controllers are not guaranteed to be ['1', '2'] as the events + // are emitted by two context objects and they are processed asynchronously + expect(controllers).to.containEql('1@server'); + expect(controllers).to.containEql('2@app'); + server.unbind('controllers.1'); + // Now we have app: 2 + expect(await getControllers()).to.eql(['2@app']); + app.unbind('controllers.2'); + // All controllers are gone from the context chain + expect(await getControllers()).to.eql([]); + // Add a new controller - server: 3 + givenController(server, '3'); + expect(await getControllers()).to.eql(['3@server']); + }); + + it('does not emit matching binding events from parent if shadowed', async () => { + // We have server: 1, app: 2 + givenController(app, '1'); + // All controllers are gone from the context chain + expect(await getControllers()).to.not.containEql('1@app'); + }); + + it('reports error on current context if an observer fails', async () => { + const err = new Error('something wrong'); + server.subscribe((event, binding) => { + if (binding.key === 'bar') { + return Promise.reject(err); + } + }); + server.bind('bar').to('bar-value'); + // Please note the following code registers an `error` listener on `server` + const obj = await pEvent(server, 'error'); + expect(obj).to.equal(err); + }); + + it('reports error on the first context with error listeners on the chain', async () => { + const err = new Error('something wrong'); + server.subscribe((event, binding) => { + if (binding.key === 'bar') { + return Promise.reject(err); + } + }); + server.bind('bar').to('bar-value'); + // No error listener is registered on `server` + const obj = await pEvent(app, 'error'); + expect(obj).to.equal(err); + }); + + class MyObserverForControllers implements ContextObserver { + controllers: Set = new Set(); + filter = filterByTag('controller'); + observe( + event: ContextEventType, + binding: Readonly>, + context: Context, + ) { + const name = `${binding.tagMap.name}@${context.name}`; + if (event === 'bind') { + this.controllers.add(name); + } else if (event === 'unbind') { + this.controllers.delete(name); + } + } + } + + function givenControllerObserver() { + givenServerWithinAnApp(); + contextObserver = new MyObserverForControllers(); + server.subscribe(contextObserver); + givenController(server, '1'); + givenController(app, '2'); + } + + function givenController(context: Context, controllerName: string) { + class MyController { + name = controllerName; + } + context + .bind(`controllers.${controllerName}`) + .toClass(MyController) + .tag('controller', {name: controllerName}); + } + + async function getControllers() { + return new Promise(resolve => { + // Wrap it inside `setImmediate` to make the events are triggered + setImmediate(() => resolve(Array.from(contextObserver.controllers))); + }); + } + + function givenServerWithinAnApp() { + app = new Context('app'); + server = new Context(app, 'server'); + } + }); + + function createContext() { + ctx = new TestContext(); + } +}); diff --git a/packages/context/src/__tests__/unit/context.unit.ts b/packages/context/src/__tests__/unit/context.unit.ts index e0896a8c9265..9bb6775c8a38 100644 --- a/packages/context/src/__tests__/unit/context.unit.ts +++ b/packages/context/src/__tests__/unit/context.unit.ts @@ -5,12 +5,12 @@ import {expect} from '@loopback/testlab'; import { - Context, Binding, + BindingKey, BindingScope, BindingType, + Context, isPromiseLike, - BindingKey, } from '../..'; /** @@ -61,7 +61,7 @@ describe('Context constructor', () => { }); describe('Context', () => { - let ctx: Context; + let ctx: TestContext; beforeEach('given a context', createContext); describe('bind', () => { @@ -640,6 +640,22 @@ describe('Context', () => { }); }); + describe('close()', () => { + it('clears all bindings', () => { + ctx.bind('foo').to('foo-value'); + expect(ctx.bindingMap.size).to.eql(1); + ctx.close(); + expect(ctx.bindingMap.size).to.eql(0); + }); + + it('dereferences parent', () => { + const childCtx = new TestContext(ctx); + expect(childCtx.parent).to.equal(ctx); + childCtx.close(); + expect(childCtx.parent).to.be.undefined(); + }); + }); + describe('toJSON()', () => { it('converts to plain JSON object', () => { ctx @@ -682,6 +698,6 @@ describe('Context', () => { }); function createContext() { - ctx = new Context(); + ctx = new TestContext(); } }); diff --git a/packages/context/src/context-observer.ts b/packages/context/src/context-observer.ts new file mode 100644 index 000000000000..38145f8628b5 --- /dev/null +++ b/packages/context/src/context-observer.ts @@ -0,0 +1,87 @@ +// Copyright IBM Corp. 2018. All Rights Reserved. +// Node module: @loopback/context +// This file is licensed under the MIT License. +// License text available at https://opensource.org/licenses/MIT + +import {Binding} from './binding'; +import {BindingFilter} from './binding-filter'; +import {ValueOrPromise} from './value-promise'; +import {Context} from './context'; + +/** + * Context event types. We support `bind` and `unbind` for now but + * keep it open for new types + */ +export type ContextEventType = 'bind' | 'unbind' | string; + +/** + * Listen on `bind`, `unbind`, or other events + * @param eventType Context event type + * @param binding The binding as event source + * @param context Context object for the binding event + */ +export type ContextObserverFn = ( + eventType: ContextEventType, + binding: Readonly>, + context: Context, +) => ValueOrPromise; + +/** + * Observers of context bind/unbind events + */ +export interface ContextObserver { + /** + * An optional filter function to match bindings. If not present, the listener + * will be notified of all binding events. + */ + filter?: BindingFilter; + + /** + * Listen on `bind`, `unbind`, or other events + * @param eventType Context event type + * @param binding The binding as event source + */ + observe: ContextObserverFn; +} + +/** + * Context event observer type - An instance of `ContextObserver` or a function + */ +export type ContextEventObserver = ContextObserver | ContextObserverFn; + +/** + * Subscription of context events. It's modeled after + * https://github.com/tc39/proposal-observable. + */ +export interface Subscription { + /** + * unsubscribe + */ + unsubscribe(): void; + /** + * Is the subscription closed? + */ + closed: boolean; +} + +/** + * Event data for observer notifications + */ +export type Notification = { + /** + * Context event type - bind/unbind + */ + eventType: ContextEventType; + /** + * Binding added/removed + */ + binding: Readonly>; + /** + * Owner context for the binding + */ + context: Context; + /** + * A snapshot of observers when the original event is emitted + */ + observers: Set; +}; diff --git a/packages/context/src/context.ts b/packages/context/src/context.ts index aa862b015886..4657938856f3 100644 --- a/packages/context/src/context.ts +++ b/packages/context/src/context.ts @@ -4,20 +4,40 @@ // License text available at https://opensource.org/licenses/MIT import * as debugModule from 'debug'; +import {EventEmitter} from 'events'; import {v1 as uuidv1} from 'uuid'; import {ValueOrPromise} from '.'; import {Binding, BindingTag} from './binding'; +import {BindingFilter, filterByKey, filterByTag} from './binding-filter'; import {BindingAddress, BindingKey} from './binding-key'; +import { + ContextEventObserver, + ContextEventType, + ContextObserver, + Notification, + Subscription, +} from './context-observer'; import {ResolutionOptions, ResolutionSession} from './resolution-session'; import {BoundValue, getDeepProperty, isPromiseLike} from './value-promise'; -import {BindingFilter, filterByKey, filterByTag} from './binding-filter'; + +/** + * Polyfill Symbol.asyncIterator as required by TypeScript for Node 8.x. + * See https://www.typescriptlang.org/docs/handbook/release-notes/typescript-2-3.html + */ +if (!Symbol.asyncIterator) { + // tslint:disable-next-line:no-any + (Symbol as any).asyncIterator = Symbol.for('Symbol.asyncIterator'); +} + +// FIXME: `@types/p-event` is out of date against `p-event@2.2.0` +const pEvent = require('p-event'); const debug = debugModule('loopback:context'); /** * Context provides an implementation of Inversion of Control (IoC) container */ -export class Context { +export class Context extends EventEmitter { /** * Name of the context */ @@ -34,10 +54,57 @@ export class Context { protected _parent?: Context; /** - * Create a new context + * Event listeners for parent context keyed by event names. It keeps track + * of listeners from this context against its parent so that we can remove + * these listeners when this context is closed. + */ + protected _parentEventListeners: + | Map< + string, + // tslint:disable-next-line:no-any + (...args: any[]) => void + > + | undefined; + + /** + * A list of registered context observers. The Set will be created when the + * first observer is added. + */ + protected observers: Set | undefined; + + /** + * Internal counter for pending notification events which are yet to be + * processed by observers. + */ + private pendingNotifications = 0; + + /** + * Queue for background notifications for observers + */ + private notificationQueue: AsyncIterableIterator | undefined; + + /** + * Create a new context. For example, + * ```ts + * // Create a new root context, let the framework to create a unique name + * const rootCtx = new Context(); + * + * // Create a new child context inheriting bindings from `rootCtx` + * const childCtx = new Context(rootCtx); + * + * // Create another root context called "application" + * const appCtx = new Context('application'); + * + * // Create a new child context called "request" and inheriting bindings + * // from `appCtx` + * const reqCtx = new Context(appCtx, 'request'); + * ``` * @param _parent The optional parent context + * @param name Name of the context, if not provided, a `uuid` will be + * generated as the name */ constructor(_parent?: Context | string, name?: string) { + super(); if (typeof _parent === 'string') { name = _parent; _parent = undefined; @@ -46,6 +113,193 @@ export class Context { this.name = name || uuidv1(); } + /** + * Wrap the debug statement so that it always print out the context name + * as the prefix + * @param args Arguments for the debug + */ + // tslint:disable-next-line:no-any + private _debug(...args: any[]) { + /* istanbul ignore if */ + if (!debug.enabled) return; + const formatter = args.shift(); + if (typeof formatter === 'string') { + debug(`[%s] ${formatter}`, this.name, ...args); + } else { + debug('[%s] ', this.name, formatter, ...args); + } + } + + /** + * Set up an internal listener to notify registered observers asynchronously + * upon `bind` and `unbind` events. This method will be called lazily when + * the first observer is added. + */ + private setupEventHandlersIfNeeded() { + if (this.notificationQueue != null) return; + + this.addParentEventListener('bind'); + this.addParentEventListener('unbind'); + + // The following are two async functions. Returned promises are ignored as + // they are long-running background tasks. + this.startNotificationTask().catch(err => { + this.handleNotificationError(err); + }); + + let ctx = this._parent; + while (ctx) { + ctx.setupEventHandlersIfNeeded(); + ctx = ctx._parent; + } + } + + /** + * Add an event listener to its parent context so that this context will + * be notified of parent events, such as `bind` or `unbind`. + * @param event Event name + */ + private addParentEventListener(event: string) { + if (this._parent == null) return; + + // Keep track of parent event listeners so that we can remove them + this._parentEventListeners = this._parentEventListeners || new Map(); + if (this._parentEventListeners.has(event)) return; + + const parentEventListener = ( + binding: Readonly>, + context: Context, + ) => { + // Propagate the event to this context only if the binding key does not + // exist in this context. The parent binding is shadowed if there is a + // binding with the same key in this one. + if (this.contains(binding.key)) { + this._debug( + 'Event %s %s is not re-emitted from %s to %s', + event, + binding.key, + context.name, + this.name, + ); + return; + } + this._debug( + 'Re-emitting %s %s from %s to %s', + event, + binding.key, + context.name, + this.name, + ); + this.emit(event, binding, context); + }; + this._parentEventListeners.set(event, parentEventListener); + // Listen on the parent context events + this._parent.on(event, parentEventListener); + } + + /** + * Handle errors caught during the notification of observers + * @param err Error + */ + private handleNotificationError(err: unknown) { + // Bubbling up the error event over the context chain + // until we find an error listener + let ctx: Context | undefined = this; + while (ctx) { + if (ctx.listenerCount('error') === 0) { + // No error listener found, try its parent + ctx = ctx._parent; + continue; + } + this._debug('Emitting error to context %s', ctx.name, err); + ctx.emit('error', err); + return; + } + // No context with error listeners found + this._debug('No error handler is configured for the context chain', err); + // Let it crash now by emitting an error event + this.emit('error', err); + } + + /** + * Start a background task to listen on context events and notify observers + */ + private startNotificationTask() { + // Set up listeners on `bind` and `unbind` for notifications + this.setupNotification('bind', 'unbind'); + + // Create an async iterator for the `notification` event as a queue + this.notificationQueue = pEvent.iterator(this, 'notification'); + + return this.processNotifications(); + } + + /** + * Process notification events as they arrive on the queue + */ + private async processNotifications() { + const events = this.notificationQueue; + if (events == null) return; + for await (const {eventType, binding, context, observers} of events) { + // The loop will happen asynchronously upon events + try { + // The execution of observers happen in the Promise micro-task queue + await this.notifyObservers(eventType, binding, context, observers); + this.pendingNotifications--; + this._debug( + 'Observers notified for %s of binding %s', + eventType, + binding.key, + ); + this.emit('observersNotified', {eventType, binding}); + } catch (err) { + this.pendingNotifications--; + this._debug('Error caught from observers', err); + // Errors caught from observers. Emit it to the current context. + // If no error listeners are registered, crash the process. + this.emit('error', err); + } + } + } + + /** + * Listen on given event types and emit `notification` event. This method + * merge multiple event types into one for notification. + * @param eventTypes Context event types + */ + private setupNotification(...eventTypes: ContextEventType[]) { + for (const eventType of eventTypes) { + this.on(eventType, (binding, context) => { + // No need to schedule notifications if no observers are present + if (!this.observers || this.observers.size === 0) return; + // Track pending events + this.pendingNotifications++; + // Take a snapshot of current observers to ensure notifications of this + // event will only be sent to current ones. Emit a new event to notify + // current context observers. + this.emit('notification', { + eventType, + binding, + context, + observers: new Set(this.observers), + }); + }); + } + } + + /** + * Wait until observers are notified for all of currently pending notification + * events. + * + * This method is for test only to perform assertions after observers are + * notified for relevant events. + */ + protected async waitUntilPendingNotificationsDone(timeout?: number) { + const count = this.pendingNotifications; + if (count === 0) return; + await pEvent.multiple(this, 'observersNotified', {count, timeout}); + } + /** * Create a binding with the given key in the context. If a locked binding * already exists with the same key, an error will be thrown. @@ -67,19 +321,22 @@ export class Context { */ add(binding: Binding): this { const key = binding.key; - /* istanbul ignore if */ - if (debug.enabled) { - debug('Adding binding: %s', key); - } - + this._debug('[%s] Adding binding: %s', key); + let existingBinding: Binding | undefined; const keyExists = this.registry.has(key); if (keyExists) { - const existingBinding = this.registry.get(key); + existingBinding = this.registry.get(key); const bindingIsLocked = existingBinding && existingBinding.isLocked; if (bindingIsLocked) throw new Error(`Cannot rebind key "${key}" to a locked binding`); } this.registry.set(key, binding); + if (existingBinding !== binding) { + if (existingBinding != null) { + this.emit('unbind', existingBinding, this); + } + this.emit('bind', binding, this); + } return this; } @@ -94,12 +351,101 @@ export class Context { * @returns true if the binding key is found and removed from this context */ unbind(key: BindingAddress): boolean { + this._debug('Unbind %s', key); key = BindingKey.validate(key); const binding = this.registry.get(key); + // If not found, return `false` if (binding == null) return false; if (binding && binding.isLocked) throw new Error(`Cannot unbind key "${key}" of a locked binding`); - return this.registry.delete(key); + this.registry.delete(key); + this.emit('unbind', binding, this); + return true; + } + + /** + * Add a context event observer to the context + * @param observer Context observer instance or function + */ + subscribe(observer: ContextEventObserver): Subscription { + this.observers = this.observers || new Set(); + this.setupEventHandlersIfNeeded(); + this.observers.add(observer); + return new ContextSubscription(this, observer); + } + + /** + * Remove the context event observer from the context + * @param observer Context event observer + */ + unsubscribe(observer: ContextEventObserver): boolean { + if (!this.observers) return false; + return this.observers.delete(observer); + } + + /** + * Close the context and release references to other objects in the context + * chain. + * + * This method MUST be called to avoid memory leaks once a context object is + * no longer needed and should be recycled. An example is the `RequestContext`, + * which is created per request. + */ + close() { + this._debug('Closing context...'); + this.observers = undefined; + if (this.notificationQueue != null) { + // Cancel the notification iterator + this.notificationQueue.return!(undefined).catch(err => { + this.handleNotificationError(err); + }); + this.notificationQueue = undefined; + } + if (this._parent && this._parentEventListeners) { + for (const [event, listener] of this._parentEventListeners) { + this._parent.removeListener(event, listener); + } + this._parentEventListeners = undefined; + } + this.registry.clear(); + this._parent = undefined; + } + + /** + * Check if an observer is subscribed to this context + * @param observer Context observer + */ + isSubscribed(observer: ContextObserver) { + if (!this.observers) return false; + return this.observers.has(observer); + } + + /** + * Publish an event to the registered observers. Please note the + * notification is queued and performed asynchronously so that we allow fluent + * APIs such as `ctx.bind('key').to(...).tag(...);` and give observers the + * fully populated binding. + * + * @param eventType Event names: `bind` or `unbind` + * @param binding Binding bound or unbound + * @param context Owner context + * @param observers Current set of context observers + */ + protected async notifyObservers( + eventType: ContextEventType, + binding: Readonly>, + context: Context, + observers = this.observers, + ) { + if (!observers || observers.size === 0) return; + + for (const observer of observers) { + if (typeof observer === 'function') { + await observer(eventType, binding, context); + } else if (!observer.filter || observer.filter(binding)) { + await observer.observe(eventType, binding, context); + } + } } /** @@ -137,7 +483,7 @@ export class Context { } /** - * Find bindings using the key pattern + * Find bindings using a key pattern or filter function * @param pattern A filter function, a regexp or a wildcard pattern with * optional `*` and `?`. Find returns such bindings where the key matches * the provided pattern. @@ -253,10 +599,7 @@ export class Context { keyWithPath: BindingAddress, optionsOrSession?: ResolutionOptions | ResolutionSession, ): Promise { - /* istanbul ignore if */ - if (debug.enabled) { - debug('Resolving binding: %s', keyWithPath); - } + this._debug('Resolving binding: %s', keyWithPath); return await this.getValueOrPromise( keyWithPath, optionsOrSession, @@ -323,10 +666,8 @@ export class Context { keyWithPath: BindingAddress, optionsOrSession?: ResolutionOptions | ResolutionSession, ): ValueType | undefined { - /* istanbul ignore if */ - if (debug.enabled) { - debug('Resolving binding synchronously: %s', keyWithPath); - } + this._debug('Resolving binding synchronously: %s', keyWithPath); + const valueOrPromise = this.getValueOrPromise( keyWithPath, optionsOrSession, @@ -451,3 +792,24 @@ export class Context { return json; } } + +/** + * An implementation of `Subscription` interface for context events + */ +class ContextSubscription implements Subscription { + constructor( + protected context: Context, + protected observer: ContextEventObserver, + ) {} + + private _closed = false; + + unsubscribe() { + this.context.unsubscribe(this.observer); + this._closed = true; + } + + get closed() { + return this._closed; + } +} diff --git a/packages/context/src/index.ts b/packages/context/src/index.ts index 4a83b7c580ef..0b204e662562 100644 --- a/packages/context/src/index.ts +++ b/packages/context/src/index.ts @@ -10,6 +10,7 @@ export * from './binding-inspector'; export * from './binding-key'; export * from './binding-filter'; export * from './context'; +export * from './context-observer'; export * from './inject'; export * from './keys'; export * from './provider'; diff --git a/packages/rest/package.json b/packages/rest/package.json index 169cd3cda55c..cb29b00940a9 100644 --- a/packages/rest/package.json +++ b/packages/rest/package.json @@ -40,6 +40,7 @@ "http-errors": "^1.6.3", "js-yaml": "^3.11.0", "lodash": "^4.17.11", + "on-finished": "^2.3.0", "openapi-schema-to-json-schema": "^2.1.0", "openapi3-ts": "^1.0.0", "path-to-regexp": "^3.0.0", @@ -59,6 +60,7 @@ "@types/lodash": "^4.14.106", "@types/multer": "^1.3.7", "@types/node": "^10.11.2", + "@types/on-finished": "^2.3.1", "@types/qs": "^6.5.1", "multer": "^1.4.1" }, diff --git a/packages/rest/src/request-context.ts b/packages/rest/src/request-context.ts index a928b3793327..b681ef1d4371 100644 --- a/packages/rest/src/request-context.ts +++ b/packages/rest/src/request-context.ts @@ -4,8 +4,9 @@ // License text available at https://opensource.org/licenses/MIT import {Context} from '@loopback/context'; -import {HandlerContext, Request, Response} from './types'; +import * as onFinished from 'on-finished'; import {RestBindings} from './keys'; +import {HandlerContext, Request, Response} from './types'; /** * A per-request Context combining an IoC container with handler context @@ -20,6 +21,9 @@ export class RequestContext extends Context implements HandlerContext { ) { super(parent, name); this._setupBindings(request, response); + onFinished(this.response, () => { + this.close(); + }); } private _setupBindings(request: Request, response: Response) {