In [23]:
import numpy as np
import time
from collections import deque
from random import randint

import numpy as np
from bokeh.io import push_notebook, show, output_notebook
from bokeh.models import ColumnDataSource
from bokeh.models import HoverTool
from bokeh.models import Legend, LegendItem
from bokeh.models import Range1d
from bokeh.plotting import figure
output_notebook()

# Serial data readout

Stateful generator for parsing the serial feed from the sensor, rebooting it at start up.

In [2]:
import enum
from typing import Any, Iterator


def parse_measurement_line(line: str) -> list[float]:
    return list((float(e) for e in line.split(',')))


# TODO: replace Any with a proper type hint
def console_parser(serial_feed: Iterator[str]) -> Iterator[Any]:
    class State(enum.Enum):
        BOOTING = enum.auto()
        WAITING_FOR_FIRST = enum.auto()
        SAMPLING = enum.auto()

    state = State.BOOTING
    
    for msg in serial_feed:
        next_state = state
        
        if state == State.BOOTING:
            if 'WHO_AM_I: 0x' in msg:
                next_state = State.WAITING_FOR_FIRST
                
        elif state == State.WAITING_FOR_FIRST:
            try:
                yield parse_measurement_line(msg)
                next_state = State.SAMPLING
            except Exception:
                pass

        elif state == State.SAMPLING:
            try:
                yield parse_measurement_line(msg)
            except Exception:
                next_state = State.BOOTING

        if state != next_state:
            print(f">>>Change state to {next_state.name}")
        state = next_state

Generator that wraps the serial port as a stream of lines

In [3]:
import time
import serial


def lines_from_serial(port="/dev/ttyACM0",
                      baudrate=115200) -> Iterator[str]:
    with serial.Serial(port=port,
                       baudrate=baudrate,
                       dsrdtr=False) as sp:
        # Pulse DTR for resetting
        sp.setDTR(True)
        time.sleep(0.5)
        sp.setDTR(False)

        while True:
            read_in: bytes = sp.readline()
            line = read_in.decode().rstrip()
            yield line

Chain both generators:

In [4]:
lines = lines_from_serial()
for i, meas in enumerate(console_parser(lines)):
    print(time.time(), meas)
    if i == 10:
        break

>>>Change state to WAITING_FOR_FIRST
1719593083.8935378 [-0.018, -0.013, 1.142]
>>>Change state to SAMPLING
1719593083.9029582 [-0.019, -0.013, 1.144]
1719593083.912382 [-0.018, -0.014, 1.144]
1719593083.9230375 [-0.023, -0.012, 1.148]
1719593083.9324403 [-0.016, -0.011, 1.142]
1719593083.9424493 [-0.018, -0.015, 1.142]
1719593083.952379 [-0.019, -0.011, 1.152]
1719593083.962368 [-0.017, -0.01, 1.144]
1719593083.9729598 [-0.018, -0.012, 1.138]
1719593083.98247 [-0.018, -0.011, 1.141]
1719593083.9928465 [-0.017, -0.014, 1.15]


## Receive-side sample pre-processsing

Various pre-processing functions, including the trivial identity one

In [5]:
def identity(meas: list[float]) -> list[float]:
    return meas

In [42]:
import quaternion

def unrotated_accel(meas: list[float]) -> list[float]:
    """
    This was useful to diagnose that imu-fusion's earth_accel()
    method was not working as we expected. This rotation has been
    added to the processing loop within the MCU.
    """
    qw, qx, qy, qz, ax, ay, az = meas
    q = quaternion.quaternion(qw, qx, qy, qz)
    qn = q / q.norm()    
    rot = qn * quaternion.quaternion(0, ax, ay, az) * qn.inverse()
    return [rot.x, rot.y, rot.z]

In [20]:
# The offsets are calculated on the "Some tests" section below
def deoffset(meas: list[float]) -> list[float]:
    return [meas[0] - off_ax, meas[1] - off_ay, meas[2] - off_az]

In [122]:
import enum
from typing import Optional, Callable


class MotionState(enum.Enum):
    AT_REST = enum.auto()
    MOVING = enum.auto()


class MotionObserver:
    
    def __init__(self, mag_reset_thd: float, window: int, state_cb: Optional[Callable]=None):
        self.mag_reset_threshold = mag_reset_thd
        self.state_cb = state_cb
        
        self.buffer = deque(maxlen=window)
        self.state = MotionState.AT_REST

    def __call__(self, new: list[float]) -> float:
        vec = np.asarray(new)
        self.buffer.append(np.linalg.norm(vec))
        figure = np.percentile(np.asarray(self.buffer), 75)
        self.magnitude_reset_criterion(figure)
        return figure

    def magnitude_reset_criterion(self, magnitude: float) -> None:
        next_state = self.state
        if self.state == MotionState.AT_REST and magnitude > self.mag_reset_threshold:
            next_state = MotionState.MOVING
        elif self.state == MotionState.MOVING and magnitude < self.mag_reset_threshold:
            next_state = MotionState.AT_REST

        if next_state != self.state and self.state_cb:
            self.state_cb(next_state.name.replace("_", " "))
        self.state = next_state

