Skip to content
Permalink
Browse files

add a simple key-value store callable from python

    also rewrite aio and libcurl code
  • Loading branch information...
jacobgorm committed Dec 14, 2018
1 parent 51e9ee9 commit bfe20b789cfebedd91b9599106e370e4bef57812
Showing with 724 additions and 156 deletions.
  1. +32 −0 CMakeLists.txt
  2. +162 −81 aio.c
  3. +5 −5 aio.h
  4. +1 −0 block-swap/crypto.c
  5. +16 −11 block-swap/dubtree.c
  6. +1 −1 img-fsck.c
  7. +2 −2 img-heat.c
  8. +2 −2 img-test.c
  9. +32 −38 ioh.c
  10. +2 −4 ioh.h
  11. +265 −0 kv.c
  12. +36 −0 kv.h
  13. +22 −12 nbd.c
  14. +146 −0 pykv.c
@@ -5,6 +5,8 @@ find_package(CURL)
find_package(OpenSSL)

set(CMAKE_C_FLAGS "-Wall -Werror -Ofast -Wfatal-errors -Wno-unused-result -D_POSIX_C_SOURCE=200809L -g")
set(CMAKE_POSITION_INDEPENDENT_CODE ON)


#set (SANITIZE 1)
if (SANITIZE)
@@ -25,6 +27,16 @@ add_library(swap
block-swap/simpletree.c
)

add_library(kv
kv.c
aio.c
ioh.c
block-swap/crypto.c
block-swap/dubtree.c
block-swap/hashtable.c
block-swap/simpletree.c
)

