Skip to content

Commit

Permalink
mobile: removing min delivery size as it's complicated and unused (en…
Browse files Browse the repository at this point in the history
…voyproxy#31200)

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk committed Dec 6, 2023
1 parent 6c5a273 commit 7c6a382
Show file tree
Hide file tree
Showing 28 changed files with 50 additions and 217 deletions.
4 changes: 2 additions & 2 deletions mobile/library/cc/stream_prototype.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ StreamPrototype::StreamPrototype(EngineSharedPtr engine) : engine_(engine) {
callbacks_ = std::make_shared<StreamCallbacks>();
}

StreamSharedPtr StreamPrototype::start(bool explicit_flow_control, uint64_t min_delivery_size) {
StreamSharedPtr StreamPrototype::start(bool explicit_flow_control) {
auto envoy_stream = init_stream(engine_->engine_);
start_stream(engine_->engine_, envoy_stream, callbacks_->asEnvoyHttpCallbacks(),
explicit_flow_control, min_delivery_size);
explicit_flow_control);
return std::make_shared<Stream>(engine_->engine_, envoy_stream);
}

Expand Down
2 changes: 1 addition & 1 deletion mobile/library/cc/stream_prototype.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class StreamPrototype {
public:
StreamPrototype(EngineSharedPtr engine);

StreamSharedPtr start(bool explicit_flow_control = false, uint64_t min_delivery_size = 0);
StreamSharedPtr start(bool explicit_flow_control = false);

StreamPrototype& setOnHeaders(OnHeadersCallback closure);
StreamPrototype& setOnData(OnDataCallback closure);
Expand Down
30 changes: 9 additions & 21 deletions mobile/library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ Client::DirectStreamCallbacks::DirectStreamCallbacks(DirectStream& direct_stream
envoy_http_callbacks bridge_callbacks,
Client& http_client)
: direct_stream_(direct_stream), bridge_callbacks_(bridge_callbacks), http_client_(http_client),
explicit_flow_control_(direct_stream_.explicit_flow_control_),
min_delivery_size_(direct_stream_.min_delivery_size_) {}
explicit_flow_control_(direct_stream_.explicit_flow_control_) {}

void Client::DirectStreamCallbacks::encodeHeaders(const ResponseHeaderMap& headers,
bool end_stream) {
Expand Down Expand Up @@ -113,7 +112,7 @@ void Client::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool end_

// The response_data_ is systematically assigned here because resumeData can
// incur an asynchronous callback to sendDataToBridge.
if ((explicit_flow_control_ || min_delivery_size_ != 0) && !response_data_) {
if (explicit_flow_control_ && !response_data_) {
response_data_ = std::make_unique<Buffer::WatermarkBuffer>(
[this]() -> void { onBufferedDataDrained(); }, [this]() -> void { onHasBufferedData(); },
[]() -> void {});
Expand All @@ -123,36 +122,26 @@ void Client::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool end_
response_data_->setWatermarks(1000000);
}

// Try to send data if
// 1) in default flow control mode
// 2) if resumeData has been called in explicit flow control mode.
// sendDataToBridge will enforce delivery size limits.
// Send data if in default flow control mode, or if resumeData has been called in explicit
// flow control mode.
if (bytes_to_send_ > 0 || !explicit_flow_control_) {
ASSERT(!hasBufferedData());
sendDataToBridge(data, end_stream);
}

// If not all the bytes have been sent up, buffer any remaining data in response_data.
if (data.length() != 0) {
ENVOY_LOG(debug, "[S{}] buffering {} bytes. {} total bytes buffered.",
direct_stream_.stream_handle_, data.length(),
data.length() + response_data_->length());
ASSERT(explicit_flow_control_);
ENVOY_LOG(
debug, "[S{}] buffering {} bytes due to explicit flow control. {} total bytes buffered.",
direct_stream_.stream_handle_, data.length(), data.length() + response_data_->length());
response_data_->move(data);
}
}

void Client::DirectStreamCallbacks::sendDataToBridge(Buffer::Instance& data, bool end_stream) {
ASSERT(!explicit_flow_control_ || bytes_to_send_ > 0);

if (min_delivery_size_ > 0 && (data.length() < min_delivery_size_) && !end_stream) {
ENVOY_LOG(
debug,
"[S{}] defering sending {} bytes due to delivery size limits (limit={} end stream={})",
direct_stream_.stream_handle_, data.length(), min_delivery_size_, end_stream);

return; // Not enough data to justify sending up to the bridge.
}

// Cap by bytes_to_send_ if and only if applying explicit flow control.
uint32_t bytes_to_send = calculateBytesToSend(data, bytes_to_send_);
// Update the number of bytes consumed by this non terminal callback.
Expand Down Expand Up @@ -472,11 +461,10 @@ void Client::DirectStream::dumpState(std::ostream&, int indent_level) const {
}

void Client::startStream(envoy_stream_t new_stream_handle, envoy_http_callbacks bridge_callbacks,
bool explicit_flow_control, uint64_t min_delivery_size) {
bool explicit_flow_control) {
ASSERT(dispatcher_.isThreadSafe());
Client::DirectStreamSharedPtr direct_stream{new DirectStream(new_stream_handle, *this)};
direct_stream->explicit_flow_control_ = explicit_flow_control;
direct_stream->min_delivery_size_ = min_delivery_size;
direct_stream->callbacks_ =
std::make_unique<DirectStreamCallbacks>(*direct_stream, bridge_callbacks, *this);

Expand Down
13 changes: 2 additions & 11 deletions mobile/library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,9 @@ class Client : public Logger::Loggable<Logger::Id::http> {
* @param stream, the stream to start.
* @param bridge_callbacks, wrapper for callbacks for events on this stream.
* @param explicit_flow_control, whether the stream will require explicit flow control.
* @param min_delivery_size, if greater than zero, indicates the smallest number of bytes that
* will be delivered up via the on_data callbacks without end stream.
*/
void startStream(envoy_stream_t stream, envoy_http_callbacks bridge_callbacks,
bool explicit_flow_control, uint64_t min_delivery_size);
bool explicit_flow_control);

/**
* Send headers over an open HTTP stream. This method can be invoked once and needs to be called
Expand Down Expand Up @@ -206,15 +204,12 @@ class Client : public Logger::Loggable<Logger::Id::http> {
absl::optional<envoy_error> error_;
bool success_{};

// Buffered response data when in explicit flow control or buffering due to min delivery size.
// Buffered response data when in explicit flow control mode.
Buffer::InstancePtr response_data_;
ResponseTrailerMapPtr response_trailers_;
// True if the bridge should operate in explicit flow control mode, and only send
// data when it is requested by the caller.
bool explicit_flow_control_{};
// If greater than zero, indicates the minimum size of data that should be
// delivered up via on_data without end stream.
uint64_t min_delivery_size_{};
// Set true when the response headers have been forwarded to the bridge.
bool response_headers_forwarded_{};
// Called in closeStream() to communicate that the end of the stream has
Expand Down Expand Up @@ -341,10 +336,6 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// back, avoids excessive buffering of response bodies if the response body is
// read faster than the mobile caller can process it.
bool explicit_flow_control_ = false;
// If this is non-zero, Envoy will buffer at the C++ layer until either
// min_delivery_size_ bytes are received or end_stream is received. If this is
// zero, it will deliver data as it arrivies, modulo explicit flow control rules.
uint64_t min_delivery_size_{};
// Latest intel data retrieved from the StreamInfo.
envoy_stream_intel stream_intel_{-1, -1, 0, 0};
envoy_final_stream_intel envoy_final_stream_intel_{-1, -1, -1, -1, -1, -1, -1, -1,
Expand Down
4 changes: 2 additions & 2 deletions mobile/library/common/jni/jni_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ extern "C" JNIEXPORT jlong JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibr

extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibrary_startStream(
JNIEnv* env, jclass, jlong engine_handle, jlong stream_handle, jobject j_context,
jboolean explicit_flow_control, jlong min_delivery_size) {
jboolean explicit_flow_control) {

// TODO: To be truly safe we may need stronger guarantees of operation ordering on this ref.
jobject retained_context = env->NewGlobalRef(j_context);
Expand All @@ -936,7 +936,7 @@ extern "C" JNIEXPORT jint JNICALL Java_io_envoyproxy_envoymobile_engine_JniLibra
retained_context};
envoy_status_t result = start_stream(static_cast<envoy_engine_t>(engine_handle),
static_cast<envoy_stream_t>(stream_handle), native_callbacks,
explicit_flow_control, min_delivery_size);
explicit_flow_control);
if (result != ENVOY_SUCCESS) {
env->DeleteGlobalRef(retained_context); // No callbacks are fired and we need to release
}
Expand Down
8 changes: 3 additions & 5 deletions mobile/library/common/main_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ static std::atomic<envoy_stream_t> current_stream_handle_{0};
envoy_stream_t init_stream(envoy_engine_t) { return current_stream_handle_++; }

envoy_status_t start_stream(envoy_engine_t engine, envoy_stream_t stream,
envoy_http_callbacks callbacks, bool explicit_flow_control,
uint64_t min_delivery_size) {
envoy_http_callbacks callbacks, bool explicit_flow_control) {
return runOnEngineDispatcher(
engine, [stream, callbacks, explicit_flow_control, min_delivery_size](auto& engine) -> void {
engine.httpClient().startStream(stream, callbacks, explicit_flow_control,
min_delivery_size);
engine, [stream, callbacks, explicit_flow_control](auto& engine) -> void {
engine.httpClient().startStream(stream, callbacks, explicit_flow_control);
});
}

Expand Down
5 changes: 1 addition & 4 deletions mobile/library/common/main_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@ envoy_stream_t init_stream(envoy_engine_t engine);
* @param stream, handle to the stream to be started.
* @param callbacks, the callbacks that will run the stream callbacks.
* @param explicit_flow_control, whether to enable explicit flow control on the response stream.
* @param min_delivery_size, if non-zero, indicates the smallest non-terminal number of bytes which
* should be delivered via on_data callbacks.
* @return envoy_stream, with a stream handle and a success status, or a failure status.
*/
envoy_status_t start_stream(envoy_engine_t engine, envoy_stream_t stream,
envoy_http_callbacks callbacks, bool explicit_flow_control,
uint64_t min_delivery_size);
envoy_http_callbacks callbacks, bool explicit_flow_control);

/**
* Send headers over an open HTTP stream. This method can be invoked once and needs to be called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ public AndroidEngineImpl(Context context, EnvoyOnEngineRunning runningCallback,
}

@Override
public EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean explicitFlowControl,
long minDeliverySize) {
return envoyEngine.startStream(callbacks, explicitFlowControl, minDeliverySize);
public EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean explicitFlowControl) {
return envoyEngine.startStream(callbacks, explicitFlowControl);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ public interface EnvoyEngine {
*
* @param callbacks The callbacks for receiving callbacks from the stream.
* @param explicitFlowControl Whether explicit flow control will be enabled for this stream.
* @param minDeliverySize If nonzero, indicates the smallest number of response body bytes which
* should be delivered sans end stream.
* @return A stream that may be used for sending data.
*/
EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean explicitFlowControl,
long minDeliverySize);
EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean explicitFlowControl);

/**
* Terminates the running engine.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,13 @@ public EnvoyEngineImpl(EnvoyOnEngineRunning runningCallback, EnvoyLogger logger,
*
* @param callbacks The callbacks for the stream.
* @param explicitFlowControl Whether explicit flow control will be enabled for this stream.
* @param minDeliverySize If nonzero, indicates the smallest number of response body bytes which
* should be delivered sans end stream.
* @return A stream that may be used for sending data.
*/
@Override
public EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean explicitFlowControl,
long minDeliverySize) {
public EnvoyHTTPStream startStream(EnvoyHTTPCallbacks callbacks, boolean explicitFlowControl) {
long streamHandle = JniLibrary.initStream(engineHandle);
EnvoyHTTPStream stream = new EnvoyHTTPStream(engineHandle, streamHandle, callbacks,
explicitFlowControl, minDeliverySize);
EnvoyHTTPStream stream =
new EnvoyHTTPStream(engineHandle, streamHandle, callbacks, explicitFlowControl);
stream.start();
return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ public class EnvoyHTTPStream {
private final long engineHandle;
private final long streamHandle;
private final boolean explicitFlowControl;
private final long minDeliverySize;
private final JvmCallbackContext callbacksContext;

/**
* Start the stream via the JNI library.
*/
void start() {
JniLibrary.startStream(engineHandle, streamHandle, callbacksContext, explicitFlowControl,
minDeliverySize);
JniLibrary.startStream(engineHandle, streamHandle, callbacksContext, explicitFlowControl);
}

/**
Expand All @@ -29,15 +27,12 @@ void start() {
* @param streamHandle Underlying handle of the HTTP stream owned by an Envoy engine.
* @param callbacks The callbacks for the stream.
* @param explicitFlowControl Whether explicit flow control will be enabled for this stream.
* @param minDeliverySize If nonzero, indicates the smallest number of response body bytes which
* should be delivered sans end stream.
*/
public EnvoyHTTPStream(long engineHandle, long streamHandle, EnvoyHTTPCallbacks callbacks,
boolean explicitFlowControl, long minDeliverySize) {
boolean explicitFlowControl) {
this.engineHandle = engineHandle;
this.streamHandle = streamHandle;
this.explicitFlowControl = explicitFlowControl;
this.minDeliverySize = minDeliverySize;
callbacksContext = new JvmCallbackContext(callbacks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,11 @@ private static class JavaLoader {
* callbacks.
* @param explicitFlowControl, whether explicit flow control should be enabled
* for the stream.
* @param minDeliverySize If nonzero, indicates the smallest number of response body bytes which
* should be delivered sans end stream.
* @return envoy_stream, with a stream handle and a success status, or a failure
* status.
*/
protected static native int startStream(long engine, long stream, JvmCallbackContext context,
boolean explicitFlowControl, long minDeliverySize);
boolean explicitFlowControl);

/**
* Send headers over an open HTTP stream. This method can be invoked once and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ public void start() {
public void run() {
try {
mStream.setStream(mRequestContext.getEnvoyEngine().startStream(
CronvoyBidirectionalStream.this, /* explicitFlowCrontrol= */ true,
/* minDeliverySize */ 0));
CronvoyBidirectionalStream.this, /* explicitFlowCrontrol= */ true));
if (!mDelayRequestHeadersUntilFirstFlush) {
mStream.sendHeaders(mEnvoyRequestHeaders, mReadOnly);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,7 @@ private void fireOpenConnection() {
mCurrentUrl, mRequestContext.getBuilder().quicEnabled());
mCronvoyCallbacks = new CronvoyHttpCallbacks();
mStream.set(mRequestContext.getEnvoyEngine().startStream(mCronvoyCallbacks,
/* explicitFlowCrontrol */ true,
/* minDeliverySize */ 0));
/* explicitFlowCrontrol= */ true));
mStream.get().sendHeaders(envoyRequestHeaders, mUploadDataStream == null);
if (mUploadDataStream != null && mUrlChain.size() == 1) {
mUploadDataStream.initializeWithRequest();
Expand Down Expand Up @@ -995,7 +994,7 @@ public void onSendWindowAvailable(EnvoyStreamIntel streamIntel) {
return;
}

mUploadDataStream.readDataReady(); // Have the next request body delivery to be sent.
mUploadDataStream.readDataReady(); // Have the next request body chunk to be sent.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import java.util.concurrent.Executors
open class StreamPrototype(private val engine: EnvoyEngine) {
private val callbacks = StreamCallbacks()
private var explicitFlowControl = false
private var minDeliverySize: Long = 0
private var useByteBufferPosition = false

/**
Expand All @@ -26,23 +25,10 @@ open class StreamPrototype(private val engine: EnvoyEngine) {
* @return The new stream.
*/
open fun start(executor: Executor = Executors.newSingleThreadExecutor()): Stream {
val engineStream =
engine.startStream(createCallbacks(executor), explicitFlowControl, minDeliverySize)
val engineStream = engine.startStream(createCallbacks(executor), explicitFlowControl)
return Stream(engineStream, useByteBufferPosition)
}

/**
* Sets min delivery: data will be buffered in the C++ layer until the min delivery length or end
* stream is read.
*
* @param value set the minimum delivery size fo for this stream
* @return This stream, for chaining syntax.
*/
fun setMinDeliverySize(value: Long): StreamPrototype {
this.minDeliverySize = value
return this
}

/**
* Allows explicit flow control to be enabled. When flow control is enabled, the owner of a stream
* is responsible for providing a buffer to receive response body data. If the buffer is smaller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ internal class MockEnvoyEngine : EnvoyEngine {

override fun startStream(
callbacks: EnvoyHTTPCallbacks?,
explicitFlowControl: Boolean,
minDeliverySize: Long
explicitFlowControl: Boolean
): EnvoyHTTPStream {
return MockEnvoyHTTPStream(callbacks!!, explicitFlowControl, minDeliverySize)
return MockEnvoyHTTPStream(callbacks!!, explicitFlowControl)
}

override fun terminate() = Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import java.nio.ByteBuffer
*/
internal class MockEnvoyHTTPStream(
val callbacks: EnvoyHTTPCallbacks,
val explicitFlowControl: Boolean,
val minDeliverySize: Long
) : EnvoyHTTPStream(0, 0, callbacks, explicitFlowControl, minDeliverySize) {
val explicitFlowControl: Boolean
) : EnvoyHTTPStream(0, 0, callbacks, explicitFlowControl) {
override fun sendHeaders(headers: MutableMap<String, MutableList<String>>?, endStream: Boolean) {}

override fun sendData(data: ByteBuffer?, endStream: Boolean) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ internal constructor(private val onStart: ((stream: MockStream) -> Unit)?) :
StreamPrototype(MockEnvoyEngine()) {
override fun start(executor: Executor): Stream {
val callbacks = createCallbacks(executor)
val stream = MockStream(MockEnvoyHTTPStream(callbacks, false, 0))
val stream = MockStream(MockEnvoyHTTPStream(callbacks, false))
onStart?.invoke(stream)
return stream
}
Expand Down
2 changes: 1 addition & 1 deletion mobile/library/objective-c/EnvoyHTTPStreamImpl.m
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ - (instancetype)initWithHandle:(envoy_stream_t)handle
// start_stream could result in a reset that would release the native ref.
_strongSelf = self;
envoy_status_t result =
start_stream(engineHandle, _streamHandle, native_callbacks, explicitFlowControl, 0);
start_stream(engineHandle, _streamHandle, native_callbacks, explicitFlowControl);
if (result != ENVOY_SUCCESS) {
_strongSelf = nil;
return nil;
Expand Down
2 changes: 1 addition & 1 deletion mobile/test/common/engine_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ TEST_F(EngineTest, EarlyExit) {
ASSERT_EQ(engine_->terminate(), ENVOY_SUCCESS);
ASSERT_TRUE(test_context.on_exit.WaitForNotificationWithTimeout(absl::Seconds(10)));

start_stream(handle, 0, {}, false, 0);
start_stream(handle, 0, {}, false);

engine_.reset();
}
Expand Down
Loading

0 comments on commit 7c6a382

Please sign in to comment.