Skip to content

Commit

Permalink
QUIC Receive Stream Management: Call QUIC flow control
Browse files Browse the repository at this point in the history
Reviewed-by: Matt Caswell <matt@openssl.org>
Reviewed-by: Hugo Landau <hlandau@openssl.org>
(Merged from #19351)
  • Loading branch information
t8m authored and hlandau committed Nov 14, 2022
1 parent bbf902c commit e77396f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 41 deletions.
23 changes: 18 additions & 5 deletions include/internal/quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "internal/quic_record_tx.h"
#include "internal/quic_record_rx.h"
#include "internal/quic_record_rx_wrap.h"
#include "internal/quic_fc.h"
#include "internal/quic_statm.h"

/*
* QUIC Send Stream
Expand Down Expand Up @@ -272,8 +274,9 @@ void ossl_quic_sstream_adjust_iov(size_t len,
* QUIC Receive Stream Manager
* ===========================
*
* The QUIC Receive Stream Manager (QUIC_RSTREAM) is responsible for:
*
* The QUIC Receive Stream Manager (QUIC_RSTREAM) is responsible for
* storing the received stream data frames until the application
* is able to read the data.
*
* The QUIC_RSTREAM is instantiated once for every stream that can receive data.
* (i.e., for a unidirectional receiving stream or for the receiving component
Expand All @@ -282,17 +285,27 @@ void ossl_quic_sstream_adjust_iov(size_t len,
typedef struct quic_rstream_st QUIC_RSTREAM;

/*
* Create a new instance of QUIC_RSTREAM.
* Create a new instance of QUIC_RSTREAM with pointers to the flow
* controller and statistics module. They can be NULL for unit testing.
* If they are non-NULL, the `rxfc` is called when receive stream data
* is queued or read by application. `statm` is queried for current rtt.
*/
QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx);
QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx, QUIC_RXFC *rxfc,
OSSL_STATM *statm);

/*
* Frees a QUIC_RSTREAM and any associated storage.
*/
void ossl_quic_rstream_free(QUIC_RSTREAM *qrs);

/*
*
* Adds received stream frame data to `qrs`. The `pkt_wrap` refcount is
* incremented if the `data` is queued directly without copying.
* It can be NULL for unit-testing purposes, i.e. if `data` is static or
* never released before calling ossl_quic_rstream_free().
* The `offset` is the absolute offset of the data in the stream.
* `data_len` can be 0 - can be useful for indicating `fin` for empty stream.
* Or to indicate `fin` without any further data added to the stream.
*/

int ossl_quic_rstream_queue_data(QUIC_RSTREAM *qrs, OSSL_QRX_PKT_WRAP *pkt_wrap,
Expand Down
40 changes: 36 additions & 4 deletions ssl/quic/quic_rstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,29 @@
* https://www.openssl.org/source/license.html
*/
#include "internal/common.h"
#include "internal/time.h"
#include "internal/quic_stream.h"
#include "internal/quic_sf_list.h"
#include "internal/quic_fc.h"
#include "internal/quic_error.h"

struct quic_rstream_st {
SFRAME_LIST fl;
QUIC_RXFC *rxfc;
OSSL_STATM *statm;
};

QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx)
QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx, QUIC_RXFC *rxfc,
OSSL_STATM *statm)
{
QUIC_RSTREAM *ret = OPENSSL_malloc(sizeof(*ret));

if (ret == NULL)
return NULL;

ossl_sframe_list_init(&ret->fl, qrx);
ret->rxfc = rxfc;
ret->statm = statm;
return ret;
}

Expand All @@ -40,6 +48,13 @@ int ossl_quic_rstream_queue_data(QUIC_RSTREAM *qrs, OSSL_QRX_PKT_WRAP *pkt_wrap,

range.start = offset;
range.end = offset + data_len;

if (qrs->rxfc != NULL
&& (!ossl_quic_rxfc_on_rx_stream_frame(qrs->rxfc, range.end, fin)
|| ossl_quic_rxfc_get_error(qrs->rxfc, 0) != QUIC_ERR_NO_ERROR))
/* QUIC_ERR_FLOW_CONTROL_ERROR or QUIC_ERR_FINAL_SIZE detected */
return 0;

return ossl_sframe_list_insert(&qrs->fl, &range, pkt_wrap, data, fin);
}

Expand Down Expand Up @@ -67,9 +82,8 @@ static int read_internal(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
break;
}

if (drop && offset != 0) {
if (drop && offset != 0)
ret = ossl_sframe_list_drop_frames(&qrs->fl, offset);
}

