Skip to content

Commit

Permalink
new bulk_read in python binding
Browse files Browse the repository at this point in the history
  • Loading branch information
abudnik committed Mar 17, 2017
1 parent 1e07997 commit ecccd51
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 7 deletions.
97 changes: 90 additions & 7 deletions bindings/python/elliptics_session.cpp
Expand Up @@ -577,13 +577,6 @@ class elliptics_session: public session, public bp::wrapper<session> {
return create_result(std::move(session::remove(transform(id).id())));
}

struct dnet_id_comparator {
bool operator() (const struct dnet_id &first, const struct dnet_id &second) const
{
return memcmp(first.id, second.id, sizeof(first.id)) < 0;
}
};

python_read_result bulk_read(const bp::api::object &keys) {
std::vector<dnet_io_attr> ios;
ios.reserve(bp::len(keys));
Expand Down Expand Up @@ -815,6 +808,45 @@ class elliptics_session: public python::elliptics_session {
newapi::session{*this}.server_send(std_keys, flags, chunk_size, src_group, std_dst_groups)
);
}

python_read_result bulk_read_json(const bp::api::object &keys) {
std::vector<dnet_id> std_keys;
std_keys.reserve(bp::len(keys));

for (bp::stl_input_iterator<elliptics_id> it(keys), end; it != end; ++it) {
std_keys.emplace_back(it->id());
}

return create_result(
newapi::session{*this}.bulk_read_json(std_keys)
);
}

python_read_result bulk_read_data(const bp::api::object &keys) {
std::vector<dnet_id> std_keys;
std_keys.reserve(bp::len(keys));

for (bp::stl_input_iterator<elliptics_id> it(keys), end; it != end; ++it) {
std_keys.emplace_back(it->id());
}

return create_result(
newapi::session{*this}.bulk_read_data(std_keys)
);
}

python_read_result bulk_read(const bp::api::object &keys) {
std::vector<dnet_id> std_keys;
std_keys.reserve(bp::len(keys));

for (bp::stl_input_iterator<elliptics_id> it(keys), end; it != end; ++it) {
std_keys.emplace_back(it->id());
}

return create_result(
newapi::session{*this}.bulk_read(std_keys)
);
}
};
} /* namespace newapi */

Expand Down Expand Up @@ -1898,6 +1930,57 @@ void init_elliptics_session() {
.def("server_send", &newapi::elliptics_session::server_send,
bp::args("keys", "flags", "chunk_size", "src_group", "dst_groups"))

.def("bulk_read_json", &newapi::elliptics_session::bulk_read_json,
(bp::arg("keys")),
"bulk_read_json(keys)\n"
" Read json for all specified keys from multiple groups. Multiple read requests to the same\n"
" node are merged together into a single request.\n"
" Return elliptics.AsyncResult.\n"
" -- keys - elliptics.Id\n\n"
" keys = []\n"
" keys.append(elliptics.Id([0] * 64, 1))\n"
" keys.append(elliptics.Id([1] * 64, 2))\n\n"
" result = session.bulk_read_json(keys)\n"
" for read_result in result:\n"
" print ('key: {}, json: {}, status: {}'\n"
" .format(read_result.id,\n"
" read_result.json,\n"
" read_result.status))\n")

.def("bulk_read_data", &newapi::elliptics_session::bulk_read_data,
(bp::arg("keys")),
"bulk_read_data(keys)\n"
" Read data for all specified keys from multiple groups. Multiple read requests to the same\n"
" node are merged together into a single request.\n"
" Return elliptics.AsyncResult.\n"
" -- keys - elliptics.Id\n\n"
" keys = []\n"
" keys.append(elliptics.Id([0] * 64, 1))\n"
" keys.append(elliptics.Id([1] * 64, 2))\n\n"
" result = session.bulk_read_data(keys)\n"
" for read_result in result:\n"
" print ('key: {}, data: {}, status: {}'\n"
" .format(read_result.id,\n"
" read_result.data,\n"
" read_result.status))\n")

.def("bulk_read", &newapi::elliptics_session::bulk_read,
(bp::arg("keys")),
"bulk_read(keys)\n"
" Read both json and data for all specified keys from multiple groups. Multiple read requests\n"
" to the same node are merged together into a single request.\n"
" Return elliptics.AsyncResult.\n"
" -- keys - elliptics.Id\n\n"
" keys = []\n"
" keys.append(elliptics.Id([0] * 64, 1))\n"
" keys.append(elliptics.Id([1] * 64, 2))\n\n"
" result = session.bulk_read(keys)\n"
" for read_result in result:\n"
" print ('key: {}, json: {}, data: {}, status: {}'\n"
" .format(read_result.id,\n"
" read_result.json,\n"
" read_result.data,\n"
" read_result.status))\n")
;
}

