forked from pingles/curator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
discovery.clj
124 lines (107 loc) · 4.84 KB
/
discovery.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
(ns ^{:doc "Namespace for service discovery"} curator.discovery
(:require [clojure.edn :as edn]
[curator.framework :refer (time-units)])
(:import [org.apache.curator.x.discovery ServiceDiscovery ServiceDiscoveryBuilder ServiceInstance ServiceType UriSpec ProviderStrategy DownInstancePolicy ServiceProvider ServiceCache]
[org.apache.curator.x.discovery.details InstanceSerializer JsonInstanceSerializer InstanceProvider]
[org.apache.curator.x.discovery.strategies RandomStrategy RoundRobinStrategy StickyStrategy]
[java.io ByteArrayInputStream InputStreamReader PushbackReader]))
(defmacro dotonn
  "Like `doto`, but skips a call when its first argument is nil (or false).

  Evaluates x once, then processes each form in order:
    * (method arg & more) - arg is evaluated exactly once; when it is
      truthy, (method x arg more...) is invoked. The remaining arguments
      are only evaluated if the call actually happens.
    * (method)            - always invoked as (method x).
    * method              - a bare symbol, always invoked as (method x).
  Returns x."
  [x & forms]
  (let [target (gensym "target__")]
    `(let [~target ~x]
       ~@(map (fn [form]
                (if (seq? form)
                  (let [[method & args] form]
                    (if (seq args)
                      ;; Bind the guarding argument to a local so that its
                      ;; expression is not evaluated twice (once for the
                      ;; test and once for the call).
                      (let [guard (gensym "arg__")]
                        `(let [~guard ~(first args)]
                           (when ~guard
                             (~method ~target ~guard ~@(rest args)))))
                      ;; No arguments means nothing to guard on.
                      `(~method ~target)))
                  `(~form ~target)))
              forms)
       ~target)))
(defn uri-spec*
  "Builds a UriSpec from the template string s,
  e.g. \"{scheme}://foo.com:{port}\".
  Placeholders that may be used: scheme, name, id, address,
  port, ssl-port, registration-time-utc, service-type."
  [s]
  (new UriSpec s))
(defn ^ServiceInstance service-instance
  "Builds a ServiceInstance.

  Positional arguments:
    name     - service name, e.g. my-service
    uri-spec - URI template string, e.g. \"{scheme}://foo.com:{port}\"
    port     - e.g. 1234

  Keyword options: :id, :address, :ssl-port, :payload and :service-type
  (one of :dynamic, :static, :permanent). Any builder setting whose value
  is nil is simply not applied.

  payload is serialized using json; only strings (or nil) are accepted,
  which is enforced by the precondition."
  [name uri-spec port & {:keys [id address ssl-port service-type payload]}]
  {:pre [(or (nil? payload) (string? payload))]}
  (let [type-lookup {:dynamic   ServiceType/DYNAMIC
                     :static    ServiceType/STATIC
                     :permanent ServiceType/PERMANENT}
        builder     (dotonn (ServiceInstance/builder)
                      (.payload payload)
                      (.name name)
                      (.id id)
                      (.address address)
                      (.port port)
                      (.sslPort ssl-port)
                      (.uriSpec (uri-spec* uri-spec))
                      ;; an unknown or missing :service-type looks up to nil
                      ;; and is therefore skipped by dotonn
                      (.serviceType (get type-lookup service-type)))]
    (.build builder)))
(defn uri
  "Returns the result of calling .buildUriSpec on service-instance."
  [^ServiceInstance service-instance]
  (-> service-instance .buildUriSpec))
(defn json-serializer
  "Returns a new JsonInstanceSerializer constructed with String as its
  payload class."
  []
  (new JsonInstanceSerializer String))
