ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ ์ ๋ณ๋ ฌ ์คํธ๋ฆผ ์ ๋ณ๋ ฌ์ฑ์ ๊ท์คํ ๋๊ตฌ๋ค. ์ด๋ค์ ํ ํ์คํฌ๋ฅผ ์ฌ๋ฌ ํ์ ํ์คํฌ๋ก ๋๋์ด CPU์ ๋ค๋ฅธ ์ฝ์ด ๋๋ ๋ค๋ฅธ ๋จธ์ ์์ ์ด๋ค ํ์ ํ์คํฌ๋ฅผ ๋ณ๋ ฌ๋ก ์คํํ๋ค.
๋ฐ๋ฉด ๋ณ๋ ฌ์ฑ์ด ์๋๋ผ ๋์์ฑ์ ํ์๋ก ํ๋ ์ํฉ, ์ฆ ์กฐ๊ธ์ฉ ์ฐ๊ด๋ ์์
์ ๊ฐ์ CPU์์ ๋์ํ๋ ๊ฒ ๋๋ ์ ํ๋ฆฌ์ผ์ด์
์ ์์ฐ์ฑ์ ๊ทน๋ํํ ์ ์๋๋ก ์ฝ์ด๋ฅผ ๋ฐ์๊ฒ ์ ์งํ๋ ๊ฒ์ด ๋ชฉํ๋ผ๋ฉด, ์๊ฒฉ ์๋น์ค๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์ค๋ ๋๋ฅผ ๋ธ๋กํจ์ผ๋ก ์ฐ์ฐ ์์์ ๋ญ๋นํ๋ ์ผ์ ํผํด์ผ ํ๋ค.
CompletableFuture์ java.util.concurrent.Flow์ ๊ถ๊ทน์ ์ธ ๋ชฉํ๋ ๊ฐ๋ฅํํ ๋์์ ์คํํ ์ ์๋ ๋ ๋ฆฝ์ ์ธ ํ์คํฌ๋ฅผ ๊ฐ๋ฅํ๊ฒ ๋ง๋ค๋ฉด์ ๋ฉํฐ์ฝ์ด ๋๋ ์ฌ๋ฌ ๊ธฐ๊ธฐ๋ฅผ ํตํด ์ ๊ณต๋๋ ๋ณ๋ ฌ์ฑ์ ์ฝ๊ฒ ์ด์ฉํ๋ ๊ฒ์ด๋ค.
sum = Arrays.stream(numbers).parallel().sum();
์์ ๊ฐ์ด ์๋ฐ ์คํธ๋ฆผ์ ํตํด ๋ณ๋ ฌ ์คํธ๋ฆผ์ ์ฝ๊ฒ ํ์ฉํ ์ ์๊ฒ ๋์๋ค.
์๋ ๊ณผ ๊ฐ์ด ๋ช
์์ ์ผ๋ก ์ค๋ ๋๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๋นํด ์คํธ๋ฆผ์ ์ด์ฉํด ์ค๋ ๋ ์ฌ์ฉ ํจํด์ ์ถ์ํํ์๋ค๊ณ ๋ณผ ์ ์๋ค.
์คํธ๋ฆผ์ผ๋ก ์ถ์ํํ๋ ๊ฒ์ ์ธ๋ชจ ์๋ ์ฝ๋๊ฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ด๋ถ๋ก ๊ตฌํ๋๋ฉด์ ๋ณต์ก์ฑ๋ ์ค์ด๋ ๋ค๋ ์ฅ์ ์ด ๋ ํด์ง๋ค.
์๋ฐ 5์์ Executor ํ๋ ์์ํฌ์ ์ค๋ ๋ ํ์ ํตํด ์๋ฐ ๊ฐ๋ฐ์๊ฐ ์ง์ ํ์คํฌ ์ ์ถ๊ณผ ์คํ์ ๋ถ๋ฆฌํ ์ ์๋๋ก ์ ๊ณตํ๋ค.
์๋ฐ ์ค๋ ๋๋ ์ง์ ์ด์์ฒด์ ์ค๋ ๋์ ์ ๊ทผํ๋ค.
์ด์์ฒด์ ์ค๋ ๋๋ฅผ ๋ง๋ค๊ณ ์ข
๋ฃํ๋ ค๋ฉด ๋น์ผ ๋น์ฉ(ํ์ด์ง ํ
์ด๋ธ๊ณผ ๊ด๋ จํ ์ํธ์์ฉ)์ ์น๋ฌ์ผ ํ๋ฉฐ ๋์ฑ์ด ์ด์์ฒด์ ์ค๋ ๋์ ์ซ์๋ ์ ํ๋์ด ์๋ ๊ฒ์ด ๋ฌธ์ ๋ค.
์ด์์ฒด์ ๊ฐ ์ง์ํ๋ ์ค๋ ๋ ์๋ฅผ ์ด๊ณผํด ์ฌ์ฉํ๋ฉด ์๋ฐ ์ ํ๋ฆฌ์ผ์ด์
์ด ์์์น ๋ชปํ ๋ฐฉ์์ผ๋ก ํฌ๋์ ๋ ์ ์์ผ๋ฏ๋ก ๊ธฐ์กด ์ค๋ ๋๊ฐ ์คํ๋๋ ์ํ์์ ๊ณ์ ์๋ก์ด ์ค๋ ๋๋ฅผ ๋ง๋๋ ์ํฉ์ด ์ผ์ด๋์ง ์๋๋ก ์ฃผ์ํด์ผ ํ๋ค.
๋ณดํต ์ด์์ฒด์ ์ ์๋ฐ์ ์ค๋ ๋ ๊ฐ์๊ฐ ํ๋์จ์ด ์ค๋ ๋ ๊ฐ์๋ณด๋ค ๋ง์ผ๋ฏ๋ก ์ผ๋ถ ์ด์์ฒด์ ์ค๋ ๋๊ฐ ๋ธ๋ก๋๊ฑฐ๋ ์๊ณ ์๋ ์ํฉ์์ ๋ชจ๋ ํ๋์จ์ด ์ค๋ ๋๊ฐ ์ฝ๋๋ฅผ ์คํํ๋๋ก ํ ๋น๋ ์ํฉ์ ๋์ ์ ์๋ค.
๋ง์ฝ 8๊ฐ์ ์ฝ์ด๋ฅผ ๊ฐ์ง๋ฉฐ ๊ฐ ์ฝ์ด๋ ๋ ๊ฐ์ ๋์นญ ๋ฉํฐํ๋ก์ธ์ฑ(symmetric multiprocessing) ํ๋์จ์ด ์ค๋ ๋๋ฅผ ํฌํจํ๋ฏ๋ก ํ๋์จ์ด ์ค๋ ๋๋ฅผ ์ด 16๊ฐ ํฌํจํ๋๋ฐ ์๋ฒ์๋ ํ๋ก์ธ์๋ฅผ ์ฌ๋ฌ ๊ฐ ํฌํจํ ์ ์์ผ๋ฏ๋ก ํ๋์จ์ด ์ค๋ ๋ 64๊ฐ๋ฅผ ๋ณด์ ํ ์ ์๋ค.
ํ๋ก๊ทธ๋จ์์ ์ฌ์ฉํ ์ต์ ์ ์๋ฐ ์ค๋ ๋ ๊ฐ์๋ ์ฌ์ฉํ ์ ์๋ ํ๋์จ์ด์ ์ฝ์ด ๊ฐ์์ ๋ฐ๋ผ ๋ฌ๋ผ์ง๋ค.
์๋ฐ ExecutorService
๋ ํ์คํฌ๋ฅผ ์ ์ถํ๊ณ ๋์ค์ ๊ฒฐ๊ณผ๋ฅผ ์์งํ ์ ์๋ ์ธํฐํ์ด์ค๋ฅผ ์ ๊ณตํ๋ค.
Executors.newFixedThreadPool
๊ฐ์ ํฉํ ๋ฆฌ ๋ฉ์๋ ์ค ํ๋๋ฅผ ์ด์ฉํด ์ค๋ ๋ ํ์ ๋ง๋ค์ด ์ฌ์ฉํ ์ ์๋ค.
์ค๋ ๋ ํ์์ ์ฌ์ฉํ์ง ์์ ์ค๋ ๋๋ก ์ ์ถ๋ ํ์คํฌ๋ฅผ ๋จผ์ ์จ ์์๋๋ก ์คํํ๋ค.
์ด๋ค ํ์คํฌ ์คํ์ด ์ข
๋ฃ๋๋ฉด ์ด๋ค ์ค๋ ๋๋ฅผ ํ๋ก ๋ฐํํ๋ค.
ํ๋์จ์ด์ ๋ง๋ ์์ ํ์คํฌ๋ฅผ ์ ์งํจ๊ณผ ๋์์ ์ ์ฒ๊ฐ์ ํ์คํฌ๋ฅผ ์ค๋ ๋ ํ์ ์๋ฌด ์ค๋ฒํค๋ ์์ด ์ ์ถํ ์ ์๋ค๋ ์ ์ด๋ค.
ํ๋ก๊ทธ๋๋จธ๋ ํ์คํฌ(Runnable์ด๋ Callable)๋ฅผ ์ ๊ณตํ๋ฉด ์ค๋ ๋๊ฐ ์ด๋ฅผ ์คํํ๋ค.
์ค๋ ๋๋ฅผ ์ง์ ์ฌ์ฉํ๋ ๊ฒ๋ณด๋ค ์ค๋ ๋ ํ์ ์ด์ฉํ๋ ๊ฒ์ด ๋ฐ๋์งํ์ง๋ง ๋ ๊ฐ์ง ์ฌํญ์ ์ฃผ์ํด์ผ ํ๋ค.
k
์ค๋ ๋๋ฅผ ๊ฐ์ง ์ค๋ ๋ ํ์ ์ค์งk
๋งํผ์ ์ค๋ ๋๋ฅผ ๋์์ ์คํํ ์ ์๋ค. ์ด๊ณผ๋ก ์ ์ถ๋ ํ์คํฌ๋ ํ์ ์ ์ฅ๋๋ฉฐ ์ด์ ์ ํ์คํฌ ์ค ํ๋๊ฐ ์ข ๋ฃ๋๊ธฐ ์ ๊น์ง๋ ์ค๋ ๋์ ํ ๋นํ์ง ์๋๋ค.- ๋ถํ์ํ๊ฒ ๋ง์ ์ค๋ ๋๋ฅผ ๋ง๋๋ ์ผ์ ํผํ ์ ์์ผ๋ฏ๋ก ๋ณดํต ์ด ์ํฉ์ ์๋ฌด ๋ฌธ์ ๊ฐ ๋์ง ์์ง๋ง sleep ์ํ๊ฑฐ๋ I/O๋ฅผ ๊ธฐ๋ค๋ฆฌ๊ฑฐ๋ ๋คํธ์ํฌ ์ฐ๊ฒฐ์ ๊ธฐ๋ค๋ฆฌ๋ ํ์คํฌ๊ฐ ์๋ค๋ฉด ์ฃผ์ํด์ผ ํ๋ค.
- ์ค๋ ๋ ๋ธ๋ก ์ํฉ์์๋ ํ์คํฌ๊ฐ ์์ปค ์ค๋ ๋์ ํ ๋น๋ ์ํ๋ฅผ ์ ์งํ์ง๋ง ์๋ฌด ์์ ๋ ํ์ง ์๊ฒ ๋๋ค.
- ์์ ๊ทธ๋ฆผ์ ๋ณด๋ฉด
4๊ฐ์ ํ๋์จ์ด ์ค๋ ๋
์5๊ฐ์ ์ค๋ ๋๋ฅผ ๊ฐ๋ ์ค๋ ๋ ํ
์20๊ฐ์ ํ์คํฌ
๋ฅผ ์ ์ถํ๋ค๊ณ ๊ฐ์ ํ์. - ๋ชจ๋ ํ์คํฌ๊ฐ ๋ณ๋ ฌ๋ก ์คํ๋๋ฉด์ 20๊ฐ์ ํ์คํฌ๋ฅผ ์คํํ ๊ฒ์ด๋ผ ์์ํ ์ ์๋ค. ์ฒ์ ์ ์ถํ ์ธ ์ค๋ ๋๊ฐ ์ ์ ์๊ฑฐ๋ I/O๋ฅผ ๊ธฐ๋ค๋ฆฐ๋ค๊ณ ๊ฐ์ ํ์.
- ๊ทธ๋ฌ๋ฉด ๋๋จธ์ง 15๊ฐ์ ํ์คํฌ๋ฅผ ๋ ์ค๋ ๋๊ฐ ์คํํด์ผ ํ๋ฏ๋ก ์์ ํจ์จ์ฑ์ด ์์๋ณด๋ค ์ ๋ฐ์ผ๋ก ๋จ์ด์ง๋ค.
- ์ฒ์ ์ ์ถํ ํ์คํฌ๊ฐ ๊ธฐ์กด ์คํ ์ค์ธ ํ์คํฌ๊ฐ ๋์ค์ ํ์คํฌ ์ ์ถ์ ๊ธฐ๋ค๋ฆฌ๋ ์ํฉ(Future์ ์ผ๋ฐ์ ์ธ ํจํด)์ด๋ผ๋ฉด ๋ฐ๋๋ฝ์ ๊ฑธ๋ฆด ์๋ ์๋ค.
- ํต์ฌ์ ๋ธ๋ก(์๊ฑฐ๋ ์ด๋ฒคํธ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋)ํ ์ ์๋ ํ์คํฌ๋ ์ค๋ ๋ ํ์ ์ ์ถํ์ง ๋ง์์ผ ํ๋ค๋ ๊ฒ์ด์ง๋ง ํญ์ ์ด๋ฅผ ์งํฌ ์ ์๋ ๊ฒ์ ์๋๋ค.
- ์ค์ํ ์ฝ๋๋ฅผ ์คํํ๋ ์ค๋ ๋๊ฐ ์ฃฝ๋ ์ผ์ด ๋ฐ์ํ์ง ์๋๋ก ๋ณดํต ์๋ฐ ํ๋ก๊ทธ๋จ์
main
์ด ๋ฐํ๋๊ธฐ ์ ์ ๋ชจ๋ ์ค๋ ๋์ ์์ ์ด ๋๋๊ธธ ๊ธฐ๋ค๋ฆฐ๋ค.- ์ค๋ ๋ ํ์ ์์ปค ์ค๋ ๋๊ฐ ๋ง๋ค์ด์ง ๋ค์ ๋ค๋ฅธ ํ์คํฌ ์ ์ถ์ ๊ธฐ๋ค๋ฆฌ๋ฉด์ ์ข ๋ฃ๋์ง ์์ ์ํ์ผ ์ ์์ผ๋ฏ๋ก ์ค๋ ๋ ํ์ ์ข ๋ฃํ๋ ์ต๊ด์ ๊ฐ๋ ๊ฒ์ด ์ค์ํ๋ค.
- ๋ณดํต ์ฅ๊ธฐ๊ฐ ์คํํ๋ ์ธํฐ๋ท ์๋น์ค๋ฅผ ๊ด๋ฆฌํ๋๋ก ์ค๋ ์คํ๋๋
ExecutorService
๋ฅผ ๊ฐ๋ ๊ฒ์ ํํ ์ผ์ด๋ค. - ์๋ฐ๋ ์ด๋ฐ ์ํฉ์ ๋ค๋ฃฐ ์ ์๋๋ก
Thread.setDaemon
๋ฉ์๋๋ฅผ ์ ๊ณตํ๋ค.
ํฌํฌ/์กฐ์ธ ํ๋ ์์ํฌ์์ ๋งํ๋ ๋์์ฑ์ ํ ๊ฐ์ ํน๋ณํ ์์ฑ
์ฆ, ํ์คํฌ๋ ์ค๋ ๋๊ฐ ๋ฉ์๋ ํธ์ถ ์์์ ์์๋๋ฉด ๊ทธ ๋ฉ์๋ ํธ์ถ์ ๋ฐํํ์ง ์๊ณ ์์
์ด ๋๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ ธ๋ค.
๋ค์ ๋งํด ์ค๋ ๋ ์์ฑ
๊ณผ join()
์ด ํ ์์ฒ๋ผ ์ค์ฒฉ๋ ๋ฉ์๋ ํธ์ถ ๋ด์ ์ถ๊ฐ๋์๋ค.
์ด๋ฆ ์๊ฒฉํ ํฌํฌ/์กฐ์ธ ์ด๋ผ๊ณ ํ๋ค.
์๊ฒฉํ ํฌํฌ/์กฐ์ธ, ํ์ดํ๋ ์ค๋ ๋, ์์ ํฌํฌ์ ์กฐ์ธ์, ์ฌ๊ฐํ์ ๋ฉ์๋ ํธ์ถ๊ณผ ๋ฐํ์ ์๋ฏธํ๋ค.
์ฌ์ ๋ก์ด ํฌํฌ/์กฐ์ธ
์๊ฒฉํ๊ณผ ์ฌ์ ๋ก์ด์ ์ฐจ์ด๋ ๋ฉ์ธ ์ค๋ ๋๊ฐ ์ด๋์์ ํฌํฌ๋ฅผ ํ๋์ง์ ์ฐจ์ด์ธ ๊ฒ ๊ฐ๋ค.
์๊ฒฉํ์ ํฌํฌ๋ฅผ ๋ฌ ์ดํ์ ์์ ์ค๋ ๋์ ๋ก์ง์ด ์คํ๋๋๋ก ๋์ด์๊ณ , ์ฌ์ ๋ก์ด์ ๋ฉ์๋ ๋ด๋ถ์์ ๋ฉ์ธ ์ค๋ ๋๊ฐ ํฌํฌ๋ฅผ ๋จ๋ ์ฐจ์ด์ธ ๊ฒ์ผ๋ก ๋ณด์ธ๋ค.
์๋์ ๊ฐ์ด๋ฉ์๋ ํธ์ถ์์ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋๋ก ๋ฉ์๋๊ฐ ๋ฐํ๋ ํ์๋ ๋ง๋ค์ด์ง ํ์คํฌ ์คํ์ด ๊ณ์๋๋ ๋ฉ์๋๋ฅผ ๋น๋๊ธฐ ๋ฉ์๋๋ผ ํ๋ค.
- ์ค๋ ๋ ์คํ์ ๋ฉ์๋๋ฅผ ํธ์ถํ ๋ค์์ ์ฝ๋์ ๋์์ ์คํ๋๋ฏ๋ก ๋ฐ์ดํฐ ๊ฒฝ์ ๋ฌธ์ ๋ฅผ ์ผ์ผํค์ง ์๋๋ก ์ฃผ์ํด์ผ ํ๋ค.
- ๊ธฐ์กด ์คํ ์ค์ด๋ ์ค๋ ๋๊ฐ ์ข
๋ฃ๋์ง ์์ ์ํฉ์์ ์๋ฐ์
main()
๋ฉ์๋๊ฐ ๋ฐํํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น? ๋ค์๊ณผ ๊ฐ์ ๋ ๊ฐ์ง ๋ฐฉ๋ฒ์ด ์๊ธดํ์ง๋ง ์ด๋ ๋ฐฉ๋ฒ๋ ์์ ํ์ง ์๋ค.- ์ ํ๋ฆฌ์ผ์ด์ ์ ์ข ๋ฃํ์ง ๋ชปํ๊ณ ๋ชจ๋ ์ค๋ ๋๊ฐ ์คํ์ ๋๋ผ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
- ์ ํ๋ฆฌ์ผ์ด์ ์ข ๋ฃ๋ฅผ ๋ฐฉํดํ๋ ์ค๋ ๋๋ฅผ ๊ฐ์ ์ข ๋ฃ ์ํค๊ณ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ข ๋ฃํ๋ค.
main()
๋ฉ์๋๋ ๋ชจ๋ ๋น๋ฐ๋ชฌ ์ค๋ ๋๊ฐ ์ข ๋ฃ๋ ๋ ๊น์ง ํ๋ก๊ทธ๋จ์ ์ข ๋ฃํ์ง ์๊ณ ๊ธฐ๋ค๋ฆฐ๋ค.
์ค๋ ๋๋ฅผ sleep()
ํ์ฌ๋ ์ฌ์ ํ ์์คํ
์์์ ์ ์ ํ๊ณ ์๋ค.
์ค๋ ๋ ํ์์ ์ ์ ์๋ ํ์คํฌ๋ ๋ค๋ฅธ ํ์คํฌ๊ฐ ์์๋์ง ๋ชปํ๊ฒ ๋ง์ผ๋ฏ๋ก ์์์ ์๋นํ๋ ์ฌ์ค์ ๋ช
์ฌํด์ผ ํ๋ค.
๋ชจ๋ ๋ธ๋ก ๋์๋ ๋ง์ฐฌ๊ฐ์ง๋ค. ๋ธ๋ก ๋์์ ๋ค๋ฅธ ํ์คํฌ๊ฐ ์ด๋ค ๋์์ ์๋ฃํ๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ๋์(์๋ฅผ ๋ค์ด, Future์ get() ํธ์ถ)๊ณผ ์ธ๋ถ ์ํธ์์ฉ(์๋ฅผ ๋ค์ด, ๋๋น ์์
์ด๋ ์ฌ์ฉ์ ์
๋ ฅ์ ๊ธฐ๋ค๋ฆฌ๋ ๋ฑ)์ ๊ธฐ๋ค๋ฆฌ๋ ๋์ ๋ ๊ฐ์ง๋ก ๊ตฌ๋ถํ ์ ์๋ค.
// A์ฝ๋
void scheduling() throws InterruptedException() {
doSomething("first");
Thread.sleep(5000);
doSomething("second");
}
// B์ฝ๋
void scheduling() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
doSomething("first");
executorService.schedule(() -> doSomething("second"), 5, SECONDS);
executorService.shutdown();
}
์์ ์ฝ๋๋ ์ค๋ ๋ ํ ํ์ ์ถ๊ฐ๋๋ฉฐ ๋์ค์ ์ฐจ๋ก๊ฐ ๋๋ฉด ์คํ๋๋ค.
ํ์ง๋ง ์ฝ๋๊ฐ ์คํ๋๋ฉด ์์ปค ์ค๋ ๋๋ฅผ ์ ์ ํ ์ํ์์ ์๋ฌด๊ฒ๋ ํ์ง ์๊ณ 5์ด๋ฅผ ์๋ค.
๊ทธ๋ฆฌ๊ณ ๊นจ์ด๋์ ๋ ๋ฒ์งธ ์์
์ ์คํํ ๋ค์ ์์
์ ์ข
๋ฃํ๊ณ ์์ปค ์ค๋ ๋๋ฅผ ํด์ ํ๋ค.
๋ ๊ฐ์ ์ฝ๋๋ ๋๊ฐ์ ํ๋์ ์ํํ์ง๋ง B์ฝ๋
๊ฐ ๋ ์ข์ ์ด์ ๋ A๊ฐ ์๋ ๋์ ์ค๋ ๋ ์์์ ์ ์ ํ๋ ๋ฐ๋ฉด B๋ ๋ค๋ฅธ ์์
์ด ์คํ๋ ์ ์๋๋ก ํ์ฉํ๋ค๋ ์ ์ด๋ค.
(์ค๋ ๋๋ฅผ ์ฌ์ฉํ ํ์๊ฐ ์์ด ๋ฉ๋ชจ๋ฆฌ๋ง ์กฐ๊ธ ๋ ์ฌ์ฉํ๋ค.)
ํ์คํฌ๋ฅผ ๋ง๋ค ๋๋ ์ด๋ฐ ํน์ง์ ์ ํ์ฉํด์ผ ํ๋ค.
ํ์คํฌ๊ฐ ์คํ๋๋ฉด ๊ท์คํ ์์์ ์ ์ ํ๋ฏ๋ก ํ์คํฌ๊ฐ ๋๋์ ์์์ ํด์ ํ๊ธฐ ์ ๊น์ง ํ์คํฌ๋ฅผ ๊ณ์ ์คํํด์ผ ํ๋ค.
ํ์คํฌ๋ฅผ ๋ธ๋กํ๋ ๊ฒ๋ณด๋ค๋ ๋ค์ ์์
์ ํ์คํฌ๋ก ์ ์ถํ๊ณ ํ์ฌ ํ์คํฌ๋ ์ข
๋ฃํ๋ ๊ฒ์ด ๋ฐ๋์งํ๋ค.
๋ํ์ ์ผ๋ก I/O ์์
์ ์ ์ฉํ ์ ์๋ค.
๊ณ ์ ์ ์ผ๋ก ์ฝ๊ธฐ ์์
์ ๊ธฐ๋ค๋ฆฌ๋ ๊ฒ์ด ์๋๋ผ ๋ธ๋กํ์ง ์๋ '์ฝ๊ธฐ ์์' ๋ฉ์๋๋ฅผ ํธ์ถํ๊ณ ์ฝ๊ธฐ ์์
์ด ๋๋๋ฉด ์ด๋ฅผ ์ฒ๋ฆฌํ ๋ค์ ํ์คํฌ๋ฅผ ๋ฐํ์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ค์ผ์คํ๋๋ก ์์ฒญํ๊ณ ์ข
๋ฃํ๋ค.
์ค๋ ๋์๋ ์ ํ์ด ์๊ณ ์ ๋ ดํ์ง ์์ผ๋ฏ๋ก ์ ์ ์๊ฑฐ๋ ๋ธ๋กํด์ผ ํ๋ ์ฌ๋ฌ ํ์คํฌ๊ฐ ์์ ๋ ๊ฐ๋ฅํ๋ฉด B์ฝ๋
์ ํ์์ ๋ฐ๋ฅด๋ ๊ฒ์ด ์ข๋ค.
CompletableFuture
์ธํฐํ์ด์ค๋ ์ด์ ์ ์ดํด๋ณธ Future
์ get()
์ ์ด์ฉํด ๋ช
์์ ์ผ๋ก ๋ธ๋กํ์ง ์๊ณ ์ฝค๋น๋ค์ดํฐ๋ฅผ ์ฌ์ฉํจ์ผ๋ก ์ด๋ฐ ํ์์ ์ฝ๋๋ฅผ ๋ฐํ์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ด์ ์ถ๊ฐํ๋ค.
Future
๋ ๋ฆฌ์กํฐ๋ธ ํ์์ ๋น๋๊ธฐ API์์ ํธ์ถ๋ ๋ฉ์๋์ ์ค์ ๋ฐ๋๋ ๋ณ๋์ ์ค๋ ๋์์ ํธ์ถ๋๋ฉฐ ์ด๋ ๋ฐ์ํ๋ ์ด๋ค ์๋ฌ๋ ์ด๋ฏธ ํธ์ถ์์ ์คํ ๋ฒ์์๋ ๊ด๊ณ๊ฐ ์๋ ์ํฉ์ด ๋๋ค.
Future
๋ฅผ ๊ตฌํํ CompletableFuture
์์๋ ๋ฐํ์ get()
๋ฉ์๋์ ์์ธ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ฉฐ ์์ธ์์ ํ๋ณตํ ์ ์๋๋ก exceptionally()
๊ฐ์ ๋ฉ์๋๋ ์ ๊ณตํ๋ค.
๋ฆฌ์กํฐ๋ธ ํ์์ ๋น๋๊ธฐ API์์๋ return
๋์ ๊ธฐ์กด ์ฝ๋ฐฑ์ด ํธ์ถ๋๋ฏ๋ก ์์ธ๊ฐ ๋ฐ์ํ์ ๋ ์๋์ ๊ฐ์ด ์คํ๋ ์ถ๊ฐ ์ฝ๋ฐฑ์ ๋ง๋ค์ด ์ธํฐํ์ด์ค๋ฅผ ๋ฐ๊ฟ์ผ ํ๋ค.
void testFunc(int x, Consumer<Integer> dealWithResult, Consumer<Throwable> dealWithException);
Flow API
์์๋ ์ฌ๋ฌ ๊ฐ์ ์ฝ๋ฐฑ์ ํ ๊ฐ์ฒด(๋ค ๊ฐ์ ์ฝ๋ฐฑ์ ๊ฐ๊ฐ ๋ํํ๋ ๋ค ๋ฉ์๋๋ฅผ ํฌํจํ๋ Subscriber<T>
)๋ก ๊ฐ์ผ๋ค.
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
/**
* ๊ฐ์ด ์์ ๋
*/
public void onNext(T item);
/**
* ๋์ค์ ์๋ฌ๊ฐ ๋ฐ์ํ์ ๋
*/
public void onError(Throwable throwable);
/**
* ๊ฐ์ ๋ค ์์งํ๊ฑฐ๋ ์๋ฌ๊ฐ ๋ฐ์ํด์ ๋ ์ด์ ์ฒ๋ฆฌํ ๋ฐ์ดํฐ๊ฐ ์์ ๋
*/
public void onComplete();
}
์์ ๊ฐ์ ์ข ๋ฅ์ ํธ์ถ์ ๋ฉ์์ง ๋๋ ์ด๋ฒคํธ ๋ผ ๋ถ๋ฅธ๋ค.
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
Future<Integer> a3 = executorService.submit(() -> q3(t));
System.out.println( r(a1.get(),a2.get() + a3.get()));
์์ ์ฝ๋์์ ๋ณ๋ ฌ์ฑ์ ๊ทน๋ํํ๋ ค๋ฉด ๋ชจ๋ ํจ์๋ฅผ Future
๋ก ๊ฐ์ธ์ผ ํ๋ค.
์ด๋ ๊ฒ ๋ง์ ํ์คํฌ๊ฐ get()
๋ฉ์๋๋ฅผ ํธ์ถํด Future
๊ฐ ๋๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์ํ์ ๋์ด๊ฒ ๋๋ฉด ํ๋์จ์ด์ ๋ณ๋ ฌ์ฑ์ ์ ๋๋ก ํ์ฉํ์ง ๋ชปํ๊ฑฐ๋ ์ฌ์ง์ด ๋ฐ๋๋ฝ์ ๊ฑธ๋ฆด ์๋ ์๋ค.
CompletableFuture ์ ์ฝค๋น๋ค์ดํฐ ๋ฅผ ํ์ฉํด ํ์คํฌ๊ฐ ๊ธฐ๋ค๋ฆฌ๊ฒ ๋ง๋๋ ์ผ์ ํผํ ์ ์๋ค.
(๋ง์ฐฌ๊ฐ์ง๋ก ์๋ฐ 8 ์คํธ๋ฆผ์ ์๋ฃ ๊ตฌ์กฐ๋ฅผ ๋ฐ๋ณตํด์ผ ํ๋ ์ฝ๋๋ฅผ ๋ด๋ถ์ ์ผ๋ก ์์
์ ์ฒ๋ฆฌํ๋ ์คํธ๋ฆผ ์ฝค๋น๋ค์ดํฐ๋ก ๋ฐ๊ฟ์ค๋ค.)
์๋ฐ 8์์ Future ์ธํฐํ์ด์ค์ ๊ตฌํ์ธ CompletableFuture
๋ฅผ ์ด์ฉํด Future๋ฅผ ์กฐํฉํ ์ ์๋ ๊ธฐ๋ฅ์ด ์ถ๊ฐ๋๋ค.
๊ทธ๋ผ ComposableFuture
์ด ์๋๋ผ CompletableFuture
๋ผ๊ณ ๋ถ๋ฅด๋ ์ด์ ๋ ๋ญ๊น?
์ผ๋ฐ์ ์ผ๋ก Future๋ ์คํํด์ get()์ผ๋ก ๊ฒฐ๊ณผ๋ฅผ ์ป์ ์ ์๋ Callable
๋ก ๋ง๋ค์ด์ง๋ค.
ํ์ง๋ง CompletableFuture๋ ์คํํ ์ฝ๋ ์์ด Future๋ฅผ ๋ง๋ค ์ ์๋๋ก ํ์ฉํ๋ฉฐ complete() ๋ฉ์๋๋ฅผ ์ด์ฉํด ๋์ค์ ์ด๋ค ๊ฐ์ ์ด์ฉํด ๋ค๋ฅธ ์ค๋ ๋๊ฐ ์ด๋ฅผ ์๋ฃํ ์ ์๊ณ get()์ผ๋ก ๊ฐ์ ์ป์ ์ ์๋๋ก ํ์ฉํ๊ธฐ ๋๋ฌธ์ CompletableFuture๋ผ๊ณ ๋ถ๋ฅธ๋ค.
@Test
@DisplayName("CompletableFuture ์ ์ฉ")
void completableFutureCombine() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> result1 = executorService.submit(() -> sum(100));
Future<Integer> result2 = executorService.submit(() -> sum(1000));
Integer sum = Integer.sum(result1.get(), result2.get());
Assertions.assertThat(sum).isEqualTo(505550);
CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>();
CompletableFuture<Integer> completableFuture2 = new CompletableFuture<>();
CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2, Integer::sum);
executorService.submit(() -> completableFuture1.complete(sum(100)));
executorService.submit(() -> completableFuture2.complete(sum(1000)));
Assertions.assertThat(completableFuture3.get()).isEqualTo(505550);
executorService.shutdown();
}
thenCombine()
์ ํตํด ๋ ์ฐ์ฐ์ด ๋๋ฌ์ ๋ ์ค๋ ๋ ํ์์ ์คํ๋ ์ฐ์ฐ์ ๋ง๋ ๋ค.
๊ฒฐ๊ณผ๋ฅผ ๋ ํ๋ ์ฐ์ฐ์ ์์ ๋ ์์
์ด ๋๋๊ธฐ ์ ๊น์ง๋ ์คํ๋์ง ์์ ๋จผ์ ์์ํด์ ๋ธ๋ก๋์ง ์๋ ๊ฒ์ด ํน์ง์ด๋ค.
์ํฉ์ ๋ฐ๋ผ ๋ฉ์ธ ์ค๋ ๋๊ฐ Future
์ get()
์ ๊ธฐ๋ค๋ฆฌ๋ ์ํฉ์ด ํฐ ๋ฌธ์ ๊ฐ ๋์ง ์์ ๋๋ Future
๋ฅผ ๊ทธ๋๋ก ์ฌ์ฉํด๋ ๊ด์ฐฎ๋ค.
ํ์ง๋ง ์ฌ๋ฌ ๊ฐ์ Future
๋ฅผ ์ฌ์ฉํด์ผ ํ๋ ๊ฒฝ์ฐ์๋ ๋ณ๋ ฌ ์คํ์ ํจ์จ์ฑ์ ๋์ด๊ณ ๋ฐ๋๋ฝ์ ํผํ ์ ์๋ ์ฝค๋น๋ค์ดํฐ๋ฅผ ํ์ฉํ์.
์๋ฐ 9์์๋ Flow
์ ์ธํฐํ์ด์ค์ ๋ฐํ-๊ตฌ๋
๋ชจ๋ธ ์ ์ ์ฉํด ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ์ ๊ณตํ๋ค.
- ๊ตฌ๋ ์ ๊ฐ ๊ตฌ๋ ํ ์ ์๋ ๋ฐํ์
- ์ด ์ฐ๊ฒฐ์ ๊ตฌ๋ ์ด๋ผ ํ๋ค.
- ์ด ์ฐ๊ฒฐ์ ์ด์ฉํด ๋ฉ์์ง (๋๋ ์ด๋ฒคํธ)๋ฅผ ์ ์กํ๋ค.
์ฌ๋ฌ ์ปดํฌ๋ํธ๊ฐ ํ ๊ตฌ๋ ์๋ก ๊ตฌ๋ ํ ์ ์๊ณ ํ ์ปดํฌ๋ํธ๋ ์ฌ๋ฌ ๊ฐ๋ณ ์คํธ๋ฆผ์ ๋ฐํํ ์ ์์ผ๋ฉฐ ํ ์ปดํฌ๋ํธ๋ ์ฌ๋ฌ ๊ตฌ๋ ์์ ๊ฐ์ ํ ์ ์๋ค.
๋ ์ ๋ณด ์์ค๋ก ๋ถํฐ ๋ฐ์ํ๋ ์ด๋ฒคํธ๋ฅผ ํฉ์ณ์ ๋ค๋ฅธ ๊ตฌ๋
์๊ฐ ๋ณผ ์ ์๋๋ก ๋ฐํํ๋ ์๋ฅผ ํตํด ๋ฐํ-๊ตฌ๋
์ ํน์ง์ ํ์ธํ ์ ์๋ค.
=C1+C2
๋ผ๋ ๊ณต์์ ํฌํจํ๋ ์คํ๋ ๋์ํธ ์
C3
์ ๋ง๋ ๋ค๊ณ ๊ฐ์ ํ์.
C1์ด๋ C2 ์
์ ๊ฐ์ด ๋ณ๊ฒฝ๋์ ๋ C3์๊ฒ ๋ ๊ฐ์ ๋ํ๋๋ก C1๊ณผ C2์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ ๋ C3์ ๊ตฌ๋
ํ๋๋ก ๋ง๋ค์ด๋ณด์.
interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
interface Subscriber<T> {
void onNext(T t);
}
private class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
private int value = 0;
private String name;
private List<Subscriber> subscribers = new ArrayList<>();
public SimpleCell(String name) {
this.name = name;
}
public int getValue() {
return this.value;
}
private void notifyAllSubscribers() {
this.subscribers.forEach(subscriber -> subscriber.onNext(this.value));
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
this.subscribers.add(subscriber);
}
@Override
public void onNext(Integer integer) {
this.value = integer;
System.out.printf("%s : %d\n", this.name, this.value);
notifyAllSubscribers();
}
}
private class ArithmeticCell extends SimpleCell {
private int left;
private int right;
public ArithmeticCell(String name) {
super(name);
}
public void setLeft(int left) {
this.left = left;
super.onNext(left + this.right);
}
public void setRight(int right) {
this.right = right;
super.onNext(right + this.left);
}
}
@Test
@DisplayName("์
pub-sub")
void pubsub() {
SimpleCell c1 = new SimpleCell("C1");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c3 = new SimpleCell("C3");
c1.subscribe(c3);
c2.subscribe(c3);
c1.onNext(10);
Assertions.assertThat(c3.value).isEqualTo(10);
c2.onNext(20);
Assertions.assertThat(c3.value).isEqualTo(20);
ArithmeticCell c4 = new ArithmeticCell("C4");
c1.subscribe(c4::setLeft);
c2.subscribe(c4::setRight);
c1.onNext(10);
Assertions.assertThat(c4.getValue()).isEqualTo(10);
c2.onNext(20);
Assertions.assertThat(c4.getValue()).isEqualTo(30);
// C1 : 10
// C3 : 10
// C2 : 20
// C3 : 20
// C1 : 10
// C3 : 10
// C4 : 10
// C2 : 20
// C3 : 20
// C4 : 30
}
๋ฐ์ดํฐ๊ฐ ๋ฐํ์(์์ฐ์)์์ ๊ตฌ๋ ์(์๋น์)๋ก ํ๋ฆ์ ์ฐฉ์ํด ๊ฐ๋ฐ์๋ ์ด๋ฅผ ์ ์คํธ๋ฆผ ๋๋ ๋ค์ด์คํธ๋ฆผ ์ด๋ผ ๋ถ๋ฅธ๋ค.
์ ์์ ์์ ๋ฐ์ดํฐvalue
๋ ์ ์คํธ๋ฆผ onNext() ๋ฉ์๋๋ก ์ ๋ฌ๋๊ณ , notifyAllSubscribers() ํธ์ถ์ ํตํด ๋ค์ด์คํธ๋ฆผ onNext() ํธ์ถ๋ก ์ ๋ฌ๋๋ค.
์ฌ๊ธฐ์์๋ ์ค๋ช
ํ์ง ์์์ง๋ง onError
์ onComplete
์ด Flow API์ Subscriber์์ ์ง์๋๋ ํด๋น ๋ฉ์๋๋ค์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ์ ํ๋ฆ์ ๋ ์์ธํ๊ฒ ์ ์ดํด์ผ ํ๋ค.
์์ ๊ฐ์ ๋ฐํ-๊ตฌ๋
ํจํด์์ ์์ฒญ๋ ๋ฐ์ดํฐ๊ฐ onNext()๋ก ์ ๋ฌ๋๋ค๋ฉด ์ด๋ป๊ฒ ๋ ๊น? ์ด๋ฐ ์ํฉ์ ์๋ ฅ ์ด๋ผ ๋ถ๋ฅธ๋ค.
์์ง ํ์ดํ์ ๋ง์ ๋ฐ์ดํฐ์ ์๋ ฅ์ด ๋ค์ด์จ๋ค๊ณ ์๊ฐํ์ ๋ ์ด ์๋ ฅ์ ์กฐ์ ํ๋ ์ญ์๋ ฅ ๊ธฐ๋ฒ์ด ํ์ํ๋ค.
๋ฐํ์๊ฐ ๋ฌดํ์ ์๋๋ก ์์ดํ
์ ๋ฐฉ์ถํ๋ ๋์ ์์ฒญํ์ ๋๋ง ๋ค์ ์์ดํ
์ ๋ณด๋ด๋๋กํ๋ request()
๋ฉ์๋ (Subscription์ด๋ผ๋ ์ ์ธํฐํ์ด์ค์ ํฌํจ)๋ฅผ ์ ๊ณตํ๋ค. (๋ฐ์ด๋ด๊ธฐ ๋ชจ๋ธ์ด ์๋๋ผ ๋น๊น ๋ชจ๋ธ)
Publisher
๋ ์ฌ๋ฌ Subscriber
๋ฅผ ๊ฐ์ง๊ณ ์์ผ๋ฏ๋ก ์ญ์๋ ฅ ์์ฒญ์ด ํ ์ฐ๊ฒฐ์๋ง ์ํฅ์ ๋ฏธ์ณ์ผ ํ๋ค๋ ๊ฒ์ด ๋ฌธ์ ๊ฐ ๋ ์ ์๋ค.
/**
* ์ง์ ๋ ๊ตฌ๋
์ ๋ํด ๋ค๋ฅธ ๊ตฌ๋
์ ๋ฉ์๋๋ฅผ ํธ์ถํ๊ธฐ ์ ์ ํธ์ถ๋๋ ๋ฉ์๋์
๋๋ค.
* ์ด ๋ฉ์๋์์ ์์ธ๊ฐ ๋ฐ์ํ๋ฉด ๊ฒฐ๊ณผ ๋์์ด ๋ณด์ฅ๋์ง ์์ง๋ง ๊ตฌ๋
์ด ์ค์ ๋์ง ์๊ฑฐ๋ ์ทจ์๋ ์ ์์ต๋๋ค.
* ์ผ๋ฐ์ ์ผ๋ก ์ด ๋ฉ์๋์ ๊ตฌํ์ ํญ๋ชฉ ์์ ์ ํ์ฑํํ๊ธฐ ์ํด subscription.request ํธ์ถํฉ๋๋ค.
*/
public void onSubscribe(Subscription subscription);
Publisher
์ Subscriber
์ฌ์ด์ ์ฑ๋์ด ์ฐ๊ฒฐ๋๋ฉด ์ฒซ ์ด๋ฒคํธ๋ก ์ด ๋ฉ์๋๊ฐ ํธ์ถ๋๋ค.
Subscription
๊ฐ์ฒด๋ ๋ค์์ฒ๋ผ Subscriber
์ Publisher
์ ํต์ ํ ์ ์๋ ๋ฉ์๋๋ฅผ ํฌํจํ๋ค.
public static interface Subscription {
public void request(long n);
public void cancel();
}
Publisher
๋ Subscription
๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด Subscriber
๋ก ์ ๋ฌํ๋ฉด Subscriber
๋ ์ด๋ฅผ ์ด์ฉํด Publisher
๋ก ์ ๋ณด๋ฅผ ๋ณด๋ผ ์ ์๋ค.
public class Shop {
public double getPrice(String product) {
return calculatePrice(product);
}
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}
private double calculatePrice(String product) {
delay();
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}
}
@Test
void getPriceAsync() {
Shop shop = new Shop();
long start = System.nanoTime();
Future<Double> myFavorite = shop.getPriceAsync("my favorite");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("invocation returned after " + invocationTime + " msecs");
System.out.println("doSomethingElse");
try {
Double price = myFavorite.get();
System.out.printf("Price is %.2f\n", price);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
}
์์ ์์ ์์ ์๋ฌ๋ฅผ ์ฌ๋ฐ๋ก ๊ด๋ฆฌํ๋ ๋ฐฉ๋ฒ์ ์ดํด๋ณด์
๊ฐ๊ฒฉ์ ๊ณ์ฐํ๋ ๋์ ์๋ฌ๊ฐ ๋ฐ์ํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น? ์์ธ๊ฐ ๋ฐ์ํ๋ฉด ํด๋น ์ค๋ ๋์๋ง ์ํฅ์ ๋ฏธ์น๋ค.
์ฆ ์๋ฌ๊ฐ ๋ฐ์ํด๋ ๊ฐ๊ฒฉ ๊ณ์ฐ์ ๊ณ์ ์งํ๋๋ฉฐ ์ผ์ ์์๊ฐ ๊ผฌ์ธ๋ค. ํด๋ผ์ด์ธํธ๋ get()
์ด ๋ฐํ๋ ๋๊น์ง ์์ํ ๊ธฐ๋ค๋ฆฌ๊ฒ ๋ ์๋ ์๋ค.
์ด๋ฐ ์ํฉ์์๋ ํ์์์์ ํ์ฉํ๋ ๊ฒ์ด ์ข๋ค. ๊ทธ๋ฆฌ๊ณ ์๋ฌ๊ฐ ์ ๋ฐ์ํ๋์ง ์ ์ ์๋๋ก completeExceptionally
๋ฉ์๋๋ฅผ ์ด์ฉํด์ CompletableFuture
๋ด๋ถ์์ ๋ฐ์ํ ์์ธ๋ฅผ ํด๋ผ์ด์ธํธ๋ก ์ ๋ฌํ ์ ์๋ค.
@Test
@DisplayName("์ฌ๋ฌ Shop์ ๊ฐ๊ฒฉ์ ๊ณ์ฐํ ๋ ๋น๋ธ๋ก ์ฝ๋๋ก ์์ฑํ๊ธฐ")
void nonblock() {
List<Shop> shops = List.of(
new Shop("a"),
new Shop("b"),
new Shop("c"),
new Shop("d"),
new Shop("e")
);
long start = System.nanoTime();
String product = "iPhone";
List<String> single = shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.name, shop.getPrice(product)))
.toList();
long duration = ((System.nanoTime() - start) / 1_000_000);
System.out.println("stream process duration " + duration + " msecs");
// ์์ ๊ฒฐ๊ณผ๋ ๊ฐ ์์ ๋ง๋ค 1์ด์ฉ ๋๋ ์ด๊ฐ ์กด์ฌํ์ฌ ์ต์ 5์ด ์ด์์ด๋ค.
// stream process duration 5041 msecs
long start1 = System.nanoTime();
List<String> blockingParallel = shops.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.name, shop.getPrice(product)))
.toList();
long duration1 = ((System.nanoTime() - start1) / 1_000_000);
System.out.println("blockingParallel process duration1 " + duration1 + " msecs");
// blockingParallel process duration1 1009 msecs
// ๋ฆฌ์คํธ์ CompletableFuture๋ ๊ฐ๊ฐ ๊ณ์ฐ ๊ฒฐ๊ณผ๊ฐ ๋๋ ์์ ์ ์ด๋ฆ ๋ฌธ์์ด์ ํฌํจํ๋ค.
// ํ์ง๋ง ํ์ํ ๋ฐํ ํ์
์ List<String>์ด๋ฏ๋ก ๋ชจ๋ CompletableFuture์ ๋์์ด ์๋ฃ๋๊ณ ๊ฒฐ๊ณผ๋ฅผ ์ถ์ถํ ๋ค์์ ๋ฆฌ์คํธ๋ฅผ ๋ฐํํด์ผ ํ๋ค.
// ์ฆ, ๋ฆฌ์คํธ์ ๋ชจ๋ CompletableFuture์ join์ ํธ์ถํด์ ๋ชจ๋ ๋์์ด ๋๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฐ๋ค.
// CompletableFuture์ join๋ฉ์๋๋ Future์ธํฐํ์ด์ค์ get ๋ฉ์๋์ ๊ฐ์ ์๋ฏธ๋ฅผ ๊ฐ๋๋ค.
// ๋ค๋ง join์ ์๋ฌด ์์ธ๋ ๋ฐ์์ํค์ง ์๋๋ค๋ ์ ์ด ๋ค๋ฅด๋ค.
// ๋ฐ๋ผ์ map์ ๋๋ค ํํ์์ try/catch๋ก ๊ฐ์ ํ์๊ฐ ์๋ ๊ฒ์ด๋ค.
long start2 = System.nanoTime();
List<CompletableFuture<String>> futures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.name, shop.getPrice(product))))
.toList();
List<String> strings = futures.stream()
.map(CompletableFuture::join)
.toList();
long duration2 = ((System.nanoTime() - start2) / 1_000_000);
System.out.println("futures process duration2 " + duration2 + " msecs");
// futures process duration2 1010 msecs
}
์๋ถ๋ถ์ ์์ฐจ์ ์ผ๋ก ํ๊ฐ๋ฅผ ์งํํ๋ ๋จ์ผ ํ์ดํ๋ผ์ธ ์คํธ๋ฆผ ์ฒ๋ฆฌ ๊ณผ์ ์ ํํํ ๊ฒ์ด๋ค. (์ ์ ์ผ๋ก ํ์๋ ๋ถ๋ถ์ด ์คํธ๋ฆผ ์ฒ๋ฆฌ ๊ณผ์ ์ ํํํ ๊ฒ์ด๋ค.)
์ฆ, ์ด์ ์์ฒญ์ ์ฒ๋ฆฌ๊ฐ ์์ ํ ๋๋ ๋ค์์ ์๋ก ๋ง๋ CompletableFuture
๊ฐ ์ฒ๋ฆฌ๋๋ค.
๋ฐ๋ฉด ์๋์ชฝ์ ์ฐ์ CompletableFuture
๋ฅผ ๋ฆฌ์คํธ๋ก ๋ชจ์ ๋ค์์ ๋ค๋ฅธ ์์
๊ณผ๋ ๋
๋ฆฝ์ ์ผ๋ก ๊ฐ์์ ์์
์ ์ํํ๋ ๋ชจ์ต์ด๋ค.
๊ฒฐ๊ตญ CompletableFuture
๋ฅผ ๋ฆฌ์คํธ๋ก ๋จผ์ ๋ชจ์๋๊ณ ๋น๋๊ธฐ๋ก ํ์คํฌ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ ์๊ฐ์ ๋ฒ์ด๋์ ๊ฒ์ผ๋ก ๋ณผ ์ ์๋ค.
๋ง์ง๋ง futures
์ ์ฐ์ฐ์์ ํ๋์ ์คํธ๋ฆผ ํ์ดํ๋ผ์ธ์ผ๋ก ์ฒ๋ฆฌํ์ง ์๊ณ ๋ ๊ฐ์ ์คํธ๋ฆผ ํ์ดํ๋ผ์ธ์ผ๋ก ์ฒ๋ฆฌํ๋ค๋ ์ ์ ์ฃผ๋ชฉํด์ผ ํ๋ค.
์คํธ๋ฆผ ์ฐ์ฐ์ ๊ฒ์ผ๋ฅธ ํน์ฑ์ด ์์ผ๋ฏ๋ก ํ๋์ ํ์ดํ๋ผ์ธ์ผ๋ก ์ฐ์ฐ์ ์ฒ๋ฆฌํ๋ค๋ฉด ๋ชจ๋ ๊ฐ๊ฒฉ ์ ๋ณด ์์ฒญ ๋์์ด ๋๊ธฐ์ , ์์ฐจ์ ์ผ๋ก ์ด๋ฃจ์ด์ง๋ ๊ฒฐ๊ณผ๊ฐ ๋๋ค.
CompletableFuture
๋ก ๊ฐ ์์ ์ ์ ๋ณด๋ฅผ ์์ฒญํ ๋ ๊ธฐ์กด ์์ฒญ ์์
์ด ์๋ฃ๋์ด์ผ join
์ด ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ๋ฉด์ ๋ค์ ์์ ์ผ๋ก ์ ๋ณด๋ฅผ ์์ฒญํ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค.
์คํธ๋ฆผ ๋ณ๋ ฌํ์ CompletableFuture ๋ณ๋ ฌํ
๋ณ๋ ฌ ์คํธ๋ฆผ์ผ๋ก ๋ณํํด์ ์ปฌ๋ ์ ์ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ๊ณผ ์ปฌ๋ ์ ์ ๋ฐ๋ณตํ๋ฉด์ CompletableFuture ๋ด๋ถ์ ์ฐ์ฐ์ผ๋ก ๋ง๋๋ ๋ฐฉ๋ฒ๊ณผ ์ด๋ค ๊ฒ์ ์ ํํด์ผ ํ ๊น?
- CompletableFuture๋ฅผ ์ด์ฉํ๋ฉด ์ ์ฒด์ ์ธ ๊ณ์ฐ์ด ๋ธ๋ก๋์ง ์๋๋ก ์ค๋ ๋ ํ์ ํฌ๊ธฐ๋ฅผ ์กฐ์ ํ ์ ์๋ค.
- I/O๊ฐ ํฌํจ๋์ง ์์ ๊ณ์ฐ ์ค์ฌ์ ๋์์ด๋ผ๋ฉด ์คํธ๋ฆผ ์ธํฐํ์ด์ค๊ฐ ๊ฐ์ฅ ๊ตฌํํ๊ธฐ ๊ฐ๋จํ๋ฉฐ ํจ์จ์ ์ผ ์ ์๋ค.
- I/O๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์์ ์ ๋ณ๋ ฌ๋ก ์คํํ ๋๋ CompletableFuture๊ฐ ๋ ๋ง์ ์ ์ฐ์ฑ์ ์ ๊ณตํ๋ฉฐ
W/C
์ ๋น์จ์ ์ ํฉํ ์ค๋ ๋ ์๋ฅผ ์ค์ ํ ์ ์๋ค.
์ค๋ ๋ ํ ํฌ๊ธฐ ์กฐ์
์๋ฐ ๋ณ๋ ฌ ํ๋ก๊ทธ๋๋ฐ(์ฑ )์์ ์ค๋ ๋ ํ์ ์ต์ ๊ฐ์ ์ฐพ๋ ๋ฐฉ๋ฒ์ ์ ์ํ๋ค.
์ค๋ ๋ ํ์ด ๋๋ฌด ํฌ๋ฉด CPU์ ๋ฉ๋ชจ๋ฆฌ ์์์ ์๋ก ๊ฒฝ์ํ๋๋ผ ์๊ฐ์ ๋ญ๋นํ ์ ์๋ค.
๋ฐ๋ฉด ์ค๋ ๋ ํ์ด ๋๋ฌด ์์ผ๋ฉด CPU์ ์ผ๋ถ ์ฝ์ด๋ ํ์ฉ๋์ง ์์ ์ ์๋ค.
threads = nCPU * uCPU * (1 + W/C)
nCPU๋Runtime.getRuntime().availableProcessors()
๊ฐ ๋ฐํํ๋ ์ฝ์ด์ ์
uCPU๋ 0๊ณผ 1์ฌ์ด์ ๊ฐ์ ๊ฐ๋ CPU ํ์ฉ ๋น์จ
W/C๋ ๋๊ธฐ์๊ฐ๊ณผ ๊ณ์ฐ์๊ฐ์ ๋น์จ
์์ ์์ ์์ ์ํ์ ์ฐพ๋ ์๊ฐ์ 99ํผ์ผํธ์ ์๊ฐ์ ๊ธฐ๋ค๋ฆฌ๋ฏ๋ก W/C ๋น์จ์ 100์ผ๋ก ๊ฐ์ฃผํ๊ณ , CPU ํ์ฉ๋ฅ ์ 100ํผ์ผํธ, ์ฌ์ฉํ ์ ์๋ ์ฝ์ด์ ์๋ 4๋ผ๋ฉด 400๊ฐ์ ์ค๋ ๋ ๋ฅผ ๊ฐ๋ ์ค๋ ๋ ํ์ ๋ง๋ค์ด์ผ ํ๋ค.
ํ์ง๋ง ์์ ์ ๋ณด๋ค ๋ง์ ์ค๋ ๋๋ฅผ ๊ฐ์ง๊ณ ์์ด ๋ด์ผ ์ฌ์ฉํ ๊ฐ๋ฅ์ฑ์ด ์ ํ ์์ผ๋ฏ๋ก ์์ ์๋ณด๋ค ๋ง์ ์ค๋ ๋๋ฅผ ๊ฐ๋ ๊ฒ์ ๋ญ๋น์ผ ๋ฟ์ด๋ค.
@Test
void discount() {
List<Shop> shops = List.of(
new Shop("a"),
new Shop("b"),
new Shop("c"),
new Shop("d"),
new Shop("e"),
new Shop("f"),
new Shop("g"),
new Shop("h")
);
final Executor executor = Executors.newFixedThreadPool(shops.size());
final String product = "iPhone";
List<String> collect = shops.stream()
.map(shop -> shop.getPriceOfCode(product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.toList();
List<CompletableFuture<String>> collect1 = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPriceOfCode(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.toList();
for(CompletableFuture<String> future : collect1) {
assertThat(future.isDone()).isEqualTo(false);
}
List<String> list = collect1.stream()
.map(CompletableFuture::join)
.toList();
for (int i = 0; i < collect1.size(); i++) {
CompletableFuture<String> future = collect1.get(i);
assertThat(future.isDone()).isEqualTo(true);
String e = list.get(i);
assertThat(e).isNotEmpty();
}
}
List
์ ๋ด๊ฒจ์๋ CompletableFuture<String>
์ (join
์ด ํธ์ถ๋๊ธฐ ์ ์ ์ธ์ ํธ์ถ๋ ์ง๋ ๋ชจ๋ฅด์ง๋ง) ๋น๋๊ธฐ์ ์ผ๋ก ์์ ์์ ์ ๋ณด๋ฅผ ์กฐํํ๋ค.
thenApply()
๋ CompletableFuture
๊ฐ ๋๋ ๋ ๊น์ง ๋ธ๋กํ์ง ์๋๋ค. ์ฆ, CompletableFuture๊ฐ ๋์์ ์์ ํ ์๋ฃํ ๋ค์์ thenApply
๋ฉ์๋๋ก ์ ๋ฌ๋ ๋๋ค ํํ์์ ์ ์ฉํ ์ ์๋ค.
๋ฐ๋ผ์ CompletableFuture<String>
์ CompletableFuture<Quote>
๋ก ๋ณํํ๋ค.
์์์ ์กฐํฉํ thenCompose()
๋ Async ๋ฒ์ ๋ ์กด์ฌํ๋ค.
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(defaultExecutor(), fn);
}
Aysnc๋ก ๋๋์ง ์๋ ๋ฉ์๋๋ ์ด์ ์์
์ ์ํํ ์ค๋ ๋์ ๊ฐ์ ์ค๋ ๋์์ ์์
์ ์คํํจ์ ์๋ฏธ ํ๋ฉฐ Async๋ก ๋๋๋ ๋ฉ์๋๋ ๋ค์ ์์
์ด ๋ค๋ฅธ ์ค๋ ๋์์ ์คํ๋๋๋ก ์ค๋ ๋ ํ๋ก ์์
์ ์ ์ถํ๋ค.
์์ ์์ ์์๋ ์ค๋ ๋ ์ ํ ์ค๋ฒํค๋๋ฅผ ์ ๊ฒ ๋ฐ์์ํค๊ธฐ ์ํด thenCompose
๋ฅผ ์ฌ์ฉํ๋ค.
์์์๋ thenCompose
๋ฅผ ํตํด ์ฒซ ๋ฒ์งธ CompletableFuture๊ฐ ๋๋๋ฉด ๊ทธ ๊ฒฐ๊ด๊ฐ์ ๊ฐ์ง๊ณ ๋ค์ CompletableFuture๋ฅผ ์คํ์์ผ ๋ณด์๋ค.
ํ์ง๋ง ๋ ๊ฐ์ CompletableFuture๋ฅผ ๋
๋ฆฝ์ ์ผ๋ก ์คํํ๊ณ ํฉ์น๊ณ ์ถ๋ค๋ฉด ์ด๋ป๊ฒ ํด์ผํ ๊น?
์๋ฅผ ๋ค์ด, ํ ์จ๋ผ์ธ ์์ ์์ ๊ณ ๊ฐ์๊ฒ ์ ๋ก ๊ฐ๊ฒฉ์ ๋ณด์ฌ์ค ๋ ๋ฌ๋ฌ ๊ฐ๊ฒฉ๊ณผ ํ์จ์ ์ธ๋ถ ์๋น์ค๋ฅผ ํตํด ์ฌ๊ณตํด์ค์ผ ํ๋ค๊ณ ์๊ฐํด๋ณด์.
@Test
void futurePrice() {
final Shop shop = new Shop("a");
final String product = "iPhone";
CompletableFuture<Double> doubleCompletableFuture = CompletableFuture
.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(() -> getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
);
Double result = doubleCompletableFuture.join();
}
@Test
void java7_futurePrice() {
final Shop shop = new Shop("a");
final String product = "iPhone";
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Double> futureRate = executorService.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
return getRate(Money.EUR, Money.USD);
}
});
Future<Double> futureResult = executorService.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
double priceInUSD = shop.getPrice(product);
return priceInUSD * futureRate.get();
}
});
}
thenCombine
์๋ Aysnc ๋์์ ์ง์ํ๋ ๋ฉ์๋ thenCombineAsync
๋ฅผ ์ ๊ณตํ๋ฉฐ ๋ ๋ฒ์งธ ์ธ์๋ก ๋ฐ๋ BiFunction
(CompletableFuture ๊ฒฐ๊ณผ๋ฅผ ์ด๋ป๊ฒ ํฉ์น ์ง ์ ์)๋ฅผ ์ค๋ ๋ ํ๋ก ์ ์ถ๋๋ฉด์ ๋ณ๋์ ํ์คํฌ์์ ๋น๋๊ธฐ์ ์ผ๋ก ์ํ๋๋ค.
๊ทธ๋ฆฌ๊ณ ์๋ฐ 7์์ ์ฌ์ฉํ ๋ฐฉ๋ฒ๊ณผ ๋น๊ตํ๋ฉด CompletableFuture๋ฅผ ํตํด ๋ณต์กํ ์ฐ์ฐ ์ํ ๋ฐฉ๋ฒ์ ํจ๊ณผ์ ์ผ๋ก ์ฝ๊ฒ ์ ์ํ ์ ์๋ ๊ฒ์ ์ ์ ์๋ค.
Future์ ๊ณ์ฐ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ค๋ฆด๋๋ ๋ฌดํ์ ๊ธฐ๋ค๋ฆฌ๋ ์ํฉ์ด ๋ฐ์ํ ์ ์์ผ๋ฏ๋ก ๋ธ๋ก์ ํ์ง ์๋ ๊ฒ์ด ์ข๋ค.
์๋ฐ 9์์ ์ถ๊ฐ๋ ํ์์์์ ์ฌ์ฉํ ์ ์๋ค.
@Test
void futurePrice() {
final Shop shop = new Shop("a");
final String product = "iPhone";
CompletableFuture<Double> doubleCompletableFuture = CompletableFuture
.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(() -> getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
)
.orTimeout(3, TimeUnit.SECONDS);
Double result = doubleCompletableFuture.join();
}
orTimeout
๋ฉ์๋๋ ์ง์ ๋ ์๊ฐ์ด ์ง๋ ํ์ CompletableFuture๋ฅผ TimeoutException
์ผ๋ก ์๋ฃํ๋ฉด์ ๋ ๋ค๋ฅธ CompletableFuture๋ฅผ ๋ฐํํ ์ ์๋๋ก ๋ด๋ถ์ ์ผ๋ก ScheduledThreadExecutor๋ฅผ ํ์ฉํ๋ค.
์ด ๋ฉ์๋๋ฅผ ์ด์ฉํ๋ฉด ๊ณ์ฐ ํ์ดํ๋ผ์ธ์ ์ฐ๊ฒฐํ๊ณ ์ฌ๊ธฐ์ TimeoutException
์ด ๋ฐ์ํ์ ๋ ์ฌ์ฉ์๊ฐ ์ฝ๊ฒ ์ดํดํ ์ ์๋ ๋ฉ์์ง๋ฅผ ์ ๊ณตํ ์ ์๋ค.
๊ทธ๋ฆฌ๊ณ completeOnTimeout()
๋ฉ์๋๋ฅผ ํตํด ํ์์์ ๋ฐ์ ์ ๊ธฐ๋ณธ๊ฐ์ ์ง์ ํ์ฌ ๋ค์ CompletableFuture์ ๋ํ ํ์คํฌ๋ ๊ณ์ ์งํ๋๋๋ก ํ ์ ์๋ค.
@Test
void futurePrice() {
final Shop shop = new Shop("a");
final String product = "iPhone";
CompletableFuture<Double> doubleCompletableFuture = CompletableFuture
.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(() -> getRate(Money.EUR, Money.USD))
.completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
(price, rate) -> price * rate
)
.orTimeout(3, TimeUnit.SECONDS);
Double result = doubleCompletableFuture.join();
}
๋ง์ฝ ์ฌ๋ฌ ์์ ์์ ๊ฐ๊ฒฉ์ ์กฐํํ ๋ ์ผ๋ถ ์์ ์ ์๋ต์ ๋น ๋ฅด๊ฒ ์คฌ๋ค๊ณ ๊ฐ์ ํ์ ๋ ๋ชจ๋ ์์ ์์ ๊ฐ๊ฒฉ ์กฐํ๊ฐ ๋๋ฌ์ ๋ ๊น์ง ๊ฐ๋๋ฆฌ์ง ์๊ณ ๊ฐ๊ฒฉ ์ ๋ณด ์๋ต์ด ๋๋ ๋ ๋ง๋ค ์ฆ์ ๋ณด์ฌ์ค ์ ์๋ค๋ฉด ์ฌ์ฉ์ ๊ฒฝํ์ด ๋ ์ข์์ง ๊ฒ์ด๋ค.
@Test
void findPricesStream() {
List<Shop> shops = List.of(
new Shop("a"),
new Shop("b"),
new Shop("c"),
new Shop("d"),
new Shop("e")
);
final String product = "iPhone";
ExecutorService executorService = Executors.newCachedThreadPool();
long start = System.nanoTime();
CompletableFuture[] futures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceOfCodeAndRandomDelay(product), executorService))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(futureQuote ->
CompletableFuture.supplyAsync(() -> Discount.applyDiscount(futureQuote), executorService))
)
.map(future -> future.thenAccept(it -> System.out.printf("%s (done in %s msecs)\n", it, (System.nanoTime() - start) / 1_000_000)))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
// ์ฒซ ๋ฒ์จฐ
// d price is 68.425 (done in 2777 msecs)
// a price is 81.605 (done in 2887 msecs)
// e price is 118.61 (done in 2910 msecs)
// b price is 73.967 (done in 3169 msecs)
// c price is 154.60649999999998 (done in 3350 msecs)
// ๋ ๋ฒ์งธ
// c price is 162.22 (done in 2558 msecs)
// e price is 135.43200000000002 (done in 2588 msecs)
// a price is 132.752 (done in 2762 msecs)
// d price is 114.08800000000001 (done in 3188 msecs)
// b price is 125.229 (done in 3345 msecs)
}
์์ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด shop๋ง๋ค ๋ฐํ๋๋ ์์์ ์๊ฐ์ด ์๋ก ๋ค๋ฅด๊ณ ๋จผ์ ์๋ตํ๋ ์ ๋ณด๋ถํฐ ์ถ๋ ฅ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
thenAccept()
๋ฉ์๋๋ฅผ ํตํด CompletableFuture์ ๊ณ์ฐ์ด ๋๋๋ฉด ๊ฐ์ ์๋นํ๋ ์์
์ Consumer ์ธ์๋ฅผ ํตํด ์ง์ ํ ์ ์๋ค.
๋ค๋ฅธ ๊ฒ๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก thenAceeptAsync()
๋ ์กด์ฌํ๋ค.
์ถ๊ฐ์ ์ผ๋ก allOf
๋ฅผ ํตํด CompletableFuture<Void>
ํ์
์ ๋ชจ๋ ํ์คํฌ๊ฐ ์คํ ์๋ฃ๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๊ฒ ํ ์ ์๋ค.
๋ฐ๋๋ก anyOf
๋ฅผ ํตํด ์ฒ๋ฆ์ผ๋ก ์๋ฃํ CompletableFuture<Object>
ํ์
์ ๋ฐํ๋ฐ์ ์ ์๋ค.
์์ ๋ชจ๋ ์์ ๋ ์ฌ๊ธฐ์ ํ์ธํ ์ ์๋ค.
- runAsync() - ๋ฆฌํด ๊ฐ์ด ์๋ ๊ฒฝ์ฐ
- supplyAsync() - ๋ฆฌํด ๊ฐ์ด ์๋ ๊ฒฝ์ฐ
Supplier
๋ฅผ ์ธ์๋ก ๋ฐ์ ๋น๋๊ธฐ๋ก ์คํํด์CompletableFuture
๋ฅผ ๋ฐํํ๋ค.- ForkJoinPool์ Executor ์ค ํ๋๊ฐ
Supplier
๋ฅผ ์คํํ ๊ฒ์ด๋ค.
- ์ํ๋ Executor(Thread Pool)๋ฅผ ์ฌ์ฉํด์ ์คํํ ์๋ ์๋ค.
- ๊ธฐ๋ณธ์ ForkJoinPool.commonPool()
- ForkJoinPool - JAVA7
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
completableFuture.complete("test");
// ์์ ๋์ผํ ์ฝ๋์ด๋ค.
CompletableFuture<String> completableFuture1 = CompletableFuture.completedFuture("test");
// runAsync - ๋ฆฌํด ๊ฐ์ด ์๋ ๊ฒฝ์ฐ
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
});
completableFuture2.get();
// ์ถ๋ ฅ
// Hello ForkJoinPool.commonPool-worker-3
// supplyAsync() - ๋ฆฌํด ๊ฐ์ด ์๋ ๊ฒฝ์ฐ
CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
return "return Value!!!";
});
System.out.println(completableFuture3.get());
// ์ถ๋ ฅ
// return Value!!!
}
- ๋ฆฌํด ๊ฐ์ ๋ฐ์์ ๋ค๋ฅธ ๊ฐ์ผ๋ก ๋ฐ๊พธ๋ ์ฝ๋ฐฑ
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Return : " + Thread.currentThread().getName());
return "return Value!!!";
}).thenApply((s) -> {
System.out.println("Then Apply : " + Thread.currentThread().getName());
return s.toUpperCase();
});
// get์ ํธ์ถํ์ง ์์ผ๋ฉด ์๋ฌด์ผ๋ ์ผ์ด๋์ง ์๋๋ค.
String s = completableFuture.get();
System.out.println(s);
// ์ถ๋ ฅ
// Return : ForkJoinPool.commonPool-worker-3
// Then Apply : ForkJoinPool.commonPool-worker-3
// RETURN VALUE!!!
}
- ๋ฆฌํด ๊ฐ์ผ๋ก ๋ ๋ค๋ฅธ ์์ ์ ์ฒ๋ฆฌํ๋ ์ฝ๋ฐฑ (๋ฆฌํด์์ด)
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Return : " + Thread.currentThread().getName());
return "return Value!!!";
}).thenAccept((s) -> {
System.out.println("Then Accept : " + Thread.currentThread().getName());
System.out.println("Then Accept To UpperCase : " + s.toUpperCase());
});
completableFuture.get();
// ์ถ๋ ฅ
// Return : ForkJoinPool.commonPool-worker-3
// Then Accept : ForkJoinPool.commonPool-worker-3
// Then Accept To UpperCase : RETURN VALUE!!!
}
- ๋ค๋ฅธ ์์ ์ ์ฒ๋ฆฌํ๋ ์ฝ๋ฐฑ
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Return : " + Thread.currentThread().getName());
return "return Value!!!";
}).thenRun(() -> {
// Runnable
System.out.println("Then Run : " + Thread.currentThread().getName());
});
completableFuture.get();
// ์ถ๋ ฅ
// Return : ForkJoinPool.commonPReturn : ForkJoinPool.commonPool-worker-3
// Then Run : ForkJoinPool.commonPool-worker-3
}
- ForkJoinPool์ ์ฌ์ฉํ์ง ์๊ณ ๊ฐ๋ฐ์๊ฐ ์ง์ ๋ง๋ Thread๋ฅผ ์ ๊ณตํ ์๋ ์๋ค.
- ExecutorService๋ฅผ ์๋ก ์์ฑํ์ฌ ๋งค๊ฐ๋ณ์๋ก ์ ๋ฌํ๋ฉด ๋๋ค.
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Return : " + Thread.currentThread().getName());
return "return Value!!!";
} , executorService).thenRun(() -> {
// Runnable
System.out.println("Then Run : " + Thread.currentThread().getName());
});
completableFuture.get();
executorService.shutdown();
// ์ถ๋ ฅ
// Return : pool-1-thread-1
// Then Run : pool-1-thread-1
}
- ๋ ์์ ์ด ์๋ก ์ด์ด์ ์คํํ๋๋ก ์กฐํฉ
- ์ฒซ ๋ฒ์งธ ์ฐ์ฐ์ ๊ฒฐ๊ณผ๋ฅผ ๋ ๋ฒ์งธ ์ฐ์ฐ์ผ๋ก ์ ๋ฌํ๋ค.
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello ";
});
// hello.thenCompose(s -> getWorld(s));
CompletableFuture<String> helloWorld =
hello.thenCompose(AppForCompletableFuture::getWorld);
System.out.println(helloWorld.get());
// ์ถ๋ ฅ
// Hello : ForkJoinPool.commonPool-worker-3
// World : ForkJoinPool.commonPool-worker-5
// Hello World
}
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return message + " World";
});
}
- ๋ ์์ ์ ๋ ๋ฆฝ์ ์ผ๋ก ์คํํ๊ณ ๋ ๋ค ์ข ๋ฃ ํ์ ๋ ์ฝ๋ฐฑ ์คํ
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello ";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<String> result = hello.thenCombine(world, (h , w) -> h + " " + w);
System.out.println(result.get());
// ์ถ๋ ฅ
// Hello : ForkJoinPool.commonPool-worker-3
// World : ForkJoinPool.commonPool-worker-5
// Hello World
}
- ์ฌ๋ฌ ์์ ์ ๋ชจ๋ ์คํํ๊ณ ๋ชจ๋ ์์ ๊ฒฐ๊ณผ์ ์ฝ๋ฐฑ ์คํ
- allOf๋ฅผ ์ฌ์ฉํ์ฌ ์์ ์ ๊ฒฐ๊ณผ๋ฅผ List๋ก ๋ฐํ๋ฐ๊ธฐ
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello ";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
List<CompletableFuture> futures = Arrays.asList(hello , world);
CompletableFuture[] futuresArray
= futures.toArray(new CompletableFuture[futures.size()]);
// ๊ฒฐ๊ณผ ํ์
๋ค์ด ๋ชจ๋ ๋์ผํด์ผํ๋ค.
CompletableFuture<List<Object>> listCompletableFuture =
CompletableFuture.allOf(futuresArray).thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
listCompletableFuture.get().forEach(System.out::println);
// ์ถ๋ ฅ
// Hello : ForkJoinPool.commonPool-worker-3
// World : ForkJoinPool.commonPool-worker-5
// Hello
// World
}
- ์ฌ๋ฌ ์์ ์ค์ ๊ฐ์ฅ ๋นจ๋ฆฌ ๋๋ ํ๋์ ๊ฒฐ๊ณผ์ ์ฝ๋ฐฑ ์คํ
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello ";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<Void> future =
CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
future.get();
// ์ถ๋ ฅ
// Hello : ForkJoinPool.commonPool-worker-3
// World : ForkJoinPool.commonPool-worker-5
// Hello
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if(throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello ";
}).exceptionally(exceptionType -> {
System.out.println("Exception Type : " + exceptionType);
return "Error!";
});
System.out.println(hello.get());
// ์ถ๋ ฅ
// Exception Type : java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
// Error!
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if(throwError){
throw new IllegalArgumentException();
}
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello ";
}).handle((result , exceptionType) -> {
// ์ฒซ๋ฒ์งธ ํ๋ผ๋ฏธํฐ - ์ ์์ ์ธ ๊ฒฝ์ฐ ๋ฐํ๋๋ ๊ฒฐ๊ณผ ๊ฐ
// ๋๋ฒ์งธ ํ๋ผ๋ฏธํฐ - ์์ธ ๋ฐ์์ ์์ธ
if(exceptionType != null){
System.out.println("Exception Type : " + exceptionType);
return "ERROR !!!";
}
return result;
});
System.out.println(hello.get());
// ์์ธ ๋ฐ์ ์ "ERROR !!!" ๋ฅผ ๋ฐํ
// ์์ธ๊ฐ ๋ฐ์ํ์ง ์์์ ์ "Hello" ๋ฐํ
}