
mergeMap, like the other **Map operators (switchMap, concatMap, exhaustMap), replaces each value on the source stream with a stream of values returned by the inner function. Each time the source stream emits, mergeMap calls the inner function and merges the inner stream it returns into the resulting stream, without waiting for earlier inner streams to complete:

const { rxObserver, palette } = require('api/v0.3');
const { from, timer, pipe } = require('rxjs');
const { zip, take, map, mergeMap, delayWhen } = require('rxjs/operators');

// our source$ will emit values at 5ms, 10ms, 20ms
const source$ = fromDelayed([ 5, 10, 20 ]).pipe(
    zip(from(palette), Marble) // colorize each item
  );

// each source$ value starts an inner timer that emits 3 values, 3ms apart
const mergeMap$ = source$.pipe(
    mergeMap(x => timer(0, 3).pipe(
        take(3),
        colorize(x.color)  // colorize as source$ value
      ))
  );

// visualization
mergeMap$.subscribe(rxObserver('mergeMap( timer(0, 3).take(3) )'));

// helpers
function colorize(color) {
  return pipe(
    map(y => Marble(y, color))
  );
}

// creates a colored Marble
function Marble(value, color) {
  return {
    valueOf: () => value,
    color
  };
}

// like .from, but items are delayed by their value
function fromDelayed(arr) {
  return from(arr).pipe(
    delayWhen(x => timer(x))
  );
}

NOTE: mergeMap also has a flatMap alias
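
To see why the inner streams interleave, it can help to work out the timing by hand. The following is a minimal plain-JavaScript sketch, not RxJS code: `mergedTimeline` is a hypothetical helper invented for this illustration, and it assumes ideal timing (no scheduler jitter) for a source emitting at 5ms, 10ms and 20ms, where every value starts an inner `timer(0, 3)` limited to 3 values.

```javascript
// Hypothetical helper (not part of RxJS): models the emission times of
// source$.pipe(mergeMap(() => timer(0, period).pipe(take(count))))
// under the assumption of ideal, jitter-free timing.
function mergedTimeline(sourceTimes, period, count) {
  const events = [];
  for (const start of sourceTimes) {
    // every source emission starts its own inner stream immediately;
    // earlier inner streams keep running in parallel
    for (let i = 0; i < count; i++) {
      events.push({ time: start + i * period, source: start, value: i });
    }
  }
  // mergeMap forwards inner values in arrival order, so sort by time
  return events.sort((a, b) => a.time - b.time);
}

const timeline = mergedTimeline([5, 10, 20], 3, 3);
for (const e of timeline) {
  console.log(`${e.time}ms: value ${e.value} from the inner stream started at ${e.source}ms`);
}
```

Under these assumptions the merged stream emits at 5, 8, 10, 11, 13, 16, 20, 23 and 26ms. The value at 10ms comes from the second inner stream and arrives before the first inner stream's last value at 11ms, which is the overlap that mergeMap allows.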

Check out this mergeMap vs exhaustMap vs switchMap vs concatMap head-to-head comparison.
