-
Notifications
You must be signed in to change notification settings - Fork 0
/
bootstrap.clj
69 lines (59 loc) · 2.85 KB
/
bootstrap.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
(ns lupapiste-pubsub.bootstrap
(:require [taoensso.timbre :as timbre]
[clojure.java.io :as io])
(:import [com.google.cloud.pubsub.v1 TopicAdminClient TopicAdminSettings SubscriptionAdminSettings SubscriptionAdminClient]
[io.grpc ManagedChannelBuilder ManagedChannel]
[com.google.api.gax.rpc FixedTransportChannelProvider]
[com.google.api.gax.grpc GrpcTransportChannel InstantiatingGrpcChannelProvider]
[java.util.concurrent TimeUnit]
[com.google.api.gax.core NoCredentialsProvider GoogleCredentialsProvider BackgroundResource FixedCredentialsProvider]
[com.google.auth.oauth2 ServiceAccountCredentials]))
(defn custom-transport-channel-provider
  "Logs `endpoint`, builds a gRPC channel targeting it with `.usePlaintext`
  applied, wraps that channel in a GrpcTransportChannel and returns a
  FixedTransportChannelProvider around the wrapper."
  [endpoint]
  (timbre/info "Connecting to Pub/Sub endpoint" endpoint)
  (let [channel-builder (ManagedChannelBuilder/forTarget endpoint)
        plaintext-builder (.usePlaintext channel-builder)
        managed-channel (.build plaintext-builder)
        transport (GrpcTransportChannel/create managed-channel)]
    (FixedTransportChannelProvider/create transport)))
(defn default-transport-channel-provider
  "Logs that the global cloud endpoint is being used and returns the provider
  built from a fresh, otherwise unconfigured
  InstantiatingGrpcChannelProvider builder."
  []
  (timbre/info "Connecting to Pub/Sub global cloud endpoint")
  (let [provider-builder (InstantiatingGrpcChannelProvider/newBuilder)]
    (.build provider-builder)))
(defn transport-channel-provider
  "Selects the transport channel provider: when `endpoint` is nil or false the
  default provider is used, otherwise a custom provider is created for
  `endpoint`."
  [endpoint]
  (if-not endpoint
    (default-transport-channel-provider)
    (custom-transport-channel-provider endpoint)))
(defn terminate-transport!
  "Shuts down the transport held by `channel-provider` when it is a
  FixedTransportChannelProvider, then waits up to 5 seconds for the underlying
  channel to terminate. In that case the result of `.awaitTermination` is
  returned; for any other kind of provider nothing is done and nil is
  returned."
  [channel-provider]
  (when (instance? FixedTransportChannelProvider channel-provider)
    ;; The provider is hinted here, inside the instance? guard, so that
    ;; `.getTransportChannel` compiles to a direct method call instead of a
    ;; reflective lookup. Hints sit on the binding symbols.
    (let [^FixedTransportChannelProvider provider channel-provider
          ^GrpcTransportChannel transport (.getTransportChannel provider)
          ^ManagedChannel channel (.getChannel transport)]
      ;; `.shutdown` is invoked on both the transport wrapper and the channel
      ;; it wraps, in that order, before waiting for termination.
      (.shutdown transport)
      (.shutdown channel)
      (.awaitTermination channel 5 TimeUnit/SECONDS))))
(defn topic-admin-client
  "Creates a TopicAdminClient from settings that use the `:channel-provider`
  and `:credentials-provider` entries of the given map."
  [{:keys [channel-provider credentials-provider]}]
  (let [configured-builder (-> (TopicAdminSettings/newBuilder)
                               (.setTransportChannelProvider channel-provider)
                               (.setCredentialsProvider credentials-provider))
        ;; The hint picks the TopicAdminSettings overload of `create`.
        ^TopicAdminSettings settings (.build configured-builder)]
    (TopicAdminClient/create settings)))
(defn subscription-admin-client
  "Creates a SubscriptionAdminClient from settings that use the
  `:channel-provider` and `:credentials-provider` entries of the given map."
  [{:keys [channel-provider credentials-provider]}]
  (let [configured-builder (-> (SubscriptionAdminSettings/newBuilder)
                               (.setTransportChannelProvider channel-provider)
                               (.setCredentialsProvider credentials-provider))
        ;; The hint picks the SubscriptionAdminSettings overload of `create`.
        ^SubscriptionAdminSettings settings (.build configured-builder)]
    (SubscriptionAdminClient/create settings)))
(defn shutdown-client
  "Calls `.shutdown` on `client` and then blocks in `.awaitTermination` with a
  5 second limit, returning whatever `.awaitTermination` returns."
  [^BackgroundResource client]
  (let [timeout-seconds 5]
    (.shutdown client)
    (.awaitTermination client timeout-seconds TimeUnit/SECONDS)))
(defn no-credentials-provider
  "Returns the provider produced by `NoCredentialsProvider/create`."
  []
  (NoCredentialsProvider/create))
(defn google-credentials-provider
  "Builds a GoogleCredentialsProvider whose scopes-to-apply are the
  cloud-platform and pubsub scope URLs."
  []
  (let [scopes ["https://www.googleapis.com/auth/cloud-platform"
                "https://www.googleapis.com/auth/pubsub"]
        scoped-builder (.setScopesToApply (GoogleCredentialsProvider/newBuilder)
                                          scopes)]
    (.build scoped-builder)))
(defn fixed-credentials-provider
  "Reads service account credentials from `service-account-file` (anything
  `clojure.java.io/input-stream` accepts) and returns a
  FixedCredentialsProvider holding them. Returns nil when
  `service-account-file` is nil or false."
  [service-account-file]
  (when service-account-file
    ;; The stream is closed by with-open once the credentials have been read.
    (let [credentials (with-open [stream (io/input-stream service-account-file)]
                        (ServiceAccountCredentials/fromStream stream))]
      (FixedCredentialsProvider/create credentials))))