-
Notifications
You must be signed in to change notification settings - Fork 66
/
context.h
401 lines (358 loc) · 15 KB
/
context.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
// Copyright 2016-2019 Envoy Project Authors
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <chrono>
#include <ctime>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "include/proxy-wasm/context_interface.h"
namespace proxy_wasm {
#include "proxy_wasm_common.h"
#include "proxy_wasm_enums.h"
class WasmBase;
class WasmVm;
/**
* PluginBase is container to hold plugin information which is shared with all Context(s) created
* for a given plugin. Embedders may extend this class with additional host-specific plugin
* information as required.
* @param name is the name of the plugin.
* @param root_id is an identifier for the in VM handlers for this plugin.
* @param vm_id is a string used to differentiate VMs with the same code and VM configuration.
* @param plugin_configuration is configuration for this plugin.
* @param fail_open if true the plugin will pass traffic as opposed to close all streams.
*/
struct PluginBase {
PluginBase(std::string_view name, std::string_view root_id, std::string_view vm_id,
std::string_view runtime, std::string_view plugin_configuration, bool fail_open)
: name_(std::string(name)), root_id_(std::string(root_id)), vm_id_(std::string(vm_id)),
runtime_(std::string(runtime)), plugin_configuration_(plugin_configuration),
fail_open_(fail_open) {}
const std::string name_;
const std::string root_id_;
const std::string vm_id_;
const std::string runtime_;
std::string plugin_configuration_;
const bool fail_open_;
const std::string &log_prefix() const { return log_prefix_; }
private:
std::string makeLogPrefix() const;
std::string log_prefix_;
};
struct BufferBase : public BufferInterface {
BufferBase() = default;
~BufferBase() override = default;
// BufferInterface
size_t size() const override {
if (owned_data_) {
return owned_data_size_;
}
return data_.size();
}
WasmResult copyTo(WasmBase *wasm, size_t start, size_t length, uint64_t ptr_ptr,
uint64_t size_ptr) const override;
WasmResult copyFrom(size_t /* start */, size_t /* length */,
std::string_view /* data */) override {
// Setting a string buffer not supported (no use case).
return WasmResult::BadArgument;
}
virtual void clear() {
data_ = "";
owned_data_ = nullptr;
}
BufferBase *set(std::string_view data) {
clear();
data_ = data;
return this;
}
BufferBase *set(std::unique_ptr<char[]> owned_data, uint32_t owned_data_size) {
clear();
owned_data_ = std::move(owned_data);
owned_data_size_ = owned_data_size;
return this;
}
protected:
std::string_view data_;
std::unique_ptr<char[]> owned_data_;
uint32_t owned_data_size_;
};
/**
* ContextBase is the interface between the VM host and the VM. It has several uses:
*
* 1) To provide host-specific implementations of ABI calls out of the VM. For example, a proxy
* which wants to provide the ability to make an HTTP call must implement the
* ContextBase::httpCall() method.
*
* 2) To call into the VM. For example, when the above mentioned httpCall() completes, the host must
* call ContextBase::onHttpCallResponse(). Similarly, when a new HTTP request arrives and the
* headers are available, the host must create a new ContextBase object to manage the new stream and
* call onRequestHeaders() on that object which will cause a corresponding Context to be allocated
* in the VM which will receive the proxy_on_context_create and proxy_on_request_headers calls.
*
* 3) For testing and instrumentation the methods of ContextBase can be replaces or augmented.
*/
class ContextBase : public RootInterface,
public HttpInterface,
public NetworkInterface,
public StreamInterface,
public HeaderInterface,
public HttpCallInterface,
public GrpcCallInterface,
public GrpcStreamInterface,
public MetricsInterface,
public SharedDataInterface,
public SharedQueueInterface,
public GeneralInterface {
public:
ContextBase(); // Testing.
ContextBase(WasmBase *wasm); // Vm Context.
ContextBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin); // Root Context.
ContextBase(WasmBase *wasm, uint32_t parent_context_id,
std::shared_ptr<PluginBase> plugin); // Stream context.
virtual ~ContextBase();
WasmBase *wasm() const { return wasm_; }
uint32_t id() const { return id_; }
// The VM Context used for calling "malloc" has an id_ == 0.
bool isVmContext() const { return id_ == 0; }
// Root Contexts have the VM Context as a parent.
bool isRootContext() const { return parent_context_id_ == 0; }
ContextBase *parent_context() const { return parent_context_; }
ContextBase *root_context() const {
const ContextBase *previous = this;
ContextBase *parent = parent_context_;
while (parent != previous) {
previous = parent;
parent = parent->parent_context_;
}
return parent;
}
std::string_view root_id() const { return isRootContext() ? root_id_ : plugin_->root_id_; }
std::string_view log_prefix() const {
return isRootContext() ? root_log_prefix_ : plugin_->log_prefix();
}
WasmVm *wasmVm() const;
// Called before deleting the context.
virtual void destroy();
/**
* Calls into the VM.
* These are implemented by the proxy-independent host code. They are virtual to support some
* types of testing.
*/
// Context
void onCreate() override;
bool onDone() override;
void onLog() override;
void onDelete() override;
void onForeignFunction(uint32_t foreign_function_id, uint32_t data_size) override;
// Root
bool onStart(std::shared_ptr<PluginBase> plugin) override;
bool onConfigure(std::shared_ptr<PluginBase> plugin) override;
void onTick(TimerToken token) override;
void onQueueReady(SharedQueueDequeueToken token) override;
// HTTP
FilterHeadersStatus onRequestHeaders(uint32_t headers, bool end_of_stream) override;
FilterDataStatus onRequestBody(uint32_t body_buffer_length, bool end_of_stream) override;
FilterTrailersStatus onRequestTrailers(uint32_t trailers) override;
FilterMetadataStatus onRequestMetadata(uint32_t elements) override;
FilterHeadersStatus onResponseHeaders(uint32_t headers, bool end_of_stream) override;
FilterDataStatus onResponseBody(uint32_t body_buffer_length, bool end_of_stream) override;
FilterTrailersStatus onResponseTrailers(uint32_t trailers) override;
FilterMetadataStatus onResponseMetadata(uint32_t elements) override;
// Network
FilterStatus onNetworkNewConnection() override;
FilterStatus onDownstreamData(uint32_t data_length, bool end_of_stream) override;
FilterStatus onUpstreamData(uint32_t data_length, bool end_of_stream) override;
void onDownstreamConnectionClose(CloseType) override;
void onUpstreamConnectionClose(CloseType) override;
// Async call response.
void onHttpCallResponse(HttpCallToken token, uint32_t headers, uint32_t body_size,
uint32_t trailers) override;
// Grpc
void onGrpcReceiveInitialMetadata(GrpcToken token, uint32_t elements) override;
void onGrpcReceive(GrpcToken token, uint32_t response_size) override;
void onGrpcReceiveTrailingMetadata(GrpcToken token, uint32_t trailers) override;
void onGrpcClose(GrpcToken token, GrpcStatusCode status_code) override;
void error(std::string_view message) override {
std::cerr << message << "\n";
abort();
}
WasmResult unimplemented() override {
error("unimplemented proxy-wasm API");
return WasmResult::Unimplemented;
}
bool isFailed();
bool isFailOpen() { return plugin_->fail_open_; }
//
// General Callbacks.
//
WasmResult log(uint32_t /* level */, std::string_view /* message */) override {
return unimplemented();
}
uint32_t getLogLevel() override { return static_cast<uint32_t>(LogLevel::info); }
uint64_t getCurrentTimeNanoseconds() override {
#if !defined(_MSC_VER)
struct timespec tpe;
clock_gettime(CLOCK_REALTIME, &tpe);
uint64_t t = tpe.tv_sec;
t *= 1000000000;
t += tpe.tv_nsec;
return t;
#else
unimplemented();
return 0;
#endif
}
std::string_view getConfiguration() override {
unimplemented();
return "";
}
std::pair<uint32_t, std::string_view> getStatus() override {
unimplemented();
return std::make_pair(1, "unimplmemented");
}
WasmResult setTimerPeriod(std::chrono::milliseconds period, uint32_t *timer_token_ptr) override;
// Buffer
BufferInterface *getBuffer(WasmBufferType /* type */) override {
unimplemented();
return nullptr;
}
bool endOfStream(WasmStreamType /* type */) override {
unimplemented();
return true;
}
// HTTP
WasmResult httpCall(std::string_view /* target */, const Pairs & /*request_headers */,
std::string_view /* request_body */, const Pairs & /* request_trailers */,
int /* timeout_millisconds */, uint32_t * /* token_ptr */) override {
return unimplemented();
}
// gRPC
WasmResult grpcCall(std::string_view /* grpc_service */, std::string_view /* service_name */,
std::string_view /* method_name */, const Pairs & /* initial_metadata */,
std::string_view /* request */, std::chrono::milliseconds /* timeout */,
GrpcToken * /* token_ptr */) override {
return unimplemented();
}
WasmResult grpcStream(std::string_view /* grpc_service */, std::string_view /* service_name */,
std::string_view /* method_name */, const Pairs & /* initial_metadata */,
GrpcToken * /* token_ptr */) override {
return unimplemented();
}
WasmResult grpcClose(uint32_t /* token */) override { // cancel on call, close on stream.
return unimplemented();
}
WasmResult grpcCancel(uint32_t /* token */) override { // cancel on call, reset on stream.
return unimplemented();
}
WasmResult grpcSend(uint32_t /* token */, std::string_view /* message */,
bool /* end_stream */) override { // stream only
return unimplemented();
}
// Metrics
WasmResult defineMetric(uint32_t /* type */, std::string_view /* name */,
uint32_t * /* metric_id_ptr */) override {
return unimplemented();
}
WasmResult incrementMetric(uint32_t /* metric_id */, int64_t /* offset */) override {
return unimplemented();
}
WasmResult recordMetric(uint32_t /* metric_id */, uint64_t /* value */) override {
return unimplemented();
}
WasmResult getMetric(uint32_t /* metric_id */, uint64_t * /* value_ptr */) override {
return unimplemented();
}
// Properties
WasmResult getProperty(std::string_view /* path */, std::string * /* result */) override {
return unimplemented();
}
WasmResult setProperty(std::string_view /* key */,
std::string_view /* serialized_value */) override {
return unimplemented();
}
// Continue
WasmResult continueStream(WasmStreamType /* stream_type */) override { return unimplemented(); }
WasmResult closeStream(WasmStreamType /* stream_type */) override { return unimplemented(); }
WasmResult sendLocalResponse(uint32_t /* response_code */, std::string_view /* body_text */,
Pairs /* additional_headers */, GrpcStatusCode /* grpc_status */,
std::string_view /* details */) override {
return unimplemented();
}
void clearRouteCache() override { unimplemented(); }
void failStream(WasmStreamType stream_type) override { closeStream(stream_type); }
// Shared Data
WasmResult getSharedData(std::string_view key,
std::pair<std::string, uint32_t /* cas */> *data) override;
WasmResult setSharedData(std::string_view key, std::string_view value, uint32_t cas) override;
// Shared Queue
WasmResult registerSharedQueue(std::string_view queue_name,
SharedQueueDequeueToken *token_ptr) override;
WasmResult lookupSharedQueue(std::string_view vm_id, std::string_view queue_name,
SharedQueueEnqueueToken *token) override;
WasmResult dequeueSharedQueue(uint32_t token, std::string *data) override;
WasmResult enqueueSharedQueue(uint32_t token, std::string_view value) override;
// Header/Trailer/Metadata Maps
WasmResult addHeaderMapValue(WasmHeaderMapType /* type */, std::string_view /* key */,
std::string_view /* value */) override {
return unimplemented();
}
WasmResult getHeaderMapValue(WasmHeaderMapType /* type */, std::string_view /* key */,
std::string_view * /*result */) override {
return unimplemented();
}
WasmResult getHeaderMapPairs(WasmHeaderMapType /* type */, Pairs * /* result */) override {
return unimplemented();
}
WasmResult setHeaderMapPairs(WasmHeaderMapType /* type */, const Pairs & /* pairs */) override {
return unimplemented();
}
WasmResult removeHeaderMapValue(WasmHeaderMapType /* type */,
std::string_view /* key */) override {
return unimplemented();
}
WasmResult replaceHeaderMapValue(WasmHeaderMapType /* type */, std::string_view /* key */,
std::string_view /* value */) override {
return unimplemented();
}
WasmResult getHeaderMapSize(WasmHeaderMapType /* type */, uint32_t * /* result */) override {
return unimplemented();
}
protected:
friend class WasmBase;
void initializeRootBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin);
std::string makeRootLogPrefix(std::string_view vm_id) const;
WasmBase *wasm_{nullptr};
uint32_t id_{0};
uint32_t parent_context_id_{0}; // 0 for roots and the general context.
ContextBase *parent_context_{nullptr}; // set in all contexts.
std::string root_id_; // set only in root context.
std::string root_log_prefix_; // set only in root context.
std::shared_ptr<PluginBase> plugin_;
bool in_vm_context_created_ = false;
bool destroyed_ = false;
};
class DeferAfterCallActions {
public:
DeferAfterCallActions(ContextBase *context) : wasm_(context->wasm()) {}
~DeferAfterCallActions();
private:
WasmBase *const wasm_;
};
uint32_t resolveQueueForTest(std::string_view vm_id, std::string_view queue_name);
} // namespace proxy_wasm