Skip to content

Commit

Permalink
* Changed CheckpointSettings to allow for a max_buffer_size (#133)
Browse files Browse the repository at this point in the history
* Changed CheckpointReader to print appropriate log messages (instead of ThreadSafeContext)
* Changed CheckpointReader to dynamically allocate buffer space up to a max or fail if max_buffer_size is too small
  • Loading branch information
jredmondson committed Oct 24, 2018
1 parent 148a096 commit 44fac88
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 34 deletions.
148 changes: 116 additions & 32 deletions include/madara/knowledge/CheckpointPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void CheckpointReader::start()
}

madara_logger_ptr_log(logger_, logger::LOG_MAJOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::start:"
" opening file %s\n",
checkpoint_settings.filename.c_str());

Expand All @@ -36,33 +36,76 @@ void CheckpointReader::start()
if (!file)
{
madara_logger_ptr_log(logger_, logger::LOG_ALWAYS,
"ThreadSafeContext::load_context:"
"CheckpointReader::start:"
" could not open file %s for reading. "
"Check that file exists and that permissions are appropriate.\n",
checkpoint_settings.filename.c_str());
stage = 9;
return;
}

max_buffer = checkpoint_settings.buffer_size;
buffer_remaining = max_buffer;

buffer = new char[max_buffer];
current = buffer.get_ptr();

file.seekg(0, file.end);
int length = file.tellg();
file.seekg(0, file.beg);

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::start:"
" file contains %d bytes.\n",
(int)length);

{
size_t bytes = FileHeader::encoded_size();

// if bytes is bigger than the buffer
if(bytes > checkpoint_settings.buffer_size)
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"CheckpointReader::start:"
" %d byte file header is greater than CheckpointSettings"
" buffer_size of %d.\n",
(int)length, (int)checkpoint_settings.buffer_size);

// if max buffer does not have enough room
if(bytes > checkpoint_settings.max_buffer_size)
{
std::stringstream buffer;
buffer << "CheckpointReader::start: ";
buffer << bytes << " is greater than CheckpointSettings ";
buffer << "max_buffer_size of ";
buffer << checkpoint_settings.max_buffer_size;
buffer << ". Increase buffer_size or max_buffer_size ";
buffer << " to stop this exception.";

throw exceptions::MemoryException(buffer.str());
}
else // if max_buffer_size is enough room
{
max_buffer = bytes;
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"CheckpointReader::start:"
" setting max_buffer to %d.\n",
(int)max_buffer);
}
}
else
{
max_buffer = checkpoint_settings.buffer_size;
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"CheckpointReader::start:"
" setting max_buffer to %d.\n",
(int)max_buffer);
}
} // end scope

buffer_remaining = max_buffer;

buffer = new char[max_buffer];
current = buffer.get_ptr();

if (!file.read(buffer.get(), FileHeader::encoded_size()))
{
std::stringstream message;
message << "ThreadSafeContext::load_context: ";
message << "CheckpointReader::start: ";
message << "file ";
message << checkpoint_settings.filename;
message << " does not have enough room for an appropriate header";
Expand All @@ -73,7 +116,7 @@ void CheckpointReader::start()
buffer_remaining = (int64_t)total_read;

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::start:"
" reading file: %d bytes read.\n",
(int)total_read);

Expand All @@ -83,7 +126,7 @@ void CheckpointReader::start()
!FileHeader::file_header_test(current))
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::start:"
" invalid file or wrong version. No contextual change.\n");
stage = 9;
return;
Expand All @@ -100,7 +143,7 @@ void CheckpointReader::start()
checkpoint_settings.version = utility::to_string_version(meta.karl_version);

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::start:"
" read File meta. Meta.size=%d. Meta.states=%d\n",
(int)meta.size, (int)meta.states);

Expand Down Expand Up @@ -140,15 +183,15 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
if (state >= meta.states || state > checkpoint_settings.last_state)
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" done at state=%d of meta.states=%d\n",
(int)state, (int)meta.states);
stage = 9;
return {};
}

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" reading 64bit unsigned size at %d byte file offset\n",
(int)checkpoint_start);

Expand All @@ -159,7 +202,7 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
if (!file.read((char*)&checkpoint_size, sizeof(checkpoint_size)))
{
std::stringstream message;
message << "ThreadSafeContext::load_context: ";
message << "CheckpointReader::next: ";
message << "file ";
message << checkpoint_settings.filename;
message << " does not have enough room for a checkpoint";
Expand All @@ -173,13 +216,54 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()

checkpoint_size = utility::endian_swap(checkpoint_size);

{
size_t bytes = checkpoint_size;

// if bytes is larger than the existing buffer size
if(bytes > max_buffer)
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"CheckpointReader::next:"
" %d bytes is greater than existing buffer size of %d\n",
(int)checkpoint_size, (int)max_buffer);

// if the buffer needed is bigger than the maximum size
if(bytes + filters::BufferFilterHeader::encoded_size() >
checkpoint_settings.max_buffer_size)
{
std::stringstream buffer;
buffer << "CheckpointReader::next: ";
buffer << bytes << " is greater than CheckpointSettings ";
buffer << "max_buffer_size of ";
buffer << checkpoint_settings.max_buffer_size;
buffer << ". Increase buffer_size or max_buffer_size ";
buffer << " to stop this exception.";

throw exceptions::MemoryException(buffer.str());
}
else // max buffer needs to be set to the max size
{
//max_buffer = checkpoint_settings.max_buffer_size;
max_buffer = bytes + filters::BufferFilterHeader::encoded_size();
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"CheckpointReader::start:"
" setting max_buffer to %d.\n",
(int)max_buffer);
}
}

