Кастомная реализация основных концепций реактивного программирования на Java (аналог RxJava).
В проекте реализована система реактивных потоков с возможностью управления потоками выполнения и обработки событий, построенная на паттерне «Наблюдатель» (Observer). Реализованы базовые компоненты, операторы преобразования данных, планировщики потоков и механизмы отмены подписки.
-
Observable — источник данных, фабрики
create(),just(). -
Observer — интерфейс с методами
onNext(),onError(),onComplete(). -
Операторы (в пакете
com.dzrxjava.operators):Map(map)Filter(filter)FlatMap(flatMap)Merge(merge)Concat(concat)Reduce(reduce)
-
Schedulers (в пакете
com.dzrxjava.schedulers):IO(cached thread pool)Computation(fixed thread pool)Single(single-thread executor)
-
Disposable:
Disposable— отмена одной подпискиCompositeDisposable— групповая отмена
-
Логирование через SLF4J + Log4j
- Java 23+
- Maven
- SLF4J API + Log4j
- JUnit 5
-
Клонировать репозиторий:
git clone https://github.com/mephi-learn/dzrxjava cd dzrxjava -
Собрать и запустить тесты:
mvn clean test -
Запустить демонстрацию:
mvn exec:java -Dexec.mainClass="com.dzrxjava.Main"
-
Паттерн Observer:
- Источник (
Observable) делегирует эмиссию элементов черезOnSubscribe. - Потребитель реализует
Observerили передаёт лямбды вsubscribe(). Disposableконтролирует отмену,CompositeDisposable— групповую отмену.
- Источник (
-
Структура пакетов:
core— базовые компоненты и фабрики.operators— классы-операторы для модульности.schedulers— управление планировщиками потоков.
-
Flow:
- Построение цепочки:
Observable.create(...)→ операторы →subscribeOn()/observeOn()→subscribe(). - Все переходы потоков выполняются через
Rx.schedule(...).
- Построение цепочки:
| Scheduler | Реализация | Применение |
|---|---|---|
| IO | CachedThreadPool |
I/O задачи, сеть |
| Computation | FixedThreadPool(N=CPU) |
CPU-bound вычисления |
| Single | SingleThreadExecutor |
Последовательная обработка |
subscribeOn()определяет поток подписки.observeOn()переключает поток обработки событий.
В проекте написаны юнит-тесты JUnit 5 для ключевых сценариев:
-
Базовая работа
create()+subscribe(onNext, onError, onComplete)just(), проверка эмиссии и завершения.
-
Операторы
map,filterflatMap,merge,concat,reduce
-
Планировщики
subscribeOn/observeOnпроверяют переключение потоков.
-
Обработка ошибок
- Эмит
onError, проверка прекращенияonNext.
- Эмит
-
Отмена подписки
Disposable.dispose(),CompositeDisposable.dispose().