|
15 | 15 | */ |
16 | 16 |
|
17 | 17 | import { Observable, of, OperatorFunction, Subject } from 'rxjs'; |
18 | | -import { filter, flatMap, map, share } from 'rxjs/operators'; |
| 18 | +import { filter, flatMap, map, share, switchMap } from 'rxjs/operators'; |
19 | 19 | import { BlockInfoDTO } from 'symbol-openapi-typescript-fetch-client'; |
20 | 20 | import * as WebSocket from 'ws'; |
21 | 21 | import { Address } from '../model/account/Address'; |
@@ -284,9 +284,16 @@ export class Listener implements IListener { |
284 | 284 | return this.messageSubject.asObservable().pipe( |
285 | 285 | filter((listenerMessage) => listenerMessage.channelName === channel), |
286 | 286 | filter((listenerMessage) => listenerMessage.message instanceof Transaction), |
287 | | - map((listenerMessage) => listenerMessage.message as T), |
288 | | - filter((transaction) => this.filterHash(transaction, transactionHash)), |
289 | | - this.filterByNotifyAccount(address), |
| 287 | + switchMap((_) => { |
| 288 | + const transactionObservable = of(_.message as T).pipe( |
| 289 | + filter((transaction) => this.filterHash(transaction, transactionHash)), |
| 290 | + ); |
| 291 | + if (_.channelParam.toUpperCase() === address.plain()) { |
| 292 | + return transactionObservable; |
| 293 | + } else { |
| 294 | + return transactionObservable.pipe(this.filterByNotifyAccount(address)); |
| 295 | + } |
| 296 | + }), |
290 | 297 | ); |
291 | 298 | } |
292 | 299 |
|
|
0 commit comments