Skip to content

Commit

Permalink
⭐ GenStage를 사용해 초당 10개씩 요청하는 rate limiter 소비자 추가
Browse files Browse the repository at this point in the history
- 생산자는 요구 개수 밑으로 랜덤하게 숫자를 생성
  - [1, demand]
- 소비자는 1초당 10개씩 소비를 하며 콘솔에 event를 출력
  - 이후 upbit API를 사용해 시세 API 함수 호출할 예정
  • Loading branch information
ohyecloudy committed Jan 16, 2021
1 parent 35c8086 commit b39fc08
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 13 deletions.
24 changes: 11 additions & 13 deletions lib/bitcoin_price_scraper.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
defmodule BitcoinPriceScraper do
@moduledoc """
Documentation for `BitcoinPriceScraper`.
"""
alias BitcoinPriceScraper.{QuotationDemander, RateLimiter}

@doc """
Hello world.
def scrap() do
{:ok, producer} = QuotationDemander.start_link(1)
{:ok, consumer} = RateLimiter.start_link()

## Examples
iex> BitcoinPriceScraper.hello()
:world
"""
def hello do
:world
GenStage.sync_subscribe(consumer,
to: producer,
# 시세(quotation) API 요청수 제한
# 초당 10, 분당 600
# https://docs.upbit.com/docs/user-request-guide
limits_per_second: 10
)
end
end
19 changes: 19 additions & 0 deletions lib/bitcoin_price_scraper/quotation_demander.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule BitcoinPriceScraper.QuotationDemander do
use GenStage

def start_link(number) do
GenStage.start_link(__MODULE__, number)
end

def init(counter) do
{:producer, counter}
end

def handle_demand(demand, counter) when demand > 0 do
# 이벤트 요구 개수 이하를 랜덤하게 생산한다
# [1, demand]
demand = :rand.uniform(demand)
events = Enum.to_list(counter..(counter + demand - 1))
{:noreply, events, counter + demand}
end
end
50 changes: 50 additions & 0 deletions lib/bitcoin_price_scraper/rate_limiter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# https://hexdocs.pm/gen_stage/GenStage.html 예제 코드를 참고함

defmodule BitcoinPriceScraper.RateLimiter do
use GenStage

def start_link() do
GenStage.start_link(__MODULE__, :ok)
end

def init(_) do
{:consumer, %{}}
end

def handle_subscribe(:producer, opts, from, producers) do
limits_per_second = Keyword.fetch!(opts, :limits_per_second)

producers =
producers
|> Map.put(from, limits_per_second)
|> ask_and_schedule(from)

# :manual을 리턴해 생산자(producer)에 요구(demand)를 보내는 걸 직접 컨트롤한다.
{:manual, producers}
end

def handle_events(events, _from, producers) do
# consume!
IO.puts("#{inspect(NaiveDateTime.utc_now())}: #{inspect(events, charlists: true)}")

{:noreply, [], producers}
end

def handle_info({:ask, from}, producers) do
{:noreply, [], ask_and_schedule(producers, from)}
end

defp ask_and_schedule(producers, from) do
case producers do
%{^from => limits_per_second} ->
# 이벤트를 요구한다. :manual 모드일 때는 GenStage.ask/2 함수를 호출해서 직접 요구해야 한다
GenStage.ask(from, limits_per_second)
# 초당 호출 개수 제한이 있으므로 1초 스케쥴링을 한다
Process.send_after(self(), {:ask, from}, :timer.seconds(1))
producers

%{} ->
producers
end
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule BitcoinPriceScraper.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:gen_stage, "~> 1.0"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
Expand Down
3 changes: 3 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
%{
"gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"},
}

0 comments on commit b39fc08

Please sign in to comment.