Skip to content

Commit

Permalink
Merge pull request #4351 from aingerson/rxd_upstream
Browse files Browse the repository at this point in the history
prov/rxd: add peer connection management
  • Loading branch information
aingerson committed Sep 5, 2018
2 parents c2e649f + 392108d commit ecf639d
Show file tree
Hide file tree
Showing 9 changed files with 881 additions and 599 deletions.
12 changes: 12 additions & 0 deletions include/ofi_list.h
Expand Up @@ -170,6 +170,18 @@ dlist_remove_first_match(struct dlist_entry *head, dlist_func_t *match,
return item;
}

/*
 * Insert 'entry' into the list anchored at 'head' at a position chosen by
 * 'order'.
 *
 * The list is searched with dlist_find_first_match(), passing 'order' as the
 * match callback and 'entry' as its argument.  If an item is returned,
 * 'entry' is linked in directly before it; otherwise 'entry' is added at the
 * tail of the list.
 */
static inline void dlist_insert_order(struct dlist_entry *head, dlist_func_t *order,
				      struct dlist_entry *entry)
{
	struct dlist_entry *successor = dlist_find_first_match(head, order, entry);

	if (!successor) {
		/* Nothing matched: entry belongs at the end. */
		dlist_insert_tail(entry, head);
		return;
	}

	dlist_insert_before(entry, successor);
}

/* splices list at the front of the list 'head'
*
* BEFORE:
Expand Down
11 changes: 6 additions & 5 deletions man/fi_rxd.7.md
Expand Up @@ -52,11 +52,12 @@ The *rxd* provider checks for the following environment variables:
: Number of times to read the core provider's CQ for a segment completion
before trying to progress sends. Default is 1000.

*FI_OFI_RXD_OOO_RDM*
: Toggles out-of-order reliability mode. This indicates that the rxd provider
can assume the core provider will not drop any packets, but might deliver
packets out of order. As a result, resending is turned off and the receiver
will reassemble all received packets. This mode is turned off by default.
*FI_OFI_RXD_RETRY*
: Toggles retrying of packets. Turning retrying off assumes that individual
packets are delivered reliably; the provider will still reassemble all
received packets. Retrying is turned on by default.

*FI_OFI_RXD_MAX_PEERS*
: Maximum number of peers the provider should prepare to track. Default: 1024

# SEE ALSO

Expand Down
179 changes: 111 additions & 68 deletions prov/rxd/src/rxd.h
Expand Up @@ -74,19 +74,19 @@
#define RXD_TX_POOL_CHUNK_CNT 1024
#define RXD_RX_POOL_CHUNK_CNT 1024
#define RXD_MAX_UNACKED 128
#define RXD_MAX_PENDING 128
#define RXD_MAX_PKT_RETRY 50

#define RXD_REMOTE_CQ_DATA (1 << 0)
#define RXD_NO_COMPLETION (1 << 1)
#define RXD_INJECT (1 << 2)
#define RXD_RETRY (1 << 3)
#define RXD_LAST (1 << 4)
#define RXD_CTRL (1 << 5)
#define RXD_INLINE (1 << 6)

struct rxd_env {
int spin_count;
int ooo_rdm;
int retry;
int max_peers;
};

extern struct rxd_env rxd_env;
Expand All @@ -105,11 +105,38 @@ struct rxd_domain {
struct fid_domain *dg_domain;

ssize_t max_mtu_sz;
ssize_t max_inline_sz;
ssize_t max_seg_sz;
int mr_mode;
struct ofi_mr_map mr_map;//TODO use util_domain mr_map instead
};

/*
 * Per-peer state kept by an rxd endpoint; struct rxd_ep carries an array of
 * these as its trailing 'peers[]' member.
 *
 * NOTE(review): the per-field notes below are inferred from the field names
 * only -- confirm against the code that reads and updates them.
 */
