Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/api-provider/src/http/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ export default class HttpProvider implements ProviderInterface {
return this.coder.decodeResponse(result);
}

async subscribe (method: string, params: Array<any>, cb: ProviderInterface$Callback): Promise<number> {
async subscribe (types: string, method: string, params: Array<any>, cb: ProviderInterface$Callback): Promise<number> {
throw new Error('Subscriptions has not been implemented');
}

async unsubscribe (method: string, id: number): Promise<boolean> {
async unsubscribe (type: string, method: string, id: number): Promise<boolean> {
throw new Error('Subscriptions has not been implemented');
}
}
8 changes: 4 additions & 4 deletions packages/api-provider/src/mock/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ export default function mockProvider (): ProviderInterface {
on(self, type, sub),
send: (method: string, params: Array<any>): Promise<any> =>
send(self, method, params),
subscribe: (method: string, ...params: Array<any>): Promise<number> =>
subscribe(self, method, params),
unsubscribe: (method: string, id: number): Promise<boolean> =>
unsubscribe(self, method, id)
subscribe: (type: string, method: string, ...params: Array<any>): Promise<number> =>
subscribe(self, type, method, params),
unsubscribe: (type: string, method: string, id: number): Promise<boolean> =>
unsubscribe(self, type, method, id)
};
}
8 changes: 4 additions & 4 deletions packages/api-provider/src/mock/subscribe.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ describe('subscribe', () => {
});

