diff --git a/src/infrastructure/Listener.ts b/src/infrastructure/Listener.ts index aedd0d2d32..86ff417488 100644 --- a/src/infrastructure/Listener.ts +++ b/src/infrastructure/Listener.ts @@ -14,27 +14,29 @@ * limitations under the License. */ -import {Observable, Subject} from 'rxjs'; -import {filter, map, share} from 'rxjs/operators'; +import { Observable, Subject } from 'rxjs'; +import { filter, map, share } from 'rxjs/operators'; import * as WebSocket from 'ws'; -import {Address} from '../model/account/Address'; -import {PublicAccount} from '../model/account/PublicAccount'; -import {BlockInfo} from '../model/blockchain/BlockInfo'; -import {NetworkType} from '../model/blockchain/NetworkType'; -import {NamespaceId} from '../model/namespace/NamespaceId'; -import {AggregateTransaction} from '../model/transaction/AggregateTransaction'; -import {AggregateTransactionCosignature} from '../model/transaction/AggregateTransactionCosignature'; -import {CosignatoryModificationAction} from '../model/transaction/CosignatoryModificationAction'; -import {CosignatureSignedTransaction} from '../model/transaction/CosignatureSignedTransaction'; -import {Deadline} from '../model/transaction/Deadline'; -import {InnerTransaction} from '../model/transaction/InnerTransaction'; -import {MultisigAccountModificationTransaction} from '../model/transaction/MultisigAccountModificationTransaction'; -import {MultisigCosignatoryModification} from '../model/transaction/MultisigCosignatoryModification'; -import {Transaction} from '../model/transaction/Transaction'; -import {TransactionStatusError} from '../model/transaction/TransactionStatusError'; -import {TransferTransaction} from '../model/transaction/TransferTransaction'; -import {UInt64} from '../model/UInt64'; -import {CreateTransactionFromDTO, extractBeneficiary} from './transaction/CreateTransactionFromDTO'; +import { Address } from '../model/account/Address'; +import { PublicAccount } from '../model/account/PublicAccount'; +import { BlockInfo } from '../model/blockchain/BlockInfo'; +import { NamespaceId } from '../model/namespace/NamespaceId'; +import { AggregateTransaction } from '../model/transaction/AggregateTransaction'; +import { AggregateTransactionCosignature } from '../model/transaction/AggregateTransactionCosignature'; +import { CosignatoryModificationAction } from '../model/transaction/CosignatoryModificationAction'; +import { CosignatureSignedTransaction } from '../model/transaction/CosignatureSignedTransaction'; +import { Deadline } from '../model/transaction/Deadline'; +import { InnerTransaction } from '../model/transaction/InnerTransaction'; +import { MultisigAccountModificationTransaction } from '../model/transaction/MultisigAccountModificationTransaction'; +import { MultisigCosignatoryModification } from '../model/transaction/MultisigCosignatoryModification'; +import { Transaction } from '../model/transaction/Transaction'; +import { TransactionStatusError } from '../model/transaction/TransactionStatusError'; +import { TransferTransaction } from '../model/transaction/TransferTransaction'; +import { UInt64 } from '../model/UInt64'; +import { + CreateTransactionFromDTO, + extractBeneficiary +} from './transaction/CreateTransactionFromDTO'; enum ListenerChannelName { block = 'block', @@ -82,11 +84,11 @@ export class Listener { constructor(/** * Listener configuration. */ - private config: string, + private config: string, /** * WebSocket injected when using listeners in client. */ - private websocketInjected?: any) { + private websocketInjected?: any) { this.config = config.replace(/\/$/, ''); this.url = `${this.config}/ws`; this.messageSubject = new Subject(); @@ -114,51 +116,7 @@ export class Listener { }; this.webSocket.onmessage = (msg) => { const message = JSON.parse(msg.data as string); - - if (message.uid) { - this.uid = message.uid; - resolve(); - } else if (message.transaction) { - this.messageSubject.next({channelName: message.meta.channelName, message: CreateTransactionFromDTO(message)}); - } else if (message.block) { - const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16); - this.messageSubject.next({ - channelName: ListenerChannelName.block, message: new BlockInfo( - message.meta.hash, - message.meta.generationHash, - message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]), - message.meta.numTransactions, - message.block.signature, - PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType), - networkType, - parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version - message.block.type, - UInt64.fromNumericString(message.block.height), - UInt64.fromNumericString(message.block.timestamp), - UInt64.fromNumericString(message.block.difficulty), - message.block.feeMultiplier, - message.block.previousBlockHash, - message.block.blockTransactionsHash, - message.block.blockReceiptsHash, - message.block.stateHash, - extractBeneficiary(message, networkType), // passing `message` as `blockDTO` - ), - }); - } else if (message.status) { - this.messageSubject.next({ - channelName: ListenerChannelName.status, message: new TransactionStatusError( - message.hash, - message.status, - Deadline.createFromDTO(message.deadline)), - }); - } else if (message.parentHash) { - this.messageSubject.next({ - channelName: ListenerChannelName.cosignature, - message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey), - }); - } else if (message.meta && message.meta.hash) { - this.messageSubject.next({channelName: message.meta.channelName, message: message.meta.hash}); - } + this.handleMessage(message, resolve); }; } else { resolve(); @@ -166,6 +124,68 @@ export class Listener { }); } + /** + * @internal + * + * This method handles one incoming message from the web socket and it dispatches it to the message subject listener. + * + * @param message the object payload. + * @param resolve the method to notify when the uid has been resolved and the listener connection has been stablished. + */ + handleMessage(message: any, resolve) { + if (message.uid) { + this.uid = message.uid; + resolve(); + } else if (message.transaction) { + this.messageSubject.next({ + channelName: message.meta.channelName, + message: CreateTransactionFromDTO(message) + }); + } else if (message.block) { + const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16); + this.messageSubject.next({ + channelName: ListenerChannelName.block, message: new BlockInfo( + message.meta.hash, + message.meta.generationHash, + message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]), + message.meta.numTransactions, + message.block.signature, + PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType), + networkType, + parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version + message.block.type, + UInt64.fromNumericString(message.block.height), + UInt64.fromNumericString(message.block.timestamp), + UInt64.fromNumericString(message.block.difficulty), + message.block.feeMultiplier, + message.block.previousBlockHash, + message.block.blockTransactionsHash, + message.block.blockReceiptsHash, + message.block.stateHash, + extractBeneficiary(message, networkType), // passing `message` as `blockDTO` + ), + }); + } else if (message.status) { + this.messageSubject.next({ + channelName: ListenerChannelName.status, message: new TransactionStatusError( + Address.createFromEncoded(message.address), + message.hash, + message.status, + Deadline.createFromDTO(message.deadline)), + }); + } else if (message.parentHash) { + this.messageSubject.next({ + channelName: ListenerChannelName.cosignature, + message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey), + }); + } else if (message.meta && message.meta.hash) { + this.messageSubject.next({ + channelName: message.meta.channelName, + message: message.meta.hash + }); + } + } + /** * returns a boolean that repressents the open state * @returns a boolean @@ -207,7 +227,7 @@ export class Listener { public newBlock(): Observable { this.subscribeTo('block'); return this.messageSubject - .asObservable().pipe( + .asObservable().pipe( share(), filter((_) => _.channelName === ListenerChannelName.block), filter((_) => _.message instanceof BlockInfo), @@ -310,7 +330,8 @@ export class Listener { return this.messageSubject.asObservable().pipe( filter((_) => _.channelName === ListenerChannelName.status), filter((_) => _.message instanceof TransactionStatusError), - map((_) => _.message as TransactionStatusError)); + map((_) => _.message as TransactionStatusError), + filter((_) => address.equals(_.address))); } /** @@ -394,7 +415,7 @@ export class Listener { } return transaction.signer!.address.equals(address) || ( - transaction instanceof TransferTransaction + transaction instanceof TransferTransaction && (transaction.recipientAddress as Address).equals(address) ); } diff --git a/src/model/transaction/TransactionStatusError.ts b/src/model/transaction/TransactionStatusError.ts index 54372f7c6c..b94f7e735c 100644 --- a/src/model/transaction/TransactionStatusError.ts +++ b/src/model/transaction/TransactionStatusError.ts @@ -15,6 +15,7 @@ */ import {Deadline} from './Deadline'; +import {Address} from "../account/Address"; /** * Transaction status error model returned by listeners @@ -23,11 +24,17 @@ export class TransactionStatusError { /** * @internal + * @param address * @param hash * @param status * @param deadline */ constructor( + /** + * The address of the account that signed the invalid transaction. + * It's the address listened when calling Lister.status. + */ + public readonly address: Address, /** * The transaction hash. */ diff --git a/test/infrastructure/Listener.spec.ts b/test/infrastructure/Listener.spec.ts index ebec26d286..4bdf22387e 100644 --- a/test/infrastructure/Listener.spec.ts +++ b/test/infrastructure/Listener.spec.ts @@ -16,6 +16,10 @@ import {expect} from 'chai'; import {Listener} from '../../src/infrastructure/Listener'; +import {Address} from "../../src/model/account/Address"; +import {deepEqual} from "assert"; +import {UInt64} from "../../src/model/UInt64"; +import {timeout} from "rxjs/operators"; describe('Listener', () => { it('should createComplete a WebSocket instance given url parameter', () => { @@ -32,6 +36,93 @@ describe('Listener', () => { }); }); + describe('onStatusWhenAddressIsTheSame', () => { + it('Should forward status', (done) => { + + + const errorEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96860EB'; + + const errorAddress = Address.createFromEncoded(errorEncodedAddress); + + class WebSocketMock { + constructor(public readonly url: string) { + } + + send(payload: string) { + expect(payload).to.be.eq(`{"subscribe":"status/${errorAddress.plain()}"}`); + } + } + + const statusInfoErrorDTO = { + address: errorEncodedAddress, + deadline: '1010', + hash: 'transaction-hash', + status: 'error-message', + }; + + const listener = new Listener('ws://localhost:3000', WebSocketMock); + + listener.open(); + + listener.status(errorAddress).pipe(timeout(2000)).subscribe((transactionStatusError) => { + expect(transactionStatusError.address).to.deep.equal(errorAddress); + expect(transactionStatusError.hash).to.be.equal(statusInfoErrorDTO.hash); + expect(transactionStatusError.status).to.be.equal(statusInfoErrorDTO.status); + deepEqual(transactionStatusError.deadline.toDTO(), UInt64.fromNumericString(statusInfoErrorDTO.deadline).toDTO()); + done(); + }, err => { + done('Should have not timed out!'); + }); + + listener.handleMessage(statusInfoErrorDTO, null); + + + }); + }); + + describe('onStatusWhenAddressIsDifferentAddress', () => { + it('Should not forward status', (done) => { + + + const errorEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96860EB'; + + const subscribedEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96AAAAA'; + const subscribedAddress = Address.createFromEncoded(subscribedEncodedAddress); + + class WebSocketMock { + + constructor(public readonly url: string) { + } + + send(payload: string) { + expect(payload).to.be.eq(`{"subscribe":"status/${subscribedAddress.plain()}"}`); + } + } + + const statusInfoErrorDTO = { + address: errorEncodedAddress, + deadline: '1010', + hash: 'transaction-hash', + status: 'error-message', + }; + + const listener = new Listener('ws://localhost:3000', WebSocketMock); + + listener.open(); + + listener.status(subscribedAddress).pipe(timeout(100)).subscribe(status => { + done('Should have timed out!'); + }, err => { + expect(err.name).to.be.eq('TimeoutError'); + done(); + }); + + listener.handleMessage(statusInfoErrorDTO, null); + + + }); + }); + describe('onerror', () => { it('should reject because of wrong server url', async () => { const listener = new Listener('https://notcorrecturl:0000'); diff --git a/test/model/transaction/TransactionStatusError.spec.ts b/test/model/transaction/TransactionStatusError.spec.ts index 46cf4f1ba4..156c3f902b 100644 --- a/test/model/transaction/TransactionStatusError.spec.ts +++ b/test/model/transaction/TransactionStatusError.spec.ts @@ -19,21 +19,24 @@ import {expect} from 'chai'; import {TransactionStatusError} from '../../../src/model/transaction/TransactionStatusError'; import {Deadline} from '../../../src/model/transaction/Deadline'; import { UInt64 } from '../../../src/model/UInt64'; +import {Address} from "../../../src/model/account/Address"; describe('TransactionStatusError', () => { it('should createComplete an TransactionStatusError object', () => { const statusInfoErrorDTO = { + address: Address.createFromRawAddress('SBILTA367K2LX2FEXG5TFWAS7GEFYAGY7QLFBYKC'), deadline: '1010', hash: 'transaction-hash', status: 'error-message', }; - const transactionStatusError = new TransactionStatusError( + statusInfoErrorDTO.address, statusInfoErrorDTO.hash, statusInfoErrorDTO.status, Deadline.createFromDTO(statusInfoErrorDTO.deadline)); + expect(transactionStatusError.address).to.be.equal(statusInfoErrorDTO.address); expect(transactionStatusError.hash).to.be.equal(statusInfoErrorDTO.hash); expect(transactionStatusError.status).to.be.equal(statusInfoErrorDTO.status); deepEqual(transactionStatusError.deadline.toDTO(), UInt64.fromNumericString(statusInfoErrorDTO.deadline).toDTO());