Skip to content

Commit

Permalink
ChunkV for BP5Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Jul 6, 2021
1 parent d9db7c7 commit 0a1bca4
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 7 deletions.
2 changes: 1 addition & 1 deletion source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ add_library(adios2_core
toolkit/format/buffer/Buffer.cpp
toolkit/format/buffer/BufferV.cpp
toolkit/format/buffer/malloc/MallocV.cpp
# toolkit/format/buffer/chunk/ChunkV.cpp
toolkit/format/buffer/chunk/ChunkV.cpp
toolkit/format/buffer/heap/BufferSTL.cpp

toolkit/format/bp/BPBase.cpp toolkit/format/bp/BPBase.tcc
Expand Down
8 changes: 6 additions & 2 deletions source/adios2/common/ADIOSTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,12 @@ constexpr uint64_t DefaultMaxBufferSize = MaxSizeT - 1;
constexpr float DefaultBufferGrowthFactor = 1.05f;

/** default Buffer Chunk Size
* 2Gb - 100Kb (tolerance)*/
constexpr uint64_t DefaultBufferChunkSize = 2147381248;
* 16Mb */
constexpr uint64_t DefaultBufferChunkSize = 16 * 1024 * 1024;

/** default minimum size not copying deferred writes
* 4Mb */
constexpr size_t DefaultMinDeferredSize = 4 * 1024 * 1024;

/** default size for writing/reading files using POSIX/fstream/stdio write
* 2Gb - 100Kb (tolerance)*/
Expand Down
26 changes: 26 additions & 0 deletions source/adios2/engine/bp5/BP5Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,32 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)
return false;
};

auto lf_SetBufferVTypeParameter = [&](const std::string key, int &parameter,
int def) {
auto itKey = io.m_Parameters.find(key);
parameter = def;
if (itKey != io.m_Parameters.end())
{
std::string value = itKey->second;
std::transform(value.begin(), value.end(), value.begin(),
::tolower);
if (value == "malloc")
{
parameter = (int)BufferVType::MallocVType;
}
else if (value == "chunk")
{
parameter = (int)BufferVType::ChunkVType;
}
else
{
throw std::invalid_argument(
"ERROR: Unknown BP5 BufferVType parameter \"" + value +
"\" (must be \"malloc\" or \"chunk\"");
}
}
};

#define get_params(Param, Type, Typedecl, Default) \
lf_Set##Type##Parameter(#Param, Params.Param, Default);
BP5_FOREACH_PARAMETER_TYPE_4ARGS(get_params);
Expand Down
11 changes: 11 additions & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ class BP5Engine

std::string GetBPVersionFileName(const std::string &name) const noexcept;

enum class BufferVType
{
MallocVType,
ChunkVType,
Auto
};

BufferVType UseBufferV = BufferVType::ChunkVType;

#define BP5_FOREACH_PARAMETER_TYPE_4ARGS(MACRO) \
MACRO(OpenTimeoutSecs, Int, int, 3600) \
MACRO(BeginStepPollingFrequencySecs, Int, int, 0) \
Expand All @@ -97,7 +106,9 @@ class BP5Engine
MACRO(AsyncTasks, Bool, bool, true) \
MACRO(GrowthFactor, Float, float, DefaultBufferGrowthFactor) \
MACRO(InitialBufferSize, SizeBytes, size_t, DefaultInitialBufferSize) \
MACRO(MinDeferredSize, SizeBytes, size_t, DefaultMinDeferredSize) \
MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize) \
MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

