# diperoleh 160k siklus untuk satu kali inferensi dengan notebook sebelumnya

## SEL 0

In [22]:
# SEL 0: Download UCI-HAR dataset (jalankan sekali)
import os, urllib.request, zipfile

if not os.path.exists("UCI_HAR_Dataset"):
    print("Downloading UCI HAR dataset...")
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00240/UCI%20HAR%20Dataset.zip"
    urllib.request.urlretrieve(url, "UCI_HAR_Dataset.zip")
    with zipfile.ZipFile("UCI_HAR_Dataset.zip", "r") as z:
        z.extractall(".")
    os.rename("UCI HAR Dataset", "UCI_HAR_Dataset")
    print("Dataset ready.")
else:
    print("Dataset already present.")


Dataset already present.


## SEL A - Helper Functs

In [23]:
# === SEL A (final clean exporter) ===
import numpy as np, textwrap

def pack_bits_2d(arr2d, word_size=8, bitorder='little'):
    arr2d = np.asarray(arr2d, dtype=np.uint8)
    n, D = arr2d.shape
    if word_size == 8:
        pad = (-D) % 8
        if pad: arr2d = np.pad(arr2d, ((0,0),(0,pad)))
        packed = np.packbits(arr2d, axis=-1, bitorder=bitorder).astype(np.uint8)
        return packed
    elif word_size == 16:
        pad = (-D) % 16
        if pad: arr2d = np.pad(arr2d, ((0,0),(0,pad)))
        dim_word = arr2d.shape[1] // 16
        bytes_ = np.packbits(arr2d.reshape(n, dim_word, 16), axis=-1, bitorder=bitorder)
        out = (bytes_[...,0].astype(np.uint16) | (bytes_[...,1].astype(np.uint16) << 8))
        return out
    elif word_size == 32:
        pad = (-D) % 32
        if pad: arr2d = np.pad(arr2d, ((0,0),(0,pad)))
        dim_word = arr2d.shape[1] // 32
        bytes_ = np.packbits(arr2d.reshape(n, dim_word, 32), axis=-1, bitorder=bitorder)
        out = (bytes_[...,0].astype(np.uint32) |
               (bytes_[...,1].astype(np.uint32) << 8) |
               (bytes_[...,2].astype(np.uint32) << 16) |
               (bytes_[...,3].astype(np.uint32) << 24))
        return out
    else:
        raise ValueError("word_size must be 8/16/32")

def _flat_c_body(arr, per_line=16):
    flat = arr.flatten()
    rows = []
    for i in range(0, len(flat), per_line):
        rows.append(", ".join(str(int(x)) for x in flat[i:i+per_line]))
    return ",\n    ".join(rows)

def write_model_files(packed_F, packed_V, packed_C, word_size, hv_dim_bit, num_quant, prefix="model"):
    dim_word = packed_F.shape[1]
    n_features = packed_F.shape[0]
    n_classes = packed_C.shape[0]

    dtype = {8:"uint8_t",16:"uint16_t",32:"uint32_t"}[word_size]

    # === model.h ===
    model_h = f"""#ifndef MODEL_MODEL_H_
#define MODEL_MODEL_H_

#include <stdint.h>

#define MICROVSA_MODEL_FHV_DIMENSION_BIT {hv_dim_bit}
#define MICROVSA_MODEL_FHV_DIMENSION_WORD {dim_word}
#define MICROVSA_MODEL_NUM_CLASS {n_classes}
#define MICROVSA_MODEL_NUM_FEATURE {n_features}
#define MICROVSA_MODEL_NUM_QUANT {num_quant}
#define MICROVSA_MODEL_DTYPE {dtype}

#ifdef MODEL_C_IN_RAM
#define MODEL_C_QUALIFIER
#else
#define MODEL_C_QUALIFIER const
#endif
#ifdef MODEL_F_IN_RAM
#define MODEL_F_QUALIFIER
#else
#define MODEL_F_QUALIFIER const
#endif
#ifdef MODEL_V_IN_RAM
#define MODEL_V_QUALIFIER
#else
#define MODEL_V_QUALIFIER const
#endif

extern MODEL_C_QUALIFIER {dtype} MICROVSA_MODEL_C[];
extern MODEL_F_QUALIFIER {dtype} MICROVSA_MODEL_F[];
extern MODEL_V_QUALIFIER {dtype} MICROVSA_MODEL_V[];

#endif // MODEL_MODEL_H_
"""
    with open(f"{prefix}.h","w") as fh: fh.write(model_h)

    # === model.c ===
    flatC = _flat_c_body(packed_C)
    flatF = _flat_c_body(packed_F)
    flatV = _flat_c_body(packed_V)
    c_text = f'#include "{prefix}.h"\n\n'
    c_text += f"MODEL_C_QUALIFIER {dtype} MICROVSA_MODEL_C[] = {{\n    {flatC}\n}};\n\n"
    c_text += f"MODEL_F_QUALIFIER {dtype} MICROVSA_MODEL_F[] = {{\n    {flatF}\n}};\n\n"
    c_text += f"MODEL_V_QUALIFIER {dtype} MICROVSA_MODEL_V[] = {{\n    {flatV}\n}};\n\n"
    with open(f"{prefix}.c","w") as fc: fc.write(c_text)
    print("Wrote", f"{prefix}.h/.c")

