Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions src/sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ void sample::operator delete(void *x) noexcept {
if(x == nullptr) return;

lsl::factory *factory = reinterpret_cast<sample *>(x)->factory_;
if (factory == nullptr) return;

// delete the underlying memory only if it wasn't allocated in the factory's storage area
if (x < factory->storage_ || x >= factory->storage_ + factory->storage_size_)
Expand Down Expand Up @@ -358,14 +357,16 @@ factory::factory(lsl_channel_format_t fmt, uint32_t num_chans, uint32_t num_rese
: fmt_(fmt), num_chans_(num_chans),
sample_size_(
ensure_multiple(sizeof(sample) - sizeof(char) + format_sizes[fmt] * num_chans, 16)),
storage_size_(sample_size_ * std::max(1u, num_reserve)), storage_(new char[storage_size_]),
sentinel_(new_sample_unmanaged(fmt, num_chans, 0.0, false)), head_(sentinel_),
tail_(sentinel_) {
storage_size_(sample_size_ * std::max(1u, num_reserve)),
storage_(new char[storage_size_ + sample_size_]), // +1 sample for the sentinel
sentinel_(new(reinterpret_cast<sample*>(storage_ + storage_size_)) sample(fmt, num_chans, this)),
head_(sentinel_), tail_(sentinel_)
{
// pre-construct an array of samples in the storage area and chain into a freelist
sample *s = nullptr;
for (char const *p = storage_, *e = p + storage_size_; p < e;) {
for (char *p = storage_, *e = p + storage_size_; p < e;) {
#pragma warning(suppress : 4291)
s = new ((sample *)p) sample(fmt, num_chans, this);
s = new (reinterpret_cast<sample *>(p)) sample(fmt, num_chans, this);
s->next_ = (sample *)(p += sample_size_);
}
s->next_ = nullptr;
Expand All @@ -383,17 +384,6 @@ sample_p factory::new_sample(double timestamp, bool pushthrough) {
return sample_p(result);
}

sample *factory::new_sample_unmanaged(
lsl_channel_format_t fmt, uint32_t num_chans, double timestamp, bool pushthrough) {
#pragma warning(suppress : 4291)
sample *result = new (new char[ensure_multiple(
sizeof(sample) - sizeof(char) + format_sizes[fmt] * num_chans, 16)])
sample(fmt, num_chans, nullptr);
result->timestamp = timestamp;
result->pushthrough = pushthrough;
return result;
}

sample *factory::pop_freelist() {
sample *tail = tail_, *next = tail->next_;
if (tail == sentinel_) {
Expand All @@ -420,7 +410,6 @@ sample *factory::pop_freelist() {
factory::~factory() {
if (sample *cur = head_)
for (sample *next = cur->next_; next; cur = next, next = next->next_) delete cur;
delete sentinel_;
delete[] storage_;
}

Expand Down
4 changes: 0 additions & 4 deletions src/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ class factory {
/// Reclaim a sample that's no longer used.
void reclaim_sample(sample *s);

/// Create a new sample whose memory is not managed by the factory.
static sample *new_sample_unmanaged(
lsl_channel_format_t fmt, uint32_t num_chans, double timestamp, bool pushthrough);

private:
/// ensure that a given value is a multiple of some base, round up if necessary
static uint32_t ensure_multiple(uint32_t v, unsigned base) {
Expand Down
6 changes: 5 additions & 1 deletion src/stream_inlet_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,11 @@ class stream_inlet_impl {
std::size_t samples_available() { return data_receiver_.samples_available(); }

/// Flush the queue, return the number of dropped samples
uint32_t flush() { return data_receiver_.flush(); }
uint32_t flush() {
int nskipped = data_receiver_.flush();
postprocessor_.skip_samples(nskipped);
return nskipped;
}

/** Query whether the clock was potentially reset since the last call to was_clock_reset().
*
Expand Down
7 changes: 7 additions & 0 deletions src/time_postprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ double time_postprocessor::process_timestamp(double value) {
return process_internal(value);
}

void time_postprocessor::skip_samples(uint32_t skipped_samples)
{
if(options_ & proc_dejitter && smoothing_initialized_ && smoothing_applicable_) {
samples_seen_ += skipped_samples;
}
}

double time_postprocessor::process_internal(double value) {
// --- clock synchronization ---
if (options_ & proc_clocksync) {
Expand Down
3 changes: 3 additions & 0 deletions src/time_postprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class time_postprocessor {
/// Override the half-time (forget factor) of the time-stamp smoothing.
void smoothing_halftime(float value) { halftime_ = value; }

/// Inform the post processor some samples were skipped
void skip_samples(uint32_t skipped_samples);

private:
/// Internal function to process a time stamp.
double process_internal(double value);
Expand Down
33 changes: 33 additions & 0 deletions testing/test_ext_DataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <catch2/catch.hpp>
#include <cstdint>
#include <lsl_cpp.h>
#include <thread>

TEMPLATE_TEST_CASE(
"datatransfer", "[datatransfer][basic]", char, int16_t, int32_t, int64_t, float, double) {
Expand Down Expand Up @@ -69,3 +70,35 @@ TEST_CASE("TypeConversion", "[datatransfer][types][basic]") {
CHECK(result == val);
}
}

TEST_CASE("Flush", "[datatransfer][basic]") {
Streampair sp{create_streampair(
lsl::stream_info("FlushTest", "flush", 1, 1, lsl::cf_double64, "FlushTest"))};
sp.in_.set_postprocessing(lsl::post_dejitter);

const int n=20;

std::thread pusher([&](){
double data = lsl::local_clock();
for(int i=0; i<n; ++i) {
sp.out_.push_sample(&data, data, true);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
data+=.1;
}
});

double data_in, ts_in;
ts_in = sp.in_.pull_sample(&data_in, 1.);
REQUIRE(ts_in == Approx(data_in));
std::this_thread::sleep_for(std::chrono::milliseconds(700));
int pulled = sp.in_.flush() + 1;

for(; pulled < n; ++pulled) {
INFO(pulled);
ts_in = sp.in_.pull_sample(&data_in, 1.);
REQUIRE(ts_in == Approx(data_in));
}

pusher.join();
//sp.in_.set_postprocessing(lsl::post_none);
}