# Serial Connection Development

In [258]:
import serial
import time
from datetime import datetime
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
from matplotlib.animation import FuncAnimation
from pynput import keyboard
from scipy.optimize import curve_fit
from typing import Callable
import multiprocessing
from ctypes import c_char

from src.pipeline.measurement_processes import start_measurement_processes

In [257]:
# Launch the measurement pipeline through the helper imported above from
# src.pipeline.measurement_processes.
# NOTE(review): presumably the packaged counterpart of the sampling/plotting
# processes prototyped further down in this notebook — confirm.
start_measurement_processes()

Figure(1000x1000)




Terminating processes...
Cleanup completed


Process SamplingProcess:


In [250]:
# Select matplotlib's TkAgg backend for all figures created from here on.
# NOTE(review): presumably chosen so the live animation opens in its own window — confirm.
matplotlib.use('TkAgg')

## Animation

In [251]:
### READOUT:
def read_sensor(baud_rate: int = 115200,
                serial_port: str = '/dev/tty.usbmodem143309601',
                record_bool: bool = True,
                command: str | None = None,
                allowed_input_range: tuple[float] = (.0, 3.3),
                processing_func: Callable[[float], float] | None = None,
                smoothing_ema_alpha: float = 0.4,  # 1 = no smoothing, -> 0 more smoothing
                ) -> float | None:
    """ To be commented. """
    global _last_valid_reading, measurements, timestamps
    try:
        with serial.Serial(serial_port, baud_rate, timeout=1) as ser:
            # check for output command:
            if command in ("A", "B"):
                ser.write(command.encode("ascii"))
                ser.flush()  # waits for all outgoing data to be transmitted

            # read new line and convert to float:
            line = ser.readline().decode('ascii', errors="ignore").strip()
            if not line.startswith("VAL:"):  # check whether line contains measurement result
                return _last_valid_reading
            raw_str = line.replace("VAL:", "")  # formatting
            value = float(raw_str)

            # check whether input remains in feasible range:
            if not allowed_input_range[0] < value < allowed_input_range[1]: return _last_valid_reading

            if processing_func is not None: value = processing_func(value)

            # Apply EMA smoothing:
            value = smoothing_ema_alpha * value + (1 - smoothing_ema_alpha) * _last_valid_reading
            _last_valid_reading = value

            # save (if record_bool) and return:
            if record_bool:
                timestamps.append(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                measurements.append(value)
            return value

    except (ValueError, serial.SerialException) as e:
        print(f"Serial error: {e}")
        return _last_valid_reading


def force_estimator(voltage: float,
                    fsr_a: float = 5.0869,
                    fsr_b: float = 1.8544) -> float:
    """
    Estimate the dynamometer force for a measured voltage with a power law.

    Evaluates ``fsr_a * voltage ** fsr_b``.

    :param voltage: input voltage to convert.
    :param fsr_a: multiplicative coefficient of the power law.
    :param fsr_b: exponent of the power law.
    :return: the estimated force.
    """
    voltage_power = pow(voltage, fsr_b)
    return fsr_a * voltage_power

In [252]:
def sampling_process(shared_memory_float,  # shared memory for display process
                     sampling_rate_hz: int = 1000,
                     **sensor_kwargs,
                     ):
    """
    Endless sampling loop: read the sensor and publish each sample.

    Resets the module-level state used by ``read_sensor`` (``measurements``,
    ``timestamps``, ``_last_valid_reading``) and then loops forever, writing every
    sample to ``shared_memory_float.value``. The function never returns on its own;
    the caller has to terminate it.

    :param shared_memory_float: object whose ``value`` attribute receives each sample.
    :param sampling_rate_hz: pause between reads is ``1 / sampling_rate_hz`` seconds.
        The time spent inside ``read_sensor`` comes on top, so the effective rate
        is lower than this value.
    :param sensor_kwargs: forwarded unchanged to ``read_sensor``.
    :raises ValueError: if ``sampling_rate_hz`` is not positive.
    """
    # values for read_sensor function:
    global measurements, timestamps, _last_valid_reading
    if sampling_rate_hz <= 0:
        raise ValueError(f"sampling_rate_hz must be positive, got {sampling_rate_hz}.")

    measurements = []
    timestamps = []
    _last_valid_reading = 0.0

    pause_s = 1 / sampling_rate_hz  # loop-invariant, computed once
    while True:
        # method retrieves and saves sample:
        sample = read_sensor(**sensor_kwargs)
        # store in shared memory; read_sensor is annotated as possibly returning None,
        # which cannot be assigned to a numeric shared value, so keep the old value then:
        if sample is not None:
            shared_memory_float.value = sample
        # simulate sampling frequency:
        time.sleep(pause_s)

In [253]:
### PLOTTING:
def plot_live_view(shared_memory_float,  # shared memory from sampling process
                   display_window_len_s: int = 3,
                   display_refresh_rate_hz: int = 15,
                   y_limits: tuple[float, float] = (0, 4),
                   dynamically_update_y_limits: bool = True,
                   plot_size: tuple[int, int] = (10, 10),
                   y_label: str = 'Input [V]',
                   x_label: str = 'Time [s]',
                   title: str = 'Live Input View',
                   debug_timing: bool = False):
    """
    Show a scrolling live plot of the value published in ``shared_memory_float``.

    On every animation frame the current ``shared_memory_float.value`` is read and,
    unless the view is paused via the "Pause"/"Continue" button, appended to a
    rolling window that is redrawn. The call blocks in ``plt.show()``.

    All per-call state (data window, pause flag, current y-limits) is kept in the
    closure rather than in module globals, so repeated calls or other cells using
    names like ``y`` do not interfere with each other.

    :param shared_memory_float: object whose ``value`` attribute holds the latest sample.
    :param display_window_len_s: length of the displayed time window in seconds.
    :param display_refresh_rate_hz: frames per second; also the number of points per second.
    :param y_limits: initial (lower, upper) y-axis limits.
    :param dynamically_update_y_limits: widen the y-axis when a sample falls outside it.
    :param plot_size: figure size passed to ``plt.subplots``.
    :param y_label: y-axis label.
    :param x_label: x-axis label.
    :param title: axes title.
    :param debug_timing: if True, print per-frame timing diagnostics (off by default,
        these were development leftovers that flood the output).
    :raises ValueError: if the window length or refresh rate is not positive.
    """
    if display_window_len_s <= 0 or display_refresh_rate_hz <= 0:
        raise ValueError("display_window_len_s and display_refresh_rate_hz must be positive.")
    if display_refresh_rate_hz > 20: print(f"Fps are {display_refresh_rate_hz}, which is > 20 and potentially leads to rendering issues.")

    # Initial data (np.linspace needs an integer sample count, so round the product):
    n_points = max(1, int(round(display_window_len_s * display_refresh_rate_hz)))
    x = np.linspace(-display_window_len_s, 0, n_points)
    y = np.zeros_like(x)

    # state that update() / the button callback modify:
    current_y_limits = y_limits
    is_running = True
    last_frame_end = time.perf_counter()

    # initialise plot:
    fig, ax = plt.subplots(figsize=plot_size)
    line, = ax.plot([], [], lw=2)

    def pause_button_click(event):
        """ Toggle between running and paused and relabel the button accordingly. """
        nonlocal is_running
        is_running = not is_running
        button.label.set_text("Pause" if is_running else "Continue")

    ax_button = plt.axes([0.8, .9, 0.1, 0.075])
    button = Button(ax_button, 'Pause')
    button.on_clicked(pause_button_click)

    # format plot:
    ax.set_xlim(x.min(), x.max())
    ax.set_ylim(*y_limits)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.set_title(title)

    def report_timing(label, elapsed_s):
        """ Print one timing diagnostic; a zero-length span is reported as infinite rate. """
        elapsed_ms = elapsed_s * 1000
        max_hz = 1000 / elapsed_ms if elapsed_ms > 0 else float('inf')
        print(f'Took {elapsed_ms}ms {label}. Max Hz: {max_hz}')

    def init():
        line.set_data(x, y)
        return line,

    def update(frame):
        """ update view and fetch new observation. (frame is required although unused) """
        nonlocal y, current_y_limits, last_frame_end
        if debug_timing:
            report_timing("in between", time.perf_counter() - last_frame_end)

        # always retrieve new observation, even while paused:
        fetch_start = time.perf_counter()
        new_obs = shared_memory_float.value
        if debug_timing:
            report_timing("to parse", time.perf_counter() - fetch_start)

        draw_start = time.perf_counter()
        # update only if is_running:
        if is_running:
            if debug_timing:
                print('update')
            if dynamically_update_y_limits:  # update y limit if it doesn't fit
                if new_obs > current_y_limits[1]:
                    current_y_limits = (current_y_limits[0], new_obs)
                    ax.set_ylim(*current_y_limits)
                    fig.canvas.draw_idle()
                elif new_obs < current_y_limits[0]:
                    current_y_limits = (new_obs, current_y_limits[1])
                    ax.set_ylim(*current_y_limits)
                    fig.canvas.draw_idle()

            # Shift data and append
            y = np.roll(y, -1)
            y[-1] = new_obs

            # update plot:
            line.set_ydata(y)

        if debug_timing:
            report_timing("to update the plot", time.perf_counter() - draw_start)

        last_frame_end = time.perf_counter()
        return line,

    # run and show animation (keep a reference to the animation while the window is open):
    ani = FuncAnimation(fig, update, frames=len(x)+1,
                        init_func=init, blit=False,
                        interval=int(1000/display_refresh_rate_hz), repeat=True)
    plt.show()

In [255]:
### MULTIPROCESSING IMPLEMENTATION:
if __name__ == "__main__":
    # NOTE: with the "spawn" start method the child processes re-import the target
    # functions; functions defined only inside a notebook cell may not be importable
    # there (see the spawn traceback in this cell's output). Defining them in a module
    # avoids that.

    # initialise shared memory for inter-process communication:
    shared_memory_float = multiprocessing.Value('d', 0.0)  # double precision float

    # define processes (daemonic, so they do not outlive the parent process):
    p1 = multiprocessing.Process(
        target=sampling_process,
        args=(
            shared_memory_float,
        ), name="SamplingProcess", daemon=True)
    p2 = multiprocessing.Process(
        target=plot_live_view,
        args=(
            shared_memory_float,
        ), name="DisplayProcess", daemon=True)

    # start processes:
    try:
        p1.start()
        p2.start()

        # The sampling process loops forever, so only the display process can finish
        # on its own (when the plot window is closed). Wait for that one; the sampler
        # is stopped in the cleanup below.
        p2.join()

    except KeyboardInterrupt:
        print("Terminating processes...")

    finally:
        # Stop whatever is still running, whether we got here by closing the window,
        # by an interrupt, or by a failed start.
        for proc in (p2, p1):
            if proc.is_alive():
                proc.terminate()
            if proc.pid is not None:  # join() is only valid for a started process
                proc.join()
        print("Cleanup completed")

Cleanup completed


Traceback (most recent call last):
Traceback (most recent call last):
  File [35m"<string>"[0m, line [35m1[0m, in [35m<module>[0m
    from multiprocessing.spawn import spawn_main; [31mspawn_main[0m[1;31m(tracker_fd=99, pipe_handle=104)[0m
                                                  [31m~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"<string>"[0m, line [35m1[0m, in [35m<module>[0m
    from multiprocessing.spawn import spawn_main; [31mspawn_main[0m[1;31m(tracker_fd=99, pipe_handle=102)[0m
                                                  [31m~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/Users/paulrusing/miniconda3/envs/biosig_env/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m122[0m, in [35mspawn_main[0m
    exitcode = _main(fd, parent_sentinel)
  File [35m"/Users/paulrusing/miniconda3/envs/biosig_env/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m122[0m, in [35mspawn_main[0m
    exitcode

In [245]:
# FIXME(review): stale call. The `plot_live_view` definition in this notebook takes
# `shared_memory_float` as its first (required) argument and has no `parser_function`
# or `refresh_display_every_nth_sample` parameters, so on a fresh kernel this raises a
# TypeError. The recorded output below presumably comes from an earlier version of
# the function — update or remove this cell.
plot_live_view(parser_function=read_sensor, title='Live FSR Input View',
               display_refresh_rate_hz=100,
               refresh_display_every_nth_sample=1,)

Took 139.154ms in between. Max Hz: 7.1862828233467955
Took 6.861ms to parse. Max Hz: 145.75134819997086
update
Took 0.41ms to update the plot. Max Hz: 2439.0243902439024
Took 69.653ms in between. Max Hz: 14.356883407749844
Took 3.209ms to parse. Max Hz: 311.6235587410408
update
Took 0.092ms to update the plot. Max Hz: 10869.565217391304
Took 16.868ms in between. Max Hz: 59.283851078966094
Took 2.875ms to parse. Max Hz: 347.82608695652175
update
Took 0.062ms to update the plot. Max Hz: 16129.032258064517
Took 55.946ms in between. Max Hz: 17.87437886533443
Took 2.393ms to parse. Max Hz: 417.8854993731718
update
Took 0.059ms to update the plot. Max Hz: 16949.15254237288
Took 14.756ms in between. Max Hz: 67.76904310111141
Took 2.552ms to parse. Max Hz: 391.84952978056424
update
Took 0.058ms to update the plot. Max Hz: 17241.379310344826
Took 59.039ms in between. Max Hz: 16.93795626619692
Took 2.492ms to parse. Max Hz: 401.2841091492777
update
Took 0.077ms to update the plot. Max Hz: 12987.

KeyboardInterrupt: 

In [None]:
# Scratch version of the live view as a flat script: polls `parser_function` once per
# frame and scrolls the result through a fixed-length window.
# NOTE(review): `new_obs_retrieval` is not defined anywhere in the visible notebook —
# confirm it exists (or substitute the intended reader) before running this cell.
parser_function = new_obs_retrieval
display_window_len_s: int = 3
sampling_rate_hz: int = 10
y_limits: tuple[float, float] = (0, 5)
dynamically_update_y_limits: bool = True
plot_size: tuple[int, int] = (10, 10)
y_label: str = 'Input [V]'
x_label: str = 'Time [s]'
title: str = 'Live Input View'

# y-axis range currently shown; update() widens it when a sample falls outside.
# It has to exist before the first frame, otherwise update() fails with a NameError.
input_range = y_limits

# Initial data
x = np.linspace(-display_window_len_s, 0, display_window_len_s*sampling_rate_hz)
y = np.zeros_like(x)

# initialise plot:
fig, ax = plt.subplots(figsize=plot_size)
line, = ax.plot([], [], lw=2)

# format plot:
ax.set_xlim(x.min(), x.max())
ax.set_ylim(*y_limits)
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
ax.set_title(title)

def init():
    """ Draw the initial (all-zero) window. """
    line.set_data(x, y)
    return line,

def update(frame):
    """ update view and fetch new observation. (frame is required although unused) """
    global y, input_range

    # Retrieve new observation
    new_obs = parser_function()

    if dynamically_update_y_limits:  # update y limit if it doesn't fit
        if new_obs > input_range[1]:
            input_range = (input_range[0], new_obs)
            ax.set_ylim(*input_range)
            fig.canvas.draw_idle()  # explicit redraw request after changing the axis limits
        elif new_obs < input_range[0]:
            input_range = (new_obs, input_range[1])
            ax.set_ylim(*input_range)
            fig.canvas.draw_idle()


    # Shift data and append
    y = np.roll(y, -1)
    y[-1] = new_obs

    # update plot:
    line.set_ydata(y)
    return line,

# run and show animation:
ani = FuncAnimation(fig, update, frames=len(x)+1, init_func=init, blit=True, interval=int(1000/sampling_rate_hz), repeat=True)
plt.show()