Skip to content

Commit

Permalink
feat(Zone): add support for Zone.js
Browse files Browse the repository at this point in the history
Add support for Zone.js in a way which should have minimal impact on performance.

Ensures that all callbacks are invoked in the same `Zone` as the `Zone` which was current at
the time of the callback registration with the `Rx`.

-  Add `getZone()` method which returs always `null` if no `Zone` is present or returrns
   the current zone.
 - All places where the `Zone` needs to be intered are quarded to only go through zone if the
   Zone is different from the current `Zone` for performance reasons.

You can read up on Zones in [The Zone Primer](https://docs.google.com/document/d/1F5Ug0jcrm031vhSMJEOgp1l-Is-Vf0UCNDY-LsQtAIY).
Before this change when Rx runs in the Zone environment it will
propagate Zones according to VM Tasks. In the case of Rx this is not
correct behavior because Rx does its own scheduling and the result is
that all callbacks end up running in the same zone as Task zone. This
behavior is mostly correct most of the time, but there are cases which
result in wrong behavior which this change corrects. This change is
needed to enable proper implementation of
angular/angular#13248.
  • Loading branch information
mhevery committed Jan 9, 2017
1 parent f8ba77d commit cec81de
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 16 deletions.
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@
"copy_src_cjs": "mkdirp ./dist/cjs/src && shx cp -r ./src/* ./dist/cjs/src",
"copy_src_es6": "mkdirp ./dist/es6/src && shx cp -r ./src/* ./dist/es6/src",
"commit": "git-cz",
"compile_dist_cjs": "tsc ./dist/cjs/src/Rx.ts ./dist/cjs/src/add/observable/of.ts -m commonjs --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom --sourceMap --outDir ./dist/cjs --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
"compile_module_es6": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/add/observable/of.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node --noEmitHelpers --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom ",
"compile_dist_es6_for_docs": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/add/observable/of.ts ./dist/es6/src/MiscJSDoc.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES6 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
"cover": "shx rm -rf dist/cjs && tsc src/Rx.ts src/add/observable/of.ts -m commonjs --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom --outDir dist/cjs --sourceMap --target ES5 -d && nyc --reporter=lcov --reporter=html --exclude=spec/support/**/* --exclude=spec-js/**/* --exclude=node_modules mocha --opts spec/support/default.opts spec-js",
"compile_dist_cjs": "tsc ./dist/cjs/src/Rx.ts ./dist/cjs/src/add/observable/of.ts ./node_modules/zone.js/dist/zone.js.d.ts -m commonjs --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom --sourceMap --outDir ./dist/cjs --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
"compile_module_es6": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/add/observable/of.ts ./node_modules/zone.js/dist/zone.js.d.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES5 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node --noEmitHelpers --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom ",
"compile_dist_es6_for_docs": "tsc ./dist/es6/src/Rx.ts ./dist/es6/src/add/observable/of.ts ./node_modules/zone.js/dist/zone.js.d.ts ./dist/es6/src/MiscJSDoc.ts -m es2015 --sourceMap --outDir ./dist/es6 --target ES6 -d --diagnostics --pretty --noImplicitAny --noImplicitReturns --noImplicitThis --suppressImplicitAnyIndexErrors --moduleResolution node",
"cover": "shx rm -rf dist/cjs && tsc src/Rx.ts src/add/observable/of.ts ./node_modules/zone.js/dist/zone.js.d.ts -m commonjs --lib es5,es2015.iterable,es2015.collection,es2015.promise,dom --outDir dist/cjs --sourceMap --target ES5 -d && nyc --reporter=lcov --reporter=html --exclude=spec/support/**/* --exclude=spec-js/**/* --exclude=node_modules mocha --opts spec/support/default.opts spec-js",
"decision_tree_widget": "cd doc/decision-tree-widget && npm run build && cd ../..",
"doctoc": "doctoc CONTRIBUTING.md",
"generate_packages": "node .make-packages.js",
Expand Down Expand Up @@ -192,7 +192,8 @@
"watch": "^1.0.1",
"watchify": "3.7.0",
"webpack": "^1.13.1",
"xmlhttprequest": "1.8.0"
"xmlhttprequest": "1.8.0",
"zone.js": "^0.7.4"
},
"engines": {
"npm": ">=2.0.0"
Expand Down
139 changes: 139 additions & 0 deletions spec/Zone-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import {expect} from 'chai';
import 'zone.js';
import {Subscriber, Observable} from '../dist/cjs/Rx';

/**
* The point of these tests, is to ensure that all callbacks execute in the Zone which was active
* when the callback was passed into the Rx.
*
* The implications are:
* - Observable callback passed into `Observable` executes in the same Zone as when the
* `new Observable` was invoked.
* - The subscription callbacks passed into `subscribe` execute in the same Zone as when the
* `subscribe` method was invoked.
* - The operator callbacks passe into `map`, etc..., execute in the same Zone as when the
* `operator` (`lift`) method was invoked.
*/
describe('Zone interaction', () => {
it('should run methods in the zone of declaration', () => {
const log: string[] = [];
const constructorZone: Zone = Zone.current.fork({ name: 'Constructor Zone'});
const subscriptionZone: Zone = Zone.current.fork({ name: 'Subscription Zone'});
let subscriber: Subscriber<string> = null;
const observable = constructorZone.run(() => new Observable<string>((_subscriber) => {
subscriber = _subscriber;
log.push('setup');
expect(Zone.current.name).to.eq(constructorZone.name);
return () => {
expect(Zone.current.name).to.eq(constructorZone.name);
log.push('cleanup');
};
})) as Observable<string>;
subscriptionZone.run(() => observable.subscribe(
() => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('next');
},
() => null,
() => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('complete');
}
));
subscriber.next('MyValue');
subscriber.complete();

expect(log).to.deep.equal(['setup', 'next', 'complete', 'cleanup']);
log.length = 0;

subscriptionZone.run(() => observable.subscribe(
() => null,
() => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('error');
},
() => null
));
subscriber.next('MyValue');
subscriber.error('MyError');

expect(log).to.deep.equal(['setup', 'error', 'cleanup']);
});

