Skip to content

Commit

Permalink
Fix checkpoint streamer handling of circular buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
dskyle-shieldai authored and dskyle committed Oct 2, 2018
1 parent 0d0d94e commit 8ac6f40
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 7 deletions.
8 changes: 7 additions & 1 deletion include/madara/knowledge/ThreadSafeContext.inl
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,13 @@ inline void ThreadSafeContext::mark_and_signal(

if (settings.stream_changes && streamer_ != nullptr)
{
streamer_->enqueue(ref.get_name(), *ref.get_record_unsafe());
auto rec_ptr = ref.get_record_unsafe();
if (rec_ptr->has_history())
{
rec_ptr = &rec_ptr->ref_newest();
}

streamer_->enqueue(ref.get_name(), *rec_ptr);
}

if (settings.signal_changes)
Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/Fragmentation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ void madara::transport::frag(
{
madara_logger_ptr_log(logger::global_logger.get(), logger::LOG_DETAILED,
"transport::frag:"
" regular message header detected\n");
" reduced message header detected\n");

ReducedMessageHeader contents_header;
int64_t buffer_remaining = contents_header.encoded_size();
Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/udp/UdpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ long UdpTransport::send_message(const char* buf, size_t packet_size)
frag(buf, settings_.max_fragment_size, map);

int j(0);
for (FragmentMap::iterator i = map.begin(); i != map.end(); ++i)
for (FragmentMap::iterator i = map.begin(); i != map.end(); ++i, ++j)
{
madara_logger_log(context_.get_logger(), logger::LOG_MAJOR,
"%s:"
Expand Down
6 changes: 3 additions & 3 deletions port/java/jni/ai_madara_knowledge_KnowledgeBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1102,9 +1102,9 @@ void JNICALL Java_ai_madara_knowledge_KnowledgeBase_jni_1setImageSettings(
env->ReleaseByteArrayElements(value, source, JNI_ABORT);
env->ReleaseStringUTFChars(var, nativeVar);

madara::utility::java::throw_dead_obj_exception(
env, "KnowledgeBase::setImageSettings: KB or settings objects are "
"released already");
madara::utility::java::throw_dead_obj_exception(env,
"KnowledgeBase::setImageSettings: KB or settings objects are "
"released already");
}

delete[] dest;
Expand Down
88 changes: 87 additions & 1 deletion tests/test_checkpointing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,21 @@ void test_filter_header(void)
std::cerr << (char*)(buffer) << "\n";
}

template<typename I = int, typename Iter>
bool is_sequential(Iter first, Iter last, I counter = 0)
{
while (first != last)
{
if (*first != counter)
{
return false;
}
++counter;
++first;
}
return true;
}

void test_streaming()
{
std::cerr << "\n*********** TESTING STREAMING *************.\n";
Expand Down Expand Up @@ -1042,18 +1057,52 @@ void test_streaming()
builder.setZ(9);
kb.emplace_any(".any0", madara::type<CapnObject<geo_capn::Point>>{}, msg);

kb.set_history_capacity("hist", 10);
kb.set("hist", 1);
kb.set("hist", 2);
kb.set("hist", 3);

utility::sleep(0.5);

kb.set("glob1", 4);
kb.set("glob2", "bar");
kb.set("hist", 4);
kb.set("hist", 5);
kb.set("hist", 6);

utility::sleep(0.5);

kb.attach_streamer(nullptr);

auto prehist = kb.get_history("hist");
if (kb.get(".loc1") == 2 && kb.get(".loc2") == "foo" &&
kb.get("glob1") == 4 && kb.get("glob2") == "bar" && prehist.size() == 6 &&
is_sequential(prehist.begin(), prehist.end(), 1))
{
std::cerr << "SUCCESS\n";
}
else
{
std::string dump;
kb.to_string(dump);
std::cerr << "FAIL before load:\n" << dump << "\n";
std::cerr << " hist:";
for (const auto& cur : prehist)
{
std::cerr << " " << cur;
}
std::cerr << std::endl;
madara_fails++;
// return;
}

knowledge::KnowledgeBase kb2;

kb2.set_history_capacity("hist", 10);

kb2.load_context(settings);

auto hist = kb2.get_history("hist");
if (kb2.get(".loc1") == 2 && kb2.get(".loc2") == "foo" &&
kb2.get("glob1") == 4 && kb2.get("glob2") == "bar")
{
Expand All @@ -1065,13 +1114,28 @@ void test_streaming()
kb2.to_string(dump);
std::cerr << "FAIL :\n" << dump << "\n";
madara_fails++;
return;
}

knowledge::KnowledgeBase kb3;

kb3.set_history_capacity("hist", 10);

settings.playback_simtime = true;

{
knowledge::CheckpointReader reader(settings);
for (;;)
{
auto next = reader.next();
if (next.first == "")
{
break;
}

std::cerr << next.first << ": " << next.second << std::endl;
}
}

knowledge::CheckpointReader reader(settings);
reader.start(); // Load checkpoint metadata

Expand Down Expand Up @@ -1111,6 +1175,28 @@ void test_streaming()
std::cerr << utility::get_time() << ": " << dump << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}

auto posthist = kb3.get_history("hist");
if (kb3.get(".loc1") == 2 && kb3.get(".loc2") == "foo" &&
kb3.get("glob1") == 4 && kb3.get("glob2") == "bar" &&
posthist.size() == 6 &&
is_sequential(posthist.begin(), posthist.end(), 1))
{
std::cerr << "SUCCESS\n";
}
else
{
std::string dump;
kb3.to_string(dump);
std::cerr << "FAIL after playback:\n" << dump << "\n";
std::cerr << " hist:";
for (const auto& cur : posthist)
{
std::cerr << " " << cur;
}
std::cerr << std::endl;
madara_fails++;
}
}

int main(int argc, char* argv[])
Expand Down

0 comments on commit 8ac6f40

Please sign in to comment.