Skip to content

Reactive Programming ‐ Core Operators in WebFlux Reactor

woojin.jang edited this page May 21, 2026 · 5 revisions

Reactor에서 제공하는 데이터 변환 연산자 메서드 - map()

// 자바 record를 활용해 객체 스트림을 생성한 뒤, map을 통해 특정 필드를 추출
public class Main {
    public static void main(String[] args) {
        // map
        Flux.just(
                new User("홍길동", 30),
                new User("김영희", 25),
                new User("이철수", 35)
        )
        .map(User::name)
        .subscribe(name -> System.out.println(name));
    }

    record User(String name, int age) {
    }
}
// 숫자 형태 문자열을 정수로 변환하다가 파싱 불가능한 경우 에러 이벤트(onError)가 발생해 스트림이 다운스트림으로 어떻게 시그널을 전파하는지?
public class Main {
    public static void main(String[] args) {
        // map
        Flux.just("1", "2", "abc", "4")
                .map(s -> Integer.parseInt(s))
                .subscribe(
                        data -> System.out.println("변환 성공 " + data),
                        error -> System.out.println("변환 실패 " + error.getMessage())
                );
    }
}
  • onError는 종료 신호이다.
  • onErroronComplete와 마찬가지로 해당 스트림의 최종 종료를 의미하는데 예외를 캐치해 하위 파이프라인으로 onError(Throwable) 신호를 던지고 파이프라인을 즉시 폐쇄(Terminate)한다.
  • 따라서 파이프라인 내부의 전원 스위치가 내려간 것과 같기 때문에 뒤에 남아있던 데이터들(예를 들어, "4")은 평가조차 이루어지지 못하고 드롭된다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - flatMap()

public class Main {
    public static void main(String[] args) {
        // flatMap

        long start = System.currentTimeMillis();

        Flux.just("A", "B", "C")
                .flatMap(letter -> Mono.just(letter).delayElement(Duration.ofSeconds(1)))
                .map(l -> l + "처리 완료")
                .subscribe(
                        data -> {
                            long elapsed = System.currentTimeMillis() - start;
                            System.out.println(data + " - " + elapsed + "ms");
                        }
                );

        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • map은 스트림 내부에 또 다른 스트림이 겹쳐진 중첩 구조가 되지만 flatMap은 내부의 Mono/Flux들을 하나로 이어 붙여서 평평한 하나의 스트림으로 펴주는 역할을 한다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - concatMap()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // concatMap
        Flux.just("느린", "빠른", "중간").flatMap(task -> {
            long delay = switch (task) {
                case "느린" -> 300;
                case "빠른" -> 100;
                case "중간" -> 200;
                default -> 0;
            };
            return Mono.just(task).delayElement(Duration.ofMillis(delay));
        }).subscribe(System.out::println);

        long start = System.currentTimeMillis();

        Flux.just("느린", "빠른", "중간").concatMap(task -> {
            long delay = switch (task) {
                case "느린" -> 300;
                case "빠른" -> 100;
                case "중간" -> 200;
                default -> 0;
            };
            return Mono.just(task).delayElement(Duration.ofMillis(delay));
        }).subscribe(data -> {
            long elap = System.currentTimeMillis() - start;
            System.out.println(elap + "ms: " + data);
        });

        Thread.sleep(1000);
    }
}
  • concatMap()은 무조건 들어온 순서대로 하나씩 차례대로 처리한다.
  • 만약 A, B, C가 순서대로 들어오면, A의 비동기 처리가 끝날 때까지 B, C는 시작도 하지 않고 기다린다.
  • 비동기 처리 속도가 제각각이더라도 최종 결과물은 반드시 A ➔ B ➔ C라는 원래의 순서가 엄격하게 보장된다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - flatMapSequential()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // flatMapSequential은 flatMap과 달리 순서를 보장한다.

        long start = System.currentTimeMillis();

        Flux.just("느린", "빠른", "중간")
                .flatMapSequential(task -> {
                    long delay = switch (task) {
                        case "느린" -> 300L;
                        case "빠른" -> 100L;
                        case "중간" -> 200L;
                        default -> 0L;
                    };
                    return Mono.just(task).delayElement(Duration.ofMillis(delay));
                })
                .subscribe(data -> {
                    long elapsed = System.currentTimeMillis() - start;
                    System.out.println(elapsed + "ms: " + data);
                });

        Thread.sleep(500);
    }
}
  • flatMapSequential은 비동기로 동시에 쏘면서 성능을 챙기면서도 최종 결과물의 순서는 원본 순서대로 정렬한다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - flatMapMany()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // flatMapMany: Mono 환경에서 다중 데이터(Flux) 스트림으로 확장 및 평탄화

        // 1. 단순 문자열을 분해하여 여러 개의 데이터로 밀어내기
        Mono.just("hello")
                .flatMapMany(word -> Flux.fromArray(word.split("")))
                .subscribe(System.out::println);

        // 2. 실무 정석: 단일 조건(Mono)으로 다중 데이터 조회(Flux) 파이프라인 연동
        Mono.just("user123")
                .flatMapMany(Main::findOrdersByUserId)
                .subscribe(System.out::println);
    }

    static Flux<String> findOrdersByUserId(String userId) {
        return Flux.just("주문#1001", "주문#1002", "주문#1003");
    }
}
  • flatMapManyMonoFlux로 전환하기 위해 사용하는 연산자이다.

