Skip to content

Commit

Permalink
Merge pull request #636 from abudnik/oplock
Browse files Browse the repository at this point in the history
New oplocks: get request from queue, whose key id is not already locked
  • Loading branch information
bioothod committed Aug 17, 2015
2 parents 8b1b367 + f322e1b commit e0037c8
Show file tree
Hide file tree
Showing 15 changed files with 758 additions and 503 deletions.
5 changes: 3 additions & 2 deletions cache/slru_cache.cpp
Expand Up @@ -19,6 +19,7 @@
#endif

#include "slru_cache.hpp"
#include "library/request_queue.h"
#include <cassert>

#include "monitor/measure_points.h"
Expand Down Expand Up @@ -772,7 +773,7 @@ void slru_cache_t::life_check(void) {
memcpy(id.id, elem->id().id, DNET_ID_SIZE);

TIMER_START("life_check.sync_iterate.dnet_oplock");
dnet_oplock(m_node, &id);
dnet_oplock(m_backend, &id);
TIMER_STOP("life_check.sync_iterate.dnet_oplock");

// sync_element uses local_session which always uses DNET_FLAGS_NOLOCK
Expand All @@ -781,7 +782,7 @@ void slru_cache_t::life_check(void) {
elem->set_sync_state(data_t::sync_state_t::ERASE_PHASE);
}

dnet_opunlock(m_node, &id);
dnet_opunlock(m_backend, &id);
}
}

Expand Down
7 changes: 3 additions & 4 deletions example/monitoring-stats.txt
Expand Up @@ -6,10 +6,6 @@ io.cmd
io.cmd.{<any elliptics command type in upper case>}
time of commands for each type

io.cmd.{<any elliptics command type in upper case>}.lock_time
time spent in id lock waiting, for each command type,
(this will be 0 for commands with DNET_FLAGS_NOLOCK flag set)

io.cmd_recursive
time of commands that originate from other commands execution, overall for entire elliptics command stream
(right now only BULK_READ generate subcommands like that)
Expand Down Expand Up @@ -53,5 +49,8 @@ pool.{sys,<backend id>}.{blocking,nonblocking}.queue.size
pool.{sys,<backend id>}.{blocking,nonblocking}.queue.wait_time
time commands spent waiting in a pool's input queue

pool.{sys,<backend id>}.{blocking,nonblocking}.search_trans_time
time spent in searching of available request (with non-blocked key) in pool's queue

