In [None]:
import math

import serial
import msgpack
import tqdm
import pandas as pd
import numpy as np
import seaborn as sns
import scipy.interpolate as interp
import scipy.signal as signal
import matplotlib.pyplot as plt

# Enable "widget" mode for matplotlib
%matplotlib widget

In [None]:
# Configure Serial port with 100 ms timeout
ser = serial.Serial('COM3', timeout=0.1)

# Configure msgpack unpacker for data stream decode
unpacker = msgpack.Unpacker(raw=False)
data_list = []

###########################################################################################################
# This loop will never end, interrupt the kernel in Jupyter lab when you have taken enough data to continue
###########################################################################################################
while True:
    # Read some bytes in
    buf = ser.read(256)

    if not buf:
        # Try again later
        continue

    # Feed data to deserialization
    unpacker.feed(buf)

    try:
        # Process new objects
        for obj in unpacker:
            if not isinstance(obj, dict):
                # We only want the dicts!
                print(f'rejecting: {obj}')
                break

            if 'debug' in obj:
                # Debug message, display it but don't save it
                print(f'DEBUG: {obj}')
            else:
                # Save data
                data_list.append(obj)
    except (msgpack.ExtraData, msgpack.OutOfData, msgpack.FormatError, msgpack.StackError, UnicodeDecodeError) as ex:
        # These should all be (maybe?) ok?
        print(f'Something went wrong, but it is probably fine: {ex}')
        continue

In [None]:
len(data_list)

In [None]:
# Make dataframe
df = pd.DataFrame(data_list)
df['t'] -= df.t.iloc[0]  # Remove bias, create elapsed time
fs = 1.0/(df["t"].diff().mean())  # Average ODR

# Make sure the timing looks reasonable
df['t'].diff().plot()

# Calculate resistance and power from voltage and current
df['r'] = df['v'] / df['i']
df['p'] = df['v'] * df['i']

In [None]:
# Some extra info
print(f'fs: {fs} hz\n')
display(df['t'].diff().describe())

In [None]:
# Shift t=0 to the start of the puff (first time when power > half the max power in the data)
df.t -= df[df.p.gt(df.p.max() / 2.0)].t.iloc[0]
df['t_quant'] = df.t.round(1)  # Quantized time to 10 Hz

In [None]:
# Materials reference tables
MATERIAL_REF = {
    'tcr': {
#         'ss316l': 879*10**-6  # delta_r/r per degree Celsius  # From Steam Engine calculator
        'ss316l': 920*10**-6,  # delta_r/r per degree Celsius
    },
    'tfr': {
        # list(tuple()) with form [(t1, rr1), (t2, rr2), ...] with:
        #      t = temperature in Celsius
        #     rr = resistance ratio of measured_resistance / reference_resistance
        'ss316l': [  # From Arctic Fox/Steam Engine
            (-40, 0.93474),
            ( 20, 1.00000),
            ( 50, 1.03000),
            (100, 1.08000),
            (150, 1.12600),
            (200, 1.16800),
            (250, 1.20700),
            (300, 1.24600),
            (425, 1.33700),
        ],
    },
}

ESCC_COLD_RES_TCR_REF = {
    'ecss_v1': [
        # list(tuple()) with form [(tcr1, r1), (tcr2, r2), ...] with:
        #     tcr = Temperature Coefficent of Resistance * 10^5 (unitless)
        #       r = Cold resistance in Ohms
        # Data from: https://www.reddit.com/r/AdvancedVapeSupply/comments/iszh2f/escc_tcr_chart_and_update_about_the_escc/
        (200, 0.40),
        (185, 0.45),
        (170, 0.50),
        (155, 0.55),
        (145, 0.60),
        (138, 0.65),
        (130, 0.70),
        (120, 0.75),
        (110, 0.80),
        (100, 0.85),
        ( 85, 0.90),
        ( 70, 0.95),
        ( 50, 1.00),
        ( 25, 1.05),
        (  5, 1.10),
    ],
}

