Skip to content

Commit

Permalink
Merge pull request #3 from j-xiong/master
Browse files Browse the repository at this point in the history
psm provider update
  • Loading branch information
shefty committed Aug 22, 2014
2 parents 1574d77 + 5e691f4 commit 101a4f0
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 63 deletions.
96 changes: 88 additions & 8 deletions prov/psm/src/psmx.h
Expand Up @@ -28,6 +28,7 @@ extern "C" {
#include <rdma/fi_tagged.h>
#include <rdma/fi_rma.h>
#include <rdma/fi_atomic.h>
#include <rdma/fi_trigger.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_errno.h>
#include <psm.h>
Expand All @@ -40,19 +41,25 @@ extern "C" {
#endif

#define PSMX_TIME_OUT 120
#define PSMX_SUPPORTED_FLAGS (FI_BLOCK | \
FI_READ | FI_WRITE | FI_RECV | FI_SEND | \
FI_REMOTE_READ | FI_REMOTE_WRITE | \
FI_INJECT | FI_BUFFERED_RECV | \
FI_MULTI_RECV | FI_REMOTE_COMPLETE | \
FI_EVENT | FI_REMOTE_SIGNAL | FI_CANCEL)
#define PSMX_DEFAULT_FLAGS (0)

#define PSMX_EP_CAPS (FI_TAGGED | FI_MSG)
#define PSMX_OP_FLAGS (FI_INJECT | FI_MULTI_RECV | FI_EVENT | FI_TRIGGER)

#define PSMX_EP_CAP_BASE (FI_TAGGED | FI_INJECT | FI_TRIGGER | \
FI_BUFFERED_RECV | FI_MULTI_RECV | \
FI_SEND | FI_RECV | FI_CANCEL)
#define PSMX_EP_CAP_OPT1 (FI_MSG)
#define PSMX_EP_CAP_OPT2 (0)
#define PSMX_EP_CAP (PSMX_EP_CAP_BASE | PSMX_EP_CAP_OPT1 | PSMX_EP_CAP_OPT2)

#define PSMX_DOMAIN_CAP (FI_WRITE_COHERENT | FI_CONTEXT | \
FI_USER_MR_KEY | FI_DYNAMIC_MR)

#define PSMX_OUI_INTEL 0x0002b3L
#define PSMX_PROTOCOL 0x0001

#define PSMX_MAX_MSG_SIZE ((0x1ULL << 32) - 1)
#define PSMX_INJECT_SIZE (64)

#define PSMX_MSG_BIT (0x1ULL << 63)

