Skip to content

Commit

Permalink
🐛 upbit 시세에 대해 새로운 요청과 재시도 요청을 분리한다
Browse files Browse the repository at this point in the history
- producer의 event를 처리하는 함수에서 재시도 요청을 같이 처리하면
  마지막 이벤트 리스트에서 실패하면 재시도 요청을 안 보낸다
  - 최소 1개 요청을 producer에 보내게 했지만 producer가 생산할 이벤트
    자체가 없으면 함수 자체가 호출 안 됨
- producer로부터 받은 새로운 요청과 재시도 요청을 처리하는 함수와
  스케쥴링을 분리
  - producer가 이벤트를 발행하지 않아도 이전에 실패해 재시도를 해야
    하는 요청을 처리한다
  • Loading branch information
ohyecloudy committed Feb 14, 2021
1 parent eafd77c commit 367d350
Showing 1 changed file with 40 additions and 24 deletions.
64 changes: 40 additions & 24 deletions lib/bitcoin_price_scraper/rate_limiter.ex
Expand Up @@ -40,31 +40,14 @@ defmodule BitcoinPriceScraper.RateLimiter do
def handle_events(events, from, producers) do
IO.puts("handle_events - #{to_string(NaiveDateTime.utc_now())}, count: #{Enum.count(events)}")

if not Enum.empty?(producers[from].pending) do
IO.puts(
"retry count: #{Enum.count(producers[from].pending)}, detail: #{
inspect(producers[from].pending)
}"
)
end

# 이전에 실패한 candle 조회 요청을 보낸다
:telemetry.execute(
[:upbit, :quotation, :request, :retry],
%{count: Enum.count(producers[from].pending)}
)

{_success, pending} = request_candles(producers[from].pending)

:telemetry.execute([:upbit, :quotation, :request, :new], %{count: Enum.count(events)})
{_success, failed} = request_candles(events)

producers =
Map.update!(producers, from, fn exist ->
# 이전에 실패한 candle 조회 요청 중 실패한 요청과
# producer로 부터 받은 이벤트 중 실패한 목록을 업데이트해서
# 다음에 시도할 수 있게 한다.
%{exist | pending: pending ++ failed}
# 이 함수에서는 새로운 요청을 처리할 뿐, 이전에 실패한 요청을 처리하지 않는다.
# 실패한 요청을 다음에 시도할 수 있게 추가한다
%{exist | pending: exist.pending ++ failed}
end)

{:noreply, [], producers}
Expand All @@ -74,6 +57,10 @@ defmodule BitcoinPriceScraper.RateLimiter do
{:noreply, [], ask_and_schedule(producers, from)}
end

def handle_info({:retry, from}, producers) do
{:noreply, [], retry_events(producers, from)}
end

defp request_candles(events) do
events
|> Enum.split_with(fn e ->
Expand Down Expand Up @@ -119,16 +106,45 @@ defmodule BitcoinPriceScraper.RateLimiter do
defp ask_and_schedule(producers, from) do
case producers do
%{^from => %{limits_per_second: limits_per_second, pending: pending}} ->
# 이벤트를 요구한다. :manual 모드일 때는 GenStage.ask/2 함수를 호출해서 직접 요구해야 한다
# 실패해서 다음에 시도해야 할 이벤트 개수를 초당 요청 가능한 개수에서 뺀 만큼 요청한다
# 단, 0이면 handle_events 함수 호출이 안 되므로 최소 1개를 요청한다
GenStage.ask(from, max(limits_per_second - Enum.count(pending), 1))
GenStage.ask(from, max(limits_per_second - Enum.count(pending), 0))
# 초당 호출 개수 제한이 있으므로 1초 스케쥴링을 한다
Process.send_after(self(), {:ask, from}, :timer.seconds(1))

if pending > 0 do
Process.send_after(self(), {:retry, from}, :timer.seconds(1))
end

producers

%{} ->
producers
end
end

defp retry_events(producers, from) do
if not Enum.empty?(producers[from].pending) do
IO.puts(
"retry count: #{Enum.count(producers[from].pending)}, detail: #{
inspect(producers[from].pending)
}"
)

# 이전에 실패한 candle 조회 요청을 보낸다
:telemetry.execute(
[:upbit, :quotation, :request, :retry],
%{count: Enum.count(producers[from].pending)}
)

{_success, pending} = request_candles(producers[from].pending)

producers =
Map.update!(producers, from, fn exist ->
%{exist | pending: pending}
end)

producers
else
producers
end
end
end

0 comments on commit 367d350

Please sign in to comment.