pool.{sys,<backend id>}.{blocking,nonblocking}.active_threads
tracks pool's thread utilization (how many threads are actually working)
2 changes: 1 addition & 1 deletion library/CMakeLists.txt
Expand Up @@ -9,6 +9,7 @@ set(ELLIPTICS_CLIENT_SRCS
node.c
notify_common.c
pool.c
request_queue.cpp
rbtree.c
trans.c
tests.c
Expand All @@ -17,7 +18,6 @@ set(ELLIPTICS_CLIENT_SRCS
)
set(ELLIPTICS_SRCS
dnet.c
locks.c
notify.c
server.c
route.cpp
Expand Down
42 changes: 16 additions & 26 deletions library/dnet.c
Expand Up @@ -36,6 +36,7 @@
#include <unistd.h>

#include "elliptics.h"
#include "request_queue.h"
#include "monitor/monitor.h"

#include "elliptics/packet.h"
Expand Down Expand Up @@ -748,6 +749,8 @@ static int dnet_cmd_bulk_read(struct dnet_backend_io *backend, struct dnet_net_s
struct dnet_io_attr *ios = io + 1;
uint64_t count = 0;
uint64_t i;
int use_oplock;
struct dnet_id lock_id = { .group_id = cmd->id.group_id };

struct dnet_cmd read_cmd = *cmd;
read_cmd.size = sizeof(struct dnet_io_attr);
Expand All @@ -757,39 +760,34 @@ static int dnet_cmd_bulk_read(struct dnet_backend_io *backend, struct dnet_net_s
dnet_convert_io_attr(io);
count = io->size / sizeof(struct dnet_io_attr);

if (count > 0) {
cmd->flags &= ~DNET_FLAGS_NEED_ACK;
}

/*
* we have to drop io lock, otherwise it will be grabbed again in dnet_process_cmd_raw() being recursively called
* Lock will be taken again after loop has been finished
*/
if (!(cmd->flags & DNET_FLAGS_NOLOCK)) {
dnet_opunlock(st->n, &cmd->id);
}

dnet_log(st->n, DNET_LOG_NOTICE, "%s: starting BULK_READ for %d commands",
dnet_dump_id(&cmd->id), (int) count);

for (i = 0; i < count; i++) {
/*
* First key is already locked by request_queue::take_request().
* Check that i-th key is not equal to the first key.
*/
use_oplock = (i > 0) && !(cmd->flags & DNET_FLAGS_NOLOCK) && !dnet_id_cmp_str((const unsigned char *)&ios[i].id, (const unsigned char *)&cmd->id.id);
if (use_oplock) {
memcpy(&lock_id.id, &ios[i].id, DNET_ID_SIZE);
dnet_oplock(backend, &lock_id);
}

ret = dnet_process_cmd_raw(backend, st, &read_cmd, &ios[i], 1);
dnet_log(st->n, DNET_LOG_NOTICE, "%s: processing BULK_READ.READ for %d/%d command, err: %d",
dnet_dump_id(&cmd->id), (int) i, (int) count, ret);

if (i + 1 == count)
cmd->flags |= DNET_FLAGS_NEED_ACK;
if (use_oplock) {
dnet_opunlock(backend, &lock_id);
}

if (!ret)
err = 0;
else if (err == -1)
err = ret;
}

if (!(cmd->flags & DNET_FLAGS_NOLOCK)) {
dnet_oplock(st->n, &cmd->id);
}

return err;
}

Expand Down Expand Up @@ -1044,11 +1042,6 @@ int dnet_process_cmd_raw(struct dnet_backend_io *backend, struct dnet_net_state
HANDY_TIMER_SCOPE(recursive ? "io.cmd_recursive" : "io.cmd");
FORMATTED(HANDY_TIMER_SCOPE, ("io.cmd%s.%s", (recursive ? "_recursive" : ""), dnet_cmd_string(cmd->cmd)));

if (!(cmd->flags & DNET_FLAGS_NOLOCK)) {
FORMATTED(HANDY_TIMER_SCOPE, ("io.cmd.%s.lock_time", dnet_cmd_string(cmd->cmd)));
dnet_oplock(n, &cmd->id);
}

gettimeofday(&start, NULL);

err = dnet_process_cmd_without_backend_raw(st, cmd, data);
Expand Down Expand Up @@ -1119,9 +1112,6 @@ int dnet_process_cmd_raw(struct dnet_backend_io *backend, struct dnet_net_state

err = dnet_send_ack(st, cmd, err, recursive);

if (!(cmd->flags & DNET_FLAGS_NOLOCK))
dnet_opunlock(n, &cmd->id);

dnet_stat_inc(st->stat, cmd->cmd, err);
if (st->__join_state == DNET_JOIN)
dnet_counter_inc(n, cmd->cmd, err);
Expand Down
31 changes: 4 additions & 27 deletions library/elliptics.h
Expand Up @@ -421,7 +421,8 @@ enum dnet_work_io_mode {

struct dnet_work_pool;
struct dnet_work_io {
struct list_head list;
struct list_head reply_list;
struct list_head request_list;
int thread_index;
uint64_t trans;
pthread_t tid;
Expand Down Expand Up @@ -450,11 +451,10 @@ struct dnet_work_pool {
struct dnet_backend_io *io;
int mode;
int num;
struct list_head list;
struct list_stat list_stats;
pthread_mutex_t lock;
pthread_cond_t wait;
struct dnet_work_io *wio_list;

void *request_queue;
};

struct dnet_work_pool_place
Expand Down Expand Up @@ -520,28 +520,6 @@ void dnet_io_exit(struct dnet_node *n);

void dnet_io_req_free(struct dnet_io_req *r);

struct dnet_locks_entry {
struct rb_node lock_tree_entry;
struct list_head lock_list_entry;
pthread_mutex_t lock;
pthread_cond_t wait;
struct dnet_id id;
int locked;
atomic_t refcnt;
};

struct dnet_locks {
struct list_head lock_list;
struct rb_root lock_tree;
pthread_mutex_t lock;
};

void dnet_locks_destroy(struct dnet_node *n);
int dnet_locks_init(struct dnet_node *n, int num);
void dnet_oplock(struct dnet_node *n, struct dnet_id *key);
void dnet_opunlock(struct dnet_node *n, struct dnet_id *key);
int dnet_optrylock(struct dnet_node *n, struct dnet_id *key);

struct dnet_config_data {
void (*destroy_config_data) (struct dnet_config_data *);

Expand Down Expand Up @@ -655,7 +633,6 @@ struct dnet_node
int server_prio;
int client_prio;

struct dnet_locks *locks;
/*
* List of dnet_iterator.
* Used for iterator management e.g. pause/continue actions.
Expand Down

0 comments on commit e0037c8

Please sign in to comment.