Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Fixes #437.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Drogalis committed Dec 14, 2015
1 parent 02a9146 commit 68ce9f4
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions src/onyx/api.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns onyx.api
(:require [clojure.string :refer [split]]
[clojure.core.async :refer [chan alts!! >!! <!! close! alts!! timeout go]]
[clojure.core.async :refer [chan alts!! >!! <!! close! alts!! timeout go poll!]]
[com.stuartsierra.component :as component]
[taoensso.timbre :refer [info warn fatal error]]
[onyx.log.entry :refer [create-log-entry]]
Expand Down Expand Up @@ -256,6 +256,20 @@
:else
(recur v)))))))

(defn ^{:no-doc true} restart-peer [peer-system shutdown-ch]
(let [result
(try
(let [[v ch] (alts!! [shutdown-ch (timeout 0)] :priority true)]
(when (not= ch shutdown-ch)
(component/start peer-system)))
(catch Throwable t
(warn t)
{:retry? true
:system (component/stop peer-system)}))]
(if (and (map? result) (:retry? result))
(recur (:system peer-system) shutdown-ch)
result)))

(defn ^{:no-doc true} peer-lifecycle [started-peer config shutdown-ch ack-ch]
(try
(loop [live @started-peer]
Expand All @@ -268,9 +282,9 @@
(= ch restart-ch)
(do (component/stop live)
(Thread/sleep (or (:onyx.peer/retry-start-interval config) 2000))
(let [live (component/start live)]
(reset! started-peer live)
(recur live)))
(when-let [live (restart-peer live shutdown-ch)]
(do (reset! started-peer live)
(recur live))))
:else (throw (ex-info "Read from a channel with no response implementation" {})))))
(catch Throwable e
(fatal "Peer lifecycle threw an exception")
Expand Down

0 comments on commit 68ce9f4

Please sign in to comment.