In [1]:
# !python -m pip install pyserial-asyncio

In [82]:
# CR30 Colorimeter with True Async/Await Support
import asyncio
import serial_asyncio
import struct
from typing import List, Tuple, Dict, Optional, Callable
import time
import numpy as np
from enum import Enum
from collections.abc import Mapping
from abc import abstractmethod, ABC



In [105]:
class MeasurementMode(Enum):
    REFLECTANCE = "reflectance"
    TRANSMITTANCE = "transmittance"
    EMISSION = "emission"
    FLUORESCENCE = "fluorescence"

class staticproperty:
    def __init__(self, fget):
        self.fget = fget
    def __get__(self, instance, owner):
        return self.fget()

class WhitePoint(Mapping):
    """
    Provides CIE 1931 XYZ white points as a read-only mapping.
    
    Supports standard illuminants and common white LEDs.
    """
    
    def __init__(self):
        # Base white points dictionary
        self._data = {
            # Standard illuminants
            "D50/10": (96.72, 100.000, 81.43),
            "D55/10": (95.682, 100.000, 92.149),
            "D65/10": (94.81, 100.000, 107.32),
            "D75/10": (94.972, 100.000, 122.638),
            "D50/2": (96.422, 100.000, 82.521),
            "D65/2": (95.047, 100.000, 108.883),
            "D75/2": (94.972, 100.000, 122.638),
            "A": (109.850, 100.000, 35.585),
            "B": (99.092, 100.000, 85.313),
            "C": (98.074, 100.000, 118.232),
            "E": (100.000, 100.000, 100.000),
            "F1": (92.834, 100.000, 103.665),
            "F2": (99.187, 100.000, 67.395),
            "F3": (103.754, 100.000, 49.861),
            "F4": (109.147, 100.000, 38.813),
            "F5": (90.872, 100.000, 98.723),
            "F6": (97.309, 100.000, 60.188),
            "F7": (95.044, 100.000, 108.755),
            "F8": (96.413, 100.000, 82.333),
            "F9": (100.365, 100.000, 67.868),
            "F10": (96.174, 100.000, 108.882),
            "F11": (100.966, 100.000, 64.370),
            "F12": (108.046, 100.000, 39.228),
            
            # Common white LEDs
            "LED_CW_6500K":  (95.04, 100.0, 108.88),
            "LED_NW_4300K":  (97.0, 100.0, 92.0),
            "LED_WW_3000K":  (98.5, 100.0, 67.0),
            "LED_VWW_2200K": (103.0, 100.0, 50.0),
        }

    # ----------------------
    # Mapping interface
    # ----------------------
    
    def __getitem__(self, key):
        # Auto-append /10 for standard illuminants without observer
        key_upper = key.upper()
        if "/" not in key_upper and key_upper in ["D50","D55","D65","D75"]:
            key_upper += "/10"
        if key_upper not in self._data:
            raise KeyError(f"Unknown whitepoint '{key}'")
        return self._data[key_upper]
    
    def __iter__(self):
        return iter(self._data)
    
    def __len__(self):
        return len(self._data)
    
    @staticproperty
    def D50_10(): return WhitePoint()['D65/10']
    @staticproperty
    def D65_10(): return WhitePoint()['D65/10']
    @staticproperty
    def D75_10(): return WhitePoint()['D65/10']
    @staticproperty
    def D50_2(): return WhitePoint()['D65/2']
    @staticproperty
    def D65_2(): return WhitePoint()['D65/2']
    @staticproperty
    def D75_2(): return WhitePoint()['D65/2']

class ColorScience(ABC):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @abstractmethod
    def spectrum_to_xyz(self, spd, wavelengths=None, illuminant="D65/10"):
        pass
    @abstractmethod
    def adapt_xyz(self, X, Y, Z, Ws, Wd, method="bradford"):
        pass
    @abstractmethod
    def xyz_to_lab(self, X, Y, Z, illuminant=WhitePoint.D65_10):
        pass
    @abstractmethod
    def xyz_to_rgb(self, X, Y, Z, out_255=True):
        pass
    @abstractmethod
    def lab_to_xyz(self, L, a, b, illuminant=WhitePoint.D65_10):
        pass
    @abstractmethod
    def rgb_to_xyz(self, r, g, b):
        pass

    def rgb_to_lab(self, r, g, b, illuminant=None):
        """
        Convert sRGB → CIE LAB by chaining rgb_to_xyz → xyz_to_lab.
        """
        X, Y, Z = rgb_to_xyz(r, g, b)
        L, a, b = xyz_to_lab(X, Y, Z, illuminant=illuminant)
        return L, a, b

    def lab_to_rgb(self, L, a, b, illuminant=None, out_255=True):
        """
        Convert CIE LAB → sRGB by chaining lab_to_xyz → xyz_to_rgb.
        """
        X, Y, Z = lab_to_xyz(L, a, b, illuminant=illuminant)
        r, g, b = xyz_to_rgb(X, Y, Z, out_255=out_255)
        return r, g, b

    def calculate_k_s(self, reflectance):
        """Calculate Kubelka-Munk function from reflectance"""
        R = np.array(reflectance) / 100.0  # Convert to 0-1 range
        # Avoid division by zero
        R = np.clip(R, 0.01, 0.99)
        k_s = (1 - R)**2 / (2 * R)
        return k_s

