Skip to content

Commit

Permalink
Add a node to handle drifting data
Browse files Browse the repository at this point in the history
  • Loading branch information
mesca committed Dec 6, 2022
1 parent 0a1012e commit 141a1e0
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
22 changes: 22 additions & 0 deletions examples/dejitter_space.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
graphs:

- id: Spacer
nodes:
- id: random
module: timeflux.nodes.random
class: Random
params:
rows_min: 4
rows_max: 5
- id: space
module: timeflux.nodes.dejitter
class: Space
- id: display
module: timeflux.nodes.debug
class: Display
edges:
- source: random
target: space
- source: space
target: display
rate: 1
35 changes: 32 additions & 3 deletions timeflux/nodes/dejitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
import pandas as pd
from time import time
from timeflux.core.node import Node
from timeflux.core.exceptions import WorkerInterrupt

Expand All @@ -14,7 +15,7 @@ class Reindex(Node):
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataArray and meta.
o (Port): Default output, provides DataFrame and meta.
Args:
rate (float|None): Nominal sampling rate. If `None`, the value will be read from the meta data.
Expand Down Expand Up @@ -63,7 +64,7 @@ class Snap(Node):
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataArray and meta.
o (Port): Default output, provides DataFrame and meta.
Args:
rate (float|None): (optional) nominal sampling frequency of the data, to round
Expand Down Expand Up @@ -102,7 +103,7 @@ class Interpolate(Node):
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataArray and meta.
o (Port): Default output, provides DataFrame and meta.
Args:
rate (float|None): (optional) nominal sampling frequency of the data. If None, the rate will be obtained from the meta of the input port.
Expand Down Expand Up @@ -203,3 +204,31 @@ def _interpolate(self):
if not self.o.data.empty:
self._last_datetime = self.o.data.index[-1]
self._buffer = self._buffer.tail(self._n_max)


class Space(Node):
"""Evenly space timestamps.
This is useful to correct drifting data streams.
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataFrame and meta.
Example:
.. literalinclude:: /../examples/dejitter_space.yaml
:language: yaml
"""

def __init__(self):
self._stop = int(time() * 1e6)

def update(self):
if self.i.ready():
self.o = self.i
start = self._stop
self._stop = int(time() * 1e6)
indices = np.linspace(
start, self._stop, len(self.o.data), False, dtype="datetime64[us]"
)
self.o.data.index = indices

0 comments on commit 141a1e0

Please sign in to comment.