it('should run methods in the zone of declaration when nexting synchronously', () => {
const log: string[] = [];
const rootZone: Zone = Zone.current;
const constructorZone: Zone = Zone.current.fork({ name: 'Constructor Zone'});
const subscriptionZone: Zone = Zone.current.fork({ name: 'Subscription Zone'});
const observable = constructorZone.run(() => new Observable<string>((subscriber) => {
// Execute the `next`/`complete` in different zone, and assert that correct zone
// is restored.
rootZone.run(() => {
subscriber.next('MyValue');
subscriber.complete();
});
return () => {
expect(Zone.current.name).to.eq(constructorZone.name);
log.push('cleanup');
};
})) as Observable<string>;

subscriptionZone.run(() => observable.subscribe(
() => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('next');
},
() => null,
() => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('complete');
}
));

expect(log).to.deep.equal(['next', 'complete', 'cleanup']);
});

it('should run operators in the zone of declaration', () => {
const log: string[] = [];
const rootZone: Zone = Zone.current;
const constructorZone: Zone = Zone.current.fork({ name: 'Constructor Zone'});
const operatorZone: Zone = Zone.current.fork({ name: 'Operator Zone'});
const subscriptionZone: Zone = Zone.current.fork({ name: 'Subscription Zone'});
let observable = constructorZone.run(() => new Observable<string>((subscriber) => {
// Execute the `next`/`complete` in different zone, and assert that correct zone
// is restored.
rootZone.run(() => {
subscriber.next('MyValue');
subscriber.complete();
});
return () => {
expect(Zone.current.name).to.eq(constructorZone.name);
log.push('cleanup');
};
})) as Observable<string>;

observable = operatorZone.run(() => observable.map((value) => {
expect(Zone.current.name).to.eq(operatorZone.name);
log.push('map: ' + value);
return value;
})) as Observable<string>;

subscriptionZone.run(() => observable.subscribe(
() => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('next');
},
(e) => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('error: ' + e);
},
() => {
expect(Zone.current.name).to.eq(subscriptionZone.name);
log.push('complete');
}
));