class ColorScienceImpl(ColorScience):
    """
    Comprehensive spectral color calculator for different measurement modes.
    Handles different resolutions between device (10nm) and reference data (1nm).
    """
    # Chromatic adaptation matrices
    _CAT_MATRICES = {
        "bradford": np.array([
            [ 0.8951,  0.2664, -0.1614],
            [-0.7502,  1.7135,  0.0367],
            [ 0.0389, -0.0685,  1.0296]
        ]),
        "cat02": np.array([
            [ 0.7328,  0.4296, -0.1624],
            [-0.7036,  1.6975,  0.0061],
            [ 0.0030,  0.0136,  0.9834]
        ]),
        # Classic von Kries (Hunt–Pointer–Estevez) cone model
        "kries": np.array([
            [ 0.4002,  0.7075, -0.0807],
            [-0.2280,  1.1500,  0.0612],
            [ 0.0000,  0.0000,  0.9184]
        ])
    }
    
    def __init__(self, wavelengths=None, load=True):
        self.wavelengths = wavelengths
        self._observers = None
        self._illuminants = None
        if load:
            self.load_reference_data()
        
    def load_reference_data(self):
        """Load CMF and illuminant data from CSV files"""
        self._observers = self._load_cmf_data()
        self._illuminants = self._load_illuminants()
    
    def upsample_interpolate(self, X, Xp, Yp):
        """
        Interpolates low-resolution data (Xp, Yp) onto a new
        high-resolution X grid using linear interpolation.
        """
        X  = np.array(X, dtype=float)
        Xp = np.array(Xp, dtype=float)
        Yp = np.array(Yp, dtype=float)

        # Use np.interp for linear interpolation
        Y = np.interp(X, Xp, Yp)

        return Y    

    def restrict_to_X_range(self, X, Xp, Yp):
        """
        Restricts Xp and Yp to lie within the min/max of X.
        Removes all points outside [min(X), max(X)].
        """
        X  = np.array(X,  dtype=float)
        Xp = np.array(Xp, dtype=float)
        Yp = np.array(Yp, dtype=float)

        Xmin, Xmax = np.min(X), np.max(X)

        # Keep only points within the range
        mask = (Xp >= Xmin) & (Xp <= Xmax)

        return Xp[mask], Yp[mask]

    def downsample_nearest(self, X, Xp, Yp):
        """
        Downsamples Yp from Xp to X discarding non-existent X.
        Could be made with interpolation, however, here we usually
        want to downsample 1nm into 10nm resolution.
        """
        X  = np.array(X, dtype=float)
        Xp = np.array(Xp, dtype=float)
        Yp = np.array(Yp, dtype=float)

        Y = np.zeros_like(X)

        for i, Xi in enumerate(X):
            # Find index of closest X'
            idx = np.argmin(np.abs(Xp - Xi))
            Y[i] = Yp[idx]

        return Y

    def spectrum_to_xyz(self, spd, wavelengths=None, illuminant="D65/10"):
        """
        Convert a spectral power distribution to CIE XYZ tristimulus values.
        
        Parameters:
        -----------
        wavelengths : array-like
            Wavelength values in nm
        spd : array-like
            Spectral power distribution values (reflectance for reflective objects, 
            or emission spectrum for light sources)
        illuminant : string, optional
            Illuminant spectral power distribution (e.g., D65) at the same wavelengths.
            If None, assumes spd is an emissive spectrum (light source).
            If provided, assumes spd is a reflectance spectrum.
        
        Returns:
        --------
        tuple : (X, Y, Z)
            CIE XYZ tristimulus values, normalized so that Y=100 for a perfect 
            white reflector or reference white point.
        """
        
        if self.wavelengths or self.wavelengths == wavelengths:
            # wavelengths provided at the beginning and cmfs already downscaled
            # no need to upscale/downscale
            wavelengths = np.array(self.wavelengths)
            cmf = self.get_observer(observer=illuminant)
            illuminant = self.get_illuminant(illuminant=illuminant) if illuminant is not None else None
        elif wavelengths:
            # wavelengths differ in resolution from our cmf, so we need to first restrict wavelengths so they match
            # then do the upscaling of our spd
            cmf = self.get_observer(wavelengths, observer=illuminant)
            spd = self.upsample_interpolate(cmf["wavelengths"], wavelengths, spd)
            wavelengths = np.array(cmf["wavelengths"])
            illuminant = self.get_illuminant(wavelengths, illuminant=illuminant) if illuminant is not None else None
        else:
            raise Exception("wavelengths missing!")


        if len(wavelengths) != len(spd):
            raise Exception("wavelengths and spd lengths must match")

        spd = np.array(spd) / 100
        cmf_x = np.array(cmf["x_bar"])
        cmf_y = np.array(cmf["y_bar"])
        cmf_z = np.array(cmf["z_bar"])
        
        # Calculate wavelength interval (assuming uniform spacing)
        if len(wavelengths) > 1:
            delta_lambda = wavelengths[1] - wavelengths[0]
        else:
            delta_lambda = 1.0
        
        # Case 1: Reflective object (illuminant provided)
        if illuminant is not None:
            illuminant = np.array(illuminant["values"])
            
            # Calculate normalization constant k
            # k = 100 / Σ[S(λ) × ȳ(λ) × Δλ]
            k = 100.0 / np.sum(illuminant * cmf_y * delta_lambda)
            
            # Calculate XYZ values
            # X = k × Σ[S(λ) × R(λ) × x̄(λ) × Δλ]
            X = k * np.sum(illuminant * spd * cmf_x * delta_lambda)
            Y = k * np.sum(illuminant * spd * cmf_y * delta_lambda)
            Z = k * np.sum(illuminant * spd * cmf_z * delta_lambda)
        
        # Case 2: Emissive spectrum (no illuminant)
        else:
            # Calculate normalization constant k
            # k = 100 / Σ[E(λ) × ȳ(λ) × Δλ]
            k = 100.0 / np.sum(spd * cmf_y * delta_lambda)
            
            # Calculate XYZ values
            # X = k × Σ[E(λ) × x̄(λ) × Δλ]
            X = k * np.sum(spd * cmf_x * delta_lambda)
            Y = k * np.sum(spd * cmf_y * delta_lambda)
            Z = k * np.sum(spd * cmf_z * delta_lambda)
        
        # inferred corrections
        # X *= 0.997949
        # Y *= 0.997800
        # Z *= 0.997643

        return float(X), float(Y), float(Z)

    def adapt_xyz(self, X, Y, Z, Ws, Wd, method="bradford"):
        """
        Adapt XYZ from source white Ws to destination white Wd using
        Bradford, CAT02, or von Kries chromatic adaptation.
        
        Parameters:
        -----------
        X, Y, Z : float
            Input XYZ values
        Ws, Wd : 3-element arrays
            Source and destination white points (XYZ tristimulus values)
        method : str
            "bradford", "cat02", or "kries"
        """
        method = method.lower()
        if method not in self._CAT_MATRICES:
            raise ValueError(f"Unknown adaptation method '{method}'. "
                            "Use 'bradford', 'cat02', or 'kries'.")

        M = self._CAT_MATRICES[method]
        M_inv = np.linalg.inv(M)

        XYZ = np.array([X, Y, Z])

        # Convert to cone response domain
        src_cone = M @ Ws
        dst_cone = M @ Wd
        XYZ_cone = M @ XYZ

        # Von Kries scaling
        scale = dst_cone / src_cone
        adapted_cone = XYZ_cone * scale

        # Convert back to XYZ
        XYZ_adapted = M_inv @ adapted_cone
        return float(XYZ_adapted[0]), float(XYZ_adapted[1]), float(XYZ_adapted[2])

    def xyz_to_lab(self, X, Y, Z, illuminant=WhitePoint.D65_10):
        """
        Convert CIE XYZ to CIE LAB color space.
        
        Parameters:
        -----------
        X, Y, Z : float
            CIE XYZ tristimulus values
        illuminant : tuple, optional
            Reference white point. Default is 'D65' which uses D65/10° values.
            Can also pass a tuple of (Xn, Yn, Zn) values directly.
        
        Returns:
        --------
        tuple : (L, a, b)
            CIE LAB values
        """

        if illuminant is None:
            Xn, Yn, Zn = WhitePoint.D65_10
        elif isinstance(illuminant, (tuple, list)):
            Xn, Yn, Zn = illuminant
        else:
            raise ValueError("reference_white must be a tuple of (Xn, Yn, Zn)")
        
        # Normalize by reference white
        xr = X / Xn
        yr = Y / Yn
        zr = Z / Zn
        
        # Apply the f(t) function
        def f(t):
            delta = 6/29
            if t > delta**3:
                return np.cbrt(t)
            else:
                return t / (3 * delta**2) + 4/29
        
        # Vectorized version for arrays
        def f_vectorized(t):
            delta = 6/29
            return np.where(t > delta**3, 
                        np.cbrt(t), 
                        t / (3 * delta**2) + 4/29)
        
        # Use vectorized version if inputs are arrays, otherwise use scalar version
        if isinstance(xr, np.ndarray):
            fx = f_vectorized(xr)
            fy = f_vectorized(yr)
            fz = f_vectorized(zr)
        else:
            fx = f(xr)
            fy = f(yr)
            fz = f(zr)
        
        # Calculate LAB values
        L = 116 * fy - 16
        a = 500 * (fx - fy)
        b = 200 * (fy - fz)
        
        return float(L), float(a), float(b)

    def xyz_to_rgb(self, X, Y, Z, out_255=True):
        """
        Convert CIE XYZ (D65) to sRGB.
        
        Parameters
        ----------
        X, Y, Z : float or ndarray
            XYZ tristimulus values.
        out_255 : bool
            If True, output will be 0–255 ints. If False, output stays 0–1 floats.

        Returns
        -------
        (r, g, b) : float or ndarray
            RGB values, either 0–255 or 0–1.
        """

        xyz = np.array([X, Y, Z], dtype=float)

        # XYZ → sRGB matrix, D65
        M_inv = np.array([
            [ 3.2404542, -1.5371385, -0.4985314],
            [-0.9692660,  1.8760108,  0.0415560],
            [ 0.0556434, -0.2040259,  1.0572252]
        ])

        rgb_lin = M_inv @ xyz

        # Gamma correction
        def gamma(c):
            return np.where(c <= 0.0031308,
                            12.92 * c,
                            1.055 * (c ** (1 / 2.4)) - 0.055)

        rgb = gamma(rgb_lin)

        # Clip to valid range
        rgb = np.clip(rgb, 0, 1)

        # Optionally convert to 0–255
        if out_255:
            return (rgb * 255).round().astype(int)
        return tuple(float(i) for i in rgb)

    def lab_to_xyz(self, L, a, b, illuminant=WhitePoint.D65_10):
        """
        Convert CIE LAB to CIE XYZ color space (inverse of xyz_to_lab).
        
        Parameters:
        -----------
        L, a, b : float
            CIE LAB values
        illuminant : str or tuple, optional
            Reference white point. Same behavior as in xyz_to_lab().
            Defaults to D65/10°.
        
        Returns:
        --------
        tuple : (X, Y, Z)
            CIE XYZ tristimulus values
        """

        # Select reference white point (same as xyz_to_lab)
        if illuminant is None:
            Xn, Yn, Zn = WhitePoint.D65_10
        elif isinstance(illuminant, (tuple, list)):
            Xn, Yn, Zn = illuminant
        else:
            raise ValueError("illuminant must be (Xn,Yn,Zn) tuple")

        # Helper function: inverse of f(t)
        def f_inv(ft):
            delta = 6/29
            return np.where(
                ft > delta,
                ft**3,
                3 * delta**2 * (ft - 4/29)
            )

        # Compute intermediate values
        fy = (L + 16) / 116
        fx = fy + (a / 500)
        fz = fy - (b / 200)

        # Convert back to XYZ (normalized by white)
        xr = f_inv(fx)
        yr = f_inv(fy)
        zr = f_inv(fz)

        # Scale by reference white
        X = xr * Xn
        Y = yr * Yn
        Z = zr * Zn

        return float(X), float(Y), float(Z)

    def rgb_to_xyz(self, r, g, b):
        """
        Convert sRGB (0–255 or 0–1) to CIE XYZ (D65).
        
        Parameters
        ----------
        r, g, b : float or ndarray
            RGB components. Can be 0–255 or 0–1. If 0–255, will be normalized.

        Returns
        -------
        (X, Y, Z) : float or ndarray
            XYZ tristimulus values.
        """

        # Normalize if in 0–255 range
        rgb = np.array([r, g, b], dtype=float)
        if rgb.max() > 1.0:
            rgb /= 255.0

        # Inverse gamma correction
        def inv_gamma(c):
            return np.where(c <= 0.04045,
                            c / 12.92,
                            ((c + 0.055) / 1.055) ** 2.4)

        rgb_lin = inv_gamma(rgb)

        # sRGB → XYZ matrix, D65
        M = np.array([
            [0.4124564, 0.3575761, 0.1804375],
            [0.2126729, 0.7151522, 0.0721750],
            [0.0193339, 0.1191920, 0.9503041]
        ])

        X, Y, Z = M @ rgb_lin
        return float(X), float(Y), float(Z)

    # File loading methods
    def load_csv(self, path):
        with open(path) as f:
            return [
                [float(j) if j != "NaN" else 0.0 for j in i.strip().split(",")]
                for i in f.readlines()
                if len(i) > 2
            ]

    def load_illuminant(self, path):
        """Returns dict with wavelengths and values"""
        data = self.load_csv(path)
        if self.wavelengths:
            return {
                'wavelengths': np.array(self.wavelengths),
                'values': self.downsample_nearest(self.wavelengths, [i[0] for i in data], [i[1] for i in data]),
            }
            
        return {
            'wavelengths': np.array([i[0] for i in data]),
            'values': np.array([i[1] for i in data])
        }

    def load_observer(self, path):
        """Returns dict with wavelengths and CMF values"""
        data = self.load_csv(path)
        if self.wavelengths:
            return {
                'wavelengths': np.array(self.wavelengths),
                'x_bar': self.downsample_nearest(self.wavelengths, [i[0] for i in data], [i[1] for i in data]),
                'y_bar': self.downsample_nearest(self.wavelengths, [i[0] for i in data], [i[2] for i in data]),
                'z_bar': self.downsample_nearest(self.wavelengths, [i[0] for i in data], [i[3] for i in data]),
            }

        return {
            "wavelengths": np.array([i[0] for i in data]),
            "x_bar": np.array([i[1] for i in data]),
            "y_bar": np.array([i[2] for i in data]),
            "z_bar": np.array([i[3] for i in data]),
        }

    def _load_cmf_data(self):
        """Load CIE color matching functions"""
        return {
            '10': self.load_observer("CIE_xyz_1964_10deg.csv"),
            '2': self.load_observer("CIE_xyz_1931_2deg.csv"),
        }
    
    def _load_illuminants(self):
        """Load standard illuminant data"""
        return {
            'D65': self.load_illuminant("CIE_std_illum_D65.csv"),
            'D50': self.load_illuminant("CIE_std_illum_D50.csv"),
            'A': self.load_illuminant("CIE_std_illum_A_1nm.csv"),
        }

    def get_observer(self, wavelengths=None, observer="10"):
        if '/' in observer:
            observer = observer.split('/')[1]
        elif observer is None:
            observer = '10'
        cmf = self._observers[observer]
        if wavelengths is None:
            return cmf
        _cmf = {}
        _cmf["wavelengths"], _cmf["x_bar"] = self.restrict_to_X_range(wavelengths, cmf["wavelengths"], cmf["x_bar"])
        _, _cmf["y_bar"] = self.restrict_to_X_range(wavelengths, cmf["wavelengths"], cmf["y_bar"])
        _, _cmf["z_bar"] = self.restrict_to_X_range(wavelengths, cmf["wavelengths"], cmf["z_bar"])
        return _cmf

    def get_illuminant(self, wavelengths=None, illuminant="D65"):
        if '/' in illuminant:
            illuminant = illuminant.split('/')[0]
        elif illuminant is None:
            illuminant = 'D65'
        spd = self._illuminants[illuminant]
        if wavelengths is None:
            return spd
        _spd = {}
        _spd["wavelengths"], _spd["values"] = self.restrict_to_X_range(wavelengths, spd["wavelengths"], spd["values"])
        return _spd



