Skip to content

Commit

Permalink
rxrpc: Fix the excessive initial retransmission timeout
Browse files Browse the repository at this point in the history
rxrpc currently uses a fixed 4s retransmission timeout until the RTT is
sufficiently sampled.  This can cause problems with some fileservers with
calls to the cache manager in the afs filesystem being dropped from the
fileserver because a packet goes missing and the retransmission timeout is
greater than the call expiry timeout.

Fix this by:

 (1) Copying the RTT/RTO calculation code from Linux's TCP implementation
     and altering it to fit rxrpc.

 (2) Altering the various users of the RTT to make use of the new SRTT
     value.

 (3) Replacing the use of rxrpc_resend_timeout to use the calculated RTO
     value instead (which is needed in jiffies), along with a backoff.

Notes:

 (1) rxrpc provides RTT samples by matching the serial numbers on outgoing
     DATA packets that have the RXRPC_REQUEST_ACK set and PING ACK packets
     against the reference serial number in incoming REQUESTED ACK and
     PING-RESPONSE ACK packets.

 (2) Each packet that is transmitted on an rxrpc connection gets a new
     per-connection serial number, even for retransmissions, so an ACK can
     be cross-referenced to a specific trigger packet.  This allows RTT
     information to be drawn from retransmitted DATA packets also.

 (3) rxrpc maintains the RTT/RTO state on the rxrpc_peer record rather than
     on an rxrpc_call because many RPC calls won't live long enough to
     generate more than one sample.

 (4) The calculated SRTT value is in units of 8ths of a microsecond rather
     than nanoseconds.

The (S)RTT and RTO values are displayed in /proc/net/rxrpc/peers.

Fixes: 17926a7 ([AF_RXRPC]: Provide secure RxRPC sockets for use by userspace and kernel both"")
Signed-off-by: David Howells <dhowells@redhat.com>
  • Loading branch information
dhowells committed May 11, 2020
1 parent 42c556f commit c410bf0
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 155 deletions.
18 changes: 5 additions & 13 deletions fs/afs/fs_probe.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ void afs_fileserver_probe_result(struct afs_call *call)
struct afs_server *server = call->server;
unsigned int server_index = call->server_index;
unsigned int index = call->addr_ix;
unsigned int rtt = UINT_MAX;
unsigned int rtt_us;
bool have_result = false;
u64 _rtt;
int ret = call->error;

_enter("%pU,%u", &server->uuid, index);
Expand Down Expand Up @@ -93,15 +92,9 @@ void afs_fileserver_probe_result(struct afs_call *call)
}
}

/* Get the RTT and scale it to fit into a 32-bit value that represents
* over a minute of time so that we can access it with one instruction
* on a 32-bit system.
*/
_rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
_rtt /= 64;
rtt = (_rtt > UINT_MAX) ? UINT_MAX : _rtt;
if (rtt < server->probe.rtt) {
server->probe.rtt = rtt;
rtt_us = rxrpc_kernel_get_srtt(call->net->socket, call->rxcall);
if (rtt_us < server->probe.rtt) {
server->probe.rtt = rtt_us;
alist->preferred = index;
have_result = true;
}
Expand All @@ -113,8 +106,7 @@ void afs_fileserver_probe_result(struct afs_call *call)
spin_unlock(&server->probe_lock);

_debug("probe [%u][%u] %pISpc rtt=%u ret=%d",
server_index, index, &alist->addrs[index].transport,
(unsigned int)rtt, ret);
server_index, index, &alist->addrs[index].transport, rtt_us, ret);

have_result |= afs_fs_probe_done(server);
if (have_result)
Expand Down
18 changes: 5 additions & 13 deletions fs/afs/vl_probe.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ void afs_vlserver_probe_result(struct afs_call *call)
struct afs_addr_list *alist = call->alist;
struct afs_vlserver *server = call->vlserver;
unsigned int server_index = call->server_index;
unsigned int rtt_us = 0;
unsigned int index = call->addr_ix;
unsigned int rtt = UINT_MAX;
bool have_result = false;
u64 _rtt;
int ret = call->error;

_enter("%s,%u,%u,%d,%d", server->name, server_index, index, ret, call->abort_code);
Expand Down Expand Up @@ -93,15 +92,9 @@ void afs_vlserver_probe_result(struct afs_call *call)
}
}