In [120]:
class OffsetState(enum.Enum):
    AT_REST_ACCUMULATING = enum.auto()
    AT_REST_IDLE = enum.auto()
    MOVING = enum.auto()


class DynamicOffsetCompensator:
    
    def __init__(self, mag_reset_thd: float, window: int, state_cb: Optional[Callable]):
        self.mag_reset_threshold = mag_reset_thd
        self.state_cb = state_cb

        self.offset = np.r_[0, 0, 0]
        self.buffer = deque(maxlen=window)
        self.state = OffsetState.AT_REST_ACCUMULATING

    def __call__(self, new: list[float]) -> list[float]:
        self.buffer.append(new)
        vecs = np.asarray(self.buffer)
        magnitudes = np.linalg.norm(vecs, axis=1)
        figure = np.percentile(magnitudes, 75)
        self.magnitude_reset_criterion(figure, vecs)
        
        return list(new[i] - self.offset[i] for i in range(len(new)))

    def magnitude_reset_criterion(self, magnitude: float, history: np.array) -> None:
        next_state = self.state
        std = history.std(axis=0).max()
        
        if self.state == OffsetState.AT_REST_ACCUMULATING:
            if magnitude > self.mag_reset_threshold:
                next_state = OffsetState.MOVING
            elif std < 0.2*self.mag_reset_threshold:
                next_state = OffsetState.AT_REST_IDLE
                self.offset = history.mean(axis=0)
                self.state_cb(next_state.name.replace("_", " ") + "!")
                
        elif self.state == OffsetState.AT_REST_IDLE:
            if magnitude > self.mag_reset_threshold:
                next_state = OffsetState.MOVING
                
        elif self.state == OffsetState.MOVING:
            if magnitude < self.mag_reset_threshold:
                next_state = OffsetState.AT_REST_ACCUMULATING

        if next_state != self.state and self.state_cb:
            self.state_cb(next_state.name.replace("_", " "))

        self.state = next_state

# Live plotting from serial

In [117]:
# Data sampling period [seconds]
sample_period = 10e-3

# Data capture buffer length [amount of samples]
depth = 250

# Live display rate [frames per second]
plot_upd_fps = 30

# data columns constructor
def new_data():
    return dict(
        t=[],
        x=[],
        y=[],
        z=[],
        mag=[],
        dt=[],
    )

In [118]:
source = ColumnDataSource(new_data())
p = figure()
p.line(source=source, x='t', y='x', legend_label="x", line_color="green")
p.line(source=source, x='t', y='y', legend_label="y", line_color="blue")
p.line(source=source, x='t', y='z', legend_label="z", line_color="red")
p.line(source=source, x='t', y='mag', legend_label="mag", line_color="black")
p.title.text = "Accelerations"
p.y_range = Range1d(-6, 6)

# get and explicit handle to update the next show cell with
target = show(p, notebook_handle=True)

In [123]:
new_samples = new_data()
all_samples = []
plot_upd_thd = int(1/(sample_period*plot_upd_fps))
last = None

def set_title(state: str):
    p.title.text = state

#preprocessor = unrotated_accel
preprocessor = identity
#preprocessor = deoffset

#postprocessor = identity
postprocessor = DynamicOffsetCompensator(1.8, 30, set_title)

indicator = MotionObserver(1.8, 30)

# Flush source's data
source.stream(new_samples, 1)
try:
    for i, meas in enumerate(console_parser(lines_from_serial())):
        # Receive loop timing consistency observability
        now = time.time()
        dt = now - last if last is not None else sample_period
        last = now

        meas = preprocessor(meas)
        meas = postprocessor(meas)
        #mag = np.linalg.norm(meas)
        #meas.append(mag)
        meas.append(indicator(meas))
        meas.append(dt/sample_period)

        # Store data
        new_samples['t'].append(i*sample_period)
        new_samples['x'].append(meas[0])
        new_samples['y'].append(meas[1])
        new_samples['z'].append(meas[2])
        new_samples['mag'].append(meas[3])
        new_samples['dt'].append(meas[4])
        all_samples.append(meas)

        if i % plot_upd_thd == 0:
            source.stream(new_samples, depth)
            # push updates to the plot continuously using the handle
            # (interrupt the notebook kernel to stop)
            push_notebook(handle=target)

            # Refresh for next accumulation phase
            new_samples = new_data()
        
except KeyboardInterrupt:
    print("Stopped.")
    
finally:
    print(f"All samples are {len(all_samples)}")

>>>Change state to WAITING_FOR_FIRST
>>>Change state to SAMPLING
Stopped.
All samples are 8576


# Offline data analysis

In [90]:
all_array = np.array(all_samples)

t = np.r_[:all_array.shape[0]] * sample_period
t

p = figure()
p.line(t, all_array[:, 0], legend_label="x", line_color="green")
p.line(t, all_array[:, 1], legend_label="y", line_color="blue")
p.line(t, all_array[:, 2], legend_label="z", line_color="red")
p.line(t, all_array[:, 3], legend_label="mag", line_color="black")

show(p)

### Some tests

In [19]:
off_ax, off_ay, off_az = all_array.mean(axis=0)[:3]
(off_ax, off_ay, off_az)

(-0.19041689827682082, -0.1705236242356863, 1.410013340744847)