Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 91 additions & 70 deletions src/infrastructure/Listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,29 @@
* limitations under the License.
*/

import {Observable, Subject} from 'rxjs';
import {filter, map, share} from 'rxjs/operators';
import { Observable, Subject } from 'rxjs';
import { filter, map, share } from 'rxjs/operators';
import * as WebSocket from 'ws';
import {Address} from '../model/account/Address';
import {PublicAccount} from '../model/account/PublicAccount';
import {BlockInfo} from '../model/blockchain/BlockInfo';
import {NetworkType} from '../model/blockchain/NetworkType';
import {NamespaceId} from '../model/namespace/NamespaceId';
import {AggregateTransaction} from '../model/transaction/AggregateTransaction';
import {AggregateTransactionCosignature} from '../model/transaction/AggregateTransactionCosignature';
import {CosignatoryModificationAction} from '../model/transaction/CosignatoryModificationAction';
import {CosignatureSignedTransaction} from '../model/transaction/CosignatureSignedTransaction';
import {Deadline} from '../model/transaction/Deadline';
import {InnerTransaction} from '../model/transaction/InnerTransaction';
import {MultisigAccountModificationTransaction} from '../model/transaction/MultisigAccountModificationTransaction';
import {MultisigCosignatoryModification} from '../model/transaction/MultisigCosignatoryModification';
import {Transaction} from '../model/transaction/Transaction';
import {TransactionStatusError} from '../model/transaction/TransactionStatusError';
import {TransferTransaction} from '../model/transaction/TransferTransaction';
import {UInt64} from '../model/UInt64';
import {CreateTransactionFromDTO, extractBeneficiary} from './transaction/CreateTransactionFromDTO';
import { Address } from '../model/account/Address';
import { PublicAccount } from '../model/account/PublicAccount';
import { BlockInfo } from '../model/blockchain/BlockInfo';
import { NamespaceId } from '../model/namespace/NamespaceId';
import { AggregateTransaction } from '../model/transaction/AggregateTransaction';
import { AggregateTransactionCosignature } from '../model/transaction/AggregateTransactionCosignature';
import { CosignatoryModificationAction } from '../model/transaction/CosignatoryModificationAction';
import { CosignatureSignedTransaction } from '../model/transaction/CosignatureSignedTransaction';
import { Deadline } from '../model/transaction/Deadline';
import { InnerTransaction } from '../model/transaction/InnerTransaction';
import { MultisigAccountModificationTransaction } from '../model/transaction/MultisigAccountModificationTransaction';
import { MultisigCosignatoryModification } from '../model/transaction/MultisigCosignatoryModification';
import { Transaction } from '../model/transaction/Transaction';
import { TransactionStatusError } from '../model/transaction/TransactionStatusError';
import { TransferTransaction } from '../model/transaction/TransferTransaction';
import { UInt64 } from '../model/UInt64';
import {
CreateTransactionFromDTO,
extractBeneficiary
} from './transaction/CreateTransactionFromDTO';

enum ListenerChannelName {
block = 'block',
Expand Down Expand Up @@ -82,11 +84,11 @@ export class Listener {
constructor(/**
* Listener configuration.
*/
private config: string,
private config: string,
/**
* WebSocket injected when using listeners in client.
*/
private websocketInjected?: any) {
private websocketInjected?: any) {
this.config = config.replace(/\/$/, '');
this.url = `${this.config}/ws`;
this.messageSubject = new Subject();
Expand Down Expand Up @@ -114,58 +116,76 @@ export class Listener {
};
this.webSocket.onmessage = (msg) => {
const message = JSON.parse(msg.data as string);

if (message.uid) {
this.uid = message.uid;
resolve();
} else if (message.transaction) {
this.messageSubject.next({channelName: message.meta.channelName, message: CreateTransactionFromDTO(message)});
} else if (message.block) {
const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16);
this.messageSubject.next({
channelName: ListenerChannelName.block, message: new BlockInfo(
message.meta.hash,
message.meta.generationHash,
message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]),
message.meta.numTransactions,
message.block.signature,
PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType),
networkType,
parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version
message.block.type,
UInt64.fromNumericString(message.block.height),
UInt64.fromNumericString(message.block.timestamp),
UInt64.fromNumericString(message.block.difficulty),
message.block.feeMultiplier,
message.block.previousBlockHash,
message.block.blockTransactionsHash,
message.block.blockReceiptsHash,
message.block.stateHash,
extractBeneficiary(message, networkType), // passing `message` as `blockDTO`
),
});
} else if (message.status) {
this.messageSubject.next({
channelName: ListenerChannelName.status, message: new TransactionStatusError(
message.hash,
message.status,
Deadline.createFromDTO(message.deadline)),
});
} else if (message.parentHash) {
this.messageSubject.next({
channelName: ListenerChannelName.cosignature,
message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey),
});
} else if (message.meta && message.meta.hash) {
this.messageSubject.next({channelName: message.meta.channelName, message: message.meta.hash});
}
this.handleMessage(message, resolve);
};
} else {
resolve();
}
});
}

