Skip to content

Commit

Permalink
fix(ScalarObservable): remove ScalarObservable optimization
Browse files Browse the repository at this point in the history
- remove usage of ScalarObservable for optimization
- implementation of ScalarObservable still remain for possible further usage

closes ReactiveX#1142, ReactiveX#1150, ReactiveX#1140
  • Loading branch information
kwonoj committed Jan 11, 2016
1 parent 41d39e2 commit 9fef2ff
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 69 deletions.
43 changes: 19 additions & 24 deletions doc/operator-creation.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Operator Creation

There are many ways to create an operator for RxJS. In this version of RxJS, performance was the primary consideration, as such, operator creation
in a way that adheres to the existing structures in this library may not be straight forward. This is an attempt to document how to
There are many ways to create an operator for RxJS. In this version of RxJS, performance was the primary consideration, as such, operator creation
in a way that adheres to the existing structures in this library may not be straight forward. This is an attempt to document how to
create an operator either for yourself, or for this library.

For how to develop a custom operator for *this* library, [see below](#advanced).
Expand All @@ -11,20 +11,20 @@ For how to develop a custom operator for *this* library, [see below](#advanced).

### Guidelines

In the most common case, users might like to create an operator to be used only by by their app. These can be developed in
In the most common case, users might like to create an operator to be used only by by their app. These can be developed in
any way the developer sees fit, but here are some guidelines:

1. __Operators should always return an Observable__. You're performing operations on unknown sets of things to create new sets.
It only makes sense to return a new set. If you create a method that returns something other than an Observable, it's not an operator,
and that's fine.
2. __Be sure to manage subscriptions__ created inside of the Observable your operator returns. Your operator is going to have to
subscribe to the source (or `this`) inside of the returned Observable, be sure that it's returned as part of unsubscribe handler or
subscribe to the source (or `this`) inside of the returned Observable, be sure that it's returned as part of unsubscribe handler or
subscription.
3. __Be sure to handle exceptions from passed functions__. If you're implementing an operator that takes a function as an argument,
3. __Be sure to handle exceptions from passed functions__. If you're implementing an operator that takes a function as an argument,
when you call it, you'll want to wrap it in a `try/catch` and send the error down the `error()` path on the observable.
4. __Be sure to teardown scarce resources__ in your unsubscribe handler of your returned Observable. If you're setting up event handlers
or a web socket, or something like that, the unsubscribe handler is a great place to remove that event handler or close that socket.



<!-- share-code-between-examples -->
Expand All @@ -36,7 +36,7 @@ function mySimpleOperator(someCallback) {
return Observable.create(subscriber => {
// because we're in an arrow function `this` is from the outer scope.
var source = this;

// save our inner subscription
var subscription = source.subscribe(value => {
// important: catch errors from user-provided callbacks
Expand All @@ -45,12 +45,12 @@ function mySimpleOperator(someCallback) {
} catch(err) {
subscriber.error(err);
}
},
},
// be sure to handle errors and completions as appropriate and
// send them along
err => subscriber.error(err),
() => subscriber.complete());

// to return now
return subscription;
});
Expand All @@ -77,10 +77,10 @@ class MyObservable extends Observable {
observable.operator = operator;
return observable;
}

// put it here .. or ..
customOperator() {
/* do things and return an Observable */
/* do things and return an Observable */
}
}

Expand All @@ -101,11 +101,11 @@ someObservable.mySimpleOperator(x => x + '!');

## <a id="advanced"></a>Creating An Operator For Inclusion In *This* Library

__To create an operator for inclusion in this library, it's probably best to work from prior art__. Something
__To create an operator for inclusion in this library, it's probably best to work from prior art__. Something
like the `filter` operator would be a good start. It's not expected that you'll be able to read
this section and suddenly be an expert operator contributor.

**If you find yourself confused, DO NOT worry. Follow prior examples in the repo, submit a PR, and we'll work with you.**
**If you find yourself confused, DO NOT worry. Follow prior examples in the repo, submit a PR, and we'll work with you.**

Hopefully the information provided here will give context to decisions made while developing operators in this library.
There are a few things to know and (try to) understand while developing operators:
Expand All @@ -114,21 +114,21 @@ There are a few things to know and (try to) understand while developing operator
"build their own observable" by pulling in operator methods an adding them to observable in their own module.
It also means operators can be brought in ad-hock and used directly, either with the ES7 function bind operator
in Babel (`::`) or by using it with `.call()`.
2. Every operator has an `Operator` class. The `Operator` class is really a `Subscriber` "factory". It's
what gets passed into the `lift` method to make the "magic" happen. It's sole job is to create the operation's
2. Every operator has an `Operator` class. The `Operator` class is really a `Subscriber` "factory". It's
what gets passed into the `lift` method to make the "magic" happen. It's sole job is to create the operation's
`Subscriber` instance on subscription.
3. Every operator has a `Subscriber` class. This class does *all* of the logic for the operation. It's job is to
handle values being nexted in (generally by overriding `_next()`) and forward it along to the `destination`,
3. Every operator has a `Subscriber` class. This class does *all* of the logic for the operation. It's job is to
handle values being nexted in (generally by overriding `_next()`) and forward it along to the `destination`,
which is the next observer in the chain.
- It's important to note that the `destination` Observer set on any `Subscriber` serves as more than just
the destinations for the events passing through, If the `destination` is a `Subscriber` it also is used to set up
a shared underlying `Subscription`, which, in fact, is also a `Subscriber`, and is the first `Subscriber` in the
a shared underlying `Subscription`, which, in fact, is also a `Subscriber`, and is the first `Subscriber` in the
chain.
- Subscribers all have `add` and `remove` methods that are used for adding and removing inner subscriptions to
the shared underlying subscription.
- When you `subscribe` to an Observable, the functions or Observer you passed are used to create the final `destination`
`Subscriber` for the chain. It's this `Subscriber` that is really also the shared `Subscriptoin` for the operator chain.

### Inner Subscriptions

An "inner subscriber" or "inner subscription" is any subscription created inside of an operator's primary Subscriber. For example,
Expand All @@ -138,8 +138,3 @@ inner subscriptions in this library is that if you pass and set a `destination`
for their `unsubscribe` calls. Meaning if you call `unsubscribe` on them, it might not do anything. As such, it's usually desireable
not to set the `destination` of inner subscriptions. An example of this might be the switch operators, that have a single underlying
inner subscription that needs to unsubscribe independent of the main subscription.

If you find yourself creating inner subscriptions, it might also be worth checking to see if the observable being passed `_isScalar`,
because if it is, you can pull the `value` out of it directly and improve the performance of your operator when it's operating over
scalar observables. For reference a scalar observable is any observable that has a single static value underneath it. `Observable.of('foo')` will
return a `ScalarObservable`, likewise, resolved `PromiseObservable`s will act as scalars.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
},
"scripts": {
"build_all": "npm run build_es6 && npm run build_amd && npm run build_cjs && npm run build_global && npm run generate_packages",
"build_amd": "rm -rf dist/amd && tsc typings/es6-shim/es6-shim.d.ts src/Rx.ts -m amd --outDir dist/amd --sourcemap --target ES5 --diagnostics --pretty --noImplicitAny --suppressImplicitAnyIndexErrors",
"build_cjs": "rm -rf dist/cjs && tsc typings/es6-shim/es6-shim.d.ts src/Rx.ts src/Rx.KitchenSink.ts -m commonjs --outDir dist/cjs --sourcemap --target ES5 -d --diagnostics --pretty --noImplicitAny --suppressImplicitAnyIndexErrors",
"build_es6": "rm -rf dist/es6 && tsc src/Rx.ts src/Rx.KitchenSink.ts --outDir dist/es6 --sourceMap --target ES6 -d --diagnostics --pretty --noImplicitAny --suppressImplicitAnyIndexErrors",
"build_amd": "rm -rf dist/amd && tsc typings/es6-shim/es6-shim.d.ts src/Rx.ts src/observable/ScalarObservable.ts -m amd --outDir dist/amd --sourcemap --target ES5 --diagnostics --pretty --noImplicitAny --suppressImplicitAnyIndexErrors",
"build_cjs": "rm -rf dist/cjs && tsc typings/es6-shim/es6-shim.d.ts src/Rx.ts src/Rx.KitchenSink.ts src/observable/ScalarObservable.ts -m commonjs --outDir dist/cjs --sourcemap --target ES5 -d --diagnostics --pretty --noImplicitAny --suppressImplicitAnyIndexErrors",
"build_es6": "rm -rf dist/es6 && tsc src/Rx.ts src/Rx.KitchenSink.ts src/observable/ScalarObservable.ts --outDir dist/es6 --sourceMap --target ES6 -d --diagnostics --pretty --noImplicitAny --suppressImplicitAnyIndexErrors",
"build_closure": "java -jar ./node_modules/google-closure-compiler/compiler.jar ./dist/global/Rx.umd.js --language_in ECMASCRIPT5 --create_source_map ./dist/global/Rx.umd.min.js.map --js_output_file ./dist/global/Rx.umd.min.js",
"build_global": "rm -rf dist/global && mkdir \"dist/global\" && node tools/make-umd-bundle.js && node tools/make-system-bundle.js && npm run build_closure",
"build_perf": "npm run build_cjs && npm run build_global && webdriver-manager update && npm run perf",
Expand Down
12 changes: 0 additions & 12 deletions spec/observables/of-spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* globals describe, it, expect, expectObservable, rxTestScheduler */
var ArrayObservable = require('../../dist/cjs/observable/fromArray').ArrayObservable;
var ScalarObservable = require('../../dist/cjs/observable/ScalarObservable').ScalarObservable;
var EmptyObservable = require('../../dist/cjs/observable/empty').EmptyObservable;
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
Expand All @@ -20,16 +19,6 @@ describe('Observable.of', function () {
});
});

