Skip to content

Commit

Permalink
listener: remove the peek from the listener filters (envoyproxy#17395)
Browse files Browse the repository at this point in the history
Signed-off-by: He Jie Xu <hejie.xu@intel.com>
  • Loading branch information
soulxu authored and ravenblackx committed Jun 8, 2022
1 parent f6664dd commit 3d35a28
Show file tree
Hide file tree
Showing 56 changed files with 2,211 additions and 1,215 deletions.
Expand Up @@ -49,9 +49,7 @@ This filter has a statistics tree rooted at *tls_inspector* with the following s
:header: Name, Type, Description
:widths: 1, 1, 2

connection_closed, Counter, Total connections closed
client_hello_too_large, Counter, Total unreasonably large Client Hello received
read_error, Counter, Total read errors
tls_found, Counter, Total number of times TLS was found
tls_not_found, Counter, Total number of times TLS was not found
alpn_found, Counter, Total number of times `Application-Layer Protocol Negotiation <https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation>`_ was successful
Expand Down
2 changes: 2 additions & 0 deletions docs/root/configuration/listeners/stats.rst
Expand Up @@ -26,6 +26,8 @@ with the following statistics:
downstream_pre_cx_active, Gauge, Sockets currently undergoing listener filter processing
global_cx_overflow, Counter, Total connections rejected due to enforcement of the global connection limit
no_filter_chain_match, Counter, Total connections that didn't match any filter chain
downstream_listener_filter_remote_close, Counter, Total connections closed by remote when peek data for listener filters
downstream_listener_filter_error, Counter, Total numbers of error when peek data for listener filters

.. _config_listener_stats_tls:

Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Expand Up @@ -7,6 +7,7 @@ Incompatible Behavior Changes

* sip-proxy: change API by replacing ``own_domain`` with :ref:`local_services <envoy_v3_api_msg_extensions.filters.network.sip_proxy.v3alpha.LocalService>`.
* tls: set TLS v1.2 as the default minimal version for servers. Users can still explicitly opt-in to 1.0 and 1.1 using :ref:`tls_minimum_protocol_version <envoy_v3_api_field_extensions.transport_sockets.tls.v3.TlsParameters.tls_minimum_protocol_version>`.
* tls-inspector: the listener filter tls inspector's stats ``connection_closed`` and ``read_error`` are removed. The new stats are introduced for listener, ``downstream_peek_remote_close`` and ``read_error`` :ref:`listener stats <config_listener_stats>`.

Minor Behavior Changes
----------------------
Expand Down
11 changes: 11 additions & 0 deletions envoy/buffer/buffer.h
Expand Up @@ -34,6 +34,17 @@ struct RawSlice {
bool operator!=(const RawSlice& rhs) const { return !(*this == rhs); }
};

/**
* A const raw memory data slice including the location and length.
*/
struct ConstRawSlice {
const void* mem_ = nullptr;
size_t len_ = 0;

bool operator==(const RawSlice& rhs) const { return mem_ == rhs.mem_ && len_ == rhs.len_; }
bool operator!=(const RawSlice& rhs) const { return !(*this == rhs); }
};

using RawSliceVector = absl::InlinedVector<RawSlice, 16>;

/**
Expand Down
9 changes: 9 additions & 0 deletions envoy/network/BUILD
Expand Up @@ -87,6 +87,7 @@ envoy_cc_library(
hdrs = ["filter.h"],
deps = [
":listen_socket_interface",
":listener_filter_buffer_interface",
":transport_socket_interface",
"//envoy/buffer:buffer_interface",
"//envoy/stream_info:stream_info_interface",
Expand Down Expand Up @@ -148,6 +149,14 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "listener_filter_buffer_interface",
hdrs = ["listener_filter_buffer.h"],
deps = [
"//envoy/buffer:buffer_interface",
],
)

envoy_cc_library(
name = "transport_socket_interface",
hdrs = ["transport_socket.h"],
Expand Down
15 changes: 15 additions & 0 deletions envoy/network/filter.h
Expand Up @@ -4,6 +4,7 @@

#include "envoy/buffer/buffer.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener_filter_buffer.h"
#include "envoy/network/transport_socket.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/upstream/host_description.h"
Expand Down Expand Up @@ -330,6 +331,20 @@ class ListenerFilter {
* @return status used by the filter manager to manage further filter iteration.
*/
virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE;

/**
* Called when data read from the connection. If the filter chain doesn't get
* enough data, the filter chain can be stopped, then waiting for more data.
* @param buffer the buffer of data.
* @return status used by the filter manager to manage further filter iteration.
*/
virtual FilterStatus onData(Network::ListenerFilterBuffer& buffer) PURE;

/**
* Return the size of data the filter want to inspect from the connection.
* @return the size of data inspect from the connection. 0 means filter needn't any data.
*/
virtual size_t maxReadBytes() const PURE;
};

