Skip to content

Commit

Permalink
Working maps as HyperDex types.
Browse files Browse the repository at this point in the history
Maps may use strings or integers as the key or value respectively.

For every operation that may be performed on the value (when not in a map),
there is a "map_*" variant that operates on the map's value.

Known Bugs:
 - The append/prepend methods of strings blow up by not serializing correctly.
   Fix:  Figure out where the serializing problem is.
 - If the requested operation translates to more than one micro op, then there
   is a large chance that the ops will be rejected because they are not sorted
   correctly.
   Fix:  Pack the ops into the message in sorted order.
  • Loading branch information
rescrv committed Apr 11, 2012
1 parent cbed1c9 commit 29b8a02
Show file tree
Hide file tree
Showing 7 changed files with 1,283 additions and 74 deletions.
141 changes: 139 additions & 2 deletions hyperclient/hyperclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ hyperclient_del(struct hyperclient* client, const char* space, const char* key,
}
}

#define HYPERCLIENT_CDEF_STANDARD_CALL(OPNAME) \
#define HYPERCLIENT_CDEF_CUSTOM_CALL(OPNAME, ATTRPREFIX) \
int64_t \
hyperclient_ ## OPNAME(struct hyperclient* client, const char* space, const char* key, \
size_t key_sz, const struct hyperclient_attribute* attrs, \
size_t key_sz, const struct hyperclient ## ATTRPREFIX ## attribute* attrs, \
size_t attrs_sz, hyperclient_returncode* status) \
{ \
try \
Expand All @@ -201,6 +201,11 @@ hyperclient_del(struct hyperclient* client, const char* space, const char* key,
} \
}

#define HYPERCLIENT_CDEF_STANDARD_CALL(OPNAME) \
HYPERCLIENT_CDEF_CUSTOM_CALL(OPNAME, _)
#define HYPERCLIENT_CDEF_MAP_CALL(OPNAME) \
HYPERCLIENT_CDEF_CUSTOM_CALL(OPNAME, _map_)

HYPERCLIENT_CDEF_STANDARD_CALL(atomic_add)
HYPERCLIENT_CDEF_STANDARD_CALL(atomic_sub)
HYPERCLIENT_CDEF_STANDARD_CALL(atomic_mul)
Expand All @@ -217,6 +222,18 @@ HYPERCLIENT_CDEF_STANDARD_CALL(set_add)
HYPERCLIENT_CDEF_STANDARD_CALL(set_remove)
HYPERCLIENT_CDEF_STANDARD_CALL(set_intersect)
HYPERCLIENT_CDEF_STANDARD_CALL(set_union)
HYPERCLIENT_CDEF_MAP_CALL(map_add)
HYPERCLIENT_CDEF_MAP_CALL(map_remove)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_add)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_sub)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_mul)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_div)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_mod)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_and)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_or)
HYPERCLIENT_CDEF_MAP_CALL(map_atomic_xor)
HYPERCLIENT_CDEF_MAP_CALL(map_string_prepend)
HYPERCLIENT_CDEF_MAP_CALL(map_string_append)

int64_t
hyperclient_search(struct hyperclient* client, const char* space,
Expand Down Expand Up @@ -1005,6 +1022,16 @@ hyperclient :: del(const char* space, const char* key, size_t key_sz,
key, key_sz, attrs, attrs_sz, status); \
}

#define HYPERCLIENT_MAP_CPPDEF(PREFIX, OPNAME, OPNAMECAPS) \
int64_t \
hyperclient :: PREFIX ## _ ## OPNAME(const char* space, const char* key, size_t key_sz, \
const struct hyperclient_map_attribute* attrs, size_t attrs_sz, \
enum hyperclient_returncode* status) \
{ \
return PREFIX ## _ops(hyperdex::OP_ ## OPNAMECAPS, space, \
key, key_sz, attrs, attrs_sz, status); \
}

