RDMA/rxe: Add memory barriers to kernel queues
[ Upstream commit ae6e843 ]

Earlier patches added memory barriers to protect user space to kernel
space communications. The user space queues were previously shown to have
occasional memory synchronization errors, which were eliminated by adding
smp_load_acquire() and smp_store_release() barriers. This patch extends
that protection to the case where queues are shared between kernel space
threads.
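
For context, here is a minimal single-producer/single-consumer ring sketch in the spirit of the barriers described above. It is illustrative only, not the rxe driver's queue code: the names are invented, and it assumes a power-of-two ring that leaves one slot empty to distinguish full from empty.

/*
 * Illustrative SPSC ring, not the rxe code. Indices stay in
 * [0, size); size must be a power of two (index_mask == size - 1).
 */
#include <linux/types.h>
#include <asm/barrier.h>

struct demo_ring {
	u32 prod;	/* written only by the producer */
	u32 cons;	/* written only by the consumer */
	u32 index_mask;	/* size - 1 */
	u64 elem[];
};

static bool demo_push(struct demo_ring *r, u64 val)
{
	u32 prod = r->prod;			/* our own index, no barrier */
	u32 cons = smp_load_acquire(&r->cons);	/* pairs with release in demo_pop() */

	if (((prod + 1) & r->index_mask) == cons)
		return false;			/* full */
	r->elem[prod] = val;
	/* order the element store before publishing the new index */
	smp_store_release(&r->prod, (prod + 1) & r->index_mask);
	return true;
}

static bool demo_pop(struct demo_ring *r, u64 *val)
{
	u32 cons = r->cons;			/* our own index, no barrier */
	u32 prod = smp_load_acquire(&r->prod);	/* pairs with release in demo_push() */

	if (prod == cons)
		return false;			/* empty */
	*val = r->elem[cons];
	/* order the element load before releasing the slot */
	smp_store_release(&r->cons, (cons + 1) & r->index_mask);
	return true;
}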

This patch also extends the queue types to include kernel ULP queues,
which access the other end of the queues in kernel verbs calls such as
poll_cq and post_send/recv.
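
To illustrate the consolidated queue types: an end of a queue is now named for which party (client vs. driver) uses it, regardless of whether the client is a user space process or a kernel ULP. The sketch below shows the pattern with a simplified layout; only QUEUE_TYPE_FROM_CLIENT and QUEUE_TYPE_TO_CLIENT appear in the hunks below, and the struct members and helper body are simplified assumptions, not the driver's exact code.

/*
 * Sketch of a type-aware index read; simplified layout, assumed body.
 */
#include <linux/types.h>
#include <asm/barrier.h>

enum queue_type {
	QUEUE_TYPE_TO_CLIENT,	/* driver produces, client consumes */
	QUEUE_TYPE_FROM_CLIENT,	/* client produces, driver consumes */
};

struct rxe_queue_buf {
	u32 producer_index;	/* shared with the other side */
	u32 consumer_index;
};

struct rxe_queue {
	struct rxe_queue_buf *buf;
	u32 index;		/* driver-private copy of the index it owns */
};

static u32 queue_get_producer(const struct rxe_queue *q, enum queue_type type)
{
	if (type == QUEUE_TYPE_FROM_CLIENT)
		/* client owns producer_index; acquire pairs with its release */
		return smp_load_acquire(&q->buf->producer_index);

	/* QUEUE_TYPE_TO_CLIENT: the driver produces; its private copy is current */
	return q->index;
}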

Link: https://lore.kernel.org/r/20210914164206.19768-2-rpearsonhpe@gmail.com
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
Signed-off-by: Jason Gunthorpe <jgg@nvidia.com>
Signed-off-by: Sasha Levin <sashal@kernel.org>
Bob Pearson authored and gregkh committed Aug 17, 2022
1 parent 931990c commit 66c735c
Showing 9 changed files with 189 additions and 314 deletions.
drivers/infiniband/sw/rxe/rxe_comp.c (3 additions, 9 deletions)
@@ -142,10 +142,7 @@ static inline enum comp_state get_wqe(struct rxe_qp *qp,
 	/* we come here whether or not we found a response packet to see if
 	 * there are any posted WQEs
 	 */
-	if (qp->is_user)
-		wqe = queue_head(qp->sq.queue, QUEUE_TYPE_FROM_USER);
-	else
-		wqe = queue_head(qp->sq.queue, QUEUE_TYPE_KERNEL);
+	wqe = queue_head(qp->sq.queue, QUEUE_TYPE_FROM_CLIENT);
 	*wqe_p = wqe;
 
 	/* no WQE or requester has not started it yet */
@@ -432,10 +429,7 @@ static void do_complete(struct rxe_qp *qp, struct rxe_send_wqe *wqe)
 	if (post)
 		make_send_cqe(qp, wqe, &cqe);
 
-	if (qp->is_user)
-		advance_consumer(qp->sq.queue, QUEUE_TYPE_FROM_USER);
-	else
-		advance_consumer(qp->sq.queue, QUEUE_TYPE_KERNEL);
+	queue_advance_consumer(qp->sq.queue, QUEUE_TYPE_FROM_CLIENT);
 
 	if (post)
 		rxe_cq_post(qp->scq, &cqe, 0);
@@ -539,7 +533,7 @@ static void rxe_drain_resp_pkts(struct rxe_qp *qp, bool notify)
 			wqe->status = IB_WC_WR_FLUSH_ERR;
 			do_complete(qp, wqe);
 		} else {
-			advance_consumer(q, q->type);
+			queue_advance_consumer(q, q->type);
 		}
 	}
 }
drivers/infiniband/sw/rxe/rxe_cq.c (5 additions, 20 deletions)
@@ -25,11 +25,7 @@ int rxe_cq_chk_attr(struct rxe_dev *rxe, struct rxe_cq *cq,
 	}
 
 	if (cq) {
-		if (cq->is_user)
-			count = queue_count(cq->queue, QUEUE_TYPE_TO_USER);
-		else
-			count = queue_count(cq->queue, QUEUE_TYPE_KERNEL);
-
+		count = queue_count(cq->queue, QUEUE_TYPE_TO_CLIENT);
 		if (cqe < count) {
 			pr_warn("cqe(%d) < current # elements in queue (%d)",
 				cqe, count);
@@ -65,7 +61,7 @@ int rxe_cq_from_init(struct rxe_dev *rxe, struct rxe_cq *cq, int cqe,
 	int err;
 	enum queue_type type;
 
-	type = uresp ? QUEUE_TYPE_TO_USER : QUEUE_TYPE_KERNEL;
+	type = QUEUE_TYPE_TO_CLIENT;
 	cq->queue = rxe_queue_init(rxe, &cqe,
 			sizeof(struct rxe_cqe), type);
 	if (!cq->queue) {
@@ -117,11 +113,7 @@ int rxe_cq_post(struct rxe_cq *cq, struct rxe_cqe *cqe, int solicited)

 	spin_lock_irqsave(&cq->cq_lock, flags);
 
-	if (cq->is_user)
-		full = queue_full(cq->queue, QUEUE_TYPE_TO_USER);
-	else
-		full = queue_full(cq->queue, QUEUE_TYPE_KERNEL);
-
+	full = queue_full(cq->queue, QUEUE_TYPE_TO_CLIENT);
 	if (unlikely(full)) {
 		spin_unlock_irqrestore(&cq->cq_lock, flags);
 		if (cq->ibcq.event_handler) {
@@ -134,17 +126,10 @@ int rxe_cq_post(struct rxe_cq *cq, struct rxe_cqe *cqe, int solicited)
 		return -EBUSY;
 	}
 
-	if (cq->is_user)
-		addr = producer_addr(cq->queue, QUEUE_TYPE_TO_USER);
-	else
-		addr = producer_addr(cq->queue, QUEUE_TYPE_KERNEL);
-
+	addr = queue_producer_addr(cq->queue, QUEUE_TYPE_TO_CLIENT);
 	memcpy(addr, cqe, sizeof(*cqe));
 
-	if (cq->is_user)
-		advance_producer(cq->queue, QUEUE_TYPE_TO_USER);
-	else
-		advance_producer(cq->queue, QUEUE_TYPE_KERNEL);
+	queue_advance_producer(cq->queue, QUEUE_TYPE_TO_CLIENT);
 
 	spin_unlock_irqrestore(&cq->cq_lock, flags);
 
drivers/infiniband/sw/rxe/rxe_qp.c (4 additions, 8 deletions)
@@ -231,7 +231,7 @@ static int rxe_qp_init_req(struct rxe_dev *rxe, struct rxe_qp *qp,
 	qp->sq.max_inline = init->cap.max_inline_data = wqe_size;
 	wqe_size += sizeof(struct rxe_send_wqe);
 
-	type = uresp ? QUEUE_TYPE_FROM_USER : QUEUE_TYPE_KERNEL;
+	type = QUEUE_TYPE_FROM_CLIENT;
 	qp->sq.queue = rxe_queue_init(rxe, &qp->sq.max_wr,
 			wqe_size, type);
 	if (!qp->sq.queue)
@@ -248,12 +248,8 @@ static int rxe_qp_init_req(struct rxe_dev *rxe, struct rxe_qp *qp,
 		return err;
 	}
 
-	if (qp->is_user)
-		qp->req.wqe_index = producer_index(qp->sq.queue,
-						QUEUE_TYPE_FROM_USER);
-	else
-		qp->req.wqe_index = producer_index(qp->sq.queue,
-						QUEUE_TYPE_KERNEL);
+	qp->req.wqe_index = queue_get_producer(qp->sq.queue,
+					QUEUE_TYPE_FROM_CLIENT);
 
 	qp->req.state = QP_STATE_RESET;
 	qp->req.opcode = -1;
@@ -293,7 +289,7 @@ static int rxe_qp_init_resp(struct rxe_dev *rxe, struct rxe_qp *qp,
 	pr_debug("qp#%d max_wr = %d, max_sge = %d, wqe_size = %d\n",
 		 qp_num(qp), qp->rq.max_wr, qp->rq.max_sge, wqe_size);
 
-	type = uresp ? QUEUE_TYPE_FROM_USER : QUEUE_TYPE_KERNEL;
+	type = QUEUE_TYPE_FROM_CLIENT;
 	qp->rq.queue = rxe_queue_init(rxe, &qp->rq.max_wr,
 			wqe_size, type);
 	if (!qp->rq.queue)
drivers/infiniband/sw/rxe/rxe_queue.c (23 additions, 7 deletions)
@@ -111,17 +111,33 @@ struct rxe_queue *rxe_queue_init(struct rxe_dev *rxe, int *num_elem,
 static int resize_finish(struct rxe_queue *q, struct rxe_queue *new_q,
 			 unsigned int num_elem)
 {
-	if (!queue_empty(q, q->type) && (num_elem < queue_count(q, q->type)))
+	enum queue_type type = q->type;
+	u32 prod;
+	u32 cons;
+
+	if (!queue_empty(q, q->type) && (num_elem < queue_count(q, type)))
 		return -EINVAL;
 
-	while (!queue_empty(q, q->type)) {
-		memcpy(producer_addr(new_q, new_q->type),
-		       consumer_addr(q, q->type),
-		       new_q->elem_size);
-		advance_producer(new_q, new_q->type);
-		advance_consumer(q, q->type);
+	prod = queue_get_producer(new_q, type);
+	cons = queue_get_consumer(q, type);
+
+	while (!queue_empty(q, type)) {
+		memcpy(queue_addr_from_index(new_q, prod),
+		       queue_addr_from_index(q, cons), new_q->elem_size);
+		prod = queue_next_index(new_q, prod);
+		cons = queue_next_index(q, cons);
 	}
 
+	new_q->buf->producer_index = prod;
+	q->buf->consumer_index = cons;
+
+	/* update private index copies */
+	if (type == QUEUE_TYPE_TO_CLIENT)
+		new_q->index = new_q->buf->producer_index;
+	else
+		q->index = q->buf->consumer_index;
+
+	/* exchange rxe_queue headers */
 	swap(*q, *new_q);
 
 	return 0;
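
For reference, the wrap-around helper the new copy loop relies on plausibly looks like the sketch below; the index_mask member and its placement are assumptions based on the usual power-of-two ring convention, not taken from this diff.

/* Assumed shape of queue_next_index(): advance an index and wrap. */
static inline u32 queue_next_index(struct rxe_queue *q, int index)
{
	return (index + 1) & q->index_mask;	/* index_mask = size - 1 (assumed) */
}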