In [None]:
def resratio_to_temp(t_cold=None, material='ss316l', method='tfr'):
    """ Converts resistance ratio into temperature for a material with a known TCR or TFR.

    Args:
        t_cold (float): The cold coil reference temperature, in Celsius. Only used for TCR and is 20 C if None. Optional.
        material (float, list, or str): Material TCR float in delta_r/r per degree Celsius,
                                        material TFG list of tuples of temp in Celsius and resistance ratio,
                                        or str of material for TCR and TFR to look up.
        method (str): Method to use for conversion, must be one of ['tcr', 'tfr']. Optional.

    Returns:
        Function object:
            Args:
                rr (array-like): Resistance ratio(s) to convert to a temperature
            Returns:
                np.ndarray to temperatures in Celsius
    """
    if method not in ['tcr', 'tfr']:
        raise NotImplementedError(f'Unimplemented method: {method}')

    if (t_cold is not None) and (method != 'tcr'):
        raise ValueError(f't_cold is only useable for tcr method')

    if isinstance(material, str):
        # Look up material properties from string
        material = MATERIAL_REF[method][material]

    if method == 'tcr':
        # Temperature Coefficient of Resistance method
        t_cold = 20.0 if t_cold is None else t_cold  # Default t_cold is 20 C
        temp_func = lambda rr, t_cold=t_cold, material=material: np.asarray(t_cold + ((np.asarray(rr) - 1.0) / material))
    elif method == 'tfr':
        # Temperature Function of Resistance method, cubic spline interpolation
        temp, rr = tuple(zip(*material))
        temp_func = interp.CubicSpline(rr, temp, bc_type='natural')
    else:
        raise RuntimeError('The matrix is glitching again')

    return temp_func

def coldres_to_tcr(material='escc_v1'):
    """ Converts cold resistance into temperature for a material with a known TCR or TFR.

    Args:
        material (float, list, or str): Material TCR list of tuples of TCR (unitless) and cold resistance in Ohms,
                                        or str of material name to look up.

    Returns:
        Function object:
            Args:
                r_cold (array-like): Reference cold resistance to convert to a TCR
            Returns:
                np.ndarray to TCR (unitless)
    """
    if isinstance(material, str):
        # Look up material properties from string
        material = MATERIAL_REF[method][material]

    if method == 'tcr':
        # Temperature Coefficient of Resistance method
        t_cold = 20.0 if t_cold is None else t_cold  # Default t_cold is 20 C
        temp_func = lambda rr, t_cold=t_cold, material=material: np.asarray(t_cold + ((np.asarray(rr) - 1.0) / material))
    elif method == 'tfr':
        # Temperature Function of Resistance method, cubic spline interpolation
        temp, rr = tuple(zip(*material))
        temp_func = interp.CubicSpline(rr, temp, bc_type='natural')
    else:
        raise RuntimeError('The matrix is glitching again')

    return temp_func

In [None]:
r_cold = 0.390  # Cold coil resistance in Ohms, TODO: Automate this

# Calculate temp of heating element, clip outout values to "realistic" temps
t_func = resratio_to_temp(material='ss316l', method='tfr')
df['temp'] = t_func(df.r / r_cold)
df['temp'] = df.temp.fillna(0.0).clip(0.0, 300.0)

In [None]:
df.plot(y='r', x='t', xlim=(0.0, 10.0), ylim=(0.0,2.0))

In [None]:
df.plot(y=['v', 'i', 'p'], x='t', xlim=(0.0, 10.0))

In [None]:
# plt.figure(figsize=(32,18))
# pp = sns.lineplot(data=df, x='t', y='temp')
# pp.set(xlim=(0,10))
# # pp.set(ylim=(0.0, 50.0))
df.plot(y='temp', x='t', xlim=(0.0, 10.0))

In [None]:
pp = sns.relplot(data=df, x='t_quant', y='temp', kind='line', height=16)
pp.set(xlabel='Puff Time (s)', ylabel='Temperature (C)', title=f'Temperature mean and CI.')
plt.grid(b=True)
# pp.set(ylim=(0.4, 0.48))
pp.set(xlim=(-2,10))
plt.show()

In [None]:
desired_fs = 10.0
decimation_ratio = math.floor(fs / desired_fs)

actual_fs = fs / decimation_ratio

print(f'Decimation ratio: {decimation_ratio}')
print(f'Actual fs: {actual_fs} Hz')

In [None]:
decimated_df = df[df.t.between(-0.0, 9.9)]
# decimated_df = df
decimated_df = decimated_df.apply(lambda x: signal.decimate(x, decimation_ratio, ftype='fir'), axis='index')#, raw=True)
decimated_df = decimated_df.drop('t_quant', axis='columns')
decimated_df = decimated_df.assign(t=np.arange(len(decimated_df))/actual_fs)

decimated_df['r_post'] = decimated_df.v / decimated_df.i
decimated_df