def write_params(min_val, inv_range, scale=4096, fname="model_params_scaled.h"):
    header = textwrap.dedent(f"""\
    #ifndef MODEL_PARAMS_SCALED_H_
    #define MODEL_PARAMS_SCALED_H_
    #include <stdint.h>
    #define FIXED_POINT_SCALE_FACTOR {scale}

    """)
    def arr_to_c(name, arr, dtype="int32_t"):
        rows=[]
        for i in range(0, len(arr), 16):
            rows.append(", ".join(str(int(x)) for x in arr[i:i+16]))
        body = ",\n    ".join(rows)
        return f"const {dtype} {name}[{len(arr)}] = {{\n    {body}\n}};\n\n"
    header += arr_to_c("min_val_scaled", min_val, "int32_t")
    header += arr_to_c("inv_range_val_scaled", inv_range, "int32_t")
    header += "#endif // MODEL_PARAMS_SCALED_H_\n"
    with open(fname,"w") as f: f.write(header)
    print("Wrote", fname)

def write_test_data(sample, label, prefix="test_data"):
    h_code = f"""#ifndef TEST_DATA_H_
#define TEST_DATA_H_
#include <stdint.h>
#define TEST_DATA_SAMPLE_LENGTH {len(sample)}
extern const float {prefix}_sample[];
extern const uint8_t {prefix}_actual_label;
#endif // TEST_DATA_H_
"""
    # fixed: no stray '\' anymore
    c_code = f'#include "{prefix}.h"\n\n'
    c_code += f"const uint8_t {prefix}_actual_label = {label};\n\n"
    rows=[]
    for i in range(0,len(sample),8):
        rows.append("    " + ", ".join(f"{x:.8f}f" for x in sample[i:i+8]))
    c_code += f"const float {prefix}_sample[TEST_DATA_SAMPLE_LENGTH] = {{\n" + ",\n".join(rows) + "\n};\n"
    with open(f"{prefix}.h","w") as fh: fh.write(h_code)
    with open(f"{prefix}.c","w") as fc: fc.write(c_code)
    print("Wrote", f"{prefix}.h/.c")

def write_microvsa_config(word_size, hv_dim_bit, n_classes, max_fhv_dim=None, fname="microvsa_config.h"):
    if max_fhv_dim is None:
        max_fhv_dim = hv_dim_bit // word_size
    text = f"""#ifndef MICROVSA_CONFIG_H_
#define MICROVSA_CONFIG_H_

#define MICROVSA_IMPL_WORDSIZE {word_size}
#define MICROVSA_TMP_DTYPE uint{word_size}_t
#define MICROVSA_ACC_DTYPE int32_t
#define MICROVSA_MAX_NUM_CLASS {n_classes}
#define MICROVSA_MAX_FHV_DIM {max_fhv_dim}
#define MICROVSA_MAX_FHV_DIM_BIT {hv_dim_bit}
#define MICROVSA_IMPL MICROVSA_IMPL_MCU_OPT_CC

// keep large arrays in flash
// #define MODEL_F_IN_RAM
// #define MODEL_V_IN_RAM
// #define MODEL_C_IN_RAM

#endif // MICROVSA_CONFIG_H_
"""
    with open(fname,"w") as f: f.write(text)
    print("Wrote", fname)