struct BP5Params
Expand Down
15 changes: 12 additions & 3 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "adios2/common/ADIOSMacros.h"
#include "adios2/core/IO.h"
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
#include "adios2/toolkit/format/buffer/chunk/ChunkV.h"
#include "adios2/toolkit/format/buffer/malloc/MallocV.h"
#include "adios2/toolkit/transport/file/FileFStream.h"
#include <adios2-perfstubs-interface.h>
Expand Down Expand Up @@ -44,9 +45,17 @@ BP5Writer::BP5Writer(IO &io, const std::string &name, const Mode mode,
StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)
{
m_WriterStep++;
m_BP5Serializer.InitStep(new MallocV("BP5Writer", false,
m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor));
if (m_Parameters.BufferVType == (int)BufferVType::MallocVType)
{
m_BP5Serializer.InitStep(new MallocV("BP5Writer", false,
m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor));
}
else
{
m_BP5Serializer.InitStep(new ChunkV("BP5Writer", true /* always copy */,
m_Parameters.BufferChunkSize));
}
return StepStatus::OK;
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/bp5/BP5Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void BP5Writer::PutCommon(Variable<T> &variable, const T *values, bool sync)
/* If arrays is small, force copying to internal buffer to aggregate
* small writes */
size_t n = helper::GetTotalSize(variable.m_Count) * sizeof(T);
if (n < 4194304 /* 4MB */)
if (n < m_Parameters.MinDeferredSize)
{
sync = true;
}
Expand Down
121 changes: 121 additions & 0 deletions source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* BufferV.cpp
*
*/

#include "ChunkV.h"
#include "adios2/toolkit/format/buffer/BufferV.h"
#include <iostream>
#include <string.h>

namespace adios2
{
namespace format
{

ChunkV::ChunkV(const std::string type, const bool AlwaysCopy,
const size_t ChunkSize)
: BufferV(type, AlwaysCopy), m_ChunkSize(ChunkSize)
{
}

ChunkV::~ChunkV()
{
for (const auto &Chunk : m_Chunks)
{
std::cout << "freeing chunk " << (void *)Chunk << std::endl;
free((void *)Chunk);
}
}

size_t ChunkV::AddToVec(const size_t size, const void *buf, int align,
bool CopyReqd)
{
std::cout << "In add to vec. size " << size << " buf " << (void *)buf
<< " align " << align << " Copy " << CopyReqd << std::endl;
int badAlign = CurOffset % align;
if (badAlign)
{
int addAlign = align - badAlign;
char zero[16] = {0};
AddToVec(addAlign, zero, 1, true);
}
size_t retOffset = CurOffset;

if (size == 0)
return CurOffset;

std::cout << "In add to vec" << std::endl;
if (!CopyReqd && !m_AlwaysCopy)
{
// just add buf to internal version of output vector
VecEntry entry = {true, buf, 0, size};
DataV.push_back(entry);
std::cout << "In add to vec2" << std::endl;
}
else
{
// we can possibly append this entry to the last if the last was
// internal
std::cout << "In add to vec3" << std::endl;
bool AppendPossible = DataV.size() && !DataV.back().External;

if (AppendPossible && (m_TailChunkPos + size > m_ChunkSize))
{
// No room in current chunk, close it out
// realloc down to used size (helpful?) and set size in array
std::cout << "In add to vec5" << std::endl;
m_Chunks.back() = (char *)realloc(m_Chunks.back(), m_TailChunkPos);

m_TailChunkPos = 0;
m_TailChunk = NULL;
AppendPossible = false;
}
if (AppendPossible)
{
std::cout << "In add to vec4" << std::endl;
// We can use current chunk, just append the data;
memcpy(m_TailChunk + m_TailChunkPos, buf, size);
DataV.back().Size += size;
m_TailChunkPos += size;
}
else
{
// We need a new chunk, get the larger of size or m_ChunkSize
std::cout << "In add to vec6 m_ChunkSize" << m_ChunkSize << std::endl;
size_t NewSize = m_ChunkSize;
std::cout << "In add to vec7" << std::endl;
if (size > m_ChunkSize)
NewSize = size;
std::cout << "In add to vec8 new size" << NewSize << std::endl;
m_TailChunk = (char *)malloc(NewSize);
std::cout << "In add to vec9 size" << size << std::endl;
memcpy(m_TailChunk, buf, size);
m_TailChunkPos = size;
VecEntry entry = {false, m_TailChunk, 0, size};
std::cout << "In add to vec10 size" << m_TailChunkPos << std::endl;
DataV.push_back(entry);
}
}
CurOffset = retOffset + size;
return retOffset;
}

ChunkV::BufferV_iovec ChunkV::DataVec() noexcept
{
BufferV_iovec ret = new iovec[DataV.size() + 1];
for (std::size_t i = 0; i < DataV.size(); ++i)
{
// For ChunkV, all entries in DataV are actual iov entries.
ret[i].iov_base = DataV[i].Base;
ret[i].iov_len = DataV[i].Size;
}
ret[DataV.size()] = {NULL, 0};
return ret;
}

} // end namespace format
} // end namespace adios2
45 changes: 45 additions & 0 deletions source/adios2/toolkit/format/buffer/chunk/ChunkV.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
*/

#ifndef ADIOS2_TOOLKIT_FORMAT_BUFFER_MALLOC_CHUNKV_H_
#define ADIOS2_TOOLKIT_FORMAT_BUFFER_MALLOC_CHUNKV_H_

#include "adios2/common/ADIOSConfig.h"
#include "adios2/common/ADIOSTypes.h"

#include "adios2/toolkit/format/buffer/BufferV.h"

namespace adios2
{
namespace format
{

class ChunkV : public BufferV
{
public:
uint64_t Size() noexcept;

const size_t m_ChunkSize;

ChunkV(const std::string type, const bool AlwaysCopy = false,
const size_t ChunkSize = DefaultBufferChunkSize);
virtual ~ChunkV();

virtual BufferV_iovec DataVec() noexcept;

virtual size_t AddToVec(const size_t size, const void *buf, int align,
bool CopyReqd);

private:
std::vector<char *> m_Chunks;
size_t m_TailChunkPos = 0;
char *m_TailChunk = NULL;
};

} // end namespace format
} // end namespace adios2

#endif /* ADIOS2_TOOLKIT_FORMAT_BUFFER_MALLOC_MALLOCV_H_ */

0 comments on commit 0a1bca4

Please sign in to comment.