Skip to content

Commit

Permalink
Merge pull request #3928 from anagainaru/dataman-kokkos
Browse files Browse the repository at this point in the history
Dataman with GPU support
  • Loading branch information
eisenhauer committed Nov 18, 2023
2 parents ced424b + cba9c79 commit 05c3200
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 41 deletions.
3 changes: 2 additions & 1 deletion bindings/CXX11/adios2/cxx11/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,12 @@ class Engine
void Put(Variable<T> variable, U const &data, const Mode launch = Mode::Deferred)
{
auto bufferView = static_cast<AdiosView<U>>(data);
#ifdef ADIOS2_HAVE_GPU_SUPPORT
auto bufferMem = bufferView.memory_space();
#ifdef ADIOS2_HAVE_GPU_SUPPORT
auto variableMem = variable.GetMemorySpace();
CheckMemorySpace(variableMem, bufferMem);
#endif
variable.SetMemorySpace(bufferMem);
Put(variable, bufferView.data(), launch);
}

Expand Down
3 changes: 3 additions & 0 deletions examples/hello/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ if(ADIOS2_HAVE_Kokkos)
if(ADIOS2_HAVE_SST)
add_subdirectory(sstKokkos)
endif()
if(ADIOS2_HAVE_DataMan)
add_subdirectory(datamanKokkos)
endif()
endif()

add_subdirectory(bpThreadWrite)
Expand Down
33 changes: 33 additions & 0 deletions examples/hello/datamanKokkos/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#------------------------------------------------------------------------------#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#------------------------------------------------------------------------------#

cmake_minimum_required(VERSION 3.12)
project(ADIOS2HelloDataManKokkosExample)

if(NOT TARGET adios2_core)
set(_components CXX)

find_package(MPI COMPONENTS C)
if(MPI_FOUND)
# Workaround for various MPI implementations forcing the link of C++ bindings
add_definitions(-DOMPI_SKIP_MPICXX -DMPICH_SKIP_MPICXX)

list(APPEND _components MPI)
endif()

find_package(ZeroMQ 4.1 QUIET)

find_package(ADIOS2 REQUIRED COMPONENTS ${_components})
endif()

if(ADIOS2_HAVE_MPI AND ADIOS2_HAVE_DataMan)
add_executable(adios2_hello_datamanWriterKokkos dataManWriterKokkos.cpp)
target_link_libraries(adios2_hello_datamanWriterKokkos adios2::cxx11_mpi MPI::MPI_C Kokkos::kokkos)
install(TARGETS adios2_hello_datamanWriterKokkos RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})

add_executable(adios2_hello_datamanReaderKokkos dataManReaderKokkos.cpp)
target_link_libraries(adios2_hello_datamanReaderKokkos adios2::cxx11_mpi MPI::MPI_C Kokkos::kokkos)
install(TARGETS adios2_hello_datamanReaderKokkos RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
endif()
76 changes: 76 additions & 0 deletions examples/hello/datamanKokkos/dataManReaderKokkos.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <adios2.h>
#include <chrono>
#include <iostream>
#include <mpi.h>
#include <numeric>
#include <thread>
#include <vector>

#include <adios2/cxx11/KokkosView.h>

#include <Kokkos_Core.hpp>

int mpiRank, mpiSize;

template <class T, class MemSpace>
void PrintData(Kokkos::View<T *, MemSpace> &gpuData, const size_t step)
{
auto data = Kokkos::create_mirror_view_and_copy(Kokkos::HostSpace{}, gpuData);
std::cout << "Rank: " << mpiRank << " Step: " << step << " [";
for (int i = 0; i < data.extent_int(0); ++i)
{
std::cout << data(i) << " ";
}
std::cout << "]" << std::endl;
}

int main(int argc, char *argv[])
{
// initialize MPI
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);

// initialize adios2
adios2::ADIOS adios(MPI_COMM_WORLD);
adios2::IO dataManIO = adios.DeclareIO("whatever");
dataManIO.SetEngine("DataMan");
dataManIO.SetParameters({{"IPAddress", "127.0.0.1"}, {"Port", "12306"}, {"Timeout", "5"}});

// open stream
adios2::Engine dataManReader = dataManIO.Open("HelloDataMan", adios2::Mode::Read);

// define variable
adios2::Variable<float> floatArrayVar;

Kokkos::DefaultExecutionSpace exe_space;
std::cout << "Read on memory space: " << exe_space.name() << std::endl;
// read data
while (true)
{
auto status = dataManReader.BeginStep();
if (status == adios2::StepStatus::OK)
{
floatArrayVar = dataManIO.InquireVariable<float>("FloatArray");
auto shape = floatArrayVar.Shape();
size_t datasize =
std::accumulate(shape.begin(), shape.end(), 1, std::multiplies<size_t>());
Kokkos::View<float *, Kokkos::DefaultExecutionSpace::memory_space> floatVector(
"simBuffer", datasize);
dataManReader.Get<float>(floatArrayVar, floatVector, adios2::Mode::Sync);
dataManReader.EndStep();
PrintData(floatVector, dataManReader.CurrentStep());
}
else if (status == adios2::StepStatus::EndOfStream)
{
std::cout << "End of stream" << std::endl;
break;
}
}

// clean up
dataManReader.Close();
MPI_Finalize();

return 0;
}
97 changes: 97 additions & 0 deletions examples/hello/datamanKokkos/dataManWriterKokkos.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* datamanWriterKokkos.cpp Simple example of writing multiple steps of a 2D float Kokkos::View
* through ADIOS2 DataMan
*/
#include <adios2.h>
#include <adios2/cxx11/KokkosView.h>
#include <iostream>
#include <mpi.h>
#include <numeric>
#include <thread>
#include <vector>