using ListenerFilterPtr = std::unique_ptr<ListenerFilter>;
Expand Down
33 changes: 33 additions & 0 deletions envoy/network/listener_filter_buffer.h
@@ -0,0 +1,33 @@
#pragma once

#include <memory>

#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"

namespace Envoy {
namespace Network {

/**
* Interface for ListenerFilterBuffer
*/
class ListenerFilterBuffer {
public:
virtual ~ListenerFilterBuffer() = default;

/**
* Return a single const raw slice to the buffer of the data.
* @return a Buffer::ConstRawSlice pointed to raw buffer.
*/
virtual const Buffer::ConstRawSlice rawSlice() const PURE;

/**
* Drain the data from the beginning of the buffer.
* @param length the length of data to drain.
* @return a bool indicate the drain is successful or not.
*/
virtual bool drain(uint64_t length) PURE;
};

} // namespace Network
} // namespace Envoy
11 changes: 11 additions & 0 deletions source/common/network/BUILD
Expand Up @@ -242,6 +242,17 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "listener_filter_buffer_lib",
srcs = ["listener_filter_buffer_impl.cc"],
hdrs = ["listener_filter_buffer_impl.h"],
deps = [
"//envoy/network:io_handle_interface",
"//envoy/network:listener_filter_buffer_interface",
"//source/common/buffer:buffer_lib",
],
)

