/
requests.cr
275 lines (246 loc) · 9.49 KB
/
requests.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
module Kube
class Transport
DELETE_OPTS_BODY_VERSION_MIN = SemanticVersion.parse("1.11.0")
@need_delete_body : Bool = ::K8S::Kubernetes::VERSION < DELETE_OPTS_BODY_VERSION_MIN
@version : K8S::Apimachinery::Version::Info? = nil
def request_options(request_object = nil, content_type = "application/json", **options)
headers = HTTP::Headers{
"Content-Type" => content_type,
"Accept" => "application/json",
"User-Agent" => "kube-client.cr/#{Kube::Client::VERSION}",
}
if @auth_token
headers["Authorization"] = "Bearer #{@auth_token}"
elsif @auth_username && @auth_password
headers["Authorization"] = "Basic #{Base64.strict_encode("#{@auth_username}:#{@auth_password}")}"
end
if request_object.is_a?(::JSON::Serializable) || request_object.is_a?(NamedTuple) || request_object.is_a?(Hash) || request_object.is_a?(K8S::Kubernetes::Object)
options.merge(
headers: headers,
body: request_object.to_json,
query: options[:query]?,
)
else
options.merge(
headers: headers,
query: options[:query]?,
)
end
end
def format_request(options)
method = options[:method]
path = options[:path]
body = nil
if obj = options[:request_object]?
body = "<#{obj.class.name}>"
end
[method, path, body].compact.join " "
end
private def _parse_resp(method, path, response, content_type, response_class : T.class | Nil = nil) forall T
case content_type
when "application/json"
if response_class
response_class.from_json(response.body)
else
K8S::Kubernetes::Resource.from_json(response.body)
end
when "application/yaml"
if response_class
response_class.from_yaml(response.body)
else
K8S::Kubernetes::Resource.from_yaml(response.body)
# YAML.parse(response.body)
end
when "text/plain"
response.body
else
raise Kube::Error::API.new(method, path, response.status, "Invalid response Content-Type: #{content_type}")
end
rescue ex : JSON::SerializableError
Log.error { "Invalid response: #{ex.message}" }
Log.error { "Response: #{response.body}" }
raise ex
end
def parse_response(response : HTTP::Client::Response, method : String, path : String, response_class : T.class | Nil = nil, **options) forall T
content_type = response.headers["Content-Type"].split(';', 2).first
if response.success?
_parse_resp(method, path, response, content_type, response_class)
else
response_data = _parse_resp(method, path, response, content_type)
if (response_data.is_a?(JSON::Any) || response_data.is_a?(YAML::Any)) && response_data["kind"]? == "Status"
raise Kube::Error::API.new(method, path, response.status, K8S::Apimachinery::Apis::Meta::V1::Status.from_json(response.body))
else
raise Kube::Error::API.new(method, path, response.status, response.body)
end
end
end
# Format request path with query params
private def _request_path(options, query : Hash(String, String | Array(String))? = nil) : String
path = options[:path]
unless query.nil?
path += "?#{URI::Params.encode(query.as(Hash(String, String | Array(String))))}"
end
path
end
private def _request(options, req_options)
using_connection do |client|
client.exec(
method: options[:method],
path: _request_path(options, req_options[:query]?),
headers: req_options[:headers]?,
body: req_options[:body]?,
)
end
end
def request(**options)
request(**options, response_class: K8S::Kubernetes::Resource)
end
def watch_request(response_class : T.class, response_channel, **options) forall T
req_options = request_options(**options)
path = _request_path(options, req_options[:query]?)
spawn _watch_request(response_class, response_channel, path, req_options)
end
def _watch_request(response_class : T.class, response_channel, path, req_options) forall T
using_connection do |client|
client.exec(method: "GET", path: path, headers: req_options[:headers]?) do |response|
if response.success?
io = response.body_io
while !io.closed?
raw_event = io.gets
if raw_event
event = response_class.from_json(raw_event)
response_channel.send(event)
end
end
else
raise Kube::Error::API.new("GET", path, response.status, response.body)
end
end
end
rescue ex : Kube::Error::API
response_channel.send(ex)
response_channel.close
rescue ex
response_channel.close
raise ex
end
def request(response_class : T.class, response_block : Proc? = nil, **options) forall T
opts = options.to_h
req_options = if opts[:method]? == "DELETE" && need_delete_body?
request_options(**options.merge({
request_object: opts.reject(:query),
}))
else
request_options(**options)
end
t1 = Time.monotonic
response = _request(options, req_options)
t = Time.monotonic - t1
obj = parse_response(**options, response: response, response_class: response_class)
rescue ex : K8S::Error::UnknownResource
logger.warn { "#{format_request(options)} => HTTP #{ex} in #{t}s" }
logger.debug { "Request: #{req_options}" } unless req_options.nil?
logger.debug { "Response: #{response.body}" } unless response.nil?
nil
rescue ex
logger.warn { "#{format_request(options)} => HTTP #{ex} in #{t}s" }
logger.debug { "Request: #{req_options}" } unless req_options.nil?
logger.debug { "Response: #{response.body}" } unless response.nil?
raise ex
else
logger.debug { "Request: #{req_options}" } unless req_options.nil?
logger.debug { "Response: #{response.body}" }
logger.info { "#{format_request(options)} => HTTP #{response.status}: #{obj.inspect} in #{t}s" }
obj
end
def requests(*options, skip_missing = false, skip_forbidden = false, retry_errors = true, skip_unknown = true, **common_options)
requests(options, skip_missing, skip_forbidden, retry_errors, skip_unknown, **common_options)
end
def requests(options : Enumerable(W), skip_missing = false, skip_forbidden = false, retry_errors = true, skip_unknown = true,
**common_options) forall W
t1 = Time.monotonic
responses = options.map { |opts| _request(opts, common_options) }
t = Time.monotonic - t1
objects = responses.zip(options).map do |response, request_options|
begin
parse_response(**request_options.merge({response: response}))
rescue e : Kube::Error::UndefinedResource | K8S::Error::UnknownResource
raise e unless skip_unknown
nil
rescue e : Kube::Error::NotFound
raise e unless skip_missing
nil
rescue e : Kube::Error::Forbidden
raise e unless skip_forbidden
nil
rescue e : Kube::Error::ServiceUnavailable
raise e unless retry_errors
logger.warn { "Retry #{format_request(request_options)} => HTTP #{e.code} #{e.reason} in #{t}" }
# only retry the failed request, not the entire pipeline
request(**common_options.merge(request_options))
end
end
rescue e : Kube::Error::API
logger.warn { "[#{options.map { |o| format_request(o) }.join ", "}] => HTTP #{e.code} #{e.reason} in #{t}" }
raise e
else
logger.info { "[#{options.map { |o| format_request(o) }.join ", "}] => HTTP [#{responses.map(&.status).join ", "}] in #{t}" }
objects
end
# Returns true if delete options should be sent as bode of the DELETE request
def need_delete_body? : Bool
@need_delete_body ||= ::K8S::Kubernetes::VERSION < DELETE_OPTS_BODY_VERSION_MIN
end
def version : K8S::Apimachinery::Version::Info
@version ||= get("/version", response_class: K8S::Apimachinery::Version::Info).as(K8S::Apimachinery::Version::Info)
end
def get(*path)
get(*path, response_class: K8S::Kubernetes::Resource)
end
def get(*path, response_class : T.class, **options) forall T
request(
**options.merge({
method: "GET",
path: self.path(*path),
response_class: response_class,
})
)
end
def gets(paths : Array(String), response_class : T.class, **options) forall T
requests(
paths.map { |path|
{
method: "GET",
path: self.path(path),
}
},
**options.merge({
response_class: response_class,
})
)
end
def gets(*paths)
gets(*paths, response_class: K8S::Kubernetes::Resource)
end
def gets(*paths, response_class : T.class, **options) forall T
requests(
*paths.map { |path|
options.merge({
method: "GET",
path: self.path(path),
})
},
**options.merge({
response_class: response_class,
})
)
end
def path(*path)
if path.first == path_prefix
File.join(path)
else
File.join(path_prefix, *path)
end.gsub(%r{//}, "/")
end
end
end