데이터 스트림 환경에서 가공 및 조율을 위한 Reactor의 필터링 연산자 메서드 - filter()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // filter 복합 체이닝

        Flux.just(
                new Product("노트북", 1500000, true),
                new Product("마우스", 35000, true),
                new Product("키보드", 89000, false),
                new Product("모니터", 450000, true),
                new Product("웹캠", 62000, true)
        )
        .filter(Product::inStock)      // 1차 필터: 재고 유무 확인
        .filter(p -> p.price() <= 100000) // 2차 필터: 가격 조건 확인
        .subscribe(p -> System.out.println(p.name() + " - " + p.price() + "원"));
    }

    record Product(String name, int price, boolean inStock) {
    }
}
  • filter()는 내부에 진위 연산 람다식을 받아서 스트림의 흐름을 제어한다.

데이터 스트림 환경에서 가공 및 조율을 위한 Reactor의 필터링 연산자 메서드 - distinct()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 1. 기본 distinct: 객체 자체의 equals/hashCode 기반 중복 제거
        Flux.just(1, 2, 3, 2, 1, 4, 3, 5)
                .distinct()
                .subscribe(data -> System.out.print(data + " "));
                
        System.out.println();

        // 2. 키 기반 distinct: 특정 필드(Key) 기준으로 중복 제거
        Flux.just(
                new User("홍길동", "서울"),
                new User("김영희", "부산"),
                new User("이철수", "서울"),
                new User("박지민", "대전"),
                new User("최민수", "부산")
        )
        .distinct(User::city) // 각 유저의 city 필드를 기준으로 중복 판별
        .subscribe(user -> System.out.println(user.name() + " (" + user.city() + ")"));
    }

    record User(String name, String city) {
    }
}
  • distinct()는 중복된 데이터를 완벽하게 솎아내는 역할을 한다.
  • 그러나 대용량 아키텍처 관점에서 치명적인 트레이드 오프를 가지고 있다.
    • 메모리 누수 위험 : Flux.interval처럼 종료되지 않고 무한히 흐르는 스트림이거나 하루에 수억 건씩 쏟아지는 금융 트랜잭션 스트림에 걸게 되면 스트림이 유지되는 내내 내부 Set에 데이터가 계속 누적되면서 힙 메모리를 잡아먹게 되고 결국 서버가 OutOfMemoryError(OOM)을 뱉을 수 있다.
    • 대안 - distinctUntilChanged() : 해당 메서드는 전체 데이터가 아니라 바로 직전에 통과한 데이터와만 비교해 연속으로 중복되는 데이터만 쳐내는 오퍼레이터이다.

데이터 스트림 환경에서 가공 및 조율을 위한 Reactor의 필터링 연산자 메서드 - elementAt()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 1. 특정 인덱스의 데이터 추출 (Zero-based Index)
        Flux.just("사과", "바나나", "체리", "딸기")
                .elementAt(2)
                .subscribe(data -> System.out.println("elementAt: " + data));

        // 2. 인덱스 초과 시 기본값(Default Value)으로 방어
        Flux.just("사과", "바나나")
                .elementAt(5, "없음")
                .subscribe(data -> System.out.println("elementAt with default: " + data));
    }
}
  • elementAt() 연산자는 원하는 딱 하나의 데이터만 잡아내는 역할을 한다.
  • 그러나 기본값 없는 인덱스 초과가 발생할 위험이 높아 치명적인 안티패턴으로 잘 쓰이지 않는다.