envoy_cc_library(
name = "listener_lib",
srcs = [
Expand Down
107 changes: 107 additions & 0 deletions source/common/network/listener_filter_buffer_impl.cc
@@ -0,0 +1,107 @@
#include "source/common/network/listener_filter_buffer_impl.h"

#include <string>

namespace Envoy {
namespace Network {

ListenerFilterBufferImpl::ListenerFilterBufferImpl(IoHandle& io_handle,
Event::Dispatcher& dispatcher,
ListenerFilterBufferOnCloseCb close_cb,
ListenerFilterBufferOnDataCb on_data_cb,
uint64_t buffer_size)
: io_handle_(io_handle), dispatcher_(dispatcher), on_close_cb_(close_cb),
on_data_cb_(on_data_cb), buffer_(std::make_unique<uint8_t[]>(buffer_size)),
base_(buffer_.get()), buffer_size_(buffer_size) {
// If the buffer_size not greater than 0, it means that doesn't expect any data.
ASSERT(buffer_size > 0);

io_handle_.initializeFileEvent(
dispatcher_, [this](uint32_t events) { onFileEvent(events); },
Event::PlatformDefaultTriggerType, Event::FileReadyType::Read);
}

const Buffer::ConstRawSlice ListenerFilterBufferImpl::rawSlice() const {
Buffer::ConstRawSlice slice;
slice.mem_ = base_;
slice.len_ = data_size_;
return slice;
}

bool ListenerFilterBufferImpl::drain(uint64_t length) {
if (length == 0) {
return true;
}

ASSERT(length <= data_size_);

uint64_t read_size = 0;
while (read_size < length) {
auto result = io_handle_.recv(base_, length - read_size, 0);
ENVOY_LOG(trace, "recv returned: {}", result.return_value_);

if (!result.ok()) {
// `IoErrorCode::Again` isn't processed here, since
// the data already in the socket buffer.
return false;
}
read_size += result.return_value_;
}
base_ += length;
data_size_ -= length;
return true;
}

PeekState ListenerFilterBufferImpl::peekFromSocket() {
// Reset buffer base in case of draining changed base.
auto old_base = base_;
base_ = buffer_.get();
const auto result = io_handle_.recv(base_, buffer_size_, MSG_PEEK);
ENVOY_LOG(trace, "recv returned: {}", result.return_value_);

if (!result.ok()) {
if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
ENVOY_LOG(trace, "recv return try again");
base_ = old_base;
return PeekState::Again;
}
ENVOY_LOG(debug, "recv failed: {}: {}", static_cast<int>(result.err_->getErrorCode()),
result.err_->getErrorDetails());
return PeekState::Error;
}
// Remote closed
if (result.return_value_ == 0) {
ENVOY_LOG(debug, "recv failed: remote closed");
return PeekState::RemoteClose;
}
data_size_ = result.return_value_;
ASSERT(data_size_ <= buffer_size_);

return PeekState::Done;
}

void ListenerFilterBufferImpl::resetCapacity(uint64_t size) {
buffer_ = std::make_unique<uint8_t[]>(size);
base_ = buffer_.get();
buffer_size_ = size;
data_size_ = 0;
}

void ListenerFilterBufferImpl::activateFileEvent(uint32_t events) { onFileEvent(events); }

void ListenerFilterBufferImpl::onFileEvent(uint32_t events) {
ENVOY_LOG(trace, "onFileEvent: {}", events);

auto state = peekFromSocket();
if (state == PeekState::Done) {
on_data_cb_(*this);
} else if (state == PeekState::Error) {
on_close_cb_(true);
} else if (state == PeekState::RemoteClose) {
on_close_cb_(false);
}
// Did nothing for `Api::IoError::IoErrorCode::Again`
}

} // namespace Network
} // namespace Envoy
72 changes: 72 additions & 0 deletions source/common/network/listener_filter_buffer_impl.h
@@ -0,0 +1,72 @@
#pragma once

#include <functional>
#include <memory>

#include "envoy/buffer/buffer.h"
#include "envoy/network/io_handle.h"
#include "envoy/network/listener_filter_buffer.h"

#include "source/common/buffer/buffer_impl.h"

namespace Envoy {
namespace Network {

class ListenerFilterBufferImpl;
using ListenerFilterBufferOnCloseCb = std::function<void(bool)>;
using ListenerFilterBufferOnDataCb = std::function<void(ListenerFilterBufferImpl&)>;

enum class PeekState {
// Peek data status successful.
Done,
// Need to try again.
Again,
// Error to peek data.
Error,
// Connection closed by remote.
RemoteClose,
};

class ListenerFilterBufferImpl : public ListenerFilterBuffer, Logger::Loggable<Logger::Id::filter> {
public:
ListenerFilterBufferImpl(IoHandle& io_handle, Event::Dispatcher& dispatcher,
ListenerFilterBufferOnCloseCb close_cb,
ListenerFilterBufferOnDataCb on_data_cb, uint64_t buffer_size);

// ListenerFilterBuffer
const Buffer::ConstRawSlice rawSlice() const override;
bool drain(uint64_t length) override;

/**
* Trigger the data peek from the socket.
*/
PeekState peekFromSocket();

void reset() { io_handle_.resetFileEvents(); }

void activateFileEvent(uint32_t events);
uint64_t capacity() const { return buffer_size_; }
void resetCapacity(uint64_t size);

private:
void onFileEvent(uint32_t events);

IoHandle& io_handle_;
Event::Dispatcher& dispatcher_;
ListenerFilterBufferOnCloseCb on_close_cb_;
ListenerFilterBufferOnDataCb on_data_cb_;

// The buffer for the data peeked from the socket.
std::unique_ptr<uint8_t[]> buffer_;
// The start of buffer.
uint8_t* base_;
// The size of buffer;
uint64_t buffer_size_;
// The size of valid data.
uint64_t data_size_{0};
};

using ListenerFilterBufferImplPtr = std::unique_ptr<ListenerFilterBufferImpl>;

} // namespace Network
} // namespace Envoy

0 comments on commit 3d35a28

Please sign in to comment.