it('fails on unknown methods', () => {
return subscribe(state, 'test_notFound').catch((error) => {
return subscribe(state, 'test', 'test_notFound').catch((error) => {
expect(error.message).toMatch(/Invalid method 'test_notFound'/);
});
});

it('returns a subscription id', () => {
return subscribe(state, 'chain_newHead', [() => void 0]).then((id) => {
return subscribe(state, 'chain_newHead', 'chain_newHead', [() => void 0]).then((id) => {
expect(id).toEqual(state.subscriptionId);
});
});

it('stores the mapping values', () => {
const cb = () => void 0;

return subscribe(state, 'chain_newHead', [cb]).then((id) => {
return subscribe(state, 'chain_newHead', 'chain_newHead', [cb]).then((id) => {
expect(state.subscriptionMap[id]).toEqual('chain_newHead');
expect(state.subscriptions['chain_newHead'].callbacks[id]).toEqual(cb);
});
Expand All @@ -36,7 +36,7 @@ describe('subscribe', () => {
it('calls back with the last known value', (done) => {
state.subscriptions['chain_newHead'].lastValue = 'testValue';

subscribe(state, 'chain_newHead', [(_, value) => {
subscribe(state, 'chain_newHead', 'chain_newHead', [(_, value) => {
expect(value).toEqual('testValue');
done();
}]);
Expand Down
2 changes: 1 addition & 1 deletion packages/api-provider/src/mock/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import { MockState, MockState$Subscription$Callback } from './types';

export default async function subscribe (self: MockState, method: string, params: Array<any>): Promise<number> {
export default async function subscribe (self: MockState, type: string, method: string, params: Array<any>): Promise<number> {
self.l.debug(() => ['subscribe', method, params]);

if (self.subscriptions[method]) {
Expand Down
10 changes: 5 additions & 5 deletions packages/api-provider/src/mock/unsubscribe.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,31 @@ describe('unsubscribe', () => {
beforeEach(() => {
state = createState();

return subscribe(state, 'chain_newHead', [() => void 0]).then((_id) => {
return subscribe(state, 'chain_newHead', 'chain_newHead', [() => void 0]).then((_id) => {
id = _id;
});
});

it('fails on unknown ids', () => {
return unsubscribe(state, 'chain_newHead', 5).catch((error) => {
return unsubscribe(state, 'chain_newHead', 'chain_newHead', 5).catch((error) => {
expect(error.message).toMatch(/Unable to find/);
});
});

it('unsubscribes successfully', () => {
return unsubscribe(state, 'chain_newHead', id);
return unsubscribe(state, 'chain_newHead', 'chain_newHead', id);
});

it('fails on double unsubscribe', () => {
return unsubscribe(state, 'chain_newHead', id)
return unsubscribe(state, 'chain_newHead', 'chain_newHead', id)
.then(() => unsubscribe(state, id))
.catch((error) => {
expect(error.message).toMatch(/Unable to find/);
});
});

it('clears the subscriptions', () => {
return unsubscribe(state, 'chain_newHead', id).then(() => {
return unsubscribe(state, 'chain_newHead', 'chain_newHead', id).then(() => {
expect(state.subscriptionMap[id]).not.toBeDefined();
expect(state.subscriptions['chain_newHead'].callbacks[id]).not.toBeDefined();
});
Expand Down
2 changes: 1 addition & 1 deletion packages/api-provider/src/mock/unsubscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import { MockState } from './types';

export default async function unsubscribe (self: MockState, _: string, id: number): Promise<boolean> {
export default async function unsubscribe (self: MockState, type: string, name: string, id: number): Promise<boolean> {
const method = self.subscriptionMap[id];

self.l.debug(() => ['unsubscribe', id, method]);
Expand Down
4 changes: 2 additions & 2 deletions packages/api-provider/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ export interface ProviderInterface {
isConnected (): boolean,
on (type: ProviderInterface$Emitted, sub: ProviderInterface$EmitCb): void,
send (method: string, params: Array<any>): Promise<any>,
subscribe (method: string, params: Array<any>, cb: ProviderInterface$Callback): Promise<number>,
unsubscribe (method: string, id: number): Promise<boolean>
subscribe (type: string, method: string, params: Array<any>, cb: ProviderInterface$Callback): Promise<number>,
unsubscribe (type: string, method: string, id: number): Promise<boolean>
}
30 changes: 18 additions & 12 deletions packages/api-provider/src/ws/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ import logger from '@polkadot/util/logger';

import coder from '../coder/json';

type SubscriptionHandler = {
callback: ProviderInterface$Callback,
type: string
};

type WsState$Awaiting = {
callback: ProviderInterface$Callback,
method: string,
params: Array<any>,
subscription?: ProviderInterface$Callback
subscription?: SubscriptionHandler
};

type WsState$Subscription = {
callback: ProviderInterface$Callback,
type WsState$Subscription = SubscriptionHandler & {
method: string,
params: Array<any>
};
Expand Down Expand Up @@ -91,7 +95,7 @@ export default class WsProvider extends E3.EventEmitter implements WSProviderInt
return super.on(type, sub);
}

async send (method: string, params: Array<any>, subscription?: ProviderInterface$Callback): Promise<any> {
async send (method: string, params: Array<any>, subscription?: SubscriptionHandler): Promise<any> {
return new Promise((resolve, reject): void => {
try {
const json = this.coder.encodeJson(method, params);
Expand Down Expand Up @@ -124,16 +128,18 @@ export default class WsProvider extends E3.EventEmitter implements WSProviderInt
});
}

async subscribe (method: string, params: Array<any>, subscription: ProviderInterface$Callback): Promise<number> {
const id = await this.send(method, params, subscription);
async subscribe (type: string, method: string, params: Array<any>, callback: ProviderInterface$Callback): Promise<number> {
const id = await this.send(method, params, { callback, type });

return id as number;
}

async unsubscribe (method: string, id: number): Promise<boolean> {
assert(!isUndefined(this.subscriptions[`${method}::${id}`]), `Unable to find active subscription=${id}`);
async unsubscribe (type: string, method: string, id: number): Promise<boolean> {
const subscription = `${type}::${id}`;

assert(!isUndefined(this.subscriptions[subscription]), `Unable to find active subscription=${subscription}`);

delete this.subscriptions[id];
delete this.subscriptions[subscription];

const result = await this.send(method, [id]);

Expand Down Expand Up @@ -178,12 +184,12 @@ export default class WsProvider extends E3.EventEmitter implements WSProviderInt
}

try {
const { method, params, subscription } = this.handlers[response.id];
const { method, params, subscription } = handler;
const result = this.coder.decodeResponse(response);

if (subscription) {
this.subscriptions[`${method}::${result}`] = {
callback: subscription,
this.subscriptions[`${subscription.type}::${result}`] = {
...subscription,
method,
params
};
Expand Down
18 changes: 12 additions & 6 deletions packages/api-provider/src/ws/onMessageSubscribe.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ describe('onMessageSubscribe', () => {
ws.handlers[11] = {
callback: (_, id) => {},
method: 'test',
subscription: (_, result) => {
expect(result).toEqual('test');
done();
subscription: {
callback: (_, result) => {
expect(result).toEqual('test');
done();
},
type: 'test'
}
};

Expand All @@ -43,9 +46,12 @@ describe('onMessageSubscribe', () => {
ws.handlers[11] = {
callback: (_, id) => {},
method: 'test',
subscription: (error) => {
expect(error.message).toMatch(/test/);
done();
subscription: {
callback: (error) => {
expect(error.message).toMatch(/test/);
done();
},
type: 'test'
}
};

Expand Down
8 changes: 4 additions & 4 deletions packages/api-provider/src/ws/unsubscribe.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ describe('subscribe', () => {
const ws = createWs();

return ws
.subscribe('subscribe_test', [], () => {})
.subscribe('test', 'subscribe_test', [], () => {})
.then((id) => {
return ws.unsubscribe('subscribe_test', id);
return ws.unsubscribe('test', 'subscribe_test', id);
});
});

Expand All @@ -74,9 +74,9 @@ describe('subscribe', () => {
const ws = createWs();

return ws
.subscribe('subscribe_test', [], () => {})
.subscribe('test', 'subscribe_test', [], () => {})
.then((id) => {
return ws.unsubscribe('subscribe_test', 111);
return ws.unsubscribe('test', 'subscribe_test', 111);
})
.catch((error) => {
expect(error.message).toMatch(/find active subscription/);
Expand Down
3 changes: 2 additions & 1 deletion packages/api/src/create/formatResult.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe('formatResult', () => {
send: jest.fn((method, params) =>
Promise.resolve('0x0102')
),
subscribe: jest.fn((method, params, subscription) =>
subscribe: jest.fn((type, method, params, subscription) =>
subscription(null, {
block: '0x1234',
changes: [
Expand Down Expand Up @@ -61,6 +61,7 @@ describe('formatResult', () => {
expect(
provider.subscribe
).toHaveBeenCalledWith(
'state_storage',
'state_subscribeStorage',
[[ENC_ONE, ENC_TWO]],
expect.anything()
Expand Down
4 changes: 2 additions & 2 deletions packages/api/src/create/methodSubscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import formatResult from './formatResult';

export default function methodSubscribe (provider: ProviderInterface, rpcName: string, method: SectionItem<Interfaces>): ApiInterface$Section$Method {
const unsubscribe = (subscriptionId: any): Promise<any> =>
provider.send(method.subscribe[1], [subscriptionId]);
provider.unsubscribe(rpcName, method.subscribe[1], subscriptionId);
const _call = async (...values: Array<any>): Promise<any> => {
try {
const cb: ProviderInterface$Callback = values.pop();
Expand All @@ -29,7 +29,7 @@ export default function methodSubscribe (provider: ProviderInterface, rpcName: s
cb(error, formatResult(method, params, values, result));
};

return provider.subscribe(method.subscribe[0], params, update);
return provider.subscribe(rpcName, method.subscribe[0], params, update);
} catch (error) {
throw new ExtError(`${signature(method)}:: ${error.message}`, (error as ExtError).code, undefined);
}
Expand Down