Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kill warnings in UCX dataplane #3435

Merged
merged 2 commits into from Jan 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 3 additions & 19 deletions source/adios2/toolkit/sst/dp/ucx_dp.c
Expand Up @@ -47,7 +47,6 @@

#include "dp_interface.h"

static pthread_mutex_t ucx_fabric_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ucx_wsr_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ucx_ts_mutex = PTHREAD_MUTEX_INITIALIZER;

Expand Down Expand Up @@ -104,9 +103,6 @@ static ucs_status_t init_fabric(struct fabric_state *fabric,
ucp_worker_params_t worker_params;
ucp_config_t *config;
ucs_status_t status = UCS_ERR_LAST;
/* UCP handler objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;

memset(&ucp_params, 0, sizeof(ucp_params));
memset(&worker_params, 0, sizeof(worker_params));
Expand Down Expand Up @@ -257,12 +253,10 @@ static DP_RS_Stream UcxInitReader(CP_Services Svcs, void *CP_Stream,
{
Ucx_RS_Stream Stream = malloc(sizeof(struct _Ucx_RS_Stream));
SMPI_Comm comm = Svcs->getMPIComm(CP_Stream);
FabricState Fabric;
ucs_status_t status;

memset(Stream, 0, sizeof(*Stream));
Stream->Fabric = calloc(1, sizeof(*Fabric));
Fabric = Stream->Fabric;
Stream->Fabric = calloc(1, sizeof(*Stream->Fabric));

/*
* save the CP_stream value of later use
Expand Down Expand Up @@ -300,7 +294,6 @@ static DP_WS_Stream UcxInitWriter(CP_Services Svcs, void *CP_Stream,
{
Ucx_WS_Stream Stream = malloc(sizeof(struct _Ucx_WS_Stream));
SMPI_Comm comm = Svcs->getMPIComm(CP_Stream);
int ret;
ucs_status_t status;

memset(Stream, 0, sizeof(struct _Ucx_WS_Stream));
Expand Down Expand Up @@ -331,8 +324,6 @@ UcxInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
FabricState Fabric = WS_Stream->Fabric;
UcxWriterContactInfo ContactInfo;

int i;

WSR_Stream->WS_Stream = WS_Stream; /* pointer to writer struct */
WSR_Stream->PeerCohort = PeerCohort;

Expand Down Expand Up @@ -446,7 +437,7 @@ static void *UcxReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
ret->Rank = Rank;
ret->Length = Length;
ret->Pending = 1;
Addr = Info->Block + Offset;
Addr = (uint8_t *)Info->Block + Offset;
Svcs->verbose(
RS_Stream->CP_Stream, DPTraceVerbose,
"Remote read target is Rank %d, EP = %p (Offset = %zi, Length = %zi)\n",
Expand Down Expand Up @@ -544,9 +535,6 @@ static void UcxProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v,
UcxBufferHandle Info = malloc(sizeof(struct _UcxBufferHandle));
FabricState Fabric = Stream->Fabric;
ucs_status_t status;
ucp_worker_h ucp_worker;
ucp_address_t *ucp_address;
size_t address_length = 0;

Entry->Data = malloc(sizeof(*Data));
memcpy(Entry->Data, Data, sizeof(*Data));
Expand Down Expand Up @@ -619,7 +607,6 @@ static void UcxReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v,
FabricState Fabric = Stream->Fabric;
TimestepList *List = &Stream->Timesteps;
TimestepList ReleaseTSL;
UcxBufferHandle Info;

Svcs->verbose(Stream->CP_Stream, DPTraceVerbose, "Releasing timestep %ld\n",
Timestep);
Expand Down Expand Up @@ -681,8 +668,6 @@ static void UcxDestroyWriterPerReader(CP_Services Svcs,
Ucx_WSR_Stream WSR_Stream = {0};
memcpy(&WSR_Stream, &WSR_Stream_v, sizeof(Ucx_WSR_Stream));
Ucx_WS_Stream WS_Stream = WSR_Stream->WS_Stream;
UcxWriterContactInfo WriterContactInfo = {0};
int j = 0;

pthread_mutex_lock(&ucx_wsr_mutex);
for (int i = 0; i < WS_Stream->ReaderCount; i++)
Expand All @@ -702,7 +687,6 @@ static void UcxDestroyWriterPerReader(CP_Services Svcs,

if (WSR_Stream->WriterContactInfo)
{
WriterContactInfo = WSR_Stream->WriterContactInfo;
free(WSR_Stream->WriterContactInfo);
}

Expand Down Expand Up @@ -838,4 +822,4 @@ extern NO_SANITIZE_THREAD CP_DP_Interface LoadUcxDP()
UcxDPInterface.getPriority = UcxGetPriority;
UcxDPInterface.unGetPriority = UcxUnGetPriority;
return &UcxDPInterface;
}
}