/**
* @internal
*
* This method handles one incoming message from the web socket and it dispatches it to the message subject listener.
*
* @param message the object payload.
* @param resolve the method to notify when the uid has been resolved and the listener connection has been stablished.
*/
handleMessage(message: any, resolve) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc, make it Private and @Internal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it be called from the unit test if it's private?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah. okay, you added unit tests. so just need to make it internal then I think

if (message.uid) {
this.uid = message.uid;
resolve();
} else if (message.transaction) {
this.messageSubject.next({
channelName: message.meta.channelName,
message: CreateTransactionFromDTO(message)
});
} else if (message.block) {
const networkType = parseInt(message.block.version.toString(16).substr(0, 2), 16);
this.messageSubject.next({
channelName: ListenerChannelName.block, message: new BlockInfo(
message.meta.hash,
message.meta.generationHash,
message.meta.totalFee ? UInt64.fromNumericString(message.meta.totalFee) : new UInt64([0, 0]),
message.meta.numTransactions,
message.block.signature,
PublicAccount.createFromPublicKey(message.block.signerPublicKey, networkType),
networkType,
parseInt(message.block.version.toString(16).substr(2, 2), 16), // Tx version
message.block.type,
UInt64.fromNumericString(message.block.height),
UInt64.fromNumericString(message.block.timestamp),
UInt64.fromNumericString(message.block.difficulty),
message.block.feeMultiplier,
message.block.previousBlockHash,
message.block.blockTransactionsHash,
message.block.blockReceiptsHash,
message.block.stateHash,
extractBeneficiary(message, networkType), // passing `message` as `blockDTO`
),
});
} else if (message.status) {
this.messageSubject.next({
channelName: ListenerChannelName.status, message: new TransactionStatusError(
Address.createFromEncoded(message.address),
message.hash,
message.status,
Deadline.createFromDTO(message.deadline)),
});
} else if (message.parentHash) {
this.messageSubject.next({
channelName: ListenerChannelName.cosignature,
message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey),
});
} else if (message.meta && message.meta.hash) {
this.messageSubject.next({
channelName: message.meta.channelName,
message: message.meta.hash
});
}
}

