Skip to content

Commit

Permalink
Fix for the issue #3503
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-ganyushin authored and vicentebolea committed Mar 10, 2023
1 parent 267641a commit a0d4586
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 8 deletions.
24 changes: 16 additions & 8 deletions source/adios2/toolkit/format/dataman/DataManSerializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,17 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName,
{
input_data += j.position;
}

if (j.shape.size() > 0 and j.shape[0] > 1 and j.start.size() > 0 and
j.start.size() == j.count.size() and
j.start.size() == varStart.size() and
j.start.size() == varCount.size())
/* single values */
if (j.shape.empty() or
std::all_of(j.shape.begin(), j.shape.end(),
[&](size_t i) { return i == 1; }))
{
std::memcpy(reinterpret_cast<char *>(outputData), input_data,
sizeof(T));
}
else if (j.start.size() > 0 and j.start.size() == j.count.size() and
j.start.size() == varStart.size() and
j.start.size() == varCount.size())
{
if (m_ContiguousMajor)
{
Expand All @@ -279,10 +285,12 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName,
sizeof(T), j.start, j.count, varMemStart, varMemCount);
}
}
if (j.shape.empty() or (j.shape.size() == 1 and j.shape[0] == 1))
else
{
std::memcpy(reinterpret_cast<char *>(outputData), input_data,
sizeof(T));
throw std::runtime_error(
"DataManSerializer::GeData end with Step \" + "
"std::to_string(step) +\n"
" \" Var \" + varName failed");
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions testing/adios2/engine/dataman/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ foreach(tst IN ITEMS
)
endforeach()

if (ADIOS2_HAVE_Python)
python_add_test(NAME Test.Engine.DataMan1D.Serial SCRIPT TestDataMan1D.py)
python_add_test(NAME Test.Engine.DataMan1xN.Serial SCRIPT TestDataMan1xN.py)
python_add_test(NAME Test.Engine.DataManSingleValues SCRIPT TestDataManSingleValues.py)
endif()

if(ADIOS2_HAVE_ZFP)
gtest_add_tests_helper(2DZfp MPI_NONE DataMan Engine.DataMan. "")
set_tests_properties(${Test.Engine.DataMan.2DZfp-TESTS}
Expand Down
91 changes: 91 additions & 0 deletions testing/adios2/engine/dataman/TestDataMan1D.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env python
#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#
# TestDataMan1D.py: test for 1D data transfer by reading in Python
# Created on: March 3, 2023
# Author: Dmitry Ganyushin ganyushindi@ornl.gov
from multiprocessing import Process
import unittest
import numpy as np
import adios2


class TestDataMan1D(unittest.TestCase):

def setUp(self):
self.conf = {
"IPAddress": "127.0.0.1",
"Port": "12306",
"Timeout": "5",
"TransportMode": "reliable",
"RendezvousReaderCount": "1",
}
self.Nx = 10
self.fill_value = 1.0
self.shape = [self.Nx]

def test_run(self):

s = Process(target=self.thread_send)
r = Process(target=self.thread_receive)

s.start()
r.start()

r.join()
s.join()

def thread_send(self):
data = np.full(shape=self.shape, fill_value=self.fill_value)
shape = data.shape
count = shape
start = (0,) * len(shape)

adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Server")
wan.SetEngine("Dataman")

wan.SetParameters(self.conf)
writer = wan.Open("testdata", adios2.Mode.Write)
sendbuffer = wan.DefineVariable("np_data", data, shape,
start, count, adios2.ConstantDims)
self.assertIsNotNone(sendbuffer)
if sendbuffer:
writer.BeginStep()
writer.Put(sendbuffer, data, adios2.Mode.Deferred)
writer.EndStep()
else:
raise ValueError("DefineVariable failed")

writer.Close()

def thread_receive(self):
data = np.zeros(shape=self.shape)
adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Client")
wan.SetEngine("Dataman")
wan.SetParameters(self.conf)
reader = wan.Open("testdata", adios2.Mode.Read)
while True:
stepStatus = reader.BeginStep()
if stepStatus == adios2.StepStatus.OK:
recvar = wan.InquireVariable("np_data")
self.assertIsNotNone(recvar)
bufshape = recvar.Shape()
self.assertTrue(bufshape[0] == self.Nx)
reader.Get(recvar, data, adios2.Mode.Sync)

elif stepStatus == adios2.StepStatus.EndOfStream:
break
else:
raise StopIteration()
reader.EndStep()
reader.Close()
self.assertTrue(all([data[i] == self.fill_value for i
in range(len(data))]))


if __name__ == '__main__':
unittest.main()
92 changes: 92 additions & 0 deletions testing/adios2/engine/dataman/TestDataMan1xN.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/env python
#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#
# TestDataMan1D.py: test for 1D data transfer by reading in Python
# Created on: March 3, 2023
# Author: Dmitry Ganyushin ganyushindi@ornl.gov
from multiprocessing import Process
import unittest
import numpy as np
import adios2


class TestDataMan1D(unittest.TestCase):

def setUp(self):
self.conf = {
"IPAddress": "127.0.0.1",
"Port": "12306",
"Timeout": "5",
"TransportMode": "reliable",
"RendezvousReaderCount": "1",
}
self.Nx = 10
self.fill_value = 1.0
self.shape = [1, self.Nx]

def test_run(self):

s = Process(target=self.thread_send)
r = Process(target=self.thread_receive)

s.start()
r.start()

r.join()
s.join()

def thread_send(self):
data = np.full(shape=self.shape, fill_value=self.fill_value)
shape = data.shape
count = shape
start = (0,) * len(shape)

adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Server")
wan.SetEngine("Dataman")

wan.SetParameters(self.conf)
writer = wan.Open("testdata", adios2.Mode.Write)
sendbuffer = wan.DefineVariable("np_data", data, shape,
start, count, adios2.ConstantDims)
self.assertIsNotNone(sendbuffer)
if sendbuffer:
writer.BeginStep()
writer.Put(sendbuffer, data, adios2.Mode.Deferred)
writer.EndStep()
else:
raise ValueError("DefineVariable failed")

writer.Close()

def thread_receive(self):
data = np.zeros(shape=self.shape)
adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Client")
wan.SetEngine("Dataman")
wan.SetParameters(self.conf)
reader = wan.Open("testdata", adios2.Mode.Read)
while True:
stepStatus = reader.BeginStep()
if stepStatus == adios2.StepStatus.OK:
recvar = wan.InquireVariable("np_data")
self.assertIsNotNone(recvar)
bufshape = recvar.Shape()
self.assertTrue(bufshape[0] == 1)
self.assertTrue(bufshape[1] == self.Nx)
reader.Get(recvar, data, adios2.Mode.Sync)

elif stepStatus == adios2.StepStatus.EndOfStream:
break
else:
raise StopIteration()
reader.EndStep()
reader.Close()
self.assertTrue(all([data[0][i] == self.fill_value for i
in range(len(data))]))


if __name__ == '__main__':
unittest.main()
89 changes: 89 additions & 0 deletions testing/adios2/engine/dataman/TestDataManSingleValues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python
#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#
# TestDataMan1D.py: test for 1D data transfer by reading in Python
# Created on: March 3, 2023
# Author: Dmitry Ganyushin ganyushindi@ornl.gov
from multiprocessing import Process
import unittest
import numpy as np
import adios2


class TestDataMan1D(unittest.TestCase):

def setUp(self):
self.conf = {
"IPAddress": "127.0.0.1",
"Port": "12306",
"Timeout": "5",
"TransportMode": "reliable",
"RendezvousReaderCount": "1",
}
self.Nx = 1
self.fill_value = 1.0
self.shape = [self.Nx]

def test_run(self):

s = Process(target=self.thread_send)
r = Process(target=self.thread_receive)

s.start()
r.start()

r.join()
s.join()

def thread_send(self):
data = np.full(shape=self.shape, fill_value=self.fill_value)
shape = data.shape
count = shape
start = (0,) * len(shape)

adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Server")
wan.SetEngine("Dataman")

wan.SetParameters(self.conf)
writer = wan.Open("testdata", adios2.Mode.Write)
sendbuffer = wan.DefineVariable("np_data", data, shape,
start, count, adios2.ConstantDims)
self.assertIsNotNone(sendbuffer)
if sendbuffer:
writer.BeginStep()
writer.Put(sendbuffer, data, adios2.Mode.Deferred)
writer.EndStep()
else:
raise ValueError("DefineVariable failed")

writer.Close()

def thread_receive(self):
data = np.zeros(shape=self.shape)
adios_io = adios2.ADIOS()
wan = adios_io.DeclareIO("Client")
wan.SetEngine("Dataman")
wan.SetParameters(self.conf)
reader = wan.Open("testdata", adios2.Mode.Read)
while True:
stepStatus = reader.BeginStep()
if stepStatus == adios2.StepStatus.OK:
recvar = wan.InquireVariable("np_data")
self.assertIsNotNone(recvar)
reader.Get(recvar, data, adios2.Mode.Sync)

elif stepStatus == adios2.StepStatus.EndOfStream:
break
else:
raise StopIteration()
reader.EndStep()
reader.Close()
self.assertTrue(all([data[i] == self.fill_value for i
in range(len(data))]))


if __name__ == '__main__':
unittest.main()

0 comments on commit a0d4586

Please sign in to comment.