expect(log).to.deep.equal(['map: MyValue', 'next', 'complete', 'cleanup']);
});

});
1 change: 1 addition & 0 deletions spec/support/default.opts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--require zone.js
--require source-map-support/register
--require spec-js/helpers/testScheduler-ui.js
--ui spec-js/helpers/testScheduler-ui.js
Expand Down
40 changes: 37 additions & 3 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Subscriber } from './Subscriber';
import { Subscription, AnonymousSubscription, TeardownLogic } from './Subscription';
import { root } from './util/root';
import { toSubscriber } from './util/toSubscriber';
import { getZone } from './util/getZone';
import { IfObservable } from './observable/IfObservable';
import { ErrorObservable } from './observable/ErrorObservable';
import { $$observable } from './symbol/observable';
Expand All @@ -29,15 +30,25 @@ export class Observable<T> implements Subscribable<T> {

protected source: Observable<any>;
protected operator: Operator<any, T>;
/**
* The `Zone` which was captured at the time `Observable` got created.
* This is the `Zone` which will be used when invoking the `Observable` callbacks.
*/
private _zone: Zone;

/**
* Note about `zone.js`: When `zone.js` is loaded the `Observable` will capture the `Zone` on
* construction and then ensure that the `subscribe` function as well as the `TeardowLogic`
* execute in the `Zone` which was current an the time of the constructor call.
*
* @constructor
* @param {Function} subscribe the function that is called when the Observable is
* initially subscribed to. This function is given a Subscriber, to which new values
* can be `next`ed, or an `error` method can be called to raise an error, or
* `complete` can be called to notify of a successful completion.
*/
constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => TeardownLogic) {
this._zone = getZone();
if (subscribe) {
this._subscribe = subscribe;
}
Expand All @@ -60,6 +71,10 @@ export class Observable<T> implements Subscribable<T> {
/**
* Creates a new Observable, with this Observable as the source, and the passed
* operator defined as the new observable's operator.
*
* Note about `zone.js`: When `zone.js` is loaded the all operator callback function will execute
* in the `Zone` which was current when the operator was registered using `lift` method.
*
* @method lift
* @param {Operator} operator the operator defining the operation to take on the observable
* @return {Observable} a new observable with the Operator applied
Expand All @@ -73,7 +88,12 @@ export class Observable<T> implements Subscribable<T> {

/**
* Registers handlers for handling emitted values, error and completions from the observable, and
* executes the observable's subscriber function, which will take action to set up the underlying data stream
* executes the observable's subscriber function, which will take action to set up the underlying data stream
*
* Note about `zone.js`: When `zone.js` is loaded the `Observable` will capture the `Zone` on
* invocation to `subscribe` and then ensure that the `next`, `error` and `complete` callbacks
* execute in the `Zone` which was current at the time of `subscribe` invocation.
*
* @method subscribe
* @param {PartialObserver|Function} observerOrNext (optional) either an observer defining all functions to be called,
* or the first of three possible handlers, which is the handler for each value emitted from the observable.
Expand All @@ -91,11 +111,25 @@ export class Observable<T> implements Subscribable<T> {

const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
// Only grab a zone if we Zone exists and it is different from the current zone.
const zone = this._zone && this._zone !== getZone() ? this._zone : null;

if (operator) {
operator.call(sink, this.source);
if (zone) {
// Current Zone is different from the intended zone.
// Restore the zone before invoking the operator.
zone.run(operator.call, operator, [sink, this.source]);
} else {
operator.call(sink, this.source);
}
} else {
sink.add(this._subscribe(sink));
if (zone) {
// Current Zone is different from the intended zone.
// Restore the zone before invoking the subscribe callback.
zone.run(() => sink.add(this._subscribe(sink)));
} else {
sink.add(this._subscribe(sink));
}
}

if (sink.syncErrorThrowable) {
Expand Down
25 changes: 22 additions & 3 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { isFunction } from './util/isFunction';
import {getZone} from './util/getZone';
import { Observer, PartialObserver } from './Observer';
import { Subscription } from './Subscription';
import { empty as emptyObserver } from './Observer';
Expand Down Expand Up @@ -92,7 +93,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
*/
next(value?: T): void {
if (!this.isStopped) {
this._next(value);
if (this._zone && this._zone != getZone()) {
// Current Zone is different from the intended zone.
// Restore the zone before `next`ing.
this._zone.run(this._next, this, [value]);
} else {
this._next(value);
}
}
}

Expand All @@ -106,7 +113,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
error(err?: any): void {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
if (this._zone && this._zone != getZone()) {
// Current Zone is different from the intended zone.
// Restore the zone before `error`ing.
this._zone.run(this._error, this, [err]);
} else {
this._error(err);
}
}
}

Expand All @@ -119,7 +132,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
complete(): void {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
if (this._zone && this._zone != getZone()) {
// Current Zone is different from the intended zone.
// Restore the zone before `complete`ing.
this._zone.run(this._complete, this);
} else {
this._complete();
}
}
}

