Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov committed Jun 21, 2024
1 parent 7263c53 commit fd17760
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 33 deletions.
2 changes: 0 additions & 2 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ void AsyncStreamImpl::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_str
stream_callbacks_.onHeaders(std::move(headers), end_stream);
closeRemote(end_stream);
// At present, the AsyncStream is always fully closed when the server half closes the stream.
// This is the case even when allow_multiplexed_upstream_half_close runtime flag is set, as there
// are currently no known use cases where early server half close needs to be supported.
//
// Always ensure we close locally to trigger completion. Another option would be to issue a stream
// reset here if local isn't yet closed, triggering cleanup along a more standardized path.
Expand Down
4 changes: 1 addition & 3 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
OptRef<const Tracing::Config> tracingConfig() const override;
const ScopeTrackedObject& scope() override;
OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return *this; }
bool isHalfCloseEnabled() override {
return filter_manager_.allowUpstreamHalfClose() ? true : false;
}
bool isHalfCloseEnabled() override { return filter_manager_.allowUpstreamHalfClose(); }

// DownstreamStreamFilterCallbacks
void setRoute(Router::RouteConstSharedPtr route) override;
Expand Down
20 changes: 12 additions & 8 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_st
ENVOY_STREAM_LOG(trace,
"commonEncodePrefix end_stream: {}, isHalfCloseEnabled: {}, force_close: {}",
*this, end_stream, filter_manager_callbacks_.isHalfCloseEnabled(),
static_cast<bool>(state_.force_close_stream_));
static_cast<bool>(state_.should_force_close_stream_));
if (filter == nullptr) {
// half close is enabled in case tcp proxying is done with http1 encoder. In this case, we
// should not set the local_complete_ flag to true when end_stream is true.
Expand All @@ -915,7 +915,7 @@ FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_st
if (allow_upstream_half_close_) {
if (end_stream) {
state_.encoder_end_stream_ = true;
if (!filter_manager_callbacks_.isHalfCloseEnabled() || state_.force_close_stream_) {
if (!filter_manager_callbacks_.isHalfCloseEnabled() || state_.should_force_close_stream_) {
ASSERT(!state_.local_complete_);
state_.local_complete_ = true;
}
Expand Down Expand Up @@ -973,7 +973,7 @@ void DownstreamFilterManager::sendLocalReply(
const bool is_head_request = state_.is_head_request_;
const bool is_grpc_request = state_.is_grpc_request_;
// Local reply closes the stream even if downstream is not half closed.
state_.force_close_stream_ = true;
state_.should_force_close_stream_ = true;

// Stop filter chain iteration if local reply was sent while filter decoding or encoding callbacks
// are running.
Expand Down Expand Up @@ -1303,7 +1303,7 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
if (!(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status))) {
// Even if upstream half close is enabled the stream is closed on error responses from the
// server.
state_.force_close_stream_ = true;
state_.should_force_close_stream_ = true;
}
}
filter_manager_callbacks_.encodeHeaders(headers, modified_end_stream);
Expand Down Expand Up @@ -1512,24 +1512,28 @@ void FilterManager::maybeEndEncode(bool end_stream) {
ASSERT(!state_.remote_encode_complete_);
state_.remote_encode_complete_ = true;
if (allow_upstream_half_close_) {
maybeCloseStream();
checkAndCloseStreamIfFullyClosed();
} else {
state_.stream_closed_ = true;
filter_manager_callbacks_.endStream();
}
}
}

void FilterManager::maybeCloseStream() {
ASSERT(allow_upstream_half_close_);
void FilterManager::checkAndCloseStreamIfFullyClosed() {
// This function is only used when half close semantics are enabled.
if (!allow_upstream_half_close_) {
return;
}

if (state_.stream_closed_) {
return;
}

// If upstream half close is enabled then close the stream either when force close
// is set (i.e local reply) or when both server and client half closed.
if (state_.remote_encode_complete_ &&
(state_.remote_decode_complete_ || state_.force_close_stream_)) {
(state_.remote_decode_complete_ || state_.should_force_close_stream_)) {
state_.stream_closed_ = true;
ENVOY_STREAM_LOG(trace, "closing stream", *this);
filter_manager_callbacks_.endStream();
Expand Down
25 changes: 11 additions & 14 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,10 @@ class FilterManager : public ScopeTrackedObject,
// ScopeTrackedObject
void dumpState(std::ostream& os, int indent_level = 0) const override {
const char* spaces = spacesForLevel(indent_level);
os << spaces << "FilterManager " << this << DUMP_MEMBER(state_.has_1xx_headers_) << "\n";
os << spaces << "FilterManager " << this << DUMP_MEMBER(state_.has_1xx_headers_)
<< DUMP_MEMBER(state_.remote_decode_complete_) << DUMP_MEMBER(state_.remote_encode_complete_)
<< DUMP_MEMBER(state_.encoder_end_stream_) << DUMP_MEMBER(state_.stream_closed_)
<< DUMP_MEMBER(state_.should_force_close_stream_) << "\n";

DUMP_DETAILS(filter_manager_callbacks_.requestHeaders());
DUMP_DETAILS(filter_manager_callbacks_.requestTrailers());
Expand Down Expand Up @@ -750,9 +753,7 @@ class FilterManager : public ScopeTrackedObject,
void decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
state_.remote_decode_complete_ = end_stream;
decodeHeaders(nullptr, headers, end_stream);
if (allow_upstream_half_close_) {
maybeCloseStream();
}
checkAndCloseStreamIfFullyClosed();
}

/**
Expand All @@ -763,9 +764,7 @@ class FilterManager : public ScopeTrackedObject,
void decodeData(Buffer::Instance& data, bool end_stream) {
state_.remote_decode_complete_ = end_stream;
decodeData(nullptr, data, end_stream, FilterIterationStartState::CanStartFromCurrent);
if (allow_upstream_half_close_) {
maybeCloseStream();
}
checkAndCloseStreamIfFullyClosed();
}

/**
Expand All @@ -775,9 +774,7 @@ class FilterManager : public ScopeTrackedObject,
void decodeTrailers(RequestTrailerMap& trailers) {
state_.remote_decode_complete_ = true;
decodeTrailers(nullptr, trailers);
if (allow_upstream_half_close_) {
maybeCloseStream();
}
checkAndCloseStreamIfFullyClosed();
}

/**
Expand All @@ -794,7 +791,7 @@ class FilterManager : public ScopeTrackedObject,
*/
void maybeEndEncode(bool end_stream);

void maybeCloseStream();
void checkAndCloseStreamIfFullyClosed();

virtual void sendLocalReply(Code code, absl::string_view body,
const std::function<void(ResponseHeaderMap& headers)>& modify_headers,
Expand Down Expand Up @@ -884,7 +881,7 @@ class FilterManager : public ScopeTrackedObject,
created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false),
non_100_response_headers_encoded_(false), under_on_local_reply_(false),
decoder_filter_chain_aborted_(false), encoder_filter_chain_aborted_(false),
saw_downstream_reset_(false), stream_closed_(false), force_close_stream_(false) {}
saw_downstream_reset_(false), stream_closed_(false), should_force_close_stream_(false) {}
uint32_t filter_call_state_{0};

bool remote_decode_complete_ : 1; // Set when decoder filter chain iteration has completed.
Expand Down Expand Up @@ -914,8 +911,8 @@ class FilterManager : public ScopeTrackedObject,
bool stream_closed_ : 1; // Set when both remote_decode_complete_ and remote_encode_complete_ is
// true observed for the first time and prevents ending the stream
// multiple times. Only set when allow_upstream_half_close is enabled.
bool force_close_stream_ : 1; // Set to indicate that stream should be closed due to either
// local reply or error response from the server.
bool should_force_close_stream_ : 1; // Set to indicate that stream should be fully closed due
// to either local reply or error response from the server.

// The following 3 members are booleans rather than part of the space-saving bitfield as they
// are passed as arguments to functions expecting bools. Extend State using the bitfield
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class UpstreamFilterManager : public Http::FilterManager {
state().remote_encode_complete_ = true;
state().encoder_end_stream_ = true;
state().local_complete_ = true;
state().force_close_stream_ = true;
state().should_force_close_stream_ = true;
// TODO(alyssawilk) this should be done through the router to play well with hedging.
upstream_request_.parent_.callbacks()->sendLocalReply(code, body, modify_headers, grpc_status,
details);
Expand Down
8 changes: 5 additions & 3 deletions source/extensions/upstreams/http/tcp/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ void TcpUpstream::onUpstreamData(Buffer::Instance& data, bool end_stream) {
//
// Save the indicator to close the stream before calling the decodeData since when the
// allow_multiplexed_upstream_half_close is false the call to decodeHeader with end_stream==true
// will delete the TcpUpstream object. NOTE: it this point Envoy can not support half closed TCP
// upstream as there is currently no detection of half closed vs fully closed TCP connections.
bool force_reset = force_reset_on_upstream_half_close_ && end_stream && !downstream_complete_;
// will delete the TcpUpstream object.
// NOTE: it this point Envoy can not support half closed TCP upstream as there is currently no
// distinction between half closed vs fully closed TCP peers.
const bool force_reset =
force_reset_on_upstream_half_close_ && end_stream && !downstream_complete_;
bytes_meter_->addWireBytesReceived(data.length());
upstream_request_->decodeData(data, end_stream);
// force_reset is true only when allow_multiplexed_upstream_half_close is true and in this case
Expand Down
5 changes: 4 additions & 1 deletion test/common/http/conn_manager_impl_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,10 @@ TEST_F(HttpConnectionManagerImplTest, ConnectionDuration) {
EXPECT_EQ(1U, stats_.named_.downstream_cx_max_duration_reached_.value());
}

TEST_F(HttpConnectionManagerImplTest, DISABLED_IntermediateBufferingEarlyResponse) {
TEST_F(HttpConnectionManagerImplTest, IntermediateBufferingEarlyResponse) {
TestScopedRuntime scoped_runtime;
scoped_runtime.mergeValues(
{{"envoy.reloadable_features.allow_multiplexed_upstream_half_close", "false"}});
setup(false, "");

setupFilterChain(2, 0);
Expand Down
2 changes: 1 addition & 1 deletion test/integration/protocol_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4884,7 +4884,7 @@ TEST_P(ProtocolIntegrationTest, ServerHalfCloseBeforeClientWithErrorAndBufferedR
}
}

TEST_P(ProtocolIntegrationTest, H2UpstreamHalfCloseBeforeH1Dowstream) {
TEST_P(ProtocolIntegrationTest, H2UpstreamHalfCloseBeforeH1Downstream) {
// This test is only for H/1 downstream and H/2 or H/3 upstream
// Other cases are covered by the ServerHalfCloseBeforeClientWithBufferedResponseData
// It verifies that H/1 downstream request is not reset when H/2 upstream completes the stream
Expand Down

0 comments on commit fd17760

Please sign in to comment.