In [None]:
plt.figure(figsize=(32,18))
pp = sns.lineplot(data=decimated_df, x='t', y='temp')#, kind='line', height=16)
pp.set(xlabel='Time (s)', ylabel='Temperature (C)', title=f'Decimated Temperature.')
plt.grid(b=True)
# pp.set(ylim=(195, 205))
# pp.set(xlim=(0.0, 10.0))
plt.show()

In [None]:
# Find values where power < 1.0 Watts
valid_mask = df.p.gt(1.0)
for i in range(1, len(valid_mask) - 1):
    if not valid_mask[i]:
        # Also remove the last value in every series of values over 1.0 Watts since it is noisy
        # TODO: Can this be improved?
        valid_mask[i-1] = False

# Valid data based on mask
df_valid = df[valid_mask]

In [None]:
df_valid.plot(y=['i', 'v', 'r'], x='t', xlim=(0.0, 10.0), style='.-')

In [None]:
df_valid.plot(x='t', y='temp', style='.-')

In [None]:
pp = sns.relplot(data=df_valid, x='t_quant', y='temp', kind='line', height=16)
pp.set(xlabel='Puff Time (s)', ylabel='Temperature (C)', title=f'Temperature mean and CI.')
plt.grid(b=True)
# pp.set(ylim=(210, 225))
# pp.set(xlim=(0,10))
plt.show()

In [None]:
pp = sns.relplot(data=df_valid, x='t_quant', y='temp', kind='line', height=16)
pp.set(xlabel='Puff Time (s)', ylabel='Temperature (C)', title=f'Temperature mean and CI.')
plt.grid(b=True)
pp.set(ylim=(220, 240))
pp.set(xlim=(-2,15))
plt.show()

In [None]:
# save df to a csv file
df.to_csv('m22_octo_gg_p600i20.csv')

In [None]:
# read df from a csv file
df = pd.read_csv('m22_octo_gg_p300i20.csv')

In [None]:
# Comparing GG to AF data
df_af = pd.read_csv('m22_af.csv')
df_af.Time -= df_af[df_af.Power.gt(df_af.Power.max() / 2.0)].Time.iloc[0]
df_af

In [None]:
df_af['temp_c'] = 5.0 / 9.0 *(df_af.Temperature - 32.0)

plt.figure(figsize=(32,18))
pp = sns.lineplot(data=decimated_df, x='t', y='temp')#, kind='line', height=16)
sns.lineplot(data=df_af, x='Time', y='temp_c', ax=pp)#, kind='line', height=16)
pp.set(xlabel='Time (s)', ylabel='Temperature (C)', title=f'AF temp and decimated GG temp, M22 Octo-coil.')
plt.grid(b=True)
# pp.set(ylim=(0.46, 0.48))
pp.set(xlim=(0,10))
plt.show()

In [None]:
df_valid['settings'] = 'p600_i20'
df_valid4 = df_valid

In [None]:
df_valid['settings'] = 'no_pi'
df_valid2 = df_valid

In [None]:
df_valid['settings'] = 'p300_i20'
df_valid3 = df_valid

In [None]:
df_valid['settings'] = 'p300_i10'
df_valid1 = df_valid

In [None]:
df_valid_all = pd.concat([df_valid1, df_valid2, df_valid3, df_valid4])

In [None]:
pp = sns.relplot(data=df_valid_all, x='t_quant', y='temp', kind='line', hue='settings', height=12)
pp.set(xlabel='Puff Time (s)', ylabel='Temperature (C)', title=f'Temperature mean and CI.')
plt.grid(b=True)
pp.set(ylim=(200, 250))
pp.set(xlim=(0,10))
plt.show()

In [None]:
pp = sns.relplot(data=df_valid_all, x='t_quant', y='p', kind='line', hue='settings', height=12)
pp.set(xlabel='Puff Time (s)', ylabel='Temperature (C)', title=f'Temperature mean and CI.')
plt.grid(b=True)
# pp.set(ylim=(200, 250))
pp.set(xlim=(-2,15))
plt.show()

In [None]:
pp = sns.relplot(data=df_valid_all, x='t_quant', y='temp', kind='line', hue='settings', height=12)
pp.set(xlabel='Puff Time (s)', ylabel='Temperature (C)', title=f'Temperature mean and CI.')
plt.grid(b=True)
# pp.set(ylim=(200, 250))
pp.set(xlim=(-2,15))
plt.show()

In [None]:
t_func(1.17)

In [None]:
t_func(1.17+0.003)