In [2]:
import os, json, numpy as np, torch, torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import precision_recall_fscore_support

In [5]:
# -----------------------
# Paths & hyperparams
# -----------------------

def find_project_root() -> str:
    """
    Locate the project root by searching for a 'backend/ml/dataset/raw' folder.

    The current working directory is checked first, followed by up to four of
    its ancestors (nearest first). The first directory that contains the marker
    folder is returned; if none does, the current working directory is returned.
    """
    start = os.getcwd()
    search_dirs = [start]
    ancestor = start
    for _ in range(4):
        ancestor = os.path.dirname(ancestor)
        # Skip empty results and directories already queued (dirname of a
        # filesystem root yields the root again).
        if ancestor and ancestor not in search_dirs:
            search_dirs.append(ancestor)
    marker = os.path.join("backend", "ml", "dataset", "raw")
    return next(
        (d for d in search_dirs if os.path.isdir(os.path.join(d, marker))),
        start,
    )

# All artefact locations are resolved relative to the detected project root.
PROJECT_ROOT = find_project_root()
DATA_DIR = os.path.join(PROJECT_ROOT, "backend", "ml", "dataset", "processed")
MODEL_DIR = os.path.join(PROJECT_ROOT, "backend", "ml", "models")
EVAL_DIR  = os.path.join(PROJECT_ROOT, "backend", "ml", "evaluation")
# Output folders are created up front; DATA_DIR is only read from, so it is not created here.
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(EVAL_DIR, exist_ok=True)

DEVICE      = torch.device("cpu")
BATCH_SIZE  = 64
EPOCHS      = 50     # upper bound on training epochs
PATIENCE    = 5      # epochs without validation improvement before early stopping
LR          = 1e-3
HIDDEN_SIZE = 128    # LSTM hidden units per direction
SEED        = 42

# Seed the global RNGs so weight initialisation and the shuffled training
# loader are repeatable across "Restart Kernel -> Run All".
np.random.seed(SEED)
torch.manual_seed(SEED)

In [6]:
# -----------------------
# Load processed arrays
# -----------------------
def must(path):
    """
    Return `path` unchanged when it exists on disk.

    Raises:
        FileNotFoundError: if `path` does not exist; the message points at
            DATA_DIR as the place where the preprocessing output is expected.
    """
    if os.path.exists(path):
        return path
    raise FileNotFoundError(
        f"Missing: {path}\n"
        f"Run your preprocessing first to generate X_*.npy / y_*.npy in {DATA_DIR}"
    )

def _load_split(stem, dtype):
    """Load '<stem>.npy' from DATA_DIR (which must exist) and cast it to `dtype`, avoiding a copy when possible."""
    array_path = must(os.path.join(DATA_DIR, f"{stem}.npy"))
    return np.load(array_path).astype(dtype, copy=False)


# Features are cast to float32 and labels to int64; files are read in the order train, val, test.
X_train, y_train = _load_split("X_train", np.float32), _load_split("y_train", np.int64)
X_val,   y_val   = _load_split("X_val",   np.float32), _load_split("y_val",   np.int64)
X_test,  y_test  = _load_split("X_test",  np.float32), _load_split("y_test",  np.int64)

# Use the stored feature config when present; otherwise synthesise one from the array shape.
cfg_path = os.path.join(DATA_DIR, "feature_config.json")
if not os.path.exists(cfg_path):
    feat_cfg = {
        "seq_len": int(X_train.shape[1]),
        "features": [f"f{i}" for i in range(X_train.shape[2])]
    }
else:
    with open(cfg_path, "r") as f:
        feat_cfg = json.load(f)

# Model dimensions come from the training array: axis 1 is the sequence length, axis 2 the feature count.
SEQ_LEN     = int(X_train.shape[1])
INPUT_SIZE  = int(X_train.shape[2])
# Class count = largest label seen in any split, plus one (usually 2).
NUM_CLASSES = int(max(labels.max() for labels in (y_train, y_val, y_test)) + 1)

print(f"Shapes  X:{X_train.shape}  y:{y_train.shape}  | SEQ_LEN={SEQ_LEN}, FEATS={INPUT_SIZE}, CLASSES={NUM_CLASSES}")

Shapes  X:(6241, 32, 3)  y:(6241, 32)  | SEQ_LEN=32, FEATS=3, CLASSES=2


In [7]:
# -----------------------
# Datasets / Loaders
# -----------------------
# Convert every split to tensors: features first, then labels.
Xtr_t, Xva_t, Xte_t = (torch.tensor(split) for split in (X_train, X_val, X_test))
ytr_t, yva_t, yte_t = (torch.tensor(split) for split in (y_train, y_val, y_test))

# Only the training loader shuffles; validation and test keep their stored order.
train_loader = DataLoader(
    dataset=TensorDataset(Xtr_t, ytr_t), batch_size=BATCH_SIZE, shuffle=True
)
val_loader = DataLoader(
    dataset=TensorDataset(Xva_t, yva_t), batch_size=BATCH_SIZE, shuffle=False
)
test_loader = DataLoader(
    dataset=TensorDataset(Xte_t, yte_t), batch_size=BATCH_SIZE, shuffle=False
)

