#### SAMD51 Serial

In [181]:
import serial, time, pandas as pd, numpy as np
from typing import Literal, Dict

class ArduinoReader:
    def __init__(self, port="COM11", baudrate: int = 115200, timeout: float = 1.0):
        """
        port: e.g. 'COM11' on Windows, '/dev/ttyACM0' on Linux
        baudrate: match Arduino Serial.begin()
        timeout: read timeout in seconds
        """
        self.ser = serial.Serial(port, baudrate=baudrate, timeout=timeout)
        # give Arduino time to reset when serial opens
        self.data = {'rpm': [], 'sound': [], 'lift': []}
        time.sleep(2)

    def read_line(self) -> str:
        """Read one line from Arduino, decode to string."""
        return list(map(float, self.ser.readline().decode(errors="ignore").strip().split()))

    def read_normal(self, num_blades=4) -> bool:
        """
        Takes two readings, and averages them.
        num_blades: default=4
        """
        appended = False
        line = self.read_line()
        line2 = self.read_line()
        if len(line) == 3 and len(line2) == 3:
            self.data['rpm'].append((line[0] + line2[0]) / 2 / num_blades)
            self.data['sound'].append((line[1] + line2[1]) / 2)
            self.data['lift'].append((line[2] + line2[2]) / 2)
            appended = True
        return appended
    
    @staticmethod
    def smooth_data(array: np.ndarray, window=30):
        
        df = pd.DataFrame(list(array))
        result = []
        n = df.shape[0]
        half = window // 2
        for i in range(n):
            # dynamically shrink window near edges
            start = max(0, i - half)
            end = min(n, i + half + 1)
            result.append(df.iloc[start:end, 0].mean())
        return np.array(result)
    
    def smooth_all_data(self, data: Dict[str, list], save_file=None, propeller_weight=1.0):
        df = pd.DataFrame(data)
        df = df[df["rpm"] != 0]
        df.rpm = self.smooth_data(df.rpm.to_numpy(), 5)
        df.sound = self.smooth_data(df.sound.to_numpy(), 30)
        df.lift = self.smooth_data(df.lift.to_numpy(), 10)
        df.power = self.smooth_data(df.power.to_numpy(), 3)
        df['lift_per_weight'] = df.lift / propeller_weight

        if save_file is not None:
            df.to_pickle(save_file)
        return df
    
    def request_raw_sound(self) -> list:
        """
        Send 's' to Arduino to trigger burst mode.
        Collects until no more burst data arrives.
        """
        self.ser.write(b's')
        self.ser.flush()

        burst_data = []
        last_time = time.time()

        while True:
            line = self.read_line()
            if line:
                burst_data.append(line)
                last_time = time.time()
            else:
                # stop when nothing new for 0.2s
                if time.time() - last_time > 0.2:
                    break
        return burst_data[-16000:]

    def tare_scale(self):
        """Send a space character to Arduino (changes behaviour)."""
        self.ser.write(b' ')
        self.ser.flush()

    def close(self):
        """Close serial connection."""
        self.ser.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


#### Rigol Serial

In [182]:
from time import perf_counter_ns, sleep
import numpy as np, plotly.express as px, pandas as pd, pyvisa
from typing import List, Dict, TypedDict, Optional, Callable


