Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Multi-interface for synchronouse mode

Synchronous interface has multi-mode

Callbacks refactored to manipulate opaque AV* directly, instead of
proxying values to PLCB_sync_t.

Provided functions to switch callbacks between multi and non-multi mode

CAS is now an IV on 64 bit perls.

Tests for multi-mode
  • Loading branch information...
commit 90c6b30a38000e4f4e5aaf8e4459f49240fbbcc7 1 parent 1839081
@mnunberg authored
View
113 Client.xs
@@ -1,6 +1,7 @@
#include "perl-couchbase.h"
#include "plcb-util.h"
#include <libcouchbase/libevent_io_opts.h>
+
static inline void
wait_for_single_response(PLCB_t *object)
{
@@ -66,7 +67,7 @@ SV *PLCB_construct(const char *pkg, AV *options)
libcouchbase_set_cookie(instance, object);
- plcb_setup_callbacks(object);
+ plcb_callbacks_setup(object);
blessed_obj = newSV(0);
sv_setiv(newSVrv(blessed_obj, "Couchbase::Client"), PTR2IV(object));
@@ -112,6 +113,19 @@ PLCB_connect(SV *self)
}
+#define _sync_return_single(object, err, syncp) \
+ if(err != LIBCOUCHBASE_SUCCESS ) { \
+ plcb_ret_set_err(object, syncp->ret, err); \
+ } else { \
+ wait_for_single_response(object); \
+ } \
+ return plcb_ret_blessed_rv(object, syncp->ret);
+
+#define _sync_initialize_single(object, syncp); \
+ syncp = &object->sync; \
+ syncp->parent = object; \
+ syncp->ret = newAV();
+
static SV *PLCB_set_common(SV *self,
SV *key, SV *value,
int storop,
@@ -123,23 +137,18 @@ static SV *PLCB_set_common(SV *self,
STRLEN klen = 0, vlen = 0;
char *skey, *sval;
PLCB_sync_t *syncp;
- AV *ret_av;
- SV *ret_rv;
time_t exp;
uint32_t store_flags = 0;
mk_instance_vars(self, instance, object);
plcb_get_str_or_die(key, skey, klen, "Key");
- plcb_get_str_or_die(value, sval, vlen, "Value");
-
- syncp = &(object->sync);
- plcb_sync_initialize(syncp, object, skey, klen);
-
+ plcb_get_str_or_die(value, sval, vlen, "Value");
/*Clear existing error status first*/
av_clear(object->errors);
- ret_av = newAV();
+
+ _sync_initialize_single(object, syncp);
exp = exp_offset ? time(NULL) + exp_offset : 0;
@@ -148,14 +157,7 @@ static SV *PLCB_set_common(SV *self,
skey, klen, SvPVX(value), vlen, store_flags, exp, cas);
plcb_convert_storage_free(object, value, store_flags);
- if(err != LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_err(object, ret_av, err);
- } else {
- wait_for_single_response(object);
- plcb_ret_set_err(object, ret_av, syncp->err);
- plcb_ret_set_cas(object, ret_av, &syncp->cas);
- }
- bless_return(object, ret_rv, ret_av);
+ _sync_return_single(object, err, syncp);
}
static SV *PLCB_arithmetic_common(SV *self,
@@ -167,9 +169,6 @@ static SV *PLCB_arithmetic_common(SV *self,
libcouchbase_t instance;
char *skey;
- SV *ret_rv;
- AV *ret_av;
-
STRLEN nkey;
PLCB_sync_t *syncp;
@@ -180,26 +179,15 @@ static SV *PLCB_arithmetic_common(SV *self,
exp = exp_offset ? time(NULL) + exp_offset : 0;
plcb_get_str_or_die(key, skey, nkey, "Key");
-
- syncp = &(object->sync);
- plcb_sync_initialize(syncp, object, skey, nkey);
- ret_av = newAV();
+
+ _sync_initialize_single(object, syncp);
err = libcouchbase_arithmetic(
instance, syncp, skey, nkey, delta,
exp, do_create, initial
);
- if(err != LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_err(object, ret_av, err);
- } else {
- wait_for_single_response(object);
- plcb_ret_set_err(object, ret_av, syncp->err);
-
- if(syncp->err == LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_numval(object, ret_av, syncp->arithmetic, syncp->cas);
- }
- }
- bless_return(object, ret_rv, ret_av);
+
+ _sync_return_single(object, err, syncp);
}
static SV *PLCB_get_common(SV *self, SV *key, int exp_offset)
@@ -210,43 +198,22 @@ static SV *PLCB_get_common(SV *self, SV *key, int exp_offset)
libcouchbase_error_t err;
STRLEN klen;
char *skey;
- AV *ret_av;
- SV *ret_rv;
time_t exp;
time_t *exp_arg;
mk_instance_vars(self, instance, object);
plcb_get_str_or_die(key, skey, klen, "Key");
+ _sync_initialize_single(object, syncp);
- ret_av = newAV();
- syncp = &(object->sync);
- plcb_sync_initialize(syncp, object, skey, klen);
av_clear(object->errors);
-
- if(exp_offset) {
- exp = time(NULL) + exp_offset;
- exp_arg = &exp;
- } else {
- exp_arg = NULL;
- }
+ exp_arg = (exp_offset && (exp = time(NULL) + exp_offset)) ? &exp : NULL;
+
err = libcouchbase_mget(instance, syncp, 1,
(const void * const*)&skey, &klen,
exp_arg);
- if(err != LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_err(object, ret_av, err);
- } else {
- wait_for_single_response(object);
-
- plcb_ret_set_err(object, ret_av, syncp->err);
- if(syncp->err == LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_strval(
- object, ret_av, syncp->value, syncp->nvalue,
- syncp->store_flags, syncp->cas);
- }
- }
- bless_return(object, ret_rv, ret_av);
+ _sync_return_single(object, err, syncp);
}
SV *PLCB_get_errors(SV *self)
@@ -300,21 +267,12 @@ SV *PLCB_remove(SV *self, SV *key, uint64_t cas)
mk_instance_vars(self, instance, object);
plcb_get_str_or_die(key, skey, key_len, "Key");
- ret_av = newAV();
av_clear(object->errors);
- syncp = &(object->sync);
- plcb_sync_initialize(syncp, object, skey, key_len);
+ _sync_initialize_single(object, syncp);
- if( (err = libcouchbase_remove(instance, syncp, skey, key_len, cas))
- != LIBCOUCHBASE_SUCCESS) {
- plcb_ret_set_err(object, ret_av, err);
- } else {
- wait_for_single_response(object);
-
- plcb_ret_set_err(object, ret_av, syncp->err);
- }
- bless_return(object, ret_rv, ret_av);
+ err = libcouchbase_remove(instance, syncp, skey, key_len, cas);
+ _sync_return_single(object, err, syncp);
}
SV *PLCB_stats(SV *self, AV *stats)
@@ -409,7 +367,6 @@ MODULE = Couchbase::Client PACKAGE = Couchbase::Client PREFIX = PLCB_
PROTOTYPES: DISABLE
-
SV *
PLCB_construct(pkg, options)
const char *pkg
@@ -545,6 +502,7 @@ PLCB_cas(self, key, value, cas_sv, ...)
CODE:
if(SvTYPE(cas_sv) == SVt_NULL) {
/*don't bother the network if we know our CAS operation will fail*/
+ warn("I was given a null cas!");
RETVAL = return_empty(self,
LIBCOUCHBASE_KEY_EEXISTS, "I was given an undef cas");
return;
@@ -557,7 +515,7 @@ PLCB_cas(self, key, value, cas_sv, ...)
self, key, value,
LIBCOUCHBASE_SET,
exp_offset, *cas_val);
-
+ assert(RETVAL != &PL_sv_undef);
OUTPUT:
RETVAL
@@ -711,4 +669,9 @@ int
PLCB_connect(self)
SV *self
-INCLUDE: Async.xs
+
+BOOT:
+boot_Couchbase__Client_multi(aTHX_ cv);
+
+
+INCLUDE: Async.xs
View
533 Client_multi.xs
@@ -0,0 +1,533 @@
+#include "perl-couchbase.h"
+
+#define MULTI_STACK_ELEM 128
+
+#ifndef mk_instance_vars
+#define mk_instance_vars(sv, inst_name, obj_name) \
+ if(!SvROK(sv)) { die("self must be a reference"); } \
+ obj_name = NUM2PTR(PLCB_t*, SvIV(SvRV(sv))); \
+ if(!obj_name) { die("tried to access de-initialized PLCB_t"); } \
+ inst_name = obj_name->instance;
+
+#endif
+
+#define _fetch_assert(tmpsv, av, idx, diemsg) \
+ if( (tmpsv = av_fetch(av, idx, 0)) == NULL) { \
+ die("%s (expected something at %d)", diemsg, idx); \
+ }
+
+
+#define _MULTI_INIT_COMMON(object, ret, nreq, args, now) \
+ if( (nreq = av_len(args) + 1) == 0 ) { \
+ die("Need at least one spec"); \
+ } \
+ ret = newHV(); \
+ SAVEFREESV(ret); \
+ now = time(NULL); \
+ object->npending = nreq; \
+ av_clear(object->errors);
+
+#define _MAYBE_STACK_ALLOC(syncp, stackp)
+
+#define _SYNC_RESULT_INIT(object, hv, sync) \
+ sync.ret = newAV(); \
+ hv_store(hv, sync.key, sync.nkey, \
+ plcb_ret_blessed_rv(object, sync.ret), 0); \
+ sync.parent = object;
+
+
+#define _exp_from_av(av, idx, nowvar, expvar, tmpsv) \
+ if( (tmpsv = av_fetch(av, idx, 0)) && (expvar = SvUV(*tmpsv))) { \
+ expvar += nowvar; \
+ }
+
+#define _cas_from_av(av, idx, casvar, tmpsv) \
+ if( (tmpsv = av_fetch(av, idx, 0)) && SvTRUE(*tmpsv) ) { \
+ casvar = plcb_sv_to_u64(*tmpsv); \
+ }
+
+#define _MAYBE_SET_IMMEDIATE_ERROR(err, retav, waitvar) \
+ if(err == LIBCOUCHBASE_SUCCESS) { waitvar++; } \
+ else { \
+ plcb_ret_set_err(object, retav, err); \
+ }
+
+#define _MAYBE_WAIT(waitvar) \
+ if(waitvar) { \
+ object->io_ops->run_event_loop(object->io_ops); \
+ }
+
+#define _dMULTI_VARS \
+ PLCB_t *object; \
+ libcouchbase_t instance; \
+ libcouchbase_error_t err; \
+ int nreq, i; \
+ time_t now; \
+ HV *ret;
+
+enum {
+ MULTI_CMD_GET = 1,
+ MULTI_CMD_TOUCH,
+ MULTI_CMD_GAT,
+
+ MULTI_CMD_SET,
+ MULTI_CMD_ADD,
+ MULTI_CMD_REPLACE,
+ MULTI_CMD_APPEND,
+ MULTI_CMD_PREPEND,
+ MULTI_CMD_REMOVE,
+ MULTI_CMD_CAS,
+
+ MULTI_CMD_ARITHMETIC,
+ MULTI_CMD_INCR,
+ MULTI_CMD_DECR
+};
+
+static inline libcouchbase_storage_t
+_cmd2storop(int cmd)
+{
+ switch(cmd) {
+ case MULTI_CMD_SET:
+ case MULTI_CMD_CAS:
+ return LIBCOUCHBASE_SET;
+ case MULTI_CMD_ADD:
+ return LIBCOUCHBASE_ADD;
+ case MULTI_CMD_REPLACE:
+ return LIBCOUCHBASE_REPLACE;
+ case MULTI_CMD_APPEND:
+ return LIBCOUCHBASE_APPEND;
+ case MULTI_CMD_PREPEND:
+ return LIBCOUCHBASE_PREPEND;
+ default:
+ die("Unhandled command %d", cmd);
+ return LIBCOUCHBASE_ADD;
+ }
+}
+
+static SV*
+PLCB_multi_get_common(SV *self, AV *args, int cmd)
+{
+ _dMULTI_VARS;
+
+ void **keys;
+ size_t *sizes;
+ time_t *exps;
+ SV **tmpsv;
+
+ void *keys_stacked[MULTI_STACK_ELEM];
+ size_t sizes_stacked[MULTI_STACK_ELEM];
+ time_t exps_stacked[MULTI_STACK_ELEM];
+
+ mk_instance_vars(self, instance, object);
+ _MULTI_INIT_COMMON(object, ret, nreq, args, now);
+
+ PLCB_sync_t *syncp = &object->sync;
+ syncp->parent = object;
+ syncp->ret = (AV*)ret;
+
+ if(nreq <= MULTI_STACK_ELEM) {
+ keys = keys_stacked;
+ sizes = sizes_stacked;
+ exps = (cmd == MULTI_CMD_GET) ? NULL : exps_stacked;
+ } else {
+ Newx(keys, nreq, void*); SAVEFREEPV(keys);
+ Newx(sizes, nreq, size_t); SAVEFREEPV(sizes);
+ if(cmd == MULTI_CMD_GET) {
+ exps = NULL;
+ } else {
+ Newx(exps, nreq, time_t); SAVEFREEPV(exps);
+ }
+ }
+
+ for(i = 0; i < nreq; i++) {
+ _fetch_assert(tmpsv, args, i, "arguments");
+
+ if(SvTYPE(*tmpsv) <= SVt_PV) {
+ if(exps) {
+ die("This command requires a valid expiry");
+ }
+ plcb_get_str_or_die(*tmpsv, keys[i], sizes[i], "key");
+ } else {
+ AV *argav;
+
+ if(SvROK(*tmpsv) == 0 || ( (argav = (AV*)SvRV(*tmpsv))
+ && SvTYPE(argav) != SVt_PVAV)) {
+ die("Expected an array reference");
+ }
+ _fetch_assert(tmpsv, argav, 0, "missing key");
+
+ plcb_get_str_or_die(*tmpsv, keys[i], sizes[i], "key");
+
+ if(exps) {
+ _fetch_assert(tmpsv, argav, 1, "expiry");
+ if(! (exps[i] = SvUV(*tmpsv)) ) {
+ die("expiry of 0 passed. This is not what you want");
+ }
+ }
+ }
+ }
+
+ plcb_callbacks_set_multi(object);
+
+ if(cmd == MULTI_CMD_TOUCH) {
+ err = libcouchbase_mtouch(instance, syncp, nreq,
+ (const void* const*)keys, sizes, exps);
+ } else {
+ err = libcouchbase_mget(instance, syncp, nreq,
+ (const void* const*)keys, sizes, NULL);
+ }
+
+ if(err == LIBCOUCHBASE_SUCCESS) {
+ object->io_ops->run_event_loop(object->io_ops);
+ } else {
+ for(i = 0; i < nreq; i++) {
+ AV *errav = newAV();
+ plcb_ret_set_err(object, errav, err);
+ hv_store(ret, keys[i], sizes[i],
+ plcb_ret_blessed_rv(object, errav), 0);
+ }
+ }
+
+ plcb_callbacks_set_single(object);
+
+ return newRV_inc( (SV*)ret);
+}
+
+static SV*
+PLCB_multi_set_common(SV *self, AV *args, int cmd)
+{
+ _dMULTI_VARS;
+ PLCB_sync_t *syncs = NULL;
+ PLCB_sync_t syncs_stacked[MULTI_STACK_ELEM];
+ libcouchbase_storage_t storop;
+ int nwait;
+
+ mk_instance_vars(self, instance, object);
+
+ _MULTI_INIT_COMMON(object, ret, nreq, args, now);
+
+ if(nreq <= MULTI_STACK_ELEM) {
+ syncs = syncs_stacked;
+ } else {
+ Newx(syncs, nreq, PLCB_sync_t);
+ SAVEFREEPV(syncs);
+ }
+
+ nwait = 0;
+ storop = _cmd2storop(cmd);
+
+ for(i = 0; i < nreq; i++) {
+ AV *argav;
+ SV **tmpsv;
+ char *value;
+ STRLEN nvalue;
+ SV *value_sv;
+ uint32_t store_flags;
+ uint64_t cas = 0;
+ time_t exp = 0;
+
+ _fetch_assert(tmpsv, args, i, "empty argument in spec");
+
+ if (SvROK(*tmpsv) == 0 || ( ((argav = (AV*)SvRV(*tmpsv)) &&
+ SvTYPE(argav) != SVt_PVAV))) {
+ die("Expected array reference");
+ }
+
+ _fetch_assert(tmpsv, argav, 0, "expected key");
+ plcb_get_str_or_die(*tmpsv, syncs[i].key, syncs[i].nkey, "key");
+ _fetch_assert(tmpsv, argav, 1, "expected_value");
+ plcb_get_str_or_die(*tmpsv, value, nvalue, "value");
+ value_sv = *tmpsv;
+
+ switch(cmd) {
+ case MULTI_CMD_SET:
+ case MULTI_CMD_ADD:
+ case MULTI_CMD_REPLACE:
+ case MULTI_CMD_APPEND:
+ case MULTI_CMD_PREPEND:
+ _exp_from_av(argav, 2, now, exp, tmpsv);
+ _cas_from_av(argav, 3, cas, tmpsv);
+ break;
+ case MULTI_CMD_CAS:
+ _fetch_assert(tmpsv, argav, 2, "Expected cas");
+ _cas_from_av(argav, 2, cas, tmpsv);
+ _exp_from_av(argav, 3, now, exp, tmpsv);
+ break;
+ default:
+ die("Unhandled command %d", cmd);
+ }
+
+ _SYNC_RESULT_INIT(object, ret, syncs[i]);
+
+ plcb_convert_storage(object, &value_sv, &nvalue, &store_flags);
+
+ err = libcouchbase_store(
+ instance, &syncs[i], storop, syncs[i].key, syncs[i].nkey,
+ SvPVX(value_sv), nvalue, store_flags, exp, cas);
+
+ plcb_convert_storage_free(object, value_sv, store_flags);
+
+ _MAYBE_SET_IMMEDIATE_ERROR(err, syncs[i].ret, nwait);
+
+ }
+ _MAYBE_WAIT(nwait);
+ return newRV_inc( (SV*)ret);
+}
+
+static SV*
+PLCB_multi_arithmetic_common(SV *self, AV *args, int cmd)
+{
+ _dMULTI_VARS;
+
+ PLCB_sync_t *syncs;
+ PLCB_sync_t syncs_stacked[MULTI_STACK_ELEM];
+ int nwait = 0;
+
+ mk_instance_vars(self, instance, object);
+ _MULTI_INIT_COMMON(object, ret, nreq, args, now);
+
+ if(nreq <= MULTI_STACK_ELEM) {
+ syncs = syncs_stacked;
+ } else {
+ Newx(syncs, nreq, PLCB_sync_t);
+ SAVEFREEPV(syncs);
+ }
+
+ for(i = 0; i < nreq; i++) {
+ AV *argav;
+ SV **tmpsv;
+ time_t exp = 0;
+ int64_t delta = 1;
+ uint64_t initial = 0;
+ int do_create = 0;
+
+ #define _do_arith_simple(only_sv) \
+ plcb_get_str_or_die(only_sv, syncs[i].key, syncs[i].nkey, "key"); \
+ delta = (cmd == MULTI_CMD_DECR) ? (-delta) : delta; \
+ goto GT_CBC_CMD;
+
+ _fetch_assert(tmpsv, args, i, "empty argument in spec");
+
+
+ if(SvTYPE(*tmpsv) == SVt_PV) {
+ /*simple key*/
+ if(cmd == MULTI_CMD_ARITHMETIC) {
+ die("Expected array reference!");
+ }
+ _do_arith_simple(*tmpsv);
+ } else {
+ if(SvROK(*tmpsv) == 0 || ( (argav = (AV*)SvRV(*tmpsv)) &&
+ SvTYPE(argav) != SVt_PVAV)) {
+ die("Expected ARRAY reference");
+ }
+ }
+
+ _fetch_assert(tmpsv, argav, 0, "expected key");
+
+ if(av_len(argav) == 0) {
+ _do_arith_simple(*tmpsv);
+ } else {
+ plcb_get_str_or_die(*tmpsv, syncs[i].key, syncs[i].nkey, "key");
+ }
+
+ _fetch_assert(tmpsv, argav, 1, "expected delta");
+ delta = SvIV(*tmpsv);
+ delta = (cmd == MULTI_CMD_DECR) ? (-delta) : delta;
+
+ if(cmd != MULTI_CMD_ARITHMETIC) {
+ goto GT_CBC_CMD;
+ }
+
+ /*fetch initial value here*/
+ if( (tmpsv = av_fetch(argav, 2, 0)) && SvTYPE(*tmpsv) != SVt_NULL ) {
+ initial = SvUV(*tmpsv);
+ do_create = 1;
+ }
+
+ if ( (tmpsv = av_fetch(argav, 3, 0)) && (exp = SvUV(*tmpsv)) ) {
+ exp += now;
+ }
+
+ GT_CBC_CMD:
+
+ _SYNC_RESULT_INIT(object, ret, syncs[i]);
+ err = libcouchbase_arithmetic(instance, &syncs[i], syncs[i].key,
+ syncs[i].nkey,
+ delta, exp, do_create, initial);
+ _MAYBE_SET_IMMEDIATE_ERROR(err, syncs[i].ret, nwait);
+
+ }
+
+ _MAYBE_WAIT(nwait);
+ return newRV_inc( (SV*)ret);
+}
+
+static SV*
+PLCB_multi_remove(SV *self, AV *args)
+{
+ _dMULTI_VARS;
+ PLCB_sync_t *syncs = NULL;
+ PLCB_sync_t syncs_stacked[MULTI_STACK_ELEM];
+
+ int nwait = 0;
+
+ mk_instance_vars(self, instance, object);
+ _MULTI_INIT_COMMON(object, ret, nreq, args, now);
+
+ if(nreq < MULTI_STACK_ELEM) {
+ syncs = syncs_stacked;
+ } else {
+ Newx(syncs, nreq, PLCB_sync_t);
+ SAVEFREEPV(syncs);
+ }
+
+ for(i = 0; i < nreq; i++) {
+ AV *argav;
+ SV **tmpsv;
+ uint64_t cas = 0;
+
+ _fetch_assert(tmpsv, args, i, "empty arguments in spec");
+ if(SvTYPE(*tmpsv) == SVt_PV) {
+ plcb_get_str_or_die(*tmpsv, syncs[i].key, syncs[i].nkey, "key");
+ } else {
+ if(SvROK(*tmpsv) == 0 || ( (argav = (AV*)SvRV(*tmpsv)) &&
+ SvTYPE(argav) != SVt_PVAV)) {
+ die("Expected ARRAY reference");
+ }
+ _fetch_assert(tmpsv, argav, 0, "key");
+ plcb_get_str_or_die(*tmpsv, syncs[i].key, syncs[i].nkey, "key");
+ _cas_from_av(argav, 1, cas, tmpsv);
+ }
+
+ _SYNC_RESULT_INIT(object, ret, syncs[i]);
+
+ err = libcouchbase_remove(instance, &syncs[i],
+ syncs[i].key, syncs[i].nkey, cas);
+ _MAYBE_SET_IMMEDIATE_ERROR(err, syncs[i].ret, nwait);
+ }
+ _MAYBE_WAIT(nwait);
+ return newRV_inc( (SV*)ret );
+
+}
+
+static int get_cmd_map[] = {
+ MULTI_CMD_GET,
+ MULTI_CMD_TOUCH,
+ MULTI_CMD_GAT,
+};
+
+static int set_cmd_map[] = {
+ MULTI_CMD_SET,
+ MULTI_CMD_ADD,
+ MULTI_CMD_REPLACE,
+ MULTI_CMD_APPEND,
+ MULTI_CMD_PREPEND,
+ MULTI_CMD_CAS
+};
+
+static int arith_cmd_map[] = {
+ MULTI_CMD_ARITHMETIC,
+ MULTI_CMD_INCR,
+ MULTI_CMD_DECR
+};
+
+
+
+#define _MAYBE_MULTI_ARG(array) \
+ if(items == 2) { \
+ array = (AV*)ST(1); warn("Using second stack item for AV"); \
+ if( (SvROK((SV*)array)) && (array = (AV*)SvRV((SV*)array))) { \
+ if(SvTYPE(array) < SVt_PVAV) { \
+ die("Expected ARRAY reference for arguments"); \
+ } \
+ } \
+ } else if (items > 2) { \
+ array = (AV*)sv_2mortal((SV*)av_make(items - 1, (SP - items + 2))); \
+ } else { \
+ die("Usage: %s(self, args)", GvNAME(GvCV(cv))); \
+ }
+
+MODULE = Couchbase::Client_multi PACKAGE = Couchbase::Client PREFIX = PLCB_
+
+PROTOTYPES: DISABLE
+
+SV* PLCB_get_multi(self, ...)
+ SV *self
+
+ ALIAS:
+ touch_multi = 1
+ gat_multi = 2
+
+ PREINIT:
+ int cmd;
+ AV *args;
+
+ CODE:
+ cmd = get_cmd_map[ix];
+ _MAYBE_MULTI_ARG(args);
+
+ RETVAL = PLCB_multi_get_common(self, args, cmd);
+
+ OUTPUT:
+ RETVAL
+
+SV*
+PLCB_set_multi(self, ...)
+ SV *self
+
+ ALIAS:
+ add_multi = 1
+ replace_multi = 2
+ append_multi = 3
+ prepend_multi = 4
+ cas_multi = 5
+
+ PREINIT:
+ int cmd;
+ AV *args;
+
+ CODE:
+ cmd = set_cmd_map[ix];
+ _MAYBE_MULTI_ARG(args);
+ RETVAL = PLCB_multi_set_common(self, args, cmd);
+
+ OUTPUT:
+ RETVAL
+
+SV*
+PLCB_arithmetic_multi(self, ...)
+ SV *self
+
+ ALIAS:
+ incr_multi = 1
+ decr_multi = 2
+
+ PREINIT:
+ AV *args;
+ int cmd;
+
+ CODE:
+ cmd = arith_cmd_map[ix];
+ _MAYBE_MULTI_ARG(args);
+ RETVAL = PLCB_multi_arithmetic_common(self, args, cmd);
+
+ OUTPUT:
+ RETVAL
+
+SV*
+PLCB_remove_multi(self, ...)
+ SV *self
+
+ ALIAS:
+ delete_multi = 1
+
+ PREINIT:
+ AV *args;
+
+ CODE:
+ _MAYBE_MULTI_ARG(args);
+ RETVAL = PLCB_multi_remove(self, args);
+
+ OUTPUT:
+ RETVAL
+
View
1  MANIFEST
@@ -23,6 +23,7 @@ lib/Couchbase/Test/Async.pm
lib/Couchbase/Test/Async/Loop.pm
Client.xs
+Client_multi.xs
Async.xs
perl-couchbase.h
View
3  Makefile.PL
@@ -23,7 +23,8 @@ WriteMakefile(
VERSION_FROM => 'lib/Couchbase/Client.pm',
ABSTRACT_FROM => 'lib/Couchbase/Client.pm',
OBJECT => 'callbacks.o convert.o ctor.o Client.o ' .
- 'async.o async_callbacks.o async_events.o',
+ 'async.o async_callbacks.o async_events.o ' .
+ 'Client_multi.o',
($ExtUtils::MakeMaker::VERSION >= 6.3002
? ('LICENSE'=> 'perl')
View
81 callbacks.c
@@ -16,19 +16,6 @@ signal_done(PLCB_sync_t *sync)
sync->parent->io_ops);
}
-#define syncp_assign_keys(syncp, err, key, nkey, cas) \
- syncp->key = key; \
- syncp->nkey = nkey; \
- syncp->cas = cas; \
- syncp->err = err;
-
-#define syncp_assign_strval(syncp, err, key, nkey, val, nval, cas, flags) \
- syncp_assign_keys(syncp, err, key, nkey, cas); \
- syncp->value = val; \
- syncp->nvalue = nval; \
- syncp->store_flags = flags;
-
-
void plcb_callback_get(
libcouchbase_t instance,
const void *cookie,
@@ -38,7 +25,37 @@ void plcb_callback_get(
uint32_t flags, uint64_t cas)
{
PLCB_sync_t *syncp = plcb_sync_cast(cookie);
- syncp_assign_strval(syncp, err, key, nkey, value, nvalue, cas, flags);
+ plcb_ret_set_err(syncp->parent, syncp->ret, err);
+ if(err == LIBCOUCHBASE_SUCCESS && nvalue) {
+ plcb_ret_set_strval(
+ syncp->parent, syncp->ret, value, nvalue, flags, cas);
+ }
+ signal_done(syncp);
+}
+
+void plcb_callback_multi_get(
+ libcouchbase_t instance,
+ const void *cookie,
+ libcouchbase_error_t err,
+ const void *key, size_t nkey,
+ const void *value, size_t nvalue,
+ uint32_t flags, uint64_t cas)
+{
+ PLCB_sync_t *syncp = plcb_sync_cast(cookie);
+ AV *ret;
+ HV *results;
+
+ ret = newAV();
+ results = (HV*)(syncp->ret);
+
+ hv_store(results, key, nkey, plcb_ret_blessed_rv(syncp->parent, ret), 0);
+
+ plcb_ret_set_err(syncp->parent, ret, err);
+
+ if(err == LIBCOUCHBASE_SUCCESS && nvalue) {
+ plcb_ret_set_strval(
+ syncp->parent, ret, value, nvalue, flags, cas);
+ }
signal_done(syncp);
}
@@ -51,7 +68,10 @@ void plcb_callback_storage(
uint64_t cas)
{
PLCB_sync_t *syncp = plcb_sync_cast(cookie);
- syncp_assign_keys(syncp, err, key, nkey, cas);
+ plcb_ret_set_err(syncp->parent, syncp->ret, err);
+ if(err == LIBCOUCHBASE_SUCCESS) {
+ plcb_ret_set_cas(syncp->parent, syncp->ret, &cas);
+ }
signal_done(syncp);
}
@@ -61,8 +81,10 @@ static void arithmetic_callback(
uint64_t value, uint64_t cas)
{
PLCB_sync_t *syncp = plcb_sync_cast(cookie);
- syncp_assign_keys(syncp, err, key, nkey, cas);
- syncp->arithmetic = value;
+ plcb_ret_set_err(syncp->parent, syncp->ret, err);
+ if(err == LIBCOUCHBASE_SUCCESS) {
+ plcb_ret_set_numval(syncp->parent, syncp->ret, value, cas);
+ }
signal_done(syncp);
}
@@ -114,6 +136,15 @@ static void keyop_callback(
NULL, 0, 0, 0);
}
+static void keyop_multi_callback(
+ libcouchbase_t instance, const void *cookie,
+ libcouchbase_error_t err,
+ const void *key, size_t nkey)
+{
+ plcb_callback_multi_get(instance, cookie, err, key, nkey,
+ NULL, 0, 0, 0);
+}
+
static void stat_callback(
libcouchbase_t instance, const void *cookie,
const char *server,
@@ -169,7 +200,21 @@ static void stat_callback(
LEAVE;
}
-void plcb_setup_callbacks(PLCB_t *object)
+void plcb_callbacks_set_multi(PLCB_t *object)
+{
+ libcouchbase_t instance = object->instance;
+ libcouchbase_set_get_callback(instance, plcb_callback_multi_get);
+ libcouchbase_set_touch_callback(instance, keyop_multi_callback);
+}
+
+void plcb_callbacks_set_single(PLCB_t *object)
+{
+ libcouchbase_t instance = object->instance;
+ libcouchbase_set_get_callback(instance, plcb_callback_get);
+ libcouchbase_set_touch_callback(instance, keyop_callback);
+}
+
+void plcb_callbacks_setup(PLCB_t *object)
{
libcouchbase_t instance = object->instance;
libcouchbase_set_get_callback(instance, plcb_callback_get);
View
101 lib/Couchbase/Client.pm
@@ -1,5 +1,11 @@
package Couchbase::Client;
-require XSLoader;
+
+BEGIN {
+ require XSLoader;
+ our $VERSION = '0.01_1';
+ XSLoader::load(__PACKAGE__, $VERSION);
+}
+
use strict;
use warnings;
@@ -13,11 +19,10 @@ my $have_zlib = eval "use Compress::Zlib; 1;";
use Log::Fu;
use Array::Assign;
-our $VERSION = '0.01_1';
-XSLoader::load(__PACKAGE__, $VERSION);
{
no warnings 'once';
*gets = \&get;
+ *gets_multi = \&get_multi;
}
#this function converts hash options for compression and serialization
@@ -183,7 +188,7 @@ older memcached clients like L<Cache::Memcached> and L<Cache::Memcached::Fast>
This client is mainly written in C and interacts with C<libcouchbase> - the common
couchbase client library, which must be installed.
-=head2 METHODS
+=head2 BASIC METHODS
All of the protocol methods (L</get>, L</set>, etc) return a common return value of
L<Couchbase::Client::Return> which stores operation-specific information and
@@ -442,6 +447,94 @@ If C<cas> is also specified, the deletion will only be performed if C<key> still
maintains the same CAS value as C<cas>.
+=head2 MULTI METHODS
+
+These methods gain performance and save on network I/O by batch-enqueueing
+operations.
+
+Of these, only the C<get> and C<touch> methods currently do 'true' multi batching.
+
+The other commands are still batched internally in the XS code, saving on xsub
+call overhead.
+
+All of these functions return a hash reference, whose keys are the keys specified
+for the operation, and whose values are L<Couchbase::Client::Return> objects
+specifying the result of the operation for that key.
+
+Calling the multi methods generally involves passing a series of array references.
+Each n-tuple passed in the list should contain arguments conforming to the
+calling convention of the non-multi command variant.
+
+Thus, where you would do:
+
+ $rv = $o->foo($arg1, $arg2, $arg3)
+
+The C<_multi> version would be
+
+ $rvs = $o->foo_multi(
+ [$arg1_0, $arg2_0, $arg3_0],
+ [$arg1_1, $arg2_1, $arg3_1],
+ );
+
+The n-tuples themselves may either be grouped into a 'list', or an array reference
+itself:
+
+ my @arglist = map { [$h->{key}, $k->{value} ] };
+
+ $o->set(@arglist);
+
+ #the same as:
+
+ $o->set( [ map [ { $h->{key}, $h->{value } ] }] );
+
+ #and the same as:
+
+ $o->set(map{ [$h->{key}, $h->{value}] });
+
+
+
+=head3 get_multi(@keys)
+
+=head3 get_multi(\@keys)
+
+=head3 gets_multi
+
+alias to L</get_multi>
+
+=head3 touch_multi([key, exp]..)
+
+
+=head3 set_multi([key => value, ...], [key => value, ...])
+
+
+Performs multiple set operations on a multitude of keys. Input parameters are
+array references. The contents of these array references follow the same
+convention as calls to L</set> do. Thus:
+
+ $o->set_multi(['Foo', 'foo_value', 120], ['Bar', 'bar_value']);
+
+will set the key C<foo> to C<foo_value>, with an expiry of 120 seconds in the
+future. C<bar> is set to C<bar_value>, without any expiry.
+
+=head3 cas_multi([key => value, $cas, ...])
+
+Multi version of L</cas>
+
+=head3 arithmetic_multi([key => $delta, ...])
+
+Multi version of L</arithmetic>
+
+=head3 incr_multi(@keys)
+
+=head3 decr_multi(@keys)
+
+=head3 incr_multi( [key, amount], ... )
+
+=head3 decr_multi( [key, amount], ... )
+
+
+=head2 INFORMATIONAL METHODS
+
=head3 get_errors()
Returns a list of client/server errors which have ocurred during the last operation.
View
82 lib/Couchbase/Test/ClientSync.pm
@@ -177,4 +177,86 @@ sub T05_conversion :Test(no_plan) {
ok($@, "Got error for append/prepending a serialized structure ($@)");
}
+sub _multi_check_ret {
+ my ($rv,$keys) = @_;
+ my $nkeys = scalar @$keys;
+ my $defined = scalar grep defined $_, values %$rv;
+ my $n_ok = scalar grep $_->is_ok, values %$rv;
+
+ is(scalar keys %$rv, $nkeys, "Expected number of keys");
+ is($defined, $nkeys, "All values defined");
+ is($n_ok,$nkeys, "All returned ok (no errors)");
+
+}
+
+sub T06_multi :Test(no_plan) {
+ my $self = shift;
+ my $o = $self->cbo;
+ my @keys = @{$self->{basic_keys}};
+
+ my $rv = $o->set_multi(
+ map { [$_, $_] } @keys);
+
+ ok($rv && ref $rv eq 'HASH', "Got hash result for multi operation");
+ ok(scalar keys %$rv == scalar @keys,
+ "got expected number of results");
+
+ is(grep(defined $_, values %$rv), scalar @keys, "All values defined");
+ is(scalar grep(!$rv->{$_}->is_ok, @keys), 0, "No errors");
+
+ $rv = $o->get_multi(@keys);
+ _multi_check_ret($rv, \@keys);
+
+ is(scalar grep($rv->{$_}->value eq $_, @keys), scalar @keys,
+ "get_multi: Got expected values");
+
+ $rv = $o->cas_multi(
+ map { [$_, scalar(reverse $_), $rv->{$_}->cas ] } @keys );
+ _multi_check_ret($rv, \@keys);
+
+ #Remove them all:
+
+ note "Remove (no CAS)";
+ $rv = $o->remove_multi(@keys);
+ _multi_check_ret($rv, \@keys);
+
+ $rv = $o->set_multi(map { [$_, $_] } @keys);
+ _multi_check_ret($rv, \@keys);
+
+ note "Remove (with CAS)";
+ $rv = $o->remove_multi(map { [ $_, $rv->{$_}->cas] } @keys);
+ _multi_check_ret($rv, \@keys);
+
+ note "Trying arithmetic..";
+
+ $rv = $o->arithmetic_multi(
+ map { [$_, 666, undef, 120] } @keys
+ );
+ ok(scalar(
+ grep {$_->errnum == COUCHBASE_KEY_ENOENT} values %$rv
+ ) == scalar @keys,
+ "ENOENT for non-existent deleted arithmetic keys");
+
+
+ #try arithmetic again:
+ $rv = $o->arithmetic_multi(
+ map { [$_, 666, 42, 120] } @keys);
+ _multi_check_ret($rv, \@keys);
+
+ is(scalar grep($_->value == 42, values %$rv), scalar @keys,
+ "all keys have expected value");
+
+ $rv = $o->incr_multi(@keys);
+ _multi_check_ret($rv, \@keys);
+
+ is(scalar grep($_->value == 43, values %$rv), scalar @keys,
+ "all keys have been incremented");
+
+ $rv = $o->decr_multi(
+ map {[ $_, 41 ]} @keys);
+ _multi_check_ret($rv, \@keys);
+ is(scalar grep($_->value == 2, values %$rv), scalar @keys,
+ "all keys have been decremented");
+}
+
1;
View
20 perl-couchbase.h
@@ -25,24 +25,10 @@ typedef struct {
PLCB_t *parent;
const char *key;
size_t nkey;
- const char *value;
- size_t nvalue;
- uint64_t cas;
- uint64_t arithmetic;
- libcouchbase_error_t err;
- uint32_t store_flags;
+ AV *ret;
} PLCB_sync_t;
#define plcb_sync_cast(p) (PLCB_sync_t*)(p)
-#define plcb_sync_initialize(syncp, object, k, ksz) \
- syncp->parent = object; \
- syncp->key = k; \
- syncp->nkey = ksz; \
- syncp->cas = syncp->nvalue = 0; \
- syncp->value = NULL; \
- syncp->err = 0; \
- syncp->arithmetic = 0; \
- syncp->store_flags = 0; \
typedef enum {
PLCBf_DIE_ON_ERROR = 0x1,
@@ -159,7 +145,9 @@ typedef enum {
} PLCB_ctor_idx_t;
-void plcb_setup_callbacks(PLCB_t *object);
+void plcb_callbacks_setup(PLCB_t *object);
+void plcb_callbacks_set_multi(PLCB_t *object);
+void plcb_callbacks_set_single(PLCB_t *object);
/*options for common constructor settings*/
void plcb_ctor_cbc_opts(AV *options,
View
3  plcb-return.h
@@ -15,7 +15,8 @@ typedef enum {
} PLCB_ret_idx_t;
#define plcb_ret_set_cas(obj, ret, cas) \
- av_store(ret, PLCB_RETIDX_CAS, newSVpvn((char*)cas, 8));
+ av_store(ret, PLCB_RETIDX_CAS, \
+ plcb_sv_from_u64_new(cas) );
#define plcb_ret_set_strval(obj, ret, value, nvalue, flags, cas) \
av_store(ret, PLCB_RETIDX_VALUE, \
View
13 plcb-util.h
@@ -17,7 +17,15 @@
#define plcb_sv_to_u64(sv) SvUV(sv)
#define plcb_sv_to_64(sv) (int64_t)(plcb_sv_to_u64(sv))
#define plcb_sv_from_u64(sv, num) (sv_setuv(sv, num))
+#define plcb_sv_from_u64_new(nump) newSVuv( (*nump) )
+
+#define plcb_cas_from_sv(sv, cas_p, lenvar) \
+ (SvIOK(sv)) \
+ ? cas_p = (uint64_t*)&(SvIVX(sv)) \
+ : (uint64_t*)die("Expected valid (UV) cas. IOK not true")
+
#else
+
static inline uint64_t plcb_sv_to_u64(SV *in)
{
char *sv_blob;
@@ -31,10 +39,12 @@ static inline uint64_t plcb_sv_to_u64(SV *in)
return ret;
}
#define plcb_sv_to_64(sv) (plcb_sv_to_u64(sv))
+
#define plcb_sv_from_u64(sv, num) \
(sv_setpvn(sv, (const char const*)&(num), 8))
-#endif
+#define plcb_sv_from_u64_new(nump) \
+ newSVpv((const char* const)(nump), 8)
/*Extract a packed 8 byte blob from an SV into a CAS value*/
@@ -47,6 +57,7 @@ static inline uint64_t plcb_sv_to_u64(SV *in)
) \
: (void*)die("CAS specified, but is null")
+#endif /*PLCB_PERL64*/
/*assertively extract a non-null key from an SV, together with its length*/
Please sign in to comment.
Something went wrong with that request. Please try again.