Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mitake committed Jul 25, 2016
1 parent 96b8ee2 commit 51c7cec
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 133 deletions.
67 changes: 7 additions & 60 deletions sheep/gateway.c
Expand Up @@ -23,13 +23,6 @@ static inline void gateway_init_fwd_hdr(struct sd_req *fwd, struct sd_req *hdr)
fwd->proto_ver = SD_SHEEP_PROTO_VER;
}

struct req_iter {
uint8_t *buf;
uint32_t wlen;
uint32_t dlen;
uint64_t off;
};

static struct req_iter *prepare_replication_requests(struct request *req,
int *nr)
{
Expand Down Expand Up @@ -503,17 +496,15 @@ forward_info_advance(struct forward_info *fi, const struct node_id *nid,

static int gateway_forward_request(struct request *req)
{
int i, err_ret = SD_RES_SUCCESS;
int err_ret = SD_RES_SUCCESS;
uint64_t oid = req->rq.obj.oid;
struct sd_req hdr;
const struct sd_node *target_nodes[SD_MAX_NODES];
int nr_copies = get_req_copy_number(req), nr_reqs, nr_to_send = 0;
struct req_iter *reqs = NULL;

#ifdef HAVE_ACCELIO
struct xio_context *ctx;
struct xio_forward_info xio_fi;
#else
#ifndef HAVE_ACCELIO
int i;
unsigned wlen;
int ret;
struct forward_info fi;
Expand Down Expand Up @@ -591,54 +582,10 @@ static int gateway_forward_request(struct request *req)

#else /* HAVE_ACCELIO */

ctx = xio_context_create(NULL, 0, -1);

memset(&xio_fi, 0, sizeof(xio_fi));
xio_fi.nr_send = nr_to_send;
xio_fi.ctx = ctx;

for (i = 0; i < nr_to_send; i++) {
const struct node_id *nid = &target_nodes[i]->nid;
struct xio_forward_info_entry *fi_entry = &xio_fi.ent[i];
struct xio_session *session;
struct xio_connection *conn;
struct sd_req *copied_hdr;

fi_entry->nid = nid;
fi_entry->buf = reqs[i].buf;
fi_entry->wlen = reqs[i].wlen;
fi_entry->fi = &xio_fi;
session = sd_xio_gw_create_session(ctx, nid, fi_entry);
fi_entry->session = session;
conn = sd_xio_gw_create_connection(ctx, session, fi_entry);
fi_entry->conn = conn;

hdr.data_length = reqs[i].dlen;
hdr.obj.offset = reqs[i].off;
hdr.obj.ec_index = i;
hdr.obj.copy_policy = req->rq.obj.copy_policy;

copied_hdr = zalloc(sizeof(*copied_hdr));
if(unlikely(!copied_hdr)) {
err_ret = SD_RES_NO_MEM;
goto out;
}
memcpy(copied_hdr, &hdr, sizeof(hdr));

xio_gw_send_req(conn, copied_hdr, reqs[i].buf, sheep_need_retry,
req->rq.epoch, MAX_RETRY_COUNT);
}

xio_context_run_loop(ctx, XIO_INFINITE);

for (i = 0; i < nr_to_send; i++) {
struct xio_forward_info_entry *fi_entry = &xio_fi.ent[i];

xio_connection_destroy(fi_entry->conn);
xio_session_destroy(fi_entry->session);
}

xio_context_destroy(ctx);
err_ret = xio_send_gateway_reqs(nr_to_send, target_nodes, reqs, req);
if (err_ret)
sd_err("failed to send xio gateway requests: %s",
sd_strerror(err_ret));

#endif /* HAVE_ACCELIO */

Expand Down
10 changes: 10 additions & 0 deletions sheep/sheep_priv.h
Expand Up @@ -631,8 +631,18 @@ struct request *alloc_request(struct client_info *ci, uint32_t data_length);
void queue_request(struct request *req);
void free_request(struct request *req);

struct req_iter {
uint8_t *buf;
uint32_t wlen;
uint32_t dlen;
uint64_t off;
};

#ifdef HAVE_ACCELIO
void xio_send_reply(struct client_info *ci);

int xio_send_gateway_reqs(int nr_to_send, const struct sd_node *target_nodes[],
struct req_iter *reqs, struct request *req);
#endif

#endif
124 changes: 51 additions & 73 deletions sheep/xio_client.c
Expand Up @@ -17,6 +17,7 @@
#include "event.h"
#include "work.h"
#include "xio.h"
#include "sheep_priv.h"

#include <libxio.h>

Expand Down Expand Up @@ -254,98 +255,75 @@ int xio_exec_req(const struct node_id *nid, struct sd_req *hdr, void *data,
return 0;
}

static int gw_client_on_response(struct xio_session *session,
struct xio_msg *rsp,
int last_in_rxq,
void *cb_user_context)
{
struct xio_forward_info_entry *fi_entry =
(struct xio_forward_info_entry *)cb_user_context;
struct xio_forward_info *fi = fi_entry->fi;

struct xio_vmsg *pimsg = &rsp->in;
struct xio_iovec_ex *isglist = vmsg_sglist(pimsg);
struct xio_gateway_work {
struct work work;

int nents = vmsg_sglist_nents(pimsg), total = 0;
const struct node_id *nid;
struct sd_req hdr;

sd_debug("response on fi_entry %p", fi_entry);
uint8_t *buf;
uint32_t epoch;

for (int i = 0; i < nents; i++) {
memcpy((char *)fi_entry->buf + total,
isglist[i].iov_base, isglist[i].iov_len);

total += isglist[i].iov_len;
}
int finish_efd;
};

fi->nr_done++;
if (fi->nr_done == fi->nr_send)
xio_context_stop_loop(fi->ctx);
static void xio_gateway_work(struct work *work)
{
struct xio_gateway_work *w = container_of(work, struct xio_gateway_work, work);

return 0;
xio_exec_req(w->nid, &w->hdr, w->buf, sheep_need_retry, w->epoch, MAX_RETRY_COUNT);
}

static struct xio_session_ops gw_client_ses_ops = {
.on_session_event = on_session_event,
.on_session_established = NULL,
.on_msg = gw_client_on_response,
.on_msg_error = on_msg_error,
.assign_data_in_buf = client_assign_data_in_buf,
};

struct xio_session *sd_xio_gw_create_session(struct xio_context *ctx,
const struct node_id *nid,
void *user_ctx)
static void xio_gateway_main(struct work *work)
{
struct xio_session *session;
char url[256];
struct xio_session_params params;
struct xio_gateway_work *w = container_of(work, struct xio_gateway_work, work);

if (nid->io_transport_type == IO_TRANSPORT_TYPE_RDMA)
snprintf(url, 256, "rdma://%s",
addr_to_str(nid->io_addr, nid->io_port));
else
snprintf(url, 256, "tcp://%s",
addr_to_str(nid->io_addr, nid->io_port));
eventfd_xwrite(w->finish_efd, 1);
free(w);
}

memset(&params, 0, sizeof(params));
params.type = XIO_SESSION_CLIENT;
params.ses_ops = &gw_client_ses_ops;
params.uri = url;
params.user_context = user_ctx;
int xio_send_gateway_reqs(int nr_to_send, const struct sd_node *target_nodes[],
struct req_iter *reqs, struct request *req)
{
int efd, err_ret = 0;

session = xio_session_create(&params);
efd = eventfd(0, EFD_SEMAPHORE);
if (efd < 0) {
sd_err("failed to create event fd for notifying completion of"
" xio gateway requests: %m");
return SD_RES_SYSTEM_ERROR;
}

return session;
}
for (int i = 0; i < nr_to_send; i++) {
struct xio_gateway_work *w = zalloc(sizeof(*w));
if (!w) {
err_ret = SD_RES_NO_MEM;
sd_err("failed to allocate memory for xio gateway"
" request");

struct xio_connection *sd_xio_gw_create_connection(struct xio_context *ctx,
struct xio_session *session,
void *user_ctx)
{
struct xio_connection *conn;
struct xio_connection_params cparams;
goto out;
}

memset(&cparams, 0, sizeof(cparams));
cparams.session = session;
cparams.ctx = ctx;
cparams.conn_user_context = user_ctx;
w->nid = &target_nodes[i]->nid;
w->buf = reqs[i].buf;
w->epoch = req->rq.epoch;

conn = xio_connect(&cparams);
w->hdr.data_length = reqs[i].dlen;
w->hdr.obj.offset = reqs[i].off;
w->hdr.obj.ec_index = i;
w->hdr.obj.copy_policy = req->rq.obj.copy_policy;

return conn;
}
w->work.fn = xio_gateway_work;
w->work.done = xio_gateway_main;

void xio_gw_send_req(struct xio_connection *conn, struct sd_req *hdr,
void *data, bool (*need_retry)(uint32_t epoch),
uint32_t epoch, uint32_t max_count)
{
struct xio_msg *xreq = xzalloc(sizeof(*xreq));
struct sd_rsp *rsp = xzalloc(sizeof(*rsp));
queue_work(sys->xio_wqueue, &w->work);
}

client_msg_vec_init(xreq);
msg_prep_for_send(hdr, rsp, data, xreq);
for (int i = 0; i < nr_to_send; i++)
eventfd_xread(efd);

xio_send_request(conn, xreq);
out:
return err_ret;
}

void xio_init_main_ctx(void)
Expand Down

0 comments on commit 51c7cec

Please sign in to comment.