struct rxd_peer {
struct dlist_entry entry; /* list linkage; presumably for rxd_ep.active_peers - confirm */
fi_addr_t peer_addr;
/* Sequence-number bookkeeping; presumably next seq to send, next seq
 * expected, and the last seq acknowledged - TODO confirm. */
uint32_t tx_seq_no;
uint32_t rx_seq_no;
uint32_t last_ack_seq_no;
/* Message-id counters, presumably one per direction - TODO confirm. */
uint32_t tx_msg_id;
uint32_t rx_msg_id;
uint16_t rx_window;//constant at MAX_UNACKED for now
uint16_t tx_window;//unused for now, will be used for slow start

/* Presumably the number of packets on the 'unacked' and 'pending' lists
 * below (cf. RXD_MAX_UNACKED / RXD_MAX_PENDING) - TODO confirm. */
uint16_t unacked_cnt;
int pending_cnt;

uint32_t curr_rx_id;
uint32_t curr_tx_id;

uint8_t blocking;
/* List heads owned by this peer. */
struct dlist_entry tx_list;
struct dlist_entry rx_list;
struct dlist_entry unacked;
struct dlist_entry pending;
struct dlist_entry buf_ops;
struct dlist_entry buf_cq;
};

struct rxd_av {
struct util_av util_av;
struct fid_av *dg_av;
Expand Down Expand Up @@ -138,22 +165,22 @@ struct rxd_ep {
size_t tx_size;
size_t prefix_size;
uint32_t posted_bufs;
uint32_t key;
int do_local_mr;

struct util_buf_pool *tx_pkt_pool;
struct util_buf_pool *rx_pkt_pool;
struct slist rx_pkt_list;

struct rxd_tx_fs *tx_fs;
struct rxd_rx_fs *rx_fs;
struct rxd_x_fs *tx_fs;
struct rxd_x_fs *rx_fs;

struct dlist_entry tx_list;
struct dlist_entry unexp_list;
struct dlist_entry unexp_tag_list;
struct dlist_entry rx_list;
struct dlist_entry rx_tag_list;
struct dlist_entry active_rx_list;
struct dlist_entry active_peers;

struct rxd_peer peers[];
};

static inline struct rxd_domain *rxd_ep_domain(struct rxd_ep *ep)
Expand All @@ -176,28 +203,17 @@ static inline struct rxd_cq *rxd_ep_rx_cq(struct rxd_ep *ep)
return container_of(ep->util_ep.rx_cq, struct rxd_cq, util_cq);
}

enum rxd_msg_type {
RXD_RTS,
RXD_CTS,
RXD_ACK,
RXD_FREE,
};

