Skip to content

Reactive Programming ‐ Core Operators in WebFlux Reactor

woojin.jang edited this page May 21, 2026 · 5 revisions

Reactor에서 제공하는 데이터 변환 연산자 메서드 - map()

// 자바 record를 활용해 객체 스트림을 생성한 뒤, map을 통해 특정 필드를 추출
public class Main {
    public static void main(String[] args) {
        // map
        Flux.just(
                new User("홍길동", 30),
                new User("김영희", 25),
                new User("이철수", 35)
        )
        .map(User::name)
        .subscribe(name -> System.out.println(name));
    }

    record User(String name, int age) {
    }
}
// 숫자 형태 문자열을 정수로 변환하다가 파싱 불가능한 경우 에러 이벤트(onError)가 발생해 스트림이 다운스트림으로 어떻게 시그널을 전파하는지?
public class Main {
    public static void main(String[] args) {
        // map
        Flux.just("1", "2", "abc", "4")
                .map(s -> Integer.parseInt(s))
                .subscribe(
                        data -> System.out.println("변환 성공 " + data),
                        error -> System.out.println("변환 실패 " + error.getMessage())
                );
    }
}
  • onError는 종료 신호이다.
  • onErroronComplete와 마찬가지로 해당 스트림의 최종 종료를 의미하는데 예외를 캐치해 하위 파이프라인으로 onError(Throwable) 신호를 던지고 파이프라인을 즉시 폐쇄(Terminate)한다.
  • 따라서 파이프라인 내부의 전원 스위치가 내려간 것과 같기 때문에 뒤에 남아있던 데이터들(예를 들어, "4")은 평가조차 이루어지지 못하고 드롭된다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - flatMap()

public class Main {
    public static void main(String[] args) {
        // flatMap

        long start = System.currentTimeMillis();

        Flux.just("A", "B", "C")
                .flatMap(letter -> Mono.just(letter).delayElement(Duration.ofSeconds(1)))
                .map(l -> l + "처리 완료")
                .subscribe(
                        data -> {
                            long elapsed = System.currentTimeMillis() - start;
                            System.out.println(data + " - " + elapsed + "ms");
                        }
                );

        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • map은 스트림 내부에 또 다른 스트림이 겹쳐진 중첩 구조가 되지만 flatMap은 내부의 Mono/Flux들을 하나로 이어 붙여서 평평한 하나의 스트림으로 펴주는 역할을 한다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - concatMap()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // concatMap
        Flux.just("느린", "빠른", "중간").flatMap(task -> {
            long delay = switch (task) {
                case "느린" -> 300;
                case "빠른" -> 100;
                case "중간" -> 200;
                default -> 0;
            };
            return Mono.just(task).delayElement(Duration.ofMillis(delay));
        }).subscribe(System.out::println);

        long start = System.currentTimeMillis();

        Flux.just("느린", "빠른", "중간").concatMap(task -> {
            long delay = switch (task) {
                case "느린" -> 300;
                case "빠른" -> 100;
                case "중간" -> 200;
                default -> 0;
            };
            return Mono.just(task).delayElement(Duration.ofMillis(delay));
        }).subscribe(data -> {
            long elap = System.currentTimeMillis() - start;
            System.out.println(elap + "ms: " + data);
        });

        Thread.sleep(1000);
    }
}
  • concatMap()은 무조건 들어온 순서대로 하나씩 차례대로 처리한다.
  • 만약 A, B, C가 순서대로 들어오면, A의 비동기 처리가 끝날 때까지 B, C는 시작도 하지 않고 기다린다.
  • 비동기 처리 속도가 제각각이더라도 최종 결과물은 반드시 A ➔ B ➔ C라는 원래의 순서가 엄격하게 보장된다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - flatMapSequential()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // flatMapSequential은 flatMap과 달리 순서를 보장한다.

        long start = System.currentTimeMillis();

        Flux.just("느린", "빠른", "중간")
                .flatMapSequential(task -> {
                    long delay = switch (task) {
                        case "느린" -> 300L;
                        case "빠른" -> 100L;
                        case "중간" -> 200L;
                        default -> 0L;
                    };
                    return Mono.just(task).delayElement(Duration.ofMillis(delay));
                })
                .subscribe(data -> {
                    long elapsed = System.currentTimeMillis() - start;
                    System.out.println(elapsed + "ms: " + data);
                });

        Thread.sleep(500);
    }
}
  • flatMapSequential은 비동기로 동시에 쏘면서 성능을 챙기면서도 최종 결과물의 순서는 원본 순서대로 정렬한다.

Reactor에서 제공하는 데이터 변환 연산자 메서드 - flatMapMany()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // flatMapMany: Mono 환경에서 다중 데이터(Flux) 스트림으로 확장 및 평탄화

        // 1. 단순 문자열을 분해하여 여러 개의 데이터로 밀어내기
        Mono.just("hello")
                .flatMapMany(word -> Flux.fromArray(word.split("")))
                .subscribe(System.out::println);

        // 2. 실무 정석: 단일 조건(Mono)으로 다중 데이터 조회(Flux) 파이프라인 연동
        Mono.just("user123")
                .flatMapMany(Main::findOrdersByUserId)
                .subscribe(System.out::println);
    }

    static Flux<String> findOrdersByUserId(String userId) {
        return Flux.just("주문#1001", "주문#1002", "주문#1003");
    }
}
  • flatMapManyMonoFlux로 전환하기 위해 사용하는 연산자이다.

