diff --git a/api/envoy/config/tap/v3/common.proto b/api/envoy/config/tap/v3/common.proto index 1884bd57d3d1..126993d0f7b4 100644 --- a/api/envoy/config/tap/v3/common.proto +++ b/api/envoy/config/tap/v3/common.proto @@ -4,6 +4,7 @@ package envoy.config.tap.v3; import "envoy/config/common/matcher/v3/matcher.proto"; import "envoy/config/core/v3/base.proto"; +import "envoy/config/core/v3/extension.proto"; import "envoy/config/core/v3/grpc_service.proto"; import "envoy/config/route/v3/route_components.proto"; @@ -183,7 +184,7 @@ message OutputConfig { } // Tap output sink configuration. -// [#next-free-field: 6] +// [#next-free-field: 7] message OutputSink { option (udpa.annotations.versioning).previous_message_type = "envoy.service.tap.v2alpha.OutputSink"; @@ -259,6 +260,9 @@ message OutputSink { // been configured to receive tap configuration from some other source (e.g., static // file, XDS, etc.) configuring the buffered admin output type will fail. BufferedAdminSink buffered_admin = 5; + + // Tap output filter will be defined by an extension type + core.v3.TypedExtensionConfig custom_sink = 6; } } diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto index db1d83471cf1..9f4a37751ffc 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto @@ -29,7 +29,6 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // All options and processing modes are implemented except for the following: // // * Request and response attributes are not sent and not processed. -// * Dynamic metadata in responses from the external processor is ignored. // * "async mode" is not implemented. // The filter communicates with an external gRPC service called an "external processor" @@ -99,7 +98,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` object in a namespace matching the filter // name. // -// [#next-free-field: 15] +// [#next-free-field: 17] message ExternalProcessor { // Configuration for the gRPC service that the filter will communicate with. // The filter supports both the "Envoy" and "Google" gRPC clients. @@ -126,7 +125,6 @@ message ExternalProcessor { // for a reply. bool async_mode = 4; - // [#not-implemented-hide:] // Envoy provides a number of :ref:`attributes ` // for expressive policies. Each attribute name provided in this field will be // matched against that list and populated in the request_headers message. @@ -134,7 +132,6 @@ message ExternalProcessor { // for the list of supported attributes and their types. repeated string request_attributes = 5; - // [#not-implemented-hide:] // Envoy provides a number of :ref:`attributes ` // for expressive policies. Each attribute name provided in this field will be // matched against that list and populated in the response_headers message. @@ -200,6 +197,42 @@ message ExternalProcessor { // :ref:`mode_override `. // If not set, ``mode_override`` API in the response message will be ignored. bool allow_mode_override = 14; + + // If set to true, ignore the + // :ref:`immediate_response ` + // message in an external processor response. In such case, no local reply will be sent. + // Instead, the stream to the external processor will be closed. There will be no + // more external processing for this stream from now on. + bool disable_immediate_response = 15; + + // Options related to the sending and receiving of dynamic metadata + MetadataOptions metadata_options = 16; +} + +// The MetadataOptions structure defines options for the sending and receiving of +// dynamic metadata. Specifically, which namespaces to send to the server, whether +// metadata returned by the server may be written, and how that metadata may be written. +message MetadataOptions { + message MetadataNamespaces { + // Specifies a list of metadata namespaces whose values, if present, + // will be passed to the ext_proc service as an opaque *protobuf::Struct*. + repeated string untyped = 1; + + // Specifies a list of metadata namespaces whose values, if present, + // will be passed to the ext_proc service as a *protobuf::Any*. This allows + // envoy and the external processing server to share the protobuf message + // definition for safe parsing. + repeated string typed = 2; + } + + // Describes which typed or untyped dynamic metadata namespaces to forward to + // the external processing server. + MetadataNamespaces forwarding_namespaces = 1; + + // Describes which typed or untyped dynamic metadata namespaces to accept from + // the external processing server. Set to empty or leave unset to disallow writing + // any received dynamic metadata. Receiving of typed metadata is not supported. + MetadataNamespaces receiving_namespaces = 2; } // The HeaderForwardingRules structure specifies what headers are @@ -242,7 +275,7 @@ message ExtProcPerRoute { } // Overrides that may be set on a per-route basis -// [#next-free-field: 6] +// [#next-free-field: 7] message ExtProcOverrides { // Set a different processing mode for this route than the default. ProcessingMode processing_mode = 1; @@ -251,16 +284,17 @@ message ExtProcOverrides { // Set a different asynchronous processing option than the default. bool async_mode = 2; - // [#not-implemented-hide:] // Set different optional attributes than the default setting of the // ``request_attributes`` field. repeated string request_attributes = 3; - // [#not-implemented-hide:] // Set different optional properties than the default setting of the // ``response_attributes`` field. repeated string response_attributes = 4; // Set a different gRPC service for this route than the default. config.core.v3.GrpcService grpc_service = 5; + + // Options related to the sending and receiving of dynamic metadata + MetadataOptions metadata_options = 6; } diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index 666e65296255..2fcb473e051e 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -56,7 +56,7 @@ service ExternalProcessor { // This represents the different types of messages that Envoy can send // to an external processing server. -// [#next-free-field: 8] +// [#next-free-field: 9] message ProcessingRequest { // Specify whether the filter that sent this request is running in synchronous // or asynchronous mode. The choice of synchronous or asynchronous mode @@ -115,6 +115,9 @@ message ProcessingRequest { // in the filter configuration. HttpTrailers response_trailers = 7; } + + // Dynamic metadata associated with the request. + config.core.v3.Metadata metadata_context = 8; } // For every ProcessingRequest received by the server with the ``async_mode`` field @@ -158,9 +161,9 @@ message ProcessingResponse { ImmediateResponse immediate_response = 7; } - // [#not-implemented-hide:] - // Optional metadata that will be emitted as dynamic metadata to be consumed by the next - // filter. This metadata will be placed in the namespace ``envoy.filters.http.ext_proc``. + // Optional metadata that will be emitted as dynamic metadata to be consumed by + // following filters. This metadata will be placed in the namespace(s) specified by the top-level + // field name(s) of the struct. google.protobuf.Struct dynamic_metadata = 8; // Override how parts of the HTTP request and response are processed diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 9ecf0d6e48ce..43374d08d7b7 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -13,5 +13,97 @@ removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` new_features: +- area: google_grpc + change: | + Fixed recursion when HTTP connection is disconnected due to a high number of premature resets. +- area: grpc + change: | + Fixed a bug in gRPC async client cache which intermittently causes CPU spikes due to busy loop in timer expiration. +- area: tracing + change: | + Fixed a bug where Datadog spans tagged as errors would not have the appropriate error property set. +- area: tracing + change: | + Fixed a bug where child spans produced by the Datadog tracer would have an incorrect operation name. +- area: tracing + change: | + Fixed a bug that caused the Datadog tracing extension to drop traces that + should be kept on account of an extracted sampling decision. +- area: proxy protocol + change: | + fixed a crash when Envoy is configured for PROXY protocol on both a listener and cluster, and the listener receives + a PROXY protocol header with address type LOCAL (typically used for health checks). +- area: proxy_protocol + change: | + Fix crash due to uncaught exception when the operating system does not support an address type (such as IPv6) that is + received in a proxy protocol header. Connections will instead be dropped/reset. +- area: proxy_protocol + change: | + Fixed a bug where TLVs with non utf8 characters were inserted as protobuf values into filter metadata circumventing + ext_authz checks when ``failure_mode_allow`` is set to ``true``. +- area: tls + change: | + Fix crash due to uncaught exception when the operating system does not support an address type (such as IPv6) that is + received in an mTLS client cert IP SAN. These SANs will be ignored. This applies only when using formatter + ``%DOWNSTREAM_PEER_IP_SAN%``. +- area: http + change: | + Fixed crash when HTTP request idle and per try timeouts occurs within backoff interval. +- area: url matching + change: | + Fixed excessive CPU utilization when using regex URL template matcher. + +removed_config_or_runtime: +# *Normally occurs at the end of the* :ref:`deprecation period ` + +new_features: +- area: tap + change: | + added :ref:`custom_sink ` type to enable writing tap data + out to a custom sink extension. +- area: access_log + change: | + added %RESPONSE_FLAGS_LONG% substitution string, that will output a pascal case string representing the resonse flags. + The output response flags will correspond with %RESPONSE_FLAGS%, only with a long textual string representation. +- area: config + change: | + Added the capability to defer broadcasting of certain cluster (CDS, EDS) to + worker threads from the main thread. This optimization can save significant + amount of memory in cases where there are (1) a large number of workers and + (2) a large amount of config, most of which is unused. This capability is + guarded by :ref:`enable_deferred_cluster_creation + `. +- area: extension_discovery_service + change: | + added ECDS support for :ref:` downstream network filters`. +- area: ext_proc + change: | + added + :ref:`disable_immediate_response ` + config API to ignore the + :ref:`immediate_response ` + message from the external processing server. +- area: http + change: | + added :ref:`Json-To-Metadata filter `. +- area: extension_discovery_service + change: | + added metric listener.listener_stat.network_extension_config_missing to track closed connections due to missing config. +- area: redis + change: | + added support for time command (returns a local response). +- area: redis + change: | + Provide initial span attributes to a sampler used in the OpenTelemetry tracer. +- area: ext_proc + change: | + implemented + :ref:`request_attributes ` + and + :ref:`response_attributes ` + config APIs to enable sending and receiving attributes from/to the external processing server. deprecated: + Added an off-by-default runtime flag + ``envoy.reloadable_features.google_grpc_disable_tls_13`` to disable TLSv1.3 + usage by gRPC SDK for ``google_grpc`` services. diff --git a/envoy/grpc/async_client_manager.h b/envoy/grpc/async_client_manager.h index 7cf027ee9053..aa99f2c23a6e 100644 --- a/envoy/grpc/async_client_manager.h +++ b/envoy/grpc/async_client_manager.h @@ -32,6 +32,39 @@ class AsyncClientFactory { using AsyncClientFactoryPtr = std::unique_ptr; +class GrpcServiceConfigWithHashKey { +public: + GrpcServiceConfigWithHashKey() = default; + + explicit GrpcServiceConfigWithHashKey(const envoy::config::core::v3::GrpcService& config) + : config_(config), pre_computed_hash_(Envoy::MessageUtil::hash(config)){}; + + template friend H AbslHashValue(H h, const GrpcServiceConfigWithHashKey& wrapper) { + return H::combine(std::move(h), wrapper.pre_computed_hash_); + } + + std::size_t getPreComputedHash() const { return pre_computed_hash_; } + + friend bool operator==(const GrpcServiceConfigWithHashKey& lhs, + const GrpcServiceConfigWithHashKey& rhs) { + if (lhs.pre_computed_hash_ == rhs.pre_computed_hash_) { + return Protobuf::util::MessageDifferencer::Equivalent(lhs.config_, rhs.config_); + } + return false; + } + + const envoy::config::core::v3::GrpcService& config() const { return config_; } + + void setConfig(const envoy::config::core::v3::GrpcService g) { + config_ = g; + pre_computed_hash_ = Envoy::MessageUtil::hash(g); + } + +private: + envoy::config::core::v3::GrpcService config_; + std::size_t pre_computed_hash_; +}; + // Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service // Grpc::AsyncClientFactory instances. All manufactured Grpc::AsyncClients must // be destroyed before the AsyncClientManager can be safely destructed. @@ -39,6 +72,7 @@ class AsyncClientManager { public: virtual ~AsyncClientManager() = default; + // TODO(diazalan) deprecate old getOrCreateRawAsyncClient once all filters have been transitioned /** * Create a Grpc::RawAsyncClient. The async client is cached thread locally and shared across * different filter instances. @@ -54,6 +88,22 @@ class AsyncClientManager { getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& grpc_service, Stats::Scope& scope, bool skip_cluster_check) PURE; + /** + * Create a Grpc::RawAsyncClient. The async client is cached thread locally and shared across + * different filter instances. + * @param grpc_service Envoy::Grpc::GrpcServiceConfigWithHashKey which contains config and + * hashkey. + * @param scope stats scope. + * @param skip_cluster_check if set to true skips checks for cluster presence and being statically + * configured. + * @param cache_option always use cache or use cache when runtime is enabled. + * @return RawAsyncClientPtr a grpc async client. + * @throws EnvoyException when grpc_service validation fails. + */ + virtual RawAsyncClientSharedPtr + getOrCreateRawAsyncClientWithHashKey(const GrpcServiceConfigWithHashKey& grpc_service, + Stats::Scope& scope, bool skip_cluster_check) PURE; + /** * Create a Grpc::AsyncClients factory for a service. Validation of the service is performed and * will raise an exception on failure. diff --git a/source/common/grpc/async_client_manager_impl.cc b/source/common/grpc/async_client_manager_impl.cc index 4b9dc4f86125..67d022cb043b 100644 --- a/source/common/grpc/async_client_manager_impl.cc +++ b/source/common/grpc/async_client_manager_impl.cc @@ -7,6 +7,7 @@ #include "source/common/common/base64.h" #include "source/common/grpc/async_client_impl.h" +#include "source/common/protobuf/utility.h" #include "absl/strings/match.h" @@ -138,12 +139,27 @@ AsyncClientManagerImpl::factoryForGrpcService(const envoy::config::core::v3::Grp RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClient( const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope, bool skip_cluster_check) { - RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config); + const GrpcServiceConfigWithHashKey config_with_hash_key = GrpcServiceConfigWithHashKey(config); + RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key); if (client != nullptr) { return client; } - client = factoryForGrpcService(config, scope, skip_cluster_check)->createUncachedRawAsyncClient(); - raw_async_client_cache_->setCache(config, client); + client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check) + ->createUncachedRawAsyncClient(); + raw_async_client_cache_->setCache(config_with_hash_key, client); + return client; +} + +RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey( + const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope, + bool skip_cluster_check) { + RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key); + if (client != nullptr) { + return client; + } + client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check) + ->createUncachedRawAsyncClient(); + raw_async_client_cache_->setCache(config_with_hash_key, client); return client; } @@ -153,11 +169,12 @@ AsyncClientManagerImpl::RawAsyncClientCache::RawAsyncClientCache(Event::Dispatch } void AsyncClientManagerImpl::RawAsyncClientCache::setCache( - const envoy::config::core::v3::GrpcService& config, const RawAsyncClientSharedPtr& client) { - ASSERT(lru_map_.find(config) == lru_map_.end()); + const GrpcServiceConfigWithHashKey& config_with_hash_key, + const RawAsyncClientSharedPtr& client) { + ASSERT(lru_map_.find(config_with_hash_key) == lru_map_.end()); // Create a new cache entry at the beginning of the list. - lru_list_.emplace_front(config, client, dispatcher_.timeSource().monotonicTime()); - lru_map_[config] = lru_list_.begin(); + lru_list_.emplace_front(config_with_hash_key, client, dispatcher_.timeSource().monotonicTime()); + lru_map_[config_with_hash_key] = lru_list_.begin(); // If inserting to an empty cache, enable eviction timer. if (lru_list_.size() == 1) { evictEntriesAndResetEvictionTimer(); @@ -165,8 +182,8 @@ void AsyncClientManagerImpl::RawAsyncClientCache::setCache( } RawAsyncClientSharedPtr AsyncClientManagerImpl::RawAsyncClientCache::getCache( - const envoy::config::core::v3::GrpcService& config) { - auto it = lru_map_.find(config); + const GrpcServiceConfigWithHashKey& config_with_hash_key) { + auto it = lru_map_.find(config_with_hash_key); if (it == lru_map_.end()) { return nullptr; } @@ -197,7 +214,7 @@ void AsyncClientManagerImpl::RawAsyncClientCache::evictEntriesAndResetEvictionTi // This will cause cpu spike. if (time_to_next_expire_sec.count() <= 0) { // Erase the expired entry. - lru_map_.erase(lru_list_.back().config_); + lru_map_.erase(lru_list_.back().config_with_hash_key_); lru_list_.pop_back(); } else { cache_eviction_timer_->enableTimer(time_to_next_expire_sec); diff --git a/source/common/grpc/async_client_manager_impl.h b/source/common/grpc/async_client_manager_impl.h index eee1400ff84c..eadf3009490c 100644 --- a/source/common/grpc/async_client_manager_impl.h +++ b/source/common/grpc/async_client_manager_impl.h @@ -9,6 +9,7 @@ #include "envoy/upstream/cluster_manager.h" #include "source/common/grpc/stat_names.h" +#include "source/common/protobuf/utility.h" namespace Envoy { namespace Grpc { @@ -51,32 +52,35 @@ class AsyncClientManagerImpl : public AsyncClientManager { getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope, bool skip_cluster_check) override; + RawAsyncClientSharedPtr + getOrCreateRawAsyncClientWithHashKey(const GrpcServiceConfigWithHashKey& config_with_hash_key, + Stats::Scope& scope, bool skip_cluster_check) override; + AsyncClientFactoryPtr factoryForGrpcService(const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope, bool skip_cluster_check) override; class RawAsyncClientCache : public ThreadLocal::ThreadLocalObject { public: explicit RawAsyncClientCache(Event::Dispatcher& dispatcher); - void setCache(const envoy::config::core::v3::GrpcService& config, + void setCache(const GrpcServiceConfigWithHashKey& config_with_hash_key, const RawAsyncClientSharedPtr& client); - RawAsyncClientSharedPtr getCache(const envoy::config::core::v3::GrpcService& config); + RawAsyncClientSharedPtr getCache(const GrpcServiceConfigWithHashKey& config_with_hash_key); private: void evictEntriesAndResetEvictionTimer(); struct CacheEntry { - CacheEntry(const envoy::config::core::v3::GrpcService& config, + CacheEntry(const GrpcServiceConfigWithHashKey& config_with_hash_key, RawAsyncClientSharedPtr const& client, MonotonicTime create_time) - : config_(config), client_(client), accessed_time_(create_time) {} - envoy::config::core::v3::GrpcService config_; + : config_with_hash_key_(config_with_hash_key), client_(client), + accessed_time_(create_time) {} + GrpcServiceConfigWithHashKey config_with_hash_key_; RawAsyncClientSharedPtr client_; MonotonicTime accessed_time_; }; using LruList = std::list; - absl::flat_hash_map - lru_map_; LruList lru_list_; + absl::flat_hash_map lru_map_; Event::Dispatcher& dispatcher_; Envoy::Event::TimerPtr cache_eviction_timer_; static constexpr std::chrono::seconds EntryTimeoutInterval{50}; diff --git a/source/extensions/common/tap/BUILD b/source/extensions/common/tap/BUILD index 452fa65c6161..68e8617581a8 100644 --- a/source/extensions/common/tap/BUILD +++ b/source/extensions/common/tap/BUILD @@ -28,6 +28,7 @@ envoy_cc_library( ":tap_interface", "//source/common/common:assert_lib", "//source/common/common:hex_lib", + "//source/common/config:utility_lib", "//source/extensions/common/matcher:matcher_lib", "@envoy_api//envoy/config/tap/v3:pkg_cc_proto", "@envoy_api//envoy/data/tap/v3:pkg_cc_proto", diff --git a/source/extensions/common/tap/tap.h b/source/extensions/common/tap/tap.h index 107152f9d08b..9f3a86966eab 100644 --- a/source/extensions/common/tap/tap.h +++ b/source/extensions/common/tap/tap.h @@ -76,6 +76,27 @@ class Sink { }; using SinkPtr = std::unique_ptr; +using SinkContext = + absl::variant, + std::reference_wrapper>; + +/** + * Abstract tap sink factory. Produces a factory that can instantiate SinkPtr objects + */ +class TapSinkFactory : public Config::TypedFactory { +public: + ~TapSinkFactory() override = default; + std::string category() const override { return "envoy.tap.sinks"; } + + /** + * Create a Sink that can be used for writing out data produced by the tap filter. + * @param config supplies the protobuf configuration for the sink factory + * @param cluster_manager is a ClusterManager from the HTTP/transport socket context + */ + virtual SinkPtr createSinkPtr(const Protobuf::Message& config, SinkContext context) PURE; +}; + +using TapSinkFactoryPtr = std::unique_ptr; /** * Generic configuration for a tap extension (filter, transport socket, etc.). diff --git a/source/extensions/common/tap/tap_config_base.cc b/source/extensions/common/tap/tap_config_base.cc index 5d91f81bf927..7f75f4636f77 100644 --- a/source/extensions/common/tap/tap_config_base.cc +++ b/source/extensions/common/tap/tap_config_base.cc @@ -3,9 +3,11 @@ #include "envoy/config/tap/v3/common.pb.h" #include "envoy/data/tap/v3/common.pb.h" #include "envoy/data/tap/v3/wrapper.pb.h" +#include "envoy/server/transport_socket_config.h" #include "source/common/common/assert.h" #include "source/common/common/fmt.h" +#include "source/common/config/utility.h" #include "source/common/protobuf/utility.h" #include "source/extensions/common/matcher/matcher.h" @@ -45,12 +47,13 @@ bool Utility::addBufferToProtoBytes(envoy::data::tap::v3::Body& output_body, } TapConfigBaseImpl::TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Common::Tap::Sink* admin_streamer) + Common::Tap::Sink* admin_streamer, SinkContext context) : max_buffered_rx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( proto_config.output_config(), max_buffered_rx_bytes, DefaultMaxBufferedBytes)), max_buffered_tx_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( proto_config.output_config(), max_buffered_tx_bytes, DefaultMaxBufferedBytes)), streaming_(proto_config.output_config().streaming()) { + using ProtoOutputSink = envoy::config::tap::v3::OutputSink; auto& sinks = proto_config.output_config().sinks(); ASSERT(sinks.size() == 1); @@ -86,6 +89,33 @@ TapConfigBaseImpl::TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& pr sink_ = std::make_unique(sinks[0].file_per_tap()); sink_to_use_ = sink_.get(); break; + case ProtoOutputSink::OutputSinkTypeCase::kCustomSink: { + TapSinkFactory& tap_sink_factory = + Envoy::Config::Utility::getAndCheckFactory(sinks[0].custom_sink()); + + // extract message validation visitor from the context and use it to define config + ProtobufTypes::MessagePtr config; + using TsfContextRef = + std::reference_wrapper; + using HttpContextRef = std::reference_wrapper; + if (absl::holds_alternative(context)) { + Server::Configuration::TransportSocketFactoryContext& tsf_context = + absl::get(context).get(); + config = Config::Utility::translateAnyToFactoryConfig(sinks[0].custom_sink().typed_config(), + tsf_context.messageValidationVisitor(), + tap_sink_factory); + } else { + Server::Configuration::FactoryContext& http_context = + absl::get(context).get(); + config = Config::Utility::translateAnyToFactoryConfig( + sinks[0].custom_sink().typed_config(), + http_context.messageValidationContext().staticValidationVisitor(), tap_sink_factory); + } + + sink_ = tap_sink_factory.createSinkPtr(*config, context); + sink_to_use_ = sink_.get(); + break; + } case envoy::config::tap::v3::OutputSink::OutputSinkTypeCase::kStreamingGrpc: PANIC("not implemented"); case envoy::config::tap::v3::OutputSink::OutputSinkTypeCase::OUTPUT_SINK_TYPE_NOT_SET: diff --git a/source/extensions/common/tap/tap_config_base.h b/source/extensions/common/tap/tap_config_base.h index 77a997929b74..aca6eb1cf485 100644 --- a/source/extensions/common/tap/tap_config_base.h +++ b/source/extensions/common/tap/tap_config_base.h @@ -103,7 +103,7 @@ class TapConfigBaseImpl : public virtual TapConfig { protected: TapConfigBaseImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Common::Tap::Sink* admin_streamer); + Common::Tap::Sink* admin_streamer, SinkContext context); private: // This is the default setting for both RX/TX max buffered bytes. (This means that per tap, the diff --git a/source/extensions/filters/http/ext_authz/config.cc b/source/extensions/filters/http/ext_authz/config.cc index 54ca6fc5db45..1e15833e5fc1 100644 --- a/source/extensions/filters/http/ext_authz/config.cc +++ b/source/extensions/filters/http/ext_authz/config.cc @@ -6,6 +6,7 @@ #include "envoy/config/core/v3/grpc_service.pb.h" #include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.h" #include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.validate.h" +#include "envoy/grpc/async_client_manager.h" #include "envoy/registry/registry.h" #include "source/common/config/utility.h" @@ -42,34 +43,22 @@ Http::FilterFactoryCb ExtAuthzFilterConfig::createFilterFactoryFromProtoTyped( context.clusterManager(), client_config); callbacks.addStreamFilter(std::make_shared(filter_config, std::move(client))); }; - } else if (proto_config.grpc_service().has_google_grpc()) { - // Google gRPC client. + } else { + // gRPC client. const uint32_t timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout); Config::Utility::checkTransportVersion(proto_config); + Envoy::Grpc::GrpcServiceConfigWithHashKey config_with_hash_key = + Envoy::Grpc::GrpcServiceConfigWithHashKey(proto_config.grpc_service()); callback = [&context, filter_config, timeout_ms, - proto_config](Http::FilterChainFactoryCallbacks& callbacks) { + config_with_hash_key](Http::FilterChainFactoryCallbacks& callbacks) { auto client = std::make_unique( - context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient( - proto_config.grpc_service(), context.scope(), true), + context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClientWithHashKey( + config_with_hash_key, context.scope(), true), std::chrono::milliseconds(timeout_ms)); callbacks.addStreamFilter(std::make_shared(filter_config, std::move(client))); }; - } else { - // Envoy gRPC client. - const uint32_t timeout_ms = - PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout); - Config::Utility::checkTransportVersion(proto_config); - callback = [grpc_service = proto_config.grpc_service(), &context, filter_config, - timeout_ms](Http::FilterChainFactoryCallbacks& callbacks) { - Grpc::RawAsyncClientSharedPtr raw_client = - context.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient( - grpc_service, context.scope(), true); - auto client = std::make_unique( - raw_client, std::chrono::milliseconds(timeout_ms)); - callbacks.addStreamFilter(std::make_shared(filter_config, std::move(client))); - }; } return callback; diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index 2c2397d6d304..ef2c32690f8e 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -19,6 +19,12 @@ envoy_cc_library( "ext_proc.h", "processor_state.h", ], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), deps = [ ":client_interface", ":mutation_utils_lib", @@ -29,24 +35,41 @@ envoy_cc_library( "//source/common/buffer:buffer_lib", "//source/common/protobuf", "//source/common/runtime:runtime_features_lib", + "//source/extensions/filters/common/expr:evaluator_lib", "//source/extensions/filters/common/mutation_rules:mutation_rules_lib", "//source/extensions/filters/http/common:pass_through_filter_lib", "@com_google_absl//absl/status", "@com_google_absl//absl/strings:str_format", + "@com_google_cel_cpp//eval/public:builtin_func_registrar", + "@com_google_cel_cpp//eval/public:cel_expr_builder_factory", "@envoy_api//envoy/config/common/mutation_rules/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", - ], + ] + select( + { + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "@com_google_cel_cpp//parser", + ], + }, + ), ) envoy_cc_extension( name = "config", srcs = ["config.cc"], hdrs = ["config.h"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), deps = [ ":client_lib", ":ext_proc", + "//source/extensions/filters/common/expr:evaluator_lib", "//source/extensions/filters/http/common:factory_base_lib", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", ], @@ -56,6 +79,7 @@ envoy_cc_library( name = "client_interface", hdrs = ["client.h"], deps = [ + "//envoy/grpc:async_client_manager_interface", "//envoy/grpc:status", "//envoy/stream_info:stream_info_interface", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 1cf1bb605308..25594b3748e4 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -4,6 +4,7 @@ #include "envoy/common/pure.h" #include "envoy/config/core/v3/grpc_service.pb.h" +#include "envoy/grpc/async_client_manager.h" #include "envoy/grpc/status.h" #include "envoy/service/ext_proc/v3/external_processor.pb.h" #include "envoy/stream_info/stream_info.h" @@ -32,14 +33,16 @@ class ExternalProcessorCallbacks { std::unique_ptr&& response) PURE; virtual void onGrpcError(Grpc::Status::GrpcStatus error) PURE; virtual void onGrpcClose() PURE; + virtual void logGrpcStreamInfo() PURE; }; class ExternalProcessorClient { public: virtual ~ExternalProcessorClient() = default; - virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, - const StreamInfo::StreamInfo& stream_info) PURE; + virtual ExternalProcessorStreamPtr + start(ExternalProcessorCallbacks& callbacks, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, + const StreamInfo::StreamInfo& stream_info) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.cc b/source/extensions/filters/http/ext_proc/client_impl.cc index b3d9d32c563e..80ccd31189db 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.cc +++ b/source/extensions/filters/http/ext_proc/client_impl.cc @@ -13,10 +13,10 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl(Grpc::AsyncClientManage ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const StreamInfo::StreamInfo& stream_info) { Grpc::AsyncClient grpcClient( - client_manager_.getOrCreateRawAsyncClient(grpc_service, scope_, true)); + client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true)); return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, stream_info); } @@ -75,6 +75,7 @@ void ExternalProcessorStreamImpl::onReceiveTrailingMetadata(Http::ResponseTraile void ExternalProcessorStreamImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { ENVOY_LOG(debug, "gRPC stream closed remotely with status {}: {}", status, message); + callbacks_.logGrpcStreamInfo(); stream_closed_ = true; if (status == Grpc::Status::Ok) { callbacks_.onGrpcClose(); diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 90f00fa0baa9..45c86fc9da5c 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -27,7 +27,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager, Stats::Scope& scope); ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const StreamInfo::StreamInfo& stream_info) override; private: diff --git a/source/extensions/filters/http/ext_proc/config.cc b/source/extensions/filters/http/ext_proc/config.cc index 7a0240e84e03..88c171a9f721 100644 --- a/source/extensions/filters/http/ext_proc/config.cc +++ b/source/extensions/filters/http/ext_proc/config.cc @@ -15,9 +15,9 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs); const uint32_t max_message_timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs); - const auto filter_config = - std::make_shared(proto_config, std::chrono::milliseconds(message_timeout_ms), - max_message_timeout_ms, context.scope(), stats_prefix); + const auto filter_config = std::make_shared( + proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms, + context.scope(), stats_prefix, Envoy::Extensions::Filters::Common::Expr::getBuilder(context)); return [filter_config, grpc_service = proto_config.grpc_service(), &context](Http::FilterChainFactoryCallbacks& callbacks) { @@ -44,9 +44,10 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs); const uint32_t max_message_timeout_ms = PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs); - const auto filter_config = - std::make_shared(proto_config, std::chrono::milliseconds(message_timeout_ms), - max_message_timeout_ms, server_context.scope(), stats_prefix); + const auto filter_config = std::make_shared( + proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms, + server_context.scope(), stats_prefix, + Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context)); return [filter_config, grpc_service = proto_config.grpc_service(), &server_context](Http::FilterChainFactoryCallbacks& callbacks) { diff --git a/source/extensions/filters/http/ext_proc/config.h b/source/extensions/filters/http/ext_proc/config.h index a2912466eb6b..e18452e262d0 100644 --- a/source/extensions/filters/http/ext_proc/config.h +++ b/source/extensions/filters/http/ext_proc/config.h @@ -5,6 +5,7 @@ #include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.validate.h" +#include "source/extensions/filters/common/expr/evaluator.h" #include "source/extensions/filters/http/common/factory_base.h" namespace Envoy { @@ -29,7 +30,7 @@ class ExternalProcessingFilterConfig Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped( const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& proto_config, - Server::Configuration::ServerFactoryContext& context, + Server::Configuration::ServerFactoryContext&, ProtobufMessage::ValidationVisitor& validator) override; Http::FilterFactoryCb createFilterFactoryFromProtoWithServerContextTyped( diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 4adb8c5b73c1..c26f7d0f78dd 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -8,6 +8,10 @@ #include "absl/strings/str_format.h" +#if defined(USE_CEL_PARSER) +#include "parser/parser.h" +#endif + namespace Envoy { namespace Extensions { namespace HttpFilters { @@ -113,13 +117,58 @@ ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_ : encoding_processor_grpc_calls_; } +absl::flat_hash_map +FilterConfig::initExpressions(const Protobuf::RepeatedPtrField& matchers) const { + absl::flat_hash_map expressions; +#if defined(USE_CEL_PARSER) + for (const auto& matcher : matchers) { + auto parse_status = google::api::expr::parser::Parse(matcher); + if (!parse_status.ok()) { + throw EnvoyException("Unable to parse descriptor expression: " + + parse_status.status().ToString()); + } + expressions.emplace(matcher, Extensions::Filters::Common::Expr::createExpression( + builder_->builder(), parse_status.value().expr())); + } +#else + ENVOY_LOG(warn, "CEL expression parsing is not available for use in this environment." + " Attempted to parse " + + std::to_string(matchers.size()) + " expressions"); +#endif + return expressions; +} + FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config) : disabled_(config.disabled()) { - if (config.has_overrides()) { + if (!config.has_overrides()) { + return; + } + + const auto& overrides = config.overrides(); + if (overrides.has_processing_mode()) { processing_mode_ = config.overrides().processing_mode(); } - if (config.overrides().has_grpc_service()) { - grpc_service_ = config.overrides().grpc_service(); + if (overrides.has_grpc_service()) { + grpc_service_ = overrides.grpc_service(); + } + + if (!overrides.has_metadata_options()) { + return; + } + + const auto& md_opts = overrides.metadata_options(); + if (md_opts.has_forwarding_namespaces()) { + untyped_forwarding_namespaces_ = + std::vector(md_opts.forwarding_namespaces().untyped().begin(), + md_opts.forwarding_namespaces().untyped().end()); + typed_forwarding_namespaces_ = + std::vector(md_opts.forwarding_namespaces().typed().begin(), + md_opts.forwarding_namespaces().typed().end()); + } + if (md_opts.has_receiving_namespaces()) { + untyped_receiving_namespaces_ = + std::vector(md_opts.receiving_namespaces().untyped().begin(), + md_opts.receiving_namespaces().untyped().end()); } } @@ -129,6 +178,15 @@ void FilterConfigPerRoute::merge(const FilterConfigPerRoute& src) { if (src.grpcService().has_value()) { grpc_service_ = src.grpcService(); } + if (src.untypedForwardingMetadataNamespaces().has_value()) { + untyped_forwarding_namespaces_ = src.untypedForwardingMetadataNamespaces(); + } + if (src.typedForwardingMetadataNamespaces().has_value()) { + typed_forwarding_namespaces_ = src.typedForwardingMetadataNamespaces(); + } + if (src.untypedReceivingMetadataNamespaces().has_value()) { + untyped_receiving_namespaces_ = src.untypedReceivingMetadataNamespaces(); + } } void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { @@ -160,7 +218,7 @@ Filter::StreamOpenState Filter::openStream() { } if (!stream_) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); - stream_ = client_->start(*this, grpc_service_, decoder_callbacks_->streamInfo()); + stream_ = client_->start(*this, config_with_hash_key_, decoder_callbacks_->streamInfo()); if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called // Asserts that `stream_` is nullptr since it is not valid to be used any further @@ -171,7 +229,7 @@ Filter::StreamOpenState Filter::openStream() { stats_.streams_started_.inc(); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. - if (grpc_service_.has_envoy_grpc()) { + if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); } } @@ -180,11 +238,6 @@ Filter::StreamOpenState Filter::openStream() { void Filter::closeStream() { if (stream_) { - if (grpc_service_.has_envoy_grpc()) { - logging_info_->setBytesSent(stream_->streamInfo().bytesSent()); - logging_info_->setBytesReceived(stream_->streamInfo().bytesReceived()); - logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); - } ENVOY_LOG(debug, "Calling close on stream"); if (stream_->close()) { stats_.streams_closed_.inc(); @@ -206,7 +259,8 @@ void Filter::onDestroy() { } FilterHeadersStatus Filter::onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream) { + Http::RequestOrResponseHeaderMap& headers, bool end_stream, + absl::optional proto) { switch (openStream()) { case StreamOpenState::Error: return FilterHeadersStatus::StopIteration; @@ -220,10 +274,14 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, state.setHeaders(&headers); state.setHasNoBody(end_stream); ProcessingRequest req; + addDynamicMetadata(state, req); auto* headers_req = state.mutableHeaders(req); MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(), *headers_req->mutable_headers()); headers_req->set_end_of_stream(end_stream); + if (proto.has_value()) { + (*headers_req->mutable_attributes())[FilterName] = proto.value(); + } state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), ProcessorState::CallbackState::HeadersCallback); ENVOY_LOG(debug, "Sending headers message"); @@ -233,6 +291,48 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, return FilterHeadersStatus::StopIteration; } +const absl::optional Filter::evaluateAttributes( + Filters::Common::Expr::ActivationPtr activation, + const absl::flat_hash_map& + expr) { + absl::optional proto; + if (expr.size() > 0) { + proto.emplace(ProtobufWkt::Struct{}); + for (const auto& hash_entry : expr) { + ProtobufWkt::Arena arena; + const auto result = hash_entry.second.get()->Evaluate(*activation, &arena); + if (!result.ok()) { + // TODO: Stats? + continue; + } + + if (result.value().IsError()) { + ENVOY_LOG(trace, "error parsing cel expression {}", hash_entry.first); + continue; + } + + ProtobufWkt::Value value; + switch (result.value().type()) { + case google::api::expr::runtime::CelValue::Type::kBool: + value.set_bool_value(result.value().BoolOrDie()); + break; + case google::api::expr::runtime::CelValue::Type::kNullType: + value.set_null_value(ProtobufWkt::NullValue{}); + break; + case google::api::expr::runtime::CelValue::Type::kDouble: + value.set_number_value(result.value().DoubleOrDie()); + break; + default: + value.set_string_value(Filters::Common::Expr::print(result.value())); + } + + (*(proto.value()).mutable_fields())[hash_entry.first] = value; + } + } + + return proto; +} + FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_stream) { ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream); mergePerRouteConfig(); @@ -240,13 +340,21 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st decoding_state_.setCompleteBodyAvailable(true); } - if (!decoding_state_.sendHeaders()) { - ENVOY_LOG(trace, "decodeHeaders: Skipped"); - return FilterHeadersStatus::Continue; + FilterHeadersStatus status = FilterHeadersStatus::Continue; + if (decoding_state_.sendHeaders()) { + absl::optional proto; + if (!config_->requestExpr().empty()) { + auto activation_ptr = Filters::Common::Expr::createActivation(decoding_state_.streamInfo(), + &headers, nullptr, nullptr); + proto = evaluateAttributes(std::move(activation_ptr), config_->requestExpr()); + } + + status = onHeaders(decoding_state_, headers, end_stream, proto); + ENVOY_LOG(trace, "onHeaders returning {}", static_cast(status)); + } else { + ENVOY_LOG(trace, "decodeHeaders: Skipped header processing"); } - const auto status = onHeaders(decoding_state_, headers, end_stream); - ENVOY_LOG(trace, "decodeHeaders returning {}", static_cast(status)); return status; } @@ -543,13 +651,21 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s encoding_state_.setCompleteBodyAvailable(true); } - if (processing_complete_ || !encoding_state_.sendHeaders()) { - ENVOY_LOG(trace, "encodeHeaders: Continue"); - return FilterHeadersStatus::Continue; + FilterHeadersStatus status = FilterHeadersStatus::Continue; + if (!processing_complete_ && encoding_state_.sendHeaders()) { + absl::optional proto; + if (!config_->responseExpr().empty()) { + auto activation_ptr = Filters::Common::Expr::createActivation(encoding_state_.streamInfo(), + nullptr, &headers, nullptr); + proto = evaluateAttributes(std::move(activation_ptr), config_->responseExpr()); + } + + status = onHeaders(encoding_state_, headers, end_stream, proto); + ENVOY_LOG(trace, "onHeaders returns {}", static_cast(status)); + } else { + ENVOY_LOG(trace, "encodeHeaders: Skipped header processing"); } - const auto status = onHeaders(encoding_state_, headers, end_stream); - ENVOY_LOG(trace, "encodeHeaders returns {}", static_cast(status)); return status; } @@ -573,6 +689,7 @@ void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data, state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(), new_state); ProcessingRequest req; + addDynamicMetadata(state, req); auto* body_req = state.mutableBody(req); body_req->set_end_of_stream(end_stream); body_req->set_body(data.toString()); @@ -593,6 +710,7 @@ void Filter::sendBufferedData(ProcessorState& state, ProcessorState::CallbackSta void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) { ProcessingRequest req; + addDynamicMetadata(state, req); auto* trailers_req = state.mutableTrailers(req); MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(), *trailers_req->mutable_trailers()); @@ -603,6 +721,20 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers stats_.stream_msgs_sent_.inc(); } +void Filter::logGrpcStreamInfo() { + if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { + const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); + if (upstream_meter != nullptr) { + logging_info_->setBytesSent(upstream_meter->wireBytesSent()); + logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); + } + // Only set upstream host in logging info once. + if (logging_info_->upstreamHost() == nullptr) { + logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); + } + } +} + void Filter::onNewTimeout(const ProtobufWkt::Duration& override_message_timeout) { const auto result = DurationUtil::durationToMillisecondsNoThrow(override_message_timeout); if (!result.ok()) { @@ -632,6 +764,83 @@ void Filter::onNewTimeout(const ProtobufWkt::Duration& override_message_timeout) stats_.override_message_timeout_received_.inc(); } +void Filter::addDynamicMetadata(ProcessorState& state, ProcessingRequest& req) { + // get the callbacks from the ProcessorState. This will be the appropriate + // callbacks for the current state of the filter + auto* cb = state.callbacks(); + envoy::config::core::v3::Metadata forwarding_metadata; + + // If metadata_context_namespaces is specified, pass matching filter metadata to the ext_proc + // service. If metadata key is set in both the connection and request metadata then the value + // will be the request metadata value. The metadata will only be searched for the callbacks + // corresponding to the traffic direction at the time of the external processing request. + const auto& request_metadata = cb->streamInfo().dynamicMetadata().filter_metadata(); + for (const auto& context_key : state.untypedForwardingMetadataNamespaces()) { + if (const auto metadata_it = request_metadata.find(context_key); + metadata_it != request_metadata.end()) { + (*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second; + } else if (cb->connection().has_value()) { + const auto& connection_metadata = + cb->connection().value().get().streamInfo().dynamicMetadata().filter_metadata(); + if (const auto metadata_it = connection_metadata.find(context_key); + metadata_it != connection_metadata.end()) { + (*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second; + } + } + } + + // If typed_metadata_context_namespaces is specified, pass matching typed filter metadata to the + // ext_proc service. If metadata key is set in both the connection and request metadata then + // the value will be the request metadata value. The metadata will only be searched for the + // callbacks corresponding to the traffic direction at the time of the external processing + // request. + const auto& request_typed_metadata = cb->streamInfo().dynamicMetadata().typed_filter_metadata(); + for (const auto& context_key : state.typedForwardingMetadataNamespaces()) { + if (const auto metadata_it = request_typed_metadata.find(context_key); + metadata_it != request_typed_metadata.end()) { + (*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] = + metadata_it->second; + } else if (cb->connection().has_value()) { + const auto& connection_typed_metadata = + cb->connection().value().get().streamInfo().dynamicMetadata().typed_filter_metadata(); + if (const auto metadata_it = connection_typed_metadata.find(context_key); + metadata_it != connection_typed_metadata.end()) { + (*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] = + metadata_it->second; + } + } + } + + *req.mutable_metadata_context() = forwarding_metadata; +} + +void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, + std::unique_ptr& response) { + bool has_receiving_namespaces = state.untypedReceivingMetadataNamespaces().size() > 0; + if (!(has_receiving_namespaces && response->has_dynamic_metadata())) { + return; + } + + if (response->has_dynamic_metadata()) { + auto response_metadata = response->dynamic_metadata().fields(); + auto receiving_namespaces = state.untypedReceivingMetadataNamespaces(); + for (const auto& context_key : response_metadata) { + if (auto metadata_it = std::find(receiving_namespaces.begin(), receiving_namespaces.end(), + context_key.first); + metadata_it != receiving_namespaces.end()) + cb->streamInfo().setDynamicMetadata(context_key.first, + response_metadata.at(context_key.first).struct_value()); + } + } +} + +void Filter::setEncoderDynamicMetadata(std::unique_ptr& response) { + setDynamicMetadata(encoder_callbacks_, encoding_state_, response); +} +void Filter::setDecoderDynamicMetadata(std::unique_ptr& response) { + setDynamicMetadata(decoder_callbacks_, decoding_state_, response); +} + void Filter::onReceiveMessage(std::unique_ptr&& r) { if (processing_complete_) { ENVOY_LOG(debug, "Ignoring stream message received after processing complete"); @@ -661,32 +870,51 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { absl::Status processing_status; switch (response->response_case()) { case ProcessingResponse::ResponseCase::kRequestHeaders: + setDecoderDynamicMetadata(response); processing_status = decoding_state_.handleHeadersResponse(response->request_headers()); break; case ProcessingResponse::ResponseCase::kResponseHeaders: + setEncoderDynamicMetadata(response); processing_status = encoding_state_.handleHeadersResponse(response->response_headers()); break; case ProcessingResponse::ResponseCase::kRequestBody: + setDecoderDynamicMetadata(response); processing_status = decoding_state_.handleBodyResponse(response->request_body()); break; case ProcessingResponse::ResponseCase::kResponseBody: + setEncoderDynamicMetadata(response); processing_status = encoding_state_.handleBodyResponse(response->response_body()); break; case ProcessingResponse::ResponseCase::kRequestTrailers: + setDecoderDynamicMetadata(response); processing_status = decoding_state_.handleTrailersResponse(response->request_trailers()); break; case ProcessingResponse::ResponseCase::kResponseTrailers: + setEncoderDynamicMetadata(response); processing_status = encoding_state_.handleTrailersResponse(response->response_trailers()); break; case ProcessingResponse::ResponseCase::kImmediateResponse: - // We won't be sending anything more to the stream after we - // receive this message. - ENVOY_LOG(debug, "Sending immediate response"); - processing_complete_ = true; - closeStream(); - onFinishProcessorCalls(Grpc::Status::Ok); - sendImmediateResponse(response->immediate_response()); - processing_status = absl::OkStatus(); + if (config_->disableImmediateResponse()) { + ENVOY_LOG(debug, "Filter has disable_immediate_response configured. " + "Treat the immediate response message as spurious response."); + processing_status = + absl::FailedPreconditionError("unhandled immediate response due to config disabled it"); + } else { + setDecoderDynamicMetadata(response); + // We won't be sending anything more to the stream after we + // receive this message. + ENVOY_LOG(debug, "Sending immediate response"); + // TODO(tyxia) For immediate response case here and below, logging is needed because + // `onFinishProcessorCalls` is called after `closeStream` below. + // Investigate to see if we can switch the order of those two so that the logging here can be + // avoided. + logGrpcStreamInfo(); + processing_complete_ = true; + closeStream(); + onFinishProcessorCalls(Grpc::Status::Ok); + sendImmediateResponse(response->immediate_response()); + processing_status = absl::OkStatus(); + } break; default: // Any other message is considered spurious @@ -716,6 +944,7 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { ENVOY_LOG(debug, "Sending immediate response: {}", processing_status.message()); stats_.stream_msgs_received_.inc(); processing_complete_ = true; + logGrpcStreamInfo(); closeStream(); onFinishProcessorCalls(processing_status.raw_code()); ImmediateResponse invalid_mutation_response; @@ -753,6 +982,7 @@ void Filter::onGrpcError(Grpc::Status::GrpcStatus status) { void Filter::onGrpcClose() { ENVOY_LOG(debug, "Received gRPC stream close"); + processing_complete_ = true; stats_.streams_closed_.inc(); // Successful close. We can ignore the stream for the rest of our request @@ -763,6 +993,7 @@ void Filter::onGrpcClose() { void Filter::onMessageTimeout() { ENVOY_LOG(debug, "message timeout reached"); + logGrpcStreamInfo(); stats_.message_timeouts_.inc(); if (config_->failureModeAllow()) { // The user would like a timeout to not cause message processing to fail. @@ -878,6 +1109,31 @@ void Filter::mergePerRouteConfig() { if (merged_config->grpcService()) { ENVOY_LOG(trace, "Setting new GrpcService from per-route configuration"); grpc_service_ = *merged_config->grpcService(); + config_with_hash_key_.setConfig(*merged_config->grpcService()); + } + if (merged_config->untypedForwardingMetadataNamespaces()) { + ENVOY_LOG(trace, + "Setting new untyped forwarding metadata namespaces from per-route configuration"); + decoding_state_.setUntypedForwardingMetadataNamespaces( + *merged_config->untypedForwardingMetadataNamespaces()); + encoding_state_.setUntypedForwardingMetadataNamespaces( + *merged_config->untypedForwardingMetadataNamespaces()); + } + if (merged_config->typedForwardingMetadataNamespaces()) { + ENVOY_LOG(trace, + "Setting new typed forwarding metadata namespaces from per-route configuration"); + decoding_state_.setTypedForwardingMetadataNamespaces( + *merged_config->typedForwardingMetadataNamespaces()); + encoding_state_.setTypedForwardingMetadataNamespaces( + *merged_config->typedForwardingMetadataNamespaces()); + } + if (merged_config->untypedReceivingMetadataNamespaces()) { + ENVOY_LOG(trace, + "Setting new untyped receiving metadata namespaces from per-route configuration"); + decoding_state_.setUntypedReceivingMetadataNamespaces( + *merged_config->untypedReceivingMetadataNamespaces()); + encoding_state_.setUntypedReceivingMetadataNamespaces( + *merged_config->untypedReceivingMetadataNamespaces()); } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 5c8e347a06a6..3c488cd6771d 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -21,6 +21,7 @@ #include "source/common/common/logger.h" #include "source/common/common/matchers.h" #include "source/common/protobuf/protobuf.h" +#include "source/extensions/filters/common/expr/evaluator.h" #include "source/extensions/filters/common/mutation_rules/mutation_rules.h" #include "source/extensions/filters/http/common/pass_through_filter.h" #include "source/extensions/filters/http/ext_proc/client.h" @@ -121,12 +122,13 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object { Upstream::HostDescriptionConstSharedPtr upstream_host_; }; -class FilterConfig { +class FilterConfig : public Logger::Loggable { public: FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config, const std::chrono::milliseconds message_timeout, const uint32_t max_message_timeout_ms, Stats::Scope& scope, - const std::string& stats_prefix) + const std::string& stats_prefix, + Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder) : failure_mode_allow_(config.failure_mode_allow()), disable_clear_route_cache_(config.disable_clear_route_cache()), message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms), @@ -134,8 +136,20 @@ class FilterConfig { processing_mode_(config.processing_mode()), mutation_checker_(config.mutation_rules()), filter_metadata_(config.filter_metadata()), allow_mode_override_(config.allow_mode_override()), + disable_immediate_response_(config.disable_immediate_response()), allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())), - disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {} + disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())), + builder_(builder), request_expr_(initExpressions(config.request_attributes())), + response_expr_(initExpressions(config.response_attributes())), + untyped_forwarding_namespaces_( + config.metadata_options().forwarding_namespaces().untyped().begin(), + config.metadata_options().forwarding_namespaces().untyped().end()), + typed_forwarding_namespaces_( + config.metadata_options().forwarding_namespaces().typed().begin(), + config.metadata_options().forwarding_namespaces().typed().end()), + untyped_receiving_namespaces_( + config.metadata_options().receiving_namespaces().untyped().begin(), + config.metadata_options().receiving_namespaces().untyped().end()) {} bool failureModeAllow() const { return failure_mode_allow_; } @@ -150,6 +164,7 @@ class FilterConfig { } bool allowModeOverride() const { return allow_mode_override_; } + bool disableImmediateResponse() const { return disable_immediate_response_; } const Filters::Common::MutationRules::Checker& mutationChecker() const { return mutation_checker_; @@ -164,6 +179,28 @@ class FilterConfig { const Envoy::ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; } + const absl::flat_hash_map& + requestExpr() const { + return request_expr_; + } + + const absl::flat_hash_map& + responseExpr() const { + return response_expr_; + } + + const std::vector& untypedForwardingMetadataNamespaces() const { + return untyped_forwarding_namespaces_; + } + + const std::vector& typedForwardingMetadataNamespaces() const { + return typed_forwarding_namespaces_; + } + + const std::vector& untypedReceivingMetadataNamespaces() const { + return untyped_receiving_namespaces_; + } + private: ExtProcFilterStats generateStats(const std::string& prefix, const std::string& filter_stats_prefix, Stats::Scope& scope) { @@ -181,6 +218,9 @@ class FilterConfig { return header_matchers; } + absl::flat_hash_map + initExpressions(const Protobuf::RepeatedPtrField& matchers) const; + const bool failure_mode_allow_; const bool disable_clear_route_cache_; const std::chrono::milliseconds message_timeout_; @@ -192,10 +232,23 @@ class FilterConfig { const Envoy::ProtobufWkt::Struct filter_metadata_; // If set to true, allow the processing mode to be modified by the ext_proc response. const bool allow_mode_override_; + // If set to true, disable the immediate response from the ext_proc server, which means + // closing the stream to the ext_proc server, and no more external processing. + const bool disable_immediate_response_; // Empty allowed_header_ means allow all. const std::vector allowed_headers_; // Empty disallowed_header_ means disallow nothing, i.e, allow all. const std::vector disallowed_headers_; + + Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder_; + + const absl::flat_hash_map + request_expr_; + const absl::flat_hash_map + response_expr_; + const std::vector untyped_forwarding_namespaces_; + const std::vector typed_forwarding_namespaces_; + const std::vector untyped_receiving_namespaces_; }; using FilterConfigSharedPtr = std::shared_ptr; @@ -216,10 +269,24 @@ class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { return grpc_service_; } + const absl::optional>& untypedForwardingMetadataNamespaces() const { + return untyped_forwarding_namespaces_; + } + const absl::optional>& typedForwardingMetadataNamespaces() const { + return typed_forwarding_namespaces_; + } + const absl::optional>& untypedReceivingMetadataNamespaces() const { + return untyped_receiving_namespaces_; + } + private: bool disabled_; absl::optional processing_mode_; absl::optional grpc_service_; + + absl::optional> untyped_forwarding_namespaces_; + absl::optional> typed_forwarding_namespaces_; + absl::optional> untyped_receiving_namespaces_; }; class Filter : public Logger::Loggable, @@ -241,13 +308,20 @@ class Filter : public Logger::Loggable, Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client, const envoy::config::core::v3::GrpcService& grpc_service) : config_(config), client_(std::move(client)), stats_(config->stats()), - grpc_service_(grpc_service), decoding_state_(*this, config->processingMode()), - encoding_state_(*this, config->processingMode()) {} + grpc_service_(grpc_service), config_with_hash_key_(grpc_service), + decoding_state_(*this, config->processingMode(), + config->untypedForwardingMetadataNamespaces(), + config->typedForwardingMetadataNamespaces(), + config->untypedReceivingMetadataNamespaces()), + encoding_state_(*this, config->processingMode(), + config->untypedForwardingMetadataNamespaces(), + config->typedForwardingMetadataNamespaces(), + config->untypedReceivingMetadataNamespaces()) {} const FilterConfig& config() const { return *config_; } ExtProcFilterStats& stats() { return stats_; } - ExtProcLoggingInfo& loggingInfo() { return *logging_info_; } + ExtProcLoggingInfo* loggingInfo() { return logging_info_; } void onDestroy() override; void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; @@ -264,13 +338,11 @@ class Filter : public Logger::Loggable, Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& trailers) override; // ExternalProcessorCallbacks - void onReceiveMessage( std::unique_ptr&& response) override; - void onGrpcError(Grpc::Status::GrpcStatus error) override; - void onGrpcClose() override; + void logGrpcStreamInfo() override; void onMessageTimeout(); void onNewTimeout(const ProtobufWkt::Duration& override_message_timeout); @@ -283,6 +355,9 @@ class Filter : public Logger::Loggable, void sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers); + const ProcessorState& encodingState() { return encoding_state_; } + const ProcessorState& decodingState() { return decoding_state_; } + private: void mergePerRouteConfig(); StreamOpenState openStream(); @@ -293,17 +368,33 @@ class Filter : public Logger::Loggable, void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response); Http::FilterHeadersStatus onHeaders(ProcessorState& state, - Http::RequestOrResponseHeaderMap& headers, bool end_stream); + Http::RequestOrResponseHeaderMap& headers, bool end_stream, + absl::optional proto); + + const absl::optional evaluateAttributes( + Filters::Common::Expr::ActivationPtr activation, + const absl::flat_hash_map& + expr); // Return a pair of whether to terminate returning the current result. std::pair sendStreamChunk(ProcessorState& state, bool end_stream); Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream); Http::FilterTrailersStatus onTrailers(ProcessorState& state, Http::HeaderMap& trailers); + void + setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state, + std::unique_ptr& response); + void setEncoderDynamicMetadata( + std::unique_ptr& response); + void setDecoderDynamicMetadata( + std::unique_ptr& response); + void addDynamicMetadata(ProcessorState& state, + envoy::service::ext_proc::v3::ProcessingRequest& req); const FilterConfigSharedPtr config_; const ExternalProcessorClientPtr client_; ExtProcFilterStats stats_; ExtProcLoggingInfo* logging_info_; envoy::config::core::v3::GrpcService grpc_service_; + Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; // The state of the filter on both the encoding and decoding side. DecodingProcessorState decoding_state_; diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 4e1ea746ac59..5f671d355270 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -34,13 +34,17 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, CallbackState next_state) { + filter_.logGrpcStreamInfo(); + stopMessageTimer(); if (call_start_time_.has_value()) { std::chrono::microseconds duration = std::chrono::duration_cast( filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value()); - filter_.loggingInfo().recordGrpcCall(duration, call_status, callback_state_, - trafficDirection()); + ExtProcLoggingInfo* logging_info = filter_.loggingInfo(); + if (logging_info != nullptr) { + logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection()); + } call_start_time_ = absl::nullopt; } callback_state_ = next_state; diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index c921cdb322c2..b556842c2c6a 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -80,10 +80,19 @@ class ProcessorState : public Logger::Loggable { }; explicit ProcessorState(Filter& filter, - envoy::config::core::v3::TrafficDirection traffic_direction) + envoy::config::core::v3::TrafficDirection traffic_direction, + std::vector untyped_forwarding_namespaces, + std::vector typed_forwarding_namespaces, + std::vector untyped_receiving_namespaces) : filter_(filter), watermark_requested_(false), paused_(false), no_body_(false), complete_body_available_(false), trailers_available_(false), body_replaced_(false), - partial_body_processed_(false), traffic_direction_(traffic_direction) {} + partial_body_processed_(false), traffic_direction_(traffic_direction), + untyped_forwarding_namespaces_(untyped_forwarding_namespaces.begin(), + untyped_forwarding_namespaces.end()), + typed_forwarding_namespaces_(typed_forwarding_namespaces.begin(), + typed_forwarding_namespaces.end()), + untyped_receiving_namespaces_(untyped_receiving_namespaces.begin(), + untyped_receiving_namespaces.end()) {} ProcessorState(const ProcessorState&) = delete; virtual ~ProcessorState() = default; ProcessorState& operator=(const ProcessorState&) = delete; @@ -107,6 +116,28 @@ class ProcessorState : public Logger::Loggable { virtual void setProcessingMode( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) PURE; + + const std::vector& untypedForwardingMetadataNamespaces() const { + return untyped_forwarding_namespaces_; + }; + void setUntypedForwardingMetadataNamespaces(const std::vector ns) { + untyped_forwarding_namespaces_ = std::vector(ns.begin(), ns.end()); + }; + + const std::vector& typedForwardingMetadataNamespaces() const { + return typed_forwarding_namespaces_; + }; + void setTypedForwardingMetadataNamespaces(const std::vector ns) { + typed_forwarding_namespaces_ = std::vector(ns.begin(), ns.end()); + }; + + const std::vector& untypedReceivingMetadataNamespaces() const { + return untyped_receiving_namespaces_; + }; + void setUntypedReceivingMetadataNamespaces(const std::vector ns) { + untyped_receiving_namespaces_ = std::vector(ns.begin(), ns.end()); + }; + bool sendHeaders() const { return send_headers_; } bool sendTrailers() const { return send_trailers_; } envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode bodyMode() const { @@ -165,6 +196,9 @@ class ProcessorState : public Logger::Loggable { virtual envoy::service::ext_proc::v3::HttpTrailers* mutableTrailers(envoy::service::ext_proc::v3::ProcessingRequest& request) const PURE; + virtual StreamInfo::StreamInfo& streamInfo() PURE; + virtual Http::StreamFilterCallbacks* callbacks() PURE; + protected: void setBodyMode( envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode); @@ -209,6 +243,10 @@ class ProcessorState : public Logger::Loggable { absl::optional call_start_time_ = absl::nullopt; const envoy::config::core::v3::TrafficDirection traffic_direction_; + std::vector untyped_forwarding_namespaces_; + std::vector typed_forwarding_namespaces_; + std::vector untyped_receiving_namespaces_; + private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} }; @@ -216,8 +254,13 @@ class ProcessorState : public Logger::Loggable { class DecodingProcessorState : public ProcessorState { public: explicit DecodingProcessorState( - Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) - : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::INBOUND) { + Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode, + std::vector untyped_forwarding_namespaces, + std::vector typed_forwarding_namespaces, + std::vector untyped_receiving_namespaces) + : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::INBOUND, + untyped_forwarding_namespaces, typed_forwarding_namespaces, + untyped_receiving_namespaces) { setProcessingModeInternal(mode); } DecodingProcessorState(const DecodingProcessorState&) = delete; @@ -276,6 +319,9 @@ class DecodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; + StreamInfo::StreamInfo& streamInfo() override { return decoder_callbacks_->streamInfo(); } + Http::StreamFilterCallbacks* callbacks() override { return decoder_callbacks_; } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); @@ -289,8 +335,13 @@ class DecodingProcessorState : public ProcessorState { class EncodingProcessorState : public ProcessorState { public: explicit EncodingProcessorState( - Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode) - : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::OUTBOUND) { + Filter& filter, const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode, + std::vector untyped_forwarding_namespaces, + std::vector typed_forwarding_namespaces, + std::vector untyped_receiving_namespaces) + : ProcessorState(filter, envoy::config::core::v3::TrafficDirection::OUTBOUND, + untyped_forwarding_namespaces, typed_forwarding_namespaces, + untyped_receiving_namespaces) { setProcessingModeInternal(mode); } EncodingProcessorState(const EncodingProcessorState&) = delete; @@ -349,6 +400,9 @@ class EncodingProcessorState : public ProcessorState { void requestWatermark() override; void clearWatermark() override; + StreamInfo::StreamInfo& streamInfo() override { return encoder_callbacks_->streamInfo(); } + Http::StreamFilterCallbacks* callbacks() override { return encoder_callbacks_; } + private: void setProcessingModeInternal( const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode); diff --git a/source/extensions/filters/http/tap/config.cc b/source/extensions/filters/http/tap/config.cc index 5c573924f768..c9051d7c68b6 100644 --- a/source/extensions/filters/http/tap/config.cc +++ b/source/extensions/filters/http/tap/config.cc @@ -15,21 +15,27 @@ namespace TapFilter { class HttpTapConfigFactoryImpl : public Extensions::Common::Tap::TapConfigFactory { public: + HttpTapConfigFactoryImpl(Server::Configuration::FactoryContext& context) + : factory_context_(context) {} // TapConfigFactory Extensions::Common::Tap::TapConfigSharedPtr createConfigFromProto(const envoy::config::tap::v3::TapConfig& proto_config, Extensions::Common::Tap::Sink* admin_streamer) override { - return std::make_shared(std::move(proto_config), admin_streamer); + return std::make_shared(std::move(proto_config), admin_streamer, + factory_context_); } + +private: + Server::Configuration::FactoryContext& factory_context_; }; Http::FilterFactoryCb TapFilterFactory::createFilterFactoryFromProtoTyped( const envoy::extensions::filters::http::tap::v3::Tap& proto_config, const std::string& stats_prefix, Server::Configuration::FactoryContext& context) { - FilterConfigSharedPtr filter_config( - new FilterConfigImpl(proto_config, stats_prefix, std::make_unique(), - context.scope(), context.admin(), context.singletonManager(), - context.threadLocal(), context.mainThreadDispatcher())); + FilterConfigSharedPtr filter_config(new FilterConfigImpl( + proto_config, stats_prefix, std::make_unique(context), + context.scope(), context.admin(), context.singletonManager(), context.threadLocal(), + context.mainThreadDispatcher())); return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { auto filter = std::make_shared(filter_config); callbacks.addStreamFilter(filter); diff --git a/source/extensions/filters/http/tap/tap_config_impl.cc b/source/extensions/filters/http/tap/tap_config_impl.cc index b3d844c7a48e..2aa2653a4ffe 100644 --- a/source/extensions/filters/http/tap/tap_config_impl.cc +++ b/source/extensions/filters/http/tap/tap_config_impl.cc @@ -29,8 +29,9 @@ fillHeaderList(Protobuf::RepeatedPtrField* } // namespace HttpTapConfigImpl::HttpTapConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Common::Tap::Sink* admin_streamer) - : TapCommon::TapConfigBaseImpl(std::move(proto_config), admin_streamer) {} + Common::Tap::Sink* admin_streamer, + Server::Configuration::FactoryContext& context) + : TapCommon::TapConfigBaseImpl(std::move(proto_config), admin_streamer, context) {} HttpPerRequestTapperPtr HttpTapConfigImpl::createPerRequestTapper(uint64_t stream_id) { return std::make_unique(shared_from_this(), stream_id); diff --git a/source/extensions/filters/http/tap/tap_config_impl.h b/source/extensions/filters/http/tap/tap_config_impl.h index d79ef4cc4842..5938d38076fc 100644 --- a/source/extensions/filters/http/tap/tap_config_impl.h +++ b/source/extensions/filters/http/tap/tap_config_impl.h @@ -19,7 +19,8 @@ class HttpTapConfigImpl : public Extensions::Common::Tap::TapConfigBaseImpl, public std::enable_shared_from_this { public: HttpTapConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Extensions::Common::Tap::Sink* admin_streamer); + Extensions::Common::Tap::Sink* admin_streamer, + Server::Configuration::FactoryContext& context); // TapFilter::HttpTapConfig HttpPerRequestTapperPtr createPerRequestTapper(uint64_t stream_id) override; diff --git a/source/extensions/transport_sockets/tap/config.cc b/source/extensions/transport_sockets/tap/config.cc index 3b565bee92e9..c8267771e854 100644 --- a/source/extensions/transport_sockets/tap/config.cc +++ b/source/extensions/transport_sockets/tap/config.cc @@ -17,18 +17,21 @@ namespace Tap { class SocketTapConfigFactoryImpl : public Extensions::Common::Tap::TapConfigFactory { public: - SocketTapConfigFactoryImpl(TimeSource& time_source) : time_source_(time_source) {} + SocketTapConfigFactoryImpl(TimeSource& time_source, + Server::Configuration::TransportSocketFactoryContext& context) + : time_source_(time_source), factory_context_(context) {} // TapConfigFactory Extensions::Common::Tap::TapConfigSharedPtr createConfigFromProto(const envoy::config::tap::v3::TapConfig& proto_config, Extensions::Common::Tap::Sink* admin_streamer) override { return std::make_shared(std::move(proto_config), admin_streamer, - time_source_); + time_source_, factory_context_); } private: TimeSource& time_source_; + Server::Configuration::TransportSocketFactoryContext& factory_context_; }; Network::UpstreamTransportSocketFactoryPtr @@ -49,7 +52,7 @@ UpstreamTapSocketConfigFactory::createTransportSocketFactory( return std::make_unique( outer_config, std::make_unique( - server_context.mainThreadDispatcher().timeSource()), + server_context.mainThreadDispatcher().timeSource(), context), server_context.admin(), server_context.singletonManager(), server_context.threadLocal(), server_context.mainThreadDispatcher(), std::move(inner_transport_factory)); } @@ -72,7 +75,7 @@ DownstreamTapSocketConfigFactory::createTransportSocketFactory( return std::make_unique( outer_config, std::make_unique( - server_context.mainThreadDispatcher().timeSource()), + server_context.mainThreadDispatcher().timeSource(), context), server_context.admin(), server_context.singletonManager(), server_context.threadLocal(), server_context.mainThreadDispatcher(), std::move(inner_transport_factory)); } diff --git a/source/extensions/transport_sockets/tap/tap_config_impl.h b/source/extensions/transport_sockets/tap/tap_config_impl.h index cf715c3df973..38556025c140 100644 --- a/source/extensions/transport_sockets/tap/tap_config_impl.h +++ b/source/extensions/transport_sockets/tap/tap_config_impl.h @@ -3,6 +3,7 @@ #include "envoy/config/tap/v3/common.pb.h" #include "envoy/data/tap/v3/transport.pb.h" #include "envoy/event/timer.h" +#include "envoy/server/transport_socket_config.h" #include "source/extensions/common/tap/tap_config_base.h" #include "source/extensions/transport_sockets/tap/tap_config.h" @@ -51,8 +52,10 @@ class SocketTapConfigImpl : public Extensions::Common::Tap::TapConfigBaseImpl, public std::enable_shared_from_this { public: SocketTapConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, - Extensions::Common::Tap::Sink* admin_streamer, TimeSource& time_system) - : Extensions::Common::Tap::TapConfigBaseImpl(std::move(proto_config), admin_streamer), + Extensions::Common::Tap::Sink* admin_streamer, TimeSource& time_system, + Server::Configuration::TransportSocketFactoryContext& context) + : Extensions::Common::Tap::TapConfigBaseImpl(std::move(proto_config), admin_streamer, + context), time_source_(time_system) {} // SocketTapConfig diff --git a/test/common/grpc/BUILD b/test/common/grpc/BUILD index 0c8cb3380684..890b2ba26d61 100644 --- a/test/common/grpc/BUILD +++ b/test/common/grpc/BUILD @@ -1,5 +1,7 @@ load( "//bazel:envoy_build_system.bzl", + "envoy_benchmark_test", + "envoy_cc_benchmark_binary", "envoy_cc_fuzz_test", "envoy_cc_test", "envoy_cc_test_library", @@ -214,3 +216,28 @@ envoy_cc_test( "//test/test_common:utility_lib", ], ) + +envoy_cc_benchmark_binary( + name = "async_client_manager_benchmark", + srcs = ["async_client_manager_benchmark.cc"], + external_deps = [ + "benchmark", + ], + deps = [ + "//source/common/api:api_lib", + "//source/common/grpc:async_client_manager_lib", + "//test/mocks/stats:stats_mocks", + "//test/mocks/thread_local:thread_local_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "//test/mocks/upstream:cluster_priority_set_mocks", + "//test/test_common:test_runtime_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + +envoy_benchmark_test( + name = "async_client_manager_benchmark_test", + timeout = "long", + benchmark_binary = "async_client_manager_benchmark", +) diff --git a/test/common/grpc/async_client_manager_benchmark.cc b/test/common/grpc/async_client_manager_benchmark.cc new file mode 100644 index 000000000000..f23a0aba54e5 --- /dev/null +++ b/test/common/grpc/async_client_manager_benchmark.cc @@ -0,0 +1,79 @@ +#include + +#include "envoy/config/core/v3/grpc_service.pb.h" +#include "envoy/grpc/async_client.h" + +#include "source/common/api/api_impl.h" +#include "source/common/event/dispatcher_impl.h" +#include "source/common/grpc/async_client_manager_impl.h" + +#include "test/benchmark/main.h" +#include "test/mocks/stats/mocks.h" +#include "test/mocks/thread_local/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/mocks/upstream/cluster_priority_set.h" +#include "test/test_common/test_runtime.h" +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +#include "benchmark/benchmark.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Grpc { +namespace { + +class AsyncClientManagerImplTest { +public: + AsyncClientManagerImplTest() + : api_(Api::createApiForTest()), stat_names_(scope_.symbolTable()), + async_client_manager_(cm_, tls_, test_time_.timeSystem(), *api_, stat_names_) {} + + Upstream::MockClusterManager cm_; + NiceMock tls_; + Stats::MockStore store_; + Stats::MockScope& scope_{store_.mockScope()}; + DangerousDeprecatedTestTime test_time_; + Api::ApiPtr api_; + StatNames stat_names_; + AsyncClientManagerImpl async_client_manager_; +}; + +void testGetOrCreateAsyncClientWithConfig(::benchmark::State& state) { + AsyncClientManagerImplTest async_client_man_test; + + envoy::config::core::v3::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + + for (auto _ : state) { + for (int i = 0; i < 1000; i++) { + RawAsyncClientSharedPtr foo_client0 = + async_client_man_test.async_client_manager_.getOrCreateRawAsyncClient( + grpc_service, async_client_man_test.scope_, true); + } + } +} + +void testGetOrCreateAsyncClientWithHashConfig(::benchmark::State& state) { + AsyncClientManagerImplTest async_client_man_test; + + envoy::config::core::v3::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + GrpcServiceConfigWithHashKey config_with_hash_key_a = GrpcServiceConfigWithHashKey(grpc_service); + + for (auto _ : state) { + for (int i = 0; i < 1000; i++) { + RawAsyncClientSharedPtr foo_client0 = + async_client_man_test.async_client_manager_.getOrCreateRawAsyncClientWithHashKey( + config_with_hash_key_a, async_client_man_test.scope_, true); + } + } +} + +BENCHMARK(testGetOrCreateAsyncClientWithConfig)->Unit(::benchmark::kMicrosecond); +BENCHMARK(testGetOrCreateAsyncClientWithHashConfig)->Unit(::benchmark::kMicrosecond); + +} // namespace +} // namespace Grpc +} // namespace Envoy diff --git a/test/common/grpc/async_client_manager_impl_test.cc b/test/common/grpc/async_client_manager_impl_test.cc index ee1d2008b20c..b3615e33df8b 100644 --- a/test/common/grpc/async_client_manager_impl_test.cc +++ b/test/common/grpc/async_client_manager_impl_test.cc @@ -53,16 +53,17 @@ class RawAsyncClientCacheTest : public testing::Test { TEST_F(RawAsyncClientCacheTest, CacheEviction) { envoy::config::core::v3::GrpcService foo_service; foo_service.mutable_envoy_grpc()->set_cluster_name("foo"); + GrpcServiceConfigWithHashKey config_with_hash_key(foo_service); RawAsyncClientSharedPtr foo_client = std::make_shared(); - client_cache_.setCache(foo_service, foo_client); + client_cache_.setCache(config_with_hash_key, foo_client); waitForSeconds(49); // Cache entry hasn't been evicted because it was created 49s ago. - EXPECT_EQ(client_cache_.getCache(foo_service).get(), foo_client.get()); + EXPECT_EQ(client_cache_.getCache(config_with_hash_key).get(), foo_client.get()); waitForSeconds(49); // Cache entry hasn't been evicted because it was accessed 49s ago. - EXPECT_EQ(client_cache_.getCache(foo_service).get(), foo_client.get()); + EXPECT_EQ(client_cache_.getCache(config_with_hash_key).get(), foo_client.get()); waitForSeconds(51); - EXPECT_EQ(client_cache_.getCache(foo_service).get(), nullptr); + EXPECT_EQ(client_cache_.getCache(config_with_hash_key).get(), nullptr); } TEST_F(RawAsyncClientCacheTest, MultipleCacheEntriesEviction) { @@ -70,23 +71,27 @@ TEST_F(RawAsyncClientCacheTest, MultipleCacheEntriesEviction) { RawAsyncClientSharedPtr foo_client = std::make_shared(); for (int i = 1; i <= 50; i++) { grpc_service.mutable_envoy_grpc()->set_cluster_name(std::to_string(i)); - client_cache_.setCache(grpc_service, foo_client); + GrpcServiceConfigWithHashKey config_with_hash_key(grpc_service); + client_cache_.setCache(config_with_hash_key, foo_client); } waitForSeconds(20); for (int i = 51; i <= 100; i++) { grpc_service.mutable_envoy_grpc()->set_cluster_name(std::to_string(i)); - client_cache_.setCache(grpc_service, foo_client); + GrpcServiceConfigWithHashKey config_with_hash_key(grpc_service); + client_cache_.setCache(config_with_hash_key, foo_client); } waitForSeconds(30); // Cache entries created 50s before have expired. for (int i = 1; i <= 50; i++) { grpc_service.mutable_envoy_grpc()->set_cluster_name(std::to_string(i)); - EXPECT_EQ(client_cache_.getCache(grpc_service).get(), nullptr); + GrpcServiceConfigWithHashKey config_with_hash_key(grpc_service); + EXPECT_EQ(client_cache_.getCache(config_with_hash_key).get(), nullptr); } // Cache entries 30s before haven't expired. for (int i = 51; i <= 100; i++) { grpc_service.mutable_envoy_grpc()->set_cluster_name(std::to_string(i)); - EXPECT_EQ(client_cache_.getCache(grpc_service).get(), foo_client.get()); + GrpcServiceConfigWithHashKey config_with_hash_key(grpc_service); + EXPECT_EQ(client_cache_.getCache(config_with_hash_key).get(), foo_client.get()); } } @@ -96,14 +101,15 @@ TEST_F(RawAsyncClientCacheTest, GetExpiredButNotEvictedCacheEntry) { envoy::config::core::v3::GrpcService foo_service; foo_service.mutable_envoy_grpc()->set_cluster_name("foo"); RawAsyncClientSharedPtr foo_client = std::make_shared(); - client_cache_.setCache(foo_service, foo_client); + GrpcServiceConfigWithHashKey config_with_hash_key(foo_service); + client_cache_.setCache(config_with_hash_key, foo_client); time_system_.advanceTimeAsyncImpl(std::chrono::seconds(50)); // Cache entry hasn't been evicted because it is accessed before timer fire. - EXPECT_EQ(client_cache_.getCache(foo_service).get(), foo_client.get()); + EXPECT_EQ(client_cache_.getCache(config_with_hash_key).get(), foo_client.get()); time_system_.advanceTimeAndRun(std::chrono::seconds(50), *dispatcher_, Event::Dispatcher::RunType::NonBlock); // Cache entry has been evicted because it is accessed after timer fire. - EXPECT_EQ(client_cache_.getCache(foo_service).get(), nullptr); + EXPECT_EQ(client_cache_.getCache(config_with_hash_key).get(), nullptr); } class RawAsyncClientCacheTestBusyLoop : public testing::Test { @@ -179,6 +185,25 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) { async_client_manager_.factoryForGrpcService(grpc_service, scope_, false); } +TEST_F(AsyncClientManagerImplTest, GrpcServiceConfigWithHashKeyTest) { + envoy::config::core::v3::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + envoy::config::core::v3::GrpcService grpc_service_c; + grpc_service.mutable_envoy_grpc()->set_cluster_name("bar"); + + GrpcServiceConfigWithHashKey config_with_hash_key_a = GrpcServiceConfigWithHashKey(grpc_service); + GrpcServiceConfigWithHashKey config_with_hash_key_b = GrpcServiceConfigWithHashKey(grpc_service); + GrpcServiceConfigWithHashKey config_with_hash_key_c = + GrpcServiceConfigWithHashKey(grpc_service_c); + EXPECT_TRUE(config_with_hash_key_a == config_with_hash_key_b); + EXPECT_FALSE(config_with_hash_key_a == config_with_hash_key_c); + + EXPECT_EQ(config_with_hash_key_a.getPreComputedHash(), + config_with_hash_key_b.getPreComputedHash()); + EXPECT_NE(config_with_hash_key_a.getPreComputedHash(), + config_with_hash_key_c.getPreComputedHash()); +} + TEST_F(AsyncClientManagerImplTest, RawAsyncClientCache) { envoy::config::core::v3::GrpcService grpc_service; grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); diff --git a/test/extensions/common/tap/BUILD b/test/extensions/common/tap/BUILD index f65ad733eaf5..dea02d4f722a 100644 --- a/test/extensions/common/tap/BUILD +++ b/test/extensions/common/tap/BUILD @@ -26,9 +26,12 @@ envoy_cc_test( srcs = envoy_select_admin_functionality(["admin_test.cc"]), deps = [ "//source/extensions/common/tap:admin", + "//source/extensions/common/tap:tap_config_base", "//test/mocks/server:admin_mocks", "//test/mocks/server:admin_stream_mocks", + "//test/mocks/server:server_mocks", "//test/test_common:logging_lib", + "//test/test_common:registry_lib", "@envoy_api//envoy/config/tap/v3:pkg_cc_proto", ], ) diff --git a/test/extensions/common/tap/admin_test.cc b/test/extensions/common/tap/admin_test.cc index 2a9807893971..f25c56a4340b 100644 --- a/test/extensions/common/tap/admin_test.cc +++ b/test/extensions/common/tap/admin_test.cc @@ -5,10 +5,13 @@ #include "source/extensions/common/tap/admin.h" #include "source/extensions/common/tap/tap.h" +#include "source/extensions/common/tap/tap_config_base.h" #include "test/mocks/server/admin.h" #include "test/mocks/server/admin_stream.h" +#include "test/mocks/server/mocks.h" #include "test/test_common/logging.h" +#include "test/test_common/registry.h" #include "gtest/gtest.h" @@ -20,6 +23,7 @@ using ::testing::_; using ::testing::AtLeast; using ::testing::Between; using ::testing::DoAll; +using ::testing::Invoke; using ::testing::Return; using ::testing::ReturnRef; using ::testing::SaveArg; @@ -146,6 +150,93 @@ config_id: test_config_id StrictMock sink_; }; +using Extensions::Common::Tap::TapSinkFactory; +class MockTapSinkFactory : public TapSinkFactory { +public: + MockTapSinkFactory() {} + ~MockTapSinkFactory() override{}; + + MOCK_METHOD(SinkPtr, createSinkPtr, (const Protobuf::Message& config, SinkContext), (override)); + + MOCK_METHOD(std::string, name, (), (const, override)); + MOCK_METHOD(ProtobufTypes::MessagePtr, createEmptyConfigProto, (), (override)); +}; + +class TestConfigImpl : public TapConfigBaseImpl { +public: + TestConfigImpl(const envoy::config::tap::v3::TapConfig& proto_config, + Extensions::Common::Tap::Sink* admin_streamer, SinkContext context) + : TapConfigBaseImpl(std::move(proto_config), admin_streamer, context) {} +}; + +TEST(TypedExtensionConfigTest, AddTestConfigHttpContext) { + + const std::string tap_config_yaml = + R"EOF( + match: + any_match: true + output_config: + sinks: + - format: PROTO_BINARY + custom_sink: + name: custom_sink + typed_config: + "@type": type.googleapis.cm/google.protobuf.StringValue +)EOF"; + envoy::config::tap::v3::TapConfig tap_config; + TestUtility::loadFromYaml(tap_config_yaml, tap_config); + + MockTapSinkFactory factory_impl; + EXPECT_CALL(factory_impl, name).Times(AtLeast(1)); + EXPECT_CALL(factory_impl, createEmptyConfigProto) + .WillRepeatedly(Invoke([]() -> ProtobufTypes::MessagePtr { + return std::make_unique(); + })); + EXPECT_CALL( + factory_impl, + createSinkPtr( + _, + testing::VariantWith>(_))); + Registry::InjectFactory factory(factory_impl); + + NiceMock factory_context; + TestConfigImpl(tap_config, NULL, factory_context); +} + +TEST(TypedExtensionConfigTest, AddTestConfigTransportSocketContext) { + + const std::string tap_config_yaml = + R"EOF( + match: + any_match: true + output_config: + sinks: + - format: PROTO_BINARY + custom_sink: + name: custom_sink + typed_config: + "@type": type.googleapis.cm/google.protobuf.StringValue +)EOF"; + envoy::config::tap::v3::TapConfig tap_config; + TestUtility::loadFromYaml(tap_config_yaml, tap_config); + + MockTapSinkFactory factory_impl; + EXPECT_CALL(factory_impl, name).Times(AtLeast(1)); + EXPECT_CALL(factory_impl, createEmptyConfigProto) + .WillRepeatedly(Invoke([]() -> ProtobufTypes::MessagePtr { + return std::make_unique(); + })); + EXPECT_CALL( + factory_impl, + createSinkPtr( + _, testing::VariantWith< + std::reference_wrapper>(_))); + Registry::InjectFactory factory(factory_impl); + + NiceMock factory_context; + TestConfigImpl(tap_config, NULL, factory_context); +} + // Make sure warn if using a pipe address for the admin handler. TEST_F(AdminHandlerTest, AdminWithPipeSocket) { EXPECT_LOG_CONTAINS( diff --git a/test/extensions/filters/http/ext_authz/config_test.cc b/test/extensions/filters/http/ext_authz/config_test.cc index 6995304446fb..2eddc7f01cec 100644 --- a/test/extensions/filters/http/ext_authz/config_test.cc +++ b/test/extensions/filters/http/ext_authz/config_test.cc @@ -63,12 +63,14 @@ class ExtAuthzFilterTest : public Event::TestUsingSimulatedTime, const envoy::extensions::filters::http::ext_authz::v3::ExtAuthz& ext_authz_config) { // Delegate call to mock async client manager to real async client manager. ON_CALL(context_, getServerFactoryContext()).WillByDefault(testing::ReturnRef(server_context_)); - ON_CALL(context_.cluster_manager_.async_client_manager_, getOrCreateRawAsyncClient(_, _, _)) - .WillByDefault(Invoke([&](const envoy::config::core::v3::GrpcService& config, - Stats::Scope& scope, bool skip_cluster_check) { - return async_client_manager_->getOrCreateRawAsyncClient(config, scope, - skip_cluster_check); - })); + ON_CALL(context_.cluster_manager_.async_client_manager_, + getOrCreateRawAsyncClientWithHashKey(_, _, _)) + .WillByDefault( + Invoke([&](const Envoy::Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, + Stats::Scope& scope, bool skip_cluster_check) { + return async_client_manager_->getOrCreateRawAsyncClientWithHashKey( + config_with_hash_key, scope, skip_cluster_check); + })); ExtAuthzFilterConfig factory; return factory.createFilterFactoryFromProto(ext_authz_config, "stats", context_); } @@ -204,8 +206,11 @@ class ExtAuthzFilterGrpcTest : public ExtAuthzFilterTest { void expectGrpcClientSentRequest( const envoy::extensions::filters::http::ext_authz::v3::ExtAuthz& ext_authz_config, int requests_sent_per_thread) { - Grpc::RawAsyncClientSharedPtr async_client = async_client_manager_->getOrCreateRawAsyncClient( - ext_authz_config.grpc_service(), context_.scope(), false); + Envoy::Grpc::GrpcServiceConfigWithHashKey config_with_hash_key = + Envoy::Grpc::GrpcServiceConfigWithHashKey(ext_authz_config.grpc_service()); + Grpc::RawAsyncClientSharedPtr async_client = + async_client_manager_->getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, + context_.scope(), false); Grpc::MockAsyncClient* mock_async_client = dynamic_cast(async_client.get()); EXPECT_NE(mock_async_client, nullptr); diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index 4f6af31bc0f7..248d541d46cc 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -30,6 +30,12 @@ envoy_extension_cc_test( name = "filter_test", size = "small", srcs = ["filter_test.cc"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), extension_names = ["envoy.filters.http.ext_proc"], deps = [ ":mock_server_lib", @@ -117,12 +123,19 @@ envoy_extension_cc_test( name = "ext_proc_integration_test", size = "large", # This test can take a while under tsan. srcs = ["ext_proc_integration_test.cc"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), extension_names = ["envoy.filters.http.ext_proc"], shard_count = 2, tags = [ "cpu:3", ], deps = [ + ":logging_test_filter_lib", ":utils_lib", "//source/extensions/filters/http/ext_proc:config", "//test/common/http:common_lib", @@ -138,6 +151,12 @@ envoy_extension_cc_test( name = "streaming_integration_test", size = "large", srcs = ["streaming_integration_test.cc"], + copts = select({ + "//bazel:windows_x86_64": [], + "//conditions:default": [ + "-DUSE_CEL_PARSER", + ], + }), extension_names = ["envoy.filters.http.ext_proc"], tags = [ "cpu:3", @@ -272,3 +291,27 @@ envoy_extension_cc_test( "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", ], ) + +envoy_proto_library( + name = "logging_test_filter_proto", + srcs = ["logging_test_filter.proto"], +) + +envoy_extension_cc_test_library( + name = "logging_test_filter_lib", + srcs = [ + "logging_test_filter.cc", + ], + extension_names = ["envoy.filters.http.ext_proc"], + deps = [ + ":logging_test_filter_proto_cc_proto", + "//envoy/http:filter_interface", + "//envoy/registry", + "//envoy/server:filter_config_interface", + "//source/common/router:string_accessor_lib", + "//source/extensions/filters/http/common:factory_base_lib", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//source/extensions/filters/http/ext_proc", + "//test/test_common:utility_lib", + ], +) diff --git a/test/extensions/filters/http/ext_proc/client_test.cc b/test/extensions/filters/http/ext_proc/client_test.cc index 85f20b837942..63bd95f2b1aa 100644 --- a/test/extensions/filters/http/ext_proc/client_test.cc +++ b/test/extensions/filters/http/ext_proc/client_test.cc @@ -30,8 +30,9 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback protected: void SetUp() override { grpc_service_.mutable_envoy_grpc()->set_cluster_name("test"); + config_with_hash_key_.setConfig(grpc_service_); - EXPECT_CALL(client_manager_, getOrCreateRawAsyncClient(_, _, _)) + EXPECT_CALL(client_manager_, getOrCreateRawAsyncClientWithHashKey(_, _, _)) .WillOnce(Invoke(this, &ExtProcStreamTest::doFactory)); client_ = @@ -60,12 +61,14 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback void onGrpcError(Grpc::Status::GrpcStatus status) override { grpc_status_ = status; } void onGrpcClose() override { grpc_closed_ = true; } + void logGrpcStreamInfo() override {} std::unique_ptr last_response_; Grpc::Status::GrpcStatus grpc_status_ = Grpc::Status::WellKnownGrpcStatus::Ok; bool grpc_closed_ = false; envoy::config::core::v3::GrpcService grpc_service_; + Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; ExternalProcessorClientPtr client_; Grpc::MockAsyncClientManager client_manager_; Grpc::MockAsyncStream stream_; @@ -76,14 +79,14 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback }; TEST_F(ExtProcStreamTest, OpenCloseStream) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); EXPECT_CALL(stream_, closeStream()); EXPECT_CALL(stream_, resetStream()); stream->close(); } TEST_F(ExtProcStreamTest, SendToStream) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); // Send something and ensure that we get it. Doesn't really matter what. EXPECT_CALL(stream_, sendMessageRaw_(_, false)); ProcessingRequest req; @@ -94,14 +97,14 @@ TEST_F(ExtProcStreamTest, SendToStream) { } TEST_F(ExtProcStreamTest, SendAndClose) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); EXPECT_CALL(stream_, sendMessageRaw_(_, true)); ProcessingRequest req; stream->send(std::move(req), true); } TEST_F(ExtProcStreamTest, ReceiveFromStream) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); // Send something and ensure that we get it. Doesn't really matter what. ProcessingResponse resp; @@ -131,7 +134,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) { } TEST_F(ExtProcStreamTest, StreamClosed) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); @@ -144,7 +147,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) { } TEST_F(ExtProcStreamTest, StreamError) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); diff --git a/test/extensions/filters/http/ext_proc/config_test.cc b/test/extensions/filters/http/ext_proc/config_test.cc index 914e454ebd5b..86ad2ad521bd 100644 --- a/test/extensions/filters/http/ext_proc/config_test.cc +++ b/test/extensions/filters/http/ext_proc/config_test.cc @@ -21,8 +21,12 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { target_uri: ext_proc_server stat_prefix: google failure_mode_allow: true - request_attributes: 'Foo, Bar, Baz' - response_attributes: More + request_attributes: + - 'Foo' + - 'Bar' + - 'Baz' + response_attributes: + - 'More' processing_mode: request_header_mode: send response_header_mode: skip @@ -32,6 +36,15 @@ TEST(HttpExtProcConfigTest, CorrectConfig) { response_trailer_mode: send filter_metadata: hello: "world" + metadata_options: + forwarding_namespaces: + typed: + - ns1 + untyped: + - ns2 + receiving_namespaces: + untyped: + - ns2 )EOF"; ExternalProcessingFilterConfig factory; @@ -53,8 +66,12 @@ TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { target_uri: ext_proc_server stat_prefix: google failure_mode_allow: true - request_attributes: 'Foo, Bar, Baz' - response_attributes: More + request_attributes: + - 'Foo' + - 'Bar' + - 'Baz' + response_attributes: + - 'More' processing_mode: request_header_mode: send response_header_mode: skip @@ -64,6 +81,15 @@ TEST(HttpExtProcConfigTest, CorrectConfigServerContext) { response_trailer_mode: send filter_metadata: hello: "world" + metadata_options: + forwarding_namespaces: + typed: + - ns1 + untyped: + - ns2 + receiving_namespaces: + untyped: + - ns2 )EOF"; ExternalProcessingFilterConfig factory; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 762606cca9a7..6f848d195d0a 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -7,6 +7,8 @@ #include "source/extensions/filters/http/ext_proc/config.h" #include "test/common/http/common.h" +#include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.h" +#include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.validate.h" #include "test/extensions/filters/http/ext_proc/utils.h" #include "test/integration/http_integration.h" #include "test/test_common/test_runtime.h" @@ -42,6 +44,11 @@ using Http::LowerCaseString; using namespace std::chrono_literals; +struct ConfigOptions { + bool valid_grpc_server = true; + bool add_logging_filter = false; +}; + // These tests exercise the ext_proc filter through Envoy's integration test // environment by configuring an instance of the Envoy server and driving it // through the mock network stack. @@ -67,14 +74,14 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, cleanupUpstreamAndDownstream(); } - void initializeConfig(bool valid_grpc_server = true) { + void initializeConfig(ConfigOptions config_option = {}) { scoped_runtime_.mergeValues( {{"envoy.reloadable_features.send_header_raw_value", header_raw_value_}}); scoped_runtime_.mergeValues( - {{"envoy_reloadable_features_immediate_response_use_filter_mutation_rule", + {{"envoy.reloadable_features.immediate_response_use_filter_mutation_rule", filter_mutation_rule_}}); - config_helper_.addConfigModifier([this, valid_grpc_server]( + config_helper_.addConfigModifier([this, config_option]( envoy::config::bootstrap::v3::Bootstrap& bootstrap) { // Ensure "HTTP2 with no prior knowledge." Necessary for gRPC and for headers ConfigHelper::setHttp2( @@ -89,24 +96,39 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, server_cluster->mutable_load_assignment()->set_cluster_name(cluster_name); } - if (valid_grpc_server) { + const std::string valid_grpc_cluster_name = "ext_proc_server_0"; + if (config_option.valid_grpc_server) { // Load configuration of the server from YAML and use a helper to add a grpc_service // stanza pointing to the cluster that we just made - setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_server_0", + setGrpcService(*proto_config_.mutable_grpc_service(), valid_grpc_cluster_name, grpc_upstreams_[0]->localAddress()); } else { // Set up the gRPC service with wrong cluster name and address. setGrpcService(*proto_config_.mutable_grpc_service(), "ext_proc_wrong_server", std::make_shared("127.0.0.1", 1234)); } - // Construct a configuration proto for our filter and then re-write it // to JSON so that we can add it to the overall config envoy::config::listener::v3::Filter ext_proc_filter; - ext_proc_filter.set_name("envoy.filters.http.ext_proc"); + std::string ext_proc_filter_name = "envoy.filters.http.ext_proc"; + ext_proc_filter.set_name(ext_proc_filter_name); ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_); config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter)); + // Add logging test filter only in Envoy gRPC mode. + // gRPC side stream logging is only supported in Envoy gRPC mode at the moment. + if (clientType() == Grpc::ClientType::EnvoyGrpc && config_option.add_logging_filter && + config_option.valid_grpc_server) { + test::integration::filters::LoggingTestFilterConfig logging_filter_config; + logging_filter_config.set_logging_id(ext_proc_filter_name); + logging_filter_config.set_upstream_cluster_name(valid_grpc_cluster_name); + envoy::config::listener::v3::Filter logging_filter; + logging_filter.set_name("logging-test-filter"); + logging_filter.mutable_typed_config()->PackFrom(logging_filter_config); + + config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(logging_filter)); + } + // Parameterize with defer processing to prevent bit rot as filter made // assumptions of data flow, prior relying on eager processing. config_helper_.addRuntimeOverride(Runtime::defer_processing_backedup_streams, @@ -440,6 +462,23 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithLogging) { + ConfigOptions config_option = {}; + config_option.add_logging_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg); + // Just close the stream without doing anything + processor_stream_->startGrpcStream(); + processor_stream_->finishGrpcStream(Grpc::Status::Ok); + + handleUpstreamRequest(); + verifyDownstreamResponse(*response, 200); +} + // Test the filter using the default configuration by connecting to // an ext_proc server that responds to the request_headers message // by returning a failure before the first stream response can be sent. @@ -455,9 +494,25 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStream) { verifyDownstreamResponse(*response, 500); } +TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithLogging) { + ConfigOptions config_option = {}; + config_option.add_logging_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg); + // Fail the stream immediately + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "500"}}, true); + verifyDownstreamResponse(*response, 500); +} + // Test the filter connecting to an invalid ext_proc server that will result in open stream failure. TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithInvalidSever) { - initializeConfig(/*valid_grpc_server=*/false); + ConfigOptions config_option = {}; + config_option.valid_grpc_server = false; + initializeConfig(config_option); HttpIntegrationTest::initialize(); auto response = sendDownstreamRequest(absl::nullopt); ProcessingRequest request_headers_msg; @@ -601,6 +656,42 @@ TEST_P(ExtProcIntegrationTest, GetAndSetHeaders) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, GetAndSetHeadersWithLogging) { + ConfigOptions config_option = {}; + config_option.add_logging_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("x-remove-this"), "yes"); }); + + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_value("new"); + return true; + }); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + processResponseHeadersMessage( + *grpc_upstreams_[0], false, [](const HttpHeaders& headers, HeadersResponse&) { + Http::TestRequestHeaderMapImpl expected_response_headers{{":status", "200"}}; + EXPECT_THAT(headers.headers(), HeaderProtosEqual(expected_response_headers)); + return true; + }); + + verifyDownstreamResponse(*response, 200); +} + TEST_P(ExtProcIntegrationTest, GetAndSetHeadersNonUtf8WithValueInString) { initializeConfig(); HttpIntegrationTest::initialize(); @@ -1204,6 +1295,31 @@ TEST_P(ExtProcIntegrationTest, GetAndRespondImmediately) { EXPECT_EQ("{\"reason\": \"Not authorized\"}", response->body()); } +TEST_P(ExtProcIntegrationTest, GetAndRespondImmediatelyWithLogging) { + ConfigOptions config_option = {}; + config_option.add_logging_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processAndRespondImmediately(*grpc_upstreams_[0], true, [](ImmediateResponse& immediate) { + immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::Unauthorized); + immediate.set_body("{\"reason\": \"Not authorized\"}"); + immediate.set_details("Failed because you are not authorized"); + auto* hdr1 = immediate.mutable_headers()->add_set_headers(); + hdr1->mutable_header()->set_key("x-failure-reason"); + hdr1->mutable_header()->set_value("testing"); + auto* hdr2 = immediate.mutable_headers()->add_set_headers(); + hdr2->mutable_header()->set_key("content-type"); + hdr2->mutable_header()->set_value("application/json"); + }); + + verifyDownstreamResponse(*response, 401); + EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-failure-reason", "testing")); + EXPECT_THAT(response->headers(), SingleHeaderValueIs("content-type", "application/json")); + EXPECT_EQ("{\"reason\": \"Not authorized\"}", response->body()); +} + TEST_P(ExtProcIntegrationTest, GetAndRespondImmediatelyWithInvalidCharacter) { initializeConfig(); HttpIntegrationTest::initialize(); @@ -1422,6 +1538,51 @@ TEST_P(ExtProcIntegrationTest, GetAndRespondImmediatelyWithEnvoyHeaderMutation) EXPECT_THAT(response->headers(), HasNoHeader("x-envoy-foo")); } +TEST_P(ExtProcIntegrationTest, GetAndImmediateRespondMutationAllowEnvoy) { + filter_mutation_rule_ = "true"; + proto_config_.mutable_mutation_rules()->mutable_allow_envoy()->set_value(true); + proto_config_.mutable_mutation_rules()->mutable_allow_all_routing()->set_value(true); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processAndRespondImmediately(*grpc_upstreams_[0], true, [](ImmediateResponse& immediate) { + immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::Unauthorized); + auto* hdr = immediate.mutable_headers()->add_set_headers(); + hdr->mutable_header()->set_key("x-envoy-foo"); + hdr->mutable_header()->set_value("bar"); + auto* hdr1 = immediate.mutable_headers()->add_set_headers(); + hdr1->mutable_header()->set_key("host"); + hdr1->mutable_header()->set_value("test"); + }); + + verifyDownstreamResponse(*response, 401); + EXPECT_THAT(response->headers(), SingleHeaderValueIs("host", "test")); + EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-envoy-foo", "bar")); +} + +// Test the filter using an ext_proc server that responds to the request_header message +// by sending back an immediate_response message with x-envoy header mutation. +// The deprecated default checker allows x-envoy headers to be mutated and should +// override config-level checkers if the runtime guard is disabled. +TEST_P(ExtProcIntegrationTest, GetAndRespondImmediatelyWithDefaultHeaderMutationChecker) { + // this is default, but setting explicitly for test clarity + filter_mutation_rule_ = "false"; + proto_config_.mutable_mutation_rules()->mutable_allow_envoy()->set_value(false); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processAndRespondImmediately(*grpc_upstreams_[0], true, [](ImmediateResponse& immediate) { + immediate.mutable_status()->set_code(envoy::type::v3::StatusCode::Unauthorized); + auto* hdr = immediate.mutable_headers()->add_set_headers(); + // Adding x-envoy header is allowed since default overrides config. + hdr->mutable_header()->set_key("x-envoy-foo"); + hdr->mutable_header()->set_value("bar"); + }); + verifyDownstreamResponse(*response, 401); + EXPECT_FALSE(response->headers().get(LowerCaseString("x-envoy-foo")).empty()); +} + // Test the filter with request body buffering enabled using // an ext_proc server that responds to the request_body message // by modifying a header that should cause an error. @@ -1572,6 +1733,25 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeout) { verifyDownstreamResponse(*response, 500); } +TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithLogging) { + // ensure 200 ms timeout + proto_config_.mutable_message_timeout()->set_nanos(200000000); + ConfigOptions config_option = {}; + config_option.add_logging_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processRequestHeadersMessage(*grpc_upstreams_[0], true, + [this](const HttpHeaders&, HeadersResponse&) { + // Travel forward 400 ms + timeSystem().advanceTimeWaitImpl(400ms); + return false; + }); + + // We should immediately have an error response now + verifyDownstreamResponse(*response, 500); +} + // Same as the previous test but on the response path, since there are separate // timers for each. TEST_P(ExtProcIntegrationTest, ResponseMessageTimeout) { @@ -1593,6 +1773,27 @@ TEST_P(ExtProcIntegrationTest, ResponseMessageTimeout) { verifyDownstreamResponse(*response, 500); } +TEST_P(ExtProcIntegrationTest, ResponseMessageTimeoutWithLogging) { + // ensure 200 ms timeout + proto_config_.mutable_message_timeout()->set_nanos(200000000); + ConfigOptions config_option = {}; + config_option.add_logging_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt); + handleUpstreamRequest(); + processResponseHeadersMessage(*grpc_upstreams_[0], false, + [this](const HttpHeaders&, HeadersResponse&) { + // Travel forward 400 ms + timeSystem().advanceTimeWaitImpl(400ms); + return false; + }); + + // We should immediately have an error response now + verifyDownstreamResponse(*response, 500); +} + // Send a request, wait longer than the "message timeout" before sending a response // from the external processor, but nothing should happen because we are ignoring // the timeout. @@ -1993,6 +2194,18 @@ TEST_P(ExtProcIntegrationTest, PerRouteGrpcService) { setGrpcService(*per_route.mutable_overrides()->mutable_grpc_service(), "ext_proc_server_1", grpc_upstreams_[1]->localAddress()); setPerRouteConfig(route, per_route); + + // Add logging test filter here in place since it has a different GrpcService from route. + if (clientType() == Grpc::ClientType::EnvoyGrpc) { + test::integration::filters::LoggingTestFilterConfig logging_filter_config; + logging_filter_config.set_logging_id("envoy.filters.http.ext_proc"); + logging_filter_config.set_upstream_cluster_name("ext_proc_server_1"); + envoy::config::listener::v3::Filter logging_filter; + logging_filter.set_name("logging-test-filter"); + logging_filter.mutable_typed_config()->PackFrom(logging_filter_config); + + config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(logging_filter)); + } }); HttpIntegrationTest::initialize(); @@ -2682,4 +2895,132 @@ TEST_P(ExtProcIntegrationTest, HeaderMutationResultSizeFailWithResponseTrailer) EXPECT_FALSE(response->complete()); } +// Test the case that client doesn't send trailer and the ext_proc filter config +// processing mode is set to send trailer. +TEST_P(ExtProcIntegrationTest, ClientNoTrailerProcessingModeSendTrailer) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED); + proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(); + HttpIntegrationTest::initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + // Client send data with end_stream set to true. + codec_client_->sendData(*request_encoder_, 10, true); + IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second); + processRequestHeadersMessage(*grpc_upstreams_[0], true, + [](const HttpHeaders& headers, HeadersResponse&) { + EXPECT_FALSE(headers.end_of_stream()); + return true; + }); + processRequestBodyMessage(*grpc_upstreams_[0], false, [](const HttpBody& body, BodyResponse&) { + EXPECT_TRUE(body.end_of_stream()); + return true; + }); + + handleUpstreamRequest(); + verifyDownstreamResponse(*response, 200); +} + +// Test when request trailer is received, it sends out the buffered body to ext_proc server. +TEST_P(ExtProcIntegrationTest, SkipHeaderTrailerSendBodyClientSendAll) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP); + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + codec_client_->sendData(*request_encoder_, 10, false); + IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second); + Http::TestRequestTrailerMapImpl request_trailers{{"x-trailer-foo", "yes"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + processRequestBodyMessage(*grpc_upstreams_[0], true, [](const HttpBody& body, BodyResponse&) { + EXPECT_FALSE(body.end_of_stream()); + return true; + }); + handleUpstreamRequest(); + verifyDownstreamResponse(*response, 200); +} + +TEST_P(ExtProcIntegrationTest, SendBodyBufferedPartialWithTrailer) { + proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED_PARTIAL); + proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + auto encoder_decoder = codec_client_->startRequest(headers); + request_encoder_ = &encoder_decoder.first; + IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second); + + // Send some data. + codec_client_->sendData(*request_encoder_, 10, false); + Http::TestRequestTrailerMapImpl request_trailers{{"request", "trailer"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + processRequestBodyMessage(*grpc_upstreams_[0], true, absl::nullopt); + processRequestTrailersMessage(*grpc_upstreams_[0], false, absl::nullopt); + + handleUpstreamRequest(); + verifyDownstreamResponse(*response, 200); +} + +#if defined(USE_CEL_PARSER) +// Test the filter using the default configuration by connecting to +// an ext_proc server that responds to the request_headers message +// by requesting to modify the request headers. +TEST_P(ExtProcIntegrationTest, GetAndSetRequestResponseAttributes) { + proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND); + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SEND); + proto_config_.mutable_request_attributes()->Add("request.path"); + proto_config_.mutable_request_attributes()->Add("request.method"); + proto_config_.mutable_request_attributes()->Add("request.scheme"); + proto_config_.mutable_request_attributes()->Add("connection.mtls"); + proto_config_.mutable_response_attributes()->Add("response.code"); + proto_config_.mutable_response_attributes()->Add("response.code_details"); + + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders& req, HeadersResponse&) { + EXPECT_EQ(req.attributes().size(), 1); + auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); + EXPECT_EQ(proto_struct.fields().at("request.path").string_value(), "/"); + EXPECT_EQ(proto_struct.fields().at("request.method").string_value(), "GET"); + EXPECT_EQ(proto_struct.fields().at("request.scheme").string_value(), "http"); + EXPECT_EQ(proto_struct.fields().at("connection.mtls").bool_value(), false); + return true; + }); + + handleUpstreamRequest(); + + processResponseHeadersMessage( + *grpc_upstreams_[0], false, [](const HttpHeaders& req, HeadersResponse&) { + EXPECT_EQ(req.attributes().size(), 1); + auto proto_struct = req.attributes().at("envoy.filters.http.ext_proc"); + EXPECT_EQ(proto_struct.fields().at("response.code").string_value(), "200"); + EXPECT_EQ(proto_struct.fields().at("response.code_details").string_value(), "via_upstream"); + return true; + }); + + verifyDownstreamResponse(*response, 200); +} +#endif + } // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 72db2ad264b2..b1818a91b7ff 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -85,16 +85,16 @@ class HttpFilterTest : public testing::Test { EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); EXPECT_CALL(decoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(encoder_callbacks_, streamInfo()).WillRepeatedly(ReturnRef(stream_info_)); + EXPECT_CALL(stream_info_, dynamicMetadata()).WillRepeatedly(ReturnRef(dynamic_metadata_)); + EXPECT_CALL(stream_info_, setDynamicMetadata(_, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke(this, &HttpFilterTest::doSetDynamicMetadata)); - EXPECT_CALL(async_client_stream_info_, bytesSent()).WillRepeatedly(Return(100)); - EXPECT_CALL(async_client_stream_info_, bytesReceived()).WillRepeatedly(Return(200)); - EXPECT_CALL(async_client_stream_info_, upstreamClusterInfo()); - EXPECT_CALL(testing::Const(async_client_stream_info_), upstreamInfo()); - // Get pointer to MockUpstreamInfo. - std::shared_ptr mock_upstream_info = - std::dynamic_pointer_cast( - async_client_stream_info_.upstreamInfo()); - EXPECT_CALL(testing::Const(*mock_upstream_info), upstreamHost()); + EXPECT_CALL(decoder_callbacks_, connection()) + .WillRepeatedly(Return(OptRef{connection_})); + EXPECT_CALL(encoder_callbacks_, connection()) + .WillRepeatedly(Return(OptRef{connection_})); // Pointing dispatcher_.time_system_ to a SimulatedTimeSystem object. test_time_ = new Envoy::Event::SimulatedTimeSystem(); @@ -119,7 +119,10 @@ class HttpFilterTest : public testing::Test { if (!yaml.empty()) { TestUtility::loadFromYaml(yaml, proto_config); } - config_.reset(new FilterConfig(proto_config, 200ms, 10000, *stats_store_.rootScope(), "")); + config_ = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr))); filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); filter_->setEncoderFilterCallbacks(encoder_callbacks_); EXPECT_CALL(encoder_callbacks_, encoderBufferLimit()).WillRepeatedly(Return(BufferSize)); @@ -161,10 +164,13 @@ class HttpFilterTest : public testing::Test { } ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, testing::Unused) { if (final_expected_grpc_service_.has_value()) { - EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), grpc_service)); + EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), + config_with_hash_key.config())); + std::cout << final_expected_grpc_service_.value().DebugString(); + std::cout << config_with_hash_key.config().DebugString(); } stream_callbacks_ = &callbacks; @@ -180,6 +186,10 @@ class HttpFilterTest : public testing::Test { return stream; } + void doSetDynamicMetadata(const std::string& ns, const ProtobufWkt::Struct& val) { + (*dynamic_metadata_.mutable_filter_metadata())[ns] = val; + }; + void doSend(ProcessingRequest&& request, Unused) { last_request_ = std::move(request); } bool doSendClose() { return !server_closed_stream_; } @@ -458,16 +468,15 @@ class HttpFilterTest : public testing::Test { filter_config_name); const Envoy::ProtobufWkt::Struct& loggedMetadata = filterState->filterMetadata(); EXPECT_THAT(loggedMetadata, ProtoEq(expected_metadata)); - EXPECT_EQ(filterState->bytesSent(), 100); - EXPECT_EQ(filterState->bytesReceived(), 200); } absl::optional final_expected_grpc_service_; + Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; std::unique_ptr client_; ExternalProcessorCallbacks* stream_callbacks_ = nullptr; ProcessingRequest last_request_; bool server_closed_stream_ = false; - NiceMock stats_store_; + testing::NiceMock stats_store_; FilterConfigSharedPtr config_; std::shared_ptr filter_; testing::NiceMock dispatcher_; @@ -483,6 +492,8 @@ class HttpFilterTest : public testing::Test { std::vector timers_; TestScopedRuntime scoped_runtime_; Envoy::Event::SimulatedTimeSystem* test_time_; + envoy::config::core::v3::Metadata dynamic_metadata_; + testing::NiceMock connection_; }; // Using the default configuration, test the filter with a processor that @@ -701,6 +712,40 @@ TEST_F(HttpFilterTest, PostAndRespondImmediately) { expectFilterState(Envoy::ProtobufWkt::Struct()); } +TEST_F(HttpFilterTest, PostAndRespondImmediatelyWithDisabledConfig) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + disable_immediate_response: true + )EOF"); + + EXPECT_EQ(filter_->decodeHeaders(request_headers_, false), FilterHeadersStatus::StopIteration); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + std::unique_ptr resp1 = std::make_unique(); + auto* immediate_response = resp1->mutable_immediate_response(); + immediate_response->mutable_status()->set_code(envoy::type::v3::StatusCode::BadRequest); + immediate_response->set_body("Bad request"); + immediate_response->set_details("Got a bad request"); + auto* immediate_headers = immediate_response->mutable_headers(); + auto* hdr1 = immediate_headers->add_set_headers(); + hdr1->mutable_header()->set_key("content-type"); + hdr1->mutable_header()->set_value("text/plain"); + stream_callbacks_->onReceiveMessage(std::move(resp1)); + + Buffer::OwnedImpl req_data("foo"); + EXPECT_EQ(filter_->decodeData(req_data, true), FilterDataStatus::Continue); + EXPECT_EQ(filter_->decodeTrailers(request_trailers_), FilterTrailersStatus::Continue); + EXPECT_EQ(filter_->encodeHeaders(response_headers_, true), FilterHeadersStatus::Continue); + filter_->onDestroy(); + + EXPECT_EQ(config_->stats().streams_started_.value(), 1); + EXPECT_EQ(config_->stats().stream_msgs_sent_.value(), 1); + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 1); + EXPECT_EQ(config_->stats().stream_msgs_received_.value(), 0); + EXPECT_EQ(config_->stats().streams_closed_.value(), 1); +} + // Using the default configuration, test the filter with a processor that // replies to the request_headers message with an "immediate response" message // during response headers processing that should result in a response being @@ -2422,6 +2467,7 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead cb(route_config); })); final_expected_grpc_service_.emplace(route_proto.overrides().grpc_service()); + config_with_hash_key_.setConfig(route_proto.overrides().grpc_service()); response_headers_.addCopy(LowerCaseString(":status"), "200"); response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); diff --git a/test/extensions/filters/http/ext_proc/logging_test_filter.cc b/test/extensions/filters/http/ext_proc/logging_test_filter.cc new file mode 100644 index 000000000000..30838fd74600 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/logging_test_filter.cc @@ -0,0 +1,69 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "source/extensions/filters/http/common/factory_base.h" +#include "source/extensions/filters/http/common/pass_through_filter.h" +#include "source/extensions/filters/http/ext_proc/ext_proc.h" + +#include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.h" +#include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.validate.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +// A test filter that retrieve the logging info on encodeComplete. +class LoggingTestFilter : public Http::PassThroughFilter { +public: + LoggingTestFilter(const std::string& logging_id, const std::string& cluster_name) + : logging_id_(logging_id), expected_cluster_name_(cluster_name) {} + void encodeComplete() override { + ASSERT(decoder_callbacks_ != nullptr); + const Envoy::StreamInfo::FilterStateSharedPtr& filter_state = + decoder_callbacks_->streamInfo().filterState(); + const ExtProcLoggingInfo* ext_proc_logging_info = + filter_state->getDataReadOnly(logging_id_); + if (ext_proc_logging_info != nullptr) { + EXPECT_NE(ext_proc_logging_info->bytesSent(), 0); + EXPECT_NE(ext_proc_logging_info->bytesReceived(), 0); + ASSERT_TRUE(ext_proc_logging_info->upstreamHost() != nullptr); + EXPECT_EQ(ext_proc_logging_info->upstreamHost()->cluster().name(), expected_cluster_name_); + } + } + +private: + std::string logging_id_; + std::string expected_cluster_name_; +}; + +class LoggingTestFilterFactory : public Extensions::HttpFilters::Common::FactoryBase< + test::integration::filters::LoggingTestFilterConfig> { +public: + LoggingTestFilterFactory() : FactoryBase("logging-test-filter"){}; + + Http::FilterFactoryCb createFilterFactoryFromProtoTyped( + const test::integration::filters::LoggingTestFilterConfig& proto_config, const std::string&, + Server::Configuration::FactoryContext&) override { + return [=](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared( + proto_config.logging_id(), proto_config.upstream_cluster_name())); + }; + } +}; + +// Perform static registration +static Registry::RegisterFactory + register_; + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/logging_test_filter.proto b/test/extensions/filters/http/ext_proc/logging_test_filter.proto new file mode 100644 index 000000000000..c6e5768b6860 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/logging_test_filter.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package test.integration.filters; + +message LoggingTestFilterConfig { + string logging_id = 1; + string upstream_cluster_name = 2; +} diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index f655e977a2e4..5f4dedb90173 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -14,7 +14,7 @@ class MockClient : public ExternalProcessorClient { MockClient(); ~MockClient() override; MOCK_METHOD(ExternalProcessorStreamPtr, start, - (ExternalProcessorCallbacks&, const envoy::config::core::v3::GrpcService&, + (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&)); }; diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index b9587e92b026..31d2b4824391 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -70,8 +70,10 @@ class OrderingTest : public testing::Test { if (cb) { (*cb)(proto_config); } - config_.reset(new FilterConfig(proto_config, kMessageTimeout, kMaxMessageTimeoutMs, - *stats_store_.rootScope(), "")); + config_ = std::make_shared( + proto_config, kMessageTimeout, kMaxMessageTimeoutMs, *stats_store_.rootScope(), "", + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr))); filter_ = std::make_unique(config_, std::move(client_), proto_config.grpc_service()); filter_->setEncoderFilterCallbacks(encoder_callbacks_); filter_->setDecoderFilterCallbacks(decoder_callbacks_); @@ -81,7 +83,7 @@ class OrderingTest : public testing::Test { // Called by the "start" method on the stream by the filter virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService&, + const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&) { stream_callbacks_ = &callbacks; auto stream = std::make_unique(); @@ -218,7 +220,7 @@ class OrderingTest : public testing::Test { class FastFailOrderingTest : public OrderingTest { // All tests using this class have gRPC streams that will fail while being opened. ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService&, + const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&) override { callbacks.onGrpcError(Grpc::Status::Internal); // Returns nullptr on start stream failure. diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index 8ca81765cb27..5dca528be610 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -66,7 +66,9 @@ DEFINE_PROTO_FUZZER( try { config = std::make_shared( proto_config, std::chrono::milliseconds(200), 200, *stats_store.rootScope(), - "ext_proc_prefix"); + "ext_proc_prefix", + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr))); } catch (const EnvoyException& e) { ENVOY_LOG_MISC(debug, "EnvoyException during ext_proc filter config validation: {}", e.what()); return; @@ -81,7 +83,7 @@ DEFINE_PROTO_FUZZER( EXPECT_CALL(*client, start(_, _, _)) .WillRepeatedly(Invoke( [&](ExternalProcessing::ExternalProcessorCallbacks&, - const envoy::config::core::v3::GrpcService&, + const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&) -> ExternalProcessing::ExternalProcessorStreamPtr { auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h index cbfafb5f2473..207b820e21df 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h @@ -30,7 +30,7 @@ class MockClient : public ExternalProcessing::ExternalProcessorClient { MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, (ExternalProcessing::ExternalProcessorCallbacks & callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const StreamInfo::StreamInfo& stream_info)); }; diff --git a/test/mocks/grpc/mocks.h b/test/mocks/grpc/mocks.h index 3cff1d2d75aa..620e9f6ce646 100644 --- a/test/mocks/grpc/mocks.h +++ b/test/mocks/grpc/mocks.h @@ -116,6 +116,10 @@ class MockAsyncClientManager : public AsyncClientManager { MOCK_METHOD(RawAsyncClientSharedPtr, getOrCreateRawAsyncClient, (const envoy::config::core::v3::GrpcService& grpc_service, Stats::Scope& scope, bool skip_cluster_check)); + + MOCK_METHOD(RawAsyncClientSharedPtr, getOrCreateRawAsyncClientWithHashKey, + (const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope, + bool skip_cluster_check)); }; MATCHER_P(ProtoBufferEq, expected, "") {