In [None]:
import os, re, json, math, random
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
import tensorflow as tf

# Constants
CSV_PATH      = "classified_data_CNN4_Final.csv"
LABEL_COL     = "label"

WINDOW        = 128
HOP           = 1
NUM_SEGMENTS  = 8
STATS_LIST    = ("mean","std","p2p","energy")

PCA_VAR_KEEP  = 0.80

BATCH_SIZE    = 64
EPOCHS        = 40
DROPOUT_RATE  = 0.5
LR            = 2e-3

# Class-0 injection to enforce a strong "none" prior
SYNTH_ZERO_TARGET_FRAC = 0.7
SYNTH_ZERO_METHOD      = "shuffle"  # or "gaussian"
GAUSS_ATTENUATION      = 0.5
RNG_SEED               = 1337

ARTIFACT_DIR  = "artifacts_CNN4_Final"
Path(ARTIFACT_DIR).mkdir(exist_ok=True)

HLS_PARAMS_DIR  = ARTIFACT_DIR + "/HLS_PARAMS"
Path(HLS_PARAMS_DIR).mkdir(exist_ok=True)

np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Load CSV and create rolling windows
df = pd.read_csv(CSV_PATH)
feature_cols = [c for c in df.columns if c not in ("t", LABEL_COL)]
assert len(feature_cols) == 30, f"Expected 30 raw channels, got {len(feature_cols)}"
X_raw = df[feature_cols].values.astype(np.float32)
y_raw = df[LABEL_COL].astype(int).values

def make_windows(X, y, win=WINDOW, hop=HOP):
    Xw, yw = [], []
    for end in range(win, len(X)+1, hop):
        st = end - win
        Xw.append(X[st:end])
        yw.append(y[end-1])
    return np.array(Xw, np.float32), np.array(yw, int)

Xw, yw = make_windows(X_raw, y_raw)
print("Windows:", Xw.shape, "Labels:", yw.shape)

Windows: (47809, 128, 30) Labels: (47809,)


In [None]:
# Inject synthetic class-0 windows
np.random.seed(RNG_SEED)

def _count_frac_zero(y):
    n0 = int((y == 0).sum())
    return n0, n0/len(y)

def _need_zeros(n_total, n0_cur, target_frac):
    t = min(target_frac, 0.999)
    return max(0, int(math.floor((t*n_total - n0_cur)/max(1e-9, 1.0 - t))))

n0_before, frac0_before = _count_frac_zero(yw)
n_add = _need_zeros(len(yw), n0_before, SYNTH_ZERO_TARGET_FRAC)

def synth_gauss(n, W, C, atten=0.5):
    if n <= 0: return np.empty((0,W,C), np.float32)
    mu = X_raw.mean(axis=0).astype(np.float32)
    sd = (X_raw.std(axis=0)+1e-6).astype(np.float32)
    r = np.random.randn(n,W,C).astype(np.float32)*sd + mu
    return mu + atten*(r - mu)

def synth_shuffle(n, W, C):
    if n <= 0: return np.empty((0,W,C), np.float32)
    out = np.empty((n,W,C), np.float32)
    starts = np.random.randint(0, len(X_raw)-W, size=n)
    for i,s in enumerate(starts):
        w = X_raw[s:s+W].copy()
        perm = np.random.permutation(W)
        out[i] = w[perm]
    return out

Xw_s0 = synth_gauss(n_add, WINDOW, X_raw.shape[1], GAUSS_ATTENUATION) if SYNTH_ZERO_METHOD=="gaussian" else synth_shuffle(n_add, WINDOW, X_raw.shape[1])
yw_s0 = np.zeros((Xw_s0.shape[0],), int)

if Xw_s0.shape[0] > 0:
    Xw = np.concatenate([Xw, Xw_s0], axis=0).astype(np.float32)
    yw = np.concatenate([yw, yw_s0], axis=0).astype(int)
    perm = np.random.permutation(len(yw))
    Xw, yw = Xw[perm], yw[perm]

n0_after, frac0_after = _count_frac_zero(yw)
print(f"Synth class-0 added: {Xw_s0.shape[0]}")
print(f"class-0 before: {n0_before}/{len(yw)} ({frac0_before:.1%}) -> after: {n0_after}/{len(yw)} ({frac0_after:.1%})")


Synth class-0 added: 6647
class-0 before: 31472/54456 (65.8%) -> after: 38119/54456 (70.0%)