class RigolPSU:
    def __init__(self, resource_index=0):
        rm = pyvisa.ResourceManager()
        resources = rm.list_resources()
        if not resources:
            raise RuntimeError("No VISA instruments found.")
        self.instrument = rm.open_resource(resources[resource_index])

        # Set sensible defaults
        self.instrument.timeout = 5000
        self.instrument.read_termination = '\n'
        self.instrument.write_termination = '\n'

        self.on = False
        self.channel = 1
        self.data = {'voltage': [], 'power': []}

        # print("Connected to:", self.instrument.query("*IDN?").strip())

    def set_voltage(self, voltage: float, current_limit: float):
        self.instrument.write(f"APPL CH{self.channel},{voltage},{current_limit}")
        if not self.on:
            self.instrument.write(f"OUTP CH{self.channel},ON")
            self.on = True

    def measure_power(self) -> float:
        voltage = float(self.instrument.query(f"MEAS:VOLT? CH{self.channel}"))
        current = float(self.instrument.query(f"MEAS:CURR? CH{self.channel}"))
        return voltage * current
    
    def measure_power_over_time(self, seconds: float, time_step=.05):
        start = perf_counter_ns()
        end = start + seconds * 1e9
        time = []
        power = []
        while (current_time := perf_counter_ns()) < end:
            time.append(current_time)
            power.append(self.measure_power())
            sleep(time_step)
        return dict(x=(np.array(time)-start)/1e9, y=np.array(power))
    
    def volt_sweep(self, total_time=10.0, min_voltage = 2, max_voltage=10.0, current_limit: float = 2.0, 
                   measure_time=1, sleep_call_func: Optional[Callable] = None, func_kwargs={}):

        power_measurements = []
        voltages = np.linspace(min_voltage, max_voltage, round(total_time/measure_time)).round(1)
        used_voltages = []
        for voltage in voltages:

            start = perf_counter_ns()
            append = True
            if sleep_call_func is not None:
                append = sleep_call_func(**func_kwargs)
            sleep_call_duration = (perf_counter_ns() - start) / 1e9

            self.set_voltage(voltage, current_limit)

            if sleep_call_duration < measure_time:
                sleep(measure_time - sleep_call_duration)
                
            if append:
                used_voltages.append(voltage)
                power_measurements.append(round(self.measure_power(), 2))

        self.data = dict(voltage=used_voltages, power=np.array(power_measurements))
    
    def output_on(self):
        self.instrument.write(f"OUTP CH{self.channel},ON")
        self.on = True

    def output_off(self):
        self.instrument.write(f"OUTP CH{self.channel},OFF")
        self.on = False

    def close(self):
        if self.on:
            self.output_off()
        self.instrument.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()




In [194]:
with RigolPSU() as psu, ArduinoReader() as ar:
    ar.tare_scale()
    psu.volt_sweep(total_time=60, max_voltage=10, sleep_call_func=ar.read_normal, func_kwargs={'num_blades': 4})
    df = ar.smooth_all_data({**ar.data, **psu.data}, save_file='data/no ring .5', propeller_weight=7.193) #6.038

In [186]:
df_other: pd.DataFrame = pd.read_pickle('data/no ring')
df: pd.DataFrame = pd.read_pickle('data/with ring')

In [187]:
import plotly.express as px
import pandas as pd
import plotly.io as pio

def compare_scatter(df1, df2, x, y, title, label1="df1", label2="df2"):
    d1 = df1.copy(); d1["source"] = label1
    d2 = df2.copy(); d2["source"] = label2
    df_all = pd.concat([d1, d2])
    fig = px.scatter(
        df_all,
        x=x,
        y=y,
        color="source",
        title=title,
        labels={x: x.capitalize(), y: y.capitalize(), "source": "Dataset"},
    )
    # remove whitespace around plot
    fig.update_layout(
        margin=dict(l=20, r=20, t=40, b=20)  # adjust if you want even tighter
    )
    return fig

fig1 = compare_scatter(df, df_other, "power", "lift_per_weight", "Power vs eff"  , "with ring", "no ring")
fig2 = compare_scatter(df, df_other, "power", "lift"           , "Power vs Lift" , "with ring", "no ring")
fig3 = compare_scatter(df, df_other, "rpm"  , "lift"           , "RPM vs Lift"   , "with ring", "no ring")
fig4 = compare_scatter(df, df_other, "power", "sound"          , "Power vs Sound", "with ring", "no ring")

# write all figures to one HTML
with open("scatter_dashboard.html", "w") as f:
    f.write(pio.to_html(fig1, include_plotlyjs='cdn', full_html=False))
    f.write(pio.to_html(fig2, include_plotlyjs=False, full_html=False))
    f.write(pio.to_html(fig3, include_plotlyjs=False, full_html=False))
    f.write(pio.to_html(fig4, include_plotlyjs=False, full_html=False))