def export_all_from_bits(F_bits, V_bits, C_bits, feat_min, feat_range, test_sample, test_label, word_size=32, scale=4096):
    hv_dim_bit = F_bits.shape[1]
    num_quant = V_bits.shape[0]
    packed_F = pack_bits_2d(F_bits, word_size=word_size)
    packed_V = pack_bits_2d(V_bits, word_size=word_size)
    packed_C = pack_bits_2d(C_bits, word_size=word_size)

    write_model_files(packed_F, packed_V, packed_C, word_size, hv_dim_bit, num_quant, prefix="model")

    min_val_scaled = np.floor(feat_min * scale).astype(np.int32)
    inv_range_val_scaled = np.round((1.0/np.maximum(feat_range,1e-8)) * scale).astype(np.int32)
    write_params(min_val_scaled, inv_range_val_scaled, scale=scale)

    write_test_data(test_sample, test_label, prefix="test_data")
    write_microvsa_config(word_size, hv_dim_bit, C_bits.shape[0])

    print("Export complete.")


## SEL 1

In [19]:
# SEL 1: Train DiffLDC (safe mem) and extract F_bits,V_bits,C_bits
import numpy as np, torch, torch.nn as nn, torch.optim as optim, os

# load
def load_uci_har(split='train'):
    path = os.path.join("UCI_HAR_Dataset", split, f"X_{split}.txt")
    ypath = os.path.join("UCI_HAR_Dataset", split, f"y_{split}.txt")
    X = np.loadtxt(path).astype(np.float32)
    y = (np.loadtxt(ypath).astype(int) - 1).astype(np.int64)
    return X, y

X_train, y_train = load_uci_har('train')
X_test, y_test = load_uci_har('test')
n_features = X_train.shape[1]
n_classes = int(max(y_train.max(), y_test.max()) + 1)
print("Shapes:", X_train.shape, X_test.shape, "n_classes", n_classes)

# hyperparams (tune these)
NUM_QUANT = 256      # reduce to 64/128 to shrink model V size
HV_DIM = 256         # try 128/256 per flash budget
WORD = 32            # target word-size for ESP32
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)

# quantization helpers
feat_min = np.vstack([X_train, X_test]).min(axis=0)
feat_max = np.vstack([X_train, X_test]).max(axis=0)
feat_range = np.maximum(feat_max - feat_min, 1e-8)
def quantize_array(X, num_quant=NUM_QUANT):
    norm = (X - feat_min) / feat_range
    return np.clip((norm * (num_quant - 1)).astype(int), 0, num_quant - 1)

Xq_train = quantize_array(X_train)
Xq_test  = quantize_array(X_test)

# DiffLDC model
class DiffLDC(nn.Module):
    def __init__(self, n_features, num_quant, hv_dim, n_classes):
        super().__init__()
        self.V_embed = nn.Parameter(torch.randn(num_quant, hv_dim) * 0.01)
        self.F = nn.Parameter(torch.randn(n_features, hv_dim) * 0.01)
        self.C = nn.Parameter(torch.randn(n_classes, hv_dim) * 0.01)
    def forward(self, quantized_samples):
        Vvals = self.V_embed[quantized_samples]   # (B, n_features, hv_dim)
        F = self.F.unsqueeze(0)
        bound = Vvals * F
        agg = bound.sum(dim=1)
        agg_t = torch.tanh(agg)
        Cnorm = torch.tanh(self.C)
        logits = torch.matmul(agg_t, Cnorm.t())
        return logits, agg_t

train_x = torch.from_numpy(Xq_train).long().to(DEVICE)
train_y = torch.from_numpy(y_train).long().to(DEVICE)
test_x  = torch.from_numpy(Xq_test).long().to(DEVICE)
test_y  = torch.from_numpy(y_test).long().to(DEVICE)

model = DiffLDC(n_features, NUM_QUANT, HV_DIM, n_classes).to(DEVICE)
opt = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-6)
criterion = nn.CrossEntropyLoss()

EPOCHS = 5
BATCH = 64
def eval_split(x_tensor, y_tensor, batch_size=256):
    model.eval()
    correct = 0; total = 0
    with torch.no_grad():
        for i in range(0, len(x_tensor), batch_size):
            xb = x_tensor[i:i+batch_size]
            yb = y_tensor[i:i+batch_size]
            logits, _ = model(xb)
            preds = torch.argmax(logits, dim=1)
            correct += (preds == yb).sum().item()
            total += len(yb)
    return correct/total

print("Training...")
for epoch in range(EPOCHS):
    model.train()
    perm = np.random.permutation(len(train_x))
    for i in range(0, len(train_x), BATCH):
        idx = perm[i:i+BATCH]
        xb = train_x[idx]; yb = train_y[idx]
        opt.zero_grad()
        logits, _ = model(xb)
        loss = criterion(logits, yb)
        loss.backward(); opt.step()
    print(f"Epoch {epoch+1}/{EPOCHS} loss={loss.item():.4f} tr_acc={eval_split(train_x,train_y):.4f} te_acc={eval_split(test_x,test_y):.4f}")

