Skip to content

Reactive Programming ‐ Reactive Streams

woojin.jang edited this page May 25, 2026 · 2 revisions

리액티브 스트림즈(Reactive Streams)

  • 데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양

리액티브 스트림즈의 구성요소

  • Publisher : 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
  • Subscriber : 구독한 Publisher부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역할을 한다.
  • Subscription : Publisher에 요청한 데이터 개수를 지정하고 데이터의 구독을 취소하는 역할을 한다.
  • Processor : Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고 Publisher로서 다른 Subscriber가 구독할 수 있다.

Publisher

/**
 * A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to
 * the demand received from its {@link Subscriber}(s).
 * <p>
 * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link Publisher#subscribe(Subscriber)} dynamically
 * at various points in time.
 *
 * @param <T> the type of element signaled
 */
public interface Publisher<T> {

    /**
     * Request {@link Publisher} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * <p>
     * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
     * signal the error via {@link Subscriber#onError(Throwable)}.
     *
     * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
     */
    public void subscribe(Subscriber<? super T> s);
}
  • subscribe()를 호출한다고 해서 데이터가 그 자리에서 바로 처리되는 것이 아니라 이제부터 데이터를 받을 준비가 되었으니 준비되면 onNext()로 보내라고 구독자를 등록하는 과정이다.

Subscriber

/**
 * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
 * <p>
 * No further notifications will be received until {@link Subscription#request(long)} is called.
 * <p>
 * After signaling demand:
 * <ul>
 * <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}</li>
 * <li>Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent.
 * </ul>
 * <p>
 * Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more.
 *
 * @param <T> the type of element signaled
 */
public interface Subscriber<T> {

    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#request(long)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
     * 
     * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    // Publisher에게 요청할 데이터의 개수를 지정하거나 구독을 해지하는 것을 의미한다.
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
     * 
     * @param t the element signaled
     */
    // Publisher가 통지한 데이터를 처리한다.
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     *
     * @param t the throwable signaled
     */
    // Publisher가 통지한 데이터를 처리하는 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할을 한다.
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     */
    // Publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메서드로 데이터 통지가 정상적으로 완료될 경우에 어떤 후처리를 해야 한다면 해당 메서드에서 처리 코드를 작성한다.
    public void onComplete();
}

Subscription

/**
 * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
 * <p>
 * It can only be used once by a single {@link Subscriber}.
 * <p>
 * It is used to both signal desire for data and cancel demand (and allow resource cleanup).
 */
public interface Subscription {

    /**
     * No events will be sent by a {@link Publisher} until demand is signaled via this method.
     * <p>
     *  It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more,
     *  it may be treated by the {@link Publisher} as "effectively unbounded".
     * <p>
     * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
     * <p>
     * A {@link Publisher} can send less than is requested if the stream ends but
     * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
     * 
     * @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
     */
    // Publisher에게 데이터의 개수를 요청한다.
    public void request(long n);

    /**
     * Request the {@link Publisher} to stop sending data and clean up resources.
     * <p>
     * Data may still be sent to meet previously signalled demand after calling cancel.
     */
    // 구독을 해지한다.
    public void cancel();
}

Processor

/**
 * A {@link Processor} represents a processing stage—which is both a {@link Subscriber}
 * and a {@link Publisher} and obeys the contracts of both.
 *
 * @param <T> the type of element signaled to the {@link Subscriber}
 * @param <R> the type of element signaled by the {@link Publisher}
 */
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

📖 Java

📖 Kotlin

📖 Coroutine

📖 Spring

📖 Spring Security

📖 Spring Batch

📖 Reactive Programming

📖 Database

📖 MySQL

📖 Redis

📖 JPA

📖 QueryDsl

📖 MSA

📖 Kafka

📖 Apache Flink

  • [Apache Flink - Apache Flink Architecture]
  • [Apache Flink - Stream Processing]
  • [Apache Flink - Data Stream API & Window]
  • [Apache Flink - State Management]

📖 HTTP

📖 AWS

📖 Docker

📖 Kubernetes

📖 CI/CD

📖 Nginx

📖 Monitoring🥈

  • [Monitoring - Log Concept]
  • [Monitoring - Log Level & Filter]
  • [Monitoring - Logback]
  • [Monitoring - Log Collection with ELK Stack]
  • [Monitoring - Log Monitoring with Kibana]
  • [Monitoring - Building a Monitoring System with Spring Boot Actuator]
  • [Monitoring - Server Monitoring with Prometheus and Grafana with Discord Alerts]

📖 Test

📖 Effective Java 3/E

📖 Kotlin Academy - Effective Kotlin

📖 Kotlin Academy - 핵심편

📖 스프링으로 시작하는 리액티브 프로그래밍

📖 가상 면접 사례로 배우는 대규모 시스템 설계 기초 1

📖 가상 면접 사례로 배우는 대규모 시스템 설계 기초 2

📖 Clean Code

📖 리팩토링 2판

📖 주니어 백엔드 개발자가 반드시 알아야 할 실무 지식

📖 GraphQL

Clone this wiki locally