DOSE-607 ensure agent_resume runs to completion (openzfs#118)
don-brady committed Jan 20, 2022
1 parent fa2940c commit 7b15686
Showing 3 changed files with 168 additions and 72 deletions.
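
What the change does: agent_resume() now runs as a persistent task that loops until agent_resume_reissue() succeeds, coordinated with vdev_agent_thread() through a new vos_resume_state state machine guarded by vos_resume_lock/vos_resume_cv; a reconnect while a resume is already in flight simply signals VOS_RESUME_START again (and fails an in-progress open with ERESTART) instead of dispatching a second task. Reissued writes are additionally tagged with AGENT_REISSUE so the agent's maybe_die_with() fault-injection hook can fire after handling them. The sketch below is a simplified userspace model of that hand-off, assuming pthreads in place of the SPL kmutex_t/kcondvar_t primitives and a try_reissue() stub in place of agent_open_pool()/agent_resume_reissue(); it illustrates the coordination only and is not the module code (the OPENING/OPENED states are omitted for brevity).

/*
 * Simplified userspace model of the resume hand-off introduced here.
 * The state names mirror the diff; everything else is illustrative.
 */
#include <pthread.h>
#include <stdio.h>

typedef enum {
	VOS_RESUME_NOT_RUNNING = 0,
	VOS_RESUME_START,
	VOS_RESUME_REISSUE,
	VOS_RESUME_FAILED
} resume_state_t;

static pthread_mutex_t resume_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t resume_cv = PTHREAD_COND_INITIALIZER;
static resume_state_t resume_state = VOS_RESUME_NOT_RUNNING;

static void
set_state(resume_state_t s)
{
	pthread_mutex_lock(&resume_lock);
	resume_state = s;
	pthread_cond_broadcast(&resume_cv);
	pthread_mutex_unlock(&resume_lock);
}

static void
wait_for(resume_state_t s)
{
	pthread_mutex_lock(&resume_lock);
	while (resume_state != s)
		pthread_cond_wait(&resume_cv, &resume_lock);
	pthread_mutex_unlock(&resume_lock);
}

/* Stub: first pass "interrupted by another restart", second pass succeeds. */
static int
try_reissue(void)
{
	static int attempts;
	return (++attempts < 2 ? -1 : 0);
}

/* Analogue of agent_resume(): runs until the reissue pass completes. */
static void *
resume_task(void *arg)
{
	(void) arg;
	for (;;) {
		wait_for(VOS_RESUME_START);	/* kicked by the agent thread */
		set_state(VOS_RESUME_REISSUE);
		if (try_reissue() == 0)
			break;			/* ran to completion */
		set_state(VOS_RESUME_FAILED);	/* wait for the next START */
	}
	set_state(VOS_RESUME_NOT_RUNNING);
	return (NULL);
}

int
main(void)
{
	pthread_t t;
	pthread_create(&t, NULL, resume_task, NULL);
	set_state(VOS_RESUME_START);	/* agent reconnected */
	wait_for(VOS_RESUME_FAILED);	/* first pass was interrupted ... */
	set_state(VOS_RESUME_START);	/* ... so re-signal the same task */
	pthread_join(t, NULL);
	printf("resume task completed\n");
	return (0);
}

The task prints "resume task completed" only after the second START, mirroring how a second agent restart re-drives the same resume task to completion rather than abandoning it.
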
3 changes: 3 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/root_connection.rs
@@ -335,6 +335,9 @@ impl RootConnectionState {
response.insert("request_id", &request_id).unwrap();
response.insert("token", &token).unwrap();
super_trace!("sending response: {:?}", response);
if nvl.exists("reissued") {
maybe_die_with(|| "after reissued write block request".to_string());
}
Ok(Some(response))
}))
}
1 change: 1 addition & 0 deletions include/sys/vdev_object_store.h
@@ -86,6 +86,7 @@
#define AGENT_MESSAGE "message"
#define AGENT_CHECKPOINT "checkpoint"
#define AGENT_ROLLBACK "rollback"
#define AGENT_REISSUE "reissue"

typedef struct vdev_object_store_stats {
uint64_t voss_blocks_count;
236 changes: 164 additions & 72 deletions module/os/linux/zfs/vdev_object_store.c
@@ -91,6 +91,15 @@ typedef enum {
VOS_SERIAL_TYPES
} vos_serial_types_t;

typedef enum {
VOS_RESUME_NOT_RUNNING = 0,
VOS_RESUME_START = (1 << 0),
VOS_RESUME_OPENING = (1 << 1),
VOS_RESUME_OPENED = (1 << 2),
VOS_RESUME_REISSUE = (1 << 3),
VOS_RESUME_FAILED = (1 << 4)
} agent_resume_state_t;

/*
* Per request private data
*/
@@ -133,6 +142,10 @@ typedef struct vdev_object_store {
const char *vos_feature_enable;
uint64_t vos_result;

kmutex_t vos_resume_lock;
kcondvar_t vos_resume_cv;
agent_resume_state_t vos_resume_state;

uint64_t vos_next_block;
uberblock_t vos_uberblock;
nvlist_t *vos_config;
@@ -879,75 +892,21 @@ agent_resume_state_check(vdev_t *vd)
}

static void
agent_resume(void *arg)
agent_resume_set_state(vdev_object_store_t *vos, agent_resume_state_t state)
{
vdev_t *vd = arg;
vdev_object_store_t *vos = vd->vdev_tsd;
spa_t *spa = vd->vdev_spa;
int ret;
boolean_t destroying = spa_state(spa) == POOL_STATE_DESTROYED;

zfs_dbgmsg("agent_resume running");
ASSERT(MUTEX_NOT_HELD(&vos->vos_resume_lock));

/*
* Wait till the socket is opened.
*/
mutex_enter(&vos->vos_sock_lock);
zfs_object_store_wait(vos, VOS_SOCK_OPEN);

if (spa->spa_load_state == SPA_LOAD_CREATE) {
/*
* Since we're resuming a pool creation, just
* replay the message but don't wait for the completion.
* The original caller of the pool creation will be
* woken up when the response is received.
*/
agent_create_pool(vd, vos);
mutex_exit(&vos->vos_sock_lock);
return;
}
mutex_exit(&vos->vos_sock_lock);

/*
* If there was an open in progress when we resumed, all we need
* to do is restart the open. There can't be any other outstanding
* requests before the initial open completes.
*/
boolean_t in_open = !vos->vos_open_completed;

/*
* If we are destroying the pool or closing it, the open call is
* unnecessary. Bypassing it is a small performance optimization.
*/
if (destroying || !vos->vos_closing) {
if (agent_open_pool(vd, vos,
vdev_object_store_open_mode(spa_mode(vd->vdev_spa)),
B_TRUE) != 0) {
zfs_dbgmsg("agent resume failed, pool open failed");
vdev_set_state(vd, B_FALSE, VDEV_STATE_CANT_OPEN,
VDEV_AUX_OPEN_FAILED);
vos->vos_agent_thread_exit = B_TRUE;
return;
}
if (in_open) {
zfs_dbgmsg("agent resume during initial open");
return;
}
}
if (vos->vos_closing) {
mutex_enter(&vos->vos_sock_lock);
agent_close_pool(vos, destroying);
return;
}
mutex_enter(&vos->vos_resume_lock);
vos->vos_resume_state = state;
mutex_exit(&vos->vos_resume_lock);
}

static int
agent_resume_reissue(vdev_object_store_t *vos, vdev_t *vd)
{
int ret;

if ((ret = agent_resume_state_check(vd)) != 0) {
zfs_dbgmsg("agent resume failed, uberblock changed");
vdev_set_state(vd, B_FALSE, VDEV_STATE_CANT_OPEN,
VDEV_AUX_MODIFIED);
vos->vos_agent_thread_exit = B_TRUE;
return;
}
agent_resume_set_state(vos, VOS_RESUME_REISSUE);

mutex_enter(&vos->vos_sock_lock);

@@ -957,6 +916,7 @@

vdev_queue_t *vq = &vd->vdev_queue;
mutex_enter(&vq->vq_lock);

for (zio_t *zio = avl_first(&vq->vq_active_tree); zio != NULL;
zio = AVL_NEXT(&vq->vq_active_tree, zio)) {
uint64_t req = zio->io_offset >> SPA_MINBLOCKSHIFT;
@@ -974,15 +934,18 @@ agent_resume(void *arg)
nvlist_t *nv = agent_io_block_alloc(zio);
fnvlist_add_uint64(nv, AGENT_REQUEST_ID, req);
fnvlist_add_uint64(nv, AGENT_TOKEN, (uint64_t)zio);
/* tag this I/O as a possible agent maybe_die_with() candidate */
if (zio->io_type == ZIO_TYPE_WRITE)
fnvlist_add_boolean(nv, AGENT_REISSUE);
zfs_dbgmsg("ZIO REISSUE (%px) req %llu",
zio, (u_longlong_t)req);
if ((ret = agent_request(vos, nv, FTAG)) != 0) {
zfs_dbgmsg("agent_resume failed: %d", ret);
agent_io_block_free(nv);
vos->vos_agent_thread_exit = B_TRUE;
mutex_exit(&vq->vq_lock);
mutex_exit(&vos->vos_sock_lock);
return;
agent_resume_set_state(vos, VOS_RESUME_FAILED);
return (-1);
}
agent_io_block_free(nv);
}
@@ -1020,11 +983,13 @@
agent_free_blocks(vos) != 0) {
zfs_dbgmsg("agent_resume freeing failed");
mutex_exit(&vos->vos_sock_lock);
return;
agent_resume_set_state(vos, VOS_RESUME_FAILED);
return (-1);
}

if (vos->vos_send_txg_selector == VOS_TXG_END ||
vos->vos_send_txg_selector == VOS_TXG_END_AGAIN) {
spa_t *spa = vd->vdev_spa;
size_t nvlen;
char *nvbuf = fnvlist_pack(vos->vos_config, &nvlen);
agent_end_txg(vos, spa_syncing_txg(spa),
@@ -1041,8 +1006,110 @@
vos->vos_sock_state = VOS_SOCK_READY;
cv_broadcast(&vos->vos_sock_cv);
mutex_exit(&vos->vos_sock_lock);
return (0);
}

zfs_dbgmsg("agent_resume completed");
static void
agent_resume(void *arg)
{
vdev_t *vd = arg;
vdev_object_store_t *vos = vd->vdev_tsd;
spa_t *spa = vd->vdev_spa;
mode_t open_mode = vdev_object_store_open_mode(spa_mode(vd->vdev_spa));
int ret;
boolean_t destroying = spa_state(spa) == POOL_STATE_DESTROYED;

zfs_dbgmsg("agent_resume running");

/* This task runs until successful completion of agent_resume_reissue */
while (!vos->vos_agent_thread_exit) {
/* synchronize with main agent thread */
mutex_enter(&vos->vos_resume_lock);
while (vos->vos_resume_state != VOS_RESUME_START) {
cv_wait(&vos->vos_resume_cv, &vos->vos_resume_lock);
}
mutex_exit(&vos->vos_resume_lock);

/*
* Wait till the socket is opened.
*/
mutex_enter(&vos->vos_sock_lock);
zfs_object_store_wait(vos, VOS_SOCK_OPEN);

if (spa->spa_load_state == SPA_LOAD_CREATE) {
/*
* Since we're resuming a pool creation, just
* replay the message but don't wait for the completion.
* The original caller of the pool creation will be
* woken up when the response is received.
*/
agent_create_pool(vd, vos);
mutex_exit(&vos->vos_sock_lock);
break;
}
mutex_exit(&vos->vos_sock_lock);

/*
* If the initial vdev open was in progress when we resumed,
* all we need to do is restart the open. There can't be any
* other outstanding requests before the initial open completes.
*/
boolean_t in_initial_open = !vos->vos_open_completed;

/*
* If we are destroying the pool or closing it, the open call is
* unnecessary. Bypassing it is a small performance optimization.
*/
if (!destroying && !vos->vos_closing) {
agent_resume_set_state(vos, VOS_RESUME_OPENING);
uint64_t result = agent_open_pool(vd, vos, open_mode,
B_TRUE);
if (result == ERESTART) {
zfs_dbgmsg("agent_resume retry opening pool");
agent_resume_set_state(vos, VOS_RESUME_FAILED);
continue;
}
if (result != 0) {
zfs_dbgmsg("agent_resume: pool open failed, "
"err %llu", (u_longlong_t)result);
vdev_set_state(vd, B_FALSE,
VDEV_STATE_CANT_OPEN, VDEV_AUX_OPEN_FAILED);
vos->vos_agent_thread_exit = B_TRUE;
break;
}
if (in_initial_open) {
zfs_dbgmsg("agent resume during initial open");
break;
}
zfs_dbgmsg("agent resume: pool opened");
agent_resume_set_state(vos, VOS_RESUME_OPENED);
}
ASSERT(!in_initial_open);
if (vos->vos_closing) {
mutex_enter(&vos->vos_sock_lock);
agent_close_pool(vos, destroying);
break;
}

if ((ret = agent_resume_state_check(vd)) != 0) {
zfs_dbgmsg("agent resume failed, uberblock changed");
vdev_set_state(vd, B_FALSE, VDEV_STATE_CANT_OPEN,
VDEV_AUX_MODIFIED);
vos->vos_agent_thread_exit = B_TRUE;
break;
}

/*
* Reissue I/O that was inflight when the agent restarted.
* If another agent restart occurs before we complete, we
* start over, finishing where we left off.
*/
if (agent_resume_reissue(vos, vd) == 0)
break;
}

agent_resume_set_state(vos, VOS_RESUME_NOT_RUNNING);
zfs_dbgmsg("agent_resume task completed");
}

static uint64_t
@@ -1328,6 +1395,7 @@ agent_reader(void *arg)
if (zfs_flags & ZFS_DEBUG_OBJECT_STORE) {
zfs_dbgmsg("got response from agent type=%s", type);
}
vos->vos_result = 0;
// XXX debug message the nvlist
if (strcmp(type, AGENT_TYPE_CREATE_POOL_DONE) == 0) {
if (nvlist_lookup_string(nv, AGENT_CAUSE, &cause) == 0) {
@@ -1598,9 +1666,29 @@ vdev_agent_thread(void *arg)
zfs_dbgmsg("REOPENED(%px) sock " SOCK_FMT, curthread,
vos->vos_sock);

/* XXX - make sure we only run this once and it completes */
VERIFY3U(taskq_dispatch(resume_taskq,
agent_resume, vd, TQ_SLEEP), !=, TASKQID_INVALID);
/*
* A resume task reissues I/O that was interrupted by an
* agent restart. If a resume task already exists, help it along.
*/
mutex_enter(&vos->vos_resume_lock);
if (vos->vos_resume_state != VOS_RESUME_NOT_RUNNING) {
zfs_dbgmsg("Existing resume task running, state %d",
(int)vos->vos_resume_state);

if (vos->vos_resume_state == VOS_RESUME_OPENING) {
/* force serial waiter to give up on open */
vos->vos_result = SET_ERROR(ERESTART);
agent_serial_done(vos, VOS_SERIAL_OPEN_POOL);
/* resume task waits for VOS_RESUME_START */
}
} else {
VERIFY3U(taskq_dispatch(resume_taskq, agent_resume,
vd, TQ_SLEEP), !=, TASKQID_INVALID);
}
/* Signal resume task to start (avoids race for open state) */
vos->vos_resume_state = VOS_RESUME_START;
cv_broadcast(&vos->vos_resume_cv);
mutex_exit(&vos->vos_resume_lock);
}

mutex_enter(&vos->vos_lock);
@@ -1641,9 +1729,11 @@ vdev_object_store_init(spa_t *spa, nvlist_t *nv, void **tsd)
mutex_init(&vos->vos_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&vos->vos_stats_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&vos->vos_sock_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&vos->vos_resume_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&vos->vos_outstanding_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&vos->vos_cv, NULL, CV_DEFAULT, NULL);
cv_init(&vos->vos_sock_cv, NULL, CV_DEFAULT, NULL);
cv_init(&vos->vos_resume_cv, NULL, CV_DEFAULT, NULL);
cv_init(&vos->vos_outstanding_cv, NULL, CV_DEFAULT, NULL);
avl_create(&vos->vos_pending_stats_tree, pending_stats_compare,
sizeof (object_store_stats_call_t),
@@ -1684,9 +1774,11 @@ vdev_object_store_fini(vdev_t *vd)
mutex_destroy(&vos->vos_lock);
mutex_destroy(&vos->vos_stats_lock);
mutex_destroy(&vos->vos_sock_lock);
mutex_destroy(&vos->vos_resume_lock);
mutex_destroy(&vos->vos_outstanding_lock);
cv_destroy(&vos->vos_cv);
cv_destroy(&vos->vos_sock_cv);
cv_destroy(&vos->vos_resume_cv);
cv_destroy(&vos->vos_outstanding_cv);
avl_destroy(&vos->vos_pending_stats_tree);
if (vos->vos_endpoint != NULL) {