struct rxd_x_entry {
fi_addr_t peer;
fi_addr_t peer_x_addr;
uint32_t tx_id;
uint32_t rx_id;
uint32_t key;
enum rxd_msg_type state;
uint32_t msg_id;
uint64_t bytes_done;
uint32_t next_seg_no;
uint32_t start_seq;
uint32_t window;
uint32_t next_start;
uint64_t retry_time;
uint8_t retry_cnt;
uint32_t num_segs;
uint64_t seg_size;
uint32_t op;

uint32_t flags;
uint64_t ignore;
Expand All @@ -207,69 +223,101 @@ struct rxd_x_entry {
struct fi_cq_tagged_entry cq_entry;

struct dlist_entry entry;
struct slist pkt_list;
};
DECLARE_FREESTACK(struct rxd_x_entry, rxd_tx_fs);
DECLARE_FREESTACK(struct rxd_x_entry, rxd_rx_fs);
DECLARE_FREESTACK(struct rxd_x_entry, rxd_x_fs);

#define rxd_ep_rx_flags(rxd_ep) ((rxd_ep)->util_ep.rx_op_flags)
#define rxd_ep_tx_flags(rxd_ep) ((rxd_ep)->util_ep.tx_op_flags)

struct rxd_pkt_hdr {

/*
 * Type codes used by rxd.
 *
 * The first eight enumerators are defined to equal the matching ofi_op_*
 * constants.  RXD_RTS and the enumerators after it have no explicit value,
 * so they continue counting up from RXD_ATOMIC_COMPARE (i.e. RXD_RTS is
 * ofi_op_atomic_compare + 1, and so on).
 *
 * NOTE(review): presumably this is the value stored in rxd_base_hdr.type --
 * confirm.
 */
enum rxd_msg_type {
RXD_MSG = ofi_op_msg,
RXD_TAGGED = ofi_op_tagged,
RXD_READ_REQ = ofi_op_read_req,
RXD_READ = ofi_op_read_rsp,
RXD_WRITE = ofi_op_write,
RXD_ATOMIC = ofi_op_atomic,
RXD_ATOMIC_FETCH = ofi_op_atomic_fetch,
RXD_ATOMIC_COMPARE = ofi_op_atomic_compare,
/* rxd-specific codes, implicitly numbered after the ofi_op_* aliases. */
RXD_RTS,
RXD_CTS,
RXD_ACK,
RXD_DATA,
RXD_NO_OP,
};

struct rxd_base_hdr {
uint32_t version;
uint32_t flags;
uint32_t tx_id;
uint32_t rx_id;
uint32_t key;
uint32_t seg_no;
fi_addr_t peer;
uint32_t type;
};

struct rxd_ctrl_hdr {
uint8_t type;
uint8_t window;
uint16_t seg_size;
uint32_t op;
uint64_t size;
uint64_t data;
uint64_t tag;
uint8_t source[RXD_NAME_LENGTH];
struct rxd_pkt_hdr {
uint32_t flags;
uint32_t tx_id;
uint32_t rx_id;
uint32_t msg_id;
uint32_t seg_no;
uint32_t seq_no;
fi_addr_t peer;
};

struct rxd_ctrl_pkt {
struct rxd_pkt_hdr pkt_hdr;
struct rxd_ctrl_hdr ctrl_hdr;
char data[];
/*
 * RTS packet layout: the base header, a 64-bit 'dg_addr' and a 'source'
 * byte array of RXD_NAME_LENGTH bytes.
 *
 * NOTE(review): presumably sent to request a connection to a peer, with
 * 'source' carrying the sender's address/name -- confirm against the sender.
 */
struct rxd_rts_pkt {
struct rxd_base_hdr base_hdr;
uint64_t dg_addr;
uint8_t source[RXD_NAME_LENGTH];
};

/*
 * CTS packet layout: the base header followed by two 64-bit addresses.
 *
 * NOTE(review): presumably the reply to an RTS; the exact meaning of
 * 'dg_addr' versus 'peer_addr' is not visible here -- confirm.
 */
struct rxd_cts_pkt {
struct rxd_base_hdr base_hdr;
uint64_t dg_addr;
uint64_t peer_addr;
};

/*
 * ACK packet layout: just the base header followed by the packet header,
 * with no payload.  rxd_get_pkt_hdr() casts a packet buffer to this struct
 * to locate 'pkt_hdr'.
 */
struct rxd_ack_pkt {
struct rxd_base_hdr base_hdr;
struct rxd_pkt_hdr pkt_hdr;
//TODO fill in more fields? Selective ack?
};

/*
 * Operation packet layout: base header and packet header, four 64-bit
 * metadata fields, then a variable-length payload.
 *
 * NOTE(review): presumably the first packet of an operation, describing the
 * whole transfer (segment count, tag, total size) -- confirm.
 */
struct rxd_op_pkt {
struct rxd_base_hdr base_hdr;
struct rxd_pkt_hdr pkt_hdr;

uint64_t num_segs;
uint64_t tag;
/* rxd_domain_attr.cq_data_size is derived from the size of this field. */
uint64_t cq_data;
uint64_t size;

/* Flexible array member: payload bytes follow the fixed fields. */
char msg[];
};

struct rxd_data_pkt {
struct rxd_pkt_hdr hdr;
char data[];
struct rxd_base_hdr base_hdr;
struct rxd_pkt_hdr pkt_hdr;

char msg[];
};

/*
 * Bookkeeping wrapper around one packet buffer.
 *
 * NOTE(review): the retry field notes below are inferred from names --
 * confirm against rxd_set_timeout() / rxd_ep_retry_pkt().
 */
struct rxd_pkt_entry {
struct dlist_entry d_entry;
struct slist_entry s_entry;//TODO - keep both or make separate tx/rx pkt structs
size_t pkt_size;
uint64_t retry_time; /* presumably when the packet is next due for a retry - confirm */
uint8_t retry_cnt; /* presumably retries attempted so far (cf. RXD_MAX_PKT_RETRY) - confirm */
struct fi_context context;
struct fid_mr *mr;
fi_addr_t peer;
void *pkt; /* packet contents; rxd_pkt_type() reads this as a struct rxd_base_hdr */
};

static inline struct rxd_ctrl_pkt *rxd_get_ctrl_pkt(struct rxd_pkt_entry *pkt_entry)
{
return (struct rxd_ctrl_pkt *) (pkt_entry->pkt);
}

static inline struct rxd_data_pkt *rxd_get_data_pkt(struct rxd_pkt_entry *pkt_entry)
static inline int rxd_pkt_type(struct rxd_pkt_entry *pkt_entry)
{
return (struct rxd_data_pkt *) (pkt_entry->pkt);
return ((struct rxd_base_hdr *) (pkt_entry->pkt))->type;
}

static inline int rxd_is_ctrl_pkt(struct rxd_pkt_entry *pkt_entry)
static inline struct rxd_pkt_hdr *rxd_get_pkt_hdr(struct rxd_pkt_entry *pkt_entry)
{
return (rxd_get_ctrl_pkt(pkt_entry))->pkt_hdr.flags & RXD_CTRL;
return &((struct rxd_ack_pkt *) (pkt_entry->pkt))->pkt_hdr;
}

static inline void rxd_set_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry)
Expand Down Expand Up @@ -320,31 +368,26 @@ fi_addr_t rxd_av_fi_addr(struct rxd_av *av, fi_addr_t dg_fiaddr);

/* Pkt resource functions */
int rxd_ep_post_buf(struct rxd_ep *ep);
ssize_t rxd_ep_post_ack(struct rxd_ep *rxd_ep, struct rxd_x_entry *rx_entry);
void rxd_post_cts(struct rxd_ep *rxd_ep, struct rxd_x_entry *rx_entry,
struct rxd_pkt_entry *rts_pkt);
void rxd_release_repost_rx(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry);
void rxd_ep_send_ack(struct rxd_ep *rxd_ep, fi_addr_t peer);
struct rxd_pkt_entry *rxd_get_tx_pkt(struct rxd_ep *ep);
void rxd_release_rx_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt);
void rxd_init_ctrl_pkt(struct rxd_ep *ep, struct rxd_x_entry *x_entry,
struct rxd_pkt_entry *pkt_entry, uint32_t type);
void rxd_release_tx_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt);
int rxd_ep_retry_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry,
struct rxd_x_entry *x_entry);
int rxd_ep_retry_pkt(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry);
ssize_t rxd_ep_post_data_pkts(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);