In [8]:
# -----------------------
# Model: BiLSTM + Linear (emissions) + CRF
# -----------------------
# The CRF layer comes from the third-party 'pytorch-crf' package (module name: torchcrf).
# Only ImportError is translated into the install hint; any other failure raised
# while importing propagates unchanged so it is not mislabelled as a missing package.
try:
    from torchcrf import CRF
except ImportError as e:
    raise RuntimeError("Missing dependency 'pytorch-crf'. Install it: pip install pytorch-crf") from e

class BiLSTMEmissions(nn.Module):
    """Bidirectional LSTM followed by a per-timestep linear layer.

    Maps an input of shape [B,T,F] to emission scores of shape [B,T,C].
    Exportable to ONNX.
    """

    def __init__(self, in_dim, hidden=128, classes=2):
        super().__init__()
        # Attribute names are kept stable: they are the keys of the saved state_dict.
        self.lstm = nn.LSTM(
            input_size=in_dim,
            hidden_size=hidden,
            batch_first=True,
            bidirectional=True,
        )
        # Both LSTM directions are concatenated, hence 2 * hidden input features.
        self.emission = nn.Linear(in_features=2 * hidden, out_features=classes)

    def forward(self, x):
        """Return emission scores [B,T,C] for `x` of shape [B,T,F]."""
        sequence_out, _state = self.lstm(x)   # [B,T,2H]
        return self.emission(sequence_out)    # [B,T,C]

class BiLSTM_CRF(nn.Module):
    """Emission network plus a CRF layer (CRF used during train/eval; not exported)."""

    def __init__(self, in_dim, hidden=128, classes=2):
        super().__init__()
        self.emitter = BiLSTMEmissions(in_dim, hidden, classes)
        self.crf = CRF(classes, batch_first=True)

    def forward(self, x, tags=None, mask=None):
        """
        Run the emitter and then either score or decode with the CRF.

        Returns:
            (emissions, nll) when `tags` is given, where `nll` is the negated
            CRF output with reduction='mean';
            (emissions, paths) when `tags` is None, where `paths` is the CRF
            decode result (List[List[int]]).
        """
        emissions = self.emitter(x)        # [B,T,C]
        if tags is not None:
            # The CRF score is negated so it can be minimised as a loss.
            crf_score = self.crf(emissions, tags, mask=mask, reduction='mean')
            return emissions, -crf_score
        decoded = self.crf.decode(emissions, mask=mask)   # List[List[int]]
        return emissions, decoded

# Build the tagger, move it to the configured device, and optimise all of its parameters with Adam.
model = BiLSTM_CRF(in_dim=INPUT_SIZE, hidden=HIDDEN_SIZE, classes=NUM_CLASSES)
model = model.to(DEVICE)
optimizer = torch.optim.Adam(params=model.parameters(), lr=LR)

def full_mask(bsz, T, device):
    """Return an all-True boolean mask of shape [bsz, T] allocated on `device`."""
    return torch.full((bsz, T), True, dtype=torch.bool, device=device)

In [9]:
# -----------------------
# Train (early stopping on val NLL)
# -----------------------
def _epoch_nll(loader, train):
    """
    Run one full pass over `loader` and return the sample-weighted mean NLL.

    When `train` is True the model is put in train mode and the optimizer is
    stepped on every batch; otherwise the model is put in eval mode and
    gradients are disabled.
    """
    if train:
        model.train()
    else:
        model.eval()
    running = 0.0
    with torch.set_grad_enabled(train):
        for xb, yb in loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)  # [B,T,F], [B,T]
            if train:
                optimizer.zero_grad()
            mask = full_mask(xb.size(0), xb.size(1), DEVICE)
            _, nll = model(xb, tags=yb, mask=mask)
            if train:
                nll.backward()
                optimizer.step()
            # nll is a per-batch mean; weight by batch size so the epoch figure is per-sample.
            running += nll.item() * xb.size(0)
    return running / len(loader.dataset)


best_val = float("inf")
patience = 0          # consecutive epochs without a new best validation NLL
best_path = os.path.join(MODEL_DIR, "bilstm_crf_model.pt")

for epoch in range(EPOCHS):
    tr_loss = _epoch_nll(train_loader, train=True)
    va_loss = _epoch_nll(val_loader, train=False)

    print(f"Epoch {epoch+1:02d} | train NLL {tr_loss:.4f} | val NLL {va_loss:.4f}")

    improved = va_loss < best_val
    if improved:
        # New best: reset the counter and checkpoint the weights.
        best_val, patience = va_loss, 0
        torch.save(model.state_dict(), best_path)
        continue

    patience += 1
    if patience >= PATIENCE:
        print("Early stopping.")
        break