# extract bipolar -> bits
with torch.no_grad():
    F_cont = torch.tanh(model.F).cpu().numpy()
    V_cont = torch.tanh(model.V_embed).cpu().numpy()
    C_cont = torch.tanh(model.C).cpu().numpy()

F_bits = (F_cont > 0).astype(np.uint8)
V_bits = (V_cont > 0).astype(np.uint8)
C_bits = (C_cont > 0).astype(np.uint8)

# save bits for reproducibility
np.savez("ldc_trained_model_bits.npz", F_bits=F_bits, V_bits=V_bits, C_bits=C_bits)
print("Saved ldc_trained_model_bits.npz", F_bits.shape, V_bits.shape, C_bits.shape)


Shapes: (7352, 561) (2947, 561) n_classes 6
Device: cpu
Training...
Epoch 1/5 loss=0.5974 tr_acc=0.8044 te_acc=0.7855
Epoch 2/5 loss=0.0922 tr_acc=0.9705 te_acc=0.9264
Epoch 3/5 loss=0.0575 tr_acc=0.9852 te_acc=0.9342
Epoch 4/5 loss=0.1000 tr_acc=0.9918 te_acc=0.9338
Epoch 5/5 loss=0.0238 tr_acc=0.9989 te_acc=0.9393
Saved ldc_trained_model_bits.npz (561, 256) (256, 256) (6, 256)


## SEL 2

In [20]:
# SEL 2: Export (use bits saved from SEL1)
import numpy as np, os
d = np.load("ldc_trained_model_bits.npz")
F_bits = d["F_bits"]; V_bits = d["V_bits"]; C_bits = d["C_bits"]
# feat_min & feat_range must match quantization step in SEL1
# If SEL1 variables still in memory, reuse feat_min, feat_range; otherwise recompute:
# Recompute from dataset:
from pathlib import Path
import numpy as np
X_train = np.loadtxt("UCI_HAR_Dataset/train/X_train.txt").astype(np.float32)
X_test  = np.loadtxt("UCI_HAR_Dataset/test/X_test.txt").astype(np.float32)
feat_min = np.vstack([X_train, X_test]).min(axis=0)
feat_max = np.vstack([X_train, X_test]).max(axis=0)
feat_range = np.maximum(feat_max - feat_min, 1e-8)

# pick test sample
y_test = np.loadtxt("UCI_HAR_Dataset/test/y_test.txt").astype(int) - 1
X_test_all = X_test
test_idx = 0
test_sample = X_test_all[test_idx]
test_label = int(y_test[test_idx])

# call exporter (Sel A function)
# choose word_size = 32 for ESP32
from __main__ import export_all_from_bits
export_all_from_bits(F_bits, V_bits, C_bits, feat_min, feat_range, test_sample, test_label, word_size=32, scale=4096)

# will write: model.h, model.c, model_params_scaled.h, test_data.h, test_data.c
print("Files written to notebook working directory.")


Wrote model.h/.c
Wrote model_params_scaled.h
Wrote test_data.h/.c
Wrote microvsa_config.h
Export complete.
Files written to notebook working directory.


## SEL 3 - Quick Validation

In [21]:
# SEL 3: quick Python check (simple binary LDC emulate)
import numpy as np
d = np.load("ldc_trained_model_bits.npz")
F_bits = d["F_bits"]; V_bits = d["V_bits"]; C_bits = d["C_bits"]
# quantize the test sample with feat_min/feat_range
test_sample_q = np.clip(((test_sample - feat_min)/feat_range * (V_bits.shape[0]-1)).astype(int),0,V_bits.shape[0]-1)
# build S: for each feature do XOR(V[q], F[i]) then majority over features
HV = F_bits.shape[1]
S = np.zeros(HV, dtype=int)
for i in range(F_bits.shape[0]):
    vf = np.bitwise_xor(V_bits[test_sample_q[i]], F_bits[i])
    S += (vf==1).astype(int)
S_bit = (S > (F_bits.shape[0]//2)).astype(np.uint8)
# compare to classes
best=None; bestscore=1e9
for k in range(C_bits.shape[0]):
    # Hamming distance
    h = np.count_nonzero(S_bit ^ C_bits[k])
    if h < bestscore:
        bestscore=h; best=k
print("Python emu predict", best, "label", test_label, "hamming", bestscore)


Python emu predict 0 label 4 hamming 48