/* Tx/Rx entry sub-functions */
void rxd_tx_entry_free(struct rxd_ep *ep, struct rxd_x_entry *tx_entry);
void rxd_rx_entry_free(struct rxd_ep *ep, struct rxd_x_entry *rx_entry);
void rxd_ep_free_acked_pkts(struct rxd_ep *ep, struct rxd_x_entry *x_entry,
uint32_t last_acked);
void rxd_set_timeout(struct rxd_x_entry *x_entry);
void rxd_set_timeout(struct rxd_pkt_entry *pkt_entry);

/* Progress functions */
void rxd_tx_entry_progress(struct rxd_ep *ep, struct rxd_x_entry *tx_entry,
int try_send);
void rxd_handle_send_comp(struct rxd_ep *ep, struct fi_cq_msg_entry *comp);
void rxd_handle_recv_comp(struct rxd_ep *ep, struct fi_cq_msg_entry *comp);
void rxd_progress_inline(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry,
struct rxd_x_entry *rx_entry);
void rxd_progress_op(struct rxd_ep *ep, struct rxd_pkt_entry *pkt_entry,
struct rxd_x_entry *rx_entry);

/* CQ sub-functions */
void rxd_cq_report_error(struct rxd_cq *cq, struct fi_cq_err_entry *err_entry);
Expand Down
2 changes: 1 addition & 1 deletion prov/rxd/src/rxd_attr.c
Expand Up @@ -66,7 +66,7 @@ struct fi_domain_attr rxd_domain_attr = {
.data_progress = FI_PROGRESS_MANUAL,
.resource_mgmt = FI_RM_ENABLED,
.av_type = FI_AV_UNSPEC,
.cq_data_size = sizeof_field(struct rxd_ctrl_hdr, data),
.cq_data_size = sizeof_field(struct rxd_op_pkt, cq_data),
.mr_key_size = sizeof(uint64_t),
.cq_cnt = 128,
.ep_cnt = 128,
Expand Down
3 changes: 1 addition & 2 deletions prov/rxd/src/rxd_av.c
Expand Up @@ -351,8 +351,7 @@ int rxd_av_create(struct fid_domain *domain_fid, struct fi_av_attr *attr,
util_attr.addrlen = sizeof(fi_addr_t);
util_attr.overhead = attr->count;
util_attr.flags = OFI_AV_HASH;
if (attr->type == FI_AV_UNSPEC)
attr->type = FI_AV_TABLE;
attr->type = FI_AV_TABLE;

ret = ofi_av_init(&domain->util_domain, attr, &util_attr,
&av->util_av, context);
Expand Down

0 comments on commit ecf639d

Please sign in to comment.