/**
* returns a boolean that repressents the open state
* @returns a boolean
Expand Down Expand Up @@ -207,7 +227,7 @@ export class Listener {
public newBlock(): Observable<BlockInfo> {
this.subscribeTo('block');
return this.messageSubject
.asObservable().pipe(
.asObservable().pipe(
share(),
filter((_) => _.channelName === ListenerChannelName.block),
filter((_) => _.message instanceof BlockInfo),
Expand Down Expand Up @@ -310,7 +330,8 @@ export class Listener {
return this.messageSubject.asObservable().pipe(
filter((_) => _.channelName === ListenerChannelName.status),
filter((_) => _.message instanceof TransactionStatusError),
map((_) => _.message as TransactionStatusError));
map((_) => _.message as TransactionStatusError),
filter((_) => address.equals(_.address)));
}

/**
Expand Down Expand Up @@ -394,7 +415,7 @@ export class Listener {
}

return transaction.signer!.address.equals(address) || (
transaction instanceof TransferTransaction
transaction instanceof TransferTransaction
&& (transaction.recipientAddress as Address).equals(address)
);
}
Expand Down
7 changes: 7 additions & 0 deletions src/model/transaction/TransactionStatusError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {Deadline} from './Deadline';
import {Address} from "../account/Address";

/**
* Transaction status error model returned by listeners
Expand All @@ -23,11 +24,17 @@ export class TransactionStatusError {

/**
* @internal
* @param address
* @param hash
* @param status
* @param deadline
*/
constructor(
/**
* The address of the account that signed the invalid transaction.
* It's the address listened when calling Lister.status.
*/
public readonly address: Address,
/**
* The transaction hash.
*/
Expand Down
91 changes: 91 additions & 0 deletions test/infrastructure/Listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

import {expect} from 'chai';
import {Listener} from '../../src/infrastructure/Listener';
import {Address} from "../../src/model/account/Address";
import {deepEqual} from "assert";
import {UInt64} from "../../src/model/UInt64";
import {timeout} from "rxjs/operators";

describe('Listener', () => {
it('should createComplete a WebSocket instance given url parameter', () => {
Expand All @@ -32,6 +36,93 @@ describe('Listener', () => {
});
});

describe('onStatusWhenAddressIsTheSame', () => {
it('Should forward status', (done) => {


const errorEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96860EB';

const errorAddress = Address.createFromEncoded(errorEncodedAddress);

class WebSocketMock {
constructor(public readonly url: string) {
}

send(payload: string) {
expect(payload).to.be.eq(`{"subscribe":"status/${errorAddress.plain()}"}`);
}
}

const statusInfoErrorDTO = {
address: errorEncodedAddress,
deadline: '1010',
hash: 'transaction-hash',
status: 'error-message',
};

const listener = new Listener('ws://localhost:3000', WebSocketMock);

listener.open();

listener.status(errorAddress).pipe(timeout(2000)).subscribe((transactionStatusError) => {
expect(transactionStatusError.address).to.deep.equal(errorAddress);
expect(transactionStatusError.hash).to.be.equal(statusInfoErrorDTO.hash);
expect(transactionStatusError.status).to.be.equal(statusInfoErrorDTO.status);
deepEqual(transactionStatusError.deadline.toDTO(), UInt64.fromNumericString(statusInfoErrorDTO.deadline).toDTO());
done();
}, err => {
done('Should have not timed out!');
});

listener.handleMessage(statusInfoErrorDTO, null);


});
});

describe('onStatusWhenAddressIsDifferentAddress', () => {
it('Should not forward status', (done) => {


const errorEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96860EB';

const subscribedEncodedAddress = '906415867F121D037AF447E711B0F5E4D52EBBF066D96AAAAA';
const subscribedAddress = Address.createFromEncoded(subscribedEncodedAddress);

class WebSocketMock {

constructor(public readonly url: string) {
}

send(payload: string) {
expect(payload).to.be.eq(`{"subscribe":"status/${subscribedAddress.plain()}"}`);
}
}

const statusInfoErrorDTO = {
address: errorEncodedAddress,
deadline: '1010',
hash: 'transaction-hash',
status: 'error-message',
};

const listener = new Listener('ws://localhost:3000', WebSocketMock);

listener.open();

listener.status(subscribedAddress).pipe(timeout(100)).subscribe(status => {
done('Should have timed out!');
}, err => {
expect(err.name).to.be.eq('TimeoutError');
done();
});

listener.handleMessage(statusInfoErrorDTO, null);


});
});

describe('onerror', () => {
it('should reject because of wrong server url', async () => {
const listener = new Listener('https://notcorrecturl:0000');
Expand Down
5 changes: 4 additions & 1 deletion test/model/transaction/TransactionStatusError.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ import {expect} from 'chai';
import {TransactionStatusError} from '../../../src/model/transaction/TransactionStatusError';
import {Deadline} from '../../../src/model/transaction/Deadline';
import { UInt64 } from '../../../src/model/UInt64';
import {Address} from "../../../src/model/account/Address";

describe('TransactionStatusError', () => {

it('should createComplete an TransactionStatusError object', () => {
const statusInfoErrorDTO = {
address: Address.createFromRawAddress('SBILTA367K2LX2FEXG5TFWAS7GEFYAGY7QLFBYKC'),
deadline: '1010',
hash: 'transaction-hash',
status: 'error-message',
};

const transactionStatusError = new TransactionStatusError(
statusInfoErrorDTO.address,
statusInfoErrorDTO.hash,
statusInfoErrorDTO.status,
Deadline.createFromDTO(statusInfoErrorDTO.deadline));

expect(transactionStatusError.address).to.be.equal(statusInfoErrorDTO.address);
expect(transactionStatusError.hash).to.be.equal(statusInfoErrorDTO.hash);
expect(transactionStatusError.status).to.be.equal(statusInfoErrorDTO.status);
deepEqual(transactionStatusError.deadline.toDTO(), UInt64.fromNumericString(statusInfoErrorDTO.deadline).toDTO());
Expand Down