install(TARGETS swap
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
@@ -42,3 +54,23 @@ add_executable(img-heat img-heat.c mt19937-64.c)
target_link_libraries(img-heat swap -lpthread -llz4 -luuid ${OPENSSL_LIBRARIES} ${CURL_LIBRARIES})

install(TARGETS oneroot img-fsck img-test DESTINATION bin)

find_package(PythonInterp 3.5 REQUIRED)
find_package(PythonLibs 3.5 REQUIRED)

add_library(pykv SHARED pykv.c)
set_target_properties(
pykv
PROPERTIES
PREFIX ""
OUTPUT_NAME pykv
LINKER_LANGUAGE C
)
target_include_directories(pykv PUBLIC
${PYTHON_INCLUDE_DIRS}
${NUMPY_INCLUDE_DIR}
${PROJECT_SOURCE_DIR}/src/headers
)
target_link_libraries(pykv
${PYTHON_LIBRARIES}
kv -lpthread -llz4 -luuid ${OPENSSL_LIBRARIES} ${CURL_LIBRARIES})
243 aio.c
@@ -1,6 +1,7 @@
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -13,8 +14,6 @@
#include <curl/curl.h>
#include <curl/easy.h>

struct CURLM *cmh;

typedef struct AioEntry {
int fd;
void (*cb) (void *opaque);
@@ -23,24 +22,72 @@ typedef struct AioEntry {

static AioEntry aios[1024];

void swap_aio_init(void)
{
ioh_init();
static int curl_timer_callback(CURLM *cmd, long timeout_ms, void *userp);
static int curl_socket_cb(CURL *ch, curl_socket_t s, int what, void *cbp, void *sockp);

struct curl_state {
pthread_mutex_t lock;
struct CURLM *cmh;
long timeout;
ioh_event curl_wakeup;
fd_set readset, writeset, errset;
int max;
int running;
} curl_global;

static inline struct curl_state *cs_get(void *opaque) {
struct curl_state *cs = opaque;
pthread_mutex_lock(&cs->lock);
return cs;
}

static inline void cs_put(struct curl_state **pcs) {
struct curl_state *cs = *pcs;
pthread_mutex_unlock(&cs->lock);
*pcs = NULL;
}

static void curl_wakeup_cb(void *opaque) {
struct curl_state *cs = cs_get(opaque);
curl_multi_socket_action(cs->cmh, CURL_SOCKET_TIMEOUT, 0, &cs->running);
ioh_event_clear(&cs->curl_wakeup);
cs_put(&cs);
}

static void aio_init_curl(struct curl_state *cs) {

pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&cs->lock, &attr);

ioh_event_init(&cs->curl_wakeup, curl_wakeup_cb, cs);
cs->cmh = curl_multi_init();
curl_multi_setopt(cs->cmh, CURLMOPT_TIMERFUNCTION, curl_timer_callback);
curl_multi_setopt(cs->cmh, CURLMOPT_TIMERDATA, &cs->timeout);
curl_multi_setopt(cs->cmh, CURLMOPT_SOCKETFUNCTION, curl_socket_cb);
curl_multi_setopt(cs->cmh, CURLMOPT_SOCKETDATA, &curl_global);
FD_ZERO(&cs->readset);
FD_ZERO(&cs->writeset);
FD_ZERO(&cs->errset);
cs->max = -1;
}

void aio_init(void) {
for (int i = 0; i < sizeof(aios) / sizeof(aios[0]); ++i) {
AioEntry *e = &aios[i];
memset(e, 0, sizeof(*e));
e->fd = -1;
}
curl_global_init(CURL_GLOBAL_ALL);
cmh = curl_multi_init();
curl_global_init(CURL_GLOBAL_DEFAULT);
aio_init_curl(&curl_global);
}

void swap_aio_close(void)
{
curl_multi_cleanup(cmh);
void aio_close(void) {
//curl_multi_cleanup(cmh);
}

void swap_aio_add_wait_object(int fd, void (*cb) (void *opaque), void *opaque) {
void aio_add_wait_object(int fd, void (*cb) (void *opaque), void *opaque) {
for (int i = 0; i < sizeof(aios) / sizeof(aios[0]); ++i) {
AioEntry *e = &aios[i];
if (e->fd == -1 && __sync_bool_compare_and_swap(&e->fd, -1, fd)) {
@@ -52,62 +99,65 @@ void swap_aio_add_wait_object(int fd, void (*cb) (void *opaque), void *opaque) {
assert(0);
}

int swap_aio_add_curl_handle(CURL *ch) {
assert(cmh);
return (int) curl_multi_add_handle(cmh, ch);
int aio_add_curl_handle(CURL *ch) {
int r;
struct curl_state *cs = cs_get(&curl_global);
CURLMcode cr = curl_multi_add_handle(curl_global.cmh, ch);
if (cr == CURLM_OK) {
ioh_event_set(&cs->curl_wakeup);
r = 0;
} else {
errx(1, "%s %p\n", curl_multi_strerror(cr), cs->cmh);
r = -1;
}
cs_put(&cs);
return r;
}

extern void dump_swapstat(void);
void dubtree_cleanup_curl_handle(CURL *ch);

int swap_aio_wait(void)
static int curl_timer_callback(CURLM *cmd, long timeout_ms, void *userp) {
*((long *) userp) = timeout_ms;
return 0;
}

static int curl_socket_cb(CURL *ch, curl_socket_t s, int what, void *opaque, void *sockp)
{
assert(cmh);
assert(s < FD_SETSIZE);
struct curl_state *cs = opaque;
if (what == CURL_POLL_REMOVE) {
FD_CLR(s, &cs->readset);
FD_CLR(s, &cs->writeset);
FD_CLR(s, &cs->errset);
} else {
FD_SET(s, &cs->readset);
FD_SET(s, &cs->writeset);
FD_SET(s, &cs->errset);
cs->max = cs->max > (s + 1) ? cs->max : (s + 1);
}
return 0;
}

void aio_wait(void) {
long timeout = -1;
int max = -1;
struct timeval tv = {0, 0};
fd_set readset, writeset, errset;
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&errset);

CURLMcode mode;
int running;
int num_msgs;
struct timeval tv = {};
long curl_timeout;
do {
mode = curl_multi_perform(cmh, &running);
} while (mode == CURLM_CALL_MULTI_PERFORM);

do {
CURLMsg *msg = curl_multi_info_read(cmh, &num_msgs);
if (msg) {
int response;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE,
&response);
if (response != 200 && response != 206) {
errx(1, "got HTTP response %u\n", response);
}
curl_multi_remove_handle(cmh, msg->easy_handle);
dubtree_cleanup_curl_handle(msg->easy_handle);
curl_easy_cleanup(msg->easy_handle);
}
} while (num_msgs);

CURLMcode cr;
cr = curl_multi_fdset(cmh, &readset, &writeset, &errset, &max);
if (cr != CURLM_OK) {
printf("curl_multi_fdset failed with %s!\n", curl_multi_strerror(cr));
assert(0);
}

curl_multi_timeout(cmh, &curl_timeout);
if (curl_timeout < 0) {
curl_timeout = 1000;
}
if (max == -1) {
curl_timeout = 1000; /// XXX curl says 10
struct curl_state *cs = cs_get(&curl_global);
if (cs->running) {
memcpy(&readset, &cs->readset, sizeof(readset));
memcpy(&writeset, &cs->writeset, sizeof(writeset));
memcpy(&errset, &cs->errset, sizeof(errset));
timeout = cs->timeout;
max = cs->max;
} else {
FD_ZERO(&readset);
FD_ZERO(&writeset);
FD_ZERO(&errset);
}
cs_put(&cs);

max = max > ioh_fd() ? max : ioh_fd();
FD_SET(ioh_fd(), &readset);
@@ -120,29 +170,48 @@ int swap_aio_wait(void)
}
}

if (timeout >= 0) {
tv.tv_sec = timeout / 1000;
if(tv.tv_sec > 1) {
tv.tv_sec = 1;
} else {
tv.tv_usec = (timeout % 1000) * 1000;
}
}
int r;
do {
r = select(max + 1, &readset, &writeset, &errset,
timeout >= 0 ? &tv : NULL);
if (r < 0) {
warn("select failed");
}
} while (r < 0 && errno == EINTR);
if (r < 0) {
err(1, "select failed");
}

/* convert to struct usable by select */
tv.tv_sec = curl_timeout / 1000;
tv.tv_usec = (curl_timeout % 1000) * 1000;
//fprintf(stderr, "wait %ld %ld\n", tv.tv_sec, tv.tv_usec);

int r = select(max + 1, &readset, &writeset, &errset, &tv);
if (r > 0) {
if (FD_ISSET(ioh_fd(), &readset)) {
for (;;) {
char byte;
int r2 = read(ioh_fd(), &byte, sizeof(byte));
if (r2 != sizeof(byte)) {
if (errno == EWOULDBLOCK) {
break;
} else {
printf("r=%d\n", r2);
assert(0);
}
if (r == 0) {
cs = cs_get(&curl_global);
curl_multi_socket_action(cs->cmh, CURL_SOCKET_TIMEOUT, 0, &cs->running);
cs_put(&cs);
} else if (r > 0) {
cs = cs_get(&curl_global);
for (int i = 0; i < max; ++i) {
if (FD_ISSET(i, &cs->readset)) {
int evmask = 0;
evmask |= FD_ISSET(i, &readset) ? CURL_CSELECT_IN : 0;
evmask |= FD_ISSET(i, &writeset) ? CURL_CSELECT_OUT : 0;
evmask |= FD_ISSET(i, &errset) ? CURL_CSELECT_ERR : 0;
if (evmask) {
curl_multi_socket_action(cs->cmh, i, 0, &cs->running);
}
ioh_event_service_callbacks();
}
}
cs_put(&cs);

if (FD_ISSET(ioh_fd(), &readset)) {
ioh_service();
}
for (int i = 0; i < sizeof(aios) / sizeof(aios[0]); ++i) {
AioEntry *e = &aios[i];
int fd = e->fd;
@@ -151,12 +220,24 @@ int swap_aio_wait(void)
e->cb(e->opaque);
}
}
} else {
//dump_swapstat();
}
if (r == 0 && curl_timeout < 0) {
return 1;
} else {
return 0;

cs = cs_get(&curl_global);
int num_msgs;
CURLMsg *msg;
while ((msg = curl_multi_info_read(cs->cmh, &num_msgs))) {
if (msg->msg == CURLMSG_DONE) {
int response;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE,
&response);
if (response != 200 && response != 206) {
printf("got bad response %u\n", response);
assert(0);
}
dubtree_cleanup_curl_handle(msg->easy_handle);
curl_multi_remove_handle(cs->cmh, msg->easy_handle);
curl_easy_cleanup(msg->easy_handle);
}
}
cs_put(&cs);
}
10 aio.h
@@ -4,10 +4,10 @@
struct ioh_event;
struct CURL;

void swap_aio_init(void);
void swap_aio_close(void);
int swap_aio_add_curl_handle(struct CURL *ch);
void swap_aio_add_wait_object(int fd, void (*cb) (void *opaque), void *opaque);
int swap_aio_wait(void);
void aio_init(void);
void aio_close(void);
int aio_add_curl_handle(struct CURL *ch);
void aio_add_wait_object(int fd, void (*cb) (void *opaque), void *opaque);
void aio_wait(void);

#endif /* __AIO_H__ */
@@ -21,6 +21,7 @@ __attribute__((constructor)) static void crypto_global_init(void)

void crypto_init(Crypto *crypto, const uint8_t *key)
{
assert(key);
if (!(crypto->cipher = EVP_aes_256_gcm())) {
errx(1, "EVP_aes_256_gcm failed");
}

0 comments on commit bfe20b7

Please sign in to comment.
You can’t perform that action at this time.