-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
core.clj
94 lines (76 loc) · 2.61 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
(ns libpinkas-clj.core
(:require [org.httpkit.client :as http]
[cheshire.core :refer [generate-string parse-string]]
[clojure.string :as s]
[clojure.walk :refer [postwalk]]
[clojure.core.async
:refer [timeout go-loop <! >!]
:include-macros true]))
;; Collection of service hashes that have been deregistered (starts as an
;; empty set). `deregister` adds a hash, `register` removes it, and the
;; heartbeat loop started by `register` stops once it finds its hash in here.
(defonce ^:private reject-repo (atom #{}))
;; Timeout, in milliseconds, passed to `deref` when waiting on an HTTP response.
(defonce ^:private deref-timeout 2000)
;; Default heartbeat period in seconds (multiplied by 1000 before being
;; handed to `timeout`), used when no `:interval` option is supplied.
(defonce ^:private default-interval 10)
(defn- +with
  "Calls `f` with `hsh` and the current contents of `reject-repo`.

  `f` is expected to be a two-argument function shaped like `remove` or
  `some`, i.e. `(f x coll)`; `hsh` is forwarded untouched as its first
  argument."
  [f hsh]
  (let [rejected @reject-repo]
    (f hsh rejected)))
(def ^:private remove->
  "Given a predicate, returns the entries of `reject-repo` that do NOT
  satisfy it (delegates to `remove` via `+with`)."
  (fn [hsh] (+with remove hsh)))
(def ^:private ifsome?
  "Given a predicate, returns the first truthy predicate result over the
  entries of `reject-repo`, or nil (delegates to `some` via `+with`)."
  (fn [hsh] (+with some hsh)))
;; Operations exposed by a service handle. The notes on each method describe
;; the implementation produced by `service` in this file.
(defprotocol Service
;; Returns a map with :info, :hash, :path and :ops; performs no request.
(describe [this])
;; Requests "service/<name>" under the base path and returns the parsed body.
(discover [this])
;; Sends the service info to "register" and, on a non-empty response,
;; starts a periodic re-registration loop. Returns the parsed response.
(register [this])
;; Marks the service as rejected (which stops the re-registration loop) and
;; sends the service info to "deregister". Returns the parsed response.
(deregister [this]))
(defn- with-ctx
  "Joins `path` and `loc` into one string, inserting a single \"/\" between
  them unless `path` already ends with one."
  [path loc]
  (let [separator (if (s/ends-with? path "/") "" "/")]
    (str path separator loc)))
(defn- keyword->capital
  "Turns a dash-separated name into a capitalised, concatenated string,
  e.g. `:foo-bar` => \"FooBar\".

  Only the name part of `x` is used (any namespace is dropped), and each
  segment goes through `clojure.string/capitalize`, so letters after the
  first are lower-cased."
  [x]
  (->> (s/split (name x) #"-")
       (map s/capitalize)
       (apply str)))
(defn- transform
  "Walks `orig` bottom-up and replaces every keyword it meets with the
  string produced by `keyword->capital`; all other nodes are kept as-is.

  Note that this applies to keywords anywhere in the structure, whether
  they appear as map keys or as values."
  [orig]
  (letfn [(rewrite [node]
            (cond-> node
              (keyword? node) keyword->capital))]
    (postwalk rewrite orig)))
(defn- deref-with-default
  "Blocks on `ref` for at most `deref-timeout` milliseconds and returns its
  value, or `false` if the value was not delivered in time."
  [ref]
  (let [on-timeout false]
    (deref ref deref-timeout on-timeout)))
(defn- with-http
  "Returns a one-argument function that takes a URL string and issues an
  http-kit request against it, returning whatever `http/request` returns.

  - With no arguments the request map carries only `:url`, leaving the
    method to http-kit's default.
  - With `b`, the request is a `:put` whose body is `b` encoded with
    `generate-string`. Encoding happens on every call of the returned
    function, not when `with-http` itself is called."
  ([]
   (fn [^String x]
     (http/request {:url x})))
  ([b]
   (fn [^String x]
     (let [request-opts {:method :put
                         :url    x
                         :body   (generate-string b)}]
       (http/request request-opts)))))
(defn- transform-http
  "Runs `payload` through `transform` and wraps the result with the
  one-argument arity of `with-http`, yielding a URL -> request function."
  [payload]
  (-> payload
      transform
      with-http))
(defn- exec
  "Builds the URL from `path` and `loc`, hands it to the request function
  `m`, waits for the response via `deref-with-default`, and returns the
  response body parsed as JSON with keyword keys.

  Returns nil when the wait times out (the deref yields `false`) or when
  the response has no body / an empty body."
  [^String path ^String loc m]
  (when-let [resp (deref-with-default (m (with-ctx path loc)))]
    (when-let [body (not-empty (:body resp))]
      (parse-string body true))))
(defn- with-hash
  "Returns `hash` of the value found in `val` at the key path `inner`.
  A missing path yields the hash of nil."
  [val inner]
  (-> val
      (get-in inner)
      hash))
(defn service
  "Creates a `Service` handle for the service described by `info`, talking
  to the endpoint rooted at `path`.

  Arguments:
    path - base URL; request locations are appended to it with `with-ctx`.
    info - service description map. `[:service :id]` is hashed to identify
           this service in `reject-repo`, and `[:service :service]` is used
           as the name in the `discover` request.
    ops  - optional trailing key/value pairs. The only key read here is
           `:interval`, the heartbeat period in seconds (defaults to
           `default-interval`).

  Methods:
    describe   - returns {:info :hash :path :ops}; no request is made.
    discover   - requests \"service/<name>\" and returns the parsed body.
    register   - sends `info` to \"register\"; when that yields a non-empty
                 response, starts a loop re-sending it every `interval`
                 seconds until the service is deregistered. Returns the
                 parsed response of the first request (nil on failure).
    deregister - marks the service as rejected, which ends the heartbeat
                 loop on its next tick, then sends `info` to \"deregister\"
                 and returns the parsed response."
  [path info & ops]
  ;; The identity hash depends only on `info`, so compute it once instead of
  ;; in every method.
  (let [hsh (with-hash info [:service :id])]
    (reify Service
      (describe [this]
        {:info info
         :hash hsh
         :path path
         :ops  ops})
      (discover [this]
        (exec path (str "service/" (-> info :service :service)) (with-http)))
      (register [this]
        (let [{:keys [interval] :or {interval default-interval}} ops
              beat  (fn [] (exec path "register" (transform-http info)))
              added (beat)]
          ;; Clear any earlier deregistration. `disj` works on the value
          ;; `swap!` passes in, so the update is atomic and `reject-repo`
          ;; stays a set rather than being replaced by a lazy seq.
          (swap! reject-repo disj hsh)
          (when added
            ;; NOTE(review): `beat` blocks on `deref` for up to
            ;; `deref-timeout` ms inside a go block; consider moving the
            ;; heartbeat to a dedicated thread if many services register.
            (go-loop []
              (<! (timeout (* interval 1000)))
              (when-not (contains? @reject-repo hsh)
                (beat)
                (recur))))
          added))
      (deregister [this]
        (swap! reject-repo conj hsh)
        (exec path "deregister" (transform-http info))))))