Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 70 additions & 35 deletions src/infrastructure/Listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { Observable, of, OperatorFunction, Subject } from 'rxjs';
import { filter, flatMap, map, share } from 'rxjs/operators';
import { filter, flatMap, map, share, switchMap } from 'rxjs/operators';
import { BlockInfoDTO } from 'symbol-openapi-typescript-fetch-client';
import * as WebSocket from 'ws';
import { Address } from '../model/account/Address';
Expand Down Expand Up @@ -46,6 +46,7 @@ export enum ListenerChannelName {

interface ListenerMessage {
readonly channelName: ListenerChannelName;
readonly channelParam: string;
readonly message: Transaction | string | NewBlock | TransactionStatusError | CosignatureSignedTransaction;
}

Expand Down Expand Up @@ -132,36 +133,61 @@ export class Listener implements IListener {
if (message.uid) {
this.uid = message.uid;
resolve();
} else if (message.transaction) {
this.messageSubject.next({
channelName: message.meta.channelName,
message: CreateTransactionFromDTO(message),
});
} else if (message.block) {
this.messageSubject.next({
channelName: ListenerChannelName.block,
message: this.toNewBlock(message),
});
} else if (message.code) {
this.messageSubject.next({
channelName: ListenerChannelName.status,
message: new TransactionStatusError(
Address.createFromEncoded(message.address),
message.hash,
message.code,
Deadline.createFromDTO(message.deadline),
),
});
} else if (message.parentHash) {
this.messageSubject.next({
channelName: ListenerChannelName.cosignature,
message: new CosignatureSignedTransaction(message.parentHash, message.signature, message.signerPublicKey),
});
} else if (message.meta && message.meta.hash) {
this.messageSubject.next({
channelName: message.meta.channelName,
message: message.meta.hash,
});
return;
}
const topic = message.topic as string;
const channelName = topic.indexOf('/') >= 0 ? topic.substr(0, topic.indexOf('/')) : topic;
const channelParam = topic.indexOf('/') >= 0 ? topic.split('/')[1] : '';
switch (channelName) {
case ListenerChannelName.confirmedAdded:
case ListenerChannelName.unconfirmedAdded:
case ListenerChannelName.partialAdded:
this.messageSubject.next({
channelName: ListenerChannelName[channelName],
channelParam: channelParam,
message: CreateTransactionFromDTO(message.data),
});
break;
case ListenerChannelName.block:
this.messageSubject.next({
channelName: ListenerChannelName[channelName],
channelParam: channelParam,
message: this.toNewBlock(message.data),
});
break;
case ListenerChannelName.status:
this.messageSubject.next({
channelName: ListenerChannelName[channelName],
channelParam: channelParam,
message: new TransactionStatusError(
Address.createFromRawAddress(channelParam),
message.data.hash,
message.data.code,
Deadline.createFromDTO(message.data.deadline),
),
});
break;
case ListenerChannelName.cosignature:
this.messageSubject.next({
channelName: ListenerChannelName[channelName],
channelParam: channelParam,
message: new CosignatureSignedTransaction(
message.data.parentHash,
message.data.signature,
message.data.signerPublicKey,
),
});
break;
case ListenerChannelName.partialRemoved:
case ListenerChannelName.unconfirmedRemoved:
this.messageSubject.next({
channelName: ListenerChannelName[channelName],
channelParam: channelParam,
message: message.data.meta.hash,
});
break;
default:
throw new Error(`Channel: ${channelName} is not supported.`);
}
}

Expand Down Expand Up @@ -258,9 +284,16 @@ export class Listener implements IListener {
return this.messageSubject.asObservable().pipe(
filter((listenerMessage) => listenerMessage.channelName === channel),
filter((listenerMessage) => listenerMessage.message instanceof Transaction),
map((listenerMessage) => listenerMessage.message as T),
filter((transaction) => this.filterHash(transaction, transactionHash)),
this.filterByNotifyAccount(address),
switchMap((_) => {
const transactionObservable = of(_.message as T).pipe(
filter((transaction) => this.filterHash(transaction, transactionHash)),
);
if (_.channelParam.toUpperCase() === address.plain()) {
return transactionObservable;
} else {
return transactionObservable.pipe(this.filterByNotifyAccount(address));
}
}),
);
}

Expand Down Expand Up @@ -306,6 +339,7 @@ export class Listener implements IListener {
return this.messageSubject.asObservable().pipe(
filter((_) => _.channelName === channel),
filter((_) => typeof _.message === 'string'),
filter((_) => _.channelParam.toUpperCase() === address.plain()),
map((_) => _.message as string),
filter((_) => !transactionHash || _.toUpperCase() == transactionHash.toUpperCase()),
);
Expand All @@ -325,9 +359,9 @@ export class Listener implements IListener {
return this.messageSubject.asObservable().pipe(
filter((_) => _.channelName === ListenerChannelName.status),
filter((_) => _.message instanceof TransactionStatusError),
filter((_) => _.channelParam.toUpperCase() === address.plain()),
map((_) => _.message as TransactionStatusError),
filter((_) => !transactionHash || _.hash.toUpperCase() == transactionHash.toUpperCase()),
filter((_) => address.equals(_.address)),
);
}

Expand Down Expand Up @@ -391,6 +425,7 @@ export class Listener implements IListener {
return this.messageSubject.asObservable().pipe(
filter((_) => _.channelName.toUpperCase() === ListenerChannelName.cosignature.toUpperCase()),
filter((_) => _.message instanceof CosignatureSignedTransaction),
filter((_) => _.channelParam.toUpperCase() === address.plain()),
map((_) => _.message as CosignatureSignedTransaction),
);
}
Expand Down
1 change: 0 additions & 1 deletion src/model/transaction/Deadline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export class Deadline {
}

/**
* @internal
* @param value
* @returns {Deadline}
*/
Expand Down
Loading