Skip to content

Commit

Permalink
Simplify interface between H3 and QUIC, and remove memcopy between them
Browse files Browse the repository at this point in the history
  • Loading branch information
maskit committed Sep 15, 2020
1 parent 112fc71 commit 42e8898
Show file tree
Hide file tree
Showing 47 changed files with 1,743 additions and 1,767 deletions.
3 changes: 3 additions & 0 deletions iocore/net/quic/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ libquic_a_SOURCES = \
QUICNewRenoCongestionController.cc \
QUICFlowController.cc \
QUICStreamState.cc \
QUICStreamAdapter.cc \
QUICStreamVCAdapter.cc \
QUICStream.cc \
QUICHandshake.cc \
QUICPacketHeaderProtector.cc \
Expand Down Expand Up @@ -99,6 +101,7 @@ libquic_a_SOURCES = \
QUICFrameGenerator.cc \
QUICFrameRetransmitter.cc \
QUICAddrVerifyState.cc \
QUICTransferProgressProvider.cc \
QUICBidirectionalStream.cc \
QUICCryptoStream.cc \
QUICUnidirectionalStream.cc \
Expand Down
90 changes: 85 additions & 5 deletions iocore/net/quic/Mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "QUICPinger.h"
#include "QUICPadder.h"
#include "QUICHandshakeProtocol.h"
#include "QUICStreamAdapter.h"

class MockQUICContext;

Expand Down Expand Up @@ -731,6 +732,76 @@ class MockQUICLossDetector : public QUICLossDetector
MockQUICCongestionController _cc;
};

class MockQUICStreamAdapter : public QUICStreamAdapter
{
public:
MockQUICStreamAdapter(QUICStream &stream) : QUICStreamAdapter(stream) {}

void
write_to_stream(const uint8_t *buf, size_t len)
{
this->_total_sending_data_len += len;
this->_sending_data_len += len;
}

int64_t
write(QUICOffset offset, const uint8_t *data, uint64_t data_length, bool fin) override
{
this->_total_receiving_data_len += data_length;
this->_receiving_data_len += data_length;
return data_length;
}
bool
is_eos() override
{
return false;
}
uint64_t
unread_len() override
{
return this->_sending_data_len;
}
uint64_t
read_len() override
{
return 0;
}
uint64_t
total_len() override
{
return this->_total_sending_data_len;
}
void
encourge_read() override
{
}
void
encourge_write() override
{
}
void
notify_eos() override
{
}

protected:
Ptr<IOBufferBlock>
_read(size_t len) override
{
this->_sending_data_len -= len;
Ptr<IOBufferBlock> block = make_ptr<IOBufferBlock>(new_IOBufferBlock());
block->alloc(iobuffer_size_to_index(len, BUFFER_SIZE_INDEX_32K));
block->fill(len);
return block;
}

private:
size_t _sending_data_len = 0;
size_t _total_sending_data_len = 0;
size_t _receiving_data_len = 0;
size_t _total_receiving_data_len = 0;
};

class MockQUICApplication : public QUICApplication
{
public:
Expand All @@ -740,20 +811,29 @@ class MockQUICApplication : public QUICApplication
main_event_handler(int event, Event *data)
{
if (event == 12345) {
QUICStreamIO *stream_io = static_cast<QUICStreamIO *>(data->cookie);
stream_io->write_reenable();
}
return EVENT_CONT;
}

void
on_new_stream(QUICStream &stream) override
{
auto ite = this->_streams.emplace(stream.id(), stream);
QUICStreamAdapter &adapter = ite.first->second;
stream.set_io_adapter(&adapter);
}

void
send(const uint8_t *data, size_t size, QUICStreamId stream_id)
{
QUICStreamIO *stream_io = this->_find_stream_io(stream_id);
stream_io->write(data, size);
auto ite = this->_streams.find(stream_id);
auto &adapter = ite->second;
adapter.write_to_stream(data, size);

eventProcessor.schedule_imm(this, ET_CALL, 12345, stream_io);
// eventProcessor.schedule_imm(this, ET_CALL, 12345, adapter);
}

std::unordered_map<QUICStreamId, MockQUICStreamAdapter> _streams;
};

class MockQUICPacketR : public QUICPacketR
Expand Down
248 changes: 1 addition & 247 deletions iocore/net/quic/QUICApplication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,172 +24,6 @@
#include "QUICApplication.h"
#include "QUICStream.h"

static constexpr char tag_stream_io[] = "quic_stream_io";
static constexpr char tag_app[] = "quic_app";

#define QUICStreamIODebug(fmt, ...) \
Debug(tag_stream_io, "[%s] [%" PRIu64 "] " fmt, this->_stream_vc->connection_info()->cids().data(), this->_stream_vc->id(), \
##__VA_ARGS__)

//
// QUICStreamIO
//
QUICStreamIO::QUICStreamIO(QUICApplication *app, QUICStreamVConnection *stream_vc) : _stream_vc(stream_vc)
{
this->_read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);
this->_write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);

this->_read_buffer_reader = this->_read_buffer->alloc_reader();
this->_write_buffer_reader = this->_write_buffer->alloc_reader();

switch (stream_vc->direction()) {
case QUICStreamDirection::BIDIRECTIONAL:
this->_read_vio = stream_vc->do_io_read(app, INT64_MAX, this->_read_buffer);
this->_write_vio = stream_vc->do_io_write(app, INT64_MAX, this->_write_buffer_reader);
break;
case QUICStreamDirection::SEND:
this->_write_vio = stream_vc->do_io_write(app, INT64_MAX, this->_write_buffer_reader);
break;
case QUICStreamDirection::RECEIVE:
this->_read_vio = stream_vc->do_io_read(app, INT64_MAX, this->_read_buffer);
break;
default:
ink_assert(false);
break;
}
}