In [None]:
# Temporally summarize (8 segments × stats)
def summarize(win, num_segments=NUM_SEGMENTS, stats=STATS_LIST):
    W, C = win.shape
    seg_len = W // num_segments
    feats = []
    for s in range(num_segments):
        a, b = s*seg_len, (s+1)*seg_len
        seg = win[a:b]
        parts = []
        if "mean"   in stats: parts.append(seg.mean(axis=0))
        if "std"    in stats: parts.append(seg.std(axis=0)+1e-8)
        if "p2p"    in stats: parts.append(seg.max(axis=0)-seg.min(axis=0))
        if "energy" in stats: parts.append((seg**2).sum(axis=0))
        feats.append(np.concatenate(parts, axis=0))
    return np.stack(feats, axis=0)

sum_feats = np.array([summarize(w) for w in Xw])
N, T, F = sum_feats.shape
flat = sum_feats.reshape(N, -1).astype(np.float32)
print("Summarized:", sum_feats.shape, "Flat:", flat.shape)

Summarized: (54456, 8, 120) Flat: (54456, 960)


In [None]:
# Scale -> PCA -> flatten
scaler = StandardScaler().fit(flat)
flat_scaled = scaler.transform(flat)

pca = PCA(n_components=PCA_VAR_KEEP, svd_solver="full")
flat_pca = pca.fit_transform(flat_scaled)   # (N, D_pca)
D_pca = flat_pca.shape[1]
X_cnn = flat_pca.reshape(N, D_pca, 1).astype(np.float32)
num_classes = int(yw.max()+1)


# First: split 70% train, 30% temp
X_train, X_temp, y_train, y_temp = train_test_split(
    X_cnn, yw,
    test_size=0.30,              # 30% goes to val+test
    stratify=yw,
    random_state=42
)

# Second: split temp into 15% val + 15% test
# Since temp = 30%, the val/test split is 0.5
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp,
    test_size=0.50,              # half of temp (15%)
    stratify=y_temp,
    random_state=42
)

print("Train:", X_train.shape, "Val:", X_val.shape, "Test:", X_test.shape)

Train: (38119, 34, 1) Val: (8168, 34, 1) Test: (8169, 34, 1)


In [None]:
# Define CNN model
from tensorflow.keras import layers, models


def make_fp32_cnn(input_shape, n_classes, dropout=0.5):
    x = layers.Input(shape=input_shape, name="input_fp32")
    x = layers.GaussianNoise(stddev=0.02, name="noise")(x)
    h = layers.Conv1D(64, 3, padding='same', activation='relu', name="conv1")(x)
    h = layers.Dropout(DROPOUT_RATE, name="drop2")(h)
    h = layers.Flatten(name="flat")(h)
    y = layers.Dense(n_classes, activation='softmax', name="softmax")(h)
    return models.Model(x, y, name="cnn1d_fp32")


fp32_model = make_fp32_cnn((D_pca,1), num_classes, dropout=DROPOUT_RATE)
fp32_model.compile(
    optimizer=tf.keras.optimizers.Adam(LR),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
fp32_model.summary()


Model: "cnn1d_fp32"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_10 (InputLayer)       [(None, 34, 1)]           0         
                                                                 
 conv1 (Conv1D)              (None, 34, 64)            256       
                                                                 
 drop2 (Dropout)             (None, 34, 64)            0         
                                                                 
 flat (Flatten)              (None, 2176)              0         
                                                                 
 softmax (Dense)             (None, 7)                 15239     
                                                                 
Total params: 15495 (60.53 KB)
Trainable params: 15495 (60.53 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# Train model
es = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=6, restore_best_weights=True)
hist = fp32_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=[es],
    verbose=1
)
val_loss, val_acc = fp32_model.evaluate(X_val, y_val, verbose=0)
print(f"✅ FP32 Validation accuracy: {val_acc:.4f}")

# (Optional) Save FP32 model
fp32_path = os.path.join(ARTIFACT_DIR, "cnn1d_fp32_keras.keras")
fp32_model.save(fp32_path)
print("✅ Saved FP32 model:", fp32_path)

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
✅ FP32 Validation accuracy: 0.9716
✅ Saved FP32 model: artifacts_CNN4_Final\cnn1d_fp32_keras.keras


