/
common.clj
169 lines (154 loc) · 6.35 KB
/
common.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
(ns clj-gcloud.common
(:require
[clj-gcloud.duration :as d]
[clojure.java.io :as io]
[clojure.spec.alpha :as s])
(:import
(com.google.auth.oauth2 ServiceAccountCredentials)
(com.google.api.gax.core CredentialsProvider BackgroundResource)
(com.google.api.gax.retrying RetrySettings)
(com.google.api.gax.rpc ClientSettings)
(com.google.cloud ServiceOptions Service RetryOption ServiceOptions$Builder)
(io.grpc ManagedChannel)))
(defn option-mapper
  "Builds a function suitable for use as a spec conformer over a collection
  of options.

  The returned function applies `f` to every element of `opts` eagerly and
  yields a lazy sequence of the truthy results; elements for which `f`
  returns nil or false are dropped."
  [f]
  (fn [opts]
    (filter identity (mapv f opts))))
;; RetrySettings
(def default-retry-settings
  "Retry settings obtained from `ServiceOptions/getDefaultRetrySettings` when
  this namespace is loaded."
  (ServiceOptions/getDefaultRetrySettings))
(defn ^RetryOption ->RetryOption
  "Turns a single `[key value]` option entry into a `RetryOption`.

  Duration-valued keys (`:total-timeout`, `:initial-retry-delay`,
  `:max-retry-delay`) have their value passed through `d/->duration` first;
  the remaining keys hand the value to the matching `RetryOption` factory
  unchanged. Returns nil when the key is not one of the recognised options."
  [[k v]]
  (condp = k
    :total-timeout          (RetryOption/totalTimeout (d/->duration v))
    :initial-retry-delay    (RetryOption/initialRetryDelay (d/->duration v))
    :retry-delay-multiplier (RetryOption/retryDelayMultiplier v)
    :max-retry-delay        (RetryOption/maxRetryDelay (d/->duration v))
    :max-attempts           (RetryOption/maxAttempts v)
    :jittered?              (RetryOption/jittered v)
    ;; Unknown keys are not an error here; the caller filters out nils.
    nil))
;; A duration is a two-element tuple: a positive integer amount followed by a
;; unit accepted by `d/chrono-units` (used here as a predicate).
(s/def ::duration (s/tuple pos-int? d/chrono-units))
(s/def :retry.option/total-timeout ::duration)
(s/def :retry.option/initial-retry-delay ::duration)
(s/def :retry.option/retry-delay-multiplier double?)
(s/def :retry.option/max-retry-delay ::duration)
;; `->RetryOption` accepts :max-attempts, so it is validated here as well
;; instead of being passed to Java interop unchecked. Zero is allowed.
(s/def :retry.option/max-attempts nat-int?)
(s/def :retry.option/jittered? boolean?)
;; Validates an (all-optional) map of retry options and, on conform, converts
;; the entries into a sequence of `RetryOption` instances via `->RetryOption`.
(s/def ::retry-settings
  (s/and (s/keys :opt-un [:retry.option/total-timeout
                          :retry.option/initial-retry-delay
                          :retry.option/retry-delay-multiplier
                          :retry.option/max-retry-delay
                          :retry.option/max-attempts
                          :retry.option/jittered?])
         (s/conformer (option-mapper ->RetryOption))))
(defn mk-retry-settings
  "Builds retry settings by merging the options described by `settings` on
  top of `default-retry-settings`.

  `settings` must satisfy the `::retry-settings` spec. When it does not, an
  `ex-info` is thrown whose message is the spec explanation. The ex-data is
  `settings` itself when it is a map, otherwise `{:settings settings}`
  (`ex-info` only accepts a non-nil map as data, so nil or non-map input
  would otherwise mask the validation error with an unrelated exception)."
  [settings]
  ;; Conform once and inspect the result rather than validating and then
  ;; conforming again, which would build every RetryOption twice.
  (let [options (s/conform ::retry-settings settings)]
    (if (s/invalid? options)
      (throw (ex-info (s/explain-str ::retry-settings settings)
                      (if (map? settings) settings {:settings settings})))
      (->> options
           (into-array RetryOption)
           (RetryOption/mergeToSettings default-retry-settings)))))
;; Credentials
(defn mk-credentials
  "Loads service account credentials from the JSON key file at `json-path`.

  `json-path` is anything `clojure.java.io/input-stream` can open. The stream
  is opened here and always closed before returning, including when parsing
  the credentials throws."
  [json-path]
  ;; with-open guarantees the stream is released on both the success and the
  ;; failure path instead of relying on the callee to close it.
  (with-open [stream (io/input-stream json-path)]
    (ServiceAccountCredentials/fromStream stream)))
(defn fixed-credentials
  "Returns a credentials provider which always returns the same service
  account credentials, loaded once (when this function is called) from the
  file located at `path`."
  ^CredentialsProvider
  [path]
  (let [loaded (mk-credentials path)]
    (reify CredentialsProvider
      (getCredentials [_this] loaded))))
(defn mk-credentials-provider
  "Coerces `creds` into a credentials provider.

  - an existing `CredentialsProvider` is returned as is;
  - a string is treated as a path and wrapped with `fixed-credentials`;
  - anything else yields nil."
  [creds]
  (condp instance? creds
    CredentialsProvider creds
    String              (fixed-credentials creds)
    nil))
(def default-project
  "Project id returned by `ServiceOptions/getDefaultProjectId`, resolved once
  when this namespace is loaded."
  (ServiceOptions/getDefaultProjectId))