(defn ^ServiceDiscovery service-discovery
  "Builds a ServiceDiscovery on top of curator-framework, registering
  service-instance as this process's own instance (skipped when nil).

  Keyword options:
    :base-path     - string passed to the builder's .basePath; must start
                     with \"/\" (default \"/foo\")
    :payload-class - class handed to ServiceDiscoveryBuilder/builder
                     (default String)
    :serializer    - instance serializer passed to the builder's
                     .serializer (default (json-serializer))

  This function only builds the object; it does not call .start on it."
  [curator-framework service-instance & {:keys [base-path serializer payload-class]
                                          :or {base-path "/foo"
                                               payload-class String
                                               serializer (json-serializer)}}]
  {:pre [(.startsWith ^String base-path "/")]}
  (-> (dotonn (ServiceDiscoveryBuilder/builder payload-class)
        (.client curator-framework)
        (.basePath base-path)
        ;; Use the (possibly caller-supplied) serializer rather than always
        ;; constructing a fresh default one, so :serializer is honoured.
        (.serializer serializer)
        (.thisInstance service-instance))
      (.build)))
(defn services
  "Returns the names of the registered services, as reported by
  .queryForNames on service-discovery."
  [^ServiceDiscovery service-discovery]
  (-> service-discovery .queryForNames))
(defn random-strategy
  "Returns a new RandomStrategy provider strategy."
  []
  (new RandomStrategy))
(defn round-robin-strategy
  "Returns a new RoundRobinStrategy provider strategy."
  []
  (new RoundRobinStrategy))
(defn sticky-strategy
  "Returns a new StickyStrategy wrapping the given provider strategy."
  [^ProviderStrategy strategy]
  (new StickyStrategy strategy))
(defn down-instance-policy
  "Builds a DownInstancePolicy.

  With no arguments, uses a timeout of 30 :seconds and an error
  threshold of 2. Otherwise timeout-unit must have an entry in
  time-units (checked by the precondition); the looked-up unit is passed
  to the DownInstancePolicy constructor together with timeout and
  error-threshold."
  ([]
   (down-instance-policy 30 :seconds 2))
  ([timeout timeout-unit error-threshold]
   {:pre [(some time-units [timeout-unit])]}
   (let [unit (time-units timeout-unit)]
     (DownInstancePolicy. timeout unit error-threshold))))
(defn ^ServiceProvider service-provider
  "Builds a ServiceProvider for the service named s.

  Keyword options:
    :strategy             - provider strategy (default (random-strategy))
    :down-instance-policy - policy object (default (down-instance-policy))

  The provider is only built here; .start is not called on it."
  [^ServiceDiscovery service-discovery s & {:keys [strategy down-instance-policy]
                                             :or {strategy (random-strategy)
                                                  down-instance-policy (down-instance-policy)}}]
  (let [builder (.serviceProviderBuilder service-discovery)]
    (doto builder
      (.serviceName s)
      (.downInstancePolicy down-instance-policy)
      (.providerStrategy strategy))
    (.build builder)))
(defn service-cache
  "Builds a service cache for the service named s, so that lookups can be
  served from the cache rather than reading ZooKeeper each time."
  [^ServiceDiscovery service-discovery s]
  (let [builder (.serviceCacheBuilder service-discovery)
        named   (.name builder s)]
    (.build named)))
(defn note-error
  "Tells service-provider that there was a problem connecting to the
  given service instance. Clients should call this on connection
  failures; depending on the provider's down instance policy the instance
  may then be marked as down."
  [^ServiceProvider service-provider ^ServiceInstance instance]
  (-> service-provider
      (.noteError instance)))
(defmulti instances
  "Returns service instances from a source. Dispatches on the runtime
  class of the first argument:
    ServiceDiscovery + service name -> .queryForInstances
    ServiceCache                    -> .getInstances"
  (fn [^Object source & _]
    (.getClass source)))

(defmethod instances ServiceDiscovery
  [^ServiceDiscovery discovery service-name]
  (.queryForInstances discovery service-name))

(defmethod instances ServiceCache
  [^ServiceCache cache]
  (.getInstances cache))
(defmulti instance
  "Returns a single service instance. Dispatches on the runtime class of
  the first argument:
    ServiceProvider                  -> .getInstance on the provider
    ServiceCache + ProviderStrategy  -> .getInstance on the strategy,
                                        passing the cache"
  (fn [^Object source & _]
    (.getClass source)))

(defmethod instance ServiceProvider
  [^ServiceProvider service-provider]
  (.getInstance service-provider))

(defmethod instance ServiceCache
  [service-cache ^ProviderStrategy provider-strategy]
  (.getInstance provider-strategy service-cache))