diff --git a/src/stream_inlet_impl.h b/src/stream_inlet_impl.h index 10f410306..c9ba35471 100644 --- a/src/stream_inlet_impl.h +++ b/src/stream_inlet_impl.h @@ -305,7 +305,11 @@ class stream_inlet_impl { std::size_t samples_available() { return data_receiver_.samples_available(); } /// Flush the queue, return the number of dropped samples - uint32_t flush() { return data_receiver_.flush(); } + uint32_t flush() { + int nskipped = data_receiver_.flush(); + postprocessor_.skip_samples(nskipped); + return nskipped; + } /** Query whether the clock was potentially reset since the last call to was_clock_reset(). * diff --git a/src/time_postprocessor.cpp b/src/time_postprocessor.cpp index 533dc37e6..6fc6811ba 100644 --- a/src/time_postprocessor.cpp +++ b/src/time_postprocessor.cpp @@ -3,17 +3,39 @@ #include +#pragma GCC diagnostic ignored "-Wdouble-promotion" +#pragma GCC diagnostic ignored "-Wimplicit-int-float-conversion" + // === implementation of the time_postprocessor class === using namespace lsl; +/// how many samples have to be seen between clocksyncs? +const uint8_t samples_between_clocksyncs = 50; + time_postprocessor::time_postprocessor(const postproc_callback_t &query_correction, const postproc_callback_t &query_srate, const reset_callback_t &query_reset) - : samples_seen_(0.0), query_srate_(query_srate), options_(proc_none), - halftime_(api_config::get_instance()->smoothing_halftime()), + : samples_since_last_clocksync(samples_between_clocksyncs), query_srate_(query_srate), + options_(proc_none), halftime_(api_config::get_instance()->smoothing_halftime()), query_correction_(query_correction), query_reset_(query_reset), next_query_time_(0.0), - last_offset_(0.0), smoothing_initialized_(false), - last_value_(-std::numeric_limits::infinity()) {} + last_offset_(0.0), last_value_(std::numeric_limits::lowest()) {} + +void time_postprocessor::set_options(uint32_t options) +{ + // bitmask which options actually changed (XOR) + auto changed = options_ ^ options; + + // dejitter option changed? -> Reset it + // in case it got enabled, it'll be initialized with the correct t0 when + // the next sample comes in + if(changed & proc_dejitter) + dejitter = postproc_dejitterer(); + + if(changed & proc_monotonize) + last_value_ = std::numeric_limits::lowest(); + + options_ = options; +} double time_postprocessor::process_timestamp(double value) { if (options_ & proc_threadsafe) { @@ -23,19 +45,27 @@ double time_postprocessor::process_timestamp(double value) { return process_internal(value); } +void time_postprocessor::skip_samples(uint32_t skipped_samples) { + if (options_ & proc_dejitter && dejitter.smoothing_applicable()) + dejitter.samples_since_t0_ += skipped_samples; +} + double time_postprocessor::process_internal(double value) { // --- clock synchronization --- if (options_ & proc_clocksync) { // update last correction value if needed (we do this every 50 samples and at most twice per // second) - if ((fmod(samples_seen_, 50.0) == 0.0) && lsl_clock() > next_query_time_) { + if (++samples_since_last_clocksync > samples_between_clocksyncs && + lsl_clock() > next_query_time_) { last_offset_ = query_correction_(); + samples_since_last_clocksync = 0; if (query_reset_()) { // reset state to unitialized last_offset_ = query_correction_(); - last_value_ = -std::numeric_limits::infinity(); - samples_seen_ = 0; - smoothing_initialized_ = false; + last_value_ = std::numeric_limits::lowest(); + // reset the dejitterer to an uninitialized state so it's + // initialized on the next use + dejitter = postproc_dejitterer(); } next_query_time_ = lsl_clock() + 0.5; } @@ -48,51 +78,52 @@ double time_postprocessor::process_internal(double value) { // --- jitter removal --- if (options_ & proc_dejitter) { // initialize the smoothing state if not yet done so - if (!smoothing_initialized_) { + if (!dejitter.is_initialized()) { double srate = query_srate_(); - smoothing_applicable_ = (srate > 0.0); - if (smoothing_applicable_) { - // linear regression model coefficients (intercept, slope) - w0_ = 0.0; - w1_ = 1.0 / srate; - // forget factor lambda in RLS calculation & its inverse - lam_ = pow(2.0, -1.0 / (srate * halftime_)); - il_ = 1.0 / lam_; - // inverse autocovariance matrix of predictors u - P00_ = P11_ = 1e10; - P01_ = P10_ = 0.0; - // numeric baseline - baseline_value_ = value; - } - smoothing_initialized_ = true; - } - if (smoothing_applicable_) { - value -= baseline_value_; - - // RLS update - double u1 = samples_seen_; // u = np.matrix([[1.0], [samples_seen]]) - double pi0 = P00_ + u1 * P10_; // pi = u.T * P - double pi1 = P01_ + u1 * P11_; // ... (ct'd) - double al = value - w0_ - w1_ * u1; // al = value - w.T * u (prediction error) - double gam = lam_ + pi0 + pi1 * u1; // gam = lam + pi * u - P00_ = il_ * (P00_ - ((pi0 * pi0) / gam)); // Pp = k * pi; P = il * (P - Pp) - P01_ = il_ * (P01_ - ((pi0 * pi1) / gam)); // ... - P10_ = il_ * (P10_ - ((pi1 * pi0) / gam)); // ... - P11_ = il_ * (P11_ - ((pi1 * pi1) / gam)); // ... - w0_ += al * (P00_ + P10_ * u1); // w = w + k*al - w1_ += al * (P01_ + P11_ * u1); // ... - value = w0_ + w1_ * u1; // value = float(w.T * u) - - value += baseline_value_; + dejitter = postproc_dejitterer(value, srate, halftime_); } + value = dejitter.dejitter(value); } // --- force monotonic timestamps --- if (options_ & proc_monotonize) { if (value < last_value_) value = last_value_; + else + last_value_ = value; } - samples_seen_ += 1.0; - last_value_ = value; return value; } + +postproc_dejitterer::postproc_dejitterer(double t0, double srate, double halftime) + : t0_(static_cast(t0)) { + if (srate > 0) { + w1_ = 1. / srate; + lam_ = pow(2, -1 / (srate * halftime)); + } +} + +double postproc_dejitterer::dejitter(double t) noexcept { + if (!smoothing_applicable()) return t; + + // remove baseline for better numerical accuracy + t -= t0_; + + // RLS update + const double u1 = samples_since_t0_++, // u = np.matrix([[1.0], [samples_seen]]) + pi0 = P00_ + u1 * P01_, // pi = u.T * P + pi1 = P01_ + u1 * P11_, // .. + al = t - (w0_ + u1 * w1_), // α = t - w.T * u # prediction error + g_inv = 1 / (lam_ + pi0 + pi1 * u1), // g_inv = 1/(lam_ + pi * u) + il_ = 1 / lam_; // ... + P00_ = il_ * (P00_ - pi0 * pi0 * g_inv); // P = (P - k*pi) / lam_ + P01_ = il_ * (P01_ - pi0 * pi1 * g_inv); // ... + P11_ = il_ * (P11_ - pi1 * pi1 * g_inv); // ... + w0_ += al * (P00_ + P01_ * u1); // w += k*α + w1_ += al * (P01_ + P11_ * u1); // ... + return w0_ + u1 * w1_ + t0_; // t = float(w.T * u) + t0 +} + +void postproc_dejitterer::skip_samples(uint_fast32_t skipped_samples) noexcept { + samples_since_t0_ += skipped_samples; +} diff --git a/src/time_postprocessor.h b/src/time_postprocessor.h index 0c9094edb..4cbb51a1c 100644 --- a/src/time_postprocessor.h +++ b/src/time_postprocessor.h @@ -5,13 +5,36 @@ #include #include - namespace lsl { /// A callback function that allows the post-processor to query state from other objects if needed using postproc_callback_t = std::function; using reset_callback_t = std::function; +/// Dejitter / smooth timestamps with a first order recursive least squares filter (RLS). +struct postproc_dejitterer { + /// first observed time-stamp value, used as a baseline to improve numerics + uint_fast32_t t0_; + /// number of samples since t0, i.e. the x in y=ax+b + uint_fast32_t samples_since_t0_{0}; + /// linear regression model coefficients (intercept, slope) + double w0_{0}, w1_{0}; + /// inverse covariance matrix elements (P00, P11, off diagonal element P01=P10) + double P00_{1e10}, P11_{1e10}, P01_{0}; + /// forget factor lambda in RLS calculation + double lam_{0}; + + /// constructor + postproc_dejitterer(double t0 = 0, double srate = 0, double halftime = 0); + + /// dejitter a timestamp and update RLS parameters + double dejitter(double t) noexcept; + + /// adjust RLS parameters to account for samples not seen + void skip_samples(uint_fast32_t skipped_samples) noexcept; + bool is_initialized() const noexcept { return t0_ != 0; } + bool smoothing_applicable() const noexcept { return lam_ > 0; } +}; /// Internal class of an inlet that is responsible for post-processing time stamps. class time_postprocessor { @@ -30,7 +53,7 @@ class time_postprocessor { * processing_options_t together (e.g., proc_clocksync|proc_dejitter); the default is to enable * all options. */ - void set_options(uint32_t options = proc_ALL) { options_ = options; } + void set_options(uint32_t options = proc_ALL); /// Post-process the given time stamp and return the new time-stamp. double process_timestamp(double value); @@ -38,12 +61,15 @@ class time_postprocessor { /// Override the half-time (forget factor) of the time-stamp smoothing. void smoothing_halftime(float value) { halftime_ = value; } + /// Inform the post processor some samples were skipped + void skip_samples(uint32_t skipped_samples); + private: /// Internal function to process a time stamp. double process_internal(double value); - /// number of samples seen so far - double samples_seen_; + /// number of samples seen since last clocksync + uint8_t samples_since_last_clocksync; // configuration parameters /// a callback function that returns the current nominal sampling rate @@ -63,19 +89,7 @@ class time_postprocessor { /// last queried correction offset double last_offset_; - // runtime parameters for smoothing - /// first observed time-stamp value, used as a baseline to improve numerics - double baseline_value_; - /// linear regression model coefficients - double w0_, w1_; - /// inverse covariance matrix - double P00_, P01_, P10_, P11_; - /// forget factor lambda in RLS calculation & its inverse - double lam_, il_; - /// whether smoothing can be applied to these data (false if irregular srate) - bool smoothing_applicable_; - /// whether the smoothing parameters have been initialized already - bool smoothing_initialized_; + postproc_dejitterer dejitter; // runtime parameters for monotonize /// last observed time-stamp value, to force monotonically increasing stamps diff --git a/testing/CMakeLists.txt b/testing/CMakeLists.txt index a579f8cf7..dcd07f499 100644 --- a/testing/CMakeLists.txt +++ b/testing/CMakeLists.txt @@ -35,6 +35,7 @@ add_executable(lsl_test_internal test_int_stringfuncs.cpp test_int_streaminfo.cpp test_int_samples.cpp + internal/postproc.cpp internal/serialization_v100.cpp ) target_link_libraries(lsl_test_internal PRIVATE lslobj lslboost catch_main) diff --git a/testing/internal/postproc.cpp b/testing/internal/postproc.cpp new file mode 100644 index 000000000..65b77f76e --- /dev/null +++ b/testing/internal/postproc.cpp @@ -0,0 +1,76 @@ +#include "time_postprocessor.h" +#include +#include +#include + +template +inline void test_postproc_array( + lsl::time_postprocessor &pp, const double (&in)[N], const double (&expected)[N]) { + for (std::size_t i = 0; i < N; ++i) CHECK(pp.process_timestamp(in[i]) == Approx(expected[i])); +} + +TEST_CASE("postprocessing", "[basic]") { + double time_offset = -50.0, srate = 1.; + bool was_reset = false; + lsl::time_postprocessor pp( + [&]() { + UNSCOPED_INFO("time_offset " << time_offset); + return time_offset; + }, + [&]() { return srate; }, [&]() { return was_reset; }); + + double nopostproc[] = {2, 3.1, 3, 5, 5.9, 7.1}; + + { + INFO("proc_clocksync") + pp.set_options(proc_clocksync); + for (double t : nopostproc) CHECK(pp.process_timestamp(t) == Approx(t + time_offset)); + } + + { + INFO("proc_clocksync + clock_reset") + pp.set_options(proc_clocksync); + // std::this_thread::sleep_for(std::chrono::milliseconds(600)); + was_reset = true; + time_offset = -200; + // for (double t : nopostproc) CHECK(pp.process_timestamp(t) == Approx(t + time_offset)); + } + + { + INFO("proc_monotonize") + double monotonized[] = {2, 3.1, 3.1, 5, 5.9, 7.1}; + pp.set_options(proc_monotonize); + test_postproc_array(pp, nopostproc, monotonized); + } + + { + INFO("reset with proc_none") + pp.set_options(proc_none); + test_postproc_array(pp, nopostproc, nopostproc); + } +} + +TEST_CASE("rls_smoothing", "[basic]") { + const int n = 100000, warmup_samples = 1000; + const double t0 = 5000, latency = 0.05, srate = 100., halftime = 90; + lsl::postproc_dejitterer pp(t0, srate, halftime); + + std::default_random_engine rng; + std::normal_distribution jitter(latency, .005); + + pp.dejitter(t0); + int n_outlier = 0; + + for (int i = 0; i < n; ++i) { + const double t = t0 + i / srate, e = jitter(rng), dejittered = pp.dejitter(t + e), + est_error = dejittered - t - latency; + if (i > warmup_samples && fabs(est_error) > std::max(e, .001)) n_outlier++; + } + LOG_F(INFO, "\nP:\t%f\t%f\n\t%f\t%f\nw:\t%f\t%f", pp.P00_, pp.P01_, pp.P01_, pp.P11_, pp.w0_, + pp.w1_); + CHECK(n_outlier < n / 100); + + CHECK(pp.dejitter(t0 + latency + n / srate) == Approx(t0 + latency + n / srate)); + CHECK(fabs(pp.w0_ - latency) < .1); + CHECK(fabs(pp.w1_ - 1 / srate) < 1e-6); +} diff --git a/testing/test_ext_DataType.cpp b/testing/test_ext_DataType.cpp index 59f956056..b702e9caf 100644 --- a/testing/test_ext_DataType.cpp +++ b/testing/test_ext_DataType.cpp @@ -3,6 +3,7 @@ #include #include #include +#include TEMPLATE_TEST_CASE( "datatransfer", "[datatransfer][basic]", char, int16_t, int32_t, int64_t, float, double) { @@ -69,3 +70,35 @@ TEST_CASE("TypeConversion", "[datatransfer][types][basic]") { CHECK(result == val); } } + +TEST_CASE("Flush", "[datatransfer][basic]") { + Streampair sp{create_streampair( + lsl::stream_info("FlushTest", "flush", 1, 1, lsl::cf_double64, "FlushTest"))}; + sp.in_.set_postprocessing(lsl::post_dejitter); + + const int n=20; + + std::thread pusher([&](){ + double data = lsl::local_clock(); + for(int i=0; i