In [None]:
import pandas
import numpy

import os.path


In [None]:
dataset_path = './data/cnc-mill-toolwear'

In [None]:

def load_experiments(dataset_path):

    metadata_path = os.path.join(dataset_path, 'train.csv')
    df = pandas.read_csv(metadata_path)

    df = df.drop(columns='material') # only "wax", not so useful
    df = df.rename(columns={'No': 'experiment'})
    df = df.set_index('experiment')
    categorical_columns = ['tool_condition', 'machining_finalized', 'passed_visual_inspection']
    for c in categorical_columns:
        df[c] = df[c].astype('category')

    return df

                                 
experiments = load_experiments(dataset_path)
experiments

In [None]:

def remove_bad_data(data):
    """
    From the dataset README.txt -- 
    Note: Some variables will not accurately reflect the operation of the CNC machine.
    This can usually be detected by
    when M1_CURRENT_FEEDRATE reads 50,
    when X1 ActualPosition reads 198,
    or when M1_CURRENT_PROGRAM_NUMBER does not read 0.
    The source of these errors has not been identified.
    """
    pass

def load_timeseries(dataset_path):

    dfs = []
    for ex in range(1, 18+1):
        path = os.path.join(dataset_path, f'experiment_{ex:02d}.csv')
        df = pandas.read_csv(path)
        df['experiment'] = ex
        df['time'] = pandas.to_timedelta(0.1 * numpy.arange(len(df)), unit='s') # 100ms / 10hz samplerate
        dfs.append(df)

    out = pandas.concat(dfs)
    out = out.set_index(['experiment', 'time'])
    return out

data = load_timeseries(dataset_path)
data


In [None]:

# Add experiment info to sensor data, for ease of analysis
enrich = pandas.merge(data.reset_index(),  experiments, left_on='experiment', right_index=True).set_index(['experiment', 'time'])
enrich.head()



In [None]:
power_columns = list(enrich.columns[enrich.columns.str.contains('Power')])
def p99(s):
    return s.quantile(0.99)

power_stats = enrich[power_columns].agg(['min', 'max', 'median', p99])
print(power_stats)

for c in power_columns:
    s = numpy.maximum(enrich[c], 0.0)
    s = s / s.quantile(0.99)
    s = numpy.minimum(s, 1.0)
    enrich[c+'_Scaled'] = s

scaled_power_columns = [ c+'_Scaled' for c in power_columns  ]
power_stats = enrich[scaled_power_columns].agg(['min', 'max', 'median', p99])
power_stats

In [None]:

import math
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def create_square_subplot_grid(traces, x_range, y_range, cols=5, subplot_titles=None):
    """
    Create a grid of subplots with fixed number of columns (default=5),
    and each subplot has square X:Y axis (1:1 scale).
    
    Parameters:
    - traces: List of Plotly traces (one per subplot)
    - cols: Number of columns in the grid (default 5)
    - subplot_titles: Optional list of subplot titles
    
    Returns:
    - Plotly figure with square subplots
    """
    num_traces = len(traces)
    rows = math.ceil(num_traces / cols)

    fig = make_subplots(
        rows=rows,
        cols=cols,
        shared_xaxes=False,
        shared_yaxes=False,
        subplot_titles=subplot_titles if subplot_titles else ["" for _ in range(num_traces)],
        horizontal_spacing=0.05,  # More horizontal space between subplots
        vertical_spacing=0.05,     # Optional: space for subtitle/axis labels
    )

    for idx, trace in enumerate(traces):
        row = idx // cols + 1
        col = idx % cols + 1
        fig.add_trace(trace, row=row, col=col)
        axis_id = "" if idx == 0 else str(idx + 1)

        # Apply fixed range and square aspect
        fig.update_layout({
            f"xaxis{axis_id}": dict(range=x_range),
            f"yaxis{axis_id}": dict(range=y_range, scaleanchor=f"x{axis_id}", scaleratio=1),
        })

    fig.update_layout(
        height=350 * rows,
        width=350 * cols,
        showlegend=False,
        title_text="Grid of Square Subplots"
    )

    return fig
    

import plotly.express

# TODO: maybe support overlay a variable, like spindle power?
def plot_positions(data):

    x_column = 'X1_ActualPosition'
    y_column = 'Y1_ActualPosition'
    
    x_range = data[x_column].min(), data[x_column].max(), 
    y_range = data[y_column].min(), data[x_column].max(), 
    
    traces = []
    titles = []
    for experiment, d in data.groupby('experiment'):    
        trace = go.Scatter(x=d[x_column], y=d[y_column], mode='markers')
    
        ex = experiments.loc[experiment]
        #print(ex)    
        traces.append(trace)
        describe = f"Ex {experiment}: f={ex['feedrate']} c={ex['clamp_pressure']} t={ex['tool_condition']} f={ex['machining_finalized']} p={ex['passed_visual_inspection']}"
        titles.append(describe)

    fig = create_square_subplot_grid(traces, x_range, y_range, cols=6, subplot_titles=titles)
    return fig

fig = plot_positions(data)
fig.show()


## Spindle power vs different conditions

In [None]:
spindleactive = enrich[enrich['S1_OutputPower'] > 0.03]
import seaborn

