Skip to content

Commit

Permalink
soreuseport: Fix socket selection for SO_INCOMING_CPU.
Browse files Browse the repository at this point in the history
[ Upstream commit b261eda ]

Kazuho Oku reported that setsockopt(SO_INCOMING_CPU) does not work
with setsockopt(SO_REUSEPORT) since v4.6.

With the combination of SO_REUSEPORT and SO_INCOMING_CPU, we could
build a highly efficient server application.

setsockopt(SO_INCOMING_CPU) associates a CPU with a TCP listener
or UDP socket, and then incoming packets processed on the CPU will
likely be distributed to the socket.  Technically, a socket could
even receive packets handled on another CPU if no sockets in the
reuseport group have the same CPU receiving the flow.

The logic exists in compute_score() so that a socket will get a higher
score if it has the same CPU with the flow.  However, the score gets
ignored after the blamed two commits, which introduced a faster socket
selection algorithm for SO_REUSEPORT.

This patch introduces a counter of sockets with SO_INCOMING_CPU in
a reuseport group to check if we should iterate all sockets to find
a proper one.  We increment the counter when

  * calling listen() if the socket has SO_INCOMING_CPU and SO_REUSEPORT

  * enabling SO_INCOMING_CPU if the socket is in a reuseport group

Also, we decrement it when

  * detaching a socket out of the group to apply SO_INCOMING_CPU to
    migrated TCP requests

  * disabling SO_INCOMING_CPU if the socket is in a reuseport group

When the counter reaches 0, we can get back to the O(1) selection
algorithm.

The overall changes are negligible for the non-SO_INCOMING_CPU case,
and the only notable thing is that we have to update sk_incomnig_cpu
under reuseport_lock.  Otherwise, the race prevents transitioning to
the O(n) algorithm and results in the wrong socket selection.

 cpu1 (setsockopt)               cpu2 (listen)
+-----------------+             +-------------+

lock_sock(sk1)                  lock_sock(sk2)

