Skip to content

Commit

Permalink
server_send: extend control structures and add write timeout, set it …
Browse files Browse the repository at this point in the history
…for write sessions
  • Loading branch information
bioothod committed Apr 25, 2016
1 parent 995ef03 commit 519b8f4
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 9 deletions.
6 changes: 5 additions & 1 deletion example/eblob_backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -998,12 +998,16 @@ static int blob_send(struct eblob_backend_config *cfg, void *state, struct dnet_
cmd->flags |= DNET_FLAGS_NEED_ACK;

backend_id = cfg->data.stat_id;
ctl = dnet_server_send_alloc(state, cmd, req->iflags, groups, req->group_num, backend_id);
ctl = dnet_server_send_alloc(state, cmd, groups, req->group_num);
if (!ctl) {
err = -ENOMEM;
goto err_out_exit;
}

ctl->iflags = req->iflags;
ctl->backend_id = backend_id;
ctl->timeout = req->timeout;

/*
* Deliberately clear NEED_ACK bit
* This function will iterate over provided ids,
Expand Down
6 changes: 4 additions & 2 deletions include/elliptics/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,10 @@ struct dnet_vm_stat {
int dnet_get_vm_stat(dnet_logger *l, struct dnet_vm_stat *st);

struct dnet_server_send_ctl;
struct dnet_server_send_ctl *dnet_server_send_alloc(void *state, struct dnet_cmd *cmd, uint64_t iflags,
int *groups, int group_num, int backend_id);

// all other fields of @dnet_server_send_ctl must be set manually, no need to increase number of parameters
// groups are here since they will be allocated as one chunk with control structure itself
struct dnet_server_send_ctl *dnet_server_send_alloc(void *state, struct dnet_cmd *cmd, int *groups, int group_num);
struct dnet_server_send_ctl *dnet_server_send_get(struct dnet_server_send_ctl *ctl);
int dnet_server_send_put(struct dnet_server_send_ctl *ctl);
int dnet_server_send_write(struct dnet_server_send_ctl *send,
Expand Down
7 changes: 6 additions & 1 deletion include/elliptics/packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ struct dnet_iterator_request
uint64_t flags; /* DNET_IFLAGS_* */
uint32_t group_num; /* Number of remote groups to send iterated data for server-send
iterator type */
uint32_t __reserved32;
uint32_t timeout; /* write session timeout */
uint64_t reserved[4];
} __attribute__ ((packed));

Expand All @@ -1103,6 +1103,7 @@ static inline void dnet_convert_iterator_request(struct dnet_iterator_request *r
r->action = dnet_bswap32(r->action);
r->range_num = dnet_bswap64(r->range_num);
r->group_num = dnet_bswap32(r->group_num);
r->timeout = dnet_bswap32(r->timeout);
dnet_convert_time(&r->time_begin);
dnet_convert_time(&r->time_end);
}
Expand Down Expand Up @@ -1243,13 +1244,17 @@ static inline void dnet_convert_monitor_stat_request(struct dnet_monitor_stat_re
struct dnet_server_send_request {
int id_num;
int group_num;
int timeout;
int __reserved32;
uint64_t iflags;
uint64_t __reserved[9];
};

static inline void dnet_convert_server_send_request(struct dnet_server_send_request *req)
{
req->id_num = dnet_bswap32(req->id_num);
req->group_num = dnet_bswap32(req->group_num);
req->timeout = dnet_bswap32(req->timeout);
req->iflags = dnet_bswap64(req->iflags);
}

Expand Down
14 changes: 9 additions & 5 deletions library/dnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,7 @@ static int dnet_iterator_server_send_complete(struct dnet_addr *addr, struct dne
return err;
}

struct dnet_server_send_ctl *dnet_server_send_alloc(void *state, struct dnet_cmd *cmd, uint64_t iflags,
int *groups, int group_num, int backend_id)
struct dnet_server_send_ctl *dnet_server_send_alloc(void *state, struct dnet_cmd *cmd, int *groups, int group_num)
{
int err;
struct dnet_net_state *st = state;
Expand All @@ -589,8 +588,6 @@ struct dnet_server_send_ctl *dnet_server_send_alloc(void *state, struct dnet_cmd
ctl->state = state;
ctl->cmd = *cmd;
ctl->bytes_pending_max = DNET_SERVER_SEND_WATERMARK_HIGH;
ctl->iflags = iflags;
ctl->backend_id = backend_id;
ctl->groups = (int *)(ctl + 1);
memcpy(ctl->groups, groups, sizeof(int) * group_num);
ctl->group_num = group_num;
Expand Down Expand Up @@ -760,6 +757,9 @@ int dnet_server_send_write(struct dnet_server_send_ctl *send,

dnet_session_set_trace_id(s, send->cmd.trace_id);
dnet_session_set_trace_bit(s, !!(send->cmd.flags & DNET_FLAGS_TRACE_BIT));
if (send->timeout)
dnet_session_set_timeout(s, send->timeout);

if (dnet_session_get_timeout(s)->tv_sec < 60)
dnet_session_set_timeout(s, 60);

Expand Down Expand Up @@ -1162,14 +1162,18 @@ static int dnet_iterator_start(struct dnet_backend_io *backend, struct dnet_net_
* dnet_cmd, thus it will store command structure without NEED_ACK bit.
*/
cmd->flags &= ~DNET_FLAGS_NEED_ACK;
sspriv = dnet_server_send_alloc(st, cmd, ireq->flags, dst_groups, ireq->group_num, backend->backend_id);
sspriv = dnet_server_send_alloc(st, cmd, dst_groups, ireq->group_num);
cmd->flags |= DNET_FLAGS_NEED_ACK;

if (!sspriv) {
err = -ENOMEM;
goto err_out_exit;
}

sspriv->timeout = ireq->timeout;
sspriv->iflags = ireq->flags;
sspriv->backend_id = backend->backend_id;

cpriv.next_callback = dnet_iterator_callback_server_send;
cpriv.next_private = sspriv;
break;
Expand Down
2 changes: 2 additions & 0 deletions library/elliptics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,8 @@ struct dnet_server_send_ctl {
int *groups; /* Groups to send WRITE commands */
int group_num;

int timeout; /* write timeout */

pthread_mutex_t write_lock; /* Lock for @write_wait */
pthread_cond_t write_wait; /* Waiting for pending writes */
long bytes_pending_max; /* maximum size of the 'queue' of write requests */
Expand Down

0 comments on commit 519b8f4

Please sign in to comment.