Skip to content

07. How to implement Backpressure

이진혁 edited this page May 25, 2021 · 1 revision

이번 글에서는 Backpressure에 대한 개념이 숙지되어 있어야 쉽게 이해하실 수 있습니다.
다음 글을 읽으시면 도움이 될 것 같습니다.

BackpressurePublisherSubscriber간의 Subscription을 통한 상호작용을 뜻합니다.
여기서 Publisher는 데이터를 담고 있는 스트림에 불가하므로 구현한다해도
Mono보다 더 나은 단일 데이터 스트림을, Flux보다 더 나은 다중 데이터 스트림을 구현할 수 없을 것입니다.

물론 Subscriber를 구현한다고 해서 기존에 있던 Subscriber보다 더 나은 Subscriber를 구현한다는 것은 아닙니다.
하지만 Publisher는 데이터를 담는 그릇에 불가한 것에 비해,
SubscriberBackpressure 즉, 배압조절의 중심이 되는 축이기 때문에
구현하면서 웹플럭스의 동작원리에 대해 더 다가갈 수 있을 것입니다.
그래서 오늘은 Subscriber를 구현하고 로그를 찍어 실제 Backpressure가 어떻게 동작되고 있는지를 확인해보겠습니다.

Subscriber를 구현하는 방법은 간단합니다.
모든 것의 시작은 PublisherSubscriber에게 구독을 요청하면서 시작되는데,
이때 호출하는 subscribe() 메소드의 매개변수로 Subscriber 구현체를 넘겨주면 됩니다.

Flux.just("apple", "banana", "grape", "melon", "mango",
    "strawberry", "watermelon", "kiwi", "eggplant", "lemon",
    "peach")
    .log()
    .doOnNext(System.out::println)
    .subscribe(new Subscriber<>() {
        private Subscription subscription;
        private long count = 0;
        
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(5);
        }

        @Override
        public void onNext(String s) {
            count++;
            if (count % 5 == 0) {
                this.subscription.request(5);
            }
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("처리 실패");
        }

        @Override
        public void onComplete() {
            System.out.println("처리 성공");
        }
    });

위 코드에서는 총 11개의 데이터를 담고 있는 Publisher를 만들고
로그를 찍을 수 있도록 log() 메소드를 사용한 후
doOnNext() 메소드를 이용하여 모든 데이터를 순회하며 출력하도록 합니다.

그런 다음 subscribe()를 실행하는데 그곳에 구현된 Subscriber의 구현은 다음과 같습니다.

  • subscribe()가 호출되면 실행되는 onSubscribe() 메소드는 매개변수로 넘어온 Subscription을 저장하고
    Request(5)를 보냄으로써 5개의 데이터를 요청합니다.
  • 데이터가 하나씩 올때마다 인스턴스 변수인 count의 값을 증가시키고 count가 5의 배수가 될때마다
    Request(5)를 다시 실행함으로써 5개의 데이터를 계속 받습니다.
  • 데이터 처리가 끝났을 때 에러가 발생한다면 "처리 실패"를 출력하고
    정상적인 데이터 처리가 완료된다면 "처리 완료"를 출력합니다.

실행결과로 출력되는 로그는 다음과 같습니다.

08:49:11.976 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
08:49:11.981 [Test worker] INFO reactor.Flux.Array.1 - | request(5)
08:49:11.981 [Test worker] INFO reactor.Flux.Array.1 - | onNext(apple)
apple
08:49:11.984 [Test worker] INFO reactor.Flux.Array.1 - | onNext(banana)
banana
08:49:11.984 [Test worker] INFO reactor.Flux.Array.1 - | onNext(grape)
grape
08:49:11.985 [Test worker] INFO reactor.Flux.Array.1 - | onNext(melon)
melon
08:49:11.986 [Test worker] INFO reactor.Flux.Array.1 - | onNext(mango)
mango
08:49:11.987 [Test worker] INFO reactor.Flux.Array.1 - | request(5)
08:49:11.987 [Test worker] INFO reactor.Flux.Array.1 - | onNext(strawberry)
strawberry
08:49:11.989 [Test worker] INFO reactor.Flux.Array.1 - | onNext(watermelon)
watermelon
08:49:11.992 [Test worker] INFO reactor.Flux.Array.1 - | onNext(kiwi)
kiwi
08:49:11.993 [Test worker] INFO reactor.Flux.Array.1 - | onNext(eggplant)
eggplant
08:49:11.993 [Test worker] INFO reactor.Flux.Array.1 - | onNext(lemon)
lemon
08:49:11.996 [Test worker] INFO reactor.Flux.Array.1 - | request(5)
08:49:11.997 [Test worker] INFO reactor.Flux.Array.1 - | onNext(peach)
peach
08:49:12.001 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
처리 성공

Backpressure의 기본 원리에서 배웠던 것과 같이 최초로 onSubscribe()가 호출되고
request(N)으로 데이터를 호출한 뒤, onNext()로 데이터를 받아옵니다.
그리고 정상적으로 데이터 처리가 끝났으니 onComplete()를 호출하고 Backpressure를 종료하게 됩니다.

그런데 이렇게 직접 Subscriber를 구현하였으니 request(5)가 오는 것은 당연합니다.
그러면 직접 구현하지 않으면 몇 개의 데이터를 가져오게 될까요? 정답은 "전부 다 가져온다" 입니다.

request(unbounded)

기본적으로 Subscriber를 구현하지 않으면 모든 데이터를 가져와서 처리하게 되어 있습니다.
하지만 이렇게 모든 데이터를 가져오게 되면 Backpressure의 의미가 희미해집니다.

그래서 Project Reactor를 직접 사용하는 것보다 웹플럭스의 도움을 받는 것이 훨씬 좋습니다.
웹플럭스를 이용하면 이런 Subscriber에 대한 구현을 스프링에 넘기고
우리는 Publisher를 만들어 처리하기만 하면 되기 때문입니다.

Example

Subscriber 직접 구현하여 배압조절(Backpressure)하기


Reference

백기선님의 Backpressure 강의

Clone this wiki locally