wavelengths = [400+i*(700-400)/30.0 for i in range(31)]
c = ColorScienceImpl(wavelengths=wavelengths)

test = [
  {
    "LAB": [77.77,68.59,39.4],
    "XYZ_bar": (17.180952072143555, 21.955293655395508, 23.535581588745117),
    "LAB_bar": (53.979538268065966, -20.27722831914125, -10.994395896617014),
    "SPD": [23.898427963256836, 23.30441665649414, 23.15403175354004, 23.656957626342773, 24.614757537841797, 24.712610244750977, 22.19900131225586, 18.11726188659668, 14.16519546508789, 10.539053916931152, 6.648414134979248, 5.761203289031982, 12.976861000061035, 27.517276763916016, 50.46492004394531, 88.36058807373047, 135.19223022460938, 165.1352996826172, 165.7974853515625, 148.69451904296875, 130.1623992919922, 116.8099136352539, 107.09515380859375, 99.15723419189453, 95.44196319580078, 95.362548828125, 97.0391616821289, 100.17365264892578, 0.0, 0.0, 0.0]
  },
  {
    "LAB": [94.77,-21.06,75.27],
    "XYZ_bar": (12.567733764648438, 14.387170791625977, 14.949334144592285),
    "LAB_bar": (44.783138701407545, -8.485204922731082, -8.367437896777785),
    "SPD": [14.073362350463867, 11.897355079650879, 12.145124435424805, 16.8687801361084, 25.229473114013672, 37.675106048583984, 54.553279876708984, 74.28650665283203, 93.42936706542969, 105.80400085449219, 111.53778076171875, 109.79427337646484, 105.29646301269531, 97.67935943603516, 91.34756469726562, 87.81027221679688, 87.99266815185547, 88.89205169677734, 89.01248931884766, 89.27753448486328, 88.6020278930664, 86.75908660888672, 85.03485107421875, 83.5748519897461, 83.69290161132812, 84.84528350830078, 86.18856048583984, 87.56346130371094, 0.0, 0.0, 0.0]
  },
  {
    "LAB": [30.57,-6.09,-49.07],
    "XYZ_bar": (0.0, 1.6630793809890747, 6.584780216217041),
    "LAB_bar": (13.609357407490421, -58.66102330814837, -35.053172755721874),
    "SPD": [13.707263946533203, 24.877330780029297, 34.19121551513672, 37.345027923583984, 36.9126091003418, 33.529205322265625, 27.4881591796875, 20.151046752929688, 13.055499076843262, 7.297297477722168, 3.8680264949798584, 2.299462080001831, 1.1366851329803467, 0.30736032128334045, 0.29125112295150757, 1.0660878419876099, 1.718971848487854, 0.8910048007965088, 0.008038442581892014, 0.3875650465488434, 0.7629326581954956, 0.8622304201126099, 1.1714918613433838, 1.5497725009918213, 1.4481699466705322, 0.8264680504798889, 0.4876132905483246, 0.49265021085739136, 0.0, 0.0, 0.0]
  },
]
print(len(wavelengths), "wavelengths:", wavelengths)
print("D65/10", WhitePoint.D65_10)
for i in test:
  spd = list(i["XYZ_bar"]) + i["SPD"][:-3]
  print("Target LAB:", i["LAB"])
  print(" len(SPD)", len(spd))

  xyz = [float(i) for i in c.spectrum_to_xyz(spd, illuminant='D65/10')]
  print(" XYZ:", xyz)
  print(" LAB:", c.xyz_to_lab(*xyz))


