Skip to content

Chapter 1. The Reactive Way

juhee.bak edited this page Aug 29, 2016 · 4 revisions

before starts

Browser

Node.js

npm install rx

Reactive 란

excel 같은 걸 끼얹나

image

source 셀 업데이트 -> target 셀들이 자동으로 업데이트 되는. 이게 reactive programing 의 방향이라고..

  • 관계를 선언한 것들이 모여 -> program
  • program은 값의 변화에 따라 동작
  • 그러므로 program == flowing sequences of data

이벤트를 streams of values 로 보기

Erik Meijer, "Your Mouse Is a Database"

  • '메모리' 대신 '시간'으로 구분된 배열로 간주하고, 그걸 스트림으로 취급하자고 합니다. image
  • 이벤트를 단발적인 값이 아닌 스트림'처럼' 생각해보면...
    • query, manipulate 가능하고
    • 아직 생성되지 않은 값의 전체 sequence 를 제어할 수도 있고
    • DB나 Array에 저장되어 사용할 수 있을 때까지 기다리는 것과는 양상이 달라진다고 하네요

sequence에 query 하기

화면의 오른쪽(width/2)에서 발생한 열번의 클릭만 위치를 출력해보겠습니다.

//native js
var clicks = 0; //external state

document.addEventListener('click', function registerClicks(e) {
    if (clicks < 10) {
        if (e.clientX > window.innerWidth / 2) { //nested conditional blocks
            console.log(e.clientX, e.clientY);
            clicks += 1;
        }
    } else {
        document.removeEventListener('click', registerClicks);// tidy up, to not leak memory
    }
});
  • external state 와 if의 중첩 등은 사실 미묘한 버그를 잘 일으키기 때문에
    • Rx에서는 action이 발생한 scope 밖에 영향을 주면 무조건 side effect 라고 하면서 지양한다고 합니다.
    • external state, console printing, db update 도 side effect!
  • 그래서 Rx가 하고싶은 건, click DB에 query 날리는 것,
SELECT x, y FROM clicks LIMIT 10

이렇게 SQL처럼.

Rx.Observable.fromEvent(document, 'click')
    .filter(function(c) { return c.clientX > window.innerWidth / 2; })
    .take(10)
    .subscribe(function(c) { console.log(c.clientX, c.clientY) })

값이 실시간으로 발생하는거 빼면 똑같지 않나고 하네요. 그런가요...

Observable 은 이렇게 이벤트를 single isolated 가 아닌 sequence or stream 으로 다룰 수 있도록 해준다고 합니다.

Observer Pattern + Iterator = Rx Pattern

//usual observer pattern
function Producer() {
    this.listeners = [];
}
Producer.prototype.add = function(listener) {
    this.listeners.push(listener);
};
Producer.prototype.remove = function(listener) {
    var index = this.listeners.indexOf(listener);
    this.listeners.splice(index, 1);
};
Producer.prototype.notify = function(message) {
    this.listeners.forEach(function(listener) {
        listener.update(message);
    });
};
//usual iterator pattern
function iterateOnMultiples(arr, divisor) {
    this.cursor = 0;
    this.array = arr;
    this.divisor = divisor || 1;
}
iterateOnMultiples.prototype.next = function() {
    while (this.cursor < this.array.length) {
        //...
    }
};
iterateOnMultiples.prototype.hasNext = function() {
    //...
};
  • Observable sequence(Observable)이 Rx Pattern의 핵심
    • Observable(Producer)이 순서대로 실행하는데
    • observer(listener)가 다음 값을 요청(pull)하는 게 아니라
    • Observable이 적당한 시점에 observer 에게 값을 넘겨(push)줍니다.
      • Observable 구현 내용의 적절한 대목에서 observer.next(value) 해줍니다.
  • 여타 Observer pattern 과 Rx Pattern(Observable)의 다른점은
    • 하나 이상의 옵저버가 붙어야 streaming을 시작하고
    • sequence가 끝나면 시그널을 내보냄

RxJS 구성요소

Observable

Observable.create(callback)

  • callback은 observer를 arg로 받고, Observable이 어떻게 값을 넘겨줄지를 결정.
var observable = Rx.Observable.create(function(observer) {
    observer.onNext('A');
    observer.onNext('B'); 
    observer.onNext('C'); 

    observer.onCompleted(); // Signal
    //observer.onError(); //Called when an error occurs in the Observable
});

다음은, Observer

Observer.create(functions) Observer는 Observable을 구독(listen), 세 가지 메서드를 가짐(전부 Optional).

  • onNext - Producer.update와 동일. Observable이 값을 넘길때 호출됨.
  • onComplete - 데이터가 끝났다는 시그널
  • onError - Observable에서 에러가 발생하면 호출됨.
var observer = Rx.Observer.create(
    function onNext(x) { console.log('Next: ' + x); }, // === update
    // function onError(err) { console.log('Error: ' + err); }, 
    // function onCompleted() { console.log('Completed'); } 
);

Ajax Call Observable + 구독 하기