/* Get the RTT and scale it to fit into a 32-bit value that represents
* over a minute of time so that we can access it with one instruction
* on a 32-bit system.
*/
_rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
_rtt /= 64;
rtt = (_rtt > UINT_MAX) ? UINT_MAX : _rtt;
if (rtt < server->probe.rtt) {
server->probe.rtt = rtt;
rtt_us = rxrpc_kernel_get_srtt(call->net->socket, call->rxcall);
if (rtt_us < server->probe.rtt) {
server->probe.rtt = rtt_us;
alist->preferred = index;
have_result = true;
}
Expand All @@ -113,8 +106,7 @@ void afs_vlserver_probe_result(struct afs_call *call)
spin_unlock(&server->probe_lock);

_debug("probe [%u][%u] %pISpc rtt=%u ret=%d",
server_index, index, &alist->addrs[index].transport,
(unsigned int)rtt, ret);
server_index, index, &alist->addrs[index].transport, rtt_us, ret);

have_result |= afs_vl_probe_done(server);
if (have_result) {
Expand Down
2 changes: 1 addition & 1 deletion include/net/af_rxrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
struct sockaddr_rxrpc *);
u64 rxrpc_kernel_get_rtt(struct socket *, struct rxrpc_call *);
u32 rxrpc_kernel_get_srtt(struct socket *, struct rxrpc_call *);
int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
rxrpc_user_attach_call_t, unsigned long, gfp_t,
unsigned int);
Expand Down
17 changes: 7 additions & 10 deletions include/trace/events/rxrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -1112,18 +1112,17 @@ TRACE_EVENT(rxrpc_rtt_tx,
TRACE_EVENT(rxrpc_rtt_rx,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
rxrpc_serial_t send_serial, rxrpc_serial_t resp_serial,
s64 rtt, u8 nr, s64 avg),
u32 rtt, u32 rto),

TP_ARGS(call, why, send_serial, resp_serial, rtt, nr, avg),
TP_ARGS(call, why, send_serial, resp_serial, rtt, rto),

TP_STRUCT__entry(
__field(unsigned int, call )
__field(enum rxrpc_rtt_rx_trace, why )
__field(u8, nr )
__field(rxrpc_serial_t, send_serial )
__field(rxrpc_serial_t, resp_serial )
__field(s64, rtt )
__field(u64, avg )
__field(u32, rtt )
__field(u32, rto )
),

TP_fast_assign(
Expand All @@ -1132,18 +1131,16 @@ TRACE_EVENT(rxrpc_rtt_rx,
__entry->send_serial = send_serial;
__entry->resp_serial = resp_serial;
__entry->rtt = rtt;
__entry->nr = nr;
__entry->avg = avg;
__entry->rto = rto;
),

TP_printk("c=%08x %s sr=%08x rr=%08x rtt=%lld nr=%u avg=%lld",
TP_printk("c=%08x %s sr=%08x rr=%08x rtt=%u rto=%u",
__entry->call,
__print_symbolic(__entry->why, rxrpc_rtt_rx_traces),
__entry->send_serial,
__entry->resp_serial,
__entry->rtt,
__entry->nr,
__entry->avg)
__entry->rto)
);