enum psmx_context_type {
Expand Down Expand Up @@ -128,6 +135,61 @@ struct psmx_fid_eq {
struct psmx_event *pending_error;
};

/*
 * Kinds of operation that can be queued on a counter as a triggered
 * operation. The value is stored in psmx_trigger.op and is what
 * psmx_cntr_check_trigger() switches on to decide which internal
 * routine to call and which member of the trigger's union to read.
 */
enum psmx_triggered_op {
/* deferred _psmx_sendto(); arguments are in psmx_trigger.send */
PSMX_TRIGGERED_SEND,
/* deferred _psmx_recvfrom(); arguments are in psmx_trigger.recv */
PSMX_TRIGGERED_RECV,
/* deferred _psmx_tagged_sendto(); arguments are in psmx_trigger.tsend */
PSMX_TRIGGERED_TSEND,
/* deferred _psmx_tagged_recvfrom(); arguments are in psmx_trigger.trecv */
PSMX_TRIGGERED_TRECV,
};

/*
 * One pending triggered operation, kept as a node of the singly linked
 * list that hangs off psmx_fid_cntr.trigger.
 *
 * The node records the full argument list of the deferred call so that
 * psmx_cntr_check_trigger() can replay it once the counter value is no
 * longer below 'threshold'. psmx_cntr_check_trigger() frees the node
 * with free() after issuing the call.
 */
struct psmx_trigger {
/* selects the deferred routine and the valid member of the union below */
enum psmx_triggered_op op;
/* presumably the counter this trigger is queued on -- TODO confirm with the code that fills it in */
struct psmx_fid_cntr *cntr;
/* the operation is issued once the counter value is >= this threshold */
size_t threshold;
/* saved arguments of the deferred call; only the member matching 'op' is meaningful */
union {
/* arguments replayed into _psmx_sendto() for PSMX_TRIGGERED_SEND */
struct {
struct fid_ep *ep;
const void *buf;
size_t len;
void *desc;
const void *dest_addr;
void *context;
uint64_t flags;
} send;
/* arguments replayed into _psmx_recvfrom() for PSMX_TRIGGERED_RECV */
struct {
struct fid_ep *ep;
void *buf;
size_t len;
void *desc;
const void *src_addr;
void *context;
uint64_t flags;
} recv;
/* arguments replayed into _psmx_tagged_sendto() for PSMX_TRIGGERED_TSEND */
struct {
struct fid_ep *ep;
const void *buf;
size_t len;
void *desc;
const void *dest_addr;
uint64_t tag;
void *context;
uint64_t flags;
} tsend;
/* arguments replayed into _psmx_tagged_recvfrom() for PSMX_TRIGGERED_TRECV */
struct {
struct fid_ep *ep;
void *buf;
size_t len;
void *desc;
const void *src_addr;
uint64_t tag;
uint64_t ignore;
void *context;
uint64_t flags;
} trecv;
};
/* next node; psmx_cntr_add_trigger() keeps the list in ascending threshold order */
struct psmx_trigger *next;
};

struct psmx_fid_cntr {
struct fid_cntr cntr;
struct psmx_fid_domain *domain;
Expand All @@ -137,6 +199,7 @@ struct psmx_fid_cntr {
volatile uint64_t counter;
pthread_mutex_t mutex;
pthread_cond_t cond;
struct psmx_trigger *trigger;
};

struct psmx_fid_av {
Expand Down Expand Up @@ -179,6 +242,7 @@ struct psmx_fid_ep {
uint64_t pending_writes;
uint64_t pending_reads;
uint64_t pending_atomics;
size_t min_multi_recv;
};

struct psmx_fid_mr {
Expand Down Expand Up @@ -236,6 +300,22 @@ struct psmx_event *psmx_eq_create_event(struct psmx_fid_eq *fid_eq,
int psmx_eq_poll_mq(struct psmx_fid_eq *eq,
struct psmx_fid_domain *domain_if_null_eq);
struct psmx_fid_mr *psmx_mr_hash_get(uint64_t key);
int psmx_mr_validate(struct psmx_fid_mr *mr, uint64_t addr, size_t len, uint64_t access);
void psmx_cntr_check_trigger(struct psmx_fid_cntr *cntr);
void psmx_cntr_add_trigger(struct psmx_fid_cntr *cntr, struct psmx_trigger *trigger);

ssize_t _psmx_sendto(struct fid_ep *ep, const void *buf, size_t len,
void *desc, const void *dest_addr, void *context,
uint64_t flags);
ssize_t _psmx_recvfrom(struct fid_ep *ep, void *buf, size_t len,
void *desc, const void *src_addr, void *context,
uint64_t flags);
ssize_t _psmx_tagged_sendto(struct fid_ep *ep, const void *buf, size_t len,
void *desc, const void *dest_addr, uint64_t tag,
void *context, uint64_t flags);
ssize_t _psmx_tagged_recvfrom(struct fid_ep *ep, void *buf, size_t len,
void *desc, const void *src_addr, uint64_t tag,
uint64_t ignore, void *context, uint64_t flags);

#ifdef __cplusplus
}
Expand Down
13 changes: 11 additions & 2 deletions prov/psm/src/psmx_av.c
Expand Up @@ -122,6 +122,10 @@ static int psmx_av_insert(struct fid_av *av, const void *addr, size_t count,

fid_av = container_of(av, struct psmx_fid_av, av);

/* TODO: support the FI_RANGE flag */
if (flags)
return -FI_EBADFLAGS;

errors = (psm_error_t *) calloc(count, sizeof *errors);
if (!errors)
return -ENOMEM;
Expand Down Expand Up @@ -150,10 +154,15 @@ static int psmx_av_insert(struct fid_av *av, const void *addr, size_t count,
/* prevent connecting to the same ep twice, which is fatal in PSM */
for (i=0; i<count; i++) {
psm_epconn_t epconn;
if (psm_ep_epid_lookup(((psm_epid_t *) addr)[i], &epconn) == PSM_OK)
if (psm_ep_epid_lookup(((psm_epid_t *) addr)[i], &epconn) == PSM_OK) {
((psm_epaddr_t *) fi_addr)[i] = epconn.addr;
else
psmx_set_epaddr_context(fid_av->domain,
((psm_epid_t *) addr)[i],
((psm_epaddr_t *) fi_addr)[i]);
}
else {
mask[i] = 1;
}
}

err = psm_ep_connect(fid_av->domain->psm_ep, count,
Expand Down
89 changes: 89 additions & 0 deletions prov/psm/src/psmx_cntr.c
Expand Up @@ -32,6 +32,91 @@

#include "psmx.h"

void psmx_cntr_check_trigger(struct psmx_fid_cntr *cntr)
{
struct psmx_trigger *trigger;

/* TODO: protect the trigger list with mutex */

if (!cntr->trigger)
return;

trigger = cntr->trigger;
while (trigger) {
if (cntr->counter < trigger->threshold)
break;

cntr->trigger = trigger->next;
switch (trigger->op) {
case PSMX_TRIGGERED_SEND:
_psmx_sendto(trigger->send.ep,
trigger->send.buf,
trigger->send.len,
trigger->send.desc,
trigger->send.dest_addr,
trigger->send.context,
trigger->send.flags);
break;
case PSMX_TRIGGERED_RECV:
_psmx_recvfrom(trigger->recv.ep,
trigger->recv.buf,
trigger->recv.len,
trigger->recv.desc,
trigger->recv.src_addr,
trigger->recv.context,
trigger->recv.flags);
break;
case PSMX_TRIGGERED_TSEND:
_psmx_tagged_sendto(trigger->tsend.ep,
trigger->tsend.buf,
trigger->tsend.len,
trigger->tsend.desc,
trigger->tsend.dest_addr,
trigger->tsend.tag,
trigger->tsend.context,
trigger->tsend.flags);
break;
case PSMX_TRIGGERED_TRECV:
_psmx_tagged_recvfrom(trigger->trecv.ep,
trigger->trecv.buf,
trigger->trecv.len,
trigger->trecv.desc,
trigger->trecv.src_addr,
trigger->trecv.tag,
trigger->trecv.ignore,
trigger->trecv.context,
trigger->trecv.flags);
break;
default:
psmx_debug("%s: %d unsupported op\n", __func__, trigger->op);
break;
}

free(trigger);
}
}

/*
 * Queue 'trigger' on the trigger list of 'cntr' and then fire whatever
 * is already due.
 *
 * The list is ordered by ascending threshold. A new node is placed after
 * all existing nodes whose threshold is less than or equal to its own,
 * so nodes with equal thresholds keep their insertion order.
 */
void psmx_cntr_add_trigger(struct psmx_fid_cntr *cntr, struct psmx_trigger *trigger)
{
	struct psmx_trigger **link;

	/* TODO: protect the trigger list with mutex */

	/* Walk the chain of 'next' pointers up to the insertion point. */
	for (link = &cntr->trigger; *link; link = &(*link)->next) {
		if ((*link)->threshold > trigger->threshold)
			break;
	}

	/* Splice the new node in front of the first larger threshold. */
	trigger->next = *link;
	*link = trigger;

	psmx_cntr_check_trigger(cntr);
}

static uint64_t psmx_cntr_read(struct fid_cntr *cntr)
{
struct psmx_fid_cntr *fid_cntr;
Expand All @@ -48,6 +133,8 @@ static int psmx_cntr_add(struct fid_cntr *cntr, uint64_t value)
fid_cntr = container_of(cntr, struct psmx_fid_cntr, cntr);
fid_cntr->counter += value;

psmx_cntr_check_trigger(fid_cntr);

if (fid_cntr->wait_obj == FI_WAIT_MUT_COND)
pthread_cond_signal(&fid_cntr->cond);

Expand All @@ -61,6 +148,8 @@ static int psmx_cntr_set(struct fid_cntr *cntr, uint64_t value)
fid_cntr = container_of(cntr, struct psmx_fid_cntr, cntr);
fid_cntr->counter = value;

psmx_cntr_check_trigger(fid_cntr);

if (fid_cntr->wait_obj == FI_WAIT_MUT_COND)
pthread_cond_signal(&fid_cntr->cond);

Expand Down
26 changes: 22 additions & 4 deletions prov/psm/src/psmx_ep.c
Expand Up @@ -76,7 +76,7 @@ static int psmx_ep_getopt(fid_t fid, int level, int optname,
if (!fid_ep->domain)
return -EBADF;

*(size_t *)optval = 64;
*(size_t *)optval = PSMX_INJECT_SIZE;
*optlen = sizeof(size_t);
break;

Expand All @@ -90,8 +90,7 @@ static int psmx_ep_getopt(fid_t fid, int level, int optname,
if (!fid_ep->domain)
return -EBADF;

/* PSM message len is uint32_t. */
*(size_t *)optval = 0xFFFFFFFF;
*(size_t *)optval = PSMX_MAX_MSG_SIZE;
*optlen = sizeof(size_t);
break;

Expand All @@ -113,6 +112,9 @@ static int psmx_ep_getopt(fid_t fid, int level, int optname,
*optlen = sizeof(size_t);
break;

case FI_OPT_MIN_MULTI_RECV:
*(size_t *)optval = fid_ep->min_multi_recv;
*optlen = sizeof(size_t);
break;

default:
Expand All @@ -125,7 +127,23 @@ static int psmx_ep_getopt(fid_t fid, int level, int optname,
/*
 * Set an endpoint option.
 *
 * Only level FI_OPT_ENDPOINT is handled, and the only option recognised
 * is FI_OPT_MIN_MULTI_RECV, whose value is a size_t stored in the
 * endpoint's min_multi_recv field.
 *
 * fid:     fid embedded in a struct psmx_fid_ep (ep.fid).
 * level:   option level; anything but FI_OPT_ENDPOINT is rejected.
 * optname: option to set.
 * optval:  pointer to the new value (a size_t for FI_OPT_MIN_MULTI_RECV).
 * optlen:  size in bytes of the buffer behind optval.
 *
 * Returns 0 on success, -ENOPROTOOPT for an unsupported level or option,
 * and -EINVAL when optval is NULL or too small to hold the value.
 */
static int psmx_ep_setopt(fid_t fid, int level, int optname,
			  const void *optval, size_t optlen)
{
	struct psmx_fid_ep *fid_ep;

	fid_ep = container_of(fid, struct psmx_fid_ep, ep.fid);

	if (level != FI_OPT_ENDPOINT)
		return -ENOPROTOOPT;

	switch (optname) {
	case FI_OPT_MIN_MULTI_RECV:
		/* Do not read a size_t out of a missing or short buffer. */
		if (!optval || optlen < sizeof(size_t))
			return -EINVAL;
		fid_ep->min_multi_recv = *(const size_t *)optval;
		break;

	default:
		return -ENOPROTOOPT;
	}

	return 0;
}

static int psmx_ep_enable(struct fid_ep *ep)
Expand Down
7 changes: 2 additions & 5 deletions prov/psm/src/psmx_eq.c
Expand Up @@ -335,11 +335,8 @@ int psmx_eq_poll_mq(struct psmx_fid_eq *eq, struct psmx_fid_domain *domain_if_nu
psmx_eq_enqueue_event(tmp_eq, event);
}

if (tmp_cntr) {
tmp_cntr->counter++;
if (tmp_cntr->wait_obj == FI_WAIT_MUT_COND)
pthread_cond_signal(&tmp_cntr->cond);
}
if (tmp_cntr)
tmp_cntr->cntr.ops->add(&tmp_cntr->cntr, 1);

if (multi_recv) {
struct psmx_multi_recv *req;
Expand Down

0 comments on commit 101a4f0

Please sign in to comment.