Expand Down
14 changes: 12 additions & 2 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { isObject } from './util/isObject';
import { isFunction } from './util/isFunction';
import { tryCatch } from './util/tryCatch';
import { errorObject } from './util/errorObject';
import { getZone } from './util/getZone';
import { UnsubscriptionError } from './util/UnsubscriptionError';

export interface AnonymousSubscription {
Expand Down Expand Up @@ -40,13 +41,20 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

/**
* The `Zone` which was captured at the time `subscribe` was invoked.
* This is the `Zone` which will be used when invoking the `next`, `error', 'complete' callbacks.
*/
protected _zone: Zone;
private _subscriptions: ISubscription[];

/**
* @param {function(): void} [unsubscribe] A function describing how to
* perform the disposal of resources when the `unsubscribe` method is called.
*/
constructor(unsubscribe?: () => void) {
this._zone = getZone();
this._subscriptions = null;
if (unsubscribe) {
(<any> this)._unsubscribe = unsubscribe;
}
Expand All @@ -73,7 +81,8 @@ export class Subscription implements ISubscription {
(<any> this)._subscriptions = null;

if (isFunction(_unsubscribe)) {
let trial = tryCatch(_unsubscribe).call(this);
// Ensure that the subscription cleanup callback runs in the Subscription zone.
let trial = tryCatch(_unsubscribe, this._zone).call(this);
if (trial === errorObject) {
hasErrors = true;
errors = errors || (
Expand All @@ -91,7 +100,8 @@ export class Subscription implements ISubscription {
while (++index < len) {
const sub = _subscriptions[index];
if (isObject(sub)) {
let trial = tryCatch(sub.unsubscribe).call(sub);
// Ensure that the subscription cleanup callback runs in the Subscription zone.
let trial = tryCatch(sub.unsubscribe, sub._zone).call(sub);
if (trial === errorObject) {
hasErrors = true;
errors = errors || [];
Expand Down
9 changes: 9 additions & 0 deletions src/util/getZone.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* This function returns the current `Zone` if `Zone` is loaded or `null` if `Zone` is not loaded.
*
* It is expected that the VM will inline the `() => null` case when no `Zone` is present resulting
* in no performance impact.
*/
export const getZone: () => Zone = typeof Zone !== 'undefined' && Zone.current
? () => Zone.current
: () => null;
13 changes: 11 additions & 2 deletions src/util/tryCatch.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
import { errorObject } from './errorObject';
import { getZone } from './getZone';

let tryCatchTarget: Function;
let tryCatchZone: Zone;

function tryCatcher(this: any): any {
try {
return tryCatchTarget.apply(this, arguments);
return tryCatchZone && tryCatchZone != getZone()
? tryCatchZone.run(tryCatchTarget, this, arguments as any)
: tryCatchTarget.apply(this, arguments);
} catch (e) {
errorObject.e = e;
return errorObject;
} finally {
// Cleanup to prevent unnecessarily holding onto memory.
tryCatchZone = null;
tryCatchTarget = null;
}
}

export function tryCatch<T extends Function>(fn: T): T {
export function tryCatch<T extends Function>(fn: T, zone?: Zone): T {
tryCatchTarget = fn;
tryCatchZone = zone;
return <any>tryCatcher;
};
Loading

0 comments on commit cec81de

Please sign in to comment.