Skip to content

Commit

Permalink
[release-1.6] Wasm remote load cache fix and stats (#208)
Browse files Browse the repository at this point in the history
* Fetch .wasm from remote URI without depending on Listener. (#204)

* Fetch .wasm from remote URI without depending on Listener.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Reactivate tests.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Add stats for wasm remote load fetch and cache. (#207)

* Add stats for wasm remote load fetch and cache.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Address comments and ensure that the stats have the same lifetime as the
cache.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Address comments.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Address ASAN issue.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

* Mess around with the tests some more.

Signed-off-by: John Plevyak <jplevyak@gmail.com>

Co-authored-by: John Plevyak <jplevyak@gmail.com>
  • Loading branch information
bianpengyuan and jplevyak authored May 7, 2020
1 parent 9697eb5 commit c0035b1
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 19 deletions.
1 change: 1 addition & 0 deletions source/extensions/common/wasm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ envoy_cc_library(
"//include/envoy/server:lifecycle_notifier_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:enum_to_int",
"//source/common/config:remote_data_fetcher_lib",
"//source/common/http:message_lib",
"//source/common/http:utility_lib",
"//source/common/tracing:http_tracer_lib",
Expand Down
90 changes: 74 additions & 16 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#include "extensions/common/wasm/wasm.h"

#include <stdio.h>
#include <algorithm>
#include <chrono>

#include <algorithm>
#include <cctype>
#include <chrono>
#include <limits>
#include <memory>
#include <string>

#include "envoy/common/exception.h"
#include "envoy/config/wasm/v3/wasm.pb.validate.h"
#include "envoy/event/deferred_deletable.h"
#include "envoy/grpc/status.h"
#include "envoy/http/codes.h"
#include "envoy/local_info/local_info.h"
Expand All @@ -24,6 +24,7 @@
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
#include "common/common/logger.h"
#include "common/config/remote_data_fetcher.h"
#include "common/http/header_map_impl.h"
#include "common/http/message_impl.h"
#include "common/http/utility.h"
Expand Down Expand Up @@ -68,16 +69,46 @@ std::string Sha256(absl::string_view data) {

namespace {

#define CREATE_WASM_STATS(COUNTER, GAUGE) \
COUNTER(remote_load_cache_hits) \
COUNTER(remote_load_cache_negative_hits) \
COUNTER(remote_load_cache_misses) \
COUNTER(remote_load_fetch_successes) \
COUNTER(remote_load_fetch_failures) \
GAUGE(remote_load_cache_entries, NeverImport)

struct CreateWasmStats {
Stats::ScopeSharedPtr scope_;
CREATE_WASM_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

struct CodeCacheEntry {
std::string code;
bool in_progress;
MonotonicTime use_time;
MonotonicTime fetch_time;
};

class RemoteDataFetcherAdapter : public Config::DataFetcher::RemoteDataFetcherCallback,
public Event::DeferredDeletable {
public:
RemoteDataFetcherAdapter(std::function<void(std::string cb)> cb) : cb_(cb) {}
~RemoteDataFetcherAdapter() = default;
void onSuccess(const std::string& data) override { cb_(data); }
virtual void onFailure(Config::DataFetcher::FailureReason) override { cb_(""); }
void setFetcher(std::unique_ptr<Config::DataFetcher::RemoteDataFetcher>&& fetcher) {
fetcher_ = std::move(fetcher);
}

private:
std::function<void(std::string)> cb_;
std::unique_ptr<Config::DataFetcher::RemoteDataFetcher> fetcher_;
};

std::atomic<int64_t> active_wasm_;
std::mutex code_cache_mutex;
std::unordered_map<std::string, CodeCacheEntry>* code_cache = nullptr;
CreateWasmStats* create_wasm_stats = nullptr;

std::string Xor(absl::string_view a, absl::string_view b) {
ASSERT(a.size() == b.size());
Expand Down Expand Up @@ -555,6 +586,10 @@ void clearCodeCacheForTesting(bool fail_if_not_cached) {
delete code_cache;
code_cache = nullptr;
}
if (create_wasm_stats) {
delete create_wasm_stats;
create_wasm_stats = nullptr;
}
}

// TODO: remove this post #4160: Switch default to SimulatedTimeSystem.
Expand All @@ -578,6 +613,11 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
if (!code_cache) {
code_cache = new std::remove_reference<decltype(*code_cache)>::type;
}
if (!create_wasm_stats) {
create_wasm_stats =
new CreateWasmStats{scope, CREATE_WASM_STATS(POOL_COUNTER_PREFIX(*scope, "wasm."),
POOL_GAUGE_PREFIX(*scope, "wasm."))};
}
// Remove entries older than CODE_CACHE_SECONDS_CACHING_TTL except for our target.
for (auto it = code_cache->begin(); it != code_cache->end();) {
if (now - it->second.use_time > std::chrono::seconds(CODE_CACHE_SECONDS_CACHING_TTL) &&
Expand All @@ -587,32 +627,39 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
++it;
}
}
create_wasm_stats->remote_load_cache_entries_.set(code_cache->size());
auto it = code_cache->find(vm_config.code().remote().sha256());
if (it != code_cache->end()) {
it->second.use_time = now;
if (it->second.in_progress) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
"createWasm: failed to load (in prpgress) from {}", source);
"createWasm: failed to load (in progress) from {}", source);
create_wasm_stats->remote_load_cache_misses_.inc();
throw WasmException(
fmt::format("Failed to load WASM code (fetch in progress) from {}", source));
}
code = it->second.code;
if (code.empty()) {
if (now - it->second.fetch_time <
std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) {
create_wasm_stats->remote_load_cache_negative_hits_.inc();
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
"createWasm: failed to load (cached) from {}", source);
throw WasmException(fmt::format("Failed to load WASM code (cached) from {}", source));
}
fetch = true; // Fetch failed, retry.
it->second.in_progress = true;
it->second.fetch_time = now;
} else {
create_wasm_stats->remote_load_cache_hits_.inc();
}
} else {
fetch = true; // Not in cache, fetch.
auto& e = (*code_cache)[vm_config.code().remote().sha256()];
e.in_progress = true;
e.use_time = e.fetch_time = now;
create_wasm_stats->remote_load_cache_entries_.set(code_cache->size());
create_wasm_stats->remote_load_cache_misses_.inc();
}
} else if (vm_config.code().has_local()) {
code = Config::DataSource::read(vm_config.code().local(), true, api);
Expand Down Expand Up @@ -664,17 +711,20 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
};

if (fetch) {
// NB: if the (fetching) exception is thrown below, the remote_data provider will be deleted
// immediately rather than completing the async fetch, so allow for self-delete.
auto remote_data_provider_holder =
std::make_shared<std::unique_ptr<Config::DataSource::RemoteAsyncDataProvider>>();
auto fetch_callback = [vm_config, complete_cb, source, &dispatcher,
remote_data_provider_holder](const std::string& code) {
auto holder = std::make_shared<std::unique_ptr<Event::DeferredDeletable>>();
auto fetch_callback = [vm_config, complete_cb, source, &dispatcher, scope,
holder](const std::string& code) {
{
std::lock_guard<std::mutex> guard(code_cache_mutex);
auto& e = (*code_cache)[vm_config.code().remote().sha256()];
e.in_progress = false;
e.code = code;
if (code.empty()) {
create_wasm_stats->remote_load_fetch_failures_.inc();
} else {
create_wasm_stats->remote_load_fetch_successes_.inc();
}
create_wasm_stats->remote_load_cache_entries_.set(code_cache->size());
}
if (!fail_if_code_not_cached) {
if (code.empty()) {
Expand All @@ -684,16 +734,24 @@ createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin, Stats::Sco
complete_cb(code);
}
// NB: must be deleted explicitly.
dispatcher.deferredDelete(
Envoy::Event::DeferredDeletablePtr{remote_data_provider_holder->release()});
remote_data_provider_holder->reset();
if (*holder) {
dispatcher.deferredDelete(Envoy::Event::DeferredDeletablePtr{holder->release()});
}
};
remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
cluster_manager, init_manager, vm_config.code().remote(), dispatcher, random, true,
fetch_callback);
if (fail_if_code_not_cached) {
*remote_data_provider_holder = std::move(remote_data_provider);
auto adapter = std::make_unique<RemoteDataFetcherAdapter>(fetch_callback);
auto fetcher = std::make_unique<Config::DataFetcher::RemoteDataFetcher>(
cluster_manager, vm_config.code().remote().http_uri(), vm_config.code().remote().sha256(),
*adapter);
auto fetcher_ptr = fetcher.get();
adapter->setFetcher(std::move(fetcher));
*holder = std::move(adapter);
fetcher_ptr->fetch();
throw WasmException(fmt::format("Failed to load WASM code (fetching) from {}", source));
} else {
remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
cluster_manager, init_manager, vm_config.code().remote(), dispatcher, random, true,
fetch_callback);
}
} else {
complete_cb(code);
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include <atomic>
#include <deque>
#include <chrono>
#include <deque>
#include <map>
#include <memory>

Expand Down
2 changes: 2 additions & 0 deletions test/extensions/common/wasm/wasm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ TEST_P(WasmCommonTest, RemoteCode) {
wasm->configure(root_context, plugin, "done");
dispatcher->run(Event::Dispatcher::RunType::NonBlock);
dispatcher->clearDeferredDeleteList();
clearCodeCacheForTesting(false);
}

TEST_P(WasmCommonTest, RemoteCodeMultipleRetry) {
Expand Down Expand Up @@ -644,6 +645,7 @@ TEST_P(WasmCommonTest, RemoteCodeMultipleRetry) {
wasm->configure(root_context, plugin, "done");
dispatcher->run(Event::Dispatcher::RunType::NonBlock);
dispatcher->clearDeferredDeleteList();
clearCodeCacheForTesting(false);
}

} // namespace Wasm
Expand Down
3 changes: 2 additions & 1 deletion test/extensions/filters/http/wasm/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class WasmFilterConfigTest : public testing::TestWithParam<std::string> {
ON_CALL(context_, dispatcher()).WillByDefault(ReturnRef(dispatcher_));
}

void SetUp() { Envoy::Extensions::Common::Wasm::clearCodeCacheForTesting(false); }
void SetUp() override { Envoy::Extensions::Common::Wasm::clearCodeCacheForTesting(false); }
void TearDown() override { Envoy::Extensions::Common::Wasm::clearCodeCacheForTesting(false); }

void initializeForRemote() {
retry_timer_ = new Event::MockTimer();
Expand Down
3 changes: 2 additions & 1 deletion test/extensions/filters/http/wasm/wasm_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class WasmHttpFilterTest : public testing::TestWithParam<std::string> {
WasmHttpFilterTest() {}
~WasmHttpFilterTest() {}

void SetUp() { Envoy::Extensions::Common::Wasm::clearCodeCacheForTesting(false); }
void SetUp() override { Envoy::Extensions::Common::Wasm::clearCodeCacheForTesting(false); }
void TearDown() override { Envoy::Extensions::Common::Wasm::clearCodeCacheForTesting(false); }

void setupConfig(const std::string& code, std::string root_id = "") {
root_context_ = new TestRoot();
Expand Down

0 comments on commit c0035b1

Please sign in to comment.