-
Notifications
You must be signed in to change notification settings - Fork 7
/
StreamSnooper.cpp
154 lines (133 loc) · 4.65 KB
/
StreamSnooper.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright (c) 2014-2014 Josh Blum
// SPDX-License-Identifier: BSL-1.0
#include <Pothos/Framework.hpp>
#include <algorithm>
#include <chrono>
#include <complex>
#include <cstddef>
#include <iostream>
#include <vector>
/***********************************************************************
* |PothosDoc Stream Snooper
*
* The stream snooper accepts streaming input and forwards
* chunks of the stream buffer under configurable conditions.
* Conditions include periodic updates and trigger events.
*
* |category /Utility
*
* |param numPorts[Num Ports] The number of IO ports.
* |default 1
* |widget SpinBox(minimum=1)
* |preview disable
*
* |param chunkSize How many elements to yield when triggered?
* |default 1024
*
* |param triggerRate The rate of the time-based trigger.
* |units events/sec
* |default 1.0
*
* |factory /blocks/stream_snooper()
* |initializer setNumPorts(numPorts)
* |setter setChunkSize(chunkSize)
* |setter setTriggerRate(triggerRate)
**********************************************************************/
class StreamSnooper : public Pothos::Block
{
public:
static Block *make(void)
{
return new StreamSnooper();
}
StreamSnooper(void):
_chunkSize(0),
_triggerRate(1.0)
{
this->setupInput(0);
this->setupOutput(0);
this->registerCall(this, POTHOS_FCN_TUPLE(StreamSnooper, setNumPorts));
this->registerCall(this, POTHOS_FCN_TUPLE(StreamSnooper, setChunkSize));
this->registerCall(this, POTHOS_FCN_TUPLE(StreamSnooper, getChunkSize));
this->registerCall(this, POTHOS_FCN_TUPLE(StreamSnooper, setTriggerRate));
this->registerCall(this, POTHOS_FCN_TUPLE(StreamSnooper, getTriggerRate));
}
void setNumPorts(const size_t numPorts)
{
for (size_t i = this->inputs().size(); i < numPorts; i++) this->setupInput(i);
for (size_t i = this->outputs().size(); i < numPorts; i++) this->setupOutput(i);
}
void setChunkSize(const size_t numElems)
{
_chunkSize = numElems;
}
size_t getChunkSize(void) const
{
return _chunkSize;
}
void setTriggerRate(const double rate)
{
_triggerRate = rate;
}
double getTriggerRate(void) const
{
return _triggerRate;
}
void activate(void)
{
_lastTriggerTimes.resize(this->inputs().size());
_accumulationBuffs.resize(this->inputs().size());
}
void work(void)
{
for (auto inPort : this->inputs())
{
//always consume all available input
if (inPort->elements() == 0) continue;
inPort->consume(inPort->elements());
//forward all labels in case they have meaning
for (const auto &label : inPort->labels())
{
this->output(inPort->index())->postMessage(label);
}
//are we triggered by the periodic event?
auto &lastTriggerTime = _lastTriggerTimes[inPort->index()];
const auto timeBetweenUpdates = std::chrono::nanoseconds((long long)(1e9/_triggerRate));
bool doUpdate = (std::chrono::high_resolution_clock::now() - lastTriggerTime) > timeBetweenUpdates;
//perform the accumulation buffer update
if (doUpdate and this->handleTrigger(inPort))
{
lastTriggerTime = std::chrono::high_resolution_clock::now();
}
}
}
bool handleTrigger(Pothos::InputPort *inPort)
{
auto &packet = _accumulationBuffs[inPort->index()];
const auto initialOffset = packet.payload.elements();
//append the new buffer
auto inBuff = inPort->buffer();
const auto bytesPerEvent = _chunkSize*inBuff.dtype.size();
inBuff.length = std::min(inBuff.length, bytesPerEvent - packet.payload.length);
packet.payload.append(inBuff);
//append new labels
for (auto label : inPort->labels())
{
label.index /= packet.payload.dtype.size(); //bytes to elements
label.index += initialOffset;
if (label.index >= packet.payload.elements()) break;
packet.labels.push_back(label);
}
//enough in the buffer?
if (packet.payload.elements() < _chunkSize) return false;
//send the message
this->output(inPort->index())->postMessage(packet);
//clear old packet buffer
packet = Pothos::Packet();
return true;
}
private:
size_t _chunkSize;
double _triggerRate;
std::vector<std::chrono::high_resolution_clock::time_point> _lastTriggerTimes;
std::vector<Pothos::Packet> _accumulationBuffs;
};
// File-local registry entry: associates the path "/blocks/stream_snooper"
// with the StreamSnooper::make factory.
static Pothos::BlockRegistry registerStreamSnooper(
    "/blocks/stream_snooper",
    &StreamSnooper::make
);