다중 스트림 구조에서 데이터 결합을 위한 Reactor 결합 연산자 메서드 - concat()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Flux.concat: 여러 스트림을 순차적으로 연결

        Flux<String> first = Flux.just("A", "B", "C");
        Flux<String> second = Flux.just("D", "E", "F");
        Flux<String> third = Flux.just("G");

        Flux.concat(first, second, third)
                .subscribe(data -> System.out.println("Received: " + data));
    }
}
  • concat()는 리액티브 스트림즈에서 다수의 데이터 소스를 순서대로 이어 붙여 하나의 연속적인 파이프라인으로 만드는 메커니즘을 보여준다.

다중 스트림 구조에서 데이터 결합을 위한 Reactor 결합 연산자 메서드 - concatWith()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // concatWith: 인스턴스 메서드 체이닝을 활용한 스트림 순차 연결

        Flux.just("A", "B")
                .concatWith(Flux.just("C", "D"))
                .concatWith(Flux.just("E"))
                .subscribe(System.out::println);
    }
}
  • concatWith()Flux.concat과 내부적인 연결 및 지연 구독 메커니즘과 동일하다.

다중 스트림 구조에서 데이터 결합을 위한 Reactor 결합 연산자 메서드 - merge()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Flux.merge: 여러 스트림을 동시에 구독하여 인터리빙(Interleaving) 병합

        long start = System.currentTimeMillis();

        Flux<String> slow = Mono.just("slow")
                .delayElement(Duration.ofMillis(500))
                .flux();

        Flux<String> fast = Mono.just("fast")
                .delayElement(Duration.ofMillis(200))
                .flux();

        Flux.merge(slow, fast)
                .subscribe(data -> {
                    long elapsed = System.currentTimeMillis() - start;
                    System.out.println(elapsed + "ms: " + data);
                });

        Thread.sleep(1000);
    }
}
  • 여러 독립된 데이터 소스를 순서에 상관없이 먼저 처리되는 대로 가장 빠르게 다운스트림에 밀어 넣어 병합하는 역할을 한다.

다중 스트림 구조에서 데이터 결합을 위한 Reactor 결합 연산자 메서드 - zip()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Flux.zip: 여러 스트림의 데이터를 index 기준으로 1:1 매칭하여 결합

        // 1. Bi-Function 조합식을 결합한 형태 (2개의 스트림)
        Flux<String> names = Flux.just("홍길동", "김영희", "이철수");
        Flux<Integer> ages = Flux.just(20, 30, 40);

        Flux.zip(names, ages, (name, age) -> name + " : " + age)
                .subscribe(System.out::println);
                
        System.out.println("--------------------------------");

        // 2. 3개 이상의 스트림 결합 (Tuple 데이터 구조 활용)
        Flux<String> names2 = Flux.just("홍길동", "김영희", "이철수");
        Flux<Integer> ages2 = Flux.just(20, 30, 40);
        Flux<String> cities = Flux.just("서울", "부산", "대구");

        Flux.zip(names2, ages2, cities)
                .map(tuple -> tuple.getT1() + "(" + tuple.getT2() + "세, " + tuple.getT3() + ")")
                .subscribe(System.out::println);

        System.out.println("--------------------------------");

        // 3. 결합할 데이터의 개수가 불일치할 때의 동작 메커니즘
        Flux<String> names3 = Flux.just("홍길동", "이철수", "박지민");
        Flux<Integer> ages3 = Flux.just(20, 30);

        Flux.zip(names3, ages3)
                .subscribe(System.out::println);
    }
}
  • zip()는 서로 다른 파이프라인에서 흐르는 데이터들을 동일한 인덱스 기준으로 정확하게 한 쌍씩 묶어내는 역할을 한다.

무한한 데이터 스트림 환경에서 조건을 검증하기 위한 연산자 메서드 - defaultIfEmpty(), switchIfEmpty()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // defaultIfEmpty: 스트림이 비어있을 때 고정된 기본값(Fallback Value)을 반환

        // 1. 빈 스트림인 경우 -> 기본값 출력
        Flux<String> result = Flux.<String>empty()
                .defaultIfEmpty("데이터 없음");

        result.subscribe(System.out::println);

        System.out.println("--------------------------------");

        // 2. 데이터가 존재하는 경우 -> 원본 데이터 그대로 통과
        Flux<String> hasData = Flux.just("A", "B", "C")
                .defaultIfEmpty("데이터 없음");

        hasData.subscribe(System.out::println);
    }
}
  • defaultIfEmpty()는 데이터가 흐르지 않고 그냥 끝나버린 빈 스트림을 감지하고 이에 대한 디폴트 값을 안전하게 밀어주는 역할을 한다.
