Skip to content
This repository was archived by the owner on May 24, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 211 additions & 128 deletions packages/api/src/pubsub/eth/eth.js

Large diffs are not rendered by default.

582 changes: 380 additions & 202 deletions packages/api/src/pubsub/pubsub.spec.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions packages/api/src/transport/ws/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ class Ws extends JsonRpcBase {
}

_extract (result) {
const { result: res, id, method, params } = result;
const { result: res, error, id, method, params } = result;
const msg = this._messages[id];

// initial pubsub ACK
if (id && msg.subscription) {
if (id && msg.subscription && !error) {
// save subscription to map subId -> messageId
this._subscriptions[msg.subscription] = this._subscriptions[msg.subscription] || {};
this._subscriptions[msg.subscription][res] = id;
Expand Down
3 changes: 3 additions & 0 deletions packages/light.js/example/src/provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import Api from '@parity/api';

export const currentProvider = window.web3 && window.web3.currentProvider;
export const localProvider = new Api.Provider.Ws('ws://127.0.0.1:8546');
export const infuraProvider = new Api.Provider.Ws(
'wss://mainnet.infura.io/_ws'
);

const provider = currentProvider || localProvider;

Expand Down
5 changes: 0 additions & 5 deletions packages/light.js/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ export const createApiFromProvider = memoizee(
*/
export const setApi = (newApi: any) => {
api = newApi;
if (!api.isPubSub) {
console.warn(
`Current provider does not support pubsub. @parity/light.js will poll every second to listen to changes.`
);
}
};

/**
Expand Down
12 changes: 10 additions & 2 deletions packages/light.js/src/frequency/accounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import createPubsubObservable from './utils/createPubsubObservable';
* @param options - Options to pass to {@link FrequencyObservable}.
*/
export function onAccountsChanged$ (options?: FrequencyObservableOptions) {
return createPubsubObservable<Address[]>('eth_accounts', options);
return createPubsubObservable<Address[]>(
'eth_accounts',
'eth_accounts',
options
);
}

/**
Expand All @@ -21,5 +25,9 @@ export function onAccountsChanged$ (options?: FrequencyObservableOptions) {
* @param options - Options to pass to {@link FrequencyObservable}.
*/
export function onAccountsInfoChanged$ (options?: FrequencyObservableOptions) {
return createPubsubObservable<AccountsInfo>('parity_accountsInfo', options);
return createPubsubObservable<AccountsInfo>(
'parity_accountsInfo',
'parity_accountsInfo',
options
);
}
19 changes: 12 additions & 7 deletions packages/light.js/src/frequency/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
//
// SPDX-License-Identifier: MIT

import BigNumber from 'bignumber.js';
import { filter, map, withLatestFrom } from 'rxjs/operators';
import { filter, map, startWith, withLatestFrom } from 'rxjs/operators';
import * as memoizee from 'memoizee';
import { Observable } from 'rxjs';

import { Block, FrequencyObservableOptions } from '../types';
import { createApiFromProvider, getApi } from '../api';
import createPubsubObservable from './utils/createPubsubObservable';
import { FrequencyObservableOptions } from '../types';
import { distinctValues } from '../utils/operators';
import { onSyncingChanged$ } from './health';

/**
Expand All @@ -20,11 +20,16 @@ import { onSyncingChanged$ } from './health';
*/
const onEveryBlockWithApi$ = memoizee(
(api: any, options?: FrequencyObservableOptions) =>
createPubsubObservable('eth_blockNumber', options).pipe(
withLatestFrom(onSyncingChanged$(options)),
createPubsubObservable(
'eth_newHeads',
'eth_getBlockByNumber',
options
).pipe(
withLatestFrom(onSyncingChanged$(options).pipe(startWith(false))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong but wouldn't this make onEveryBlockWithApi$ emit on initialization, even though the user is not synced? e.g.:

  • onEveryBlockWithApi$ is called
  • blockNumber RPC answers
  • onEveryBlockWithApi$ emits wit the blockNumber
  • isSyncing RPC answers true (which was always the case)
  • onEveryBlockWithApi$ doesn't emit until isSyncing RPC answers false

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think you're right. The problem is: when you connect to Metamask or infura, eth_subscribe('syncing') doesn't return anything, I assume they're always considered sync. So onSyncingChanged$ actually never fires.

Between the 2, i still prefer to fire once at the beginning.

However I'm open to other ideas

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine a fix could be to make onSyncingChanged$ fire eth_syncing (non-pubsub rpc call) first to get an initial value, and then switch to the pubsub. non-pubsub "eth_syncing" does get an answer.

obviously not very clean though

Copy link
Collaborator Author

@amaury1093 amaury1093 Nov 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think it's a good idea, I thought about it too.

Another reason why it's good: before, parity_subscribe('eth_getBlock') immediately gives you an answer, and then gives you subsequent pubsub answers on each pub. eth_subscribe('newHeads') however doesn't give you the initial one, so there might be a (up to 15s) delay between subscribe and first event fired.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, makes sense then!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#61

filter(([_, isSyncing]) => isSyncing === false),
map(([blockNumber]) => blockNumber)
) as Observable<BigNumber>,
map(([blockNumber]) => blockNumber),
distinctValues()
) as Observable<Block>,
{ length: 1 } // Only memoize by api
);

Expand Down
6 changes: 5 additions & 1 deletion packages/light.js/src/frequency/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ import { FrequencyObservableOptions } from '../types';
* @param options - Options to pass to {@link FrequencyObservable}.
*/
export function onSyncingChanged$ (options?: FrequencyObservableOptions) {
return createPubsubObservable<object | false>('eth_syncing', options);
return createPubsubObservable<object | false>(
'eth_syncing',
'eth_syncing',
options
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,49 @@ import { setApi } from '../../api';

it('should return an Observable', () => {
setApi(resolveApi());
expect(isObservable(createPubsubObservable('fake_method'))).toBe(true);
expect(
isObservable(createPubsubObservable('eth_blockNumber', 'eth_blockNumber'))
).toBe(true);
});

it('should fire an event when pubsub publishes', done => {
setApi(resolveApi());
createPubsubObservable('fake_method').subscribe(data => {
expect(data).toBe('foo');
done();
});
createPubsubObservable('eth_blockNumber', 'eth_blockNumber').subscribe(
data => {
expect(data).toBe('foo');
done();
}
);
});

it('should fire an error when pubsub errors', done => {
setApi(rejectApi());
createPubsubObservable('fake_method').subscribe(undefined, err => {
expect(err).toEqual(new Error('bar'));
done();
});
createPubsubObservable('eth_blockNumber', 'eth_blockNumber').subscribe(
undefined,
err => {
expect(err).toEqual(new Error('bar'));
done();
}
);
});

it('should fire an event when polling pubsub publishes', done => {
setApi(resolveApi('foo', false));
createPubsubObservable('fake_method').subscribe(data => {
expect(data).toBe('foo');
done();
});
createPubsubObservable('eth_blockNumber', 'eth_blockNumber').subscribe(
data => {
expect(data).toBe('foo');
done();
}
);
});

it('should fire an error when polling pubsub errors', done => {
setApi(rejectApi(new Error('bar'), false));
createPubsubObservable('fake_method').subscribe(undefined, err => {
expect(err).toEqual(new Error('bar'));
done();
});
createPubsubObservable('eth_blockNumber', 'eth_blockNumber').subscribe(
undefined,
err => {
expect(err).toEqual(new Error('bar'));
done();
}
);
});
55 changes: 46 additions & 9 deletions packages/light.js/src/frequency/utils/createPubsubObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,45 @@
// SPDX-License-Identifier: MIT

import * as debug from 'debug';
import { exhaustMap } from 'rxjs/operators';
import { FrequencyObservableOptions } from '../../types';
import * as memoizee from 'memoizee';
import { Observable, Observer, timer } from 'rxjs';
import { switchMap } from 'rxjs/operators';

import { createApiFromProvider, getApi } from '../../api';
import { distinctReplayRefCount } from '../../utils/operators/distinctReplayRefCount';

const POLL_INTERVAL = 1000;

/**
* Create a polling function, calls the `fallback` JSONRPC on each second, or
* on previous call's result, whichever comes last.
*
* @ignore
*/
function createPoll<T> (
fallback: string,
api: any,
pollInterval = POLL_INTERVAL
) {
const [fallbackNamespace, fallbackMethod] = fallback.split('_');

return timer(0, pollInterval).pipe(
exhaustMap(() => api[fallbackNamespace][fallbackMethod]())
) as Observable<T>;
}

/**
* Given an api, returns an Observable that emits on each pubsub event.
* Pure function version of {@link createPubsubObservable}.
*
* @ignore
* @param pubsub - The pubsub method to subscribe to.
* @param fallback - If pubsub doesn't work, poll this method every
* POLL_INTERVAL ms.
*/
const createPubsubObservableWithApi = memoizee(
<T>(pubsub: string, api: any) => {
<T>(pubsub: string, fallback: string, api: any) => {
const [namespace, method] = pubsub.split('_');

// There's a chance the provider doesn't support pubsub, for example
Expand All @@ -29,12 +52,10 @@ const createPubsubObservableWithApi = memoizee(
debug('@parity/light.js:api')(
`Pubsub not available for ${
api.provider ? api.provider.constructor.name : 'current Api'
} provider, polling "${pubsub}" every second.`
} provider, polling "${fallback}" every ${POLL_INTERVAL}ms.`
);

return timer(0, 1000).pipe(
switchMap(() => api[namespace][method]())
) as Observable<T>;
return createPoll<T>(fallback, api);
}

return Observable.create((observer: Observer<T>) => {
Expand All @@ -47,10 +68,25 @@ const createPubsubObservableWithApi = memoizee(
observer.next(result);
}
}
);
).catch(() => {
// If we get an error during subscription, then default to fallback.
// TODO Should this be done on @parity/api?
debug('@parity/light.js:api')(
`Pubsub not available for method "${pubsub}", polling "${fallback}" every ${POLL_INTERVAL}ms`
);

createPoll<T>(fallback, api).subscribe(
e => observer.next(e),
e => observer.error(e),
() => observer.complete()
);
});

return () =>
subscription.then((subscriptionId: string) =>
api.pubsub.unsubscribe(subscriptionId)
subscriptionId
? api.pubsub.unsubscribe(subscriptionId)
: Promise.resolve()
);
}).pipe(distinctReplayRefCount()) as Observable<T>;
}
Expand All @@ -64,11 +100,12 @@ const createPubsubObservableWithApi = memoizee(
*/
const createPubsubObservable = <T>(
pubsub: string,
fallback: string,
{ provider }: FrequencyObservableOptions = {}
) => {
const api = provider ? createApiFromProvider(provider) : getApi();

return createPubsubObservableWithApi<T>(pubsub, api);
return createPubsubObservableWithApi<T>(pubsub, fallback, api);
};

export default createPubsubObservable;
16 changes: 8 additions & 8 deletions packages/light.js/src/rpc/eth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import BigNumber from 'bignumber.js';
import { of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

import { Address, RpcObservableOptions } from '../types';
import { Address, Block, RpcObservableOptions } from '../types';
import createRpc$ from './utils/createRpc';
import frequency from '../frequency';
import { isNullOrLoading, RPC_LOADING } from '../utils/isLoading';
Expand Down Expand Up @@ -66,9 +66,10 @@ export function defaultAccount$ (options?: RpcObservableOptions) {
* @return {Observable<Number>} - An Observable containing the block height.
*/
export function blockNumber$ (options?: RpcObservableOptions) {
return createRpc$<BigNumber, BigNumber>({
return createRpc$<Block, BigNumber>({
frequency: [frequency.onEveryBlock$],
name: 'blockNumber$'
name: 'blockNumber$',
pipes: () => [map(block => block.number)]
})(options)();
}

Expand All @@ -81,11 +82,10 @@ export function myBalance$ (options?: RpcObservableOptions) {
dependsOn: defaultAccount$,
name: 'myBalance$',
pipes: () => [
switchMap(
defaultAccount =>
isNullOrLoading(defaultAccount)
? of(RPC_LOADING)
: balanceOf$(defaultAccount)
switchMap(defaultAccount =>
isNullOrLoading(defaultAccount)
? of(RPC_LOADING)
: balanceOf$(defaultAccount)
)
]
})(options)();
Expand Down
5 changes: 5 additions & 0 deletions packages/light.js/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ export type Address = string;
// TODO This should be on @parity/api
export type ApiValue = any;

// TODO This should be on @parity/api
export type Block = {
number: BigNumber;
};

export interface Metadata<Source, Out> {
calledWithArgs?: {
[key: string]: ReplaySubject<Out>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ testValue(2, 'number');
testValue('foo', 'string');
testValue({ foo: 'bar' }, 'object');
testValue(new BigNumber(2), 'BigNumber');
testValue({ number: new BigNumber(2) }, 'Block');
21 changes: 20 additions & 1 deletion packages/light.js/src/utils/operators/distinctValues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,37 @@ import BigNumber from 'bignumber.js';
import { distinctUntilChanged } from 'rxjs/operators';
import { isObject } from '@parity/api/lib/util/types';

import { Block } from '../../types';

/**
* An intelligent distinctUntilChanged().
*
* @ignore
*/
export const distinctValues = <T>() =>
distinctUntilChanged<T>((x, y) => {
// If T == Block
if (
x &&
y &&
((x as unknown) as Block).number &&
((y as unknown) as Block).number
) {
return ((x as unknown) as Block).number.eq(
((y as unknown) as Block).number
);
}

// If T == BigNumber
if (BigNumber.isBigNumber(x) && BigNumber.isBigNumber(y)) {
return ((x as any) as BigNumber).eq((y as any) as BigNumber);
return ((x as unknown) as BigNumber).eq((y as unknown) as BigNumber);
}

// If T == object
if (isObject(x) && isObject(y)) {
return JSON.stringify(x) === JSON.stringify(y); // TODO Do a deep equal instead
}

// Other cases
return x === y;
});
2 changes: 1 addition & 1 deletion packages/light.js/src/utils/testHelpers/mockApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class MockProvider extends EventEmitter {

// List of JSONRPCs we want to mock
const listOfMockRps: { [index: string]: string[] } = {
eth: ['accounts', 'blockNumber', 'getBalance', 'syncing'],
eth: ['accounts', 'blockNumber', 'getBalance', 'newHeads', 'syncing'],
fake: ['method'],
net: ['peerCount'],
parity: ['accountsInfo', 'chain', 'postTransaction']
Expand Down