Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 47 additions & 25 deletions src/stream_info_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,34 +188,56 @@ void stream_info_impl::from_fullinfo_message(const std::string &m) {
read_xml(doc_);
}

/**
* Test whether this stream info matches the given query string.
*/
bool stream_info_impl::matches_query(const string &query) {
/// Test whether this stream info matches the given query string.
bool stream_info_impl::matches_query(const string &query, bool nocache) {
return cached_.matches_query(doc_, query, nocache);
}

bool query_cache::matches_query(const xml_document &doc, const std::string query, bool nocache) {
lslboost::lock_guard<lslboost::mutex> lock(cache_mut_);
query_cache::left_iterator it = cached_.left.find(query);
if (it != cached_.left.end()) {
// found in cache
bool is_match = it->second.second;

decltype(cache)::iterator it;
if (!nocache && (it = cache.find(query)) != cache.end()) {
// the sign bit encodes if the query matches or not
bool matches = it->second > 0;
// update the last-use time stamp
cached_.left.replace_data(it,std::make_pair(lsl_clock(),is_match));
return is_match;
} else {
// not found in cache
try {
// compute whether it matches
string fullquery = (string("/info[") += query) += "]";
bool result = !doc_.select_nodes(fullquery.c_str()).empty();
// insert result into cache
cached_.left.insert(std::make_pair(query,std::make_pair(lsl_clock(),result)));
// remove oldest results until we're within capacity
while ((int)cached_.size() > api_config::get_instance()->max_cached_queries())
cached_.right.erase(cached_.right.begin());
// return result
return result;
} catch(...) {
return false; // error: unsupported query
it->second = ++query_cache_age * (matches ? 1 : -1);
// return cached match
return matches;
}

// not found in cache
try {
// compute whether it matches
bool matched = pugi::xpath_query(query.c_str()).evaluate_boolean(doc.first_child());

auto max_cached = (std::size_t)api_config::get_instance()->max_cached_queries();
if(nocache || max_cached == 0)
return matched;

cache.insert(std::make_pair(query, ++query_cache_age * (matched ? 1 : -1)));

// remove n/2 oldest results to make room for new entries
if (cache.size() > max_cached) {
// Find out the median cache entry age
std::vector<int> agevec;
agevec.reserve(cache.size());
for (auto &val : cache) agevec.push_back(std::abs(val.second));
auto middle = agevec.begin() + max_cached / 2;
std::nth_element(agevec.begin(), middle, agevec.end());
auto oldest_to_keep = *middle;

// Remove all elements older than the median age
for (auto it = cache.begin(); it != cache.end();)
if (abs(it->second) <= oldest_to_keep)
it = cache.erase(it);
else
++it;
}
return matched;
} catch (std::exception &e) {
LOG_F(WARNING, "Query error: %s", e.what());
return false;
}
}

Expand Down
18 changes: 12 additions & 6 deletions src/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,26 @@
#define STREAM_INFO_IMPL_H

#include "common.h"
#include <boost/bimap.hpp>
#include <boost/thread/mutex.hpp>
#include "pugixml/pugixml.hpp"
#include <boost/thread/mutex.hpp>
#include <unordered_map>

namespace lsl {

/// LRU cache for queries
class query_cache {
std::unordered_map<std::string, int> cache;
int query_cache_age{0};
lslboost::mutex cache_mut_;
public:
bool matches_query(const pugi::xml_document& doc, const std::string query, bool nocache);
};

/**
* Actual implementation of the stream_info class.
* The stream_info class forwards all operations to an instance of this class.
*/
class stream_info_impl {
/// The query cache is a (bidirectional) mapping between query-strings and pairs of (last-use-timestamp, matching-true/false)
typedef lslboost::bimap<std::string,std::pair<double,bool> > query_cache;
public:

/**
Expand Down Expand Up @@ -80,7 +87,7 @@ namespace lsl {
* The info "matches" if the given XPath 1.0 query string returns a non-empty node set.
* @return Whether stream info is matched by the query string.
*/
bool matches_query(const std::string &query);
bool matches_query(const std::string &query, bool nocache = false);


//
Expand Down Expand Up @@ -255,7 +262,6 @@ namespace lsl {
pugi::xml_document doc_;
// cached query results
query_cache cached_;
lslboost::mutex cache_mut_;
};


Expand Down
8 changes: 8 additions & 0 deletions testing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@ project(lsltests
)
cmake_minimum_required (VERSION 3.12)
enable_testing()

option(LSL_BENCHMARKS "Enable benchmarks in unit tests" OFF)

add_library(catch_main OBJECT catch_main.cpp)
target_compile_features(catch_main PUBLIC cxx_std_11)

target_compile_definitions(catch_main PRIVATE LSL_VERSION_INFO=${LSL_VERSION_INFO})
if(LSL_BENCHMARKS)
target_compile_definitions(catch_main PUBLIC CATCH_CONFIG_ENABLE_BENCHMARKING)
endif()

add_executable(lsl_test_exported
DataType.cpp
discovery.cpp
Expand All @@ -21,6 +28,7 @@ add_executable(lsl_test_internal
asiocancel.cpp
inireader.cpp
stringfuncs.cpp
streaminfo.cpp
)
target_link_libraries(lsl_test_internal PRIVATE lslobj lslboost catch_main)

Expand Down
52 changes: 52 additions & 0 deletions testing/streaminfo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include "../src/stream_info_impl.h"
#include "../src/api_config.h"
#include "catch.hpp"

TEST_CASE("streaminfo matching via XPath", "[basic][streaminfo][xml]") {
lsl::stream_info_impl info(
"streamname", "streamtype", 8, 500, lsl_channel_format_t::cft_string, "sourceid");
auto channels = info.desc().append_child("channels");
for(int i=0; i< 4;++i)
channels.append_child("channel").append_child("type").append_child(pugi::node_pcdata).set_value("EEG");
for(int i=0; i< 4;++i)
channels.append_child("channel").append_child("type").append_child(pugi::node_pcdata).set_value("EOG");

#ifdef CATCH_CONFIG_ENABLE_BENCHMARKING
// Append lots of dummy channels for performance tests
for(int i=0; i<50000; ++i)
channels.append_child("chn").append_child("type").append_child(pugi::node_pcdata).set_value("foobar");
for(int i=0; i<2000; ++i) {
channels = channels.append_child("chn");
channels.append_child(pugi::node_pcdata).set_value("1");
}

BENCHMARK("trivial query") {
return info.matches_query("name='streamname' and type='streamtype'", true);
};
BENCHMARK("complicated query") {
return info.matches_query("count(desc/channels/channel[type='EEG'])>3", true);
};
BENCHMARK("Cached query") {
return info.matches_query("count(desc/channels/channel[type='EEG'])>3", false);
};

// test how well the cache copes with lots of different queries
BENCHMARK("partially cached queries (x1000)") {
int matches = 0;
for (int j = 0; j < 1000; ++j)
matches += info.matches_query(("0<=" + std::to_string(j)).c_str());
return matches;
};

#endif

INFO(info.to_fullinfo_message());
REQUIRE(info.matches_query("name='streamname'"));
REQUIRE(info.matches_query("name='streamname' and type='streamtype'"));
REQUIRE(info.matches_query("channel_count > 5"));
REQUIRE(info.matches_query("nominal_srate >= 499"));
REQUIRE(info.matches_query("count(desc/channels/channel[type='EEG'])>3"));

REQUIRE(!info.matches_query("in'va'lid"));
REQUIRE(!info.matches_query("name='othername'"));
}