Skip to content

Commit

Permalink
SUNRPC: More fixes for backlog congestion
Browse files Browse the repository at this point in the history
[ Upstream commit e86be3a ]

Ensure that we fix the XPRT_CONGESTED starvation issue for RDMA as well
as socket based transports.
Ensure we always initialise the request after waking up from the backlog
list.

Fixes: e877a88 ("SUNRPC in case of backlog, hand free slots directly to waiting task")
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Signed-off-by: Sasha Levin <sashal@kernel.org>
  • Loading branch information
Trond Myklebust authored and gregkh committed Jun 3, 2021
1 parent 06397b6 commit 3b028c0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 39 deletions.
2 changes: 2 additions & 0 deletions include/linux/sunrpc/xprt.h
Expand Up @@ -367,6 +367,8 @@ struct rpc_xprt * xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc,
unsigned int max_req);
void xprt_free(struct rpc_xprt *);
void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);

static inline int
xprt_enable_swap(struct rpc_xprt *xprt)
Expand Down
58 changes: 28 additions & 30 deletions net/sunrpc/xprt.c
Expand Up @@ -1603,33 +1603,40 @@ xprt_transmit(struct rpc_task *task)
spin_unlock(&xprt->queue_lock);
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
static void xprt_complete_request_init(struct rpc_task *task)
{
if (task->tk_rqstp)
xprt_request_init(task);
}

void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
}
EXPORT_SYMBOL_GPL(xprt_add_backlog);

static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
struct rpc_rqst *req = data;

if (task->tk_rqstp == NULL) {
memset(req, 0, sizeof(*req)); /* mark unused */
task->tk_status = -EAGAIN;
task->tk_rqstp = req;
return true;
}
return false;
}

static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
clear_bit(XPRT_CONGESTED, &xprt->state);
return false;
}
return true;
}
EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
Expand All @@ -1639,7 +1646,7 @@ static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task
goto out;
spin_lock(&xprt->reserve_lock);
if (test_bit(XPRT_CONGESTED, &xprt->state)) {
rpc_sleep_on(&xprt->backlog, task, NULL);
xprt_add_backlog(xprt, task);
ret = true;
}
spin_unlock(&xprt->reserve_lock);
Expand Down Expand Up @@ -1808,10 +1815,6 @@ xprt_request_init(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp;

if (req->rq_task)
/* Already initialized */
return;

req->rq_task = task;
req->rq_xprt = xprt;
req->rq_buffer = NULL;
Expand Down Expand Up @@ -1872,10 +1875,8 @@ void xprt_retry_reserve(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt;

task->tk_status = 0;
if (task->tk_rqstp != NULL) {
xprt_request_init(task);
if (task->tk_rqstp != NULL)
return;
}

task->tk_status = -EAGAIN;
xprt_do_reserve(xprt, task);
Expand All @@ -1900,24 +1901,21 @@ void xprt_release(struct rpc_task *task)
}

xprt = req->rq_xprt;
if (xprt) {
xprt_request_dequeue_xprt(task);
spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request)
xprt->ops->release_request(task);
xprt_schedule_autodisconnect(xprt);
spin_unlock(&xprt->transport_lock);
if (req->rq_buffer)
xprt->ops->buf_free(task);
xdr_free_bvec(&req->rq_rcv_buf);
xdr_free_bvec(&req->rq_snd_buf);
if (req->rq_cred != NULL)
put_rpccred(req->rq_cred);
if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req);
} else
xprt = task->tk_xprt;
xprt_request_dequeue_xprt(task);
spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request)
xprt->ops->release_request(task);
xprt_schedule_autodisconnect(xprt);
spin_unlock(&xprt->transport_lock);
if (req->rq_buffer)
xprt->ops->buf_free(task);
xdr_free_bvec(&req->rq_rcv_buf);
xdr_free_bvec(&req->rq_snd_buf);
if (req->rq_cred != NULL)
put_rpccred(req->rq_cred);
if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req);

task->tk_rqstp = NULL;
if (likely(!bc_prealloc(req)))
Expand Down
12 changes: 6 additions & 6 deletions net/sunrpc/xprtrdma/transport.c
Expand Up @@ -520,9 +520,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
return;

out_sleep:
set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN;
xprt_add_backlog(xprt, task);
}

/**
Expand All @@ -537,10 +536,11 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
struct rpcrdma_xprt *r_xprt =
container_of(xprt, struct rpcrdma_xprt, rx_xprt);

memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
clear_bit(XPRT_CONGESTED, &xprt->state);
rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
if (!xprt_wake_up_backlog(xprt, rqst)) {
memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
}
}

static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
Expand Down
18 changes: 15 additions & 3 deletions net/sunrpc/xprtrdma/verbs.c
Expand Up @@ -1184,6 +1184,20 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
return mr;
}

/**
* rpcrdma_reply_put - Put reply buffers back into pool
* @buffers: buffer pool
* @req: object to return
*
*/
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
if (req->rl_reply) {
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
}
}

/**
* rpcrdma_buffer_get - Get a request buffer
* @buffers: Buffer pool from which to obtain a buffer
Expand Down Expand Up @@ -1212,9 +1226,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
*/
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
if (req->rl_reply)
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
rpcrdma_reply_put(buffers, req);

spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
Expand Down
1 change: 1 addition & 0 deletions net/sunrpc/xprtrdma/xprt_rdma.h
Expand Up @@ -481,6 +481,7 @@ struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
struct rpcrdma_req *req);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req);

bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
gfp_t flags);
Expand Down

0 comments on commit 3b028c0

Please sign in to comment.