Skip to content

Commit

Permalink
async swap_flush()
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobgorm committed Feb 27, 2019
1 parent 783e476 commit f0ae617
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 87 deletions.
85 changes: 21 additions & 64 deletions block-swap.c
Expand Up @@ -260,7 +260,7 @@ typedef struct BDRVSwapState {
thread_event can_insert_event;
uxen_thread insert_thread;

thread_event all_flushed_event;
ioh_event *quisced_event;

DubTree t;
void *find_context;
Expand All @@ -285,8 +285,6 @@ struct {
} swap_stats = {0,};
#endif

typedef void BlockDriverCompletionFunc(void *opaque, int ret);

typedef struct SwapAIOCB {
BlockDriverAIOCB common; /* must go first. */
struct SwapAIOCB *next;
Expand Down Expand Up @@ -418,16 +416,6 @@ static inline void swap_wait_can_insert(BDRVSwapState *s)
thread_event_wait(&s->can_insert_event);
}

static inline void swap_signal_all_flushed(BDRVSwapState *s)
{
thread_event_set(&s->all_flushed_event);
}

static inline void swap_wait_all_flushed(BDRVSwapState *s)
{
thread_event_wait(&s->all_flushed_event);
}

static void *swap_malloc(void *_s, size_t sz)
{
BDRVSwapState *s = _s;
Expand Down Expand Up @@ -517,8 +505,8 @@ swap_insert_thread(void * _s)
load = s->busy_blocks.load;
swap_unlock(s);

if (load == 0) {
swap_signal_all_flushed(s);
if (load == 0 && s->quisced_event) {
ioh_event_set(s->quisced_event);
}
if (r < 0) {
err(1, "dubtree_insert failed, r=%d!", r);
Expand Down Expand Up @@ -863,7 +851,6 @@ char *swap_resolve_via_fallback(BDRVSwapState *s, const char *fn)
return check;
}


int swap_open(BlockDriverState *bs, const char *filename, int flags)
{
memset(bs, 0 , sizeof(*bs));
Expand Down Expand Up @@ -1005,7 +992,6 @@ int swap_open(BlockDriverState *bs, const char *filename, int flags)
&s->can_write_event,
&s->insert_event,
&s->can_insert_event,
&s->all_flushed_event,
};

for (i = 0; i < sizeof(events) / sizeof(events[0]); ++i) {
Expand Down Expand Up @@ -1639,8 +1625,7 @@ static int __swap_nonblocking_write(BDRVSwapState *s, const uint8_t *buf,
return (BlockDriverAIOCB *) acb;
}

#if 1
int swap_flush(BlockDriverState *bs)
int swap_flush(BlockDriverState *bs, ioh_event *done_event)
{
BDRVSwapState *s = (BDRVSwapState*) bs->opaque;
LruCache *bc = &s->bc;
Expand All @@ -1664,6 +1649,8 @@ int swap_flush(BlockDriverState *bs)

debug_printf("swap: emptying cache lines\n");
swap_lock(s);
s->flush = 1;
s->quisced_event = done_event;
for (i = 0; i < (1 << bc->log_lines); ++i) {
LruCacheLine *cl = &bc->lines[i];
if (cl->value) {
Expand All @@ -1677,53 +1664,10 @@ int swap_flush(BlockDriverState *bs)
cl->key = 0;
cl->value = 0;
}
s->flush = 1;
swap_unlock(s);

debug_printf("swap: wait for all writes to complete\n");
for (;;) {
uint32_t load;
swap_lock(s);
load = s->busy_blocks.load;
swap_unlock(s);
if (!load) {
break;
}
swap_signal_write(s);
swap_wait_all_flushed(s);
}
debug_printf("swap: finished waiting for write threads\n");

assert(s->pqs[0].n_heap == 0);
assert(s->pqs[1].n_heap == 0);
assert(s->busy_blocks.load == 0);

#ifdef _WIN32
/* Release the heap used for buffers back to OS. */
int nleaks = __sync_fetch_and_add(&s->alloced, 0);
if (nleaks) {
debug_printf("swap: leaked %d allocs\n", nleaks);
assert(0);
}
swap_lock(s);
HeapDestroy(s->heap);
s->heap = HeapCreate(0, 0, 0);
swap_unlock(s);
#endif

/* Quiesce dubtree and release caches. */
swap_lock(s);
if (s->find_context) {
dubtree_end_find(&s->t, s->find_context);
s->find_context = NULL;
}
s->flush = 0;
dubtree_checkpoint(&s->t, &s->top_id, &s->top_hash);
swap_write_header(s);
swap_signal_write(s);
swap_unlock(s);
return 0;
}
#endif

void swap_close(BlockDriverState *bs)
{
Expand All @@ -1745,13 +1689,26 @@ void swap_close(BlockDriverState *bs)
swap_signal_insert(s);
wait_thread(s->insert_thread);

assert(s->pqs[0].n_heap == 0);
assert(s->pqs[1].n_heap == 0);
assert(s->busy_blocks.load == 0);

swap_lock(s);
if (s->find_context) {
dubtree_end_find(&s->t, s->find_context);
s->find_context = NULL;
}
s->flush = 0;
dubtree_checkpoint(&s->t, &s->top_id, &s->top_hash);
swap_write_header(s);
swap_unlock(s);

if (s->find_context) {
dubtree_end_find(&s->t, s->find_context);
s->find_context = NULL;
}
dubtree_close(&s->t);

thread_event_close(&s->all_flushed_event);
thread_event_close(&s->write_event);
thread_event_close(&s->can_write_event);

Expand Down
8 changes: 7 additions & 1 deletion block-swap.h
@@ -1,3 +1,6 @@
#ifndef __BLOCK_SWAP_H__
#define __BLOCK_SWAP_H__

typedef struct BlockDriverState {
void *opaque;
uint64_t total_sectors;
Expand All @@ -19,9 +22,12 @@ BlockDriverAIOCB *swap_aio_read(BlockDriverState *bs,
int64_t sector_num, uint8_t *buf, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque);

int swap_flush(BlockDriverState *bs);
struct ioh_event;
int swap_flush(BlockDriverState *bs, struct ioh_event *done_event);
void swap_close(BlockDriverState *bs);
int swap_create(const char *filename, int64_t size, int flags);
int swap_open(BlockDriverState *bs, const char *filename, int flags);
int swap_remove(BlockDriverState *bs);
int swap_ioctl(BlockDriverState *bs, unsigned long int req, void *buf);

#endif /* __BLOCK_SWAP_H__ */
15 changes: 13 additions & 2 deletions img-heat.c
Expand Up @@ -40,9 +40,18 @@ static void wait(void) {
}
}

static ioh_event close_event;
static int can_exit = 0;

static void close_event_cb(void *opaque)
{
int *pi = opaque;
*pi = 1;
}

static void *disk_swap_thread(void *bs)
{
for (;;) {
while (!can_exit) {
aio_wait();
}
return NULL;
Expand Down Expand Up @@ -72,6 +81,7 @@ int main(int argc, char **argv)
const char *trace = argv[2];
FILE *tracefile = fopen(trace, "r");

ioh_event_init(&close_event, close_event_cb, &can_exit);
swap_open(&bs, dst, 0);
pthread_t tid;
pthread_create(&tid, NULL, disk_swap_thread, &bs);
Expand All @@ -94,7 +104,8 @@ int main(int argc, char **argv)
break;
}
}
swap_flush(&bs);
swap_flush(&bs, &close_event);
pthread_join(tid, NULL);
swap_close(&bs);
printf("primed %lu MiB\n", total / 2);
return 0;
Expand Down
23 changes: 13 additions & 10 deletions img-test.c
Expand Up @@ -20,12 +20,6 @@
void init_genrand64(unsigned long long seed);
unsigned long long genrand64_int64(void);

#if defined(_WIN32)
#include <windows.h>
DECLARE_PROGNAME;
#endif /* _WIN32 */


#ifdef _WIN32
static inline double rtc(void)
{
Expand All @@ -50,7 +44,6 @@ static inline double rtc(void)
}
#endif


#if defined(_WIN32)
#include <windows.h>
DECLARE_PROGNAME;
Expand Down Expand Up @@ -147,9 +140,18 @@ static void wait(void) {
}
}

static ioh_event close_event;
static int can_exit = 0;

static void close_event_cb(void *opaque)
{
int *pi = opaque;
*pi = 1;
}

static void *disk_swap_thread(void *bs)
{
for (;;) {
while (!can_exit) {
aio_wait();
}
return NULL;
Expand Down Expand Up @@ -187,6 +189,7 @@ int main(int argc, char **argv)
swap_create(dst, 100 << 20, 0);
}

ioh_event_init(&close_event, close_event_cb, &can_exit);
swap_open(&bs, dst, 0);
pthread_t tid;
pthread_create(&tid, NULL, disk_swap_thread, &bs);
Expand Down Expand Up @@ -231,7 +234,6 @@ int main(int argc, char **argv)
exit(1);
}
}
swap_flush(&bs);
t1 = rtc();
dt = t1 - t0;
printf("%.1f writes/s, %.2fMiB/s %s\n", ((double)i) / dt,
Expand Down Expand Up @@ -275,7 +277,8 @@ int main(int argc, char **argv)
t0 = t1;

}
swap_flush(&bs);
swap_flush(&bs, &close_event);
pthread_join(tid, NULL);
swap_close(&bs);
printf("test complete\n");
return 0;
Expand Down
23 changes: 13 additions & 10 deletions nbd.c
Expand Up @@ -28,6 +28,10 @@ extern void dump_swapstat(void);

static FILE *tracefile = NULL;

static int should_exit = 0;
static int should_close = 0;
static int can_exit = 0;

struct sock_info {
int sock;
};
Expand Down Expand Up @@ -149,11 +153,7 @@ static void got_data(void *opaque)
switch(ntohl(request.type)) {
case NBD_CMD_FLUSH: {
printf("got flush\n");
swap_flush(ci->bs);
r = safe_write(ci->sock, &reply, sizeof(reply));
if (r != sizeof(reply)) {
err(1, "sock write (c) failed");
}
assert(0);
break;
}

Expand Down Expand Up @@ -216,13 +216,12 @@ static void got_data(void *opaque)
}
}

static int should_exit = 0;
static int should_close = 0;
static BlockDriverState bs;
static ioh_event exit_event;
static ioh_event close_event;
static ioh_event flushed_event;

void close_event_cb(void *opaque)
static void close_event_cb(void *opaque)
{
int *pi = opaque;
*pi = 1;
Expand Down Expand Up @@ -405,19 +404,23 @@ int main(int argc, char **argv)

ioh_event_init(&close_event, &close_event_cb, &should_close);
ioh_event_init(&exit_event, &close_event_cb, &should_exit);
while (!should_exit) {
ioh_event_init(&flushed_event, &close_event_cb, &can_exit);
while (!can_exit) {
aio_wait();
if (should_close) {
shell(script, "close", NULL);
should_close = 0;
}
if (should_exit) {
swap_flush(&bs, &flushed_event);
should_exit = 0;
}
}
ioctl(device, NBD_DISCONNECT);
ioctl(device, NBD_CLEAR_SOCK);
int wstatus;
waitpid(child, &wstatus, 0);

swap_flush(&bs);
dump_swapstat();
swap_close(&bs);
return 0;
Expand Down

0 comments on commit f0ae617

Please sign in to comment.