diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index 1b74d41ebf..944e9df558 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -47,6 +47,7 @@ #define DP_AV_DEF_SIZE 512 #define REQ_LIST_GRAN 8 #define DP_DATA_RECV_SIZE 64 +#define DP_PENDING_READ_LIMIT 1024 static pthread_mutex_t fabric_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t wsr_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -1059,6 +1060,8 @@ static void LogRequest(CP_Services Svcs, Rdma_RS_Stream RS_Stream, int Rank, LogEntry->Handle.Block = NULL; } +static int WaitForAnyPull(CP_Services Svcs, Rdma_RS_Stream Stream); + static ssize_t PostRead(CP_Services Svcs, Rdma_RS_Stream RS_Stream, int Rank, long Timestep, size_t Offset, size_t Length, void *Buffer, RdmaBufferHandle Info, @@ -1075,6 +1078,11 @@ static ssize_t PostRead(CP_Services Svcs, Rdma_RS_Stream RS_Stream, int Rank, ret = *ret_v; ret->Pending = 1; + while (RS_Stream->PendingReads > DP_PENDING_READ_LIMIT) + { + WaitForAnyPull(Svcs, RS_Stream); + } + if (Fabric->local_mr_req) { // register dest buffer @@ -1305,6 +1313,7 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream, { } pthread_mutex_unlock(&ts_mutex); + Stream->PendingReads--; } else { @@ -1320,6 +1329,7 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream, CQEntry.op_context); Handle_t = (RdmaCompletionHandle)CQEntry.op_context; Handle_t->Pending--; + Stream->PendingReads--; } } @@ -1331,40 +1341,47 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream, return (1); } -static int DoPullWait(CP_Services Svcs, Rdma_RS_Stream Stream, - RdmaCompletionHandle Handle) +static int WaitForAnyPull(CP_Services Svcs, Rdma_RS_Stream Stream) { FabricState Fabric = Stream->Fabric; RdmaCompletionHandle Handle_t; struct fi_cq_data_entry CQEntry = {0}; - while (Handle->Pending > 0) + ssize_t rc; + rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + if (rc < 1) { - ssize_t rc; - rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); - if (rc < 1) - { - Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose, - "failure while waiting for completions (%d).\n", rc); - return 0; - } - else + Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose, + "failure while waiting for completions (%d).\n", rc); + return 0; + } + else + { + Svcs->verbose( + Stream->CP_Stream, DPTraceVerbose, + "got completion for request with handle %p (flags %li).\n", + CQEntry.op_context, CQEntry.flags); + Handle_t = (RdmaCompletionHandle)CQEntry.op_context; + Handle_t->Pending--; + Stream->PendingReads--; + + // TODO: maybe reuse this memory registration + if (Fabric->local_mr_req) { - Svcs->verbose( - Stream->CP_Stream, DPTraceVerbose, - "got completion for request with handle %p (flags %li).\n", - CQEntry.op_context, CQEntry.flags); - Handle_t = (RdmaCompletionHandle)CQEntry.op_context; - Handle_t->Pending--; + fi_close((struct fid *)Handle_t->LocalMR); } } + return 1; +} - // TODO: maybe reuse this memory registration - if (Fabric->local_mr_req) +static int DoPullWait(CP_Services Svcs, Rdma_RS_Stream Stream, + RdmaCompletionHandle Handle) +{ + while (Handle->Pending > 0) { - fi_close((struct fid *)Handle->LocalMR); + if (WaitForAnyPull(Svcs, Stream) == 0) + return 0; } - return (1); }