TRACE_EVENT(rxrpc_timer,
Expand Down
1 change: 1 addition & 0 deletions net/rxrpc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rxrpc-y := \
peer_event.o \
peer_object.o \
recvmsg.o \
rtt.o \
security.o \
sendmsg.o \
skbuff.o \
Expand Down
25 changes: 17 additions & 8 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <linux/atomic.h>
#include <linux/seqlock.h>
#include <linux/win_minmax.h>
#include <net/net_namespace.h>
#include <net/netns/generic.h>
#include <net/sock.h>
Expand Down Expand Up @@ -311,11 +312,14 @@ struct rxrpc_peer {
#define RXRPC_RTT_CACHE_SIZE 32
spinlock_t rtt_input_lock; /* RTT lock for input routine */
ktime_t rtt_last_req; /* Time of last RTT request */
u64 rtt; /* Current RTT estimate (in nS) */
u64 rtt_sum; /* Sum of cache contents */
u64 rtt_cache[RXRPC_RTT_CACHE_SIZE]; /* Determined RTT cache */
u8 rtt_cursor; /* next entry at which to insert */
u8 rtt_usage; /* amount of cache actually used */
unsigned int rtt_count; /* Number of samples we've got */

u32 srtt_us; /* smoothed round trip time << 3 in usecs */
u32 mdev_us; /* medium deviation */
u32 mdev_max_us; /* maximal mdev for the last rtt period */
u32 rttvar_us; /* smoothed mdev_max */
u32 rto_j; /* Retransmission timeout in jiffies */
u8 backoff; /* Backoff timeout */

u8 cong_cwnd; /* Congestion window size */
};
Expand Down Expand Up @@ -1041,7 +1045,6 @@ extern unsigned long rxrpc_idle_ack_delay;
extern unsigned int rxrpc_rx_window_size;
extern unsigned int rxrpc_rx_mtu;
extern unsigned int rxrpc_rx_jumbo_max;
extern unsigned long rxrpc_resend_timeout;

extern const s8 rxrpc_ack_priority[];

Expand Down Expand Up @@ -1069,8 +1072,6 @@ void rxrpc_send_keepalive(struct rxrpc_peer *);
* peer_event.c
*/
void rxrpc_error_report(struct sock *);
void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
void rxrpc_peer_keepalive_worker(struct work_struct *);

/*
Expand Down Expand Up @@ -1102,6 +1103,14 @@ extern const struct seq_operations rxrpc_peer_seq_ops;
void rxrpc_notify_socket(struct rxrpc_call *);
int rxrpc_recvmsg(struct socket *, struct msghdr *, size_t, int);

/*
* rtt.c
*/
void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
unsigned long rxrpc_get_rto_backoff(struct rxrpc_peer *, bool);
void rxrpc_peer_init_rtt(struct rxrpc_peer *);

/*
* rxkad.c
*/
Expand Down
2 changes: 1 addition & 1 deletion net/rxrpc/call_accept.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb)
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
ktime_t now = skb->tstamp;

if (call->peer->rtt_usage < 3 ||
if (call->peer->rtt_count < 3 ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now))
rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
true, true,
Expand Down
22 changes: 8 additions & 14 deletions net/rxrpc/call_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
} else {
unsigned long now = jiffies, ack_at;

if (call->peer->rtt_usage > 0)
ack_at = nsecs_to_jiffies(call->peer->rtt);
if (call->peer->srtt_us != 0)
ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3);
else
ack_at = expiry;

Expand Down Expand Up @@ -157,24 +157,18 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
{
struct sk_buff *skb;
unsigned long resend_at;
unsigned long resend_at, rto_j;
rxrpc_seq_t cursor, seq, top;
ktime_t now, max_age, oldest, ack_ts, timeout, min_timeo;
ktime_t now, max_age, oldest, ack_ts;
int ix;
u8 annotation, anno_type, retrans = 0, unacked = 0;

_enter("{%d,%d}", call->tx_hard_ack, call->tx_top);

if (call->peer->rtt_usage > 1)
timeout = ns_to_ktime(call->peer->rtt * 3 / 2);
else
timeout = ms_to_ktime(rxrpc_resend_timeout);
min_timeo = ns_to_ktime((1000000000 / HZ) * 4);
if (ktime_before(timeout, min_timeo))
timeout = min_timeo;
rto_j = call->peer->rto_j;

now = ktime_get_real();
max_age = ktime_sub(now, timeout);
max_age = ktime_sub(now, jiffies_to_usecs(rto_j));

spin_lock_bh(&call->lock);

Expand Down Expand Up @@ -219,7 +213,7 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
}

resend_at = nsecs_to_jiffies(ktime_to_ns(ktime_sub(now, oldest)));
resend_at += jiffies + rxrpc_resend_timeout;
resend_at += jiffies + rto_j;
WRITE_ONCE(call->resend_at, resend_at);