if (ret) {
*readbytes = readbytes_;
Expand All @@ -82,7 +96,25 @@ static int read_internal(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
int ossl_quic_rstream_read(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
size_t *readbytes, int *fin)
{
return read_internal(qrs, buf, size, readbytes, fin, 1);
OSSL_TIME rtt;

if (qrs->statm != NULL) {
OSSL_RTT_INFO rtt_info;

ossl_statm_get_rtt_info(qrs->statm, &rtt_info);
rtt = rtt_info.smoothed_rtt;
} else {
rtt = ossl_time_zero();
}

if (!read_internal(qrs, buf, size, readbytes, fin, 1))
return 0;

if (qrs->rxfc != NULL
&& !ossl_quic_rxfc_on_retire(qrs->rxfc, *readbytes, rtt))
return 0;

return 1;
}

int ossl_quic_rstream_peek(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
Expand Down
62 changes: 32 additions & 30 deletions ssl/quic/quic_sf_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,43 +101,35 @@ int ossl_sframe_list_insert(SFRAME_LIST *fl, UINT_RANGE *range,
const unsigned char *data, int fin)
{
STREAM_FRAME *sf, *new_frame, *prev_frame, *next_frame;
#ifndef NDEBUG
uint64_t curr_end = fl->tail != NULL ? fl->tail->range.end
: fl->offset;

/* nothing there yet */
if (fl->tail == NULL) {
if ((fin && fl->offset > range->end)
|| (fl->fin && fl->offset < range->end)) {
/* TODO(QUIC): protocol violation: FINAL_SIZE_ERROR */
return 0;
}
fl->fin = fin || fl->fin;
/* This check for FINAL_SIZE_ERROR is handled by QUIC FC already */
assert((!fin || curr_end <= range->end)
&& (!fl->fin || curr_end >= range->end));
#endif

if (fl->offset >= range->end)
return 1;
if (fl->offset >= range->end)
goto end;

/* nothing there yet */
if (fl->tail == NULL) {
fl->tail = fl->head = stream_frame_new(range, pkt, data);
if (fl->tail == NULL)
return 0;
++fl->num_frames;
return 1;
}

if ((fin && fl->tail->range.end > range->end)
|| (fl->fin && fl->tail->range.end < range->end)) {
/* TODO(QUIC): protocol violation: FINAL_SIZE_ERROR */
return 0;
++fl->num_frames;
goto end;
}
fl->fin = fin || fl->fin;

if (fl->offset >= range->end)
return 1;

/* TODO(QUIC): Check for fl->num_frames and start copying if too many */

/* optimize insertion at the end */
if (fl->tail->range.start < range->start) {
if (fl->tail->range.end >= range->end) {
return 1;
}
if (fl->tail->range.end >= range->end)
goto end;

return append_frame(fl, range, pkt, data);
}

Expand All @@ -146,12 +138,12 @@ int ossl_sframe_list_insert(SFRAME_LIST *fl, UINT_RANGE *range,
sf = sf->next)
prev_frame = sf;

if (prev_frame != NULL && prev_frame->range.end >= range->end) {
return 1;
}
if (!ossl_assert(sf != NULL))
/* frame list invariant broken */
return 0;

if (sf == NULL)
return append_frame(fl, range, pkt, data);
if (prev_frame != NULL && prev_frame->range.end >= range->end)
goto end;

/*
* Now we must create a new frame although in the end we might drop it,
Expand All @@ -177,24 +169,32 @@ int ossl_sframe_list_insert(SFRAME_LIST *fl, UINT_RANGE *range,
--fl->num_frames;
stream_frame_free(fl, drop_frame);
}

if (next_frame != NULL) {
/* check whether the new_frame is redundant because there is no gap */
if (prev_frame != NULL
&& next_frame->range.start <= prev_frame->range.end) {
stream_frame_free(fl, new_frame);
return 1;
goto end;
}
next_frame->prev = new_frame;
} else {
fl->tail = new_frame;
}

new_frame->next = next_frame;
new_frame->prev = prev_frame;

if (prev_frame != NULL)
prev_frame->next = new_frame;
else
fl->head = new_frame;

++fl->num_frames;

end:
fl->fin = fin || fl->fin;

return 1;
}

Expand Down Expand Up @@ -254,9 +254,11 @@ int ossl_sframe_list_drop_frames(SFRAME_LIST *fl, uint64_t limit)
stream_frame_free(fl, drop_frame);
}
fl->head = sf;

if (sf != NULL)
sf->prev = NULL;
else
fl->tail = NULL;

return 1;
}
4 changes: 2 additions & 2 deletions test/quic_stream_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ static int test_rstream_simple(void)
size_t readbytes = 0, avail = 0;
int fin = 0;

if (!TEST_ptr(rstream = ossl_quic_rstream_new(NULL)))
if (!TEST_ptr(rstream = ossl_quic_rstream_new(NULL, NULL, NULL)))
goto err;

if (!TEST_true(ossl_quic_rstream_queue_data(rstream, NULL, 5,
Expand Down Expand Up @@ -407,7 +407,7 @@ static int test_rstream_random(int idx)

if (!TEST_ptr(bulk_data = OPENSSL_malloc(data_size))
|| !TEST_ptr(read_buf = OPENSSL_malloc(data_size))
|| !TEST_ptr(rstream = ossl_quic_rstream_new(NULL)))
|| !TEST_ptr(rstream = ossl_quic_rstream_new(NULL, NULL, NULL)))
goto err;

for (i = 0; i < data_size; ++i)
Expand Down

0 comments on commit e77396f

Please sign in to comment.