(defn get-project
  "Returns the project id to use.

  With no arguments, returns `default-project`.

  Given client `settings`, looks at the credentials supplied by its
  credentials provider: when they are service account credentials, their
  project id is returned; otherwise `default-project` is returned.

  `default-project` is the first available project id among the following
  sources:
  - The project ID specified by the GOOGLE_CLOUD_PROJECT environment variable;
  - The App Engine project ID;
  - The project ID specified in the JSON credentials file pointed by the
    GOOGLE_APPLICATION_CREDENTIALS environment variable;
  - The Google Cloud SDK project ID;
  - The Compute Engine project ID"
  ^String
  ([]
   default-project)
  ([^ClientSettings settings]
   (let [creds (.getCredentials (.getCredentialsProvider settings))]
     (if-not (instance? ServiceAccountCredentials creds)
       default-project
       (.getProjectId ^ServiceAccountCredentials creds)))))
;; Misc
;; Specs for the option map accepted by `build-service`. Every key is
;; optional and looked up unqualified (:project-id, :credentials,
;; :retry-settings).
(s/def :service.options/project-id string?)
;; Credentials are given as a string; `build-service` passes it to
;; `mk-credentials`, which opens it as a JSON key file.
(s/def :service.options/credentials string?)
(s/def :service.options/retry-settings ::retry-settings)
(s/def ::service-options (s/keys :opt-un [:service.options/project-id
:service.options/credentials
:service.options/retry-settings]))
(defn build-service
  "Configures `builder` from `opts` and returns the resulting service.

  `opts` may be nil (the builder is used untouched) or a map satisfying
  `::service-options`:
  - `:project-id`     project id set on the builder;
  - `:credentials`    path to a JSON key file, loaded with `mk-credentials`;
  - `:retry-settings` retry option map, converted with `mk-retry-settings`.

  Throws an `ex-info` whose message is the spec explanation when `opts` is
  invalid. The ex-data is `opts` itself when it is a map, otherwise
  `{:opts opts}` (`ex-info` only accepts a map as data, so a non-map `opts`
  would otherwise hide the validation error behind an unrelated exception)."
  ^Service
  [^ServiceOptions$Builder builder {:keys [project-id credentials retry-settings] :as opts}]
  (when-not (or (nil? opts) (s/valid? ::service-options opts))
    (throw (ex-info (s/explain-str ::service-options opts)
                    (if (map? opts) opts {:opts opts}))))
  ;; Only the options actually supplied are applied to the builder.
  (let [builder (cond-> builder
                  project-id     (.setProjectId project-id)
                  credentials    (.setCredentials (mk-credentials credentials))
                  retry-settings (.setRetrySettings (mk-retry-settings retry-settings)))]
    (.getService ^ServiceOptions (.build builder))))
(defn array-type
  "Returns the class name (a string) of an array of `type` with `dims`
  dimensions; `dims` defaults to 1.

  `type` is either a class or a symbol, in which case it is evaluated to
  obtain the class. For primitives, use a type like Integer/TYPE.
  Useful for type hints of the form: ^#=(array-type String) my-str-array"
  ([type]
   (array-type type 1))
  ([type dims]
   (let [element-class (if (symbol? type) (eval type) type)
         ;; One zero-length extent per requested dimension.
         extents       (repeat dims 0)
         sample        (apply make-array element-class extents)]
     (.getName (class sample)))))
; Shutdown
(def default-termination-timeout
  "Default `[timeout unit]` pair applied by the one-argument arity of
  `shutdown!`."
  [30 :seconds])
(defprotocol Shutdown
  "Orderly shutdown of a resource: preexisting calls continue but new calls
  are immediately cancelled.
  When a timeout and unit are given, waits for the resource to become
  terminated, giving up once the timeout is reached."
  (shutdown!
    [this]
    [this timeout unit]))
;; Shutdown support for gRPC channels.
(extend-type ManagedChannel
  Shutdown
  (shutdown!
    ([channel]
     ;; No explicit timeout: fall back to the namespace default.
     (let [[timeout unit] default-termination-timeout]
       (shutdown! channel timeout unit)))
    ([channel timeout unit]
     (.shutdown channel)
     ;; `unit` is a key into `d/time-units`.
     (.awaitTermination channel timeout (get d/time-units unit))
     ;; The channel itself is returned, not the awaitTermination result.
     channel)))
;; Shutdown support for GAX background resources.
(extend-type BackgroundResource
  Shutdown
  (shutdown!
    ([resource]
     ;; No explicit timeout: fall back to the namespace default.
     (let [[timeout unit] default-termination-timeout]
       (shutdown! resource timeout unit)))
    ([resource timeout unit]
     (.shutdown resource)
     ;; `unit` is a key into `d/time-units`.
     (.awaitTermination resource timeout (get d/time-units unit))
     ;; The resource itself is returned, not the awaitTermination result.
     resource)))
; Helper methods for logging
;; Prints a CredentialsProvider as a map holding the credentials it returns.
(defmethod print-method CredentialsProvider
  [^CredentialsProvider provider writer]
  (let [summary {:creds (.getCredentials provider)}]
    (print-method summary writer)))
;; Prints RetrySettings as a map of its individual settings.
(defmethod print-method RetrySettings
  [^RetrySettings settings writer]
  (let [summary {:total-timeout          (.getTotalTimeout settings)
                 :initial-retry-delay    (.getInitialRetryDelay settings)
                 :retry-delay-multiplier (.getRetryDelayMultiplier settings)
                 :max-retry-delay        (.getMaxRetryDelay settings)
                 :max-attempts           (.getMaxAttempts settings)
                 :jittered?              (.isJittered settings)
                 :initial-rpc-timeout    (.getInitialRpcTimeout settings)
                 :rpc-timeout-multiplier (.getRpcTimeoutMultiplier settings)
                 :max-rpc-timeout        (.getMaxRpcTimeout settings)}]
    (print-method summary writer)))