In [None]:

seaborn.displot(kind='kde', data=spindleactive, x='S1_OutputPower', hue='machining_finalized', clip=(0.1, 0.250), aspect=2.0, height=4.0, common_norm=False)
#(spindleactive.reset_index().sort_values('Machining_Process'), x='S1_OutputPower', color='Machining_Process')


In [None]:

seaborn.displot(kind='kde', data=spindleactive, x='S1_OutputPower', hue='feedrate', clip=(0.1, 0.250), aspect=2.0, height=4.0, common_norm=False)
#(spindleactive.reset_index().sort_values('Machining_Process'), x='S1_OutputPower', color='Machining_Process')


In [None]:
seaborn.displot(kind='kde', data=spindleactive, x='S1_OutputPower', hue='Machining_Process', clip=(0.1, 0.250), aspect=2.0, height=4.0, common_norm=False)


In [None]:
seaborn.displot(kind='kde', data=spindleactive, x='S1_OutputPower', hue='clamp_pressure', clip=(0.1, 0.250), aspect=2.0, height=4.0, common_norm=False)
#(spindleactive.reset_index().sort_values('Machining_Process'), x='S1_OutputPower', color='Machining_Process')


In [None]:
seaborn.displot(kind='kde', data=spindleactive, x='S1_OutputPower', hue='Machining_Process', row='feedrate', clip=(0.1, 0.250), aspect=2.0, height=2.0, common_norm=False)
#(spindleactive.reset_index().sort_values('Machining_Process'), x='S1_OutputPower', color='Machining_Process')


## Time-series view

In [None]:
# TODO: show the different labeled sections in Machining_Process column
# TODO: normalize powers for all axes, and plot together
plot_timeseries(data, y_column='Y1_OutputPower')

In [None]:
for column in scaled_power_columns:
    
    seaborn.displot(data=enrich.reset_index(), kind='kde', x=column, hue='feedrate', height=2.0, aspect=2.0)

In [None]:
spindle_active = enrich[enrich['S1_OutputPower'] > 0.01]
seaborn.pairplot(data=spindle_active.reset_index(), vars=scaled_power_columns, hue='feedrate', height=3.6, aspect=1.5, diag_kws=dict(common_norm=False))

In [None]:
seaborn.pairplot(data=spindle_active.reset_index(), vars=scaled_power_columns, hue='machining_finalized', height=3.6, aspect=1.5, diag_kws=dict(common_norm=False))

In [None]:


def plot_timeseries(data, y, time_column = 'time', row_column='experiment', row_order=None):
    import plotly.graph_objects as go
    
    data = data.reset_index()
    # convert to seconds, Plotly default time markers are bad with Timedelta
    data[time_column] = data[time_column] / pandas.Timedelta('1sec')

    x_range = data[time_column].min(), data[time_column].max()

    if row_order is None:
        row_order = sorted(list(data[row_column].unique()))
    else:
        row_order = list(row_order)
    
    for experiment in row_order:
        df = data[data[row_column] == experiment]
        df = df.sort_values(time_column) # plotly lines connect badly without sorting by time
        
        ex = experiments.loc[experiment]
        describe = f"Ex {experiment}: f={ex['feedrate']} c={ex['clamp_pressure']} t={ex['tool_condition']} f={ex['machining_finalized']} p={ex['passed_visual_inspection']}"

        fig = go.Figure()
        fig.update_layout(title=describe, xaxis=dict(range=x_range))
        for column in y:
            fig.add_trace(go.Scatter(x=df[time_column], y=df[column], name=column))
        
        fig.show()

plot_timeseries(enrich.sort_values(['feedrate']), y=scaled_power_columns)

In [None]:
exx = experiments.sort_values(['feedrate', 'clamp_pressure', 'tool_condition'])
exx

In [None]:

plot_timeseries(data.reset_index(), y=['Y1_ActualPosition', 'X1_ActualPosition'], row_order=exx.index)


In [None]:
plot_timeseries(enrich.sort_values(['feedrate']), y=['M1_CURRENT_FEEDRATE', 'S1_CurrentFeedback'])

In [None]:
plot_timeseries(enrich.sort_values(['feedrate']), y=['M1_CURRENT_FEEDRATE', 'S1_OutputPower_Scaled'])


In [None]:
p = enrich.sort_values(['feedrate'])
p['S1_CommandVelocity_Scaled'] = p['S1_CommandVelocity'] / 50.0
p['S1_Power_Calc'] = p['S1_OutputCurrent'] * p['S1_OutputVoltage']
#plot_timeseries(p, y=['S1_CommandVelocity_Scaled', 'S1_OutputPower_Scaled'])
plot_timeseries(p, y=['S1_OutputCurrent', 'S1_OutputPower'])

In [None]:
scaled_power_columns

In [None]:
data['S1_CommandVelocity'].hist()

In [None]:
for c in sorted(data.columns):
    print(c)

In [None]:
data.Machining_Process.value_counts(dropna=False)

In [None]:
data.M1_CURRENT_FEEDRATE.value_counts(dropna=False)

In [None]:
data.M1_CURRENT_PROGRAM_NUMBER.value_counts(dropna=False)