Skip to content

Commit

Permalink
Limit pending reads requests with RDMA DP
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Feb 25, 2022
1 parent 7cebd20 commit 80c5691
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions source/adios2/toolkit/sst/dp/rdma_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define DP_AV_DEF_SIZE 512
#define REQ_LIST_GRAN 8
#define DP_DATA_RECV_SIZE 64
#define DP_PENDING_READ_LIMIT 1024

static pthread_mutex_t fabric_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t wsr_mutex = PTHREAD_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -1059,6 +1060,8 @@ static void LogRequest(CP_Services Svcs, Rdma_RS_Stream RS_Stream, int Rank,
LogEntry->Handle.Block = NULL;
}

static int WaitForAnyPull(CP_Services Svcs, Rdma_RS_Stream Stream);

static ssize_t PostRead(CP_Services Svcs, Rdma_RS_Stream RS_Stream, int Rank,
long Timestep, size_t Offset, size_t Length,
void *Buffer, RdmaBufferHandle Info,
Expand All @@ -1075,6 +1078,11 @@ static ssize_t PostRead(CP_Services Svcs, Rdma_RS_Stream RS_Stream, int Rank,
ret = *ret_v;
ret->Pending = 1;

while (RS_Stream->PendingReads > DP_PENDING_READ_LIMIT)
{
WaitForAnyPull(Svcs, RS_Stream);
}

if (Fabric->local_mr_req)
{
// register dest buffer
Expand Down Expand Up @@ -1305,6 +1313,7 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream,
{
}
pthread_mutex_unlock(&ts_mutex);
Stream->PendingReads--;
}
else
{
Expand All @@ -1320,6 +1329,7 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream,
CQEntry.op_context);
Handle_t = (RdmaCompletionHandle)CQEntry.op_context;
Handle_t->Pending--;
Stream->PendingReads--;
}
}

Expand All @@ -1331,40 +1341,47 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream,
return (1);
}

static int DoPullWait(CP_Services Svcs, Rdma_RS_Stream Stream,
RdmaCompletionHandle Handle)
static int WaitForAnyPull(CP_Services Svcs, Rdma_RS_Stream Stream)
{
FabricState Fabric = Stream->Fabric;
RdmaCompletionHandle Handle_t;
struct fi_cq_data_entry CQEntry = {0};

while (Handle->Pending > 0)
ssize_t rc;
rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
if (rc < 1)
{
ssize_t rc;
rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
if (rc < 1)
{
Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose,
"failure while waiting for completions (%d).\n", rc);
return 0;
}
else
Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose,
"failure while waiting for completions (%d).\n", rc);
return 0;
}
else
{
Svcs->verbose(
Stream->CP_Stream, DPTraceVerbose,
"got completion for request with handle %p (flags %li).\n",
CQEntry.op_context, CQEntry.flags);
Handle_t = (RdmaCompletionHandle)CQEntry.op_context;
Handle_t->Pending--;
Stream->PendingReads--;

// TODO: maybe reuse this memory registration
if (Fabric->local_mr_req)
{
Svcs->verbose(
Stream->CP_Stream, DPTraceVerbose,
"got completion for request with handle %p (flags %li).\n",
CQEntry.op_context, CQEntry.flags);
Handle_t = (RdmaCompletionHandle)CQEntry.op_context;
Handle_t->Pending--;
fi_close((struct fid *)Handle_t->LocalMR);
}
}
return 1;
}

// TODO: maybe reuse this memory registration
if (Fabric->local_mr_req)
static int DoPullWait(CP_Services Svcs, Rdma_RS_Stream Stream,
RdmaCompletionHandle Handle)
{
while (Handle->Pending > 0)
{
fi_close((struct fid *)Handle->LocalMR);
if (WaitForAnyPull(Svcs, Stream) == 0)
return 0;
}

return (1);
}

Expand Down

0 comments on commit 80c5691

Please sign in to comment.