Skip to content

Commit

Permalink
nbd/server: introduce NBDClient->lock to protect fields
Browse files Browse the repository at this point in the history
NBDClient has a number of fields that are accessed by both the export
AioContext and the main loop thread. When the AioContext lock is removed
these fields will need another form of protection.

Add NBDClient->lock and protect fields that are accessed by both
threads. Also add assertions where possible and otherwise add doc
comments stating assumptions about which thread and lock holding.

Note this patch moves the client->recv_coroutine assertion from
nbd_co_receive_request() to nbd_trip() where client->lock is held.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Message-ID: <20231221192452.1785567-7-stefanha@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
(cherry picked from commit 7075d23)
Signed-off-by: Michael Tokarev <mjt@tls.msk.ru>
  • Loading branch information
stefanhaRH authored and Michael Tokarev committed Mar 19, 2024
1 parent aee1039 commit 13fc21a
Showing 1 changed file with 111 additions and 33 deletions.
144 changes: 111 additions & 33 deletions nbd/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,25 @@ struct NBDClient {
int refcount; /* atomic */
void (*close_fn)(NBDClient *client, bool negotiated);

QemuMutex lock;

NBDExport *exp;
QCryptoTLSCreds *tlscreds;
char *tlsauthz;
QIOChannelSocket *sioc; /* The underlying data channel */
QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */

Coroutine *recv_coroutine;
Coroutine *recv_coroutine; /* protected by lock */

CoMutex send_lock;
Coroutine *send_coroutine;

bool read_yielding;
bool quiescing;
bool read_yielding; /* protected by lock */
bool quiescing; /* protected by lock */

QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
int nb_requests; /* protected by lock */
bool closing; /* protected by lock */

uint32_t check_align; /* If non-zero, check for aligned client requests */

Expand Down Expand Up @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)

len = qio_channel_readv(client->ioc, &iov, 1, errp);
if (len == QIO_CHANNEL_ERR_BLOCK) {
client->read_yielding = true;
WITH_QEMU_LOCK_GUARD(&client->lock) {
client->read_yielding = true;

/* Prompt main loop thread to re-run nbd_drained_poll() */
aio_wait_kick();
}
qio_channel_yield(client->ioc, G_IO_IN);
client->read_yielding = false;
if (client->quiescing) {
return -EAGAIN;
WITH_QEMU_LOCK_GUARD(&client->lock) {
client->read_yielding = false;
if (client->quiescing) {
return -EAGAIN;
}
}
continue;
} else if (len < 0) {
Expand Down Expand Up @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client)
blk_exp_unref(&client->exp->common);
}
g_free(client->contexts.bitmaps);
qemu_mutex_destroy(&client->lock);
g_free(client);
}
}
Expand Down Expand Up @@ -1561,11 +1571,13 @@ static void client_close(NBDClient *client, bool negotiated)
{
assert(qemu_in_main_thread());

if (client->closing) {
return;
}
WITH_QEMU_LOCK_GUARD(&client->lock) {
if (client->closing) {
return;
}

client->closing = true;
client->closing = true;
}

/* Force requests to finish. They will drop their own references,
* then we'll close the socket and free the NBDClient.
Expand All @@ -1579,6 +1591,7 @@ static void client_close(NBDClient *client, bool negotiated)
}
}

/* Runs in export AioContext with client->lock held */
static NBDRequestData *nbd_request_get(NBDClient *client)
{
NBDRequestData *req;
Expand All @@ -1592,6 +1605,7 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
return req;
}

/* Runs in export AioContext with client->lock held */
static void nbd_request_put(NBDRequestData *req)
{
NBDClient *client = req->client;
Expand All @@ -1617,21 +1631,27 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
NBDExport *exp = opaque;
NBDClient *client;

assert(qemu_in_main_thread());

trace_nbd_blk_aio_attached(exp->name, ctx);

exp->common.ctx = ctx;

QTAILQ_FOREACH(client, &exp->clients, next) {
assert(client->nb_requests == 0);
assert(client->recv_coroutine == NULL);
assert(client->send_coroutine == NULL);
WITH_QEMU_LOCK_GUARD(&client->lock) {
assert(client->nb_requests == 0);
assert(client->recv_coroutine == NULL);
assert(client->send_coroutine == NULL);
}
}
}

static void blk_aio_detach(void *opaque)
{
NBDExport *exp = opaque;

assert(qemu_in_main_thread());

trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);

exp->common.ctx = NULL;
Expand All @@ -1642,8 +1662,12 @@ static void nbd_drained_begin(void *opaque)
NBDExport *exp = opaque;
NBDClient *client;

assert(qemu_in_main_thread());

QTAILQ_FOREACH(client, &exp->clients, next) {
client->quiescing = true;
WITH_QEMU_LOCK_GUARD(&client->lock) {
client->quiescing = true;
}
}
}

Expand All @@ -1652,28 +1676,48 @@ static void nbd_drained_end(void *opaque)
NBDExport *exp = opaque;
NBDClient *client;

assert(qemu_in_main_thread());

QTAILQ_FOREACH(client, &exp->clients, next) {
client->quiescing = false;
nbd_client_receive_next_request(client);
WITH_QEMU_LOCK_GUARD(&client->lock) {
client->quiescing = false;
nbd_client_receive_next_request(client);
}
}
}