In [None]:
# Export PCA/scaler + meta
np.savez(os.path.join(ARTIFACT_DIR, "pca_params_summarizer.npz"),
         scaler_mean=np.asarray(scaler.mean_, dtype=np.float32),
         scaler_scale=np.asarray(scaler.scale_, dtype=np.float32),
         pca_components=np.asarray(pca.components_, dtype=np.float32),
         pca_mean=np.asarray(pca.mean_, dtype=np.float32))

meta = {
    "window": int(WINDOW),
    "hop": int(HOP),
    "orig_channels": 30,
    "num_segments": int(NUM_SEGMENTS),
    "stats_list": list(STATS_LIST),
    "stats_per_segment": int(len(STATS_LIST)),
    "F_flat": int(scaler.mean_.size),
    "D_pca": int(D_pca),
    "classes": int(num_classes),
    "class_names": ["none","come","go","turn","pet","feed","fetch"][:num_classes], #Mislabelled fetch. it's supposed to be throw ball
    "note": "FP32-trained model"
}
with open(os.path.join(ARTIFACT_DIR, "meta.json"), "w") as f:
    json.dump(meta, f, indent=2)
print("✅ Saved pca_params_summarizer.npz & meta.json")

✅ Saved pca_params_summarizer.npz & meta.json


In [None]:
# Reload model & compute F1 score
from sklearn.metrics import classification_report, f1_score

# Reload the saved FP32 model
reload_model = tf.keras.models.load_model(fp32_path)

# Predict on validation set
y_pred_probs = reload_model.predict(X_test, batch_size=BATCH_SIZE, verbose=0)
y_pred = np.argmax(y_pred_probs, axis=1)

# Compute F1 score (macro and weighted)
f1_macro = f1_score(y_test, y_pred, average="macro")
f1_weighted = f1_score(y_test, y_pred, average="weighted")

print(f"✅ F1 Macro:    {f1_macro:.4f}")
print(f"✅ F1 Weighted:{f1_weighted:.4f}")

# Full classification report
print("\nClassification report:")
print(classification_report(y_test, y_pred, target_names=meta["class_names"]))


✅ F1 Macro:    0.9523
✅ F1 Weighted:0.9689

Classification report:
              precision    recall  f1-score   support

        none       0.98      0.98      0.98      5718
        come       0.92      0.96      0.94       315
          go       0.97      0.96      0.96       444
        turn       0.95      0.93      0.94       406
         pet       0.97      0.94      0.95       313
        feed       0.94      0.95      0.94       532
       fetch       0.95      0.94      0.95       441

    accuracy                           0.97      8169
   macro avg       0.95      0.95      0.95      8169
weighted avg       0.97      0.97      0.97      8169



In [None]:
# Non-zero test vector export for HLS

# reload model
reload_model = tf.keras.models.load_model(fp32_path)


class_names = meta["class_names"]
CLASSES = meta["classes"]
D_IN = int(meta["D_pca"])

# pick a random NON-ZERO validation sample
nz_idx = np.where(y_test != 0)[0]
if nz_idx.size == 0:
    # if all are zero, fallback to any sample
    sel = np.random.randint(0, len(y_test))
else:
    sel = np.random.choice(nz_idx)

x_sample = X_test[sel]
y_true   = int(y_test[sel])

# run inference
probs = reload_model.predict(x_sample[None, ...], verbose=0)[0]
y_pred = int(np.argmax(probs))

print(f"Selected index: {sel}")
print(f"Ground truth : {y_true} ({class_names[y_true] if y_true < len(class_names) else y_true})")
print(f"Predicted     : {y_pred} ({class_names[y_pred] if y_pred < len(class_names) else y_pred})")
print("Top-3 probs   :", np.argsort(-probs)[:3], np.sort(probs)[-3:][::-1])

test_in = x_sample.reshape(-1).astype(np.float32)
assert test_in.shape[0] == D_IN

tb_path = HLS_PARAMS_DIR + "/tb_vector.h"

def _emit_1d_floats(name, arr):
    lines = []
    lines.append(f"static const float {name}[{arr.shape[0]}] = " + "{")
    chunk = 8
    for i in range(0, arr.shape[0], chunk):
        seg = ", ".join(f"{float(v):.8e}" for v in arr[i:i+chunk])
        lines.append("  " + seg + ("," if i+chunk < arr.shape[0] else ""))
    lines.append("};")
    return "\n".join(lines)