QUICStreamIO::~QUICStreamIO()
{
// All readers will be deallocated
free_MIOBuffer(this->_read_buffer);
free_MIOBuffer(this->_write_buffer);
};

uint32_t
QUICStreamIO::stream_id() const
{
return this->_stream_vc->id();
}

bool
QUICStreamIO::is_bidirectional() const
{
return this->_stream_vc->is_bidirectional();
}

int64_t
QUICStreamIO::read(uint8_t *buf, int64_t len)
{
if (is_debug_tag_set(tag_stream_io)) {
if (this->_read_vio->nbytes == INT64_MAX) {
QUICStreamIODebug("nbytes=- ndone=%" PRId64 " read_avail=%" PRId64 " read_len=%" PRId64, this->_read_vio->ndone,
this->_read_buffer_reader->read_avail(), len);
} else {
QUICStreamIODebug("nbytes=%" PRId64 " ndone=%" PRId64 " read_avail=%" PRId64 " read_len=%" PRId64, this->_read_vio->nbytes,
this->_read_vio->ndone, this->_read_buffer_reader->read_avail(), len);
}
}

int64_t nread = this->_read_buffer_reader->read(buf, len);
if (nread > 0) {
this->_read_vio->ndone += nread;
}

this->_stream_vc->on_read();

return nread;
}

int64_t
QUICStreamIO::peek(uint8_t *buf, int64_t len)
{
return this->_read_buffer_reader->memcpy(buf, len) - reinterpret_cast<char *>(buf);
}

void
QUICStreamIO::consume(int64_t len)
{
this->_read_buffer_reader->consume(len);
this->_stream_vc->on_read();
}

bool
QUICStreamIO::is_read_done() const
{
return this->_read_vio->ntodo() == 0;
}

int64_t
QUICStreamIO::write(const uint8_t *buf, int64_t len)
{
SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());

int64_t nwritten = this->_write_buffer->write(buf, len);
if (nwritten > 0) {
this->_nwritten += nwritten;
}

return len;
}

int64_t
QUICStreamIO::write(IOBufferReader *r, int64_t len)
{
SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());

int64_t bytes_avail = this->_write_buffer->write_avail();

if (bytes_avail > 0) {
if (is_debug_tag_set(tag_stream_io)) {
if (this->_write_vio->nbytes == INT64_MAX) {
QUICStreamIODebug("nbytes=- ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64, this->_write_vio->ndone,
bytes_avail, len);
} else {
QUICStreamIODebug("nbytes=%" PRId64 " ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64,
this->_write_vio->nbytes, this->_write_vio->ndone, bytes_avail, len);
}
}

int64_t bytes_len = std::min(bytes_avail, len);
int64_t nwritten = this->_write_buffer->write(r, bytes_len);

if (nwritten > 0) {
this->_nwritten += nwritten;
}

return nwritten;
} else {
return 0;
}
}

// TODO: Similar to other "write" apis, but do not copy.
int64_t
QUICStreamIO::write(IOBufferBlock *b)
{
ink_assert(!"not implemented yet");
return 0;
}

void
QUICStreamIO::write_done()
{
this->_write_vio->nbytes = this->_nwritten;
}

void
QUICStreamIO::read_reenable()
{
return this->_read_vio->reenable();
}

void
QUICStreamIO::write_reenable()
{
return this->_write_vio->reenable();
}

//
// QUICApplication
//
Expand All @@ -198,84 +32,4 @@ QUICApplication::QUICApplication(QUICConnection *qc) : Continuation(new_ProxyMut
this->_qc = qc;
}

QUICApplication::~QUICApplication()
{
for (auto const &kv : this->_stream_map) {
delete kv.second;
}
}

// @brief Bind stream and application
void
QUICApplication::set_stream(QUICStreamVConnection *stream_vc, QUICStreamIO *stream_io)
{
if (stream_io == nullptr) {
stream_io = new QUICStreamIO(this, stream_vc);
}
this->_stream_map.insert(std::make_pair(stream_vc->id(), stream_io));
}

// @brief Bind stream and application
void
QUICApplication::set_stream(QUICStreamIO *stream_io)
{
this->_stream_map.insert(std::make_pair(stream_io->stream_id(), stream_io));
}

bool
QUICApplication::is_stream_set(QUICStreamVConnection *stream)
{
auto result = this->_stream_map.find(stream->id());

return result != this->_stream_map.end();
}

void
QUICApplication::reenable(QUICStreamVConnection *stream)
{
QUICStreamIO *stream_io = this->_find_stream_io(stream->id());
if (stream_io) {
stream_io->read_reenable();
stream_io->write_reenable();
} else {
Debug(tag_app, "[%s] Unknown Stream id=%" PRIx64, this->_qc->cids().data(), stream->id());
}

return;
}

void
QUICApplication::unset_stream(QUICStreamVConnection *stream)
{
QUICStreamIO *stream_io = this->_find_stream_io(stream->id());
if (stream_io) {
this->_stream_map.erase(stream->id());
}
}

QUICStreamIO *
QUICApplication::_find_stream_io(QUICStreamId id)
{
auto result = this->_stream_map.find(id);

if (result == this->_stream_map.end()) {
return nullptr;
} else {
return result->second;
}
}

QUICStreamIO *
QUICApplication::_find_stream_io(VIO *vio)
{
if (vio == nullptr) {
return nullptr;
}

QUICStream *stream = dynamic_cast<QUICStream *>(vio->vc_server);
if (stream == nullptr) {
return nullptr;
}

return this->_find_stream_io(stream->id());
}
QUICApplication::~QUICApplication() {}

0 comments on commit 42e8898

Please sign in to comment.