/* Runs in export AioContext */
static void nbd_wake_read_bh(void *opaque)
{
NBDClient *client = opaque;
qio_channel_wake_read(client->ioc);
}

static bool nbd_drained_poll(void *opaque)
{
NBDExport *exp = opaque;
NBDClient *client;

assert(qemu_in_main_thread());

QTAILQ_FOREACH(client, &exp->clients, next) {
if (client->nb_requests != 0) {
/*
* If there's a coroutine waiting for a request on nbd_read_eof()
* enter it here so we don't depend on the client to wake it up.
*/
if (client->recv_coroutine != NULL && client->read_yielding) {
qio_channel_wake_read(client->ioc);
}
WITH_QEMU_LOCK_GUARD(&client->lock) {
if (client->nb_requests != 0) {
/*
* If there's a coroutine waiting for a request on nbd_read_eof()
* enter it here so we don't depend on the client to wake it up.
*
* Schedule a BH in the export AioContext to avoid missing the
* wake up due to the race between qio_channel_wake_read() and
* qio_channel_yield().
*/
if (client->recv_coroutine != NULL && client->read_yielding) {
aio_bh_schedule_oneshot(nbd_export_aio_context(client->exp),
nbd_wake_read_bh, client);
}

return true;
return true;
}
}
}

Expand All @@ -1684,6 +1728,8 @@ static void nbd_eject_notifier(Notifier *n, void *data)
{
NBDExport *exp = container_of(n, NBDExport, eject_notifier);

assert(qemu_in_main_thread());

blk_exp_request_shutdown(&exp->common);
}

Expand Down Expand Up @@ -2569,7 +2615,6 @@ static int coroutine_fn nbd_co_receive_request(NBDRequestData *req,
int ret;

g_assert(qemu_in_coroutine());
assert(client->recv_coroutine == qemu_coroutine_self());
ret = nbd_receive_request(client, request, errp);
if (ret < 0) {
return ret;
Expand Down Expand Up @@ -2978,6 +3023,9 @@ static coroutine_fn void nbd_trip(void *opaque)
*/

trace_nbd_trip();

qemu_mutex_lock(&client->lock);

if (client->closing) {
goto done;
}
Expand All @@ -2993,7 +3041,21 @@ static coroutine_fn void nbd_trip(void *opaque)
}

req = nbd_request_get(client);
ret = nbd_co_receive_request(req, &request, &local_err);

/*
* nbd_co_receive_request() returns -EAGAIN when nbd_drained_begin() has
* set client->quiescing but by the time we get back nbd_drained_end() may
* have already cleared client->quiescing. In that case we try again
* because nothing else will spawn an nbd_trip() coroutine until we set
* client->recv_coroutine = NULL further down.
*/
do {
assert(client->recv_coroutine == qemu_coroutine_self());
qemu_mutex_unlock(&client->lock);
ret = nbd_co_receive_request(req, &request, &local_err);
qemu_mutex_lock(&client->lock);
} while (ret == -EAGAIN && !client->quiescing);

client->recv_coroutine = NULL;

if (client->closing) {
Expand All @@ -3005,15 +3067,16 @@ static coroutine_fn void nbd_trip(void *opaque)
}

if (ret == -EAGAIN) {
assert(client->quiescing);
goto done;
}

nbd_client_receive_next_request(client);

if (ret == -EIO) {
goto disconnect;
}

qemu_mutex_unlock(&client->lock);
qio_channel_set_cork(client->ioc, true);

if (ret < 0) {
Expand All @@ -3033,6 +3096,10 @@ static coroutine_fn void nbd_trip(void *opaque)
g_free(request.contexts->bitmaps);
g_free(request.contexts);
}

qio_channel_set_cork(client->ioc, false);
qemu_mutex_lock(&client->lock);

if (ret < 0) {
error_prepend(&local_err, "Failed to send reply: ");
goto disconnect;
Expand All @@ -3047,11 +3114,13 @@ static coroutine_fn void nbd_trip(void *opaque)
goto disconnect;
}

qio_channel_set_cork(client->ioc, false);
done:
if (req) {
nbd_request_put(req);
}

qemu_mutex_unlock(&client->lock);

if (!nbd_client_put_nonzero(client)) {
aio_co_reschedule_self(qemu_get_aio_context());
nbd_client_put(client);
Expand All @@ -3062,13 +3131,19 @@ static coroutine_fn void nbd_trip(void *opaque)
if (local_err) {
error_reportf_err(local_err, "Disconnect client, due to: ");
}

nbd_request_put(req);
qemu_mutex_unlock(&client->lock);

aio_co_reschedule_self(qemu_get_aio_context());
client_close(client, true);
nbd_client_put(client);
}

/*
* Runs in export AioContext and main loop thread. Caller must hold
* client->lock.
*/
static void nbd_client_receive_next_request(NBDClient *client)
{
if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
Expand All @@ -3094,7 +3169,9 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
return;
}

nbd_client_receive_next_request(client);
WITH_QEMU_LOCK_GUARD(&client->lock) {
nbd_client_receive_next_request(client);
}
}

/*
Expand All @@ -3111,6 +3188,7 @@ void nbd_client_new(QIOChannelSocket *sioc,
Coroutine *co;

client = g_new0(NBDClient, 1);
qemu_mutex_init(&client->lock);
client->refcount = 1;
client->tlscreds = tlscreds;
if (tlscreds) {
Expand Down

0 comments on commit 13fc21a

Please sign in to comment.