Expand Down
9 changes: 9 additions & 0 deletions bindings/python/result_entry.cpp
Expand Up @@ -258,6 +258,13 @@ bp::object lookup_result_get_record_info(const newapi::lookup_result_entry &resu
return bp::object(result.record_info());
}

elliptics_id read_result_get_id(const newapi::read_result_entry &result)
{
dnet_id id;
memcpy(&id, &result.command()->id, sizeof(struct dnet_id));
return elliptics_id(id);
}

bp::object read_result_get_record_info(const newapi::read_result_entry &result) {
if (result.status()) {
return bp::object();
Expand Down Expand Up @@ -534,6 +541,8 @@ void init_result_entry() {
bp::class_<newapi::read_result_entry, bp::bases<newapi::callback_result_entry>>("ReadResultEntry",
"Result of read which contains information of read key and read json and/or data.",
bp::no_init)
.add_property("id", newapi::read_result_get_id,
"elliptics.Id of read object")
.add_property("record_info", newapi::read_result_get_record_info,
"Information of read key.")
.add_property("io_info", newapi::read_result_get_io_info,
Expand Down
55 changes: 55 additions & 0 deletions tests/pytests/test_newapi.py
Expand Up @@ -132,3 +132,58 @@ def test_session_timestamps(simple_node):
result = session.write(key, json_string, len(json_string), data, len(data)).get()[0]
assert elliptics.Time.now() > result.record_info.data_timestamp > json_ts
assert result.record_info.json_timestamp == json_ts

@pytest.mark.usefixtures('servers')
def test_session_bulk_read(simple_node):
"""Test bulk_read_json, bulk_read_data, bulk_read."""
session = elliptics.newapi.Session(simple_node)
session.trace_id = make_trace_id('test_session_bulk_read')
session.groups = session.routes.groups()

# prepare test data
keys = []
datas = {}
def make_item(data, json):
return {"data": data, "json": json}

write_results = []
for group_id in session.groups:
session.groups = [group_id]
for i in range(10):
eid = session.transform('k{}'.format(i))
eid.group_id = group_id
keys.append(eid)
data = "data{}_{}".format(group_id, i)
json_string = json.dumps({'some': "json{}_{}".format(group_id, i)})
datas[repr(eid)] = make_item(data, json_string)
result = session.write(eid, json_string, len(json_string), data, len(data))
write_results.append(result)

for r in write_results:
assert r.get()[0].status == 0

assert len(keys) == len(datas)

# check bulk_read_json, bulk_read_data, bulk_read
def check_result(method, check_json, check_data):
result = method(keys)
counter = 0
for r in result:
counter += 1
assert repr(r.id) in datas
ref = datas[repr(r.id)]

if check_json:
assert ref["json"] == r.json
else:
assert not r.json

if check_data:
assert ref["data"] == r.data
else:
assert not r.data
assert counter == len(keys)

check_result(session.bulk_read_json, True, False)
check_result(session.bulk_read_data, False, True)
check_result(session.bulk_read, True, True)

0 comments on commit ecccd51

Please sign in to comment.