Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/stream_inlet_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,11 @@ class stream_inlet_impl {
std::size_t samples_available() { return data_receiver_.samples_available(); }

/// Flush the queue, return the number of dropped samples
uint32_t flush() { return data_receiver_.flush(); }
uint32_t flush() {
int nskipped = data_receiver_.flush();
postprocessor_.skip_samples(nskipped);
return nskipped;
}

/** Query whether the clock was potentially reset since the last call to was_clock_reset().
*
Expand Down
121 changes: 76 additions & 45 deletions src/time_postprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,39 @@

#include <cmath>

#pragma GCC diagnostic ignored "-Wdouble-promotion"
#pragma GCC diagnostic ignored "-Wimplicit-int-float-conversion"

// === implementation of the time_postprocessor class ===

using namespace lsl;

/// how many samples have to be seen between clocksyncs?
const uint8_t samples_between_clocksyncs = 50;

time_postprocessor::time_postprocessor(const postproc_callback_t &query_correction,
const postproc_callback_t &query_srate, const reset_callback_t &query_reset)
: samples_seen_(0.0), query_srate_(query_srate), options_(proc_none),
halftime_(api_config::get_instance()->smoothing_halftime()),
: samples_since_last_clocksync(samples_between_clocksyncs), query_srate_(query_srate),
options_(proc_none), halftime_(api_config::get_instance()->smoothing_halftime()),
query_correction_(query_correction), query_reset_(query_reset), next_query_time_(0.0),
last_offset_(0.0), smoothing_initialized_(false),
last_value_(-std::numeric_limits<double>::infinity()) {}
last_offset_(0.0), last_value_(std::numeric_limits<double>::lowest()) {}

void time_postprocessor::set_options(uint32_t options)
{
// bitmask which options actually changed (XOR)
auto changed = options_ ^ options;

// dejitter option changed? -> Reset it
// in case it got enabled, it'll be initialized with the correct t0 when
// the next sample comes in
if(changed & proc_dejitter)
dejitter = postproc_dejitterer();

if(changed & proc_monotonize)
last_value_ = std::numeric_limits<double>::lowest();

options_ = options;
}

double time_postprocessor::process_timestamp(double value) {
if (options_ & proc_threadsafe) {
Expand All @@ -23,19 +45,27 @@ double time_postprocessor::process_timestamp(double value) {
return process_internal(value);
}

void time_postprocessor::skip_samples(uint32_t skipped_samples) {
if (options_ & proc_dejitter && dejitter.smoothing_applicable())
dejitter.samples_since_t0_ += skipped_samples;
}

double time_postprocessor::process_internal(double value) {
// --- clock synchronization ---
if (options_ & proc_clocksync) {
// update last correction value if needed (we do this every 50 samples and at most twice per
// second)
if ((fmod(samples_seen_, 50.0) == 0.0) && lsl_clock() > next_query_time_) {
if (++samples_since_last_clocksync > samples_between_clocksyncs &&
lsl_clock() > next_query_time_) {
last_offset_ = query_correction_();
samples_since_last_clocksync = 0;
if (query_reset_()) {
// reset state to unitialized
last_offset_ = query_correction_();
last_value_ = -std::numeric_limits<double>::infinity();
samples_seen_ = 0;
smoothing_initialized_ = false;
last_value_ = std::numeric_limits<double>::lowest();
// reset the dejitterer to an uninitialized state so it's
// initialized on the next use
dejitter = postproc_dejitterer();
}
next_query_time_ = lsl_clock() + 0.5;
}
Expand All @@ -48,51 +78,52 @@ double time_postprocessor::process_internal(double value) {
// --- jitter removal ---
if (options_ & proc_dejitter) {
// initialize the smoothing state if not yet done so
if (!smoothing_initialized_) {
if (!dejitter.is_initialized()) {
double srate = query_srate_();
smoothing_applicable_ = (srate > 0.0);
if (smoothing_applicable_) {
// linear regression model coefficients (intercept, slope)
w0_ = 0.0;
w1_ = 1.0 / srate;
// forget factor lambda in RLS calculation & its inverse
lam_ = pow(2.0, -1.0 / (srate * halftime_));
il_ = 1.0 / lam_;
// inverse autocovariance matrix of predictors u
P00_ = P11_ = 1e10;
P01_ = P10_ = 0.0;
// numeric baseline
baseline_value_ = value;
}
smoothing_initialized_ = true;
}
if (smoothing_applicable_) {
value -= baseline_value_;

// RLS update
double u1 = samples_seen_; // u = np.matrix([[1.0], [samples_seen]])
double pi0 = P00_ + u1 * P10_; // pi = u.T * P
double pi1 = P01_ + u1 * P11_; // ... (ct'd)
double al = value - w0_ - w1_ * u1; // al = value - w.T * u (prediction error)
double gam = lam_ + pi0 + pi1 * u1; // gam = lam + pi * u
P00_ = il_ * (P00_ - ((pi0 * pi0) / gam)); // Pp = k * pi; P = il * (P - Pp)
P01_ = il_ * (P01_ - ((pi0 * pi1) / gam)); // ...
P10_ = il_ * (P10_ - ((pi1 * pi0) / gam)); // ...
P11_ = il_ * (P11_ - ((pi1 * pi1) / gam)); // ...
w0_ += al * (P00_ + P10_ * u1); // w = w + k*al
w1_ += al * (P01_ + P11_ * u1); // ...
value = w0_ + w1_ * u1; // value = float(w.T * u)

value += baseline_value_;
dejitter = postproc_dejitterer(value, srate, halftime_);
}
value = dejitter.dejitter(value);
}

// --- force monotonic timestamps ---
if (options_ & proc_monotonize) {
if (value < last_value_) value = last_value_;
else
last_value_ = value;
}

samples_seen_ += 1.0;
last_value_ = value;
return value;
}

postproc_dejitterer::postproc_dejitterer(double t0, double srate, double halftime)
: t0_(static_cast<uint_fast32_t>(t0)) {
if (srate > 0) {
w1_ = 1. / srate;
lam_ = pow(2, -1 / (srate * halftime));
}
}

double postproc_dejitterer::dejitter(double t) noexcept {
if (!smoothing_applicable()) return t;

// remove baseline for better numerical accuracy
t -= t0_;

// RLS update
const double u1 = samples_since_t0_++, // u = np.matrix([[1.0], [samples_seen]])
pi0 = P00_ + u1 * P01_, // pi = u.T * P
pi1 = P01_ + u1 * P11_, // ..
al = t - (w0_ + u1 * w1_), // α = t - w.T * u # prediction error
g_inv = 1 / (lam_ + pi0 + pi1 * u1), // g_inv = 1/(lam_ + pi * u)
il_ = 1 / lam_; // ...
P00_ = il_ * (P00_ - pi0 * pi0 * g_inv); // P = (P - k*pi) / lam_
P01_ = il_ * (P01_ - pi0 * pi1 * g_inv); // ...
P11_ = il_ * (P11_ - pi1 * pi1 * g_inv); // ...
w0_ += al * (P00_ + P01_ * u1); // w += k*α
w1_ += al * (P01_ + P11_ * u1); // ...
return w0_ + u1 * w1_ + t0_; // t = float(w.T * u) + t0
}

void postproc_dejitterer::skip_samples(uint_fast32_t skipped_samples) noexcept {
samples_since_t0_ += skipped_samples;
}
48 changes: 31 additions & 17 deletions src/time_postprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,36 @@
#include <functional>
#include <mutex>


namespace lsl {

/// A callback function that allows the post-processor to query state from other objects if needed
using postproc_callback_t = std::function<double()>;
using reset_callback_t = std::function<bool()>;

/// Dejitter / smooth timestamps with a first order recursive least squares filter (RLS).
struct postproc_dejitterer {
/// first observed time-stamp value, used as a baseline to improve numerics
uint_fast32_t t0_;
/// number of samples since t0, i.e. the x in y=ax+b
uint_fast32_t samples_since_t0_{0};
/// linear regression model coefficients (intercept, slope)
double w0_{0}, w1_{0};
/// inverse covariance matrix elements (P00, P11, off diagonal element P01=P10)
double P00_{1e10}, P11_{1e10}, P01_{0};
/// forget factor lambda in RLS calculation
double lam_{0};

/// constructor
postproc_dejitterer(double t0 = 0, double srate = 0, double halftime = 0);

/// dejitter a timestamp and update RLS parameters
double dejitter(double t) noexcept;

/// adjust RLS parameters to account for samples not seen
void skip_samples(uint_fast32_t skipped_samples) noexcept;
bool is_initialized() const noexcept { return t0_ != 0; }
bool smoothing_applicable() const noexcept { return lam_ > 0; }
};

/// Internal class of an inlet that is responsible for post-processing time stamps.
class time_postprocessor {
Expand All @@ -30,20 +53,23 @@ class time_postprocessor {
* processing_options_t together (e.g., proc_clocksync|proc_dejitter); the default is to enable
* all options.
*/
void set_options(uint32_t options = proc_ALL) { options_ = options; }
void set_options(uint32_t options = proc_ALL);

/// Post-process the given time stamp and return the new time-stamp.
double process_timestamp(double value);

/// Override the half-time (forget factor) of the time-stamp smoothing.
void smoothing_halftime(float value) { halftime_ = value; }

/// Inform the post processor some samples were skipped
void skip_samples(uint32_t skipped_samples);

private:
/// Internal function to process a time stamp.
double process_internal(double value);

/// number of samples seen so far
double samples_seen_;
/// number of samples seen since last clocksync
uint8_t samples_since_last_clocksync;

// configuration parameters
/// a callback function that returns the current nominal sampling rate
Expand All @@ -63,19 +89,7 @@ class time_postprocessor {
/// last queried correction offset
double last_offset_;

// runtime parameters for smoothing
/// first observed time-stamp value, used as a baseline to improve numerics
double baseline_value_;
/// linear regression model coefficients
double w0_, w1_;
/// inverse covariance matrix
double P00_, P01_, P10_, P11_;
/// forget factor lambda in RLS calculation & its inverse
double lam_, il_;
/// whether smoothing can be applied to these data (false if irregular srate)
bool smoothing_applicable_;
/// whether the smoothing parameters have been initialized already
bool smoothing_initialized_;
postproc_dejitterer dejitter;

// runtime parameters for monotonize
/// last observed time-stamp value, to force monotonically increasing stamps
Expand Down
1 change: 1 addition & 0 deletions testing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ add_executable(lsl_test_internal
test_int_stringfuncs.cpp
test_int_streaminfo.cpp
test_int_samples.cpp
internal/postproc.cpp
internal/serialization_v100.cpp
)
target_link_libraries(lsl_test_internal PRIVATE lslobj lslboost catch_main)
Expand Down
76 changes: 76 additions & 0 deletions testing/internal/postproc.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "time_postprocessor.h"
#include <catch2/catch.hpp>
#include <random>
#include <thread>

template <std::size_t N>
inline void test_postproc_array(
lsl::time_postprocessor &pp, const double (&in)[N], const double (&expected)[N]) {
for (std::size_t i = 0; i < N; ++i) CHECK(pp.process_timestamp(in[i]) == Approx(expected[i]));
}

TEST_CASE("postprocessing", "[basic]") {
double time_offset = -50.0, srate = 1.;
bool was_reset = false;
lsl::time_postprocessor pp(
[&]() {
UNSCOPED_INFO("time_offset " << time_offset);
return time_offset;
},
[&]() { return srate; }, [&]() { return was_reset; });

double nopostproc[] = {2, 3.1, 3, 5, 5.9, 7.1};

{
INFO("proc_clocksync")
pp.set_options(proc_clocksync);
for (double t : nopostproc) CHECK(pp.process_timestamp(t) == Approx(t + time_offset));
}

{
INFO("proc_clocksync + clock_reset")
pp.set_options(proc_clocksync);
// std::this_thread::sleep_for(std::chrono::milliseconds(600));
was_reset = true;
time_offset = -200;
// for (double t : nopostproc) CHECK(pp.process_timestamp(t) == Approx(t + time_offset));
}

{
INFO("proc_monotonize")
double monotonized[] = {2, 3.1, 3.1, 5, 5.9, 7.1};
pp.set_options(proc_monotonize);
test_postproc_array(pp, nopostproc, monotonized);
}

{
INFO("reset with proc_none")
pp.set_options(proc_none);
test_postproc_array(pp, nopostproc, nopostproc);
}
}

TEST_CASE("rls_smoothing", "[basic]") {
const int n = 100000, warmup_samples = 1000;
const double t0 = 5000, latency = 0.05, srate = 100., halftime = 90;
lsl::postproc_dejitterer pp(t0, srate, halftime);

std::default_random_engine rng;
std::normal_distribution<double> jitter(latency, .005);

pp.dejitter(t0);
int n_outlier = 0;

for (int i = 0; i < n; ++i) {
const double t = t0 + i / srate, e = jitter(rng), dejittered = pp.dejitter(t + e),
est_error = dejittered - t - latency;
if (i > warmup_samples && fabs(est_error) > std::max(e, .001)) n_outlier++;
}
LOG_F(INFO, "\nP:\t%f\t%f\n\t%f\t%f\nw:\t%f\t%f", pp.P00_, pp.P01_, pp.P01_, pp.P11_, pp.w0_,
pp.w1_);
CHECK(n_outlier < n / 100);

CHECK(pp.dejitter(t0 + latency + n / srate) == Approx(t0 + latency + n / srate));
CHECK(fabs(pp.w0_ - latency) < .1);
CHECK(fabs(pp.w1_ - 1 / srate) < 1e-6);
}
33 changes: 33 additions & 0 deletions testing/test_ext_DataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <catch2/catch.hpp>
#include <cstdint>
#include <lsl_cpp.h>
#include <thread>

TEMPLATE_TEST_CASE(
"datatransfer", "[datatransfer][basic]", char, int16_t, int32_t, int64_t, float, double) {
Expand Down Expand Up @@ -69,3 +70,35 @@ TEST_CASE("TypeConversion", "[datatransfer][types][basic]") {
CHECK(result == val);
}
}

TEST_CASE("Flush", "[datatransfer][basic]") {
Streampair sp{create_streampair(
lsl::stream_info("FlushTest", "flush", 1, 1, lsl::cf_double64, "FlushTest"))};
sp.in_.set_postprocessing(lsl::post_dejitter);

const int n=20;

std::thread pusher([&](){
double data = lsl::local_clock();
for(int i=0; i<n; ++i) {
sp.out_.push_sample(&data, data, true);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
data+=.1;
}
});

double data_in, ts_in;
ts_in = sp.in_.pull_sample(&data_in, 1.);
REQUIRE(ts_in == Approx(data_in));
std::this_thread::sleep_for(std::chrono::milliseconds(700));
int pulled = sp.in_.flush() + 1;

for(; pulled < n; ++pulled) {
INFO(pulled);
ts_in = sp.in_.pull_sample(&data_in, 1.);
REQUIRE(ts_in == Approx(data_in));
}

pusher.join();
//sp.in_.set_postprocessing(lsl::post_none);
}