Skip to content

Commit

Permalink
* Updating KnowledgeBaseImpl to include a mutex specifically for chan… (
Browse files Browse the repository at this point in the history
#115)

* * Updating KnowledgeBaseImpl to include a mutex specifically for changing, using or retrieving transports
  * Stops us from having to lock KB during entire send_modifieds
  * Should open up massive multi-threading gains
* Added get_modifieds_current to ThreadSafeContext
  * Using this instead of get_modifieds to provide KnowledgeMap in send_modifieds
  * Allows all transports to have a consistent KnowledgeMap, regardless of order, and without needing a context mutex protection

* * Fixing clang compile errors

* * Moving toward a better implementation
* [skip ci]

* * Fixed issues with send_modifeds and get_modifieds_current

* * Updated stik_inspect to handle config files
* Updated karl and stk_inspect to have common default config locations
* Updated stk_inspect to have debug mode
* Updated karl and stk_inspect to have more standard help printouts

* * Modified get_modifieds_current default arguments to not include the send_list

* * Modified the stk_inspect config file to work correctly and point to the right location
  • Loading branch information
jredmondson committed Oct 8, 2018
1 parent 61266db commit 414de1e
Show file tree
Hide file tree
Showing 22 changed files with 477 additions and 190 deletions.
70 changes: 21 additions & 49 deletions include/madara/knowledge/KnowledgeBaseImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,24 @@ size_t KnowledgeBaseImpl::attach_transport(
" no transport was specified. Setting transport to null.\n");
}

MADARA_GUARD_TYPE guard(map_.mutex_);

// if we have a valid transport, add it to the transports vector
if (transport != 0)
{
transports_.emplace_back(transport);
}
MADARA_GUARD_TYPE guard(transport_mutex_);

// if we have a valid transport, add it to the transports vector
if (transport != 0)
{
transports_.emplace_back(transport);
}

return transports_.size();
return transports_.size();
}
}

