Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
added Pipelining
  • Loading branch information
Asmod4n committed Dec 12, 2015
1 parent 8175f99 commit 1560447
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -1 +1 @@
mruby
/mruby/
23 changes: 23 additions & 0 deletions include/mruby/redis.h
@@ -0,0 +1,23 @@
#ifndef MRUBY_REDIS_H
#define MRUBY_REDIS_H

#include <mruby.h>

#ifdef __cplusplus
extern "C" {
#endif

#define E_REDIS_ERROR (mrb_class_get_under(mrb, mrb_class_get(mrb, "Redis"), "ConnectionError"))
#define E_REDIS_REPLY_ERROR (mrb_class_get_under(mrb, mrb_class_get(mrb, "Redis"), "ReplyError"))
#ifndef E_EOF_ERROR
#define E_EOF_ERROR (mrb_class_get(mrb, "EOFError"))
#endif
#define E_REDIS_ERR_PROTOCOL (mrb_class_get_under(mrb, mrb_class_get(mrb, "Redis"), "ProtocolError"))
#define E_REDIS_ERR_OOM (mrb_class_get_under(mrb, mrb_class_get(mrb, "Redis"), "OOMError"))

#ifdef __cplusplus
}
#endif

#endif

7 changes: 7 additions & 0 deletions mrblib/error.rb
@@ -0,0 +1,7 @@
unless Object.const_defined?("IOError")
class IOError < StandardError; end
end