reuseport_update_incoming_cpu(sk1, val)
.
|  /* set CPU as 0 */
|- WRITE_ONCE(sk1->incoming_cpu, val)
|
|                               spin_lock_bh(&reuseport_lock)
|                               reuseport_grow(sk2, reuse)
|                               .
|                               |- more_socks_size = reuse->max_socks * 2U;
|                               |- if (more_socks_size > U16_MAX &&
|                               |       reuse->num_closed_socks)
|                               |  .
|                               |  |- RCU_INIT_POINTER(sk1->sk_reuseport_cb, NULL);
|                               |  `- __reuseport_detach_closed_sock(sk1, reuse)
|                               |     .
|                               |     `- reuseport_put_incoming_cpu(sk1, reuse)
|                               |        .
|                               |        |  /* Read shutdown()ed sk1's sk_incoming_cpu
|                               |        |   * without lock_sock().
|                               |        |   */
|                               |        `- if (sk1->sk_incoming_cpu >= 0)
|                               |           .
|                               |           |  /* decrement not-yet-incremented
|                               |           |   * count, which is never incremented.
|                               |           |   */
|                               |           `- __reuseport_put_incoming_cpu(reuse);
|                               |
|                               `- spin_lock_bh(&reuseport_lock)
|
|- spin_lock_bh(&reuseport_lock)
|
|- reuse = rcu_dereference_protected(sk1->sk_reuseport_cb, ...)
|- if (!reuse)
|  .
|  |  /* Cannot increment reuse->incoming_cpu. */
|  `- goto out;
|
`- spin_unlock_bh(&reuseport_lock)

Fixes: e32ea7e ("soreuseport: fast reuseport UDP socket selection")
Fixes: c125e80 ("soreuseport: fast reuseport TCP socket selection")
Reported-by: Kazuho Oku <kazuhooku@gmail.com>
Signed-off-by: Kuniyuki Iwashima <kuniyu@amazon.com>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Signed-off-by: Sasha Levin <sashal@kernel.org>
  • Loading branch information
q2ven authored and gregkh committed Dec 31, 2022
1 parent 094f561 commit 8741792
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 6 deletions.
2 changes: 2 additions & 0 deletions include/net/sock_reuseport.h
Expand Up @@ -16,6 +16,7 @@ struct sock_reuseport {
u16 max_socks; /* length of socks */
u16 num_socks; /* elements in socks */
u16 num_closed_socks; /* closed elements in socks */
u16 incoming_cpu;
/* The last synq overflow event timestamp of this
* reuse->socks[] group.
*/
Expand Down Expand Up @@ -58,5 +59,6 @@ static inline bool reuseport_has_conns(struct sock *sk)
}

void reuseport_has_conns_set(struct sock *sk);
void reuseport_update_incoming_cpu(struct sock *sk, int val);

#endif /* _SOCK_REUSEPORT_H */
2 changes: 1 addition & 1 deletion net/core/sock.c
Expand Up @@ -1302,7 +1302,7 @@ int sock_setsockopt(struct socket *sock, int level, int optname,
break;
}
case SO_INCOMING_CPU:
WRITE_ONCE(sk->sk_incoming_cpu, val);
reuseport_update_incoming_cpu(sk, val);
break;

case SO_CNX_ADVICE:
Expand Down
94 changes: 89 additions & 5 deletions net/core/sock_reuseport.c
Expand Up @@ -37,6 +37,70 @@ void reuseport_has_conns_set(struct sock *sk)
}
EXPORT_SYMBOL(reuseport_has_conns_set);

static void __reuseport_get_incoming_cpu(struct sock_reuseport *reuse)
{
/* Paired with READ_ONCE() in reuseport_select_sock_by_hash(). */
WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu + 1);
}

static void __reuseport_put_incoming_cpu(struct sock_reuseport *reuse)
{
/* Paired with READ_ONCE() in reuseport_select_sock_by_hash(). */
WRITE_ONCE(reuse->incoming_cpu, reuse->incoming_cpu - 1);
}

static void reuseport_get_incoming_cpu(struct sock *sk, struct sock_reuseport *reuse)
{
if (sk->sk_incoming_cpu >= 0)
__reuseport_get_incoming_cpu(reuse);
}

static void reuseport_put_incoming_cpu(struct sock *sk, struct sock_reuseport *reuse)
{
if (sk->sk_incoming_cpu >= 0)
__reuseport_put_incoming_cpu(reuse);
}

void reuseport_update_incoming_cpu(struct sock *sk, int val)
{
struct sock_reuseport *reuse;
int old_sk_incoming_cpu;

if (unlikely(!rcu_access_pointer(sk->sk_reuseport_cb))) {
/* Paired with REAE_ONCE() in sk_incoming_cpu_update()
* and compute_score().
*/
WRITE_ONCE(sk->sk_incoming_cpu, val);
return;
}

spin_lock_bh(&reuseport_lock);

/* This must be done under reuseport_lock to avoid a race with
* reuseport_grow(), which accesses sk->sk_incoming_cpu without
* lock_sock() when detaching a shutdown()ed sk.
*
* Paired with READ_ONCE() in reuseport_select_sock_by_hash().
*/
old_sk_incoming_cpu = sk->sk_incoming_cpu;
WRITE_ONCE(sk->sk_incoming_cpu, val);

reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));

/* reuseport_grow() has detached a closed sk. */
if (!reuse)
goto out;

if (old_sk_incoming_cpu < 0 && val >= 0)
__reuseport_get_incoming_cpu(reuse);
else if (old_sk_incoming_cpu >= 0 && val < 0)
__reuseport_put_incoming_cpu(reuse);

out:
spin_unlock_bh(&reuseport_lock);
}

static int reuseport_sock_index(struct sock *sk,
const struct sock_reuseport *reuse,
bool closed)
Expand Down Expand Up @@ -64,6 +128,7 @@ static void __reuseport_add_sock(struct sock *sk,
/* paired with smp_rmb() in reuseport_(select|migrate)_sock() */
smp_wmb();
reuse->num_socks++;
reuseport_get_incoming_cpu(sk, reuse);
}

static bool __reuseport_detach_sock(struct sock *sk,
Expand All @@ -76,6 +141,7 @@ static bool __reuseport_detach_sock(struct sock *sk,

reuse->socks[i] = reuse->socks[reuse->num_socks - 1];
reuse->num_socks--;
reuseport_put_incoming_cpu(sk, reuse);

return true;
}
Expand All @@ -86,6 +152,7 @@ static void __reuseport_add_closed_sock(struct sock *sk,
reuse->socks[reuse->max_socks - reuse->num_closed_socks - 1] = sk;
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks + 1);
reuseport_get_incoming_cpu(sk, reuse);
}

static bool __reuseport_detach_closed_sock(struct sock *sk,
Expand All @@ -99,6 +166,7 @@ static bool __reuseport_detach_closed_sock(struct sock *sk,
reuse->socks[i] = reuse->socks[reuse->max_socks - reuse->num_closed_socks];
/* paired with READ_ONCE() in inet_csk_bind_conflict() */
WRITE_ONCE(reuse->num_closed_socks, reuse->num_closed_socks - 1);
reuseport_put_incoming_cpu(sk, reuse);

return true;
}
Expand Down Expand Up @@ -166,6 +234,7 @@ int reuseport_alloc(struct sock *sk, bool bind_inany)
reuse->bind_inany = bind_inany;
reuse->socks[0] = sk;
reuse->num_socks = 1;
reuseport_get_incoming_cpu(sk, reuse);
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);

out:
Expand Down Expand Up @@ -209,6 +278,7 @@ static struct sock_reuseport *reuseport_grow(struct sock_reuseport *reuse)
more_reuse->reuseport_id = reuse->reuseport_id;
more_reuse->bind_inany = reuse->bind_inany;
more_reuse->has_conns = reuse->has_conns;
more_reuse->incoming_cpu = reuse->incoming_cpu;

memcpy(more_reuse->socks, reuse->socks,
reuse->num_socks * sizeof(struct sock *));
Expand Down Expand Up @@ -458,18 +528,32 @@ static struct sock *run_bpf_filter(struct sock_reuseport *reuse, u16 socks,
static struct sock *reuseport_select_sock_by_hash(struct sock_reuseport *reuse,
u32 hash, u16 num_socks)
{
struct sock *first_valid_sk = NULL;
int i, j;

i = j = reciprocal_scale(hash, num_socks);
while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
do {
struct sock *sk = reuse->socks[i];

if (sk->sk_state != TCP_ESTABLISHED) {
/* Paired with WRITE_ONCE() in __reuseport_(get|put)_incoming_cpu(). */
if (!READ_ONCE(reuse->incoming_cpu))
return sk;

/* Paired with WRITE_ONCE() in reuseport_update_incoming_cpu(). */
if (READ_ONCE(sk->sk_incoming_cpu) == raw_smp_processor_id())
return sk;

if (!first_valid_sk)
first_valid_sk = sk;
}

i++;
if (i >= num_socks)
i = 0;
if (i == j)
return NULL;
}
} while (i != j);

return reuse->socks[i];
return first_valid_sk;
}

/**
Expand Down

0 comments on commit 8741792

Please sign in to comment.