buffer_remaining = max_buffer;
buffer = new char[max_buffer];
} // end scope


if (checkpoint_settings.buffer_filters.size() > 0)
{
checkpoint_size += filters::BufferFilterHeader::encoded_size();
}

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" %d state checkpoint size is %d\n",
(int)state, (int)checkpoint_size);

Expand All @@ -189,14 +273,14 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
checkpoint_start += checkpoint_size;

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" reading %d bytes for full checkpoint\n",
(int)checkpoint_size);

if (!file.read(buffer.get(), checkpoint_size))
{
std::stringstream message;
message << "ThreadSafeContext::load_context: ";
message << "CheckpointReader::next: ";
message << "file ";
message << checkpoint_settings.filename;
message << " does not have enough room for ";
Expand All @@ -209,14 +293,14 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
current = buffer.get_ptr();

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" read %d bytes\n",
(int)total_read);

buffer_remaining = (int64_t)total_read;

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" decoding with %d buffer filters with initial size of "
"%d bytes and total buffer of %d bytes\n",
(int)checkpoint_settings.buffer_filters.size(), (int)total_read,
Expand All @@ -230,7 +314,7 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
{
stage = 9;
throw exceptions::FilterException(
"ThreadSafeContext::load_context: "
"CheckpointReader::next: "
"decode () returned a negative encoding size. Bad filter/encode.");
}

Expand All @@ -239,12 +323,12 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
{
stage = 9;
throw exceptions::MemoryException(
"ThreadSafeContext::load_context: "
"CheckpointReader::next: "
"Not enough room in buffer for message header");
}

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" Reading a checkpoint header with %d byte buffer remaining\n",
(int)buffer_remaining);

Expand All @@ -264,7 +348,7 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
checkpoint_header.size - checkpoint_header.encoded_size();

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" read Checkpoint header. header.size=%d, updates.size=%d\n",
(int)checkpoint_header.size, (int)updates_size);

Expand All @@ -276,12 +360,12 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
if (updates_size > (uint64_t)buffer_remaining)
{
throw exceptions::MemoryException(
"ThreadSafeContext::load_context: "
"CheckpointReader::next: "
"Not enough room in buffer for checkpoint");
} // end if allocation is needed

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" state=%d, initial_state=%d, last_state=%d\n",
(int)state, (int)checkpoint_settings.initial_state,
(int)checkpoint_settings.last_state);
Expand All @@ -295,7 +379,7 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
else
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" not a valid state, incrementing by %d bytes.\n",
(int)updates_size);

Expand All @@ -320,7 +404,7 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
current = (char*)record.read(current, key, buffer_remaining);

madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" read record (%d of %d): %s\n",
(int)update, (int)checkpoint_header.updates, key.c_str());

Expand All @@ -332,15 +416,15 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
j < checkpoint_settings.prefixes.size() && !prefix_found; ++j)
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" checking record %s against prefix %s\n",
key.c_str(), checkpoint_settings.prefixes[j].c_str());

if (madara::utility::begins_with(
key, checkpoint_settings.prefixes[j]))
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" record has the correct prefix.\n");

prefix_found = true;
Expand All @@ -350,7 +434,7 @@ std::pair<std::string, KnowledgeRecord> CheckpointReader::next()
if (!prefix_found)
{
madara_logger_ptr_log(logger_, logger::LOG_MINOR,
"ThreadSafeContext::load_context:"
"CheckpointReader::next:"
" record does not have the correct prefix. Rejected.\n");

++update;
Expand Down
12 changes: 10 additions & 2 deletions include/madara/knowledge/CheckpointSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class CheckpointSettings
* @param t_last_state the last state to query/save
* @param t_reset_checkpoint reset the checkpoint modifieds
* @param t_ignore_header_check if true, ignore header checks
* @param t_max_buffer_size the max size in bytes for buffer growth
**/
CheckpointSettings(size_t t_buffer_size, bool t_clear_knowledge,
std::string t_filename = "", uint64_t t_initial_timestamp = 0,
Expand All @@ -93,7 +94,8 @@ class CheckpointSettings
bool t_override_lamport = false, bool t_keep_open = false,
uint64_t t_initial_state = 0, uint64_t t_last_state = (uint64_t)-1,
bool t_reset_checkpoint = true, bool t_ignore_header_check = false,
VariablesLister* t_variables_lister = nullptr)
VariablesLister* t_variables_lister = nullptr,
size_t t_max_buffer_size = 2000000000)
: buffer_size(t_buffer_size),
clear_knowledge(t_clear_knowledge),
filename(t_filename),
Expand All @@ -112,7 +114,8 @@ class CheckpointSettings
last_state(t_last_state),
reset_checkpoint(t_reset_checkpoint),
ignore_header_check(t_ignore_header_check),
variables_lister(t_variables_lister)
variables_lister(t_variables_lister),
max_buffer_size(t_max_buffer_size)
{
}

Expand Down Expand Up @@ -463,6 +466,11 @@ class CheckpointSettings
**/
VariablesLister* variables_lister = nullptr;

/**
* the max size the buffer can grow to
**/
size_t max_buffer_size = 2000000000;

private:
/**
* a thread-safe ref-counted file handle for quick access to an open
Expand Down

0 comments on commit 44fac88

Please sign in to comment.