From c88ce59e569ccc946dd22878f8c1c5264eb8d942 Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Mon, 22 Mar 2021 15:56:45 +0100 Subject: [PATCH 1/2] Allocate sentinel alongside regular sample storage, remove `factory::new_sample_unmanaged` --- src/sample.cpp | 25 +++++++------------------ src/sample.h | 4 ---- 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/src/sample.cpp b/src/sample.cpp index 1fe1298e1..e668627a6 100644 --- a/src/sample.cpp +++ b/src/sample.cpp @@ -15,7 +15,6 @@ void sample::operator delete(void *x) noexcept { if(x == nullptr) return; lsl::factory *factory = reinterpret_cast(x)->factory_; - if (factory == nullptr) return; // delete the underlying memory only if it wasn't allocated in the factory's storage area if (x < factory->storage_ || x >= factory->storage_ + factory->storage_size_) @@ -358,14 +357,16 @@ factory::factory(lsl_channel_format_t fmt, uint32_t num_chans, uint32_t num_rese : fmt_(fmt), num_chans_(num_chans), sample_size_( ensure_multiple(sizeof(sample) - sizeof(char) + format_sizes[fmt] * num_chans, 16)), - storage_size_(sample_size_ * std::max(1u, num_reserve)), storage_(new char[storage_size_]), - sentinel_(new_sample_unmanaged(fmt, num_chans, 0.0, false)), head_(sentinel_), - tail_(sentinel_) { + storage_size_(sample_size_ * std::max(1u, num_reserve)), + storage_(new char[storage_size_ + sample_size_]), // +1 sample for the sentinel + sentinel_(new(reinterpret_cast(storage_ + storage_size_)) sample(fmt, num_chans, this)), + head_(sentinel_), tail_(sentinel_) + { // pre-construct an array of samples in the storage area and chain into a freelist sample *s = nullptr; - for (char const *p = storage_, *e = p + storage_size_; p < e;) { + for (char *p = storage_, *e = p + storage_size_; p < e;) { #pragma warning(suppress : 4291) - s = new ((sample *)p) sample(fmt, num_chans, this); + s = new (reinterpret_cast(p)) sample(fmt, num_chans, this); s->next_ = (sample *)(p += sample_size_); } s->next_ = nullptr; @@ -383,17 +384,6 @@ sample_p factory::new_sample(double timestamp, bool pushthrough) { return sample_p(result); } -sample *factory::new_sample_unmanaged( - lsl_channel_format_t fmt, uint32_t num_chans, double timestamp, bool pushthrough) { -#pragma warning(suppress : 4291) - sample *result = new (new char[ensure_multiple( - sizeof(sample) - sizeof(char) + format_sizes[fmt] * num_chans, 16)]) - sample(fmt, num_chans, nullptr); - result->timestamp = timestamp; - result->pushthrough = pushthrough; - return result; -} - sample *factory::pop_freelist() { sample *tail = tail_, *next = tail->next_; if (tail == sentinel_) { @@ -420,7 +410,6 @@ sample *factory::pop_freelist() { factory::~factory() { if (sample *cur = head_) for (sample *next = cur->next_; next; cur = next, next = next->next_) delete cur; - delete sentinel_; delete[] storage_; } diff --git a/src/sample.h b/src/sample.h index 268c95cb5..4732ef8e6 100644 --- a/src/sample.h +++ b/src/sample.h @@ -71,10 +71,6 @@ class factory { /// Reclaim a sample that's no longer used. void reclaim_sample(sample *s); - /// Create a new sample whose memory is not managed by the factory. - static sample *new_sample_unmanaged( - lsl_channel_format_t fmt, uint32_t num_chans, double timestamp, bool pushthrough); - private: /// ensure that a given value is a multiple of some base, round up if necessary static uint32_t ensure_multiple(uint32_t v, unsigned base) { From 481f6b1552d3ee95eed664230b7749ef015a8982 Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Tue, 23 Mar 2021 20:22:08 +0100 Subject: [PATCH 2/2] Adjust seen_samples for dejittering when flushing samples from an inlet, i.e. not pulling them (https://github.com/sccn/liblsl/issues/117) --- src/stream_inlet_impl.h | 6 +++++- src/time_postprocessor.cpp | 7 +++++++ src/time_postprocessor.h | 3 +++ testing/test_ext_DataType.cpp | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/stream_inlet_impl.h b/src/stream_inlet_impl.h index 7a85aab89..73fab9ce4 100644 --- a/src/stream_inlet_impl.h +++ b/src/stream_inlet_impl.h @@ -306,7 +306,11 @@ class stream_inlet_impl { std::size_t samples_available() { return data_receiver_.samples_available(); } /// Flush the queue, return the number of dropped samples - uint32_t flush() { return data_receiver_.flush(); } + uint32_t flush() { + int nskipped = data_receiver_.flush(); + postprocessor_.skip_samples(nskipped); + return nskipped; + } /** Query whether the clock was potentially reset since the last call to was_clock_reset(). * diff --git a/src/time_postprocessor.cpp b/src/time_postprocessor.cpp index 533dc37e6..ffde558b1 100644 --- a/src/time_postprocessor.cpp +++ b/src/time_postprocessor.cpp @@ -23,6 +23,13 @@ double time_postprocessor::process_timestamp(double value) { return process_internal(value); } +void time_postprocessor::skip_samples(uint32_t skipped_samples) +{ + if(options_ & proc_dejitter && smoothing_initialized_ && smoothing_applicable_) { + samples_seen_ += skipped_samples; + } +} + double time_postprocessor::process_internal(double value) { // --- clock synchronization --- if (options_ & proc_clocksync) { diff --git a/src/time_postprocessor.h b/src/time_postprocessor.h index 0c9094edb..0113caab1 100644 --- a/src/time_postprocessor.h +++ b/src/time_postprocessor.h @@ -38,6 +38,9 @@ class time_postprocessor { /// Override the half-time (forget factor) of the time-stamp smoothing. void smoothing_halftime(float value) { halftime_ = value; } + /// Inform the post processor some samples were skipped + void skip_samples(uint32_t skipped_samples); + private: /// Internal function to process a time stamp. double process_internal(double value); diff --git a/testing/test_ext_DataType.cpp b/testing/test_ext_DataType.cpp index 59f956056..b702e9caf 100644 --- a/testing/test_ext_DataType.cpp +++ b/testing/test_ext_DataType.cpp @@ -3,6 +3,7 @@ #include #include #include +#include TEMPLATE_TEST_CASE( "datatransfer", "[datatransfer][basic]", char, int16_t, int32_t, int64_t, float, double) { @@ -69,3 +70,35 @@ TEST_CASE("TypeConversion", "[datatransfer][types][basic]") { CHECK(result == val); } } + +TEST_CASE("Flush", "[datatransfer][basic]") { + Streampair sp{create_streampair( + lsl::stream_info("FlushTest", "flush", 1, 1, lsl::cf_double64, "FlushTest"))}; + sp.in_.set_postprocessing(lsl::post_dejitter); + + const int n=20; + + std::thread pusher([&](){ + double data = lsl::local_clock(); + for(int i=0; i