with open(tb_path, "w") as f:
    f.write("#pragma once\n")
    f.write("// Auto-generated test vector for HLS testbench\n")
    f.write(f"#define TB_D_IN {D_IN}\n")
    f.write(f"#define TB_CLASSES {CLASSES}\n")
    f.write(f"#define TB_EXP_CLASS {y_pred}\n")
    f.write("\n")
    f.write(_emit_1d_floats("TB_TEST_IN", test_in))
    f.write("\n\n")
    f.write(_emit_1d_floats("TB_EXP_PROBS", probs.astype(np.float32)))
    f.write("\n")

print("✅ Wrote test header: ", tb_path)


Selected index: 2508
Ground truth : 6 (fetch)
Predicted     : 6 (fetch)
Top-3 probs   : [6 0 3] [9.9974734e-01 2.5239616e-04 2.0592087e-07]
✅ Wrote test header:  artifacts_CNN4_Final/HLS_PARAMS/tb_vector.h


In [None]:
mdl = tf.keras.models.load_model(fp32_path)
mdl.summary()

layers = mdl.layers
assert "conv1" in [l.name for l in layers], "Expected a Conv1D layer named 'conv1'."
conv = mdl.get_layer("conv1")

Wc, bc = conv.get_weights()
KERNEL = Wc.shape[0]
CIN    = Wc.shape[1]
CONV1_OUT = Wc.shape[2]
assert KERNEL == 3, f"Expected kernel_size=3, got {KERNEL}"
assert CIN == 1,    f"Expected input shape (*, D_pca, 1), got {CIN}"

dense = mdl.get_layer("softmax")
Wd, bd = dense.get_weights()
FLAT = Wd.shape[0]
CLASSES = Wd.shape[1]
assert FLAT % CONV1_OUT == 0, "Flattened size mismatch; check model architecture."
D_IN = FLAT // CONV1_OUT

print(f"[Exporter] D_IN={D_IN}, KERNEL={KERNEL}, CONV1_OUT={CONV1_OUT}, CLASSES={CLASSES}")

CONV1_W = np.transpose(Wc[:, 0:1, :], (2, 0, 1)).reshape(CONV1_OUT, KERNEL).astype(np.float32)
CONV1_B = bc.astype(np.float32)

DENSE_W = Wd.astype(np.float32)
DENSE_B = bd.astype(np.float32)

def _emit_array_2d(f, name, arr):
    rows, cols = arr.shape
    f.write(f"static const float {name}[{rows}][{cols}] = {{\n")
    for r in range(rows):
        vals = ", ".join(f"{float(v):.8e}" for v in arr[r])
        f.write(f"  {{ {vals} }},\n")
    f.write("};\n")

def _emit_array_1d(f, name, arr):
    n = arr.shape[0]
    vals = ", ".join(f"{float(v):.8e}" for v in arr)
    f.write(f"static const float {name}[{n}] = {{ {vals} }};\n")

with open(HLS_PARAMS_DIR + "/params.h", "w") as f:
    f.write("#pragma once\n")
    f.write(f"#define D_IN {D_IN}\n")
    f.write(f"#define KERNEL {KERNEL}\n")
    f.write(f"#define CONV1_OUT {CONV1_OUT}\n")
    f.write(f"#define CLASSES {CLASSES}\n")

with open(HLS_PARAMS_DIR + "/conv1_weights.h", "w") as f:
    f.write("#pragma once\n")
    _emit_array_2d(f, "CONV1_W", CONV1_W)

with open(HLS_PARAMS_DIR + "/conv1_bias.h", "w") as f:
    f.write("#pragma once\n")
    _emit_array_1d(f, "CONV1_B", CONV1_B)

with open(HLS_PARAMS_DIR + "/dense_weights.h", "w") as f:
    f.write("#pragma once\n")
    _emit_array_2d(f, "DENSE_W", DENSE_W)

with open(HLS_PARAMS_DIR + "/dense_bias.h", "w") as f:
    f.write("#pragma once\n")
    _emit_array_1d(f, "DENSE_B", DENSE_B)

print("✅ Export complete. Headers written to: ", HLS_PARAMS_DIR)

Model: "cnn1d_fp32"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_10 (InputLayer)       [(None, 34, 1)]           0         
                                                                 
 conv1 (Conv1D)              (None, 34, 64)            256       
                                                                 
 drop2 (Dropout)             (None, 34, 64)            0         
                                                                 
 flat (Flatten)              (None, 2176)              0         
                                                                 
 softmax (Dense)             (None, 7)                 15239     
                                                                 
Total params: 15495 (60.53 KB)
Trainable params: 15495 (60.53 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
[Exporter] D_IN=34, 

: 