#include <Kokkos_Core.hpp>

size_t Nx = 10;
size_t Ny = 10;
size_t steps = 2;
adios2::Dims shape;
adios2::Dims start;
adios2::Dims count;

int mpiRank, mpiSize;

template <class T, class MemSpace>
void PrintData(Kokkos::View<T **, MemSpace> &gpuData, const size_t step)
{
auto data = Kokkos::create_mirror_view_and_copy(Kokkos::HostSpace{}, gpuData);
std::cout << "Rank: " << mpiRank << " Step: " << step << " [";
for (int i = 0; i < data.extent_int(0); ++i)
for (int j = 0; j < data.extent_int(1); ++j)
std::cout << data(i, j) << " ";
std::cout << "]" << std::endl;
}

template <class T, class MemSpace, class ExecSpace>
Kokkos::View<T **, MemSpace> GenerateData(const size_t step, const size_t Ny, const size_t mpiRank)
{
Kokkos::View<T **, MemSpace> gpuSimData("simBuffer", Nx, Ny);
static_assert(Kokkos::SpaceAccessibility<ExecSpace, MemSpace>::accessible, "");
Kokkos::parallel_for(
"initBuffer", Kokkos::RangePolicy<ExecSpace>(0, Nx), KOKKOS_LAMBDA(int i) {
for (int j = 0; j < Ny; j++)
gpuSimData(i, j) = static_cast<float>(i * Ny + j) + mpiRank * 10000 + step;
});
Kokkos::fence();
ExecSpace exe_space;
std::cout << "Create data for step " << step << " on memory space: " << exe_space.name()
<< std::endl;
return gpuSimData;
}

int main(int argc, char *argv[])
{
// initialize MPI
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);

// initialize data dimensions
count = {Nx, Ny};
start = {mpiRank * Nx, 0};
shape = {mpiSize * Nx, Ny};

// initialize adios2
adios2::ADIOS adios(MPI_COMM_WORLD);
adios2::IO dataManIO = adios.DeclareIO("whatever");
dataManIO.SetEngine("DataMan");
dataManIO.SetParameters({{"IPAddress", "127.0.0.1"},
{"Port", "12306"},
{"Timeout", "5"},
{"RendezvousReaderCount", "1"}});

// open stream
adios2::Engine dataManWriter = dataManIO.Open("HelloDataMan", adios2::Mode::Write);

// define variable
auto floatArrayVar = dataManIO.DefineVariable<float>("FloatArray", shape, start, count);

// write data
for (size_t i = 0; i < steps; ++i)
{
auto floatVector = GenerateData<float, Kokkos::DefaultExecutionSpace::memory_space,
Kokkos::DefaultExecutionSpace>(i, Ny, mpiRank);
dataManWriter.BeginStep();
dataManWriter.Put(floatArrayVar, floatVector, adios2::Mode::Sync);
PrintData(floatVector, dataManWriter.CurrentStep());
dataManWriter.EndStep();
}

dataManWriter.Close();
MPI_Finalize();