it('should return a scalar observable if only passed one value', function () {
var obs = Observable.of('one');
expect(obs instanceof ScalarObservable).toBe(true);
});

it('should return a scalar observable if only passed one value and a scheduler', function () {
var obs = Observable.of('one', Rx.Scheduler.queue);
expect(obs instanceof ScalarObservable).toBe(true);
});

it('should return an array observable if passed many values', function () {
var obs = Observable.of('one', 'two', 'three');
expect(obs instanceof ArrayObservable).toBe(true);
Expand Down Expand Up @@ -60,7 +49,6 @@ describe('Observable.of', function () {
Observable.of('a', 'b', 'c', rxTestScheduler),
rxTestScheduler
);
expect(source instanceof ScalarObservable).toBe(true);
var result = source.concatAll();
expectObservable(result).toBe('(abc|)');
});
Expand Down
5 changes: 1 addition & 4 deletions src/observable/fromArray.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {ScalarObservable} from './ScalarObservable';
import {EmptyObservable} from './empty';
import {Subscriber} from '../Subscriber';
import {isScheduler} from '../util/isScheduler';
Expand All @@ -21,10 +20,8 @@ export class ArrayObservable<T> extends Observable<T> {
}

const len = array.length;
if (len > 1) {
if (len > 0) {
return new ArrayObservable<T>(<any>array, scheduler);
} else if (len === 1) {
return new ScalarObservable<T>(<any>array[0], scheduler);
} else {
return new EmptyObservable<T>(scheduler);
}
Expand Down
22 changes: 0 additions & 22 deletions src/operator/every.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {Observable} from '../Observable';
import {ScalarObservable} from '../observable/ScalarObservable';
import {ArrayObservable} from '../observable/fromArray';
import {ErrorObservable} from '../observable/throw';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
Expand All @@ -17,25 +14,6 @@ import {errorObject} from '../util/errorObject';
export function every<T>(predicate: (value: T, index: number, source: Observable<T>) => boolean,
thisArg?: any): Observable<boolean> {
const source = this;
if (source._isScalar) {
const result: boolean = tryCatch(predicate).call(thisArg || this, source.value, 0, source);
if (result === errorObject) {
return new ErrorObservable(errorObject.e, source.scheduler);
} else {
return new ScalarObservable(result, source.scheduler);
}
}

if (source instanceof ArrayObservable) {
const array = (<ArrayObservable<T>>source).array;
const result = tryCatch((array: T[], predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg: any) =>
array.every(<any>predicate, thisArg))(array, predicate, thisArg);
if (result === errorObject) {
return new ErrorObservable(errorObject.e, source.scheduler);
} else {
return new ScalarObservable(result, source.scheduler);
}
}
return source.lift(new EveryOperator(predicate, thisArg, source));
}

Expand Down
5 changes: 1 addition & 4 deletions src/operator/startWith.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {ArrayObservable} from '../observable/fromArray';
import {ScalarObservable} from '../observable/ScalarObservable';
import {EmptyObservable} from '../observable/empty';
import {concat} from './concat-static';
import {isScheduler} from '../util/isScheduler';
Expand All @@ -15,9 +14,7 @@ export function startWith<T>(...array: Array<T | Scheduler>): Observable<T> {
}

const len = array.length;
if (len === 1) {
return concat(new ScalarObservable<T>(<T>array[0], scheduler), <Observable<T>>this);
} else if (len > 1) {
if (len > 0) {
return concat(new ArrayObservable<T>(<T[]>array, scheduler), <Observable<T>>this);
} else {
return concat(new EmptyObservable<T>(scheduler), <Observable<T>>this);
Expand Down

0 comments on commit 9fef2ff

Please sign in to comment.