void KnowledgeBaseImpl::close_transport(void)
{
decltype(transports_) old_transports;
{
MADARA_GUARD_TYPE guard(map_.mutex_);
MADARA_GUARD_TYPE guard(transport_mutex_);
using std::swap;
swap(old_transports, transports_);
}
Expand Down Expand Up @@ -392,54 +394,24 @@ int KnowledgeBaseImpl::send_modifieds(
{
int result = 0;

MADARA_GUARD_TYPE guard(map_.mutex_);
MADARA_GUARD_TYPE guard(transport_mutex_);

if (transports_.size() > 0 && !settings.delay_sending_modifieds)
{
const VariableReferenceMap& modified = map_.get_modifieds();
KnowledgeMap modified;

// get the modifieds and reset those that will be sent, atomically
{
MADARA_GUARD_TYPE guard(map_.mutex_);
modified = map_.get_modifieds_current(settings.send_list, true);
}

if (modified.size() > 0)
{
// if there is not an allowed send_list list
if (settings.send_list.size() == 0)
{
// send across each transport
for (auto& transport : transports_)
{
transport->send_data(modified);
}

// reset the modified map
map_.reset_modified();
}
// if there is a send_list
else
// send across each transport
for (auto& transport : transports_)
{
VariableReferenceMap allowed_modifieds;
// otherwise, we are only allowed to send a subset of modifieds
for (const auto& entry : modified)
{
if (settings.send_list.find(entry.first) != settings.send_list.end())
{
allowed_modifieds.emplace_hint(allowed_modifieds.end(), entry);
}
}

// if the subset was greater than zero, we send the subset
if (allowed_modifieds.size() > 0)
{
// send across each transport
for (auto& transport : transports_)
{
transport->send_data(allowed_modifieds);
}

// reset modified list for the allowed modifications
for (const auto& entry : allowed_modifieds)
{
map_.reset_modified(entry.first);
}
}
transport->send_data(modified);
}

map_.inc_clock(settings);
Expand Down
2 changes: 2 additions & 0 deletions include/madara/knowledge/KnowledgeBaseImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,8 @@ class KnowledgeBaseImpl
ThreadSafeContext map_;
std::string id_;
transport::QoSTransportSettings settings_;

mutable MADARA_LOCK_TYPE transport_mutex_;

std::vector<std::unique_ptr<transport::Base>> transports_;
};
Expand Down
9 changes: 5 additions & 4 deletions include/madara/knowledge/KnowledgeBaseImpl.inl
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ inline int KnowledgeBaseImpl::read_file(const VariableReference& variable,

inline void KnowledgeBaseImpl::activate_transport(void)
{
MADARA_GUARD_TYPE guard(map_.mutex_);
MADARA_GUARD_TYPE guard(transport_mutex_);

if (transports_.size() == 0)
{
Expand Down Expand Up @@ -392,15 +392,15 @@ inline KnowledgeRecord KnowledgeBaseImpl::evaluate(

inline size_t KnowledgeBaseImpl::attach_transport(transport::Base* transport)
{
MADARA_GUARD_TYPE guard(map_.mutex_);
MADARA_GUARD_TYPE guard(transport_mutex_);

transports_.emplace_back(transport);
return transports_.size();
}

inline size_t KnowledgeBaseImpl::get_num_transports(void)
{
MADARA_GUARD_TYPE guard(map_.mutex_);
MADARA_GUARD_TYPE guard(transport_mutex_);

return transports_.size();
}
Expand All @@ -410,7 +410,8 @@ inline size_t KnowledgeBaseImpl::remove_transport(size_t index)
std::unique_ptr<transport::Base> transport;
size_t size = 0;
{
MADARA_GUARD_TYPE guard(map_.mutex_);
MADARA_GUARD_TYPE guard(transport_mutex_);

size = transports_.size();
if (index < size)
{
Expand Down
9 changes: 9 additions & 0 deletions include/madara/knowledge/ThreadSafeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,15 @@ class MADARA_EXPORT ThreadSafeContext
**/
const VariableReferenceMap& get_modifieds(void) const;

/**
* Retrieves the current modifieds map
* @param send_list map of variables that limit what will be sent
* @param reset reset modifieds atomically
* @return the modified knowledge records
**/
KnowledgeMap get_modifieds_current(
const std::map<std::string, bool> & send_list, bool reset = true);

/**
* Adds a list of VariableReferences to the current modified list.
* @param modifieds a list of variables to add to modified list
Expand Down
62 changes: 62 additions & 0 deletions include/madara/knowledge/ThreadSafeContext.inl
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,68 @@ inline const VariableReferenceMap& ThreadSafeContext::get_modifieds(void) const
return changed_map_;
}

inline KnowledgeMap ThreadSafeContext::get_modifieds_current(
const std::map<std::string, bool> & send_list, bool reset)
{
MADARA_GUARD_TYPE guard(mutex_);

KnowledgeMap map;

// if there are no limiting prefixes, iterate through and reset
if (send_list.size() == 0)
{
for (auto i = changed_map_.begin();
i != changed_map_.end(); )
{
map.emplace_hint (map.end(),
i->first, *i->second.get_record_unsafe());

if (reset)
{
i = changed_map_.erase(i);
}
}
}
// if there are limiting prefixes, only copy over the prefixes
else
{
// if the prefixes list is smaller than the changed_map_
if (send_list.size() < changed_map_.size())
{
for (auto var : send_list)
{
auto found = changed_map_.find(var.first.c_str());

if (found != changed_map_.end())
{
map.emplace_hint(
map.end(), found->first, *found->second.get_record_unsafe());
changed_map_.erase(found);
}
}
}
// else if the changed_map_ is smaller than the prefixes list
else
{
for (auto i = changed_map_.begin(); i != changed_map_.end();)
{
if (send_list.find (i->first) != send_list.end())
{
map.emplace_hint (map.end(),
i->first, *i->second.get_record_unsafe());

if (reset)
{
i = changed_map_.erase (i);
}
}
}
}
}

return map;
}

inline VariableReferences ThreadSafeContext::save_modifieds(void) const
{
MADARA_GUARD_TYPE guard(mutex_);
Expand Down
24 changes: 12 additions & 12 deletions include/madara/transport/Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ int prep_rebroadcast(knowledge::ThreadSafeContext& context, char* buffer,
return result;
}

long Base::prep_send(const knowledge::VariableReferenceMap& orig_updates,
long Base::prep_send(const knowledge::KnowledgeMap& orig_updates,
const char* print_prefix)
{
// check to see if we are shutting down
Expand Down Expand Up @@ -881,23 +881,23 @@ long Base::prep_send(const knowledge::VariableReferenceMap& orig_updates,
madara_logger_log(context_.get_logger(), logger::LOG_MAJOR,
"%s:"
" Calling filter chain of %s.\n",
print_prefix, e.first);
print_prefix, e.first.c_str());

const auto* record = e.second.get_record_unsafe();
const auto record = e.second;

if (record->toi() > latest_toi)
if (record.toi() > latest_toi)
{
latest_toi = record->toi();
latest_toi = record.toi();
}

// filter the record according to the send filter chain
knowledge::KnowledgeRecord result =
settings_.filter_send(*record, e.first, transport_context);
settings_.filter_send(record, e.first, transport_context);

madara_logger_log(context_.get_logger(), logger::LOG_MAJOR,
"%s:"
" Filter returned for %s.\n",
print_prefix, e.first);
print_prefix, e.first.c_str());

if (result.exists())
{
Expand Down Expand Up @@ -940,21 +940,21 @@ long Base::prep_send(const knowledge::VariableReferenceMap& orig_updates,
{
for (auto e : orig_updates)
{
const auto* record = e.second.get_record_unsafe();
const auto record = e.second;

if (record->toi() > latest_toi)
if (record.toi() > latest_toi)
{
latest_toi = record->toi();
latest_toi = record.toi();
}

if (record)
{
madara_logger_log(context_.get_logger(), logger::LOG_MINOR,
"%s:"
" Adding record %s to update list.\n",
print_prefix, e.first);
print_prefix, e.first.c_str());

filtered_updates.emplace(std::make_pair(e.first, *record));
filtered_updates.emplace(std::make_pair(e.first, record));
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions include/madara/transport/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class MADARA_EXPORT Base
* 0 No message to send
* > 0 size of buffered message
**/
long prep_send(const knowledge::VariableReferenceMap& orig_updates,
long prep_send(const knowledge::KnowledgeMap& orig_updates,
const char* print_prefix);

/**
Expand All @@ -109,7 +109,7 @@ class MADARA_EXPORT Base
*
* @return result of operation or -1 if we are shutting down
**/
virtual long send_data(const knowledge::VariableReferenceMap&) = 0;
virtual long send_data(const knowledge::KnowledgeMap&) = 0;

/**
* Invalidates a transport to indicate it is shutting down
Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/ndds/NddsTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ int madara::transport::NddsTransport::setup(void)
}

long madara::transport::NddsTransport::send_data(
const knowledge::VariableReferenceMap& updates)
const knowledge::KnowledgeMap& updates)
{
long result = prep_send(updates, "NddsTransport::send_data:");

Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/ndds/NddsTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class NddsTransport : public Base
* @param updates listing of all updates that must be sent
* @return result of write operation or -1 if we are shutting down
**/
long send_data(const knowledge::VariableReferenceMap& updates) override;
long send_data(const knowledge::KnowledgeMap& updates) override;

/**
* Accesses reliability setting
Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/splice/SpliceDDSTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ int madara::transport::SpliceDDSTransport::setup(void)
}

long madara::transport::SpliceDDSTransport::send_data(
const knowledge::VariableReferenceMap& updates)
const knowledge::KnowledgeMap& updates)
{
long result = 0;

Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/splice/SpliceDDSTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class SpliceDDSTransport : public Base
* @param updates listing of all updates that must be sent
* @return result of write operation or -1 if we are shutting down
**/
long send_data(const knowledge::VariableReferenceMap& updates) override;
long send_data(const knowledge::KnowledgeMap& updates) override;

/**
* Accesses reliability setting
Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/udp/UdpRegistryClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void UdpRegistryClient::send_register(void)
}

long UdpRegistryClient::send_data(
const knowledge::VariableReferenceMap& orig_updates)
const knowledge::KnowledgeMap& orig_updates)
{
if (!settings_.no_sending)
{
Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/udp/UdpRegistryClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class MADARA_EXPORT UdpRegistryClient : public UdpTransport
* @return result of write operation or -1 if we are shutting down
**/
long send_data(
const madara::knowledge::VariableReferenceMap& updates) override;
const madara::knowledge::KnowledgeMap& updates) override;

int setup(void) override;

Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/udp/UdpRegistryServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ int madara::transport::UdpRegistryServer::setup(void)
}

long madara::transport::UdpRegistryServer::send_data(
const madara::knowledge::VariableReferenceMap& orig_updates)
const madara::knowledge::KnowledgeMap& orig_updates)
{
if (!settings_.no_sending)
{
Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/udp/UdpRegistryServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class MADARA_EXPORT UdpRegistryServer : public UdpTransport
* @return result of write operation or -1 if we are shutting down
**/
long send_data(
const madara::knowledge::VariableReferenceMap& updates) override;
const madara::knowledge::KnowledgeMap& updates) override;

int setup(void) override;

Expand Down
2 changes: 1 addition & 1 deletion include/madara/transport/udp/UdpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ long UdpTransport::send_message(const char* buf, size_t packet_size)
}

long UdpTransport::send_data(
const knowledge::VariableReferenceMap& orig_updates)
const knowledge::KnowledgeMap& orig_updates)
{
long result(0);
const char* print_prefix = "UdpTransport::send_data";
Expand Down

0 comments on commit 414de1e

Please sign in to comment.