31 wavelengths: [400.0, 410.0, 420.0, 430.0, 440.0, 450.0, 460.0, 470.0, 480.0, 490.0, 500.0, 510.0, 520.0, 530.0, 540.0, 550.0, 560.0, 570.0, 580.0, 590.0, 600.0, 610.0, 620.0, 630.0, 640.0, 650.0, 660.0, 670.0, 680.0, 690.0, 700.0]
D65/10 (94.81, 100.0, 107.32)
Target LAB: [77.77, 68.59, 39.4]
 len(SPD) 31
 XYZ: [80.48530875833514, 53.16300160790341, 24.9502483208088]
 LAB: (77.97113817316729, 68.38386461855144, 39.04102271431245)
Target LAB: [94.77, -21.06, 75.27]
 len(SPD) 31
 XYZ: [73.74375494770615, 89.49312221036547, 22.735536209499553]
 LAB: (95.78611830872244, -22.010525540150983, 73.50854003827558)
Target LAB: [30.57, -6.09, -49.07]
 len(SPD) 31
 XYZ: [5.586958108818752, 6.438372027625087, 29.11869281507377]
 LAB: (30.492547685254088, -5.828368135075701, -49.317764296683684)


In [69]:
WhitePoint.D65_10

(94.81, 100.0, 107.32)

