Skip to content

Commit

Permalink
quic: further implementation details
Browse files Browse the repository at this point in the history
PR-URL: #48244
Reviewed-By: Yagiz Nizipli <yagiz.nizipli@sentry.io>
Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
  • Loading branch information
jasnell authored and RafaelGSS committed Dec 15, 2023
1 parent 297cb6f commit 3fb7fc9
Show file tree
Hide file tree
Showing 7 changed files with 1,229 additions and 38 deletions.
34 changes: 34 additions & 0 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,27 @@ class DataQueueImpl final : public DataQueue,
"entries", entries_, "std::vector<std::unique_ptr<Entry>>");
}

void addBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.insert(listener);
}

void removeBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.erase(listener);
}

void NotifyBackpressure(size_t amount) {
if (idempotent_) return;
for (auto& listener : backpressure_listeners_) listener->EntryRead(amount);
}

bool HasBackpressureListeners() const noexcept {
return !backpressure_listeners_.empty();
}

std::shared_ptr<Reader> get_reader() override;
SET_MEMORY_INFO_NAME(DataQueue)
SET_SELF_SIZE(DataQueueImpl)
Expand All @@ -173,6 +194,8 @@ class DataQueueImpl final : public DataQueue,
std::optional<uint64_t> capped_size_ = std::nullopt;
bool locked_to_reader_ = false;

std::unordered_set<BackpressureListener*> backpressure_listeners_;

friend class DataQueue;
friend class IdempotentDataQueueReader;
friend class NonIdempotentDataQueueReader;
Expand Down Expand Up @@ -433,6 +456,17 @@ class NonIdempotentDataQueueReader final
return;
}

// If there is a backpressure listener, lets report on how much data
// was actually read.
if (data_queue_->HasBackpressureListeners()) {
// How much did we actually read?
size_t read = 0;
for (uint64_t n = 0; n < count; n++) {
read += vecs[n].len;
}
data_queue_->NotifyBackpressure(read);
}

// Now that we have updated this readers state, we can forward
// everything on to the outer next.
std::move(next)(status, vecs, count, std::move(done));
Expand Down
12 changes: 12 additions & 0 deletions src/dataqueue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ class DataQueue : public MemoryRetainer {
using Done = bob::Done;
};

// A BackpressureListener can be used to receive notifications
// when a non-idempotent DataQueue releases entries as they
// are consumed.
class BackpressureListener {
public:
virtual void EntryRead(size_t amount) = 0;
};

// A DataQueue::Entry represents a logical chunk of data in the queue.
// The entry may or may not represent memory-resident data. It may
// or may not be consumable more than once.
Expand Down Expand Up @@ -285,6 +293,10 @@ class DataQueue : public MemoryRetainer {
// been set, maybeCapRemaining() will return std::nullopt.
virtual std::optional<uint64_t> maybeCapRemaining() const = 0;

// BackpressureListeners only work on non-idempotent DataQueues.
virtual void addBackpressureListener(BackpressureListener* listener) = 0;
virtual void removeBackpressureListener(BackpressureListener* listener) = 0;

static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
};
Expand Down
11 changes: 5 additions & 6 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "node_bob.h"
#include "uv.h"
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC

Expand Down Expand Up @@ -79,7 +80,7 @@ void Session::Application::AcknowledgeStreamData(Stream* stream,

void Session::Application::BlockStream(int64_t id) {
auto stream = session().FindStream(id);
if (stream) stream->Blocked();
if (stream) stream->EmitBlocked();
}

bool Session::Application::CanAddHeader(size_t current_count,
Expand Down Expand Up @@ -233,7 +234,7 @@ void Session::Application::SendPendingData() {
// and no more outbound data can be sent.
CHECK_LE(ndatalen, 0);
auto stream = session_->FindStream(stream_data.id);
if (stream) stream->End();
if (stream) stream->EndWritable();
continue;
}
case NGTCP2_ERR_WRITE_MORE: {
Expand Down Expand Up @@ -360,10 +361,8 @@ class DefaultApplication final : public Session::Application {
stream_data->data,
arraysize(stream_data->data),
kMaxVectorCount);
switch (ret) {
case bob::Status::STATUS_EOS:
stream_data->fin = 1;
break;
if (ret == bob::Status::STATUS_EOS) {
stream_data->fin = 1;
}
} else {
stream_data->fin = 1;
Expand Down
8 changes: 6 additions & 2 deletions src/quic/bindingdata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,12 @@ CallbackScopeBase::CallbackScopeBase(Environment* env)
: env(env), context_scope(env->context()), try_catch(env->isolate()) {}

CallbackScopeBase::~CallbackScopeBase() {
if (try_catch.HasCaught() && !try_catch.HasTerminated()) {
errors::TriggerUncaughtException(env->isolate(), try_catch);
if (try_catch.HasCaught()) {
if (!try_catch.HasTerminated() && env->can_call_into_js()) {
errors::TriggerUncaughtException(env->isolate(), try_catch);
} else {
try_catch.ReThrow();
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/quic/bindingdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ constexpr size_t kMaxVectorCount = 16;
V(session_version_negotiation, SessionVersionNegotiation) \
V(session_path_validation, SessionPathValidation) \
V(stream_close, StreamClose) \
V(stream_error, StreamError) \
V(stream_created, StreamCreated) \
V(stream_reset, StreamReset) \
V(stream_headers, StreamHeaders) \
Expand Down Expand Up @@ -304,6 +303,8 @@ struct CallbackScopeBase {
~CallbackScopeBase();
};

// Maintains a strong reference to BaseObject type ptr to keep it alive during
// a MakeCallback during which it might be destroyed.
template <typename T>
struct CallbackScope final : public CallbackScopeBase {
BaseObjectPtr<T> ref;
Expand Down
Loading

0 comments on commit 3fb7fc9

Please sign in to comment.