Skip to content

Commit

Permalink
Remove the periodic refill event entirely.
Browse files Browse the repository at this point in the history
Now that we update our buckets on demand before reading or writing,
we no longer need to update them all every TokenBucketRefillInterval
msec.

When a connection runs out of bandwidth, we do need a way to
reenable it, however.  We do this by scheduling a timer to reenable
all blocked connections for TokenBucketRefillInterval msec after a
connection becomes blocked.

(If we were using PerConnBWRate more, it might make sense to have a
per-connection timer, rather than a single timeout. But since
PerConnBWRate is currently (mostly) unused, I'm going to go for the
simpler approach here, since usually whenever one connection has
become blocked on bandwidth, most connections are blocked on
bandwidth.)

Implements ticket 25373.
  • Loading branch information
nmathewson committed Apr 17, 2018
1 parent 780d1b4 commit 47df912
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 98 deletions.
7 changes: 7 additions & 0 deletions changes/bug25373
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
o Major features (main loop, CPU wakeup):
- The bandwidth-limitation logic has been refactored so that
bandwidth calculations are performed on-demand, rather than
every TokenBucketRefillInterval milliseconds.
This change should improve the granularity of our bandwidth
calculations, and limit the number of times that the Tor process needs
to wake up when it is idle. Closes ticket 25373.
8 changes: 5 additions & 3 deletions doc/tor.1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1285,9 +1285,11 @@ The following options are useful only for clients (that is, if
2 minutes)

[[TokenBucketRefillInterval]] **TokenBucketRefillInterval** __NUM__ [**msec**|**second**]::
Set the refill interval of Tor's token bucket to NUM milliseconds.
NUM must be between 1 and 1000, inclusive. Note that the configured
bandwidth limits are still expressed in bytes per second: this
Set the refill delay interval of Tor's token bucket to NUM milliseconds.
NUM must be between 1 and 1000, inclusive. When Tor is out of bandwidth,
on a connection or globally, it will wait up to this long before it tries
to use that connection again.
Note that bandwidth limits are still expressed in bytes per second: this
option only affects the frequency with which Tor checks to see whether
previously exhausted connections may read again.
Can not be changed while tor is running. (Default: 100 msec)
Expand Down
116 changes: 74 additions & 42 deletions src/or/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ static const char *proxy_type_to_string(int proxy_type);
static int get_proxy_type(void);
const tor_addr_t *conn_get_outbound_address(sa_family_t family,
const or_options_t *options, unsigned int conn_type);
static void blocked_connection_reenable_init(const or_options_t *options);
static void schedule_blocked_connection_reenable(void);

/** The last addresses that our network interface seemed to have been
* binding to. We use this as one way to detect when our IP changes.
Expand Down Expand Up @@ -3091,6 +3093,7 @@ connection_read_bw_exhausted(connection_t *conn, bool is_global_bw)
(void)is_global_bw;
conn->read_blocked_on_bw = 1;
connection_stop_reading(conn);
schedule_blocked_connection_reenable();
}

/**
Expand All @@ -3105,6 +3108,7 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
(void)is_global_bw;
conn->write_blocked_on_bw = 1;
connection_stop_reading(conn);
schedule_blocked_connection_reenable();
}

/** If we have exhausted our global buckets, or the buckets for conn,
Expand All @@ -3117,7 +3121,8 @@ connection_consider_empty_read_buckets(connection_t *conn)
if (!connection_is_rate_limited(conn))
return; /* Always okay. */

bool is_global = true;
int is_global = 1;

if (token_bucket_rw_get_read(&global_bucket) <= 0) {
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
Expand Down Expand Up @@ -3185,6 +3190,8 @@ connection_bucket_init(void)
(int32_t)options->BandwidthBurst,
now_ts);
}

blocked_connection_reenable_init(options);
}

/** Update the global connection bucket settings to a new value. */
Expand Down Expand Up @@ -3233,55 +3240,76 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
}
}

/** Time has passed; increment buckets appropriately and re-enable formerly
* blocked connections. */
void
connection_bucket_refill_all(time_t now, uint32_t now_ts)
{
smartlist_t *conns = get_connection_array();
/**
* Event to re-enable all connections that were previously blocked on read or
* write.
*/
static mainloop_event_t *reenable_blocked_connections_ev = NULL;

/* refill the global buckets */
token_bucket_rw_refill(&global_bucket, now_ts);
token_bucket_rw_refill(&global_relayed_bucket, now_ts);
last_refilled_global_buckets_ts = now_ts;
/** True iff reenable_blocked_connections_ev is currently scheduled. */
static int reenable_blocked_connections_is_scheduled = 0;

/* refill the per-connection buckets */
SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn);
/** Delay after which to run reenable_blocked_connections_ev. */
static struct timeval reenable_blocked_connections_delay;

if (conn->state == OR_CONN_STATE_OPEN) {
token_bucket_rw_refill(&or_conn->bucket, now_ts);
}
}