데이터 스트림 환경에서 가공 및 조율을 위한 Reactor의 필터링 연산자 메서드 - filter()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // filter 복합 체이닝

        Flux.just(
                new Product("노트북", 1500000, true),
                new Product("마우스", 35000, true),
                new Product("키보드", 89000, false),
                new Product("모니터", 450000, true),
                new Product("웹캠", 62000, true)
        )
        .filter(Product::inStock)      // 1차 필터: 재고 유무 확인
        .filter(p -> p.price() <= 100000) // 2차 필터: 가격 조건 확인
        .subscribe(p -> System.out.println(p.name() + " - " + p.price() + "원"));
    }

    record Product(String name, int price, boolean inStock) {
    }
}
  • filter()는 내부에 진위 연산 람다식을 받아서 스트림의 흐름을 제어한다.

다중 스트림 구조에서 데이터 결합을 위한 Reactor 결합 연산자 메서드 - distinct()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 1. 기본 distinct: 객체 자체의 equals/hashCode 기반 중복 제거
        Flux.just(1, 2, 3, 2, 1, 4, 3, 5)
                .distinct()
                .subscribe(data -> System.out.print(data + " "));
                
        System.out.println();

        // 2. 키 기반 distinct: 특정 필드(Key) 기준으로 중복 제거
        Flux.just(
                new User("홍길동", "서울"),
                new User("김영희", "부산"),
                new User("이철수", "서울"),
                new User("박지민", "대전"),
                new User("최민수", "부산")
        )
        .distinct(User::city) // 각 유저의 city 필드를 기준으로 중복 판별
        .subscribe(user -> System.out.println(user.name() + " (" + user.city() + ")"));
    }

    record User(String name, String city) {
    }
}
  • distinct()는 중복된 데이터를 완벽하게 솎아내는 역할을 한다.
  • 그러나 대용량 아키텍처 관점에서 치명적인 트레이드 오프를 가지고 있다.
    • 메모리 누수 위험 : Flux.interval처럼 종료되지 않고 무한히 흐르는 스트림이거나 하루에 수억 건씩 쏟아지는 금융 트랜잭션 스트림에 걸게 되면 스트림이 유지되는 내내 내부 Set에 데이터가 계속 누적되면서 힙 메모리를 잡아먹게 되고 결국 서버가 OutOfMemoryError(OOM)을 뱉을 수 있다.
    • 대안 - distinctUntilChanged() : 해당 메서드는 전체 데이터가 아니라 바로 직전에 통과한 데이터와만 비교해 연속으로 중복되는 데이터만 쳐내는 오퍼레이터이다.

다중 스트림 구조에서 데이터 결합을 위한 Reactor 결합 연산자 메서드 - elementAt()

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // 1. 특정 인덱스의 데이터 추출 (Zero-based Index)
        Flux.just("사과", "바나나", "체리", "딸기")
                .elementAt(2)
                .subscribe(data -> System.out.println("elementAt: " + data));

        // 2. 인덱스 초과 시 기본값(Default Value)으로 방어
        Flux.just("사과", "바나나")
                .elementAt(5, "없음")
                .subscribe(data -> System.out.println("elementAt with default: " + data));
    }
}
  • elementAt() 연산자는 원하는 딱 하나의 데이터만 잡아내는 역할을 한다.
  • 그러나 기본값 없는 인덱스 초과가 발생할 위험이 높아 치명적인 안티패턴으로 잘 쓰이지 않는다.

무한한 데이터 스트림 환경에서 조건을 검증하기 위한 연산자 메서드

다중 스트림 환경에서 데이터를 모아서 하나의 산출물을 만드는 집계 연산자 메서드

📖 Java

📖 Kotlin

📖 Coroutine

📖 Spring

📖 Spring Security

📖 Spring Batch

📖 Reactive Programming

📖 Database

📖 MySQL

📖 Redis

📖 JPA

📖 QueryDsl

📖 MSA

📖 Kafka

📖 Apache Flink

  • [Apache Flink - Apache Flink Architecture]
  • [Apache Flink - Stream Processing]
  • [Apache Flink - Data Stream API & Window]
  • [Apache Flink - State Management]

📖 HTTP

📖 AWS

📖 Docker

📖 Kubernetes

📖 CI/CD

📖 Nginx

📖 Monitoring

  • [Monitoring - Log Concept]
  • [Monitoring - Log Level & Filter]
  • [Monitoring - Logback]
  • [Monitoring - Log Collection with ELK Stack]
  • [Monitoring - Log Monitoring with Kibana]
  • [Monitoring - Building a Monitoring System with Spring Boot Actuator]
  • [Monitoring - Server Monitoring with Prometheus and Grafana with Discord Alerts]

📖 Test

📖 Effective Java 3/E

📖 Kotlin Academy - Effective Kotlin

📖 Kotlin Academy - 핵심편

📖 스프링으로 시작하는 리액티브 프로그래밍

📖 가상 면접 사례로 배우는 대규모 시스템 설계 기초 1

📖 가상 면접 사례로 배우는 대규모 시스템 설계 기초 2

📖 Clean Code

📖 리팩토링 2판

📖 주니어 백엔드 개발자가 반드시 알아야 할 실무 지식

📖 개발자가 반드시 정복해야 할 객체 지향과 디자인 패턴

📖 Spring AI

Clone this wiki locally