public class Main {
    public static void main(String[] args) throws InterruptedException {
        // defaultIfEmpty, switchIfEmpty
        Flux<String> fromCache = Flux.empty(); // 캐시에 데이터가 없다고 가정
        fromCache
                .switchIfEmpty(Flux.defer(() -> findFromDB()))
                .subscribe(data -> System.out.println("Received: " + data));

        Flux<String> cached = Flux.just("캐시된 데이터 1", "캐시된 데이터 2");
        cached
                .switchIfEmpty(Flux.defer(() -> findFromDB()))
                .subscribe(data -> System.out.println("Received: " + data));

    }

    private static Flux<String> findFromDB() {
        // DB에서 데이터를 조회한다고 가정
        return Flux.just("A", "B", "C");
    }
}
  • switchIfEmpty()는 고정된 값이 아니라 또 다른 리액티브 스트림을 통째로 갈아끼우는 역할을 한다.

무한한 데이터 스트림 환경에서 조건을 검증하기 위한 연산자 메서드 - hasElement(), hasElements()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // hasElement & hasElements: 스트림 내 데이터 존재 여부 검증

        // 1. Mono에 데이터가 존재하는 경우 -> true
        Mono.just("hello")
                .hasElement()
                .subscribe(has -> System.out.println("Mono(just) has element: " + has));

        // 2. Mono가 비어있는 경우 -> false
        Mono.empty()
                .hasElement()
                .subscribe(has -> System.out.println("Mono(empty) has element: " + has));

        System.out.println("--------------------------------");

        // 3. Flux에 데이터가 존재하는 경우 -> true
        Flux.just(1, 2, 3)
                .hasElements()
                .subscribe(has -> System.out.println("Flux(just) has elements: " + has));

        // 4. Flux가 비어있는 경우 -> false
        Flux.empty()
                .hasElements()
                .subscribe(has -> System.out.println("Flux(empty) has elements: " + has));
    }
}
  • hasElement()는 단일 발행자의 스트림이 비어있는지 여부를 판단하고 hasElements()는 다중 발행자의 스트림이 비어있는지 여부를 확인한다.

무한한 데이터 스트림 환경에서 조건을 검증하기 위한 연산자 메서드 - any(), all()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // any & all: 스트림 내 데이터의 조건 충족 여부 검증

        Flux<Integer> numbers = Flux.just(2, 4, 6, 8, 10);

        // 1. any: 하나라도 조건을 만족하는가?
        numbers.any(n -> n > 7)
                .subscribe(has -> System.out.println("any (n > 7): " + has));

        numbers.any(n -> n > 100)
                .subscribe(has -> System.out.println("any (n > 100): " + has));

        System.out.println("--------------------------------");

        // 2. all: 모든 데이터가 조건을 만족하는가?
        numbers.all(n -> n % 2 == 0)
                .subscribe(has -> System.out.println("all (n % 2 == 0): " + has));

        numbers.all(n -> n > 3)
                .subscribe(has -> System.out.println("all (n > 3): " + has));
    }
}
  • any()는 스트림에 흘러가는 데이터 집합이 특정 비즈니스 조건에 맞는지 판별해 하나라도 조건을 만족하면 true를 리턴한다.
  • all()는 스트림에 흘러가는 데이터의 집합이 전체 조건을 만족하면 true를 반환한다.

무한한 데이터 스트림 환경에서 조건을 검증하기 위한 연산자 메서드 - then(), thenMany()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 1. then(Mono): 원본 스트림 완료 후 다른 단수(Mono) 스트림 실행 (원본 데이터는 드롭)
        Flux.just("저장1", "저장2", "저장3")
                .doOnNext(data -> System.out.println("처리 중: " + data))
                .then(Mono.just("모든 저장 완료"))
                .subscribe(result -> System.out.println("결과: " + result));

        System.out.println("---");

        // 2. then(): 인자 없이 호출하여 성공 종료 신호(Mono<Void>)만 하위로 전달
        Flux.just("작업1", "작업2")
                .doOnNext(data -> System.out.println("실행: " + data))
                .then()
                .subscribe(
                        data -> {},
                        error -> {},
                        () -> System.out.println("모든 작업 완료")
                );

        System.out.println("---");

        // 3. thenMany(Flux): 원본 스트림 완료 후 다른 다중(Flux) 스트림 실행
        Flux.just("초기화1", "초기화2")
                .doOnNext(data -> System.out.println("초기화: " + data))
                .thenMany(Flux.just("결과 A", "결과 B", "결과 C"))
                .subscribe(data -> System.out.println("수신: " + data));
    }
}
  • then()는 앞선 스트림이 성공적으로 끝나면 기존 데이터들은 메모리에서 전부 폐기하고 인자로 넘겨받은 단수 발행자로 파이프라인을 갈아 끼운다.
  • thenMany()는 기존 데이터를 버리는 메커니즘은 then()과 동일하나 onComplete를 호출하는 순간 파이프라인이 Flux<T> 스트림으로 변환된다.