if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
&& token_bucket_rw_get_read(&global_bucket) > 0 /* and we can read */
&& (!connection_counts_as_relayed_traffic(conn, now) ||
token_bucket_rw_get_read(&global_relayed_bucket) > 0)
&& (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) > 0)) {
/* and either a non-cell conn or a cell conn with non-empty bucket */
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for read", (int)conn->s));
conn->read_blocked_on_bw = 0;
/**
* Re-enable all connections that were previously blocked on read or write.
* This event is scheduled after enough time has elapsed to be sure
* that the buckets will refill when the connections have something to do.
*/
static void
reenable_blocked_connections_cb(mainloop_event_t *ev, void *arg)
{
(void)ev;
(void)arg;
SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) {
if (conn->read_blocked_on_bw == 1) {
connection_start_reading(conn);
conn->read_blocked_on_bw = 0;
}

if (conn->write_blocked_on_bw == 1
&& token_bucket_rw_get_write(&global_bucket) > 0 /* and we can write */
&& (!connection_counts_as_relayed_traffic(conn, now) ||
token_bucket_rw_get_write(&global_relayed_bucket) > 0)
&& (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) > 0)) {
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for write", (int)conn->s));
conn->write_blocked_on_bw = 0;
if (conn->write_blocked_on_bw == 1) {
connection_start_writing(conn);
conn->write_blocked_on_bw = 0;
}
} SMARTLIST_FOREACH_END(conn);

reenable_blocked_connections_is_scheduled = 0;
}

/**
* Initialize the mainloop event that we use to wake up connections that
* find themselves blocked on bandwidth.
*/
static void
blocked_connection_reenable_init(const or_options_t *options)
{
if (! reenable_blocked_connections_ev) {
reenable_blocked_connections_ev =
mainloop_event_new(reenable_blocked_connections_cb, NULL);
reenable_blocked_connections_is_scheduled = 0;
}
time_t sec = options->TokenBucketRefillInterval / 1000;
int msec = (options->TokenBucketRefillInterval % 1000);
reenable_blocked_connections_delay.tv_sec = sec;
reenable_blocked_connections_delay.tv_usec = msec * 1000;
}

/**
* Called when we have blocked a connection for being low on bandwidth:
* schedule an event to reenable such connections, if it is not already
* scheduled.
*/
static void
schedule_blocked_connection_reenable(void)
{
if (reenable_blocked_connections_is_scheduled)
return;
if (BUG(reenable_blocked_connections_ev == NULL)) {
blocked_connection_reenable_init(get_options());
}
mainloop_event_schedule(reenable_blocked_connections_ev,
&reenable_blocked_connections_delay);
reenable_blocked_connections_is_scheduled = 1;
}

/** Read bytes from conn-\>s and process them.
Expand Down Expand Up @@ -5216,6 +5244,10 @@ connection_free_all(void)
tor_free(last_interface_ipv4);
tor_free(last_interface_ipv6);
last_recorded_accounting_at = 0;

mainloop_event_free(reenable_blocked_connections_ev);
reenable_blocked_connections_is_scheduled = 0;
memset(&reenable_blocked_connections_delay, 0, sizeof(struct timeval));
}

/** Log a warning, and possibly emit a control event, that <b>received</b> came
Expand Down
53 changes: 0 additions & 53 deletions src/or/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -2370,43 +2370,6 @@ systemd_watchdog_callback(periodic_timer_t *timer, void *arg)
}
#endif /* defined(HAVE_SYSTEMD_209) */

/** Timer: used to invoke refill_callback(). */
static periodic_timer_t *refill_timer = NULL;

/** Millisecond when refall_callback was last invoked. */
static struct timeval refill_timer_current_millisecond;

/** Libevent callback: invoked periodically to refill token buckets
* and count r/w bytes. */
static void
refill_callback(periodic_timer_t *timer, void *arg)
{
struct timeval now;

int milliseconds_elapsed = 0;

(void)timer;
(void)arg;

tor_gettimeofday(&now);

/* If this is our first time, no time has passed. */
if (refill_timer_current_millisecond.tv_sec) {
long mdiff = tv_mdiff(&refill_timer_current_millisecond, &now);
if (mdiff > INT_MAX)
mdiff = INT_MAX;
milliseconds_elapsed = (int)mdiff;
}

if (milliseconds_elapsed > 0) {
connection_bucket_refill_all((time_t)now.tv_sec,
monotime_coarse_get_stamp());
}

/* remember what time it is, for next time */
refill_timer_current_millisecond = now;
}

#ifndef _WIN32
/** Called when a possibly ignorable libevent error occurs; ensures that we
* don't get into an infinite loop by ignoring too many errors from
Expand Down Expand Up @@ -2707,20 +2670,6 @@ do_main_loop(void)
}
#endif /* defined(HAVE_SYSTEMD_209) */

if (!refill_timer) {
struct timeval refill_interval;
int msecs = get_options()->TokenBucketRefillInterval;

refill_interval.tv_sec = msecs/1000;
refill_interval.tv_usec = (msecs%1000)*1000;

refill_timer = periodic_timer_new(tor_libevent_get_base(),
&refill_interval,
refill_callback,
NULL);
tor_assert(refill_timer);
}

#ifdef HAVE_SYSTEMD
{
const int r = sd_notify(0, "READY=1");
Expand Down Expand Up @@ -3477,7 +3426,6 @@ tor_free_all(int postfork)
smartlist_free(active_linked_connection_lst);
periodic_timer_free(second_timer);
teardown_periodic_events();
periodic_timer_free(refill_timer);
tor_event_free(shutdown_did_not_work_event);
tor_event_free(initialize_periodic_events_event);
mainloop_event_free(directory_all_unreachable_cb_event);
Expand Down Expand Up @@ -3505,7 +3453,6 @@ tor_free_all(int postfork)
heartbeat_callback_first_time = 1;
n_libevent_errors = 0;
current_second = 0;
memset(&refill_timer_current_millisecond, 0, sizeof(struct timeval));

if (!postfork) {
release_lockfile();
Expand Down

0 comments on commit 47df912

Please sign in to comment.