Skip to content

Commit

Permalink
Start implementing support for operations on Redis lists, e.g. for lo…
Browse files Browse the repository at this point in the history
…gging.
  • Loading branch information
Castaglia committed Jan 13, 2017
1 parent 9324e99 commit 116db17
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 2 deletions.
10 changes: 10 additions & 0 deletions include/redis.h
Expand Up @@ -70,6 +70,11 @@ int pr_redis_remove(pr_redis_t *redis, module *m, const char *key);
int pr_redis_set(pr_redis_t *redis, module *m, const char *key, void *value,
size_t valuesz, time_t expires);

/* List operations */
int pr_redis_list_append(pr_redis_t *redis, module *m, const char *key,
void *value, size_t valuesz);
int pr_redis_list_remove(pr_redis_t *redis, module *m, const char *key);

/* Variants of the above, where the key values are arbitrary bits rather
* than being assumed to be strings.
*/
Expand All @@ -88,6 +93,11 @@ int pr_redis_kremove(pr_redis_t *redis, module *m, const char *key,
int pr_redis_kset(pr_redis_t *redis, module *m, const char *key, size_t keysz,
void *value, size_t valuesz, time_t expires);

int pr_redis_list_kappend(pr_redis_t *redis, module *m, const char *key,
size_t keysz, void *value, size_t valuesz);
int pr_redis_list_kremove(pr_redis_t *redis, module *m, const char *key,
size_t keysz);

/* For internal use only */

int redis_set_server(const char *server, int port);
Expand Down
144 changes: 142 additions & 2 deletions src/redis.c
Expand Up @@ -379,8 +379,8 @@ int pr_redis_conn_destroy(pr_redis_t *redis) {
}

int pr_redis_conn_clone(pool *p, pr_redis_t *redis) {
errno = ENOSYS;
return -1;
/* This is a no-op, for now. */
return 0;
}

static int modptr_cmp_cb(const void *k1, size_t ksz1, const void *k2,
Expand Down Expand Up @@ -658,6 +658,57 @@ int pr_redis_set(pr_redis_t *redis, module *m, const char *key, void *value,
return 0;
}

int pr_redis_list_append(pr_redis_t *redis, module *m, const char *key,
void *value, size_t valuesz) {
int res;

if (redis == NULL ||
m == NULL ||
key == NULL ||
value == NULL ||
valuesz == 0) {
errno = EINVAL;
return -1;
}

res = pr_redis_list_kappend(redis, m, key, strlen(key), value, valuesz);
if (res < 0) {
int xerrno = errno;

pr_trace_msg(trace_channel, 2,
"error appending to list using key '%s': %s", key, strerror(xerrno));

errno = xerrno;
return -1;
}

return 0;
}

int pr_redis_list_remove(pr_redis_t *redis, module *m, const char *key) {
int res;

if (redis == NULL ||
m == NULL ||
key == NULL) {
errno = EINVAL;
return -1;
}

res = pr_redis_list_kremove(redis, m, key, strlen(key));
if (res < 0) {
int xerrno = errno;

pr_trace_msg(trace_channel, 2,
"error removing list using key '%s': %s", key, strerror(xerrno));

errno = xerrno;
return -1;
}

return 0;
}

static const char *get_namespace_prefix(pr_redis_t *redis, module *m) {
const char *prefix = NULL;

Expand Down Expand Up @@ -1118,6 +1169,72 @@ int pr_redis_kset(pr_redis_t *redis, module *m, const char *key, size_t keysz,
return 0;
}

int pr_redis_list_kappend(pr_redis_t *redis, module *m, const char *key,
size_t keysz, void *value, size_t valuesz) {
int xerrno = 0;
pool *tmp_pool = NULL;
const char *cmd = NULL, *namespace_prefix;
redisReply *reply;

if (redis == NULL ||
m == NULL ||
key == NULL ||
keysz == 0 ||
value == NULL ||
valuesz == 0) {
errno = EINVAL;
return -1;
}

tmp_pool = make_sub_pool(redis->pool);
pr_pool_tag(tmp_pool, "Redis RPUSH pool");

namespace_prefix = get_namespace_prefix(redis, m);
if (namespace_prefix != NULL) {
key = pstrcat(tmp_pool, namespace_prefix, key, NULL);
}

cmd = "RPUSH";
reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
xerrno = errno;

if (reply == NULL) {
pr_trace_msg(trace_channel, 2,
"error appending to list using key (%lu bytes): %s",
(unsigned long) keysz, redis_strerror(tmp_pool, redis, xerrno));
destroy_pool(tmp_pool);
errno = xerrno;
return -1;
}

if (reply->type != REDIS_REPLY_INTEGER) {
pr_trace_msg(trace_channel, 2,
"expected INTEGER reply for %s, got %s", cmd, get_reply_type(reply));

if (reply->type == REDIS_REPLY_ERROR) {
pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
}

freeReplyObject(reply);
destroy_pool(tmp_pool);
errno = EINVAL;
return -1;
}

pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);

freeReplyObject(reply);
destroy_pool(tmp_pool);
return 0;
}

int pr_redis_list_kremove(pr_redis_t *redis, module *m, const char *key,
size_t keysz) {

/* Note: We can actually use just DEL here. */
return pr_redis_kremove(redis, m, key, keysz);
}

int redis_set_server(const char *server, int port) {
if (server == NULL ||
port < 1) {
Expand Down Expand Up @@ -1223,6 +1340,17 @@ int pr_redis_set(pr_redis_t *redis, module *m, const char *key, void *value,
return -1;
}

int pr_redis_list_append(pr_redis_t *redis, module *m, const char *key,
void *value, size_t valuesz) {
errno = ENOSYS;
return -1;
}

int pr_redis_list_remove(pr_redis_t *redis, module *m, const char *key) {
errno = ENOSYS;
return -1;
}

int pr_redis_kadd(pr_redis_t *redis, module *m, const char *key, size_t keysz,
void *value, size_t valuesz, time_t expires) {
errno = ENOSYS;
Expand Down Expand Up @@ -1253,6 +1381,18 @@ int pr_redis_kset(pr_redis_t *redis, module *m, const char *key, size_t keysz,
return -1;
}

int pr_redis_list_kappend(pr_redis_t *redis, module *m, const char *key,
size_t keysz, void *value, size_t valuesz) {
errno = ENOSYS;
return -1;
}

int pr_redis_list_kremove(pr_redis_t *redis, module *m, const char *key,
size_t keysz) {
errno = ENOSYS;
return -1;
}

int redis_set_server(const char *server, int port) {
errno = ENOSYS;
return -1;
Expand Down
113 changes: 113 additions & 0 deletions tests/api/redis.c
Expand Up @@ -753,6 +753,116 @@ START_TEST (redis_set_test) {
}
END_TEST

START_TEST (redis_list_remove_test) {
int res;
pr_redis_t *redis;
module m;
const char *key;

mark_point();
res = pr_redis_list_remove(NULL, NULL, NULL);
fail_unless(res < 0, "Failed to handle null redis");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

mark_point();
redis = pr_redis_conn_new(p, NULL, 0);
fail_unless(redis != NULL, "Failed to open connection to Redis: %s",
strerror(errno));

mark_point();
res = pr_redis_list_remove(redis, NULL, NULL);
fail_unless(res < 0, "Failed to handle null module");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

mark_point();
res = pr_redis_list_remove(redis, &m, NULL);
fail_unless(res < 0, "Failed to handle null key");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

key = "testlistkey";

mark_point();
res = pr_redis_list_remove(redis, &m, key);
fail_unless(res < 0, "Failed to handle nonexistent list");
fail_unless(errno == ENOENT, "Expected ENOENT (%d), got %s (%d)", ENOENT,
strerror(errno), errno);

mark_point();
res = pr_redis_conn_destroy(redis);
fail_unless(res == 0, "Failed to close redis: %s", strerror(errno));
}
END_TEST

START_TEST (redis_list_append_test) {
int res;
pr_redis_t *redis;
module m;
const char *key;
char *val;
size_t valsz;

mark_point();
res = pr_redis_list_append(NULL, NULL, NULL, NULL, 0);
fail_unless(res < 0, "Failed to handle null redis");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

mark_point();
redis = pr_redis_conn_new(p, NULL, 0);
fail_unless(redis != NULL, "Failed to open connection to Redis: %s",
strerror(errno));

mark_point();
res = pr_redis_list_append(redis, NULL, NULL, NULL, 0);
fail_unless(res < 0, "Failed to handle null module");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

mark_point();
res = pr_redis_list_append(redis, &m, NULL, NULL, 0);
fail_unless(res < 0, "Failed to handle null key");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

key = "testlistkey";

mark_point();
res = pr_redis_list_append(redis, &m, key, NULL, 0);
fail_unless(res < 0, "Failed to handle null value");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

val = "Some JSON here";

mark_point();
res = pr_redis_list_append(redis, &m, key, val, 0);
fail_unless(res < 0, "Failed to handle empty value");
fail_unless(errno == EINVAL, "Expected EINVAL (%d), got %s (%d)", EINVAL,
strerror(errno), errno);

valsz = strlen(val);

mark_point();
(void) pr_redis_list_remove(redis, &m, key);

mark_point();
res = pr_redis_list_append(redis, &m, key, val, valsz);
fail_unless(res == 0, "Failed to append to list '%s': %s", key,
strerror(errno));

mark_point();
res = pr_redis_list_remove(redis, &m, key);
fail_unless(res == 0, "Failed to remove list '%s': %s", key, strerror(errno));

mark_point();
res = pr_redis_conn_destroy(redis);
fail_unless(res == 0, "Failed to close redis: %s", strerror(errno));
}
END_TEST

#endif /* PR_USE_REDIS */

Suite *tests_get_redis_suite(void) {
Expand All @@ -779,6 +889,9 @@ Suite *tests_get_redis_suite(void) {
tcase_add_test(testcase, redis_decr_test);
tcase_add_test(testcase, redis_set_test);

tcase_add_test(testcase, redis_list_remove_test);
tcase_add_test(testcase, redis_list_append_test);

suite_add_tcase(suite, testcase);
#endif /* PR_USE_REDIS */

Expand Down

0 comments on commit 116db17

Please sign in to comment.