Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
Enhance ws responses wih topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Vektrat committed Aug 19, 2020
1 parent 2629459 commit f0a7845
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 219 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ All notable changes to this project will be documented in this file.

The changelog format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [v1.2.1] - 21-Aug-2020
### Changed
- Removed `topic` (address) from transactionStatus WS responses.
- Removed `channelName` from WS transaction metadata.
- Wrapped WS responses so that the `topic` the client subscribed to is also returned.

## [v1.2.0] - 6-Aug-2020
### Added
- TLS installation notes.
Expand Down
1 change: 0 additions & 1 deletion catapult-sdk/src/model/ModelSchemaBuilder.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class ModelSchemaBuilder {

transactionStatus: {
hash: ModelType.binary,
address: ModelType.binary,
code: ModelType.statusCode,
deadline: ModelType.uint64,
height: ModelType.uint64
Expand Down
1 change: 0 additions & 1 deletion catapult-sdk/test/model/ModelSchemaBuilder_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ describe('model schema builder', () => {
'transactionMetadata.merkleComponentHash',

'transactionStatus.hash',
'transactionStatus.address',

'account.address',
'account.publicKey',
Expand Down
63 changes: 7 additions & 56 deletions rest/src/connection/MessageChannelBuilder.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,9 @@
* along with Catapult. If not, see <http://www.gnu.org/licenses/>.
*/

const { ServerMessageHandler } = require('./serverMessageHandlers');
const catapult = require('catapult-sdk');

const { BinaryParser } = catapult.parser;
const { uint64 } = catapult.utils;

const parserFromData = binaryData => {
const parser = new catapult.parser.BinaryParser();
parser.push(binaryData);
return parser;
};

const createBlockDescriptor = () => ({
filter: topicParam => {
if (topicParam)
Expand All @@ -37,10 +29,7 @@ const createBlockDescriptor = () => ({
return Buffer.of(0x49, 0x6A, 0xCA, 0x80, 0xE4, 0xD8, 0xF2, 0x9F);
},

handler: (codec, emit) => (topic, binaryBlock, hash, generationHash) => {
const block = codec.deserialize(parserFromData(binaryBlock));
emit({ type: 'blockHeaderWithMetadata', payload: { block, meta: { hash, generationHash } } });
}
handler: ServerMessageHandler.block
});

const createPolicyBasedAddressFilter = (markerByte, emptyAddressHandler) => topicParam => {
Expand All @@ -50,20 +39,6 @@ const createPolicyBasedAddressFilter = (markerByte, emptyAddressHandler) => topi
return Buffer.concat([Buffer.of(markerByte), Buffer.from(catapult.model.address.stringToAddress(topicParam))]);
};

const handlers = {
transaction: channelName => (codec, emit) => (topic, binaryTransaction, hash, merkleComponentHash, height) => {
const transaction = codec.deserialize(parserFromData(binaryTransaction));
const meta = {
hash, merkleComponentHash, height: uint64.fromBytes(height), channelName
};
emit({ type: 'transactionWithMetadata', payload: { transaction, meta } });
},

transactionHash: channelName => (codec, emit) => (topic, hash) => {
emit({ type: 'transactionWithMetadata', payload: { meta: { hash, channelName } } });
}
};

/**
* Builder for creating message channel information.
*/
Expand All @@ -83,28 +58,12 @@ class MessageChannelBuilder {

// add basic descriptors
this.descriptors.block = createBlockDescriptor();
this.add('confirmedAdded', 'a', 'transaction');
this.add('unconfirmedAdded', 'u', 'transaction');
this.add('unconfirmedRemoved', 'r', 'transactionHash');
this.add('confirmedAdded', 'a', ServerMessageHandler.transaction);
this.add('unconfirmedAdded', 'u', ServerMessageHandler.transaction);
this.add('unconfirmedRemoved', 'r', ServerMessageHandler.transactionHash);
this.descriptors.status = {
filter: this.createAddressFilter('s'),
handler: (codec, emit) => (topic, buffer) => {
const parser = new BinaryParser();
parser.push(buffer);

const hash = parser.buffer(catapult.constants.sizes.hash256);
const deadline = parser.uint64();
const code = parser.uint32();

// removing the markerChart from topic
const address = topic.subarray(1);
emit({
type: 'transactionStatus',
payload: {
hash, address, code, deadline
}
});
}
handler: ServerMessageHandler.transactionStatus
};
}

Expand All @@ -124,15 +83,7 @@ class MessageChannelBuilder {
if (markerChar in this.channelMarkers)
throw Error(`'${markerChar}' channel marker has already been registered`);

let channelHandler = handler;
if ('string' === typeof handler) {
if (!(handler in handlers))
throw Error(`cannot register channel '${name}' with unknown handler '${handler}'`);

channelHandler = handlers[handler](name);
}

this.descriptors[name] = { filter: this.createAddressFilter(markerChar), handler: channelHandler };
this.descriptors[name] = { filter: this.createAddressFilter(markerChar), handler };
this.channelMarkers[markerChar] = 1;
}

Expand Down
61 changes: 61 additions & 0 deletions rest/src/connection/serverMessageHandlers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2016-present,
* Jaguar0625, gimre, BloodyRookie, Tech Bureau, Corp. All rights reserved.
*
* This file is part of Catapult.
*
* Catapult is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Catapult is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Catapult. If not, see <http://www.gnu.org/licenses/>.
*/

const catapult = require('catapult-sdk');

const { BinaryParser } = catapult.parser;
const { uint64 } = catapult.utils;

const parserFromData = binaryData => {
const parser = new catapult.parser.BinaryParser();
parser.push(binaryData);
return parser;
};

const ServerMessageHandler = Object.freeze({
block: (codec, emit) => (topic, binaryBlock, hash, generationHash) => {
const block = codec.deserialize(parserFromData(binaryBlock));
emit({ type: 'blockHeaderWithMetadata', payload: { block, meta: { hash, generationHash } } });
},

transaction: (codec, emit) => (topic, binaryTransaction, hash, merkleComponentHash, height) => {
const transaction = codec.deserialize(parserFromData(binaryTransaction));
const meta = { hash, merkleComponentHash, height: uint64.fromBytes(height) };
emit({ type: 'transactionWithMetadata', payload: { transaction, meta } });
},

transactionHash: (codec, emit) => (topic, hash) => {
emit({ type: 'transactionWithMetadata', payload: { meta: { hash } } });
},

transactionStatus: (codec, emit) => (topic, buffer) => {
const parser = new BinaryParser();
parser.push(buffer);

const hash = parser.buffer(catapult.constants.sizes.hash256);
const deadline = parser.uint64();
const code = parser.uint32();
emit({ type: 'transactionStatus', payload: { hash, code, deadline } });
}
});

module.exports = {
ServerMessageHandler
};
2 changes: 2 additions & 0 deletions rest/src/connection/zmqService.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ module.exports.createZmqConnectionService = (zmqConfig, codec, channelDescriptor
const subscriptionInfo = findSubscriptionInfo(key, emitter, codec, channelDescriptors);

const zsocket = createZmqSocket(key, zmqConfig, logger);
// the second param (handler) gets called with the provided args in the message, which vary depending on the defined handler type
// (block, transaction, transactionStatus...)
zsocket.subscribe(subscriptionInfo.filter);
zsocket.on('message', subscriptionInfo.handler);
return zsocket;
Expand Down
5 changes: 3 additions & 2 deletions rest/src/plugins/aggregate/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

/** @module plugins/aggregate */
const aggregateRoutes = require('./aggregateRoutes');
const { ServerMessageHandler } = require('../../connection/serverMessageHandlers');
const catapult = require('catapult-sdk');

const { BinaryParser } = catapult.parser;
Expand All @@ -36,8 +37,8 @@ module.exports = {
},

registerMessageChannels: builder => {
builder.add('partialAdded', 'p', 'transaction');
builder.add('partialRemoved', 'q', 'transactionHash');
builder.add('partialAdded', 'p', ServerMessageHandler.transaction);
builder.add('partialRemoved', 'q', ServerMessageHandler.transactionHash);
builder.add('cosignature', 'c', (codec, emit) => (topic, buffer) => {
const parser = new BinaryParser();
parser.push(buffer);
Expand Down
2 changes: 1 addition & 1 deletion rest/src/server/bootstrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ module.exports = {
promiseAwareServer.ws = (route, callbacks) => {
const subscriptionManager = new SubscriptionManager(Object.assign({}, callbacks, {
newChannel: (channel, subscribers) =>
callbacks.newChannel(channel, websocketUtils.createMultisender(subscribers, formatters.ws))
callbacks.newChannel(channel, websocketUtils.createMultisender(channel, subscribers, formatters.ws))
}));

const clients = new Set();
Expand Down
11 changes: 9 additions & 2 deletions rest/src/server/formatters.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ const { formatArray, formatPage } = catapult.utils.formattingUtils;
const isCatapultObject = body => body && body.payload && body.type;

const formatBody = (modelFormatter, body) => {
const formatCatapultObject = (payload, type, structure) => {
const formatCatapultObject = (payload, type, structure, wsTopic) => {
if (Array.isArray(payload))
return formatArray(modelFormatter[type], payload);

if ('page' === structure)
return formatPage(modelFormatter[type], payload);

if (wsTopic) {
return {
topic: wsTopic,
data: modelFormatter[type].format(payload)
};
}

return modelFormatter[type].format(payload);
};

Expand All @@ -42,7 +49,7 @@ const formatBody = (modelFormatter, body) => {
statusCode = body.statusCode || 500;
view = body.body ? body.body : { message: body.message };
} else if (isCatapultObject(body)) {
view = formatCatapultObject(body.payload, body.type, body.structure);
view = formatCatapultObject(body.payload, body.type, body.structure, body.topic);
}

return {
Expand Down
4 changes: 3 additions & 1 deletion rest/src/server/websocketUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ const { base32 } = catapult.utils;
module.exports = {
/**
* Creates an aggregate subscriber composed of all websocket subscribers to a single topic.
* @param {string} topic Subscribed topic from which the data was received.
* @param {array<WebSocket>} subscribers Websocket subscribers.
* @param {function} formatter Formatter for formatting data before sending.
* @returns {function} Aggregate subscriber.
*/
createMultisender: (subscribers, formatter) => ({
createMultisender: (topic, subscribers, formatter) => ({
/**
* Sends data to all subscribers.
* @param {object} data Unformatted data.
*/
send: data => {
data.topic = topic;
const view = formatter(data);
subscribers.forEach(client => {
winston.debug(`websocket ${client.uid}: multisender.send sending data to client`);
Expand Down
Loading

0 comments on commit f0a7845

Please sign in to comment.