unless Object.const_defined?("EOFError")
class EOFError < IOError; end
end
196 changes: 191 additions & 5 deletions src/mrb_redis.c
Expand Up @@ -25,6 +25,7 @@
** [ MIT license: http://www.opensource.org/licenses/mit-license.php ]
*/

#include <mruby/redis.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -39,6 +40,10 @@
#include "mruby/string.h"
#include "mruby/class.h"
#include "mrb_redis.h"
#include <errno.h>
#include <mruby/error.h>
#include <mruby/throw.h>


#define DONE mrb_gc_arena_restore(mrb, 0);

Expand Down Expand Up @@ -68,6 +73,28 @@ static void redisContext_free(mrb_state *mrb, void *p) { redisFree(p); }

static const struct mrb_data_type redisContext_type = { "redisContext", redisContext_free, };

static inline void mrb_redis_check_error(redisContext *context, mrb_state *mrb) {
if (context->err != 0) {
if (errno != 0) {
mrb_sys_fail(mrb, context->errstr);
} else {
switch (context->err) {
case REDIS_ERR_EOF:
mrb_raise(mrb, E_EOF_ERROR, context->errstr);
break;
case REDIS_ERR_PROTOCOL:
mrb_raise(mrb, E_REDIS_ERR_PROTOCOL, context->errstr);
break;
case REDIS_ERR_OOM:
mrb_raise(mrb, E_REDIS_ERR_OOM, context->errstr);
break;
default:
mrb_raise(mrb, E_REDIS_ERROR, context->errstr);
}
}
}
}

static mrb_value mrb_redis_connect(mrb_state *mrb, mrb_value self) {
mrb_value host, port;
mrb_int timeout = 1;
Expand All @@ -86,9 +113,7 @@ static mrb_value mrb_redis_connect(mrb_state *mrb, mrb_value self) {

rc = redisConnectWithTimeout(mrb_str_to_cstr(mrb, host), mrb_fixnum(port), timeout_struct);
if (rc->err) {
struct RClass *redis = mrb_class_get(mrb, "Redis");
struct RClass *connectionError = mrb_class_get_under(mrb, redis, "ConnectionError");
mrb_raise(mrb, connectionError, "redis connection failed.");
mrb_raise(mrb, E_REDIS_ERROR, "redis connection failed.");
}

DATA_PTR(self) = rc;
Expand Down Expand Up @@ -804,13 +829,171 @@ static mrb_value mrb_redis_close(mrb_state *mrb, mrb_value self) {
return mrb_nil_value();
}

static inline mrb_value mrb_redis_get_ary_reply(redisReply *reply, mrb_state *mrb);

static inline mrb_value mrb_redis_get_reply(redisReply *reply, mrb_state *mrb) {
switch (reply->type) {
case REDIS_REPLY_STRING:
return mrb_str_new(mrb, reply->str, reply->len);
break;
case REDIS_REPLY_ARRAY:
return mrb_redis_get_ary_reply(reply, mrb);
break;
case REDIS_REPLY_INTEGER: {
if (FIXABLE(reply->integer))
return mrb_fixnum_value(reply->integer);
else
return mrb_float_value(mrb, reply->integer);
}
break;
case REDIS_REPLY_NIL:
return mrb_nil_value();
break;
case REDIS_REPLY_STATUS: {
mrb_sym status = mrb_intern(mrb, reply->str, reply->len);
return mrb_symbol_value(status);
}
break;
case REDIS_REPLY_ERROR: {
mrb_value err = mrb_str_new(mrb, reply->str, reply->len);
return mrb_exc_new_str(mrb, E_REDIS_REPLY_ERROR, err);
}
break;
default:
mrb_raise(mrb, E_REDIS_ERROR, "unknown reply type");
}
}

static inline mrb_value mrb_redis_get_ary_reply(redisReply *reply, mrb_state *mrb)
{
mrb_value ary = mrb_ary_new_capa(mrb, reply->elements);
int ai = mrb_gc_arena_save(mrb);
for (size_t element_couter = 0; element_couter < reply->elements; element_couter++) {
mrb_value element = mrb_redis_get_reply(reply->element[element_couter], mrb);
mrb_ary_push(mrb, ary, element);
mrb_gc_arena_restore(mrb, ai);
}
return ary;
}

static mrb_value mrb_redisAppendCommandArgv(mrb_state *mrb, mrb_value self) {
mrb_sym command;
mrb_value *mrb_argv;
mrb_int argc = 0;

mrb_get_args(mrb, "n*", &command, &mrb_argv, &argc);
argc++;

const char *argv[argc];
size_t argvlen[argc];
mrb_int command_len;
argv[0] = mrb_sym2name_len(mrb, command, &command_len);
argvlen[0] = command_len;

for (mrb_int argc_current = 1; argc_current < argc; argc_current++) {
mrb_value curr = mrb_str_to_str(mrb, mrb_argv[argc_current - 1]);
argv[argc_current] = RSTRING_PTR(curr);
argvlen[argc_current] = RSTRING_LEN(curr);
}

mrb_sym queue_counter_sym = mrb_intern_lit(mrb, "queue_counter");
mrb_value queue_counter_val = mrb_iv_get(mrb, self, queue_counter_sym);
mrb_int queue_counter = 1;
if (mrb_fixnum_p(queue_counter_val)) {
queue_counter = mrb_fixnum(queue_counter_val);
if (mrb_int_add_overflow(queue_counter, 1, &queue_counter)) {
mrb_raise(mrb, E_RUNTIME_ERROR, "integer addition would overflow");
}
}

redisContext *context = (redisContext *) DATA_PTR(self);
errno = 0;
int rc = redisAppendCommandArgv(context, argc, argv, argvlen);
if (rc == REDIS_OK) {
mrb_iv_set(mrb, self, queue_counter_sym, mrb_fixnum_value(queue_counter));
} else {
mrb_redis_check_error(context, mrb);
}

return self;
}

static mrb_value mrb_redisGetReply(mrb_state *mrb, mrb_value self) {
mrb_sym queue_counter_sym = mrb_intern_lit(mrb, "queue_counter");
mrb_value queue_counter_val = mrb_iv_get(mrb, self, queue_counter_sym);
mrb_int queue_counter = -1;
if (mrb_fixnum_p(queue_counter_val)) {
queue_counter = mrb_fixnum(queue_counter_val);
}

redisContext *context = (redisContext *) DATA_PTR(self);
redisReply *reply = NULL;
mrb_value reply_val = self;
errno = 0;
int rc = redisGetReply(context, (void **) &reply);
if (rc == REDIS_OK && reply != NULL) {
struct mrb_jmpbuf* prev_jmp = mrb->jmp;
struct mrb_jmpbuf c_jmp;

MRB_TRY(&c_jmp)
{
mrb->jmp = &c_jmp;
reply_val = mrb_redis_get_reply(reply, mrb);
if (queue_counter > 1) {
mrb_iv_set(mrb, self, queue_counter_sym, mrb_fixnum_value(--queue_counter));
} else {
mrb_iv_remove(mrb, self, queue_counter_sym);
}
mrb->jmp = prev_jmp;
}
MRB_CATCH(&c_jmp)
{
mrb->jmp = prev_jmp;
freeReplyObject(reply);
MRB_THROW(mrb->jmp);
}
MRB_END_EXC(&c_jmp);

freeReplyObject(reply);
} else {
mrb_redis_check_error(context, mrb);
}

return reply_val;
}

static mrb_value
mrb_redisGetBulkReply(mrb_state *mrb, mrb_value self)
{
mrb_value queue_counter_val = mrb_iv_get(mrb, self, mrb_intern_lit(mrb, "queue_counter"));

if (!mrb_fixnum_p(queue_counter_val))
mrb_raise(mrb, E_RUNTIME_ERROR, "nothing queued yet");

mrb_int queue_counter = mrb_fixnum(queue_counter_val);

mrb_value bulk_reply = mrb_ary_new_capa(mrb, queue_counter);
int ai = mrb_gc_arena_save(mrb);

do {
mrb_value reply = mrb_redisGetReply(mrb, self);
mrb_ary_push(mrb, bulk_reply, reply);
mrb_gc_arena_restore(mrb, ai);
} while (--queue_counter > 0);

return bulk_reply;
}

void mrb_mruby_redis_gem_init(mrb_state *mrb) {
struct RClass *redis;
struct RClass *redis, *redis_error;

redis = mrb_define_class(mrb, "Redis", mrb->object_class);
MRB_SET_INSTANCE_TT(redis, MRB_TT_DATA);

mrb_define_class_under(mrb, redis, "ConnectionError", E_RUNTIME_ERROR);
redis_error = mrb_define_class_under(mrb, redis, "ConnectionError", E_RUNTIME_ERROR);
mrb_define_class_under(mrb, redis, "ReplyError", redis_error);
mrb_define_class_under(mrb, redis, "ProtocolError", redis_error);
mrb_define_class_under(mrb, redis, "OOMError", redis_error);

mrb_define_method(mrb, redis, "initialize", mrb_redis_connect, MRB_ARGS_ANY());
mrb_define_method(mrb, redis, "select", mrb_redis_select, MRB_ARGS_REQ(1));
Expand Down Expand Up @@ -855,6 +1038,9 @@ void mrb_mruby_redis_gem_init(mrb_state *mrb) {
mrb_define_method(mrb, redis, "zscore", mrb_redis_zscore, MRB_ARGS_REQ(2));
mrb_define_method(mrb, redis, "publish", mrb_redis_pub, MRB_ARGS_ANY());
mrb_define_method(mrb, redis, "close", mrb_redis_close, MRB_ARGS_NONE());
mrb_define_method(mrb, redis, "queue", mrb_redisAppendCommandArgv, (MRB_ARGS_REQ(1)|MRB_ARGS_REST()));
mrb_define_method(mrb, redis, "reply", mrb_redisGetReply, MRB_ARGS_NONE());
mrb_define_method(mrb, redis, "bulk_reply", mrb_redisGetBulkReply, MRB_ARGS_NONE());
DONE;
}

Expand Down
16 changes: 16 additions & 0 deletions test/redis.rb
Expand Up @@ -448,6 +448,22 @@
assert_equal 2, r.zcard("myzset")
end

assert("Pipelined commands") do
redis = Redis.new HOST, PORT
redis.queue(:set, "mruby-redis-test:foo", "bar")
redis.queue(:get, "mruby-redis-test:foo")
assert_equal(:OK, redis.reply)
assert_equal("bar", redis.reply)

redis.queue(:set, "mruby-redis-test:foo", "bar")
redis.queue(:get, "mruby-redis-test:foo")
assert_equal([:OK, "bar"], redis.bulk_reply)

redis.queue(:nonexistant)
assert_kind_of(Redis::ReplyError, redis.reply)
redis.del("mruby-redis-test:foo")
end

#assert("Redis#zrevrange") do
# r = Redis.new HOST, PORT
# r.del "hs"
Expand Down

0 comments on commit 1560447

Please sign in to comment.