if (unacked)
Expand All @@ -234,7 +228,7 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
rxrpc_timer_set_for_resend);
spin_unlock_bh(&call->lock);
ack_ts = ktime_sub(now, call->acks_latest_ts);
if (ktime_to_ns(ack_ts) < call->peer->rtt)
if (ktime_to_us(ack_ts) < (call->peer->srtt_us >> 3))
goto out;
rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
rxrpc_propose_ack_ping_for_lost_ack);
Expand Down
6 changes: 3 additions & 3 deletions net/rxrpc/input.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ static void rxrpc_congestion_management(struct rxrpc_call *call,
/* We analyse the number of packets that get ACK'd per RTT
* period and increase the window if we managed to fill it.
*/
if (call->peer->rtt_usage == 0)
if (call->peer->rtt_count == 0)
goto out;
if (ktime_before(skb->tstamp,
ktime_add_ns(call->cong_tstamp,
call->peer->rtt)))
ktime_add_us(call->cong_tstamp,
call->peer->srtt_us >> 3)))
goto out_no_clear_ca;
change = rxrpc_cong_rtt_window_end;
call->cong_tstamp = skb->tstamp;
Expand Down
5 changes: 0 additions & 5 deletions net/rxrpc/misc.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ unsigned int rxrpc_rx_mtu = 5692;
*/
unsigned int rxrpc_rx_jumbo_max = 4;

/*
* Time till packet resend (in milliseconds).
*/
unsigned long rxrpc_resend_timeout = 4 * HZ;

const s8 rxrpc_ack_priority[] = {
[0] = 0,
[RXRPC_ACK_DELAY] = 1,
Expand Down
9 changes: 3 additions & 6 deletions net/rxrpc/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
(test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events) ||
retrans ||
call->cong_mode == RXRPC_CALL_SLOW_START ||
(call->peer->rtt_usage < 3 && sp->hdr.seq & 1) ||
(call->peer->rtt_count < 3 && sp->hdr.seq & 1) ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
ktime_get_real())))
whdr.flags |= RXRPC_REQUEST_ACK;
Expand Down Expand Up @@ -423,13 +423,10 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
if (whdr.flags & RXRPC_REQUEST_ACK) {
call->peer->rtt_last_req = skb->tstamp;
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, serial);
if (call->peer->rtt_usage > 1) {
if (call->peer->rtt_count > 1) {
unsigned long nowj = jiffies, ack_lost_at;

ack_lost_at = nsecs_to_jiffies(2 * call->peer->rtt);
if (ack_lost_at < 1)
ack_lost_at = 1;

ack_lost_at = rxrpc_get_rto_backoff(call->peer, retrans);
ack_lost_at += nowj;
WRITE_ONCE(call->ack_lost_at, ack_lost_at);
rxrpc_reduce_call_timer(call, ack_lost_at, nowj,
Expand Down
46 changes: 0 additions & 46 deletions net/rxrpc/peer_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,52 +295,6 @@ static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error,
}
}

/*
* Add RTT information to cache. This is called in softirq mode and has
* exclusive access to the peer RTT data.
*/
void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
rxrpc_serial_t send_serial, rxrpc_serial_t resp_serial,
ktime_t send_time, ktime_t resp_time)
{
struct rxrpc_peer *peer = call->peer;
s64 rtt;
u64 sum = peer->rtt_sum, avg;
u8 cursor = peer->rtt_cursor, usage = peer->rtt_usage;

rtt = ktime_to_ns(ktime_sub(resp_time, send_time));
if (rtt < 0)
return;

spin_lock(&peer->rtt_input_lock);

/* Replace the oldest datum in the RTT buffer */
sum -= peer->rtt_cache[cursor];
sum += rtt;
peer->rtt_cache[cursor] = rtt;
peer->rtt_cursor = (cursor + 1) & (RXRPC_RTT_CACHE_SIZE - 1);
peer->rtt_sum = sum;
if (usage < RXRPC_RTT_CACHE_SIZE) {
usage++;
peer->rtt_usage = usage;
}

spin_unlock(&peer->rtt_input_lock);

/* Now recalculate the average */
if (usage == RXRPC_RTT_CACHE_SIZE) {
avg = sum / RXRPC_RTT_CACHE_SIZE;
} else {
avg = sum;
do_div(avg, usage);
}

/* Don't need to update this under lock */
peer->rtt = avg;
trace_rxrpc_rtt_rx(call, why, send_serial, resp_serial, rtt,
usage, avg);
}

/*
* Perform keep-alive pings.
*/
Expand Down
Loading

0 comments on commit c410bf0

Please sign in to comment.