Epoch 01 | train NLL 21.6631 | val NLL 21.1997
Epoch 02 | train NLL 20.8764 | val NLL 20.6090
Epoch 03 | train NLL 20.3463 | val NLL 19.9642
Epoch 04 | train NLL 19.6006 | val NLL 19.4312
Epoch 05 | train NLL 18.7173 | val NLL 18.7507
Epoch 06 | train NLL 17.8868 | val NLL 17.5069
Epoch 07 | train NLL 17.3391 | val NLL 17.5092
Epoch 08 | train NLL 17.1598 | val NLL 17.9793
Epoch 09 | train NLL 17.7456 | val NLL 17.2300
Epoch 10 | train NLL 16.9622 | val NLL 16.6144
Epoch 11 | train NLL 16.3615 | val NLL 16.0724
Epoch 12 | train NLL 16.0763 | val NLL 17.1467
Epoch 13 | train NLL 16.0771 | val NLL 15.5945
Epoch 14 | train NLL 15.1321 | val NLL 14.8966
Epoch 15 | train NLL 14.9838 | val NLL 14.6148
Epoch 16 | train NLL 14.3394 | val NLL 13.9647
Epoch 17 | train NLL 13.8617 | val NLL 13.4894
Epoch 18 | train NLL 13.2215 | val NLL 13.2063
Epoch 19 | train NLL 12.8714 | val NLL 13.2211
Epoch 20 | train NLL 12.4343 | val NLL 12.2358
Epoch 21 | train NLL 12.1123 | val NLL 11.9164
Epoch 22 | tr

In [10]:
# -----------------------
# Evaluate (CRF decode)
# -----------------------
# Restore the best checkpoint written by the training loop before scoring the test split.
model.load_state_dict(torch.load(best_path, map_location=DEVICE))
model.eval()

all_preds, all_tgts = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        mask = full_mask(xb.size(0), xb.size(1), DEVICE)
        # With tags=None the model returns (emissions, decoded paths); only the paths are needed.
        _, paths = model(xb, tags=None, mask=mask)  # paths: list of length B, each T ints
        # stack predictions to [B,T]
        maxT = xb.size(1)
        pred_np = np.array([p[:maxT] for p in paths], dtype=np.int64)
        all_preds.append(pred_np)
        # .cpu() is a no-op for CPU tensors and keeps .numpy() valid for any tensor device.
        all_tgts.append(yb.cpu().numpy())

# Flatten to one label per timestep so the metrics are computed over all timesteps.
all_preds = np.concatenate(all_preds, axis=0).reshape(-1)
all_tgts  = np.concatenate(all_tgts,  axis=0).reshape(-1)

# NUM_CLASSES is derived from the data, so do not assume a binary task:
# 'binary' averaging is kept for two classes, and macro averaging is used otherwise.
average_mode = 'binary' if NUM_CLASSES == 2 else 'macro'
precision, recall, f1, _ = precision_recall_fscore_support(all_tgts, all_preds, average=average_mode)
metrics = {"precision": float(precision), "recall": float(recall), "f1_score": float(f1)}
with open(os.path.join(EVAL_DIR, "bilstm_metrics.json"), "w") as f:
    json.dump(metrics, f, indent=2)
print("Test metrics:", metrics)

Test metrics: {'precision': 0.9118659217877095, 'recall': 0.9503027480204937, 'f1_score': 0.9306876496749914}


In [11]:
# -----------------------
# Export ONNX (emissions only; CRF not exported)
# -----------------------
# For deployment, we export BiLSTMEmissions to ONNX → get per-timestep logits in Node.js
# Only the emission sub-network is exported; the CRF stays on the Python side.
emitter = model.emitter.to(DEVICE)
emitter.eval()

# Example input used for the export: a single sequence of the training length.
dummy = torch.randn(1, SEQ_LEN, INPUT_SIZE, device=DEVICE)
onnx_path = os.path.join(MODEL_DIR, "bilstm_emitter.onnx")

# Only ImportError is turned into the install hint; any other failure raised
# while importing onnx propagates unchanged.
try:
    import onnx  # ensure installed
except ImportError as e:
    raise RuntimeError("Missing 'onnx'. Install it: pip install onnx") from e

torch.onnx.export(
    emitter, dummy, onnx_path,
    input_names=["input"], output_names=["logits"],
    # Batch and sequence axes are declared dynamic on both the input and the output.
    dynamic_axes={"input": {0:"batch", 1:"seq_len"},
                  "logits": {0:"batch", 1:"seq_len"}},
    opset_version=14
)
print("Exported ONNX emissions to:", onnx_path)

# Save feature config (for inference parity)
feature_cfg_path = os.path.join(EVAL_DIR, "bilstm_feature_config.json")
with open(feature_cfg_path, "w") as f:
    json.dump({
        "seq_len": int(SEQ_LEN),
        "features": feat_cfg.get("features", [f"f{i}" for i in range(INPUT_SIZE)])
    }, f, indent=2)
print("Saved feature config to:", feature_cfg_path)

Exported ONNX emissions to: c:\Users\geloq\OneDrive\Desktop\pd-keyboard-app\backend\ml\models\bilstm_emitter.onnx
Saved feature config to: c:\Users\geloq\OneDrive\Desktop\pd-keyboard-app\backend\ml\evaluation\bilstm_feature_config.json


  torch.onnx.export(