int64_t
hyperclient :: atomic_ops(int action, const char* space,
const char* key, size_t key_sz,
Expand Down Expand Up @@ -1347,6 +1374,116 @@ HYPERCLIENT_CPPDEF(set, remove, SET_REMOVE)
HYPERCLIENT_CPPDEF(set, intersect, SET_INTERSECT)
HYPERCLIENT_CPPDEF(set, union, SET_UNION)

int64_t
hyperclient :: map_ops(int action, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status)
{
if (maintain_coord_connection(status) < 0)
{
return -1;
}

// A new pending op
e::intrusive_ptr<pending> op;
op = new pending_statusonly(hyperdex::REQ_ATOMIC, hyperdex::RESP_ATOMIC, status);
// The size of the buffer we need
size_t sz = HDRSIZE
+ sizeof(uint32_t)
+ key_sz
+ sizeof(uint32_t)
+ MICROOP_BASE_SIZE * attrs_sz;

for (size_t i = 0; i < attrs_sz; ++i)
{
sz += attrs[i].map_key_sz;
sz += attrs[i].value_sz;
}

std::auto_ptr<e::buffer> msg(e::buffer::create(sz));
e::buffer::packer p = msg->pack_at(HDRSIZE);
p = p << e::slice(key, key_sz) << static_cast<uint32_t>(attrs_sz);
hyperdex::spaceid si = m_config->space(space);

if (si == hyperdex::spaceid())
{
*status = HYPERCLIENT_UNKNOWNSPACE;
return -1;
}

std::vector<hyperdex::attribute> dims = m_config->dimension_names(si);
assert(dims.size() > 0);

// XXX Need to sort attrs by attr number
for (size_t i = 0; i < attrs_sz; ++i)
{
if (attrs[i].datatype != HYPERDATATYPE_MAP_STRING_STRING &&
attrs[i].datatype != HYPERDATATYPE_MAP_STRING_INT64 &&
attrs[i].datatype != HYPERDATATYPE_MAP_INT64_STRING &&
attrs[i].datatype != HYPERDATATYPE_MAP_INT64_INT64)
{
*status = HYPERCLIENT_WRONGTYPE;
return -1 - i;
}

int idx = validate_attr(dims, NULL, attrs[i].attr, attrs[i].datatype, status);

if (idx < 0)
{
return -1 - i;
}

hyperdex::microop o;
o.attr = idx;
o.type = attrs[i].datatype;
o.action = static_cast<hyperdex::microop_type>(action);

if (attrs[i].datatype == HYPERDATATYPE_MAP_STRING_STRING)
{
o.argv2_string = e::slice(attrs[i].map_key, attrs[i].map_key_sz);
o.argv1_string = e::slice(attrs[i].value, attrs[i].value_sz);
}
else if (attrs[i].datatype == HYPERDATATYPE_MAP_STRING_INT64)
{
o.argv2_string = e::slice(attrs[i].map_key, attrs[i].map_key_sz);
o.argv1_int64 = 0;
memmove(&o.argv1_int64, attrs[i].value, std::min(attrs[i].value_sz, sizeof(int64_t)));
}
else if (attrs[i].datatype == HYPERDATATYPE_MAP_INT64_STRING)
{
o.argv2_int64 = 0;
memmove(&o.argv2_int64, attrs[i].map_key, std::min(attrs[i].map_key_sz, sizeof(int64_t)));
o.argv1_string = e::slice(attrs[i].value, attrs[i].value_sz);
}
else if (attrs[i].datatype == HYPERDATATYPE_MAP_INT64_INT64)
{
o.argv2_int64 = 0;
memmove(&o.argv2_int64, attrs[i].map_key, std::min(attrs[i].map_key_sz, sizeof(int64_t)));
o.argv1_int64 = 0;
memmove(&o.argv1_int64, attrs[i].value, std::min(attrs[i].value_sz, sizeof(int64_t)));
}

p = p << o;
}

assert(!p.error());
return add_keyop(space, key, key_sz, msg, op);
}

HYPERCLIENT_MAP_CPPDEF(map, add, MAP_ADD)
HYPERCLIENT_MAP_CPPDEF(map, remove, MAP_REMOVE)
HYPERCLIENT_MAP_CPPDEF(map, atomic_add, INT64_ADD)
HYPERCLIENT_MAP_CPPDEF(map, atomic_sub, INT64_SUB)
HYPERCLIENT_MAP_CPPDEF(map, atomic_mul, INT64_MUL)
HYPERCLIENT_MAP_CPPDEF(map, atomic_div, INT64_DIV)
HYPERCLIENT_MAP_CPPDEF(map, atomic_mod, INT64_MOD)
HYPERCLIENT_MAP_CPPDEF(map, atomic_and, INT64_AND)
HYPERCLIENT_MAP_CPPDEF(map, atomic_or, INT64_OR)
HYPERCLIENT_MAP_CPPDEF(map, atomic_xor, INT64_XOR)
HYPERCLIENT_MAP_CPPDEF(map, string_prepend, STRING_PREPEND)
HYPERCLIENT_MAP_CPPDEF(map, string_append, STRING_APPEND)

int64_t
hyperclient :: search(const char* space,
const struct hyperclient_attribute* eq, size_t eq_sz,
Expand Down
110 changes: 110 additions & 0 deletions hyperclient/hyperclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ struct hyperclient_attribute
enum hyperdatatype datatype;
};

struct hyperclient_map_attribute
{
const char* attr; /* NULL-terminated */
const char* map_key;
size_t map_key_sz;
const char* value;
size_t value_sz;
enum hyperdatatype datatype;
};

struct hyperclient_range_query
{
const char* attr;
Expand Down Expand Up @@ -500,6 +510,66 @@ hyperclient_set_union(struct hyperclient* client, const char* space,
const struct hyperclient_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_add(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_remove(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_add(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_sub(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_mul(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_div(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_mod(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_and(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_or(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

int64_t
hyperclient_map_atomic_xor(struct hyperclient* client, const char* space,
const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);

/* Perform a search for objects which match "eq" and "rn".
*
* Each time hyperclient_loop returns the identifier generated by a call to
Expand Down Expand Up @@ -620,6 +690,42 @@ class hyperclient
int64_t set_union(const char* space, const char* key, size_t key_sz,
const struct hyperclient_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_add(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_remove(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_add(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_sub(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_mul(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_div(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_mod(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_and(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_or(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_atomic_xor(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_string_prepend(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t map_string_append(const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs, size_t attrs_sz,
enum hyperclient_returncode* status);
int64_t search(const char* space,
const struct hyperclient_attribute* eq, size_t eq_sz,
const struct hyperclient_range_query* rn, size_t rn_sz,
Expand Down Expand Up @@ -655,6 +761,10 @@ class hyperclient
const char* space, const char* key, size_t key_sz,
const struct hyperclient_attribute* attrs,
size_t attrs_sz, hyperclient_returncode* status);
int64_t map_ops(int action,
const char* space, const char* key, size_t key_sz,
const struct hyperclient_map_attribute* attrs,
size_t attrs_sz, hyperclient_returncode* status);
int64_t add_keyop(const char* space,
const char* key,
size_t key_sz,
Expand Down
Loading

0 comments on commit 29b8a02

Please sign in to comment.