Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added "keys" op

(now, key prefix search is available on tcb (tokyo cabinet b+tree) storage)
  • Loading branch information...
commit 15b87f2c5adc139a0bde2b829ca186a4b0931847 1 parent 9250928
Masaki Fujimoto fujimoto authored
2  src/flared/ini_option.cc
View
@@ -364,7 +364,7 @@ int ini_option::reload() {
}
if (opt_var_map.count("replication-type")) {
- log_info(" replication_type: %s -> %s", this->_replication_type, opt_var_map["replication-type"].as<string>());
+ log_info(" replication_type: %s -> %s", this->_replication_type.c_str(), opt_var_map["replication-type"].as<string>().c_str());
cluster::replication t;
if (cluster::replication_cast(opt_var_map["replication-type"].as<string>(), t) < 0) {
2  src/flared/op_parser_text_node.cc
View
@@ -75,6 +75,8 @@ op* op_parser_text_node::_determine_op(const char* first, const char* buf, int&
} else {
r = static_cast<op*>(_new_ op_error(this->_connection));
}
+ } else if (strcmp(first, "keys") == 0) {
+ r = static_cast<op*>(_new_ op_keys(this->_connection, singleton<flared>::instance().get_cluster(), singleton<flared>::instance().get_storage()));
} else if (strcmp(first, "dump") == 0) {
r = static_cast<op*>(_new_ op_dump(this->_connection, singleton<flared>::instance().get_cluster(), singleton<flared>::instance().get_storage()));
} else if (strcmp(first, "dump_key") == 0) {
1  src/lib/Makefile.am
View
@@ -39,6 +39,7 @@ libflare_la_SOURCES = \
op_get.h op_get.cc \
op_gets.h op_gets.cc \
op_incr.h op_incr.cc \
+ op_keys.h op_keys.cc \
op_kill.h op_kill.cc \
op_meta.h op_meta.cc \
op_node_add.h op_node_add.cc \
4 src/lib/client.cc
View
@@ -72,7 +72,7 @@ op::result client::get(string key, storage::entry& e) {
e.key = key;
op_get* p = _new_ op_get(this->_connection, NULL, NULL);
- if (p->run_client(e) < 0) {
+ if (p->run_client(e, 0) < 0) {
log_err("get operation failed", 0);
_delete_(p);
return op::result_error;
@@ -90,7 +90,7 @@ op::result client::gets(string key, storage::entry& e) {
e.key = key;
op_gets* p = _new_ op_gets(this->_connection, NULL, NULL);
- if (p->run_client(e) < 0) {
+ if (p->run_client(e, 0) < 0) {
log_err("gets operation failed", 0);
_delete_(p);
return op::result_error;
4 src/lib/cluster.cc
View
@@ -940,7 +940,7 @@ int cluster::deactivate_node() {
*
* @todo fix performance issue
*/
-cluster::proxy_request cluster::pre_proxy_read(op_proxy_read* op, storage::entry& e, shared_queue_proxy_read& q_result) {
+cluster::proxy_request cluster::pre_proxy_read(op_proxy_read* op, storage::entry& e, void* parameter, shared_queue_proxy_read& q_result) {
partition p;
bool dummy;
int n = this->_determine_partition(e, p, false, dummy);
@@ -968,7 +968,7 @@ cluster::proxy_request cluster::pre_proxy_read(op_proxy_read* op, storage::entry
vector<string> proxy = op->get_proxy();
proxy.push_back(this->_node_key);
- shared_queue_proxy_read q(new queue_proxy_read(this, this->_storage, proxy, e, op->get_ident()));
+ shared_queue_proxy_read q(new queue_proxy_read(this, this->_storage, proxy, e, parameter, op->get_ident()));
if (this->_enqueue(shared_static_cast<thread_queue, queue_proxy_read>(q), node_key, e.get_key_hash_value(storage::hash_algorithm_bitshift), true) < 0) {
return proxy_request_error_enqueue;
}
2  src/lib/cluster.h
View
@@ -189,7 +189,7 @@ class cluster {
int activate_node();
int deactivate_node();
- proxy_request pre_proxy_read(op_proxy_read* op, storage::entry& e, shared_queue_proxy_read& q);
+ proxy_request pre_proxy_read(op_proxy_read* op, storage::entry& e, void* parameter, shared_queue_proxy_read& q);
proxy_request pre_proxy_write(op_proxy_write* op, shared_queue_proxy_write& q, uint64_t generic_value = 0);
proxy_request post_proxy_write(op_proxy_write* op, bool sync = false);
6 src/lib/op_get.cc
View
@@ -84,7 +84,7 @@ int op_get::_run_server() {
stats_object->increment_cmd_get();
shared_queue_proxy_read q;
- cluster::proxy_request r_proxy = this->_cluster->pre_proxy_read(this, *it, q);
+ cluster::proxy_request r_proxy = this->_cluster->pre_proxy_read(this, *it, this->_parameter, q);
if (r_proxy == cluster::proxy_request_complete) {
q_map[it->key] = q;
} else if (r_proxy == cluster::proxy_request_error_enqueue) {
@@ -139,7 +139,7 @@ int op_get::_run_server() {
return 0;
}
-int op_get::_run_client(storage::entry& e) {
+int op_get::_run_client(storage::entry& e, void* parameter) {
int request_len = e.key.size() + BUFSIZ;
char* request = _new_ char[request_len];
snprintf(request, request_len, "%s %s", this->get_ident().c_str(), e.key.c_str());
@@ -149,7 +149,7 @@ int op_get::_run_client(storage::entry& e) {
return r;
}
-int op_get::_run_client(list<storage::entry>& e) {
+int op_get::_run_client(list<storage::entry>& e, void* parameter) {
if (e.size() == 0) {
log_warning("passed 0 entry...", 0);
return -1;
4 src/lib/op_get.h
View
@@ -31,8 +31,8 @@ class op_get : public op_proxy_read {
protected:
virtual int _parse_server_parameter();
virtual int _run_server();
- virtual int _run_client(storage::entry& e);
- virtual int _run_client(list<storage::entry>& e);
+ virtual int _run_client(storage::entry& e, void* parameter);
+ virtual int _run_client(list<storage::entry>& e, void* parameter);
virtual int _parse_client_parameter(storage::entry& e);
virtual int _parse_client_parameter(list<storage::entry>& e);
};
240 src/lib/op_keys.cc
View
@@ -0,0 +1,240 @@
+/**
+ * op_keys.cc
+ *
+ * implementation of gree::flare::op_keys
+ *
+ * @author Masaki Fujimoto <fujimoto@php.net>
+ *
+ * $Id$
+ */
+#include "app.h"
+#include "op_keys.h"
+#include "queue_proxy_read.h"
+
+namespace gree {
+namespace flare {
+
+// {{{ ctor/dtor
+/**
+ * ctor for op_keys
+ */
+op_keys::op_keys(shared_connection c, cluster* cl, storage* st):
+ op_proxy_read(c, "keys", cl, st) {
+ this->_is_multiple_response = true;
+}
+
+/**
+ * dtor for op_keys
+ */
+op_keys::~op_keys() {
+}
+// }}}
+
+// {{{ operator overloads
+// }}}
+
+// {{{ public methods
+// }}}
+
+// {{{ protected methods
+/**
+ * parser server request parameters
+ */
+int op_keys::_parse_server_parameter() {
+ char* p;
+ if (this->_connection->readline(&p) < 0) {
+ return -1;
+ }
+
+ int n = 0;
+ char q[BUFSIZ];
+ string last_parameter;
+ for (;;) {
+ n += util::next_word(p + n, q, sizeof(q));
+ if (q[0] == '\0') {
+ break;
+ }
+ storage::entry e;
+ last_parameter = e.key = q;
+ log_debug("storing key [%s]", e.key.c_str());
+ this->_entry_list.push_back(e);
+ }
+ _delete_(p);
+
+ if (this->_entry_list.size() < 2) {
+ log_debug("not enough parameter [%d]", this->_entry_list.size());
+ return -1;
+ }
+
+ // remove last parameter and assume it as a limit parameter
+ this->_entry_list.pop_back();
+ int limit;
+ try {
+ limit = lexical_cast<int>(last_parameter);
+ if (limit <= 0) {
+ log_debug("invalid limit [%d]", limit);
+ return -1;
+ }
+ this->_parameter = reinterpret_cast<void*>(limit);
+ log_debug("storing limit [%d]", limit);
+ } catch (bad_lexical_cast e) {
+ log_debug("invalid limit [%s]", last_parameter.c_str());
+ return -1;
+ }
+
+ log_debug("found %d keys", this->_entry_list.size());
+
+ return 0;
+}
+
+int op_keys::_run_server() {
+ // this->_entry_list for response
+ int limit = reinterpret_cast<intptr_t>(this->_parameter);
+
+ map<string, shared_queue_proxy_read> q_map;
+ map<string, storage::result> r_map;
+ map<string, vector<string> > s_map;
+ vector<string> key_list;
+
+ for (list<storage::entry>::iterator it = this->_entry_list.begin(); it != this->_entry_list.end(); it++) {
+ stats_object->increment_cmd_get();
+
+ shared_queue_proxy_read q;
+ cluster::proxy_request r_proxy = this->_cluster->pre_proxy_read(this, *it, this->_parameter, q);
+ if (r_proxy == cluster::proxy_request_complete) {
+ q_map[it->key] = q;
+ } else if (r_proxy == cluster::proxy_request_error_enqueue) {
+ log_warning("proxy error (key=%s) -> continue processing (pretending not found)", it->key.c_str());
+ r_map[it->key] = storage::result_not_found;
+ } else if (r_proxy == cluster::proxy_request_error_partition) {
+ log_warning("partition error (key=%s) -> continue processing (pretending not found)", it->key.c_str());
+ r_map[it->key] = storage::result_not_found;
+ } else {
+ if (this->_storage->is_capable(storage::capability_prefix_search) == false) {
+ return this->_send_result(result_server_error, "not capable on current storage type");
+ }
+
+ // storage i/o
+ vector<string> tmp_list;
+ log_debug("key prefix search (key=%s, limit=%d)", it->key.c_str(), limit);
+ if (this->_storage->get_key(it->key, limit, tmp_list) < 0) {
+ log_warning("storage i/o error (key=%s) -> continue processing (pretending not found)", it->key.c_str());
+ r_map[it->key] = storage::result_not_found;
+ }
+ log_debug("found %d key(s)", tmp_list.size());
+
+ s_map[it->key] = tmp_list;
+ r_map[it->key] = storage::result_found;
+ }
+ }
+
+ for (list<storage::entry>::iterator it = this->_entry_list.begin(); it != this->_entry_list.end(); it++) {
+ if (q_map.count(it->key) > 0) {
+ shared_queue_proxy_read q = q_map[it->key];
+ q->sync();
+ list<storage::entry>& e = q->get_entry_list();
+
+ ostringstream s;
+ for (list<storage::entry>::iterator it = e.begin(); it != e.end(); it++) {
+ s << "KEY " << it->key << line_delimiter;
+ }
+ this->_connection->write(s.str().c_str(), s.str().size(), true);
+ } else if (r_map.count(it->key) == 0) {
+ log_warning("result map is inconsistent (key=%s)", it->key.c_str());
+ continue;
+ } else if (r_map[it->key] == storage::result_not_found) {
+ continue;
+ } else {
+ if (s_map.count(it->key) == 0) {
+ log_warning("result map is inconsistent (key=%s)", it->key.c_str());
+ continue;
+ }
+ vector<string>& e = s_map[it->key];
+ ostringstream s;
+ for (vector<string>::iterator it = e.begin(); it != e.end(); it++) {
+ s << "KEY " << *it << line_delimiter;
+ }
+ this->_connection->write(s.str().c_str(), s.str().size(), true);
+ }
+ }
+
+ this->_send_result(result_end);
+
+ return 0;
+}
+
+int op_keys::_run_client(storage::entry& e, void* parameter) {
+ // support multiple entries only
+ return -1;
+}
+
+int op_keys::_run_client(list<storage::entry>& e, void* parameter) {
+ if (e.size() == 0) {
+ log_warning("passed 0 entry...", 0);
+ return -1;
+ }
+
+ int limit = reinterpret_cast<intptr_t>(parameter);
+
+ ostringstream s;
+ s << this->get_ident();
+ for (list<storage::entry>::iterator it = e.begin(); it != e.end(); it++) {
+ s << " " << it->key;
+ }
+ s << " " << limit;
+
+ return this->_send_request(s.str().c_str());
+}
+
+int op_keys::_parse_client_parameter(storage::entry& e) {
+ // support multiple entries only
+ return -1;
+}
+
+int op_keys::_parse_client_parameter(list<storage::entry>& e) {
+ e.clear();
+
+ for (;;) {
+ char* p;
+ if (this->_connection->readline(&p) < 0) {
+ return -1;
+ }
+
+ if (strcmp(p, "END\n") == 0) {
+ _delete_(p);
+ break;
+ }
+
+ char q[BUFSIZ];
+ int n = util::next_word(p, q, sizeof(q));
+ if (strcmp(q, "KEY") != 0) {
+ log_debug("invalid token (q=%s)", q);
+ _delete_(p);
+ return -1;
+ }
+
+ storage::entry e_tmp;
+ n += util::next_word(p + n, q, sizeof(q));
+ if (q[0] == '\0') {
+ log_debug("no key strings found", 0);
+ _delete_(p);
+ return -1;
+ }
+ e_tmp.key = q;
+
+ e.push_back(e_tmp);
+
+ _delete_(p);
+ }
+
+ return 0;
+}
+// }}}
+
+// {{{ private methods
+// }}}
+
+} // namespace flare
+} // namespace gree
+
+// vim: foldmethod=marker tabstop=2 shiftwidth=2 autoindent
43 src/lib/op_keys.h
View
@@ -0,0 +1,43 @@
+/**
+ * op_keys.h
+ *
+ * @author Masaki Fujimoto <fujimoto@php.net>
+ *
+ * $Id$
+ */
+#ifndef __OP_KEYS_H__
+#define __OP_KEYS_H__
+
+#include "op_proxy_read.h"
+#include "storage.h"
+
+using namespace std;
+using namespace boost;
+
+namespace gree {
+namespace flare {
+
+/**
+ * opcode class (keys)
+ */
+class op_keys : public op_proxy_read {
+protected:
+
+public:
+ op_keys(shared_connection c, cluster* cl, storage* st);
+ virtual ~op_keys();
+
+protected:
+ int _parse_server_parameter();
+ int _run_server();
+ int _run_client(storage::entry&e, void* parameter);
+ int _run_client(list<storage::entry>& e, void* parameter);
+ int _parse_client_parameter(storage::entry& e);
+ int _parse_client_parameter(list<storage::entry>& e);
+};
+
+} // namespace flare
+} // namespace gree
+
+#endif // __OP_KEYS_H__
+// vim: foldmethod=marker tabstop=2 shiftwidth=2 autoindent
1  src/lib/op_parser.h
View
@@ -21,6 +21,7 @@
#include "op_get.h"
#include "op_gets.h"
#include "op_incr.h"
+#include "op_keys.h"
#include "op_kill.h"
#include "op_meta.h"
#include "op_node_add.h"
13 src/lib/op_proxy_read.cc
View
@@ -20,6 +20,7 @@ op_proxy_read::op_proxy_read(shared_connection c, string ident, cluster* cl, sto
op(c, ident),
_cluster(cl),
_storage(st) {
+ this->_is_multiple_response = false;
}
/**
@@ -36,8 +37,8 @@ op_proxy_read::~op_proxy_read() {
/**
* send client request
*/
-int op_proxy_read::run_client(storage::entry& e) {
- if (this->_run_client(e) < 0) {
+int op_proxy_read::run_client(storage::entry& e, void* parameter) {
+ if (this->_run_client(e, parameter) < 0) {
return -1;
}
@@ -47,8 +48,8 @@ int op_proxy_read::run_client(storage::entry& e) {
/**
* send client request
*/
-int op_proxy_read::run_client(list<storage::entry>& e) {
- if (this->_run_client(e) < 0) {
+int op_proxy_read::run_client(list<storage::entry>& e, void* parameter) {
+ if (this->_run_client(e, parameter) < 0) {
return -1;
}
@@ -68,11 +69,11 @@ int op_proxy_read::_run_server() {
return 0;
}
-int op_proxy_read::_run_client(storage::entry& e) {
+int op_proxy_read::_run_client(storage::entry& e, void* parameter) {
return 0;
}
-int op_proxy_read::_run_client(list<storage::entry>& e) {
+int op_proxy_read::_run_client(list<storage::entry>& e, void* parameter) {
return 0;
}
12 src/lib/op_proxy_read.h
View
@@ -28,21 +28,25 @@ class op_proxy_read : public op {
cluster* _cluster;
storage* _storage;
list<storage::entry> _entry_list;
+ void* _parameter;
+ bool _is_multiple_response;
public:
op_proxy_read(shared_connection c, string ident, cluster* cl, storage* st);
virtual ~op_proxy_read();
- virtual int run_client(storage::entry& e);
- virtual int run_client(list<storage::entry>& entry_list);
+ bool is_multiple_response() { return this->_is_multiple_response; };
+
+ virtual int run_client(storage::entry& e, void* parameter);
+ virtual int run_client(list<storage::entry>& entry_list, void* parameter);
list<storage::entry>& get_entry_list() { return this->_entry_list; };
protected:
virtual int _parse_server_parameter();
virtual int _run_server();
- virtual int _run_client(storage::entry& e);
- virtual int _run_client(list<storage::entry>& e);
+ virtual int _run_client(storage::entry& e, void* parameter);
+ virtual int _run_client(list<storage::entry>& e, void* parameter);
virtual int _parse_client_parameter(storage::entry& e);
virtual int _parse_client_parameter(list<storage::entry>& e);
};
19 src/lib/queue_proxy_read.cc
View
@@ -11,6 +11,7 @@
#include "op_proxy_read.h"
#include "op_get.h"
#include "op_gets.h"
+#include "op_keys.h"
namespace gree {
namespace flare {
@@ -19,12 +20,13 @@ namespace flare {
/**
* ctor for queue_proxy_read
*/
-queue_proxy_read::queue_proxy_read(cluster* cl, storage* st, vector<string> proxy, storage::entry entry, string op_ident):
+queue_proxy_read::queue_proxy_read(cluster* cl, storage* st, vector<string> proxy, storage::entry entry, void* parameter, string op_ident):
thread_queue("proxy_read"),
_cluster(cl),
_storage(st),
_proxy(proxy),
_entry(entry),
+ _parameter(parameter),
_op_ident(op_ident),
_result(op::result_none),
_result_message("") {
@@ -51,9 +53,18 @@ int queue_proxy_read::run(shared_connection c) {
p->set_proxy(this->_proxy);
int retry = queue_proxy_read::max_retry;
+ if (p->is_multiple_response()) {
+ this->_entry_list.push_back(this->_entry);
+ }
while (retry > 0) {
- if (p->run_client(this->_entry) >= 0) {
- break;
+ if (p->is_multiple_response()) {
+ if (p->run_client(this->_entry_list, this->_parameter) >= 0) {
+ break;
+ }
+ } else {
+ if (p->run_client(this->_entry, this->_parameter) >= 0) {
+ break;
+ }
}
if (c->is_available() == false) {
log_debug("reconnecting (host=%s, port=%d)", c->get_host().c_str(), c->get_port());
@@ -81,6 +92,8 @@ op_proxy_read* queue_proxy_read::_get_op(string op_ident, shared_connection c) {
return _new_ op_get(c, this->_cluster, this->_storage);
} else if (op_ident == "gets") {
return _new_ op_gets(c, this->_cluster, this->_storage);
+ } else if (op_ident == "keys") {
+ return _new_ op_keys(c, this->_cluster, this->_storage);
}
log_warning("unknown op (ident=%s)", op_ident.c_str());
7 src/lib/queue_proxy_read.h
View
@@ -8,6 +8,8 @@
#ifndef __QUEUE_PROXY_READ_H__
#define __QUEUE_PROXY_READ_H__
+#include <list>
+
#include "cluster.h"
#include "storage.h"
#include "thread_queue.h"
@@ -31,6 +33,8 @@ class queue_proxy_read : public thread_queue {
storage* _storage;
vector<string> _proxy;
storage::entry _entry;
+ list<storage::entry> _entry_list;
+ void* _parameter;
string _op_ident;
op::result _result;
string _result_message;
@@ -38,13 +42,14 @@ class queue_proxy_read : public thread_queue {
public:
static const int max_retry = 4;
- queue_proxy_read(cluster* cl, storage* st, vector<string> proxy, storage::entry entry, string op_ident);
+ queue_proxy_read(cluster* cl, storage* st, vector<string> proxy, storage::entry entry, void* parameter, string op_ident);
virtual ~queue_proxy_read();
virtual int run(shared_connection c);
op::result get_result() { return this->_result; };
string get_result_message() { return this->_result_message; };
storage::entry& get_entry() { return this->_entry; };
+ list<storage::entry>& get_entry_list() { return this->_entry_list; };
protected:
op_proxy_read* _get_op(string op_ident, shared_connection c);
7 src/lib/storage.h
View
@@ -61,6 +61,11 @@ class storage {
type_tcb,
};
+ enum capability {
+ capability_prefix_search,
+ capability_list,
+ };
+
enum compress {
compress_none,
compress_deflate,
@@ -263,8 +268,10 @@ class storage {
virtual int iter_end() = 0;
virtual uint32_t count() = 0;
virtual uint64_t size() = 0;
+ virtual int get_key(string key, int limit, vector<string>& r) { return -1; };
virtual type get_type() = 0;
+ virtual bool is_capable(capability c) = 0;
static inline int option_cast(string s, option& r) {
if (s == "") {
33 src/lib/storage_tcb.cc
View
@@ -612,6 +612,39 @@ uint32_t storage_tcb::count() {
uint64_t storage_tcb::size() {
return tcbdbfsiz(this->_db);
}
+
+int storage_tcb::get_key(string key, int limit, vector<string>& r) {
+ TCLIST* key_list = tcbdbfwmkeys(this->_db, key.c_str(), key.size(), limit);
+
+ int i;
+ for (i = 0; i < tclistnum(key_list); i++) {
+ int n;
+ const char* p = static_cast<const char*>(tclistval(key_list, i, &n));
+ if (p == NULL) {
+ break;
+ }
+ string tmp_key = p;
+ r.push_back(tmp_key);
+ }
+
+ tclistdel(key_list);
+
+ return 0;
+}
+
+bool storage_tcb::is_capable(capability c) {
+ switch (c) {
+ case capability_prefix_search:
+ return true;
+ case capability_list:
+ return true;
+ default:
+ // nop
+ break;
+ }
+
+ return false;
+}
// }}}
// {{{ protected methods
2  src/lib/storage_tcb.h
View
@@ -55,8 +55,10 @@ class storage_tcb : public storage {
virtual int iter_end();
virtual uint32_t count();
virtual uint64_t size();
+ virtual int get_key(string key, int limit, vector<string>& r);
virtual type get_type() { return this->_type; };
+ virtual bool is_capable(capability c);
protected:
virtual int _get_header(string key, entry& e);
4 src/lib/storage_tch.cc
View
@@ -590,6 +590,10 @@ uint32_t storage_tch::count() {
uint64_t storage_tch::size() {
return tchdbfsiz(this->_db);
}
+
+bool storage_tch::is_capable(capability c) {
+ return false;
+}
// }}}
// {{{ protected methods
1  src/lib/storage_tch.h
View
@@ -57,6 +57,7 @@ class storage_tch : public storage {
virtual uint64_t size();
virtual type get_type() { return this->_type; };
+ virtual bool is_capable(capability c);
protected:
virtual int _get_header(string key, entry& e);
Please sign in to comment.
Something went wrong with that request. Please try again.