Skip to content

Commit

Permalink
agent heartbeat message -> event connection is not closed !
Browse files Browse the repository at this point in the history
  • Loading branch information
nohaapav committed Nov 16, 2017
1 parent c31e2d8 commit fec708b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
10 changes: 9 additions & 1 deletion src/clj/swarmpit/agent.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:import (clojure.lang ExceptionInfo))
(:require [immutant.scheduling :refer :all]
[clojure.tools.logging :as log]
[swarmpit.handler-events :as events]
[swarmpit.api :as api]))

(defn- autoredeploy-job
Expand All @@ -24,8 +25,15 @@
(catch ExceptionInfo e
(log/error "Service" id "autoredeploy failed! " (ex-data e))))))))

(defn- event-heartbeat-job
[]
(events/broadcast {:From "SWARMPIT" :Message "heartbeat"}))

(defn init
[]
(schedule autoredeploy-job
(-> (in 1 :minutes)
(every 60 :second))))
(every 60 :second)))
(schedule event-heartbeat-job
(-> (in 30 :seconds)
(every 30 :second))))
5 changes: 3 additions & 2 deletions src/clj/swarmpit/authorization.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
:handler {:and [authenticated-access admin-access]}}
{:pattern #"^/login$"
:handler any-access}
{:pattern #"^/events"
:handler any-access}
{:pattern #"^/events"
:request-method :post
:handler any-access}
{:pattern #"^/version$"
:handler any-access}
{:pattern #"^/$"
Expand Down
18 changes: 10 additions & 8 deletions src/clj/swarmpit/handler_events.clj
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
(ns swarmpit.handler-events
(:require [org.httpkit.server :refer [run-server with-channel on-close send! close]]
[immutant.scheduling :refer :all]
[cheshire.core :refer [generate-string]]
[swarmpit.handler :refer [dispatch resp-accepted resp-error]]))

(def channel-hub (atom {}))

(defn broadcast [message]
(doseq [channel (keys @channel-hub)]
(send! channel message false)))
(defn broadcast [data]
(let [message (str "data: " (generate-string data) "\n\n")]
(doseq [channel (keys @channel-hub)]
(send! channel message false))))

(defmethod dispatch :events [_]
(fn [request]
(with-channel request channel
(send! channel {:status 200
:headers {"Content-Type" "text/event-stream"
"Cache-Control" "no-cache"
"Connection" "keep-alive"}
:headers {"Content-Type" "text/event-stream"
"Cache-Control" "no-cache"
"Connection" "keep-alive"}
:body ":ok\n\n"} false)
(swap! channel-hub assoc channel request)
(on-close channel (fn [_]
(swap! channel-hub dissoc channel))))))

(defmethod dispatch :event-push [_]
(fn [{:keys [params]} ]
(fn [{:keys [params]}]
(if (some? params)
(do
(broadcast (str "data: " (generate-string params) "\n\n"))
(broadcast params)
(resp-accepted (str "Broadcasted to " (count @channel-hub) " clients")))
(resp-error 400 "No data send"))))

0 comments on commit fec708b

Please sign in to comment.