다중 스트림 환경에서 데이터를 모아서 하나의 산출물을 만드는 집계 연산자 메서드 - reduce(), scan()

import java.util.HashMap;
import reactor.core.publisher.Flux;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // reduce: 스트림의 모든 원소를 하나의 단일 값으로 압축(축소)하는 연산자

        // 1. 초기값 없는 reduce: 첫 번째 데이터가 초기값이 됨
        Flux.just(1, 2, 3, 4, 5)
                .reduce((acc, next) -> acc + next)
                .subscribe(result -> System.out.println("결과 (합산): " + result));

        Flux.just("Hello", "Reactor", "World")
                .reduce((acc, next) -> acc + next)
                .subscribe(result -> System.out.println("결과 (문자열): " + result));

        System.out.println("---------");

        // 2. 초기값(Seed)이 있는 reduce: 빈 스트림이 들어와도 초기값을 안전하게 반환
        Flux.just(1, 2, 3, 4, 5)
                .reduce(100, (acc, next) -> acc + next)
                .subscribe(result -> System.out.println("결과 (초기값 100): " + result));

        Flux.<Integer>empty()
                .reduce(100, (acc, next) -> acc + next)
                .subscribe(result -> System.out.println("결과 (빈 스트림 방어): " + result));

        System.out.println("---------");

        // 3. 가변 객체(Map)를 컬렉터로 활용한 데이터 집계 및 빈도수(Frequency) 카운팅
        Flux.just("apple", "banana", "apple", "cherry", "banana", "cherry")
                .reduce(new HashMap<String, Integer>(), (map, word) -> {
                    map.merge(word, 1, Integer::sum);
                    return map;
                })
                .subscribe(freq -> System.out.println("결과 (빈도수): " + freq));
    }
}
  • reduce() : 중간 과정은 숨기고 결과만 중요할 때 선택한다.
  • scan() : 중간 과정을 실시간 스트림(Flux)으로 내어줄 때 선택한다.

다중 스트림 환경에서 데이터를 모아서 하나의 산출물을 만드는 집계 연산자 메서드 - groupBy()

import reactor.core.publisher.Flux;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // groupBy: 특정 조건(Key)을 기준으로 하나의 스트림을 여러 개의 서브 스트림으로 분할

        Flux.range(1, 10)
                .groupBy(n -> n % 2 == 0 ? "짝수" : "홀수") // 1. 그룹화 기준 키 정의
                .flatMap(group -> group.collectList().map(list -> group.key() + " : " + list)) // 2. 그룹별 데이터 수집
                .subscribe(System.out::println);
    }
}
  • groupBy()는 자바 애플리케이션 서버의 메모리 레벨에서 실시간으로 스트림을 쪼개는 그룹핑 역할을 수행한다.

📖 Java

📖 Kotlin

📖 Coroutine

📖 Spring

📖 Spring Security

📖 Spring Batch

📖 Reactive Programming

📖 Database

📖 MySQL

📖 Redis

📖 JPA

📖 QueryDsl

📖 MSA

📖 Kafka

📖 Apache Flink

  • [Apache Flink - Apache Flink Architecture]
  • [Apache Flink - Stream Processing]
  • [Apache Flink - Data Stream API & Window]
  • [Apache Flink - State Management]

📖 HTTP

📖 AWS

📖 Docker

📖 Kubernetes

📖 CI/CD

📖 Nginx

📖 Monitoring🥈

  • [Monitoring - Log Concept]
  • [Monitoring - Log Level & Filter]
  • [Monitoring - Logback]
  • [Monitoring - Log Collection with ELK Stack]
  • [Monitoring - Log Monitoring with Kibana]
  • [Monitoring - Building a Monitoring System with Spring Boot Actuator]
  • [Monitoring - Server Monitoring with Prometheus and Grafana with Discord Alerts]

📖 Test

📖 Effective Java 3/E

📖 Kotlin Academy - Effective Kotlin

📖 Kotlin Academy - 핵심편

📖 스프링으로 시작하는 리액티브 프로그래밍

📖 가상 면접 사례로 배우는 대규모 시스템 설계 기초 1

📖 가상 면접 사례로 배우는 대규모 시스템 설계 기초 2

📖 Clean Code

📖 리팩토링 2판

📖 주니어 백엔드 개발자가 반드시 알아야 할 실무 지식

📖 GraphQL

Clone this wiki locally