diff --git a/APIs.txt b/APIs.txt index cf68ba00a..101fb608c 100644 --- a/APIs.txt +++ b/APIs.txt @@ -127,44 +127,35 @@ drive QUIC connections: ea_packets_out callback. -Connection Queues ------------------ - -Each connection lives in one or more queues. These are: - - - "All" queue. This is not really a queue, but rather a hash where - connections can be looked up by ID. It is possible for a connection - to exist *outside* of this queue: this happens when the connection is - closed, but still has packets to send. In this case, the connection - is present in - - Outgoing queue. This queue contains connections which have packets - to send. The connection in this queue are ordered by priority: the - connection that has gone longest without sending is first. - - Incoming queue. This queue contains connections that have incoming - packets. - - Pending RW Events queue. (See Connection Read-Write Events). - - Advisory Tick Time queue. This queue is used when packet pacing is - turned on (see es_pace_packets option). - -Each of these queues can be processed by a specialized function. They are, -respectively: - - - lsquic_engine_proc_all() - - lsquic_engine_send_unsent_packets() - - lsquic_engine_process_conns_with_incoming() - - lsquic_engine_process_conns_with_pend_rw() - - lsquic_engine_process_conns_to_tick() - -Processing, or "ticking," a connection removes it from Incoming, Pending -RW Events, and Advisory Tick Time queues. The connection gets placed -onto these queues as necessary. - -A simple approach is to - - Read packets from socket, give it to the engine using - lsquic_engine_packet_in(), and call - lsquic_engine_process_conns_with_incoming(); and - - Call lsquic_engine_proc_all() every few dozen milliseconds. +Connection Management +--------------------- +A connections needs to be processed once in a while. It needs to be +processed when one of the following is true: + + - There are incoming packets; + - A stream is both readable by the user code and the user code wants + to read from it; + - A stream is both writeable by the user code and the user code wants + to write to it; + - User has written to stream outside of on_write() callbacks (that is + allowed) and now there are packets ready to be sent; + - A timer (pacer, retransmission, idle, etc) has expired; + - A control frame needs to be sent out; + - A stream needs to be serviced or created. + +Each of these use cases is handled by a single function: + + lsquic_engine_process_conns() + +The connections to which the conditions above apply are processed (or +"ticked") in the least recently ticked order. After calling this function, +you can see when is the next time a connection needs to be processed using + + lsquic_engine_earliest_adv_tick() + +Based on this value, next event can be scheduled (in the event loop of +your choice). Connection ---------- @@ -235,12 +226,3 @@ Versions QUIC version are listed in enum lsquic_version. To specify a list of versions, they are usually placed in a bitmask, e.g. es_versions. - - -Connection Read-Write Events ----------------------------- - -TODO. - -(Do not worry about it if you are not writing to streams outside -of on_write() callback.) diff --git a/CHANGELOG b/CHANGELOG index b99a3a58e..11305537b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,61 @@ +2018-04-09 + + [API Change, OPTIMIZATION] Only process conns that need to be processed + + The API is simplified: do not expose the user code to several + queues. A "connection queue" is now an internal concept. + The user processes connections using the single function + lsquic_engine_process_conns(). When this function is called, + only those connections are processed that need to be processed. + A connection needs to be processed when: + + 1. New incoming packets have been fed to the connection. + 2. User wants to read from a stream that is readable. + 3. User wants to write to a stream that is writeable. + 4. There are buffered packets that can be sent out. (This + means that the user wrote to a stream outside of the + lsquic library callback.) + 5. A control frame (such as BLOCKED) needs to be sent out. + 6. A stream needs to be serviced or delayed stream needs to + be created. + 7. An alarm rings. + 8. Pacer timer expires. + + To achieve this, the library places the connections into two + priority queues (min heaps): + + 1. Tickable Queue; and + 2. Advisory Tick Time queue (ATTQ). + + Each time lsquic_engine_process_conns() is called, the Tickable + Queue is emptied. After the connections have been ticked, they are + queried again: if a connection is not being closed, it is placed + either in the Tickable Queue if it is ready to be ticked again or + it is placed in the Advisory Tick Time Queue. It is assumed that + a connection always has at least one timer set (the idle alarm). + + The connections in the Tickable Queue are arranged in the least + recently ticked order. This lets connections that have been quiet + longer to get their packets scheduled first. + + This change means that the library no longer needs to be ticked + periodically. The user code can query the library when is the + next tick event and schedule it exactly. When connections are + processed, only the tickable connections are processed, not *all* + the connections. When there are no tick events, it means that no + timer event is necessary -- only the file descriptor READ event + is active. + + The following are improvements and simplifications that have + been triggered: + + - Queue of connections with incoming packets is gone. + - "Pending Read/Write Events" Queue is gone (along with its + history and progress checks). This queue has become the + Tickable Queue. + - The connection hash no longer needs to track the connection + insertion order. + 2018-04-02 - [FEATURE] Windows support diff --git a/include/lsquic.h b/include/lsquic.h index e9f687b0f..62e93093f 100644 --- a/include/lsquic.h +++ b/include/lsquic.h @@ -178,9 +178,6 @@ struct lsquic_stream_if { /** By default, infinite loop checks are turned on */ #define LSQUIC_DF_PROGRESS_CHECK 1000 -/** By default, Pending RW Queue infinite loop checks are turned on: */ -#define LSQUIC_DF_PENDRW_CHECK 10 - /** By default, read/write events are dispatched in a loop */ #define LSQUIC_DF_RW_ONCE 0 @@ -331,23 +328,6 @@ struct lsquic_engine_settings { */ unsigned es_progress_check; - /** - * A non-zero value enables internal checks to identify suspected - * infinite loops in Pending RW Queue logic. The value of this - * setting is the number of times a connection on Pending RW Queue - * is allowed to be processed without making progress before it is - * banished from Pending RW Queue. - * - * Progress is considered to have happened if any of the following - * occurs: - * - User reads data, FIN, or new error (due to a reset) from a - * stream. - * - A new stream-related frame is packetized. - * - * The defaut value is @ref LSQUIC_DF_PENDRW_CHECK. - */ - unsigned es_pendrw_check; - /** * A non-zero value make stream dispatch its read-write events once * per call. @@ -364,19 +344,15 @@ struct lsquic_engine_settings { /** * If set, this value specifies that number of microseconds that - * functions @ref lsquic_engine_proc_all(), - * @ref lsquic_engine_process_conns_with_incoming(), - * @ref lsquic_engine_process_conns_to_tick(), and - * @ref lsquic_engine_process_conns_with_pend_rw() are allowed - * to spend before returning. + * @ref lsquic_engine_process_conns() is allowed to spend before + * returning. * * This is not an exact science and the connections must make * progress, so the deadline is checked after all connections get * a chance to tick and at least one batch of packets is sent out. * * When processing function runs out of its time slice, immediate - * calls to @ref lsquic_engine_has_pend_rw() and - * @ref lsquic_engine_has_unsent_packets() return false. + * calls to @ref lsquic_engine_has_unsent_packets() return false. * * The default value is @ref LSQUIC_DF_PROC_TIME_THRESH. */ @@ -503,50 +479,11 @@ lsquic_engine_packet_in (lsquic_engine_t *, void *peer_ctx); /** - * Process all connections. This function must be called often enough so + * Process tickable connections. This function must be called often enough so * that packets and connections do not expire. */ void -lsquic_engine_proc_all (lsquic_engine_t *engine); - -/** - * Process connections that have incoming packets. Call this after adding - * one or more incoming packets using lsquic_engine_packet_in(). - */ -void -lsquic_engine_process_conns_with_incoming (lsquic_engine_t *); - -/** - * Process connections in Advisory Tick Time queue whose tick times are in - * the past. - */ -void -lsquic_engine_process_conns_to_tick (lsquic_engine_t *); - -/** - * Returns true if engine has connections that have pending read or write - * events. - * - * Connections with pending read or write events are those that have at - * least one stream whose state changed outside of the regular callback - * mechanism. The simplest example is writing directly to the stream - * object when data comes in. - * - * A call to @ref lsquic_engine_proc_all, - * @ref lsquic_engine_process_conns_with_incoming, - * @ref lsquic_engine_process_conns_to_tick, or - * @ref lsquic_engine_process_conns_with_pend_rw removes processed connection - * from Pending RW queue. - */ -int -lsquic_engine_has_pend_rw (lsquic_engine_t *); - -/** - * Process connections that have pending read or write events (@see - * lsquic_engine_has_pend_rw for description). - */ -void -lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *); +lsquic_engine_process_conns (lsquic_engine_t *engine); /** * Returns true if engine has some unsent packets. This happens if @@ -895,10 +832,10 @@ void lsquic_conn_abort (lsquic_conn_t *c); /** - * Returns true if there is a connection on the Advisory Tick Time queue, - * false otherwise. If true, `diff' is set to the difference between - * the earliest advisory tick time and now. If the former is in the past, - * the value of `diff' is negative. + * Returns true if there are connections to be processed, false otherwise. + * If true, `diff' is set to the difference between the earliest advisory + * tick time and now. If the former is in the past, the value of `diff' + * is negative. */ int lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff); diff --git a/src/liblsquic/CMakeLists.txt b/src/liblsquic/CMakeLists.txt index 377666db5..7730aceae 100644 --- a/src/liblsquic/CMakeLists.txt +++ b/src/liblsquic/CMakeLists.txt @@ -52,6 +52,7 @@ SET(lsquic_STAT_SRCS lsquic_hpack_enc.c lsquic_xxhash.c lsquic_buf.c + lsquic_min_heap.c ) diff --git a/src/liblsquic/lsquic_alarmset.c b/src/liblsquic/lsquic_alarmset.c index ff8d811ef..363af8f23 100644 --- a/src/liblsquic/lsquic_alarmset.c +++ b/src/liblsquic/lsquic_alarmset.c @@ -52,3 +52,25 @@ lsquic_alarmset_ring_expired (lsquic_alarmset_t *alset, lsquic_time_t now) } } } + + +lsquic_time_t +lsquic_alarmset_mintime (const lsquic_alarmset_t *alset) +{ + lsquic_time_t expiry; + enum alarm_id al_id; + + if (alset->as_armed_set) + { + expiry = UINT64_MAX; + for (al_id = 0; al_id < MAX_LSQUIC_ALARMS; ++al_id) + if ((alset->as_armed_set & (1 << al_id)) + && alset->as_expiry[al_id] < expiry) + { + expiry = alset->as_expiry[al_id]; + } + return expiry; + } + else + return 0; +} diff --git a/src/liblsquic/lsquic_alarmset.h b/src/liblsquic/lsquic_alarmset.h index 74859a222..6fd0102c1 100644 --- a/src/liblsquic/lsquic_alarmset.h +++ b/src/liblsquic/lsquic_alarmset.h @@ -67,4 +67,7 @@ lsquic_alarmset_init_alarm (lsquic_alarmset_t *, enum alarm_id, void lsquic_alarmset_ring_expired (lsquic_alarmset_t *, lsquic_time_t now); +lsquic_time_t +lsquic_alarmset_mintime (const lsquic_alarmset_t *); + #endif diff --git a/src/liblsquic/lsquic_attq.c b/src/liblsquic/lsquic_attq.c index 653679762..f21d143b2 100644 --- a/src/liblsquic/lsquic_attq.c +++ b/src/liblsquic/lsquic_attq.c @@ -26,7 +26,6 @@ struct attq { struct malo *aq_elem_malo; struct attq_elem **aq_heap; - lsquic_time_t aq_min; unsigned aq_nelem; unsigned aq_nalloc; }; @@ -91,17 +90,6 @@ attq_verify (struct attq *q) #endif -int -attq_maybe_add (struct attq *q, struct lsquic_conn *conn, - lsquic_time_t advisory_time) -{ - if (advisory_time < q->aq_min) - return 1; - else - return attq_add(q, conn, advisory_time); -} - - static void attq_swap (struct attq *q, unsigned a, unsigned b) { @@ -273,13 +261,3 @@ attq_next_time (struct attq *q) else return NULL; } - - -lsquic_time_t -attq_set_min (struct attq *q, lsquic_time_t new_min) -{ - lsquic_time_t prev_value; - prev_value = q->aq_min; - q->aq_min = new_min; - return prev_value; -} diff --git a/src/liblsquic/lsquic_attq.h b/src/liblsquic/lsquic_attq.h index 26ae55e04..79b04aa01 100644 --- a/src/liblsquic/lsquic_attq.h +++ b/src/liblsquic/lsquic_attq.h @@ -27,11 +27,6 @@ attq_create (void); void attq_destroy (struct attq *); -/* Return 1 if advisory_time is too small, 0 on success, -1 on failure */ -int -attq_maybe_add (struct attq *, struct lsquic_conn *, - lsquic_time_t advisory_time); - /* Return 0 on success, -1 on failure (malloc) */ int attq_add (struct attq *, struct lsquic_conn *, lsquic_time_t advisory_time); @@ -48,7 +43,4 @@ attq_count_before (struct attq *, lsquic_time_t cutoff); const lsquic_time_t * attq_next_time (struct attq *); -lsquic_time_t -attq_set_min (struct attq *, lsquic_time_t new_min); - #endif diff --git a/src/liblsquic/lsquic_conn.h b/src/liblsquic/lsquic_conn.h index 3ee18d92f..8d0bb2862 100644 --- a/src/liblsquic/lsquic_conn.h +++ b/src/liblsquic/lsquic_conn.h @@ -24,7 +24,6 @@ struct parse_funcs; struct attq_elem; enum lsquic_conn_flags { - LSCONN_HAS_INCOMING = (1 << 0), LSCONN_HAS_OUTGOING = (1 << 1), LSCONN_HASHED = (1 << 2), LSCONN_HAS_PEER_SA = (1 << 4), @@ -35,25 +34,19 @@ enum lsquic_conn_flags { LSCONN_TCID0 = (1 << 9), LSCONN_VER_SET = (1 <<10), /* cn_version is set */ LSCONN_EVANESCENT = (1 <<11), /* evanescent connection */ - LSCONN_RW_PENDING = (1 <<12), + LSCONN_TICKABLE = (1 <<12), /* Connection is in the Tickable Queue */ LSCONN_COI_ACTIVE = (1 <<13), LSCONN_COI_INACTIVE = (1 <<14), LSCONN_SEND_BLOCKED = (1 <<15), /* Send connection blocked frame */ - LSCONN_NEVER_PEND_RW = (1 <<17), /* Do not put onto Pending RW queue */ + LSCONN_NEVER_TICKABLE = (1 <<17), /* Do not put onto the Tickable Queue */ LSCONN_ATTQ = (1 <<19), }; -#define TICK_BIT_PROGRESS 2 - /* A connection may have things to send and be closed at the same time. */ enum tick_st { TICK_SEND = (1 << 0), TICK_CLOSE = (1 << 1), - /* Progress was made (see @ref es_pendrw_check for definition of - * "progress.") - */ - TICK_PROGRESS= (1 << TICK_BIT_PROGRESS), }; #define TICK_QUIET 0 @@ -86,10 +79,13 @@ struct conn_iface void (*ci_destroy) (struct lsquic_conn *); -}; -#define RW_HIST_BITS 6 -typedef unsigned char rw_hist_idx_t; + int + (*ci_is_tickable) (struct lsquic_conn *); + + lsquic_time_t + (*ci_next_tick_time) (struct lsquic_conn *); +}; struct lsquic_conn { @@ -99,22 +95,18 @@ struct lsquic_conn *cn_esf; lsquic_cid_t cn_cid; STAILQ_ENTRY(lsquic_conn) cn_next_closed_conn; - TAILQ_ENTRY(lsquic_conn) cn_next_all, - cn_next_in, - cn_next_pend_rw, - cn_next_out, + STAILQ_ENTRY(lsquic_conn) cn_next_ticked; + TAILQ_ENTRY(lsquic_conn) cn_next_out, cn_next_hash; const struct conn_iface *cn_if; const struct parse_funcs *cn_pf; struct attq_elem *cn_attq_elem; lsquic_time_t cn_last_sent; + lsquic_time_t cn_last_ticked; enum lsquic_conn_flags cn_flags; enum lsquic_version cn_version; - unsigned cn_noprogress_count; unsigned cn_hash; unsigned short cn_pack_size; - rw_hist_idx_t cn_rw_hist_idx; - unsigned char cn_rw_hist_buf[ 1 << RW_HIST_BITS ]; unsigned char cn_peer_addr[sizeof(struct sockaddr_in6)], cn_local_addr[sizeof(struct sockaddr_in6)]; }; diff --git a/src/liblsquic/lsquic_conn_hash.c b/src/liblsquic/lsquic_conn_hash.c index ca40e14c6..4e39ee954 100644 --- a/src/liblsquic/lsquic_conn_hash.c +++ b/src/liblsquic/lsquic_conn_hash.c @@ -20,24 +20,19 @@ int -conn_hash_init (struct conn_hash *conn_hash, unsigned max_count) +conn_hash_init (struct conn_hash *conn_hash) { unsigned n; - if (!max_count) - max_count = 1000000; - memset(conn_hash, 0, sizeof(*conn_hash)); - conn_hash->ch_max_count = max_count; conn_hash->ch_nbits = 1; /* Start small */ - TAILQ_INIT(&conn_hash->ch_all); conn_hash->ch_buckets = malloc(sizeof(conn_hash->ch_buckets[0]) * n_buckets(conn_hash->ch_nbits)); if (!conn_hash->ch_buckets) return -1; for (n = 0; n < n_buckets(conn_hash->ch_nbits); ++n) TAILQ_INIT(&conn_hash->ch_buckets[n]); - LSQ_INFO("initialized: max_count: %u", conn_hash->ch_max_count); + LSQ_INFO("initialized"); return 0; } @@ -105,8 +100,6 @@ conn_hash_add (struct conn_hash *conn_hash, struct lsquic_conn *lconn) { const unsigned hash = XXH32(&lconn->cn_cid, sizeof(lconn->cn_cid), (uintptr_t) conn_hash); - if (conn_hash->ch_count >= conn_hash->ch_max_count) - return -1; if (conn_hash->ch_count >= n_buckets(conn_hash->ch_nbits) * CONN_HASH_MAX_PER_BUCKET && conn_hash->ch_nbits < sizeof(hash) * 8 - 1 && @@ -116,7 +109,6 @@ conn_hash_add (struct conn_hash *conn_hash, struct lsquic_conn *lconn) } const unsigned buckno = conn_hash_bucket_no(conn_hash, hash); lconn->cn_hash = hash; - TAILQ_INSERT_TAIL(&conn_hash->ch_all, lconn, cn_next_all); TAILQ_INSERT_TAIL(&conn_hash->ch_buckets[buckno], lconn, cn_next_hash); ++conn_hash->ch_count; return 0; @@ -127,7 +119,6 @@ void conn_hash_remove (struct conn_hash *conn_hash, struct lsquic_conn *lconn) { const unsigned buckno = conn_hash_bucket_no(conn_hash, lconn->cn_hash); - TAILQ_REMOVE(&conn_hash->ch_all, lconn, cn_next_all); TAILQ_REMOVE(&conn_hash->ch_buckets[buckno], lconn, cn_next_hash); --conn_hash->ch_count; } @@ -136,7 +127,8 @@ conn_hash_remove (struct conn_hash *conn_hash, struct lsquic_conn *lconn) void conn_hash_reset_iter (struct conn_hash *conn_hash) { - conn_hash->ch_next = TAILQ_FIRST(&conn_hash->ch_all); + conn_hash->ch_iter.cur_buckno = 0; + conn_hash->ch_iter.next_conn = TAILQ_FIRST(&conn_hash->ch_buckets[0]); } @@ -151,8 +143,16 @@ conn_hash_first (struct conn_hash *conn_hash) struct lsquic_conn * conn_hash_next (struct conn_hash *conn_hash) { - struct lsquic_conn *lconn = conn_hash->ch_next; + struct lsquic_conn *lconn = conn_hash->ch_iter.next_conn; + while (!lconn) + { + ++conn_hash->ch_iter.cur_buckno; + if (conn_hash->ch_iter.cur_buckno >= n_buckets(conn_hash->ch_nbits)) + return NULL; + lconn = TAILQ_FIRST(&conn_hash->ch_buckets[ + conn_hash->ch_iter.cur_buckno]); + } if (lconn) - conn_hash->ch_next = TAILQ_NEXT(lconn, cn_next_all); + conn_hash->ch_iter.next_conn = TAILQ_NEXT(lconn, cn_next_hash); return lconn; } diff --git a/src/liblsquic/lsquic_conn_hash.h b/src/liblsquic/lsquic_conn_hash.h index 2a728034b..4ee27376a 100644 --- a/src/liblsquic/lsquic_conn_hash.h +++ b/src/liblsquic/lsquic_conn_hash.h @@ -21,19 +21,20 @@ TAILQ_HEAD(lsquic_conn_head, lsquic_conn); struct conn_hash { - struct lsquic_conn_head ch_all; struct lsquic_conn_head *ch_buckets; - struct lsquic_conn *ch_next; + struct { + unsigned cur_buckno; + struct lsquic_conn *next_conn; + } ch_iter; unsigned ch_count; unsigned ch_nbits; - unsigned ch_max_count; }; #define conn_hash_count(conn_hash) (+(conn_hash)->ch_count) /* Returns -1 if malloc fails */ int -conn_hash_init (struct conn_hash *, unsigned max_count); +conn_hash_init (struct conn_hash *); void conn_hash_cleanup (struct conn_hash *); diff --git a/src/liblsquic/lsquic_engine.c b/src/liblsquic/lsquic_engine.c index 7a660be55..c92b5473f 100644 --- a/src/liblsquic/lsquic_engine.c +++ b/src/liblsquic/lsquic_engine.c @@ -53,6 +53,7 @@ #include "lsquic_version.h" #include "lsquic_hash.h" #include "lsquic_attq.h" +#include "lsquic_min_heap.h" #define LSQUIC_LOGGER_MODULE LSQLM_ENGINE #include "lsquic_logger.h" @@ -73,7 +74,8 @@ struct out_batch typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *); static void -process_connections (struct lsquic_engine *engine, conn_iter_f iter); +process_connections (struct lsquic_engine *engine, conn_iter_f iter, + lsquic_time_t now); static void engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag); @@ -96,26 +98,24 @@ force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); (e)->pub.enp_flags &= ~ENPUB_PROC; \ } while (0) -/* A connection can be referenced from one of six places: +/* A connection can be referenced from one of five places: * * 1. Connection hash: a connection starts its life in one of those. * * 2. Outgoing queue. * - * 3. Incoming queue. + * 3. Tickable queue * - * 4. Pending RW Events queue. + * 4. Advisory Tick Time queue. * - * 5. Advisory Tick Time queue. - * - * 6. Closing connections queue. This is a transient queue -- it only + * 5. Closing connections queue. This is a transient queue -- it only * exists for the duration of process_connections() function call. * * The idea is to destroy the connection when it is no longer referenced. * For example, a connection tick may return TICK_SEND|TICK_CLOSE. In - * that case, the connection is referenced from two places: (2) and (6). - * After its packets are sent, it is only referenced in (6), and at the - * end of the function call, when it is removed from (6), reference count + * that case, the connection is referenced from two places: (2) and (5). + * After its packets are sent, it is only referenced in (5), and at the + * end of the function call, when it is removed from (5), reference count * goes to zero and the connection is destroyed. If not all packets can * be sent, at the end of the function call, the connection is referenced * by (2) and will only be removed once all outgoing packets have been @@ -123,27 +123,11 @@ force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn); */ #define CONN_REF_FLAGS (LSCONN_HASHED \ |LSCONN_HAS_OUTGOING \ - |LSCONN_HAS_INCOMING \ - |LSCONN_RW_PENDING \ + |LSCONN_TICKABLE \ |LSCONN_CLOSING \ |LSCONN_ATTQ) -struct out_heap_elem -{ - struct lsquic_conn *ohe_conn; - lsquic_time_t ohe_last_sent; -}; - - -struct out_heap -{ - struct out_heap_elem *oh_elems; - unsigned oh_nalloc, - oh_nelem; -}; - - struct lsquic_engine @@ -166,142 +150,22 @@ struct lsquic_engine lsquic_packets_out_f packets_out; void *packets_out_ctx; void *bad_handshake_ctx; - struct conn_hash full_conns; - TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw; - struct out_heap conns_out; - /* Use a union because only one iterator is being used at any one time */ - union { - struct { - /* This iterator does not have any state: it uses `conns_in' */ - int ignore; - } conn_in; - struct { - /* This iterator does not have any state: it uses `conns_pend_rw' */ - int ignore; - } rw_pend; - struct { - /* Iterator state to process connections in Advisory Tick Time - * queue. - */ - lsquic_time_t cutoff; - } attq; - struct { - /* Iterator state to process all connections */ - int ignore; - } all; - struct { - lsquic_conn_t *conn; - } one; - } iter_state; + struct conn_hash conns_hash; + struct min_heap conns_tickable; + struct min_heap conns_out; struct eng_hist history; unsigned batch_size; - unsigned time_until_desired_tick; struct attq *attq; - lsquic_time_t proc_time; /* Track time last time a packet was sent to give new connections * priority lower than that of existing connections. */ lsquic_time_t last_sent; + unsigned n_conns; lsquic_time_t deadline; struct out_batch out_batch; }; -#define OHE_PARENT(i) ((i - 1) / 2) -#define OHE_LCHILD(i) (2 * i + 1) -#define OHE_RCHILD(i) (2 * i + 2) - - -static void -heapify_out_heap (struct out_heap *heap, unsigned i) -{ - struct out_heap_elem el; - unsigned smallest; - - assert(i < heap->oh_nelem); - - if (OHE_LCHILD(i) < heap->oh_nelem) - { - if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent < - heap->oh_elems[ i ].ohe_last_sent) - smallest = OHE_LCHILD(i); - else - smallest = i; - if (OHE_RCHILD(i) < heap->oh_nelem && - heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent < - heap->oh_elems[ smallest ].ohe_last_sent) - smallest = OHE_RCHILD(i); - } - else - smallest = i; - - if (smallest != i) - { - el = heap->oh_elems[ smallest ]; - heap->oh_elems[ smallest ] = heap->oh_elems[ i ]; - heap->oh_elems[ i ] = el; - heapify_out_heap(heap, smallest); - } -} - - -static void -oh_insert (struct out_heap *heap, lsquic_conn_t *conn) -{ - struct out_heap_elem el; - unsigned nalloc, i; - - if (heap->oh_nelem == heap->oh_nalloc) - { - if (0 == heap->oh_nalloc) - nalloc = 4; - else - nalloc = heap->oh_nalloc * 2; - heap->oh_elems = realloc(heap->oh_elems, - nalloc * sizeof(heap->oh_elems[0])); - if (!heap->oh_elems) - { /* Not much we can do here */ - LSQ_ERROR("realloc failed"); - return; - } - heap->oh_nalloc = nalloc; - } - - heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn; - heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent; - ++heap->oh_nelem; - - i = heap->oh_nelem - 1; - while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent > - heap->oh_elems[ i ].ohe_last_sent) - { - el = heap->oh_elems[ OHE_PARENT(i) ]; - heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ]; - heap->oh_elems[ i ] = el; - i = OHE_PARENT(i); - } -} - - -static struct lsquic_conn * -oh_pop (struct out_heap *heap) -{ - struct lsquic_conn *conn; - - assert(heap->oh_nelem); - - conn = heap->oh_elems[0].ohe_conn; - --heap->oh_nelem; - if (heap->oh_nelem > 0) - { - heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ]; - heapify_out_heap(heap, 0); - } - - return conn; -} - - void lsquic_engine_init_settings (struct lsquic_engine_settings *settings, unsigned flags) @@ -336,7 +200,6 @@ lsquic_engine_init_settings (struct lsquic_engine_settings *settings, settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP; settings->es_honor_prst = LSQUIC_DF_HONOR_PRST; settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK; - settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK; settings->es_rw_once = LSQUIC_DF_RW_ONCE; settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH; settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS; @@ -456,9 +319,7 @@ lsquic_engine_new (unsigned flags, engine->pub.enp_pmi_ctx = NULL; } engine->pub.enp_engine = engine; - TAILQ_INIT(&engine->conns_in); - TAILQ_INIT(&engine->conns_pend_rw); - conn_hash_init(&engine->full_conns, ~0); + conn_hash_init(&engine->conns_hash); engine->attq = attq_create(); eng_hist_init(&engine->history); engine->batch_size = INITIAL_OUT_BATCH_SIZE; @@ -489,23 +350,62 @@ shrink_batch_size (struct lsquic_engine *engine) static void destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn) { - conn->cn_flags |= LSCONN_NEVER_PEND_RW; + --engine->n_conns; + conn->cn_flags |= LSCONN_NEVER_TICKABLE; conn->cn_if->ci_destroy(conn); } +static int +maybe_grow_conn_heaps (struct lsquic_engine *engine) +{ + struct min_heap_elem *els; + unsigned count; + + if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable)) + return 0; /* Nothing to do */ + + if (lsquic_mh_nalloc(&engine->conns_tickable)) + count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2; + else + count = 8; + + els = malloc(sizeof(els[0]) * count); + if (!els) + { + LSQ_ERROR("%s: malloc failed", __func__); + return -1; + } + + LSQ_DEBUG("grew heaps to %u elements", count / 2); + memcpy(&els[0], engine->conns_tickable.mh_elems, + sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable)); + memcpy(&els[count / 2], engine->conns_out.mh_elems, + sizeof(els[0]) * lsquic_mh_count(&engine->conns_out)); + free(engine->conns_tickable.mh_elems); + engine->conns_tickable.mh_elems = els; + engine->conns_out.mh_elems = &els[count / 2]; + engine->conns_tickable.mh_nalloc = count / 2; + engine->conns_out.mh_nalloc = count / 2; + return 0; +} + + static lsquic_conn_t * new_full_conn_client (lsquic_engine_t *engine, const char *hostname, unsigned short max_packet_size) { lsquic_conn_t *conn; unsigned flags; + if (0 != maybe_grow_conn_heaps(engine)) + return NULL; flags = engine->flags & (ENG_SERVER|ENG_HTTP); conn = full_conn_client_new(&engine->pub, engine->stream_if, engine->stream_if_ctx, flags, hostname, max_packet_size); if (!conn) return NULL; - if (0 != conn_hash_add(&engine->full_conns, conn)) + ++engine->n_conns; + if (0 != conn_hash_add(&engine->conns_hash, conn)) { LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy", conn->cn_cid); @@ -514,7 +414,7 @@ new_full_conn_client (lsquic_engine_t *engine, const char *hostname, } assert(!(conn->cn_flags & (CONN_REF_FLAGS - & ~LSCONN_RW_PENDING /* This flag may be set as effect of user + & ~LSCONN_TICKABLE /* This flag may be set as effect of user callbacks */ ))); conn->cn_flags |= LSCONN_HASHED; @@ -542,7 +442,7 @@ find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, return NULL; } - conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id); + conn = conn_hash_find(&engine->conns_hash, packet_in->pi_conn_id); if (conn) { conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate); @@ -553,41 +453,19 @@ find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, } -static void -add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn, - enum rw_reason reason) -{ - int hist_idx; - - TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw); - engine_incref_conn(conn, LSCONN_RW_PENDING); - - hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); - conn->cn_rw_hist_buf[ hist_idx ] = reason; - ++conn->cn_rw_hist_idx; - - if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx) - EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), " - "rw_hist: %.*s", (char) reason, - (int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); - else - EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')", - (char) reason); -} - - #if !defined(NDEBUG) && __GNUC__ __attribute__((weak)) #endif void -lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, - lsquic_conn_t *conn, enum rw_reason reason) +lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub, + lsquic_conn_t *conn) { if (0 == (enpub->enp_flags & ENPUB_PROC) && - 0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW))) + 0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE))) { lsquic_engine_t *engine = (lsquic_engine_t *) enpub; - add_conn_to_pend_rw(engine, conn, reason); + lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); + engine_incref_conn(conn, LSCONN_TICKABLE); } } @@ -597,69 +475,24 @@ lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub, lsquic_conn_t *conn, lsquic_time_t tick_time) { lsquic_engine_t *const engine = (lsquic_engine_t *) enpub; - /* Instead of performing an update, we simply remove the connection from - * the queue and add it back. This should not happen in at the time of - * this writing. - */ - if (conn->cn_flags & LSCONN_ATTQ) - { - attq_remove(engine->attq, conn); - conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); - } - if (conn && !(conn->cn_flags & LSCONN_ATTQ) && - 0 == attq_maybe_add(engine->attq, conn, tick_time)) - engine_incref_conn(conn, LSCONN_ATTQ); -} - - -static void -update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn, - int progress_made) -{ - rw_hist_idx_t hist_idx; - const unsigned char *empty; - const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check; - - if (!pendrw_check) - return; - - /* Convert previous entry to uppercase: */ - hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1); - conn->cn_rw_hist_buf[ hist_idx ] -= 0x20; - - LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made); - if (progress_made) + if (conn->cn_flags & LSCONN_TICKABLE) { - conn->cn_noprogress_count = 0; - return; + /* Optimization: no need to add the connection to the Advisory Tick + * Time Queue: it is about to be ticked, after which it its next tick + * time may be queried again. + */; } - - EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made " - "no progress"); - ++conn->cn_noprogress_count; - if (conn->cn_noprogress_count <= pendrw_check) - return; - - conn->cn_flags |= LSCONN_NEVER_PEND_RW; - empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY, - sizeof(conn->cn_rw_hist_buf)); - if (empty) - LSQ_WARN("conn %"PRIu64" noprogress count reached %u " - "(rw_hist: %.*s): will not put it onto Pend RW queue again", - conn->cn_cid, conn->cn_noprogress_count, - (int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf); - else + else if (conn->cn_flags & LSCONN_ATTQ) { - hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1); - LSQ_WARN("conn %"PRIu64" noprogress count reached %u " - "(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again", - conn->cn_cid, conn->cn_noprogress_count, - /* First part of history: */ - (int) (sizeof(conn->cn_rw_hist_buf) - hist_idx), - conn->cn_rw_hist_buf + hist_idx, - /* Second part of history: */ - hist_idx, conn->cn_rw_hist_buf); + if (lsquic_conn_adv_time(conn) != tick_time) + { + attq_remove(engine->attq, conn); + if (0 != attq_add(engine->attq, conn, tick_time)) + engine_decref_conn(engine, conn, LSCONN_ATTQ); + } } + else if (0 == attq_add(engine->attq, conn, tick_time)) + engine_incref_conn(conn, LSCONN_ATTQ); } @@ -678,9 +511,10 @@ process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, return 1; } - if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) { - TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in); - engine_incref_conn(conn, LSCONN_HAS_INCOMING); + if (0 == (conn->cn_flags & LSCONN_TICKABLE)) + { + lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); + engine_incref_conn(conn, LSCONN_TICKABLE); } lsquic_conn_record_sockaddr(conn, sa_local, sa_peer); lsquic_packet_in_upref(packet_in); @@ -691,101 +525,11 @@ process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in, } -static int -conn_attq_expired (const struct lsquic_engine *engine, - const lsquic_conn_t *conn) -{ - assert(conn->cn_attq_elem); - return lsquic_conn_adv_time(conn) < engine->proc_time; -} - - -/* Iterator for connections with incoming packets */ -static lsquic_conn_t * -conn_iter_next_incoming (struct lsquic_engine *engine) -{ - enum lsquic_conn_flags addl_flags; - lsquic_conn_t *conn; - while ((conn = TAILQ_FIRST(&engine->conns_in))) - { - TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); - if (conn->cn_flags & LSCONN_RW_PENDING) - { - TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); - EV_LOG_CONN_EVENT(conn->cn_cid, - "removed from pending RW queue (processing incoming)"); - } - if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) - { - addl_flags = LSCONN_ATTQ; - attq_remove(engine->attq, conn); - } - else - addl_flags = 0; - conn = engine_decref_conn(engine, conn, - LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); - if (conn) - break; - } - return conn; -} - - -/* Iterator for connections with that have pending read/write events */ -static lsquic_conn_t * -conn_iter_next_rw_pend (struct lsquic_engine *engine) -{ - enum lsquic_conn_flags addl_flags; - lsquic_conn_t *conn; - while ((conn = TAILQ_FIRST(&engine->conns_pend_rw))) - { - TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); - EV_LOG_CONN_EVENT(conn->cn_cid, - "removed from pending RW queue (processing pending RW conns)"); - if (conn->cn_flags & LSCONN_HAS_INCOMING) - TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); - if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn)) - { - addl_flags = LSCONN_ATTQ; - attq_remove(engine->attq, conn); - } - else - addl_flags = 0; - conn = engine_decref_conn(engine, conn, - LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags); - if (conn) - break; - } - return conn; -} - - -void -lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine) -{ - LSQ_DEBUG("process connections with incoming packets"); - ENGINE_IN(engine); - process_connections(engine, conn_iter_next_incoming); - assert(TAILQ_EMPTY(&engine->conns_in)); - ENGINE_OUT(engine); -} - - int -lsquic_engine_has_pend_rw (lsquic_engine_t *engine) +lsquic_engine_has_tickable (lsquic_engine_t *engine) { return !(engine->flags & ENG_PAST_DEADLINE) - && !TAILQ_EMPTY(&engine->conns_pend_rw); -} - - -void -lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine) -{ - LSQ_DEBUG("process connections with pending RW events"); - ENGINE_IN(engine); - process_connections(engine, conn_iter_next_rw_pend); - ENGINE_OUT(engine); + && lsquic_mh_count(&engine->conns_tickable) > 0; } @@ -799,74 +543,33 @@ lsquic_engine_destroy (lsquic_engine_t *engine) engine->flags |= ENG_DTOR; #endif - while (engine->conns_out.oh_nelem > 0) + while ((conn = lsquic_mh_pop(&engine->conns_out))) { - --engine->conns_out.oh_nelem; - conn = engine->conns_out.oh_elems[ - engine->conns_out.oh_nelem ].ohe_conn; assert(conn->cn_flags & LSCONN_HAS_OUTGOING); (void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING); } - for (conn = conn_hash_first(&engine->full_conns); conn; - conn = conn_hash_next(&engine->full_conns)) - force_close_conn(engine, conn); - conn_hash_cleanup(&engine->full_conns); + while ((conn = lsquic_mh_pop(&engine->conns_tickable))) + { + assert(conn->cn_flags & LSCONN_TICKABLE); + (void) engine_decref_conn(engine, conn, LSCONN_TICKABLE); + } + for (conn = conn_hash_first(&engine->conns_hash); conn; + conn = conn_hash_next(&engine->conns_hash)) + force_close_conn(engine, conn); + conn_hash_cleanup(&engine->conns_hash); + assert(0 == engine->n_conns); attq_destroy(engine->attq); - assert(0 == engine->conns_out.oh_nelem); - assert(TAILQ_EMPTY(&engine->conns_pend_rw)); - lsquic_mm_cleanup(&engine->pub.enp_mm); - free(engine->conns_out.oh_elems); + assert(0 == lsquic_mh_count(&engine->conns_out)); + assert(0 == lsquic_mh_count(&engine->conns_tickable)); + free(engine->conns_tickable.mh_elems); free(engine); } -#if __GNUC__ -__attribute__((nonnull(3))) -#endif -static lsquic_conn_t * -remove_from_inc_andor_pend_rw (lsquic_engine_t *engine, - lsquic_conn_t *conn, const char *reason) -{ - assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)); - if (conn->cn_flags & LSCONN_HAS_INCOMING) - TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); - if (conn->cn_flags & LSCONN_RW_PENDING) - { - TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); - EV_LOG_CONN_EVENT(conn->cn_cid, - "removed from pending RW queue (%s)", reason); - } - conn = engine_decref_conn(engine, conn, - LSCONN_HAS_INCOMING|LSCONN_RW_PENDING); - assert(conn); - return conn; -} - - -static lsquic_conn_t * -conn_iter_next_one (lsquic_engine_t *engine) -{ - lsquic_conn_t *conn = engine->iter_state.one.conn; - if (conn) - { - if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) - conn = remove_from_inc_andor_pend_rw(engine, conn, "connect"); - if (conn && (conn->cn_flags & LSCONN_ATTQ) && - conn_attq_expired(engine, conn)) - { - attq_remove(engine->attq, conn); - conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); - } - engine->iter_state.one.conn = NULL; - } - return conn; -} - - lsquic_conn_t * lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, void *peer_ctx, lsquic_conn_ctx_t *conn_ctx, @@ -896,13 +599,13 @@ lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, conn = new_full_conn_client(engine, hostname, max_packet_size); if (!conn) return NULL; + lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); + engine_incref_conn(conn, LSCONN_TICKABLE); ENGINE_IN(engine); lsquic_conn_record_peer_sa(conn, peer_sa); conn->cn_peer_ctx = peer_ctx; lsquic_conn_set_ctx(conn, conn_ctx); - engine->iter_state.one.conn = conn; full_conn_client_call_on_new(conn); - process_connections(engine, conn_iter_next_one); ENGINE_OUT(engine); return conn; } @@ -911,19 +614,18 @@ lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa, static void remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn) { - conn_hash_remove(&engine->full_conns, conn); + conn_hash_remove(&engine->conns_hash, conn); (void) engine_decref_conn(engine, conn, LSCONN_HASHED); } static void -refflags2str (enum lsquic_conn_flags flags, char s[7]) +refflags2str (enum lsquic_conn_flags flags, char s[6]) { *s = 'C'; s += !!(flags & LSCONN_CLOSING); *s = 'H'; s += !!(flags & LSCONN_HASHED); *s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING); - *s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING); - *s = 'R'; s += !!(flags & LSCONN_RW_PENDING); + *s = 'T'; s += !!(flags & LSCONN_TICKABLE); *s = 'A'; s += !!(flags & LSCONN_ATTQ); *s = '\0'; } @@ -932,7 +634,7 @@ refflags2str (enum lsquic_conn_flags flags, char s[7]) static void engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag) { - char str[7]; + char str[6]; assert(flag & CONN_REF_FLAGS); assert(!(conn->cn_flags & flag)); conn->cn_flags |= flag; @@ -945,7 +647,7 @@ static lsquic_conn_t * engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn, enum lsquic_conn_flags flags) { - char str[7]; + char str[6]; assert(flags & CONN_REF_FLAGS); assert(conn->cn_flags & flags); #ifndef NDEBUG @@ -974,41 +676,32 @@ force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn) const enum lsquic_conn_flags flags = conn->cn_flags; assert(conn->cn_flags & CONN_REF_FLAGS); assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */ + assert(!(flags & LSCONN_TICKABLE)); /* Should be removed already */ assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */ - if (flags & LSCONN_HAS_INCOMING) - { - TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in); - (void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING); - } - if (flags & LSCONN_RW_PENDING) - { - TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw); - EV_LOG_CONN_EVENT(conn->cn_cid, - "removed from pending RW queue (engine destruction)"); - (void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING); - } if (flags & LSCONN_ATTQ) + { attq_remove(engine->attq, conn); + (void) engine_decref_conn(engine, conn, LSCONN_ATTQ); + } if (flags & LSCONN_HASHED) remove_conn_from_hash(engine, conn); } -/* Iterator for all connections. - * Returned connections are removed from the Incoming, Pending RW Event, - * and Advisory Tick Time queues if necessary. +/* Iterator for tickable connections (those on the Tickable Queue). Before + * a connection is returned, it is removed from the Advisory Tick Time queue + * if necessary. */ static lsquic_conn_t * -conn_iter_next_all (struct lsquic_engine *engine) +conn_iter_next_tickable (struct lsquic_engine *engine) { lsquic_conn_t *conn; - conn = conn_hash_next(&engine->full_conns); + conn = lsquic_mh_pop(&engine->conns_tickable); - if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))) - conn = remove_from_inc_andor_pend_rw(engine, conn, "process all"); - if (conn && (conn->cn_flags & LSCONN_ATTQ) - && conn_attq_expired(engine, conn)) + if (conn) + conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE); + if (conn && (conn->cn_flags & LSCONN_ATTQ)) { attq_remove(engine->attq, conn); conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); @@ -1018,62 +711,26 @@ conn_iter_next_all (struct lsquic_engine *engine) } -static lsquic_conn_t * -conn_iter_next_attq (struct lsquic_engine *engine) +void +lsquic_engine_process_conns (lsquic_engine_t *engine) { lsquic_conn_t *conn; + lsquic_time_t now; - conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff); - if (conn) - { - assert(conn->cn_flags & LSCONN_ATTQ); - if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)) - conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq"); - conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); - } - - return conn; -} - - -void -lsquic_engine_proc_all (lsquic_engine_t *engine) -{ ENGINE_IN(engine); - /* We poke each connection every time as initial implementation. If it - * proves to be too inefficient, we will need to figure out - * a) when to stop processing; and - * b) how to remember state between calls. - */ - conn_hash_reset_iter(&engine->full_conns); - process_connections(engine, conn_iter_next_all); - ENGINE_OUT(engine); -} - - -void -lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine) -{ - lsquic_time_t prev_min, now; now = lsquic_time_now(); - if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG)) + while ((conn = attq_pop(engine->attq, now))) { - const lsquic_time_t *expected_time; - int64_t diff; - expected_time = attq_next_time(engine->attq); - if (expected_time) - diff = *expected_time - now; - else - diff = -1; - LSQ_DEBUG("process connections in attq; time diff: %"PRIi64, diff); + conn = engine_decref_conn(engine, conn, LSCONN_ATTQ); + if (conn && !(conn->cn_flags & LSCONN_TICKABLE)) + { + lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); + engine_incref_conn(conn, LSCONN_TICKABLE); + } } - ENGINE_IN(engine); - prev_min = attq_set_min(engine->attq, now); /* Prevent infinite loop */ - engine->iter_state.attq.cutoff = now; - process_connections(engine, conn_iter_next_attq); - attq_set_min(engine->attq, prev_min); /* Restore previos value */ + process_connections(engine, conn_iter_next_tickable, now); ENGINE_OUT(engine); } @@ -1164,12 +821,12 @@ encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn, } -STAILQ_HEAD(closed_conns, lsquic_conn); +STAILQ_HEAD(conns_stailq, lsquic_conn); struct conns_out_iter { - struct out_heap *coi_heap; + struct min_heap *coi_heap; TAILQ_HEAD(, lsquic_conn) coi_active_list, coi_inactive_list; lsquic_conn_t *coi_next; @@ -1197,9 +854,9 @@ coi_next (struct conns_out_iter *iter) { lsquic_conn_t *conn; - if (iter->coi_heap->oh_nelem > 0) + if (lsquic_mh_count(iter->coi_heap) > 0) { - conn = oh_pop(iter->coi_heap); + conn = lsquic_mh_pop(iter->coi_heap); TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out); conn->cn_flags |= LSCONN_COI_ACTIVE; #ifndef NDEBUG @@ -1268,7 +925,7 @@ coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine) { TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out); conn->cn_flags &= ~LSCONN_COI_ACTIVE; - oh_insert(iter->coi_heap, conn); + lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent); } while ((conn = TAILQ_FIRST(&iter->coi_inactive_list))) { @@ -1358,7 +1015,7 @@ check_deadline (lsquic_engine_t *engine) static void send_packets_out (struct lsquic_engine *engine, - struct closed_conns *closed_conns) + struct conns_stailq *closed_conns) { unsigned n, w, n_sent, n_batches_sent; lsquic_packet_out_t *packet_out; @@ -1471,7 +1128,7 @@ int lsquic_engine_has_unsent_packets (lsquic_engine_t *engine) { return !(engine->flags & ENG_PAST_DEADLINE) - && ( engine->conns_out.oh_nelem > 0 + && ( lsquic_mh_count(&engine->conns_out) > 0 ) ; } @@ -1490,7 +1147,7 @@ void lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) { lsquic_conn_t *conn; - struct closed_conns closed_conns; + struct conns_stailq closed_conns; STAILQ_INIT(&closed_conns); reset_deadline(engine, lsquic_time_now()); @@ -1506,29 +1163,32 @@ lsquic_engine_send_unsent_packets (lsquic_engine_t *engine) static void -process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) +process_connections (lsquic_engine_t *engine, conn_iter_f next_conn, + lsquic_time_t now) { lsquic_conn_t *conn; enum tick_st tick_st; - lsquic_time_t now = lsquic_time_now(); - struct closed_conns closed_conns; + unsigned i; + lsquic_time_t next_tick_time; + struct conns_stailq closed_conns, ticked_conns; - engine->proc_time = now; eng_hist_tick(&engine->history, now); STAILQ_INIT(&closed_conns); + STAILQ_INIT(&ticked_conns); reset_deadline(engine, now); - while ((conn = next_conn(engine))) + i = 0; + while ((conn = next_conn(engine)) + ) { tick_st = conn->cn_if->ci_tick(conn, now); - if (conn_iter_next_rw_pend == next_conn) - update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS); + conn->cn_last_ticked = now + i /* Maintain relative order */ ++; if (tick_st & TICK_SEND) { if (!(conn->cn_flags & LSCONN_HAS_OUTGOING)) { - oh_insert(&engine->conns_out, conn); + lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent); engine_incref_conn(conn, LSCONN_HAS_OUTGOING); } } @@ -1539,6 +1199,8 @@ process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) if (conn->cn_flags & LSCONN_HASHED) remove_conn_from_hash(engine, conn); } + else + STAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked); } if (lsquic_engine_has_unsent_packets(engine)) @@ -1549,6 +1211,31 @@ process_connections (lsquic_engine_t *engine, conn_iter_f next_conn) (void) engine_decref_conn(engine, conn, LSCONN_CLOSING); } + /* TODO Heapification can be optimized by switching to the Floyd method: + * https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap + */ + while ((conn = STAILQ_FIRST(&ticked_conns))) + { + STAILQ_REMOVE_HEAD(&ticked_conns, cn_next_ticked); + if (!(conn->cn_flags & LSCONN_TICKABLE) + && conn->cn_if->ci_is_tickable(conn)) + { + lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked); + engine_incref_conn(conn, LSCONN_TICKABLE); + } + else if (!(conn->cn_flags & LSCONN_ATTQ)) + { + next_tick_time = conn->cn_if->ci_next_tick_time(conn); + if (next_tick_time) + { + if (0 == attq_add(engine->attq, conn, next_tick_time)) + engine_incref_conn(conn, LSCONN_ATTQ); + } + else + assert(0); + } + } + } @@ -1613,6 +1300,12 @@ lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff) const lsquic_time_t *next_time; lsquic_time_t now; + if (lsquic_mh_count(&engine->conns_tickable)) + { + *diff = 0; + return 1; + } + next_time = attq_next_time(engine->attq); if (!next_time) return 0; diff --git a/src/liblsquic/lsquic_engine_public.h b/src/liblsquic/lsquic_engine_public.h index fb1d7e510..629936a7d 100644 --- a/src/liblsquic/lsquic_engine_public.h +++ b/src/liblsquic/lsquic_engine_public.h @@ -26,45 +26,12 @@ struct lsquic_engine_public { unsigned enp_ver_tags_len; }; -/* These values are printable ASCII characters for ease of printing the - * whole history in a single line of a log message. If connection was - * processed as result of being put onto the queue, the letter is converted - * to uppercase. - * - * The letters are assigned by first letter of the verb for most obvious - * and important actions, like "read" and "write" and other letters of - * the verb or some other letters for other actions. - * - * Each reason is either expected to produce user read from the stream - * or putting stream data into packet for sending out. This is documented - * in a separate comment column below. - */ -enum rw_reason -{ - RW_REASON_EMPTY = '\0', /* No init required */ - - /* Expected action: */ - RW_REASON_USER_WRITE = 'w', /* write */ - RW_REASON_USER_WRITEV = 'v', /* write */ - RW_REASON_USER_READ = 'r', /* write (WINDOW_UPDATE frame) */ - RW_REASON_FLUSH = 'f', /* write */ - RW_REASON_STREAM_CLOSE = 'c', /* write */ - RW_REASON_RST_IN = 'n', /* read */ - RW_REASON_STREAM_IN = 'd', /* read */ - RW_REASON_RESET_EXT = 'e', /* write */ - RW_REASON_WANTREAD = 'a', /* read */ - RW_REASON_SHUTDOWN = 'u', /* write */ - RW_REASON_WRITEFILE = 't', /* write */ - RW_REASON_SENDFILE = 's', /* write */ -}; - -/* Put connection onto Pending RW Queue if it is not already on it. If +/* Put connection onto the Tickable Queue if it is not already on it. If * connection is being destroyed, this is a no-op. - * XXX Is the bit about "being destroyed" still true? */ void -lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, - lsquic_conn_t *conn, enum rw_reason); +lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *, + lsquic_conn_t *); /* Put connection onto Advisory Tick Time Queue if it is not already on it. */ diff --git a/src/liblsquic/lsquic_full_conn.c b/src/liblsquic/lsquic_full_conn.c index 1ecfd3ee9..556347004 100644 --- a/src/liblsquic/lsquic_full_conn.c +++ b/src/liblsquic/lsquic_full_conn.c @@ -255,9 +255,6 @@ reset_ack_state (struct full_conn *conn); static int write_is_possible (struct full_conn *); -static int -dispatch_stream_read_events (struct full_conn *, struct lsquic_stream *); - static const struct headers_stream_callbacks *headers_callbacks_ptr; #if KEEP_CLOSED_STREAM_HISTORY @@ -1168,7 +1165,7 @@ process_stream_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, * has not been completed yet. Nevertheless, this is good enough * for now. */ - dispatch_stream_read_events(conn, stream); + lsquic_stream_dispatch_read_events(stream); } return parsed_len; @@ -2350,43 +2347,23 @@ service_streams (struct full_conn *conn) } -static int -dispatch_stream_read_events (struct full_conn *conn, lsquic_stream_t *stream) -{ - struct stream_read_prog_status saved_status; - int progress_made; - - lsquic_stream_get_read_prog_status(stream, &saved_status); - lsquic_stream_dispatch_read_events(stream); - progress_made = lsquic_stream_progress_was_made(stream, &saved_status); - - return progress_made; -} - - -/* Return 1 if progress was made, 0 otherwise */ -static int +static void process_streams_read_events (struct full_conn *conn) { lsquic_stream_t *stream; struct stream_prio_iter spi; - int progress_count; if (TAILQ_EMPTY(&conn->fc_pub.read_streams)) - return 0; + return; lsquic_spi_init(&spi, TAILQ_FIRST(&conn->fc_pub.read_streams), TAILQ_LAST(&conn->fc_pub.read_streams, lsquic_streams_tailq), (uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_read_stream), STREAM_WANT_READ, conn->fc_conn.cn_cid, "read"); - progress_count = 0; for (stream = lsquic_spi_first(&spi); stream; stream = lsquic_spi_next(&spi)) - progress_count += - dispatch_stream_read_events(conn, stream); - - return progress_count > 0; + lsquic_stream_dispatch_read_events(stream); } @@ -2429,15 +2406,16 @@ process_streams_write_events (struct full_conn *conn, int high_prio) } -/* Return 1 if progress was made, 0 otherwise. */ -static int +static void process_hsk_stream_read_events (struct full_conn *conn) { lsquic_stream_t *stream; TAILQ_FOREACH(stream, &conn->fc_pub.read_streams, next_read_stream) if (LSQUIC_STREAM_HANDSHAKE == stream->id) - return dispatch_stream_read_events(conn, stream); - return 0; + { + lsquic_stream_dispatch_read_events(stream); + break; + } } @@ -2641,13 +2619,13 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) struct full_conn *conn = (struct full_conn *) lconn; int have_delayed_packets; unsigned n; - int progress_made, s; - enum tick_st progress_tick = 0; + int s; + enum tick_st tick = 0; #define CLOSE_IF_NECESSARY() do { \ if (conn->fc_flags & FC_IMMEDIATE_CLOSE_FLAGS) \ { \ - progress_tick |= immediate_close(conn); \ + tick |= immediate_close(conn); \ goto end; \ } \ } while (0) @@ -2659,13 +2637,13 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) { \ LSQ_DEBUG("used up packet allowance, quiet now (line %d)", \ __LINE__); \ - progress_tick |= TICK_QUIET; \ + tick |= TICK_QUIET; \ } \ else \ { \ LSQ_DEBUG("used up packet allowance, sending now (line %d)",\ __LINE__); \ - progress_tick |= TICK_SEND; \ + tick |= TICK_SEND; \ } \ goto end; \ } \ @@ -2682,8 +2660,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) LSQ_DEBUG("memory used: %zd bytes", calc_mem_used(conn)); } - assert(!(conn->fc_conn.cn_flags & LSCONN_RW_PENDING)); - if (conn->fc_flags & FC_HAVE_SAVED_ACK) { (void) /* If there is an error, we'll fail shortly */ @@ -2709,10 +2685,9 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) * does not want to wait if it has the server information. */ if (conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE) - progress_made = process_streams_read_events(conn); + process_streams_read_events(conn); else - progress_made = process_hsk_stream_read_events(conn); - progress_tick |= progress_made << TICK_BIT_PROGRESS; + process_hsk_stream_read_events(conn); CLOSE_IF_NECESSARY(); if (lsquic_send_ctl_pacer_blocked(&conn->fc_send_ctl)) @@ -2774,7 +2749,7 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) * dropped and replaced by new ACK packet. This way, we are never * more than 1 packet over CWND. */ - progress_tick |= TICK_SEND; + tick |= TICK_SEND; goto end; } @@ -2830,7 +2805,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) } lsquic_send_ctl_set_buffer_stream_packets(&conn->fc_send_ctl, 0); - const unsigned n_sched = lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl); if (!(conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE)) { process_hsk_stream_write_events(conn); @@ -2860,8 +2834,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) process_streams_write_events(conn, 0); end_write: - progress_made = (n_sched < lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl)); - progress_tick |= progress_made << TICK_BIT_PROGRESS; skip_write: service_streams(conn); @@ -2883,10 +2855,10 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) !conn->fc_settings->es_silent_close) { generate_connection_close_packet(conn); - progress_tick |= TICK_SEND|TICK_CLOSE; + tick |= TICK_SEND|TICK_CLOSE; } else - progress_tick |= TICK_CLOSE; + tick |= TICK_CLOSE; goto end; } @@ -2901,7 +2873,7 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) } else { - progress_tick |= TICK_QUIET; + tick |= TICK_QUIET; goto end; } } @@ -2924,11 +2896,11 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now) lsquic_hash_count(conn->fc_pub.all_streams) > 0) lsquic_alarmset_set(&conn->fc_alset, AL_PING, now + TIME_BETWEEN_PINGS); - progress_tick |= TICK_SEND; + tick |= TICK_SEND; end: lsquic_send_ctl_set_buffer_stream_packets(&conn->fc_send_ctl, 1); - return progress_tick; + return tick; } @@ -3343,6 +3315,68 @@ lsquic_conn_status (lsquic_conn_t *lconn, char *errbuf, size_t bufsz) } +static int +full_conn_ci_is_tickable (lsquic_conn_t *lconn) +{ + struct full_conn *conn = (struct full_conn *) lconn; + const struct lsquic_stream *stream; + int can_send; + + /* This caches the value so that we only need to call the function once */ +#define CAN_SEND() \ + (can_send >= 0 ? can_send : \ + (can_send = lsquic_send_ctl_can_send(&conn->fc_send_ctl))) + + if (lsquic_send_ctl_has_buffered(&conn->fc_send_ctl)) + return 1; + + if (!TAILQ_EMPTY(&conn->fc_pub.service_streams)) + return 1; + + can_send = -1; + if (!TAILQ_EMPTY(&conn->fc_pub.sending_streams) && CAN_SEND()) + return 1; + + TAILQ_FOREACH(stream, &conn->fc_pub.read_streams, next_read_stream) + if (lsquic_stream_readable(stream)) + return 1; + + if (!TAILQ_EMPTY(&conn->fc_pub.write_streams) && CAN_SEND()) + { + TAILQ_FOREACH(stream, &conn->fc_pub.write_streams, next_write_stream) + if (lsquic_stream_write_avail(stream)) + return 1; + } + +#undef CAN_SEND + + return 0; +} + + +static lsquic_time_t +full_conn_ci_next_tick_time (lsquic_conn_t *lconn) +{ + struct full_conn *conn = (struct full_conn *) lconn; + lsquic_time_t alarm_time, pacer_time; + + alarm_time = lsquic_alarmset_mintime(&conn->fc_alset); + pacer_time = lsquic_send_ctl_next_pacer_time(&conn->fc_send_ctl); + + if (alarm_time && pacer_time) + { + if (alarm_time < pacer_time) + return alarm_time; + else + return pacer_time; + } + else if (alarm_time) + return alarm_time; + else + return pacer_time; +} + + static const struct headers_stream_callbacks headers_callbacks = { .hsc_on_headers = headers_stream_on_incoming_headers, @@ -3359,7 +3393,9 @@ static const struct conn_iface full_conn_iface = { .ci_destroy = full_conn_ci_destroy, .ci_handshake_failed = full_conn_ci_handshake_failed, .ci_handshake_ok = full_conn_ci_handshake_ok, + .ci_is_tickable = full_conn_ci_is_tickable, .ci_next_packet_to_send = full_conn_ci_next_packet_to_send, + .ci_next_tick_time = full_conn_ci_next_tick_time, .ci_packet_in = full_conn_ci_packet_in, .ci_packet_not_sent = full_conn_ci_packet_not_sent, .ci_packet_sent = full_conn_ci_packet_sent, diff --git a/src/liblsquic/lsquic_logger.c b/src/liblsquic/lsquic_logger.c index 834adbfd9..ead221fb9 100644 --- a/src/liblsquic/lsquic_logger.c +++ b/src/liblsquic/lsquic_logger.c @@ -70,6 +70,7 @@ enum lsq_log_level lsq_log_levels[N_LSQUIC_LOGGER_MODULES] = { [LSQLM_SPI] = LSQ_LOG_WARN, [LSQLM_DI] = LSQ_LOG_WARN, [LSQLM_PACER] = LSQ_LOG_WARN, + [LSQLM_MIN_HEAP] = LSQ_LOG_WARN, }; const char *const lsqlm_to_str[N_LSQUIC_LOGGER_MODULES] = { @@ -97,6 +98,7 @@ const char *const lsqlm_to_str[N_LSQUIC_LOGGER_MODULES] = { [LSQLM_SPI] = "spi", [LSQLM_DI] = "di", [LSQLM_PACER] = "pacer", + [LSQLM_MIN_HEAP] = "min-heap", }; const char *const lsq_loglevel2str[N_LSQUIC_LOG_LEVELS] = { diff --git a/src/liblsquic/lsquic_logger.h b/src/liblsquic/lsquic_logger.h index 8cd1e233e..d850f9369 100644 --- a/src/liblsquic/lsquic_logger.h +++ b/src/liblsquic/lsquic_logger.h @@ -70,6 +70,7 @@ enum lsquic_logger_module { LSQLM_SPI, LSQLM_DI, LSQLM_PACER, + LSQLM_MIN_HEAP, N_LSQUIC_LOGGER_MODULES }; diff --git a/src/liblsquic/lsquic_min_heap.c b/src/liblsquic/lsquic_min_heap.c new file mode 100644 index 000000000..67d766b75 --- /dev/null +++ b/src/liblsquic/lsquic_min_heap.c @@ -0,0 +1,92 @@ +/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ +/* + * lsquic_min_heap.c + */ + +#include +#include + +#include "lsquic_min_heap.h" +#define LSQUIC_LOGGER_MODULE LSQLM_MIN_HEAP +#include "lsquic_logger.h" + +#define MHE_PARENT(i) ((i - 1) / 2) +#define MHE_LCHILD(i) (2 * i + 1) +#define MHE_RCHILD(i) (2 * i + 2) + + +static void +heapify_min_heap (struct min_heap *heap, unsigned i) +{ + struct min_heap_elem el; + unsigned smallest; + + assert(i < heap->mh_nelem); + + if (MHE_LCHILD(i) < heap->mh_nelem) + { + if (heap->mh_elems[ MHE_LCHILD(i) ].mhe_val < + heap->mh_elems[ i ].mhe_val) + smallest = MHE_LCHILD(i); + else + smallest = i; + if (MHE_RCHILD(i) < heap->mh_nelem && + heap->mh_elems[ MHE_RCHILD(i) ].mhe_val < + heap->mh_elems[ smallest ].mhe_val) + smallest = MHE_RCHILD(i); + } + else + smallest = i; + + if (smallest != i) + { + el = heap->mh_elems[ smallest ]; + heap->mh_elems[ smallest ] = heap->mh_elems[ i ]; + heap->mh_elems[ i ] = el; + heapify_min_heap(heap, smallest); + } +} + + +void +lsquic_mh_insert (struct min_heap *heap, struct lsquic_conn *conn, uint64_t val) +{ + struct min_heap_elem el; + unsigned i; + + assert(heap->mh_nelem < heap->mh_nalloc); + + heap->mh_elems[ heap->mh_nelem ].mhe_conn = conn; + heap->mh_elems[ heap->mh_nelem ].mhe_val = val; + ++heap->mh_nelem; + + i = heap->mh_nelem - 1; + while (i > 0 && heap->mh_elems[ MHE_PARENT(i) ].mhe_val > + heap->mh_elems[ i ].mhe_val) + { + el = heap->mh_elems[ MHE_PARENT(i) ]; + heap->mh_elems[ MHE_PARENT(i) ] = heap->mh_elems[ i ]; + heap->mh_elems[ i ] = el; + i = MHE_PARENT(i); + } +} + + +struct lsquic_conn * +lsquic_mh_pop (struct min_heap *heap) +{ + struct lsquic_conn *conn; + + if (heap->mh_nelem == 0) + return NULL; + + conn = heap->mh_elems[0].mhe_conn; + --heap->mh_nelem; + if (heap->mh_nelem > 0) + { + heap->mh_elems[0] = heap->mh_elems[ heap->mh_nelem ]; + heapify_min_heap(heap, 0); + } + + return conn; +} diff --git a/src/liblsquic/lsquic_min_heap.h b/src/liblsquic/lsquic_min_heap.h new file mode 100644 index 000000000..3926ea6bd --- /dev/null +++ b/src/liblsquic/lsquic_min_heap.h @@ -0,0 +1,36 @@ +/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */ +/* + * lsquic_min_heap.h -- Min-heap for connections + */ + +#ifndef LSQUIC_MIN_HEAP_H +#define LSQUIC_MIN_HEAP_H 1 + +struct lsquic_conn; + +struct min_heap_elem +{ + struct lsquic_conn *mhe_conn; + uint64_t mhe_val; +}; + + +struct min_heap +{ + struct min_heap_elem *mh_elems; + unsigned mh_nalloc, + mh_nelem; +}; + + +void +lsquic_mh_insert (struct min_heap *, struct lsquic_conn *conn, uint64_t val); + +struct lsquic_conn * +lsquic_mh_pop (struct min_heap *); + +#define lsquic_mh_count(heap) (+(heap)->mh_nelem) + +#define lsquic_mh_nalloc(heap) (+(heap)->mh_nalloc) + +#endif diff --git a/src/liblsquic/lsquic_pacer.h b/src/liblsquic/lsquic_pacer.h index 6d9aa58c7..649e4c2f1 100644 --- a/src/liblsquic/lsquic_pacer.h +++ b/src/liblsquic/lsquic_pacer.h @@ -57,6 +57,8 @@ pacer_packet_scheduled (struct pacer *pacer, unsigned n_in_flight, void pacer_loss_event (struct pacer *); +#define pacer_delayed(pacer) ((pacer)->pa_flags & PA_LAST_SCHED_DELAYED) + #define pacer_next_sched(pacer) (+(pacer)->pa_next_sched) #endif diff --git a/src/liblsquic/lsquic_send_ctl.h b/src/liblsquic/lsquic_send_ctl.h index b72a2f4bb..b13fba067 100644 --- a/src/liblsquic/lsquic_send_ctl.h +++ b/src/liblsquic/lsquic_send_ctl.h @@ -219,12 +219,22 @@ lsquic_send_ctl_drop_scheduled (lsquic_send_ctl_t *); } \ } while (0) +#define lsquic_send_ctl_next_pacer_time(ctl) ( \ + ((ctl)->sc_flags & SC_PACE) \ + && pacer_delayed(&(ctl)->sc_pacer) \ + ? pacer_next_sched(&(ctl)->sc_pacer) \ + : 0 ) + enum lsquic_packno_bits lsquic_send_ctl_packno_bits (lsquic_send_ctl_t *); int lsquic_send_ctl_schedule_buffered (lsquic_send_ctl_t *, enum buf_packet_type); +#define lsquic_send_ctl_has_buffered(ctl) ( \ + TAILQ_FIRST(&(ctl)->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_packets) \ + || TAILQ_FIRST(&(ctl)->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets )) + #define lsquic_send_ctl_invalidate_bpt_cache(ctl) do { \ (ctl)->sc_cached_bpt.stream_id = 0; \ } while (0) diff --git a/src/liblsquic/lsquic_stream.c b/src/liblsquic/lsquic_stream.c index 6fcd10400..6b967aed7 100644 --- a/src/liblsquic/lsquic_stream.c +++ b/src/liblsquic/lsquic_stream.c @@ -71,9 +71,6 @@ stream_wantread (lsquic_stream_t *stream, int is_want); static int stream_wantwrite (lsquic_stream_t *stream, int is_want); -static int -stream_readable (const lsquic_stream_t *stream); - static ssize_t stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); @@ -174,13 +171,12 @@ stream_inside_callback (const lsquic_stream_t *stream) /* Here, "readable" means that the user is able to read from the stream. */ static void -maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream, - enum rw_reason reason) +maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream) { - if (!stream_inside_callback(stream) && stream_readable(stream)) + if (!stream_inside_callback(stream) && lsquic_stream_readable(stream)) { - lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, - stream->conn_pub->lconn, reason); + lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, + stream->conn_pub->lconn); } } @@ -189,15 +185,14 @@ maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream, * scheduled to be sent out. */ static void -maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream, - enum rw_reason reason) +maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream) { if (!stream_inside_callback(stream) && lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) && ! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl)) { - lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub, - stream->conn_pub->lconn, reason); + lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub, + stream->conn_pub->lconn); } } @@ -403,8 +398,8 @@ lsquic_stream_call_on_close (lsquic_stream_t *stream) } -static int -stream_readable (const lsquic_stream_t *stream) +int +lsquic_stream_readable (const lsquic_stream_t *stream) { /* A stream is readable if one of the following is true: */ return @@ -429,8 +424,8 @@ stream_readable (const lsquic_stream_t *stream) } -static size_t -stream_write_avail (const struct lsquic_stream *stream) +size_t +lsquic_stream_write_avail (const struct lsquic_stream *stream) { uint64_t stream_avail, conn_avail; @@ -516,7 +511,7 @@ lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) } if (got_next_offset) /* Checking the offset saves di_get_frame() call */ - maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN); + maybe_conn_to_tickable_if_readable(stream); return 0; } else if (INS_FRAME_DUP == ins_frame) @@ -603,7 +598,7 @@ lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, } /* Let user collect error: */ - maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN); + maybe_conn_to_tickable_if_readable(stream); lsquic_sfcw_consume_rem(&stream->fc); drop_frames_in(stream); @@ -793,7 +788,7 @@ lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov, if (!(stream->stream_flags & STREAM_SENDING_FLAGS)) TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream); stream->stream_flags |= STREAM_SEND_WUF; - maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ); + maybe_conn_to_tickable_if_writeable(stream); } } @@ -894,7 +889,7 @@ lsquic_stream_shutdown (lsquic_stream_t *stream, int how) maybe_finish_stream(stream); maybe_schedule_call_on_close(stream); if (how) - maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN); + maybe_conn_to_tickable_if_writeable(stream); return 0; } @@ -1045,7 +1040,7 @@ lsquic_stream_wantread (lsquic_stream_t *stream, int is_want) if (!(stream->stream_flags & STREAM_U_READ_DONE)) { if (is_want) - maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD); + maybe_conn_to_tickable_if_readable(stream); return stream_wantread(stream, is_want); } else @@ -1083,7 +1078,8 @@ stream_dispatch_read_events_loop (lsquic_stream_t *stream) no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check; no_progress_count = 0; - while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) + while ((stream->stream_flags & STREAM_WANT_READ) + && lsquic_stream_readable(stream)) { flags = stream->stream_flags & USER_PROGRESS_FLAGS; size = stream->read_offset; @@ -1121,7 +1117,7 @@ stream_dispatch_write_events_loop (lsquic_stream_t *stream) stream->stream_flags |= STREAM_LAST_WRITE_OK; while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)) == (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK) - && stream_write_avail(stream)) + && lsquic_stream_write_avail(stream)) { flags = stream->stream_flags & USER_PROGRESS_FLAGS; @@ -1149,7 +1145,7 @@ stream_dispatch_write_events_loop (lsquic_stream_t *stream) static void stream_dispatch_read_events_once (lsquic_stream_t *stream) { - if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream)) + if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream)) { stream->stream_if->on_read(stream, stream->st_ctx); } @@ -1227,7 +1223,7 @@ lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) if (stream->stream_flags & STREAM_RW_ONCE) { if ((stream->stream_flags & STREAM_WANT_WRITE) - && stream_write_avail(stream)) + && lsquic_stream_write_avail(stream)) { stream->stream_if->on_write(stream, stream->st_ctx); } @@ -1395,7 +1391,7 @@ frame_gen_size (void *ctx) /* Make sure we are not writing past available size: */ remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx); - available = stream_write_avail(fg_ctx->fgc_stream); + available = lsquic_stream_write_avail(fg_ctx->fgc_stream); if (available < remaining) remaining = available; @@ -1452,7 +1448,7 @@ frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) stream->sm_n_buffered = 0; } - available = stream_write_avail(fg_ctx->fgc_stream); + available = lsquic_stream_write_avail(fg_ctx->fgc_stream); n_to_write = end - p; if (n_to_write > available) n_to_write = available; @@ -1662,7 +1658,7 @@ save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, return -1; } - avail = stream_write_avail(stream); + avail = lsquic_stream_write_avail(stream); if (avail < len) len = avail; @@ -1891,7 +1887,7 @@ lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code, if (do_close) lsquic_stream_close(stream); else - maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT); + maybe_conn_to_tickable_if_writeable(stream); } @@ -1924,7 +1920,7 @@ lsquic_stream_close (lsquic_stream_t *stream) stream_shutdown_read(stream); maybe_schedule_call_on_close(stream); maybe_finish_stream(stream); - maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE); + maybe_conn_to_tickable_if_writeable(stream); return 0; } diff --git a/src/liblsquic/lsquic_stream.h b/src/liblsquic/lsquic_stream.h index 0c56b733b..e7e7c5787 100644 --- a/src/liblsquic/lsquic_stream.h +++ b/src/liblsquic/lsquic_stream.h @@ -280,18 +280,6 @@ struct stream_read_prog_status enum stream_flags srps_flags; }; -#define lsquic_stream_get_read_prog_status(stream, stats) do { \ - (stats)->srps_read_offset = (stream)->read_offset; \ - (stats)->srps_flags = \ - (stream)->stream_flags & STREAM_RW_PROG_FLAGS; \ -} while (0) - -#define lsquic_stream_progress_was_made(stream, stats) ( \ - (stats)->srps_read_offset != (stream)->read_offset \ - || (stats)->srps_flags != \ - ((stream)->stream_flags & STREAM_RW_PROG_FLAGS) \ -) - #define lsquic_stream_is_critical(stream) ( \ (stream)->id == LSQUIC_STREAM_HANDSHAKE || \ ((stream)->id == LSQUIC_STREAM_HEADERS && \ @@ -305,4 +293,10 @@ lsquic_stream_cid (const struct lsquic_stream *); #define lsquic_stream_has_data_to_flush(stream) ((stream)->sm_n_buffered > 0) +int +lsquic_stream_readable (const lsquic_stream_t *); + +size_t +lsquic_stream_write_avail (const struct lsquic_stream *); + #endif diff --git a/test/prog.c b/test/prog.c index 16e5a1824..86099aaec 100644 --- a/test/prog.c +++ b/test/prog.c @@ -28,9 +28,6 @@ static int prog_stopped; -static void -prog_set_onetimer (struct prog *prog, unsigned usec); - static const struct lsquic_packout_mem_if pmi = { .pmi_allocate = pba_allocate, .pmi_release = pba_release, @@ -91,8 +88,6 @@ prog_print_common_options (const struct prog *prog, FILE *out) " ::1:12345\n" " If no -s option is given, 0.0.0.0:12345 address\n" " is used.\n" -" -i USEC Library will `tick' every USEC microseconds. The default\n" -" is %u\n" #if LSQUIC_DONTFRAG_SUPPORTED " -D Set `do not fragment' flag on outgoing UDP packets\n" #endif @@ -122,7 +117,6 @@ prog_print_common_options (const struct prog *prog, FILE *out) " -S opt=val Socket options. Supported options:\n" " sndbuf=12345 # Sets SO_SNDBUF\n" " rcvbuf=12345 # Sets SO_RCVBUF\n" - , PROG_DEFAULT_PERIOD_USEC ); @@ -150,9 +144,6 @@ prog_set_opt (struct prog *prog, int opt, const char *arg) { switch (opt) { - case 'i': - prog->prog_period_usec = atoi(arg); - return 0; #if LSQUIC_DONTFRAG_SUPPORTED case 'D': { @@ -248,6 +239,7 @@ prog_connect (struct prog *prog) prog->prog_max_packet_size)) return -1; + prog_process_conns(prog); return 0; } @@ -278,30 +270,30 @@ prog_init_server (struct prog *prog) } -static void -drop_onetimer (struct prog *prog) -{ - if (prog->prog_onetimer) - { - event_del(prog->prog_onetimer); - event_free(prog->prog_onetimer); - prog->prog_onetimer = NULL; - } -} - - void -prog_maybe_set_onetimer (struct prog *prog) +prog_process_conns (struct prog *prog) { int diff; + struct timeval timeout; + + lsquic_engine_process_conns(prog->prog_engine); if (lsquic_engine_earliest_adv_tick(prog->prog_engine, &diff)) { - if (diff > 0) - prog_set_onetimer(prog, (unsigned) diff); + if (diff < 4000) + { + timeout.tv_sec = 0; + timeout.tv_usec = 4000; + } + else + { + timeout.tv_sec = (unsigned) diff / 1000000; + timeout.tv_usec = (unsigned) diff % 1000000; + } + + if (!prog_is_stopped()) + event_add(prog->prog_timer, &timeout); } - else - drop_onetimer(prog); } @@ -309,33 +301,8 @@ static void prog_timer_handler (int fd, short what, void *arg) { struct prog *const prog = arg; - lsquic_engine_proc_all(prog->prog_engine); - prog_maybe_set_onetimer(prog); -} - - -static void -prog_onetimer_handler (int fd, short what, void *arg) -{ - struct prog *const prog = arg; - lsquic_engine_process_conns_to_tick(prog->prog_engine); - prog_maybe_set_onetimer(prog); -} - - -static void -prog_set_onetimer (struct prog *prog, unsigned usec) -{ - struct timeval timeout; - - drop_onetimer(prog); - if (usec < 4000) - usec = 4000; - timeout.tv_sec = 0; - timeout.tv_usec = usec; - prog->prog_onetimer = event_new(prog->prog_eb, -1, EV_TIMEOUT, - prog_onetimer_handler, prog); - event_add(prog->prog_onetimer, &timeout); + if (!prog_is_stopped()) + prog_process_conns(prog); } @@ -350,19 +317,12 @@ prog_usr1_handler (int fd, short what, void *arg) int prog_run (struct prog *prog) { - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = prog->prog_period_usec; - prog->prog_timer = event_new(prog->prog_eb, -1, EV_PERSIST, - prog_timer_handler, prog); - event_add(prog->prog_timer, &timeout); #ifndef WIN32 prog->prog_usr1 = evsignal_new(prog->prog_eb, SIGUSR1, prog_usr1_handler, prog); evsignal_add(prog->prog_usr1, NULL); #endif - event_base_loop(prog->prog_eb, 0); return 0; @@ -392,7 +352,6 @@ prog_stop (struct prog *prog) sport_destroy(sport); } - drop_onetimer(prog); if (prog->prog_timer) { event_del(prog->prog_timer); @@ -414,11 +373,6 @@ prog_prep (struct prog *prog) int s; char err_buf[100]; - if (0 == prog->prog_period_usec) - prog->prog_period_usec = PROG_DEFAULT_PERIOD_USEC; - if (prog->prog_settings.es_proc_time_thresh == LSQUIC_DF_PROC_TIME_THRESH) - prog->prog_settings.es_proc_time_thresh = prog->prog_period_usec; - if (0 != lsquic_engine_check_settings(prog->prog_api.ea_settings, prog->prog_engine_flags, err_buf, sizeof(err_buf))) { @@ -442,6 +396,9 @@ prog_prep (struct prog *prog) if (!prog->prog_engine) return -1; + prog->prog_timer = event_new(prog->prog_eb, -1, 0, + prog_timer_handler, prog); + if (prog->prog_engine_flags & LSENG_SERVER) s = prog_init_server(prog); else diff --git a/test/prog.h b/test/prog.h index c31587059..852f8e606 100644 --- a/test/prog.h +++ b/test/prog.h @@ -19,13 +19,10 @@ struct prog unsigned prog_engine_flags; struct service_port prog_dummy_sport; /* Use for options */ unsigned prog_packout_max; -#define PROG_DEFAULT_PERIOD_USEC (10 * 1000) /* 10 ms default */ - unsigned prog_period_usec; unsigned short prog_max_packet_size; int prog_version_cleared; struct event_base *prog_eb; struct event *prog_timer, - *prog_onetimer, *prog_usr1; struct sport_head *prog_sports; struct lsquic_engine *prog_engine; @@ -48,7 +45,7 @@ prog_init (struct prog *, unsigned lsquic_engine_flags, struct sport_head *, # define IP_DONTFRAG_FLAG "" #endif -#define PROG_OPTS "i:m:c:y:L:l:o:H:s:S:Y:z:" SENDMMSG_FLAG IP_DONTFRAG_FLAG +#define PROG_OPTS "m:c:y:L:l:o:H:s:S:Y:z:" SENDMMSG_FLAG IP_DONTFRAG_FLAG /* Returns: * 0 Applied @@ -79,10 +76,10 @@ prog_connect (struct prog *); void prog_print_common_options (const struct prog *, FILE *); -void -prog_maybe_set_onetimer (struct prog *); - int prog_is_stopped (void); +void +prog_process_conns (struct prog *); + #endif diff --git a/test/test_common.c b/test/test_common.c index a6276ba57..4aadebcfc 100644 --- a/test/test_common.c +++ b/test/test_common.c @@ -359,16 +359,10 @@ read_handler (int fd, short flags, void *ctx) while (ROP_NOROOM == rop); if (n_batches) - { n += packs_in->n_alloc * (n_batches - 1); - lsquic_engine_process_conns_with_incoming(engine); - } - - while (lsquic_engine_has_pend_rw(engine)) - lsquic_engine_process_conns_with_pend_rw(engine); if (!prog_is_stopped()) - prog_maybe_set_onetimer(sport->sp_prog); + prog_process_conns(sport->sp_prog); LSQ_DEBUG("read %u packet%.*s in %u batch%s", n, n != 1, "s", n_batches, n_batches != 1 ? "es" : ""); } diff --git a/test/unittests/test_conn_hash.c b/test/unittests/test_conn_hash.c index df423e0ba..35d73c2a7 100644 --- a/test/unittests/test_conn_hash.c +++ b/test/unittests/test_conn_hash.c @@ -48,7 +48,7 @@ main (int argc, char **argv) lsquic_set_log_level("info"); malo = lsquic_malo_create(sizeof(*lconn)); - s = conn_hash_init(&conn_hash, nelems); + s = conn_hash_init(&conn_hash); assert(0 == s); for (n = 0; n < nelems; ++n) @@ -63,18 +63,6 @@ main (int argc, char **argv) assert(nelems == conn_hash_count(&conn_hash)); - { - lconn = get_new_lsquic_conn(malo); - find_lsconn = conn_hash_find(&conn_hash, lconn->cn_cid); - assert(!find_lsconn); - s = conn_hash_add(&conn_hash, lconn); - assert(-1 == s); - lsquic_malo_put(lconn); - } - - for (n = 0, lconn = conn_hash_first(&conn_hash); lconn; ++n, lconn = conn_hash_next(&conn_hash)) - assert(n == (uintptr_t) lconn->cn_if); - for (lconn = lsquic_malo_first(malo); lconn; lconn = lsquic_malo_next(malo)) { diff --git a/test/unittests/test_stream.c b/test/unittests/test_stream.c index 912391660..1d614eb5d 100644 --- a/test/unittests/test_stream.c +++ b/test/unittests/test_stream.c @@ -128,8 +128,8 @@ lsquic_send_ctl_determine_bpt (struct lsquic_send_ctl *ctl, /* This function is only here to avoid crash in the test: */ void -lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub, - lsquic_conn_t *conn, enum rw_reason reason) +lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub, + lsquic_conn_t *conn) { }