function get(url) {
    return Rx.Observable.create(function(observer) {
        var req = new XMLHttpRequest(); 

        req.open('GET', url);
        req.onload = function() { 
            if (req.status == 200) {
                observer.onNext(req.response);
                observer.onCompleted();
            } else {
                observer.onError(new Error(req.statusText)); }
            };
        req.onerror = function() {
            observer.onError(new Error("Unknown Error"));
        };
        req.send();
  });
}

// Observable 생성
var test = get('/머시기파일');

// Observer가 구독! 
// .subscribe는 Rx.Observer.create의 짧은 버전(내부 wrapping)
test.subscribe(
    function onNext(x) { console.log('Result: ' + x); }, 
    function onError(err) { console.log('Error: ' + err); }, 
    function onCompleted() { console.log('Completed'); }
);

Operator

sequence를 쿼리하고 조작하는 것 == Operator

  • Observable 의 static, instance 메서드 (ex: create)
  • create는 좋은 메서드지만 일일이 구현하기 귀찮을테니 좀 만들어 놨어요. "batteries included!"
  • 방금 만든 Ajax call 도 RxJS DOM library에 있어서.. 그냥 이렇게 쓰면 됩니다.
Rx.DOM.get('/머시기파일').subscribe(
    function onNext(x) { console.log('Result: ' + x.response); }, 
    function onError(err) { console.log('Error: ' + err); }
    //여긴 어차피 데이터 한번 오니까 onComplete는 없음.
);
  • Rx에서는 비동기 응답 말고도 모든 데이터를 Observable에 담아 사용합니다. (그냥 바로 쓸 데이터는 말고)
  • 다른 데이터랑 합쳐져야 하는 배열 데이터는 Observable로 만드는게 좋다고 합니다.
  • Rx에는 대부분의 JS 데이터 타입으로 Observable을 만들 수 있는 operator를 제공합니다.

Observable 만들어보기

배열을 Observable sequence로..

전개 가능한 데이터 타입(array, array-like, iterable obj)는 from operator 를 사용합니다.

Rx.Observable
    .from(['A', 'B', 'C']).subscribe( // 순서대로 next, error, complete
        function(x) {
            console.log('Next: ' + x);
        },
        function(err) {
            console.log('Error:', err);
        },
        function() {
            console.log('Completed');
        }
    );

// 출력
// 'Next: A'
// 'Next: B'
// 'Next: C'
// 'Completed'

이렇게 엮어서 쓰기 편해서 Observable로 만들라고 하는거 같아요.

function* fibonacci () {
  var fn1 = 1;
  var fn2 = 1;
  while (1){
    var current = fn2;
    fn2 = fn1;
    fn1 = fn1 + current;
    yield current;
  }
}

// Converts a generator to an observable sequence
var source = Rx.Observable.from(fibonacci()).take(5);

// Prints out each item
var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); });

// => onNext: 1
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 5
// => onCompleted

이번엔 이벤트를...

이벤트로 Observable을 만들면 결합, 전달이 가능한 first-class 가 되어 natural 이벤트의 제약을 벗어날 수 있습니다.

  • 하나의 observable을 기반으로 새로운 observable을 만들 수 있어서 편리하네요.(immutable)
var allMoves = Rx.Observable.fromEvent(document, 'mousemove');
allMoves.subscribe(function(e) {
    console.log(e.clientX, e.clientY);
});

//specialize
var movesOnTheRight = allMoves.filter(function(e) {
    return e.clientX > window.innerWidth / 2;
});
var movesOnTheLeft = allMoves.filter(function(e) {
    return e.clientX < window.innerWidth / 2;
});
movesOnTheRight.subscribe(function(e) {
    console.log('Mouse is on the right:', e.clientX);
});
movesOnTheLeft.subscribe(function(e) {
    console.log('Mouse is on the left:', e.clientX);
});

마지막으로 콜백을

그냥 'Observable로 쓸 수 있다' 정도..? 이건 좀 더 써봐야 장점이 보일것 같아요.

  • fromCallback
  • fromNodeCallback (err, data)
var Rx = require('rx'); 
var fs = require('fs'); 
//Rx
var readdir = Rx.Observable.fromNodeCallback(fs.readdir);
readdir('/Users/sergi').subscribe(
    function(res) {
        console.log('List of directories: ' + res);
    },
    function(err) {
        console.log('Error: ' + err);
    },
    function() {
        console.log('Done!');
    });
//Node
fs.readdir('/Users/sergi', function(err, items) { 
    if (err) {
        console.log('Error: ' + err);
        return;
    }
    
    for (var i = 0; i < items.length; i++) {
        console.log('List of directories: ' + items[i]);
    }

    console.log('Done!');
}

Wrapping

  • 이벤트 스트림을 쿼리한다는 발상이 재밌음. debounce 같은 operator 가 나오는 점은 훌륭.
  • external state 로 선언될 걸 Rx가 관리해줘서 코드를 죽죽 순서대로 쓸 수 있어서 편함. cleanup을 다 지원해줘서 편함. event로 immutable filter? 만드는 것도 맘에 듦
  • 아직 책에서 찬사하는 것 만큼 좋은지는 모르겠... 좀 엮어서 써봐야 알 듯..

Read more