In [78]:
class Device:
    """Handles raw byte-level communication with CR30 asynchronously using asyncio.Queue."""

    def __init__(self, port='COM3', baudrate=19200, verbose=True):
        self.port = port
        self.baudrate = baudrate
        self.reader = None
        self.writer = None

        self._raw_callbacks: List[Callable[[bytes], None]] = []

        # Async FIFO for frames
        self._frames: asyncio.Queue[bytes] = asyncio.Queue()
        self._receive_task: Optional[asyncio.Task] = None
        self._running = False
        self._device_name = None
        self._device_model = None
        self._device_serial = None
        self._device_firmware = None
        self._device_build = None
        self._verbose = verbose

    @property
    def name(self) -> str:
        return self._device_name
    @property
    def model(self) -> str:
        return self._device_model
    @property
    def serial_number(self) -> str:
        return self._device_serial
    @property
    def fw_version(self) -> str:
        return self._device_firmware
    @property
    def fw_build(self) -> str:
        return self._device_build

    @property
    def verbose(self) -> bool:
        return self._verbose
    @verbose.setter
    def verbose(self, v: bool):
        self._verbose = v

    async def connect(self):
        self.reader, self.writer = await serial_asyncio.open_serial_connection(
            url=self.port, baudrate=self.baudrate
        )
        print(f"Connected to {self.port} at {self.baudrate} baud")
        self._running = True
        self._receive_task = asyncio.create_task(self._receive_loop())
        await asyncio.sleep(1)
        await self.handshake()

    async def disconnect(self):
        self._running = False
        if self._receive_task:
            self._receive_task.cancel()
            try:
                await self._receive_task
            except asyncio.CancelledError:
                pass
        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()
        print("Disconnected")

    async def handshake(self):
        """Perform full CR30 handshake and populate device info using send_recv."""
        # -------------------------
        # AA 0A xx commands (device info)
        # -------------------------
        def parse_name(data):
            # 00 56 00 19 03 53 44 36 38 37 30 42 36 36 37 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 43 52 33 30 00 00 00 00 00 00 00 00 00 00 00 00 00
            # Parse Name: SD6870B667    CR30
            self._device_name = data[5:30].decode('ascii', errors='ignore').strip('\x00')
            self._device_model = data[35:45].decode('ascii', errors='ignore').strip('\x00')
        def parse_serial_number(data):
            # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 43 4D 34 34 33 4C 30 37 38 37 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 56 31 31 2E 33 2E 00
            # Parse Serial Number: CM443L0787   V11.3.
            self._device_serial = data[16:30].decode('ascii', errors='ignore').strip('\x00')
            self._device_serial += " - " + data[-7:].decode('ascii', errors='ignore').strip('\x00')
        def parse_firmware(data):
            # 00 30 2E 30 2E 32 30 32 33 31 32 31 39 00 00 00 00 00 00 00 00 00 00 00 00 56 31 30 2E 30 2E 30 2E 30 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
            # Parse Firmware: 0.0.20231219    V10.0.0.0
            self._device_build = data[1:20].decode('ascii', errors='ignore').strip('\x00')
            self._device_firmware = data[20:30].decode('ascii', errors='ignore').strip('\x00')

        aa_commands = [
            (0x00, parse_name),
            (0x01, parse_serial_number),
            (0x02, parse_firmware),
            (0x03, None)  # status / build info, optional
        ]
        
        await self.flush_recv()
        for subcmd, parse in aa_commands:
            response = await self.send_recv(0xAA, 0x0A, subcmd, 0x00, timeout=1.0)
            if not response or len(response) < 60:
                print(f"AA 0A {subcmd:02X} failed or incomplete response")
                continue
            
            payload = self._extract_payload_bytes(response)
            
            if parse:
                parse(payload)
                info_str = payload.decode('ascii', errors='ignore').strip('\x00')
                if self._verbose: print(f"{parse.__name__.replace('_',' ').title()}: {info_str}")
        
        # -------------------------
        # BB 17 - initialize device
        # -------------------------
        response = await self.send_recv(0xBB, 0x17, 0x00, 0x00, timeout=1.0)
        
        # -------------------------
        # BB 13 - simple 'Check'
        # -------------------------
        response = await self.send_recv(0xBB, 0x13, 0x00, 0x00, data=b'Check' + b'\x00'*7, timeout=1.0)
        
        # -------------------------
        # BB 28 - query parameters (0x00..0xFF)
        # -------------------------
        for idx in [0x00, 0x01, 0x02, 0x03, 0xFF]:
            response = await self.send_recv(0xBB, 0x28, 0x00, idx, timeout=1.0)
            # optional: store or parse parameter info
        
        print("Handshake complete!")

    async def flush_recv(self):
        """Empty all items from the async queue."""
        try:
            while True:
                self._frames.get_nowait()
        except asyncio.QueueEmpty:
            pass

    async def _receive_loop(self):
        """Continuously read 60-byte chunks and put valid frames in the async queue."""
        try:
            while self._running:
                data = await self.reader.readexactly(60)
                if (data[0] == 0xAA and data[-2] == 0xFF) or (data[0] == 0xBB and data[-2] in (0x00, 0xFF)):
                    if self._verbose: print(f"+R: {data.hex(' ').upper()}...")
                    await self._frames.put(data)  # put frame in queue
                    for cb in self._raw_callbacks:
                        cb(data)
                else:
                    if self._verbose: print(f"-R: {data.hex(' ').upper()}...")
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"Receive loop error: {e}")

    # -----------------
    # Frame retrieval
    # -----------------
    async def recv(self, timeout: Optional[float] = None) -> Optional[bytes]:
        """Get a frame from the async FIFO queue. Waits up to timeout seconds."""
        try:
            return await asyncio.wait_for(self._frames.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return None

    # -----------------
    # Packet Handling
    # -----------------
    def _extract_payload_bytes(self, packet: bytes) -> bytes:
        """Return the payload bytes area (bytes 4..55 inclusive) — 52 bytes"""
        if not packet or len(packet) < 60:
            return b""
        return packet[4:56]

    def _parse_header(self, packet: bytes) -> Dict:
        """
        Parse the header packet (subcmd 0x09).
        
        Args:
            packet: Full 60-byte packet
            
        Returns:
            Dict with header info
        """
        payload = self._extract_payload_bytes(packet)
        return {
            "cmd": packet[1],
            "subcmd": packet[2],
            "param": packet[3],
            "payload": payload.hex(),
        }

    def calculate_checksum(self, packet: bytes) -> int:
        checksum = sum(packet[:58]) % 256
        if packet[0] == 0xBB:
            checksum = (checksum - 1) % 256
        return checksum

    def build_packet(self, start: int, cmd: int, subcmd: int = 0,
                     param: int = 0, data: bytes = None) -> bytes:
        packet = bytearray(58)
        packet[0] = start
        packet[1] = cmd
        packet[2] = subcmd
        packet[3] = param
        if data:
            packet[4:4+len(data)] = data[:54]
        checksum = self.calculate_checksum(packet)
        return bytes(packet + bytes([0xFF, checksum]))

    async def send_raw(self, packet: bytes):
        if self._verbose: print(f"Send: {packet.hex(' ').upper()}...")
        self.writer.write(packet)
        await self.writer.drain()

    async def send(self, start: int, cmd: int, subcmd: int = 0,
                   param: int = 0, data: bytes = None):
        packet = self.build_packet(start, cmd, subcmd, param, data)
        await self.send_raw(packet)

    async def send_recv(self, start: int, cmd: int, subcmd: int = 0,
                        param: int = 0, data: bytes = None, timeout: float = 1.0) -> Optional[bytes]:
        await self.send(start, cmd, subcmd, param, data)
        return await self.recv(timeout=timeout)

    # -----------------
    # Callbacks
    # -----------------
    def register_raw_callback(self, callback: Callable[[bytes], None]):
        self._raw_callbacks.append(callback)


# Connect to the device (starts async receive loop)
dev = Device(port='COM3', baudrate=19200, verbose=False)

try:
    await dev.connect()
    print("Name: ", dev.name)
    print("Model: ", dev.model)
    print("Serial No: ", dev.serial_number)
    print("Version: ", dev.fw_version)
    print("Date: ", dev.fw_build)

    await asyncio.sleep(0.1)

    print("Waiting for button press")
    dev.verbose = True
    await asyncio.sleep(1)
    print("Waiting for long button press")
    await asyncio.sleep(5)

finally:
    await dev.disconnect()        

Connected to COM3 at 19200 baud
Handshake complete!
Name:  SD6870B667
Model:  CR30
Serial No:  M443L0787 - V11.3.
Version:  V10.0
Date:  0.0.20231219
Waiting for button press
Waiting for long button press
Disconnected


In [85]:
import struct
from typing import List, Dict, Callable, Optional
import asyncio

class Protocol(Device):
    """Handles CR30 commands, handshake, and measurement sequencing.

    Assumes Device implements:
      - async send(start, cmd, subcmd, param, data=None)
      - async send_recv(start, cmd, subcmd, param, data=None, timeout=None) -> Optional[bytes]
      - async recv(timeout=None) -> Optional[bytes]
      - async flush_recv()
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._response_callbacks: List[Callable[[Dict], None]] = []
        self._raw_measurement_callbacks: List[Callable[[Dict], None]] = []
        self.wavelengths = list(range(400, 701, 10))  # 31 bands

    # -----------------
    # Callbacks
    # -----------------
    def register_response_callback(self, callback: Callable[[Dict], None]):
        self._response_callbacks.append(callback)

    def register_raw_measurement_callback(self, callback: Callable[[Dict], None]):
        self._raw_measurement_callbacks.append(callback)
        
    # -----------------
    # Calibration
    # -----------------
    async def calibrate(self, white: bool = False, timeout: float = 5.0) -> Dict:
        """
        Perform black/white calibration.
        Place device on appropriate calibration tile and call this method.
        
        Returns:
            Dict with status information
        """
        print(f"Starting {"white" if white else "black"} calibration...")
        response = await self.send_recv(0xBB, 0x11 if white else 0x10, 0x00, 0x00, timeout=timeout)
        
        if not response:
            raise asyncio.TimeoutError("No response from calibration command")
        
        # Parse response - byte 5 should be 0x01 if successful
        payload = self._extract_payload_bytes(response)
        status = payload[1] if len(payload) > 1 else 0
        
        result = {
            "success": status == 0x01,
            "status_byte": status,
            "raw_response": response.hex()
        }
        
        if result["success"]:
            print(f"{"White" if white else "Black"} calibration successful!")
        else:
            print(f"{"White" if white else "Black"} calibration failed (status: 0x{status:02X})")
        
        return result

    # -----------------
    # Measurement
    # -----------------
    async def wait_measurement(self, timeout: float = 15, timeout_per_step: float = 1.5) -> Dict:
        """
        Waits for a button press and initiates the reading process.
        """
        response = await self.recv(timeout)
        if not response:
            raise asyncio.TimeoutError("No button press detected")
        return await self._read_measurement(response, timeout_per_step=timeout_per_step)

    async def measure(self, timeout_per_step: float = 1.5) -> Dict:
        """
        Triggers PC-initiated measurement and reads data
        """
        header_packet = await self.send_recv(0xBB, 0x01, 0x00, 0x00, timeout=timeout_per_step)
        if not header_packet:
            raise asyncio.TimeoutError("No header response after trigger (BB 01 00).")
        return await self._read_measurement(header_packet, timeout_per_step)

    # -----------------
    # Measurement - Main Entry Point
    # -----------------
    async def _read_measurement(self, initial_response: bytes, timeout_per_step: float = 1.5) -> Dict:
        """
        Main measurement reading orchestrator.
        Coordinates reading all chunks and assembling final result.
        
        Args:
            initial_response: The header packet (subcmd 0x09)
            timeout_per_step: Timeout for each chunk request
            
        Returns:
            Complete measurement dict with XYZ, LAB, SPD, and raw data
        """
        await self.flush_recv()

        result = {
            "header": None,
            "chunks": [],
            "xyz": None,
            "xyz_copy": None,
            "spd": [],
            "raw": {}
        }

        # 1) Parse header
        result["header"] = self._parse_header(initial_response)
        result["raw"]["header"] = initial_response.hex()

        # 2) Read and parse all data chunks
        spd_bytes = await self._read_all_chunks(result, timeout_per_step)

        result["spd_bytes"] = spd_bytes.hex()

        # 3) Convert accumulated SPD bytes to float values
        self._parse_spd_data(result, spd_bytes)

        # 5) Fire callbacks
        self._trigger_callbacks(result)

        return result


    # -----------------
    # Measurement - Chunk Reading
    # -----------------
    async def _read_all_chunks(self, result: Dict, timeout: float) -> bytearray:
        """
        Read all measurement data chunks (0x10, 0x11, 0x12, 0x13).
        Accumulates SPD bytes and populates result dict.
        
        Args:
            result: Result dict to populate
            timeout: Timeout per chunk
            
        Returns:
            bytearray containing all SPD bytes
        """
        spd_bytes = bytearray()
        chunk_subcmds = [0x10, 0x11, 0x12, 0x13]

        for subcmd in chunk_subcmds:
            packet = await self.send_recv(0xBB, 0x01, subcmd, 0x00, timeout=timeout)
            if not packet:
                print(f"Warning: No response for chunk 0x{subcmd:02X}")
                break

            payload = self._extract_payload_bytes(packet)
            result["raw"][f"0x{subcmd:02X}"] = payload.hex()
            
            # Parse based on chunk type
            if subcmd == 0x10:
                chunk_info = self._parse_chunk_0x10(payload, spd_bytes)
            elif subcmd in (0x11, 0x12):
                chunk_info = self._parse_chunk_spd(payload, subcmd, spd_bytes)
            elif subcmd == 0x13:
                chunk_info = self._parse_chunk_0x13(payload)
            else:
                chunk_info = {"subcmd": subcmd, "payload": payload.hex()}
            
            # Store parsed chunk info and update result
            result["chunks"].append(chunk_info)
            
            # Update main result fields
            if "xyz" in chunk_info:
                result["xyz"] = chunk_info["xyz"]
            if "xyz_copy" in chunk_info:
                result["xyz_copy"] = chunk_info["xyz_copy"]

        return spd_bytes

    # -----------------
    # Measurement - Individual Chunk Parsers
    # -----------------
    def _parse_chunk_0x10(self, payload: bytes, spd_bytes: bytearray) -> Dict:
        """
        Parse chunk 0x10: Contains XYZ and first SPD bytes.
        
        Structure:
            Bytes 0-1: Header/unknown
            Bytes 2-50: First SPD data 
        
        Args:
            payload: 52-byte payload
            spd_bytes: Accumulator for SPD data (modified in place)
            
        Returns:
            Dict with parsed chunk info
        """
        spd = payload[2:50]
        spd_bytes.extend(spd)
        floats = list(struct.unpack(f"<{len(spd)//4}f", spd))
        
        return {
            "subcmd": 0x10,
            "payload": payload.hex(),
            "spd_bytes_start": 2,
            "spd_bytes_count": len(spd),
            "spd_raw": spd.hex(),
            "spd": floats,
        }

    def _parse_chunk_spd(self, payload: bytes, subcmd: int, spd_bytes: bytearray) -> Dict:
        """
        Parse SPD-only chunks (0x11, 0x12): Pure spectral data.
        
        Args:
            payload: 52-byte payload
            subcmd: Chunk subcmd (0x11 or 0x12)
            spd_bytes: Accumulator for SPD data (modified in place)
            
        Returns:
            Dict with parsed chunk info
        """
        N = 50 if subcmd != 19 else 30
        spd = payload[2:N]
        spd_bytes.extend(spd)
        floats = list(struct.unpack(f"<{len(spd)//4}f", spd))
        
        return {
            "subcmd": subcmd,
            "payload": payload.hex(),
            "spd_bytes_count": len(spd),
            "spd_raw": spd.hex(),
            "spd": floats,
        }

    def _parse_chunk_0x13(self, payload: bytes) -> Dict:
        """
        Parse chunk 0x13: Contains XYZ copy near the end.
        
        Structure:
            Bytes 0-29: Zeros/padding
            Bytes 30-41: XYZ copy (3 × float32)
            Bytes 42-48: Padding

        Args:
            payload: 52-byte payload
            
        Returns:
            Dict with parsed chunk info
        """
        chunk_info = {
            "subcmd": 0x13,
            "payload": payload.hex(),
        }
        
        try:
            xyz_copy = struct.unpack_from("<fff", payload, 30)
            chunk_info["xyz"] = xyz_copy
            chunk_info["xyz_offset"] = 30
        except Exception as e:
            print(f"Warning: Could not parse XYZ copy: {e}")
            chunk_info["xyz"] = None
        
        return chunk_info

    # -----------------
    # Measurement - SPD Processing
    # -----------------
    def _parse_spd_data(self, result: Dict, spd_bytes: bytearray):
        """
        Convert accumulated SPD bytes to float values.
        
        Total bytes: 35 (chunk 0x10) + 52 (0x11) + 52 (0x12) = 139 bytes
        We need: 31 floats × 4 bytes = 124 bytes
        
        Args:
            result: Result dict to update (modified in place)
            spd_bytes: Raw SPD bytes
        """
        if len(spd_bytes) < 124:
            print(f"Warning: Insufficient SPD data ({len(spd_bytes)} bytes, need 124)")
            return
        
        try:
            spd = list(struct.unpack("<31f", bytes(spd_bytes[:124])))
            result["spd"] = spd
            
            # Create wavelength->reflectance mapping
            result["spd_data"] = dict(zip(self.wavelengths, spd))
            
        except Exception as e:
            print(f"Error parsing SPD data: {e}")

    # -----------------
    # Measurement - Callbacks
    # -----------------
    def _trigger_callbacks(self, result: Dict):
        """
        Fire all registered callbacks with measurement result.
        
        Args:
            result: Complete measurement result
        """
        for cb in self._raw_measurement_callbacks:
            try: 
                cb(result)
            except Exception as e:
                print(f"Raw callback error: {e}")
                
        for cb in self._response_callbacks:
            try: 
                cb(result)
            except Exception as e:
                print(f"Response callback error: {e}")

proto = Protocol(port="COM3", baudrate=115200, verbose=False)

try:
    print("Connecting...")
    await proto.connect()

    print("Triggering measurement...")
    proto.verbose=True
    result = await proto.measure()

    # Print a short summary
    # print("\n--- MEASUREMENT RESULT ---")
    # print("Header bytes:", result["header"]["payload"])
    # print(f"Received {len(result['chunks'])} chunks.")
    # for i, chunk in enumerate(result['chunks']):
    #     print(f"#{i} chunk {chunk}")

    print("SPD:", result["spd"])
finally:
    await proto.disconnect()

Connecting...
Connected to COM3 at 115200 baud
Handshake complete!
Triggering measurement...
Send: BB 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 FF BB...
+R: BB 01 09 00 28 1F 0A 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 FF 15...
Send: BB 01 10 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 FF CB...
+R: BB 01 10 00 00 00 3A 44 8A 42 0E 3C 93 42 7E 1A 99 42 79 56 9B 42 E5 DC 9A 42 68 2B 9A 42 25 C3 9A 42 D0 6B 9B 42 3A 81 9C 42 7D 58 9D 42 48 F9 9C 42 42 20 9C 42 00 00 00 00 FF E7...
Send: BB 01 11 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00

In [47]:
# 24 patches
"""
Name	Date	Test mode	Light source/angle	L*	a*	b*	dL*	da*	db*	dE*ab	P/F dE*ab
sample_1_2025-10-22	10/22/2025 00:21:00	M0	D65/10°	94.42	-2.09	0.25	22.80	34.22	-4.97	41.41	NG
sample_2_2025-10-22	10/22/2025 00:21:03	M0	D65/10°	81.90	-2.40	4.89	10.28	33.90	-0.32	35.43	NG
sample_3_2025-10-22	10/22/2025 00:21:05	M0	D65/10°	67.33	-1.97	2.98	-4.30	34.33	-2.23	34.67	NG
sample_4_2025-10-22	10/22/2025 00:21:07	M0	D65/10°	49.40	0.95	0.06	-22.22	37.25	-5.15	43.68	NG
sample_5_2025-10-22	10/22/2025 00:21:10	M0	D65/10°	31.59	-1.09	-0.33	-40.04	35.21	-5.54	53.60	NG
sample_6_2025-10-22	10/22/2025 00:21:13	M0	D65/10°	14.27	-6.37	-3.91	-57.35	29.93	-9.12	65.33	NG
sample_7_2025-10-22	10/22/2025 00:21:17	M0	D65/10°	53.42	-29.85	-22.41	-18.20	6.45	-27.62	33.71	NG
sample_8_2025-10-22	10/22/2025 00:21:19	M0	D65/10°	50.14	47.94	-10.16	-21.48	84.24	-15.37	88.28	NG
sample_9_2025-10-22	10/22/2025 00:21:21	M0	D65/10°	81.61	0.16	82.98	9.98	36.46	77.77	86.47	NG
sample_10_2025-10-22	10/22/2025 00:21:22	M0	D65/10°	35.96	55.93	43.84	-35.66	92.23	38.63	106.16	NG
sample_11_2025-10-22	10/22/2025 00:21:24	M0	D65/10°	55.39	-38.39	42.97	-16.23	-2.09	37.76	41.16	NG
sample_12_2025-10-22	10/22/2025 00:21:26	M0	D65/10°	19.83	22.19	-51.21	-51.79	58.49	-56.42	96.37	NG
sample_13_2025-10-22	10/22/2025 00:21:28	M0	D65/10°	57.68	33.39	60.38	-13.94	69.69	55.17	89.97	NG
sample_14_2025-10-22	10/22/2025 00:21:30	M0	D65/10°	39.24	1.98	-41.32	-32.39	38.28	-46.53	68.41	NG
sample_15_2025-10-22	10/22/2025 00:21:32	M0	D65/10°	46.06	47.84	14.77	-25.56	84.14	9.56	88.45	NG
sample_16_2025-10-22	10/22/2025 00:21:34	M0	D65/10°	23.34	29.00	-33.45	-48.28	65.31	-38.67	89.95	NG
sample_17_2025-10-22	10/22/2025 00:21:36	M0	D65/10°	69.09	-19.59	64.38	-2.53	16.71	59.17	61.53	NG
sample_18_2025-10-22	10/22/2025 00:21:38	M0	D65/10°	66.70	21.91	68.21	-4.93	58.21	63.00	85.92	NG
sample_19_2025-10-22	10/22/2025 00:21:41	M0	D65/10°	71.68	-36.32	5.18	0.06	-0.02	-0.03	0.07	Pass
sample_20_2025-10-22	10/22/2025 00:21:43	M0	D65/10°	54.97	5.20	-26.89	-16.65	41.50	-32.10	55.05	NG
sample_21_2025-10-22	10/22/2025 00:21:45	M0	D65/10°	42.30	-19.28	28.84	-29.32	17.02	23.63	41.33	NG
sample_22_2025-10-22	10/22/2025 00:21:47	M0	D65/10°	52.44	-4.19	-23.26	-19.19	32.11	-28.47	47.01	NG
sample_23_2025-10-22	10/22/2025 00:21:48	M0	D65/10°	64.64	11.58	18.60	-6.98	47.88	13.39	50.21	NG
sample_24_2025-10-22	10/22/2025 00:21:52	M0	D65/10°	30.13	15.92	20.53	-41.49	52.22	15.32	68.43	NG
"""
LAB_patches = [[float(j) for j in i.split(" ")] for i in """
94.42 -2.09 0.25
81.90 -2.40 4.89
67.33 -1.97 2.98
49.40 0.95 0.06
31.59 -1.09 -0.33
14.27 -6.37 -3.91
53.42 -29.85 -22.41
50.14 47.94 -10.16
81.61 0.16 82.98
35.96 55.93 43.84
55.39 -38.39 42.97
19.83 22.19 -51.21
57.68 33.39 60.38
39.24 1.98 -41.32
46.06 47.84 14.77
23.34 29.00 -33.45
69.09 -19.59 64.38
66.70 21.91 68.21
71.68 -36.32 5.18
54.97 5.20 -26.89
42.30 -19.28 28.84
52.44 -4.19 -23.26
64.64 11.58 18.60
30.13 15.92 20.53
""".splitlines() if i]
# extra
# fluorescent ink
# bright yellow
# blue
# red
# gray metalic
# some form of red
# very light fluorescent
# white 
"""
sample_25_2025-10-22	10/22/2025 00:22:05	M0	D65/10°	77.77	68.59	39.40	6.14	104.89	34.19	110.50	NG
sample_26_2025-10-22	10/22/2025 00:22:47	M0	D65/10°	94.77	-21.06	75.27	23.14	15.25	70.06	75.34	NG
sample_27_2025-10-22	10/22/2025 00:22:51	M0	D65/10°	30.57	-6.09	-49.07	-41.05	30.21	-54.28	74.46	NG
sample_28_2025-10-22	10/22/2025 00:22:56	M0	D65/10°	32.36	63.98	39.61	-39.26	100.28	34.40	113.06	NG
sample_29_2025-10-22	10/22/2025 00:23:04	M0	D65/10°	57.66	-0.14	4.46	-13.96	36.16	-0.75	38.77	NG
sample_30_2025-10-22	10/22/2025 00:23:08	M0	D65/10°	41.82	51.92	-13.39	-29.80	88.22	-18.60	94.96	NG
sample_31_2025-10-22	10/22/2025 00:23:14	M0	D65/10°	89.85	16.20	2.16	18.23	52.50	-3.05	55.66	NG
sample_32_2025-10-22	10/22/2025 00:23:21	M0	D65/10°	90.52	-0.97	0.06	18.90	35.33	-5.15	40.39	NG
"""
LAB_extra = [[float(j) for j in i.split(" ")] for i in """
77.77 68.59 39.40
94.77 -21.06 75.27
30.57 -6.09 -49.07
32.36 63.98 39.61
57.66 -0.14 4.46
41.82 51.92 -13.39
89.85 16.20 2.16
90.52 -0.97 0.06
""".splitlines() if i]

In [49]:
proto = Protocol(port="COM3", baudrate=115200, verbose=False)

try:
    print("Connecting...")
    await proto.connect()

    truth = LAB_patches
    got_patches = []

    for i, colour in enumerate(truth):
      print(f"{i}/{len(truth)} Triggering measurement. LAB: {",".join(str(i) for i in colour)}")
      result = await proto.wait_measurement()

      # Print a short summary
      print("LAB:", colour)
      print("SPD:", result["spd"])
      got_patches.append({
        "LAB": colour,
        "SPD": result["spd"],
      })
finally:
    await proto.disconnect()
    print(got_patches)

Connecting...
Connected to COM3 at 115200 baud
Handshake complete!
0/24 Triggering measurement. LAB: 94.42,-2.09,0.25
LAB: [94.42, -2.09, 0.25]
SPD: [55.28138732910156, 63.344505310058594, 69.52454376220703, 75.96088409423828, 84.39344787597656, 91.21277618408203, 94.03691101074219, 93.96580505371094, 92.2179183959961, 89.6288833618164, 87.10913848876953, 86.24519348144531, 86.83333587646484, 88.19606018066406, 88.75723266601562, 87.40884399414062, 85.43611145019531, 84.26840209960938, 84.56645202636719, 86.09268951416016, 86.34684753417969, 85.4700927734375, 85.07050323486328, 84.85047149658203, 84.81207275390625, 85.62571716308594, 86.11259460449219, 86.30834197998047, 85.8306655883789, 85.49370574951172, 85.34911346435547]
1/24 Triggering measurement. LAB: 81.9,-2.4,4.89
LAB: [81.9, -2.4, 4.89]
SPD: [33.698726654052734, 42.011390686035156, 46.31901931762695, 48.819271087646484, 51.30421447753906, 54.405887603759766, 58.18791198730469, 62.568641662597656, 67.04041290283203, 70.197242

In [None]:
# =====================
# 3. CR30Colorimetry Class
# =====================
class CR30Colorimetry(Protocol):
    """Handles spectrum -> XYZ/LAB conversion, adaptation, and color calculations."""
    
    def __init__(self, science : ColorScience = ColorScienceImpl(load=True), whitepoints=WhitePoint(), *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cs : ColorScience = science
        self._wp : WhitePoint = whitepoints
        self._measurement_callbacks: List[Callable[[Dict], None]] = []
        self._wavelengths = []
    
    # -----------------
    # Callbacks
    # -----------------
    def register_measurement_callback(self, callback: Callable[[Dict], None]):
        self._measurement_callbacks.append(callback)
    
    @property
    def science(self) -> ColorScience: return self._cs
    @property
    def whitepoint(self) -> WhitePoint: return self._wp

    async def connect(self, upsample=True):
        await super().connect()
        # print("Name: ", dev.name)
        # print("Model: ", dev.model)
        # print("Serial No: ", dev.serial_number)
        # print("Version: ", dev.fw_version)
        # print("Date: ", dev.fw_build)
        #
        # Name:  SD6870B667
        # Model:  CR30
        # Serial No:  M443L0787 - V11.3.
        # Version:  V10.0
        # Date:  0.0.20231219

        if self.model == "CR30":
            self._wavelengths = [400+i*(700-400)/30.0 for i in range(31)]
        if not upsample:
            self._cs = self._cs.__class__(wavelengths=self._wavelengths)

    async def measure(self, space='XYZ', illuminant='D65/10', wait=0, timeout_per_step=0.5) -> Tuple:
        space = space.upper()
        illuminant = illuminant.upper()
        result = await super().measure(timeout_per_step=timeout_per_step) if wait == 0 else await self.wait_measurement(timeout=wait, timeout_per_step=timeout_per_step)
        xyz = [float(i) for i in self._cs.spectrum_to_xyz(result["spd"], wavelengths=self._wavelengths, illuminant=illuminant)]
        return self._decide(xyz, space, illuminant)

    async def measure_avg(self, space='XYZ', illuminant='D65/10', count=3, delay=.5, wait=0, timeout_per_step=0.5) -> Tuple:
        space = space.upper()
        illuminant = illuminant.upper()
        if count <1:
            raise Exception("count must be positive")
        results = []
        result = await super().measure(timeout_per_step=timeout_per_step) if wait == 0 else await self.wait_measurement(timeout=wait, timeout_per_step=timeout_per_step)
        results.append(result)
        for i in range(count-1):
            asyncio.sleep(delay)
            result.append(await super().measure(timeout_per_step=timeout_per_step))

        spd = [0] * len(results[0]["spd"])
        for i in results:
            spd = [a+b for a, b in zip(spd, i["spd"])]
        spd = [i / count for i in spd]

        xyz = [float(i) for i in self._cs.spectrum_to_xyz(spd, wavelengths=self._wavelengths, illuminant=illuminant)]
        return self._decide(xyz, space, illuminant)


    def _decide(self, xyz, space, illuminant) -> Tuple:
        if space == 'XYZ':
            return xyz

        whitepoint = self._wp[illuminant]

        if space == 'LAB':
            return self._cs.xyz_to_lab(*xyz, illuminant=whitepoint)

        if space == 'RGB':
            return self._cs.xyz_to_rgb(*xyz)

        raise Exception("unknown color space " + space)


c = CR30Colorimetry(port="COM3", baudrate=9600, verbose=False)
try:
    await c.connect(upsample=False)

    xyz = await c.measure(space="xyz", wait=10)
    print("XYZ", xyz)
    await asyncio.sleep(1)
    print("LAB", await c.measure(space="lab"), " w.r.t ", c.science.xyz_to_lab(*xyz))
    await asyncio.sleep(1)
    print("RGB", await c.measure(space="rgb"), " w.r.t ", c.science.xyz_to_rgb(*xyz))
    await asyncio.sleep(1)
finally:
    await c.disconnect()

Connected to COM3 at 9600 baud
Handshake complete!
XYZ [72.94174792994345, 77.42719160976208, 83.01150240001532]
LAB (90.51787445654864, -0.975618479705509, 0.061432779097714274)  w.r.t  (90.51787445654864, -0.975618479705509, 0.061432779097714274)
RGB [255 255 255]  w.r.t  [255 255 255]
Disconnected
