Skip to content
Permalink
Browse files

kv: buffer inserts and apply them from background thread

kv: use a very large buffer (.5GiB) to paper over merge latencies
  • Loading branch information
jacobgorm committed Feb 7, 2019
1 parent 198abef commit eee043dd41a6d4bb16b4bf84ff06493f8a3ff034
Showing with 90 additions and 9 deletions.
  1. +69 −9 kv.c
  2. +21 −0 kv.h
78 kv.c
@@ -15,6 +15,8 @@
#include "aio.h"
#include "kv.h"

static const int MAX_PENDING = 64; // times 8GiB

static void *kv_malloc(void *_s, size_t sz) {
return malloc(sz);
}
@@ -48,18 +50,52 @@ static void *kv_aio_thread(void *bs)
return NULL;
}

static void *kv_insert_thread(void *opaque)
{
struct kv *kv = opaque;
struct kv_buffered_insert *kbi;

for (;;) {
pthread_mutex_lock(&kv->insert_mutex);
while (!(kbi = TAILQ_FIRST(&kv->insert_queue))) {
if (kv->shutdown) {
return NULL;
}
pthread_cond_wait(&kv->insert_cond, &kv->insert_mutex);
}
TAILQ_REMOVE(&kv->insert_queue, kbi, queue_entry);
pthread_mutex_unlock(&kv->insert_mutex);

dubtree_insert(kv->t, kbi->n, kbi->keys, kbi->buffer, kbi->sizes, 0);
pthread_mutex_lock(&kv->ready_mutex);
if (--(kv->num_pending) < MAX_PENDING) {
pthread_cond_signal(&kv->ready_cond);
}
pthread_mutex_unlock(&kv->ready_mutex);

free(kbi->buffer);
free(kbi);
}
return NULL;
}

int kv_global_init(void) {
aio_global_init();
pthread_t tid;
pthread_create(&tid, NULL, kv_aio_thread, NULL);
return 0;
}

#define BUFFER_MAX (4<<20)
#define BUFFER_MAX (8<<20)

int kv_init(struct kv *kv, const char *kvinfo, int delete_on_close) {

memset(kv, 0, sizeof(*kv));
pthread_mutex_init(&kv->insert_mutex, NULL);
pthread_cond_init(&kv->insert_cond, NULL);
pthread_mutex_init(&kv->ready_mutex, NULL);
pthread_cond_init(&kv->ready_cond, NULL);
TAILQ_INIT(&kv->insert_queue);
pthread_create(&kv->insert_tid, NULL, kv_insert_thread, kv);

chunk_id_t top_id = {};
hash_t top_hash = {};
@@ -153,9 +189,20 @@ static int list_cmp(const void *va, const void *vb) {
}
}

static inline void kv_wait_pending(struct kv *kv, int howmany, int increment) {
pthread_mutex_lock(&kv->ready_mutex);
while (kv->num_pending > howmany) {
pthread_cond_wait(&kv->ready_cond, &kv->ready_mutex);
}
kv->num_pending += increment;
pthread_mutex_unlock(&kv->ready_mutex);
}

int kv_flush(struct kv *kv) {
int n = kv->n;
if (n) {
kv_wait_pending(kv, MAX_PENDING - 1, 1);
struct kv_buffered_insert *kbi = malloc(sizeof(struct kv_buffered_insert));
struct entry *list = malloc(sizeof(struct entry) * n);
size_t total = 0;
for (int i = 0; i < n; ++i) {
@@ -165,17 +212,22 @@ int kv_flush(struct kv *kv) {
total += kv->sizes[i];
}
qsort(list, n, sizeof(list[0]), list_cmp);
uint8_t *tmp = malloc(total);
uint8_t *t = tmp;
uint8_t *t;
t = kbi->buffer = malloc(total);
for (int i = 0; i < n; ++i) {
memcpy(t, kv->buffer + list[i].offset, list[i].size);
kv->keys[i] = list[i].key;
kv->offsets[i] = list[i].offset;
kv->sizes[i] = list[i].size;
kbi->keys[i] = list[i].key;
kbi->offsets[i] = list[i].offset;
kbi->sizes[i] = list[i].size;
t += list[i].size;
}
dubtree_insert(kv->t, kv->n, kv->keys, tmp, kv->sizes, 0);
free(tmp);
kbi->n = n;

pthread_mutex_lock(&kv->insert_mutex);
TAILQ_INSERT_TAIL(&kv->insert_queue, kbi, queue_entry);
pthread_cond_signal(&kv->insert_cond);
pthread_mutex_unlock(&kv->insert_mutex);

free(list);
kv->n = 0;
kv->b = kv->buffer;
@@ -206,6 +258,7 @@ int kv_find(struct kv *kv, uint8_t **rptr, size_t *rsize, uint64_t key) {
int r;

kv_flush(kv);
kv_wait_pending(kv, 0, 0);

uint64_t range = 0x100;
uint64_t base = key & ~(range - 1ULL);
@@ -232,6 +285,7 @@ int kv_find(struct kv *kv, uint8_t **rptr, size_t *rsize, uint64_t key) {

int kv_save(struct kv *kv, char *buffer, size_t size) {
kv_flush(kv);
kv_wait_pending(kv, 0, 0);

chunk_id_t top_id = {};
hash_t top_hash = {};
@@ -252,6 +306,12 @@ int kv_save(struct kv *kv, char *buffer, size_t size) {
}

int kv_close(struct kv *kv) {
kv_flush(kv);
kv->shutdown = 1;
pthread_cond_signal(&kv->insert_cond);
kv_wait_pending(kv, 0, 0);
pthread_join(kv->insert_tid, NULL);

if (!kv->saved) {
dubtree_delete(kv->t);
} else {
21 kv.h
@@ -2,11 +2,24 @@
#define __KV_H__

#include <stdint.h>
#include <pthread.h>

#include "queue.h"

struct DubTree;

#define KV_MAX_KEYS 0x1000

struct kv_buffered_insert {
uint8_t *b;
uint8_t *buffer;
int n;
uint64_t keys[KV_MAX_KEYS];
uint32_t sizes[KV_MAX_KEYS];
uint32_t offsets[KV_MAX_KEYS];
TAILQ_ENTRY(kv_buffered_insert) queue_entry;
};

struct kv {
struct DubTree *t;
char *kvdata;
@@ -23,6 +36,14 @@ struct kv {
int n;
int saved;
void *find_context;
TAILQ_HEAD(, kv_buffered_insert) insert_queue;
pthread_t insert_tid;
pthread_mutex_t insert_mutex;
pthread_cond_t insert_cond;
pthread_mutex_t ready_mutex;
pthread_cond_t ready_cond;
int num_pending;
int shutdown;
};

int kv_global_init(void);

0 comments on commit eee043d

Please sign in to comment.
You can’t perform that action at this time.