return 0;
}
8 changes: 4 additions & 4 deletions source/adios2/engine/dataman/DataManReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ void DataManReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
while (true)
{
int ret =
m_Serializer.GetData(data, variable.m_Name, variable.m_Start, variable.m_Count,
m_CurrentStep, variable.m_MemoryStart, variable.m_MemoryCount);
int ret = m_Serializer.GetData(data, variable.m_Name, variable.m_Start,
variable.m_Count, m_CurrentStep, variable.m_MemSpace,
variable.m_MemoryStart, variable.m_MemoryCount);
if (ret == 0)
{
break;
Expand All @@ -57,7 +57,7 @@ void DataManReader::GetDeferredCommon(Variable<T> &variable, T *data)
while (true)
{
int ret = m_Serializer.GetData(data, variable.m_Name, start, count, m_CurrentStep,
memstart, memcount);
variable.m_MemSpace, memstart, memcount);
if (ret == 0)
{
break;
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/dataman/DataManWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ void DataManWriter::PutDeferredCommon(Variable<T> &variable, const T *values)
std::reverse(memstart.begin(), memstart.end());
std::reverse(memcount.begin(), memcount.end());
m_Serializer.PutData(variable.m_Data, variable.m_Name, shape, start, count, memstart,
memcount, m_Name, CurrentStep(), m_MpiRank, "", variable.m_Operations);
memcount, variable.m_MemSpace, m_Name, CurrentStep(), m_MpiRank, "",
variable.m_Operations);
}

if (m_MonitorActive)
Expand Down
17 changes: 12 additions & 5 deletions source/adios2/toolkit/format/dataman/DataManSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,8 @@ void DataManSerializer::Log(const int level, const std::string &message, const b
void DataManSerializer::PutData(const std::string *inputData, const std::string &varName,
const Dims &varShape, const Dims &varStart, const Dims &varCount,
const Dims &varMemStart, const Dims &varMemCount,
const std::string &doid, const size_t step, const int rank,
const std::string &address,
const MemorySpace varMemSpace, const std::string &doid,
const size_t step, const int rank, const std::string &address,
const std::vector<std::shared_ptr<core::Operator>> &ops,
VecPtr localBuffer, JsonPtr metadataJson)
{
Expand Down Expand Up @@ -646,8 +646,14 @@ void DataManSerializer::PutData(const std::string *inputData, const std::string

localBuffer->resize(localBuffer->size() + inputData->size());

std::memcpy(localBuffer->data() + localBuffer->size() - inputData->size(), inputData->data(),
inputData->size());
#ifdef ADIOS2_HAVE_GPU_SUPPORT
if (varMemSpace == MemorySpace::GPU)
helper::CopyFromGPUToBuffer(localBuffer->data(), localBuffer->size() - inputData->size(),
inputData->data(), varMemSpace, inputData->size());
#endif
if (varMemSpace == MemorySpace::Host)
std::memcpy(localBuffer->data() + localBuffer->size() - inputData->size(),
inputData->data(), inputData->size());

if (metadataJson == nullptr)
{
Expand All @@ -665,7 +671,8 @@ void DataManSerializer::PutData(const std::string *inputData, const std::string
template <>
int DataManSerializer::GetData(std::string *outputData, const std::string &varName,
const Dims &varStart, const Dims &varCount, const size_t step,
const Dims &varMemStart, const Dims &varMemCount)
const MemorySpace varMemSpace, const Dims &varMemStart,
const Dims &varMemCount)
{
PERFSTUBS_SCOPED_TIMER_FUNC();

Expand Down
15 changes: 8 additions & 7 deletions source/adios2/toolkit/format/dataman/DataManSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ class DataManSerializer
// put a variable for writer
void PutData(const std::string *inputData, const std::string &varName, const Dims &varShape,
const Dims &varStart, const Dims &varCount, const Dims &varMemStart,
const Dims &varMemCount, const std::string &doid, const size_t step,
const int rank, const std::string &address,
const Dims &varMemCount, const MemorySpace varMemSpace, const std::string &doid,
const size_t step, const int rank, const std::string &address,
const std::vector<std::shared_ptr<core::Operator>> &ops,
VecPtr localBuffer = nullptr, JsonPtr metadataJson = nullptr);

template <class T>
void PutData(const T *inputData, const std::string &varName, const Dims &varShape,
const Dims &varStart, const Dims &varCount, const Dims &varMemStart,
const Dims &varMemCount, const std::string &doid, const size_t step,
const int rank, const std::string &address,
const Dims &varMemCount, const MemorySpace varMemSpace, const std::string &doid,
const size_t step, const int rank, const std::string &address,
const std::vector<std::shared_ptr<core::Operator>> &ops,
VecPtr localBuffer = nullptr, JsonPtr metadataJson = nullptr);

Expand Down Expand Up @@ -134,8 +134,8 @@ class DataManSerializer

template <class T>
int GetData(T *output_data, const std::string &varName, const Dims &varStart,
const Dims &varCount, const size_t step, const Dims &varMemStart = Dims(),
const Dims &varMemCount = Dims());
const Dims &varCount, const size_t step, const MemorySpace varMemSpace,
const Dims &varMemStart = Dims(), const Dims &varMemCount = Dims());

void Erase(const size_t step, const bool allPreviousSteps = false);

Expand Down Expand Up @@ -166,7 +166,8 @@ class DataManSerializer
nlohmann::json DeserializeJson(const char *start, size_t size);

template <typename T>
void CalculateMinMax(const T *data, const Dims &count, nlohmann::json &metaj);
void CalculateMinMax(const T *data, const Dims &count, const MemorySpace varMemSpace,
nlohmann::json &metaj);

bool StepHasMinimumBlocks(const size_t step, const int requireMinimumBlocks);

Expand Down

0 comments on commit 05c3200

Please sign in to comment.