# Imports + utilities

In [1]:
import os
import math
import numpy as np
from dataclasses import dataclass

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import tensorflow as tf




2026-01-12 13:20:29.432013: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2026-01-12 13:20:29.443757: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1768242029.458414 1723703 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1768242029.462867 1723703 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1768242029.473361 1723703 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'

# Label spaces + mapping to union

In [2]:
# Model-specific label orders (MUST match how each model was trained)
BABY2020_LABELS = ["Sleepy", "Hungry", "Wakeup"]
CHINESE_LABELS  = ["Diaper", "Uncomfortable", "Sleepy"]

# Union label space (your algorithm)
UNION_LABELS    = ["Diaper", "Uncomfortable", "Sleepy", "Hungry", "Wakeup"]

# Index maps
baby2idx   = {c:i for i,c in enumerate(BABY2020_LABELS)}
china2idx  = {c:i for i,c in enumerate(CHINESE_LABELS)}
union2idx  = {c:i for i,c in enumerate(UNION_LABELS)}
idx2union  = {i:c for c,i in union2idx.items()}

BABY_SLEEPY_IDX  = baby2idx["Sleepy"]
CHINA_SLEEPY_IDX = china2idx["Sleepy"]
UNION_SLEEPY_IDX = union2idx["Sleepy"]


# Define model classes (placeholders)

## real CNN+LMU classes

In [3]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model

# ----------------------------
# 0) (Optional) Safety toggles
# ----------------------------
# If you already set these earlier, you can skip this block.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
tf.config.optimizer.set_jit(False)

gpus = tf.config.list_physical_devices("GPU")
for gpu in gpus:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except Exception:
        pass

print("Devices:", tf.config.list_physical_devices())

# ----------------------------
# 1) Label spaces (keep consistent with training)
# ----------------------------
BABY2020_LABELS = ["Sleepy", "Hungry", "Wakeup"]
CHINESE_LABELS  = ["Diaper", "Uncomfortable", "Sleepy"]
UNION_LABELS    = ["Diaper", "Uncomfortable", "Sleepy", "Hungry", "Wakeup"]

# Classes_chinese: {0: 'Awake', 1: 'Diaper', 2: 'Hug', 3: 'Hungry', 4: 'Sleepy', 5: 'Uncomfortable'}
# CHINESE_LABELS = ["Wakeup", "Diaper", "Hug", "Hungry", "Sleepy", "Uncomfortable"]
# UNION_LABELS = ["Wakeup", "Diaper", "Hug", "Hungry", "Sleepy", "Uncomfortable"]
# ----------------------------
# 2) Your architecture builder (same as your training)
#    (Keep it here only if you want to re-train. For loading .h5 you don't need it.)
# ----------------------------
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, MaxPooling2D, Dropout, TimeDistributed, Flatten, LSTM, Dense
from tensorflow.keras import regularizers
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.optimizers import Adam

def build_model(input_shape, num_classes):
    l2_reg = regularizers.l2(0.001)

    inputs = Input(shape=input_shape)

    x = Conv2D(32, (3, 3), padding="same", activation="relu", kernel_regularizer=l2_reg)(inputs)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.3)(x)

    x = Conv2D(16, (3, 3), padding="same", activation="relu", kernel_regularizer=l2_reg)(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.3)(x)

    x = Conv2D(8, (3, 3), padding="same", activation="relu", kernel_regularizer=l2_reg)(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.4)(x)

    x = TimeDistributed(Flatten())(x)

    x = LSTM(32, return_sequences=False, kernel_regularizer=l2_reg)(x)
    x = Dropout(0.5)(x)

    x = Dense(64, activation="relu", kernel_regularizer=l2_reg)(x)
    x = Dropout(0.5)(x)

    outputs = Dense(num_classes, activation="softmax", kernel_regularizer=l2_reg)(x)

    model = Model(inputs, outputs)
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss=CategoricalCrossentropy(label_smoothing=0.1),
        metrics=["accuracy"],
        jit_compile=False
    )
    return model

# ----------------------------
# 3) Load your trained .h5 models
# ----------------------------
BABY2020_CKPT_PATH = r"saved_models/best_val_f1_score_epoch510_f10.6540_20251216_140443.h5"
CHINESE_CKPT_PATH  = r"saved_models/model_20251231_205029_epoch626_f10.7251.h5" # r"saved_models/Chinese babycry stft f0 mfcc model_20251204_201256_epoch303_f10.8516.h5"

baby_model  = tf.keras.models.load_model(BABY2020_CKPT_PATH, compile=False)
china_model = tf.keras.models.load_model(CHINESE_CKPT_PATH,  compile=False)

print("✅ Loaded Keras models")
print("Baby output shape:", baby_model.output_shape)
print("China output shape:", china_model.output_shape)

# ----------------------------
# 4) IMPORTANT: get logits (pre-softmax) for temperature scaling
#    Your saved model ends with softmax, so we build a logits-model.
# ----------------------------
def make_logits_model(softmax_model: tf.keras.Model) -> tf.keras.Model:
    """
    Returns a new model with SAME input, but output = pre-softmax logits.
    Works when final layer is Dense(., activation='softmax').
    """
    last = softmax_model.layers[-1]

    # Case A: last layer is Dense with softmax activation
    if isinstance(last, tf.keras.layers.Dense) and last.activation == tf.keras.activations.softmax:
        # Build a Dense layer with same weights but linear activation
        logits_layer = tf.keras.layers.Dense(
            units=last.units,
            activation=None,
            use_bias=last.use_bias,
            name=last.name + "_logits"
        )

        # Create model graph up to penultimate layer
        penultimate_out = softmax_model.layers[-2].output
        logits_out = logits_layer(penultimate_out)

        logits_model = tf.keras.Model(inputs=softmax_model.input, outputs=logits_out, name=softmax_model.name + "_logits")

        # Copy weights from softmax Dense -> linear Dense
        logits_layer.set_weights(last.get_weights())
        return logits_model

    # Case B: last layer isn't a softmax Dense (maybe you already output logits)
    # We'll try to detect if output already sums to 1; if not, treat as logits.
    return softmax_model

baby_logits_model  = make_logits_model(baby_model)
china_logits_model = make_logits_model(china_model)

print("✅ Built logits wrappers")
print("Baby logits output:", baby_logits_model.output_shape)
print("China logits output:", china_logits_model.output_shape)

# ----------------------------
# 5) Helper: run logits + probs with temperature (for fusion later)
# ----------------------------
def tf_logits_and_probs(logits_model, x_np, T=1.0):
    """
    x_np: numpy batch
    returns:
      z: [B,C] logits (numpy)
      p: [B,C] probs after temperature (numpy)
    """
    z = logits_model(x_np, training=False).numpy()
    zT = z / float(T)
    p = tf.nn.softmax(zT, axis=-1).numpy()
    return z, p


Devices: [PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'), PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = 'saved_models/best_val_f1_score_epoch510_f10.6540_20251216_140443.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)

In [4]:
def inspect_model(model, name):
    print("\n" + "="*60)
    print(f"Model: {name}")
    print("Input shape :", model.input_shape)
    print("Output shape:", model.output_shape)
    print("Last layer  :", model.layers[-1].name, type(model.layers[-1]).__name__)
    
    # If last layer is Dense, show units + activation
    last = model.layers[-1]
    if hasattr(last, "units"):
        print("Dense units :", last.units)
    if hasattr(last, "activation"):
        print("Activation :", last.activation)

inspect_model(baby_model, "Baby2020")
inspect_model(china_model, "Chinese")



Model: Baby2020
Input shape : (None, 100, 280, 1)
Output shape: (None, 3)
Last layer  : dense_11 Dense
Dense units : 3
Activation : <function softmax at 0x14c12a332710>

Model: Chinese
Input shape : (None, 100, 280, 1)
Output shape: (None, 3)
Last layer  : dense_1 Dense
Dense units : 3
Activation : <function softmax at 0x14c12a332710>


# Load Test

### Feaure set for test for Chinese babycry and Baby2020

# Combined feature module for Feature extration modular

In [5]:
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
from pathlib import Path
from typing import Iterable, List, Tuple, Dict, Optional, Literal

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
import librosa

# -----------------------------
# General parameters (can be overridden per call)
# -----------------------------
AUDIO_EXTS = (".wav", ".mp3", ".flac", ".ogg", ".m4a")

def _is_audio(p: Path) -> bool:
    return p.suffix.lower() in AUDIO_EXTS

def _interp_resize_2d(feat_2d: np.ndarray, target_len: int) -> np.ndarray:
    """Resize a (C, T) feature matrix along time axis to target_len using linear interpolation."""
    feat_2d = np.asarray(feat_2d)
    assert feat_2d.ndim == 2, f"Expected 2D (C,T), got {feat_2d.shape}"
    C, T = feat_2d.shape
    if T == target_len:
        return feat_2d.astype(np.float32, copy=False)

    orig_idx = np.linspace(0.0, 1.0, num=T, endpoint=True)
    tgt_idx  = np.linspace(0.0, 1.0, num=target_len, endpoint=True)
    out = np.empty((C, target_len), dtype=np.float32)
    for c in range(C):
        f = interp1d(orig_idx, feat_2d[c, :], kind="linear", assume_sorted=True)
        out[c, :] = f(tgt_idx)
    return out

def _choose_target_len(lengths: Iterable[int], policy: Literal["median","max"]="median") -> int:
    arr = np.array(list(lengths), dtype=int)
    if len(arr) == 0:
        raise ValueError("Cannot choose target length: empty lengths.")
    return int(np.median(arr)) if policy == "median" else int(arr.max())

def _stft(
    y: np.ndarray, sr: int,
    n_fft: int, win_length: int, hop_length: int,
    power: float = 1.0, to_db: bool = True
) -> np.ndarray:
    S_complex = librosa.stft(
        y, n_fft=n_fft, hop_length=hop_length, win_length=win_length,
        window='hann', center=True
    )
    S_mag = np.abs(S_complex) ** power
    if to_db:
        # librosa uses 10*log10 for power and 20*log10 for amplitude internally
        S_db = librosa.power_to_db(S_mag, ref=np.max) if power != 1.0 else librosa.amplitude_to_db(S_mag, ref=np.max)
        return S_db.astype(np.float32)
    return S_mag.astype(np.float32)

def _mfcc(
    y: np.ndarray, sr: int,
    n_mfcc: int, n_fft: int, win_length: int, hop_length: int
) -> np.ndarray:
    M = librosa.feature.mfcc(
        y=y, sr=sr, n_mfcc=n_mfcc,
        n_fft=n_fft, hop_length=hop_length, win_length=win_length, center=True
    )
    return M.astype(np.float32)

def _load_audio(path: Path, sr: int) -> np.ndarray:
    y, _ = librosa.load(str(path), sr=sr, mono=True)
    return y.astype(np.float32, copy=False)

def _scan_pairs(
    f0_dir: Path, audio_dir: Path
) -> List[Tuple[str, str, Path, Path]]:
    """
    Pair items by (class_folder, file stem). Returns list of tuples:
      (class_label, stem, f0_npy_path, audio_path)

    - f0_dir structure: f0_dir/<class>/*_f0_wave_conf.npy (or any .npy)
    - audio_dir structure: audio_dir/<class>/*.(wav|mp3|...)
    """
    pairs = []
    if not f0_dir.exists():
        raise FileNotFoundError(f"Missing F0 dir: {f0_dir}")
    if not audio_dir.exists():
        raise FileNotFoundError(f"Missing audio dir: {audio_dir}")

    # index audio by (class, stem) -> path
    audio_index: Dict[Tuple[str,str], Path] = {}
    for class_dir in sorted([d for d in audio_dir.iterdir() if d.is_dir()]):
        cls = class_dir.name
        for ap in class_dir.rglob("*"):
            if ap.is_file() and _is_audio(ap):
                audio_index[(cls, ap.stem)] = ap

    # walk f0 npy files and find matching audio
    for class_dir in sorted([d for d in f0_dir.iterdir() if d.is_dir()]):
        cls = class_dir.name
        for npy in class_dir.rglob("*.npy"):
            stem = npy.stem
            # allow suffixes like *_f0_wave_conf; use split at first suffix
            stem_clean = stem.replace("_f0_wave_conf", "")
            key = (cls, stem_clean)
            if key not in audio_index:
                # fallback: try exact stem
                if (cls, stem) in audio_index:
                    key = (cls, stem)
                else:
                    # couldn't match — skip silently but could warn
                    # print(f"[WARN] No matching audio for {npy}")
                    continue
            pairs.append((cls, key[1], npy, audio_index[key]))

    return pairs

def _load_f0_array(npy_path: Path) -> np.ndarray:
    """
    Expect shape (3, T): [wave_on_grid; f0_hz; confidence].
    """
    arr = np.load(str(npy_path))
    arr = np.asarray(arr)
    if arr.ndim != 2 or arr.shape[0] != 3:
        raise ValueError(f"Expected F0 array shape (3, T). Got {arr.shape} from {npy_path}")
    return arr.astype(np.float32)

def build_split(
    f0_dir: str,
    audio_dir: str,
    *,
    sr: int = 16000,
    frame_ms: float = 30.0,
    hop_ms: float = 15.0,
    n_mfcc: int = 20,
    stft_power: float = 1.0,
    stft_to_db: bool = True,
    fixed_target_len: int | None = None,   # <— NEW
    target_len_policy: Literal["median","max"] = "median",
    modalities: Iterable[Literal["stft","mfcc","f0"]] = ("stft","mfcc","f0"),
    strict_triplet: bool = True,
    min_frames: int = 2,                         # <— NEW
) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]:
    """
    Build a single split (train OR test).

    Returns:
      X  : (N, T, F) float32 — concatenated [modalities] along feature dim
      y  : (N,) int labels (encoded by alphabetical class order)
      df : manifest with columns [class, stem, f0_path, audio_path, T_stft, T_mfcc, T_f0, T_final]

    Notes:
      - Only items where **all requested modalities** exist are kept.
      - Time axis is resized to a **single target length** chosen from STFT (if present),
        else MFCC, else F0 — according to `target_len_policy`.
    """
    f0_dir = Path(f0_dir)
    audio_dir = Path(audio_dir)
    pairs = _scan_pairs(f0_dir, audio_dir)

    if len(pairs) == 0:
        raise ValueError(f"No (F0, audio) pairs found between {f0_dir} and {audio_dir}.")

    # window sizes
    win_length = int(round(sr * frame_ms / 1000.0))
    hop_length = int(round(sr * hop_ms   / 1000.0))
    # next power of two for n_fft
    n_fft = 1
    while n_fft < win_length:
        n_fft <<= 1

    rows = []
    features_list = []
    labels = []

    # first pass: compute raw modality features + their T lengths
    stft_list, mfcc_list, f0_list = [], [], []
    Ts_stft, Ts_mfcc, Ts_f0 = [], [], []

    kept = 0
    skipped_short = 0
    for cls, stem, f0_path, audio_path in pairs:
        try:
            f0_arr = _load_f0_array(f0_path) if "f0" in modalities else None
            y = _load_audio(audio_path, sr=sr)

            stft_feat = _stft(y, sr, n_fft, win_length, hop_length,
                              power=stft_power, to_db=stft_to_db) if "stft" in modalities else None
            mfcc_feat = _mfcc(y, sr, n_mfcc, n_fft, win_length, hop_length) if "mfcc" in modalities else None

            # record lengths
            Ts_stft.append(stft_feat.shape[1] if stft_feat is not None else -1)
            Ts_mfcc.append(mfcc_feat.shape[1] if mfcc_feat is not None else -1)
            Ts_f0.append(f0_arr.shape[1]   if f0_arr   is not None else -1)

            # skip if any requested modality has too few frames
            if strict_triplet:
                if ("stft" in modalities and (stft_feat is None or stft_feat.shape[1] < min_frames)) \
                or ("mfcc" in modalities and (mfcc_feat is None or mfcc_feat.shape[1] < min_frames)) \
                or ("f0"   in modalities and (f0_arr   is None or f0_arr.shape[1]   < min_frames)):
                    skipped_short += 1
                    continue

            stft_list.append(stft_feat)
            mfcc_list.append(mfcc_feat)
            f0_list.append(f0_arr)
            labels.append(cls)
            rows.append({
                "class": cls, "stem": stem,
                "f0_path": str(f0_path), "audio_path": str(audio_path)
            })
            kept += 1
        except Exception as e:
            # Skip problematic files but keep going
            # print(f"[WARN] Skipped ({cls}/{stem}): {e}")
            continue
            
    if kept == 0:
        raise ValueError(f"After loading, no usable items remained (skipped_short={skipped_short}).")

    if len(labels) == 0:
        raise ValueError("After loading, no usable items remained. Check data.")

    # choose target T from the first available modality in priority order
    def _valid_lengths(L): return [x for x in L if x > 0]
    if fixed_target_len is not None: 
        T_target = int(fixed_target_len)
    elif "stft" in modalities and len(_valid_lengths(Ts_stft)) > 0:
        T_target = _choose_target_len(_valid_lengths(Ts_stft), policy=target_len_policy)
    elif "mfcc" in modalities and len(_valid_lengths(Ts_mfcc)) > 0:
        T_target = _choose_target_len(_valid_lengths(Ts_mfcc), policy=target_len_policy)
    elif "f0" in modalities and len(_valid_lengths(Ts_f0)) > 0:
        T_target = _choose_target_len(_valid_lengths(Ts_f0), policy=target_len_policy)
    else:
        raise ValueError("Could not infer target time length from requested modalities.")

    # second pass: resize and concatenate
    X_list = []
    keep_mask = []
    for stft_feat, mfcc_feat, f0_feat in zip(stft_list, mfcc_list, f0_list):
        if strict_triplet:
            # ensure all requested modalities exist
            if ("stft" in modalities and stft_feat is None) or \
               ("mfcc" in modalities and mfcc_feat is None) or \
               ("f0"   in modalities and f0_feat   is None):
                keep_mask.append(False)
                X_list.append(None)
                continue

        channels = []
        if stft_feat is not None:
            channels.append(_interp_resize_2d(stft_feat, T_target))
        if mfcc_feat is not None:
            channels.append(_interp_resize_2d(mfcc_feat, T_target))
        if f0_feat is not None:
            channels.append(_interp_resize_2d(f0_feat, T_target))  # (3, T)

        if len(channels) == 0:
            keep_mask.append(False)
            X_list.append(None)
            continue

        CxT = np.concatenate(channels, axis=0)     # (C_total, T)
        X_list.append(CxT.T.astype(np.float32))    # (T, C_total)
        keep_mask.append(True)

    # filter by keep_mask
    keep_idx = [i for i, k in enumerate(keep_mask) if k]
    if len(keep_idx) == 0:
        raise ValueError("No samples had all requested modalities (strict_triplet=True).")

    X = np.stack([X_list[i] for i in keep_idx], axis=0)   # (N, T, F)
    y_labels = [labels[i] for i in keep_idx]
    df = pd.DataFrame([rows[i] for i in keep_idx])
    df["T_final"] = T_target
    if "stft" in modalities: df["T_stft"] = [Ts_stft[i] for i in keep_idx]
    if "mfcc" in modalities: df["T_mfcc"] = [Ts_mfcc[i] for i in keep_idx]
    if "f0"   in modalities: df["T_f0"]   = [Ts_f0[i]   for i in keep_idx]

    # encode labels alphabetically (stable & reproducible)
    classes = sorted(pd.unique(df["class"]))
    cls_to_id = {c:i for i,c in enumerate(classes)}
    y = np.array([cls_to_id[c] for c in y_labels], dtype=np.int64)

    return X, y, df.assign(label_id=[cls_to_id[c] for c in y_labels])

def prepare_train_test(
    f0_conf_wave_train_dir: str,
    f0_conf_wave_test_dir: str,
    audio_train_dir: str,
    audio_test_dir: str,
    *,
    sr: int = 16000,
    frame_ms: float = 30.0,
    hop_ms: float = 15.0,
    n_mfcc: int = 20,
    stft_power: float = 1.0,
    stft_to_db: bool = True,
    target_len_policy: Literal["median","max"] = "median",
    modalities: Iterable[Literal["stft","mfcc","f0"]] = ("stft","mfcc","f0"),
    strict_triplet: bool = True,
    fixed_target_len: int | None = None,   # <— NEW
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict[int,str], pd.DataFrame, pd.DataFrame]:
    """
    Convenience wrapper to build train & test with identical settings.

    Returns:
      X_train, y_train, X_test, y_test,
      id_to_class (dict), train_manifest (df), test_manifest (df)
    """
    X_train, y_train, df_train = build_split(
        f0_conf_wave_train_dir, audio_train_dir,
        sr=sr, frame_ms=frame_ms, hop_ms=hop_ms, n_mfcc=n_mfcc,
        stft_power=stft_power, stft_to_db=stft_to_db,
        target_len_policy=target_len_policy, modalities=modalities,
        strict_triplet=strict_triplet,
        fixed_target_len=fixed_target_len,         # <— pass through
        min_frames=2,                    # <— NEW
    )
    X_test, y_test, df_test = build_split(
        f0_conf_wave_test_dir, audio_test_dir,
        sr=sr, frame_ms=frame_ms, hop_ms=hop_ms, n_mfcc=n_mfcc,
        stft_power=stft_power, stft_to_db=stft_to_db,
        target_len_policy=target_len_policy, modalities=modalities,
        strict_triplet=strict_triplet,
        fixed_target_len=fixed_target_len,         # <— pass through
        min_frames=2,                    # <— NEW
    )

    # harmonize label ids across splits (use train mapping)
    classes = sorted(pd.unique(df_train["class"]))
    id_to_class = {i:c for i,c in enumerate(classes)}
    cls_to_id = {c:i for i,c in id_to_class.items()}

    # remap test if classes overlap; unknown classes get new ids at the end
    test_classes = list(pd.unique(df_test["class"]))
    for c in test_classes:
        if c not in cls_to_id:
            cls_to_id[c] = len(cls_to_id)
            id_to_class[cls_to_id[c]] = c

    y_train = np.array([cls_to_id[c] for c in df_train["class"].tolist()], dtype=np.int64)
    y_test  = np.array([cls_to_id[c] for c in df_test["class"].tolist()], dtype=np.int64)

    df_train = df_train.assign(label_id=y_train)
    df_test  = df_test.assign(label_id=y_test)

    return X_train, y_train, X_test, y_test, id_to_class, df_train, df_test


# Also do the same  the same for chinese baby cry without reverbration 

In [6]:
# from feature_fusion import prepare_train_test

f0_conf_wave_train_chinese_dir = "Chinese Babycry/Chinese baby cry train_f0"
f0_conf_wave_test_chinese_dir  = "Chinese Babycry/Chinese baby cry test_f0"

audio_train_dir_chinese        = "Chinese Babycry/Train_Split_80"
audio_test_dir_chinese         = "Chinese Babycry/Test_Split_20"

X_train_chinese, y_train_chinese, X_test_chinese, y_test_chinese, id2cls_chinese, train_manifest_chinese, test_manifest_chinese = prepare_train_test(
    f0_conf_wave_train_dir=f0_conf_wave_train_chinese_dir,
    f0_conf_wave_test_dir=f0_conf_wave_test_chinese_dir,
    audio_train_dir=audio_train_dir_chinese,
    audio_test_dir=audio_test_dir_chinese,
    sr=16000,
    frame_ms=30.0,
    hop_ms=15.0,
    n_mfcc=20,
    modalities=("stft", "mfcc", "f0"),   # choose any subset e.g. ("mfcc","f0")
    fixed_target_len=100, 
    target_len_policy="median",           # or "max"
    strict_triplet=True,                  # require all requested modalities per sample
)

print("X_train_chinese:", X_train_chinese.shape, "X_test_cinese:", X_test_chinese.shape)
print("y_train_chinese:", y_train_chinese.shape, "y_test_chinese:", y_test_chinese.shape)
print("Classes_chinese:", id2cls_chinese)
display(train_manifest_chinese.head())
display(test_manifest_chinese.head())


X_train_chinese: (734, 100, 280) X_test_cinese: (184, 100, 280)
y_train_chinese: (734,) y_test_chinese: (184,)
Classes_chinese: {0: 'awake', 1: 'diaper', 2: 'hug', 3: 'hungry', 4: 'sleepy', 5: 'uncomfortable'}


Unnamed: 0,class,stem,f0_path,audio_path,T_final,T_stft,T_mfcc,T_f0,label_id
0,awake,awake_0,Chinese Babycry/Chinese baby cry train_f0/awak...,Chinese Babycry/Train_Split_80/awake/awake_0.wav,100,1051,1051,79,0
1,awake,awake_100,Chinese Babycry/Chinese baby cry train_f0/awak...,Chinese Babycry/Train_Split_80/awake/awake_100...,100,1039,1039,78,0
2,awake,awake_101,Chinese Babycry/Chinese baby cry train_f0/awak...,Chinese Babycry/Train_Split_80/awake/awake_101...,100,1713,1713,129,0
3,awake,awake_102,Chinese Babycry/Chinese baby cry train_f0/awak...,Chinese Babycry/Train_Split_80/awake/awake_102...,100,1148,1148,87,0
4,awake,awake_103,Chinese Babycry/Chinese baby cry train_f0/awak...,Chinese Babycry/Train_Split_80/awake/awake_103...,100,1048,1048,79,0


Unnamed: 0,class,stem,f0_path,audio_path,T_final,T_stft,T_mfcc,T_f0,label_id
0,awake,awake_110,Chinese Babycry/Chinese baby cry test_f0/awake...,Chinese Babycry/Test_Split_20/awake/awake_110.wav,100,1292,1292,97,0
1,awake,awake_113,Chinese Babycry/Chinese baby cry test_f0/awake...,Chinese Babycry/Test_Split_20/awake/awake_113.wav,100,1112,1112,84,0
2,awake,awake_116,Chinese Babycry/Chinese baby cry test_f0/awake...,Chinese Babycry/Test_Split_20/awake/awake_116.wav,100,1074,1074,81,0
3,awake,awake_117,Chinese Babycry/Chinese baby cry test_f0/awake...,Chinese Babycry/Test_Split_20/awake/awake_117.wav,100,1039,1039,78,0
4,awake,awake_131,Chinese Babycry/Chinese baby cry test_f0/awake...,Chinese Babycry/Test_Split_20/awake/awake_131.wav,100,1168,1168,88,0


# Filter 3 moods for Chinese baby cry

In [7]:
import numpy as np

# Classes of interest
keep_classes = [1, 4, 5]   # diaper=1, sleepy=4, uncomfortable=5
class_names = {1: "Diaper", 4: "Sleepy", 5: "Uncomfortable"}

# --- Train filtering ---
mask_train = np.isin(y_train_chinese, keep_classes)
X_train_split = X_train_chinese[mask_train]
y_train_split = y_train_chinese[mask_train]

# --- Test filtering ---
mask_test = np.isin(y_test_chinese, keep_classes)
X_val_split = X_test_chinese[mask_test]
y_val_split = y_test_chinese[mask_test]

# --- Re-encode labels to [0,1,2] ---
unique_classes = sorted(keep_classes)  # [1,4,5]
class2newid = {old: new for new, old in enumerate(unique_classes)}
id2cls_merge_3mood = {new: class_names[old] for old, new in class2newid.items()}

y_train_split = np.array([class2newid[y] for y in y_train_split])
y_val_split = np.array([class2newid[y] for y in y_val_split])

# --- Add channel dimension ---
X_train_split = np.expand_dims(X_train_split, axis=-1)
X_val_split = np.expand_dims(X_val_split, axis=-1)

print("Class mapping:", id2cls_merge_3mood)
print("Train shape:", X_train_split.shape, " Val shape:", X_val_split.shape)
print("Unique encoded y:", np.unique(y_train_split))


Class mapping: {0: 'Diaper', 1: 'Sleepy', 2: 'Uncomfortable'}
Train shape: (350, 100, 280, 1)  Val shape: (88, 100, 280, 1)
Unique encoded y: [0 1 2]


# Baby2020 Test

# Also do the same (Test & Train) for baby 2020 M0 to 3 or 9 ans merge months

In [8]:
from pathlib import Path
import numpy as np
import pandas as pd

def _remap_labels(y_old: np.ndarray, id2cls: dict, global_cls2id: dict) -> np.ndarray:
    return np.array([global_cls2id[id2cls[int(k)]] for k in y_old], dtype=np.int64)

def _append_month_feature(X, month_idx, n_months, mode="onehot"):
    """
    Append month info to X along feature dim.
    mode="onehot" -> +n_months features; mode="index" -> +1 feature in [0,1].
    """
    N, T, F = X.shape
    month_idx = np.asarray(month_idx, dtype=int)
    if mode == "onehot":
        X_out = np.empty((N, T, F + n_months), dtype=np.float32)
        X_out[..., :F] = X
        for i in range(N):
            one = np.zeros((T, n_months), dtype=np.float32)
            one[:, month_idx[i]] = 1.0
            X_out[i, :, F:] = one
        return X_out
    elif mode == "index":
        denom = max(1, n_months - 1)
        X_out = np.empty((N, T, F + 1), dtype=np.float32)
        X_out[..., :F] = X
        for i in range(N):
            X_out[i, :, F:] = float(month_idx[i]) / denom
        return X_out
    else:
        return X  # no change

def load_all_months_train(
    base_path: str,
    *,
    months=("0Month","1Month","2Month","3Month","4Month","5Month","6Month","7Month","8Month","9Month"),
    f0_root_subdir="Baby2020/Baby2020_f0_wave_conf_arrays",   # sits inside base_path
    audio_month_subdir="",                           # usually empty: classes live directly under each month
    sr=16000, frame_ms=30.0, hop_ms=15.0, n_mfcc=20,
    modalities=("stft","mfcc","f0"),
    fixed_target_len=100, target_len_policy="median", strict_triplet=True,
    label_from: Literal["class","month"] = "class",
    month_feature_mode: Literal["onehot","index",None] = None,
):
    """
    Build ONE big training set from month folders only (no test split).

    Returns:
      X_all (N, T, F[+month_feat]), y_all (N,),
      id2cls_all (dict), df_all (manifest with month_folder), months_used (list)
    """
    base = Path(base_path)

    # lists to collect month-wise outputs
    X_list, y_list = [], []
    df_list = []
    id2cls_per_month = []
    class_name_set = set()
    months_used = []
    month_idx_list = []

    # Load each month by calling your existing build_split()
    for mi, m in enumerate(months):
        audio_dir = base / m / audio_month_subdir
        f0_dir    = base / f0_root_subdir / m

        if not (audio_dir.exists() and f0_dir.exists()):
            print(f"[WARN] Skipping {m}: missing {audio_dir} or {f0_dir}")
            continue

        # build_split expects directories with CLASS subfolders inside
        X_m, y_m, df_m = build_split(
            f0_dir=str(f0_dir),
            audio_dir=str(audio_dir),
            sr=sr, frame_ms=frame_ms, hop_ms=hop_ms, n_mfcc=n_mfcc,
            stft_power=1.0, stft_to_db=True,
            fixed_target_len=fixed_target_len,
            target_len_policy=target_len_policy,
            modalities=modalities,
            strict_triplet=strict_triplet,
        )

        # Add month info to manifest
        df_m = df_m.copy()
        df_m["month_folder"] = m
        df_list.append(df_m)

        # Record id2cls for this month (class mapping from build_split)
        # and collect class names for global map
        # If label_from="month", we'll override later anyway.
        local_classes = sorted(pd.unique(df_m["class"]))
        id2cls_m = {i: c for i, c in enumerate(local_classes)}
        id2cls_per_month.append(id2cls_m)
        class_name_set.update(local_classes)

        X_list.append(X_m)
        y_list.append(y_m)
        months_used.append(m)
        month_idx_list.append(np.full(len(y_m), mi, dtype=int))

        print(f"[OK] {m}: {X_m.shape[0]} samples, X shape {X_m.shape}")
        

    if not X_list:
        raise RuntimeError("No months loaded. Check base_path and folder names.")

    # Merge across months
    X_all = np.concatenate(X_list, axis=0)
    df_all = pd.concat(df_list, ignore_index=True)
    month_idx = np.concatenate(month_idx_list, axis=0)

    if label_from == "class":
        # Build global class mapping and remap y from each month
        all_classes = sorted(class_name_set)
        cls2gid = {c:i for i,c in enumerate(all_classes)}
        id2cls_all = {i:c for c,i in cls2gid.items()}
        y_all = np.concatenate([
            _remap_labels(y_m, id2cls_m, cls2gid) for y_m, id2cls_m in zip(y_list, id2cls_per_month)
        ], axis=0)
        df_all["label_id"] = y_all
    else:
        # Label is the month itself (0..len(months_used)-1)
        month2id = {m:i for i,m in enumerate(months_used)}
        y_all = np.array([month2id[m] for m in df_all["month_folder"]], dtype=np.int64)
        id2cls_all = {i:m for m,i in month2id.items()}
        df_all["label_id"] = y_all

    # Optional: append month feature to X
    if month_feature_mode in ("onehot","index"):
        X_all = _append_month_feature(X_all, month_idx, n_months=len(months_used), mode=month_feature_mode)

    return X_all, y_all, id2cls_all, df_all, months_used


In [9]:
base = r"Baby2020/"  # <- use your real absolute path

X_all, y_all, id2cls_all, df_all, months_used = load_all_months_train(
    base_path=base,
    months=("0Month","1Month","2Month","3Month","4Month","5Month","6Month","7Month","8Month","9Month"),
    f0_root_subdir="Baby2020_f0_wave_conf_arrays",  # under base, contains 0Month..6Month
    audio_month_subdir="",                          # classes live directly under each month folder
    # features
    modalities=("stft","mfcc","f0"),
    fixed_target_len=100,
    # labels: choose "class" (default) or "month"
    label_from="class",
    # optionally append month to X as features:
    month_feature_mode=None,   # or "onehot" / "index"
)

print("Months loaded:", months_used)
print("X_all:", X_all.shape, "y_all:", y_all.shape)
print("Classes:", id2cls_all)
print(df_all[["class","month_folder","label_id"]].head())


[OK] 0Month: 500 samples, X shape (500, 100, 280)
[OK] 1Month: 550 samples, X shape (550, 100, 280)
[OK] 2Month: 1070 samples, X shape (1070, 100, 280)
[OK] 3Month: 1340 samples, X shape (1340, 100, 280)




[OK] 4Month: 1709 samples, X shape (1709, 100, 280)
[OK] 5Month: 1650 samples, X shape (1650, 100, 280)
[OK] 6Month: 1010 samples, X shape (1010, 100, 280)
[OK] 7Month: 320 samples, X shape (320, 100, 280)
[OK] 8Month: 399 samples, X shape (399, 100, 280)
[OK] 9Month: 850 samples, X shape (850, 100, 280)
Months loaded: ['0Month', '1Month', '2Month', '3Month', '4Month', '5Month', '6Month', '7Month', '8Month', '9Month']
X_all: (9398, 100, 280) y_all: (9398,)
Classes: {0: 'Hungry', 1: 'NeedHug', 2: 'Sleepy', 3: 'Temper', 4: 'UnComfy', 5: 'Uncomfy', 6: 'Wakeup'}
    class month_folder  label_id
0  Hungry       0Month         0
1  Hungry       0Month         0
2  Hungry       0Month         0
3  Hungry       0Month         0
4  Hungry       0Month         0


# Make a noneleakage selection randomly test train 20 80 for baby2020

In [10]:
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold


def make_group_stratified_folds(
    df_all: pd.DataFrame,
    X_all: np.ndarray,
    n_splits: int = 5,
    group_from: str = "audio_path",
    class_col: str = "class",
    month_col: str = "month_folder",
    selected_classes: list | None = None,
    selected_months: list | None = None,
    shuffle: bool = True,
    random_state: int | None = None,
    verbose: bool = True
):
    """
    Leakage-safe, group-aware, stratified K-fold splits
    with optional filtering and automatic label re-indexing.

    IMPORTANT:
    - Labels are re-indexed AFTER class/month selection
      so they are contiguous: {0, 1, ..., K-1}
    """

    assert len(df_all) == len(X_all), "df_all and X_all must be aligned"

    # --------------------------------------------------
    # 0. Optional filtering (NO leakage)
    # --------------------------------------------------
    mask = np.ones(len(df_all), dtype=bool)

    if selected_classes is not None:
        mask &= df_all[class_col].isin(selected_classes)

    if selected_months is not None:
        mask &= df_all[month_col].isin(selected_months)

    df = df_all.loc[mask].copy().reset_index(drop=True)
    X = X_all[mask]

    if verbose:
        print(f"\nAfter filtering: {len(df)} samples")
        print("Class counts:", df[class_col].value_counts().to_dict())
        print("Month counts:", df[month_col].value_counts().to_dict())

    # --------------------------------------------------
    # 1. Re-index labels AFTER selection (CRITICAL)
    # --------------------------------------------------
    unique_classes = sorted(df[class_col].unique())
    class_to_label = {cls: i for i, cls in enumerate(unique_classes)}
    label_to_class = {i: cls for cls, i in class_to_label.items()}

    df["label_id"] = df[class_col].map(class_to_label)

    num_classes = len(unique_classes)

    if verbose:
        print("\nLabel re-indexing (used for training):")
        for cls, idx in class_to_label.items():
            print(f"  {cls} → {idx}")

    # --------------------------------------------------
    # 2. Extract group_id (leakage-safe)
    # --------------------------------------------------
    df["group_id"] = (
        df[group_from]
        .str.split("/")
        .str[-1]
        .str.split("_")
        .str[0]
    )

    # --------------------------------------------------
    # 3. Group-level dataframe for stratification
    # --------------------------------------------------
    group_df = (
        df.groupby("group_id")
        .agg({
            class_col: "first",
            month_col: "first"
        })
        .reset_index()
    )

    group_df["strata"] = (
        group_df[class_col].astype(str) + "_" +
        group_df[month_col].astype(str)
    )

    # --------------------------------------------------
    # 4. Stratified K-fold on GROUPS
    # --------------------------------------------------
    skf = StratifiedKFold(
        n_splits=n_splits,
        shuffle=shuffle,
        random_state=random_state
    )

    folds = []

    # --------------------------------------------------
    # 5. Build folds
    # --------------------------------------------------
    for fold_idx, (train_g_idx, test_g_idx) in enumerate(
        skf.split(group_df["group_id"], group_df["strata"])
    ):
        train_group_ids = set(group_df.iloc[train_g_idx]["group_id"])
        test_group_ids  = set(group_df.iloc[test_g_idx]["group_id"])

        train_mask = df["group_id"].isin(train_group_ids)
        test_mask  = df["group_id"].isin(test_group_ids)

        X_train = X[train_mask.values]
        X_test  = X[test_mask.values]

        df_train = df.loc[train_mask].reset_index(drop=True)
        df_test  = df.loc[test_mask].reset_index(drop=True)

        # Sanity checks
        assert train_group_ids.isdisjoint(test_group_ids)
        assert len(X_train) + len(X_test) == len(X)

        if verbose:
            print(
                f"\nFold {fold_idx}: "
                f"train={len(X_train)} ({len(X_train)/len(X):.2%}), "
                f"test={len(X_test)} ({len(X_test)/len(X):.2%})"
            )

        folds.append({
            "fold": fold_idx,
            "X_train": X_train,
            "X_test": X_test,
            "df_train": df_train,
            "df_test": df_test,
            "num_classes": num_classes,
            "class_to_label": class_to_label,
            "label_to_class": label_to_class,
            "train_group_ids": train_group_ids,
            "test_group_ids": test_group_ids
        })

    return folds


In [11]:
folds = make_group_stratified_folds(
    df_all,
    X_all,
    n_splits=20,
    selected_months=["0Month", "1Month", "2Month", "3Month"],
    selected_classes=["Sleepy", "Hungry", "Wakeup"],
    random_state=42,
    verbose=True
)



After filtering: 2190 samples
Class counts: {'Hungry': 1000, 'Sleepy': 600, 'Wakeup': 590}
Month counts: {'3Month': 750, '2Month': 700, '0Month': 400, '1Month': 340}

Label re-indexing (used for training):
  Hungry → 0
  Sleepy → 1
  Wakeup → 2

Fold 0: train=2085 (95.21%), test=105 (4.79%)

Fold 1: train=2078 (94.89%), test=112 (5.11%)





Fold 2: train=2092 (95.53%), test=98 (4.47%)

Fold 3: train=2080 (94.98%), test=110 (5.02%)

Fold 4: train=2070 (94.52%), test=120 (5.48%)

Fold 5: train=2052 (93.70%), test=138 (6.30%)

Fold 6: train=2058 (93.97%), test=132 (6.03%)

Fold 7: train=2040 (93.15%), test=150 (6.85%)

Fold 8: train=2027 (92.56%), test=163 (7.44%)

Fold 9: train=2080 (94.98%), test=110 (5.02%)

Fold 10: train=2129 (97.21%), test=61 (2.79%)

Fold 11: train=2092 (95.53%), test=98 (4.47%)

Fold 12: train=2108 (96.26%), test=82 (3.74%)

Fold 13: train=2038 (93.06%), test=152 (6.94%)

Fold 14: train=2082 (95.07%), test=108 (4.93%)

Fold 15: train=2066 (94.34%), test=124 (5.66%)

Fold 16: train=2117 (96.67%), test=73 (3.33%)

Fold 17: train=2119 (96.76%), test=71 (3.24%)

Fold 18: train=2079 (94.93%), test=111 (5.07%)

Fold 19: train=2118 (96.71%), test=72 (3.29%)


In [12]:
def print_group_statistics(folds, top_k: int = 5):
    """
    Print statistics of number of samples per (group_id, class, month)
    for each fold, separately for train and test.
    """

    for f in folds:
        fold_id = f["fold"]

        print("\n" + "=" * 80)
        print(f"FOLD {fold_id}")
        print("=" * 80)

        for split_name, df_split in [
            ("TRAIN", f["df_train"]),
            ("TEST",  f["df_test"])
        ]:
            print(f"\n--- {split_name} SPLIT ---")

            # Count samples per (group_id, class, month)
            counts = (
                df_split
                .groupby(["group_id", "class", "month_folder"])
                .size()
                .reset_index(name="num_samples")
            )

            # Summary statistics
            print(f"Number of unique IDs: {counts['group_id'].nunique()}")
            print(
                "Samples per ID: "
                f"min={counts['num_samples'].min()}, "
                f"max={counts['num_samples'].max()}, "
                f"mean={counts['num_samples'].mean():.2f}"
            )

            # Show largest groups
            print(f"\nTop {top_k} IDs with most samples:")
            print(
                counts
                .sort_values("num_samples", ascending=False)
                .head(top_k)
                .to_string(index=False)
            )


print_group_statistics(folds, top_k=5)



FOLD 0

--- TRAIN SPLIT ---
Number of unique IDs: 250
Samples per ID: min=1, max=45, mean=8.34

Top 5 IDs with most samples:
       group_id  class month_folder  num_samples
Wakeup00MG00003 Wakeup       0Month           45
Wakeup00MG00005 Wakeup       0Month           40
Wakeup00MG00002 Wakeup       0Month           36
Sleepy03MB00011 Sleepy       3Month           34
Wakeup02MB00005 Wakeup       2Month           34

--- TEST SPLIT ---
Number of unique IDs: 14
Samples per ID: min=1, max=22, mean=7.50

Top 5 IDs with most samples:
       group_id  class month_folder  num_samples
Sleepy03MB00008 Sleepy       3Month           22
Hungry00MB00009 Hungry       0Month           10
Wakeup02MU00017 Wakeup       2Month           10
Hungry02MB00006 Hungry       2Month            9
Hungry01MB00004 Hungry       1Month            7

FOLD 1

--- TRAIN SPLIT ---
Number of unique IDs: 250
Samples per ID: min=1, max=45, mean=8.31

Top 5 IDs with most samples:
       group_id  class month_folder  num_sam

# Baby2020 Fold0 Test and Train and y Train and y Yest

In [13]:
# --- pick fold 0 ---
fold0 = folds[0]   # or folds[fold_index]

# --- features ---
X_train0 = fold0["X_train"]
X_test0  = fold0["X_test"]

# --- labels (integer ids made in make_group_stratified_folds) ---
y_train0 = fold0["df_train"]["label_id"].to_numpy()
y_test0  = fold0["df_test"]["label_id"].to_numpy()

print("Fold0 X_train:", X_train0.shape, "y_train:", y_train0.shape)
print("Fold0 X_test :", X_test0.shape,  "y_test :", y_test0.shape)

# --- optional: see class mapping for this fold setup ---
print("class_to_label:", fold0["class_to_label"])
print("label_to_class:", fold0["label_to_class"])


Fold0 X_train: (2085, 100, 280) y_train: (2085,)
Fold0 X_test : (105, 100, 280) y_test : (105,)
class_to_label: {'Hungry': 0, 'Sleepy': 1, 'Wakeup': 2}
label_to_class: {0: 'Hungry', 1: 'Sleepy', 2: 'Wakeup'}


# Chinese Babycry Test and Train and y Train and y Test

In [14]:
import numpy as np

# ======================================================
# Chinese BabyCry: select 3 moods
# ======================================================

# Original class IDs to keep
Chinese_babyCry_keep_classes = [1, 4, 5]   # diaper=1, sleepy=4, uncomfortable=5

Chinese_babyCry_class_names = {
    1: "Diaper",
    4: "Sleepy",
    5: "Uncomfortable"
}

# ======================================================
# Train split (Chinese BabyCry)
# ======================================================
Chinese_babyCry_mask_train = np.isin(y_train_chinese, Chinese_babyCry_keep_classes)

X_train_Chinese_babyCry = X_train_chinese[Chinese_babyCry_mask_train]
y_train_Chinese_babyCry = y_train_chinese[Chinese_babyCry_mask_train]

# ======================================================
# Test / Validation split (Chinese BabyCry)
# ======================================================
Chinese_babyCry_mask_test = np.isin(y_test_chinese, Chinese_babyCry_keep_classes)

X_val_Chinese_babyCry = X_test_chinese[Chinese_babyCry_mask_test]
y_val_Chinese_babyCry = y_test_chinese[Chinese_babyCry_mask_test]

# ======================================================
# Re-encode labels to {0,1,2}
# ======================================================
Chinese_babyCry_unique_classes = sorted(Chinese_babyCry_keep_classes)  # [1,4,5]
Chinese_babyCry_class2newid = {
    old: new for new, old in enumerate(Chinese_babyCry_unique_classes)
}

Chinese_babyCry_id2label = {
    new: Chinese_babyCry_class_names[old]
    for old, new in Chinese_babyCry_class2newid.items()
}

y_train_Chinese_babyCry = np.array(
    [Chinese_babyCry_class2newid[y] for y in y_train_Chinese_babyCry],
    dtype=np.int64
)

y_val_Chinese_babyCry = np.array(
    [Chinese_babyCry_class2newid[y] for y in y_val_Chinese_babyCry],
    dtype=np.int64
)

# ======================================================
# Add channel dimension (CNN expects [..., 1])
# ======================================================
X_train_Chinese_babyCry = np.expand_dims(X_train_Chinese_babyCry, axis=-1)
X_val_Chinese_babyCry   = np.expand_dims(X_val_Chinese_babyCry, axis=-1)

# ======================================================
# Sanity checks
# ======================================================
print("Chinese BabyCry 3-Mood Mapping:")
for k, v in Chinese_babyCry_id2label.items():
    print(f"  {k} → {v}")

print("\nShapes:")
print("  X_train_Chinese_babyCry:", X_train_Chinese_babyCry.shape)
print("  y_train_Chinese_babyCry:", y_train_Chinese_babyCry.shape)
print("  X_val_Chinese_babyCry  :", X_val_Chinese_babyCry.shape)
print("  y_val_Chinese_babyCry  :", y_val_Chinese_babyCry.shape)

print("\nUnique encoded train labels:", np.unique(y_train_Chinese_babyCry))
print("Unique encoded val labels  :", np.unique(y_val_Chinese_babyCry))


Chinese BabyCry 3-Mood Mapping:
  0 → Diaper
  1 → Sleepy
  2 → Uncomfortable

Shapes:
  X_train_Chinese_babyCry: (350, 100, 280, 1)
  y_train_Chinese_babyCry: (350,)
  X_val_Chinese_babyCry  : (88, 100, 280, 1)
  y_val_Chinese_babyCry  : (88,)

Unique encoded train labels: [0 1 2]
Unique encoded val labels  : [0 1 2]


# Re evaluate Loaded models  accuracy and F1 score on test baby2020 and Chinese baby cry 

In [15]:
import numpy as np
from sklearn.metrics import (
    accuracy_score, f1_score, matthews_corrcoef,
    classification_report, confusion_matrix
)

def evaluate_keras_model(
    model,
    X_test,
    y_test,
    label_names,
    name="Model",
    batch_size=32
):
    """
    model: tf.keras.Model (softmax output is fine)
    X_test: np array, shape [N, T, F, 1] (or whatever model expects)
    y_test: int labels [N] with values in {0..C-1}
    label_names: list of class names in EXACT output order of model
    """
    # ---------- sanity checks ----------
    C = model.output_shape[-1]
    assert C == len(label_names), f"{name}: output classes={C} but label_names={len(label_names)}"
    assert X_test.shape[0] == len(y_test), f"{name}: X_test and y_test length mismatch"
    assert np.min(y_test) >= 0 and np.max(y_test) < C, \
        f"{name}: y_test has labels outside [0,{C-1}]. min={np.min(y_test)}, max={np.max(y_test)}"

    # ---------- predict ----------
    probs = model.predict(X_test, batch_size=batch_size, verbose=0)
    y_pred = np.argmax(probs, axis=1)

    # ---------- metrics ----------
    acc = accuracy_score(y_test, y_pred)
    f1_macro = f1_score(y_test, y_pred, average="macro")
    f1_weighted = f1_score(y_test, y_pred, average="weighted")
    mcc = matthews_corrcoef(y_test, y_pred)

    print("\n" + "="*80)
    print(f"✅ {name} — Test Results")
    print("="*80)
    print(f"Accuracy     : {acc:.4f}")
    print(f"F1 (macro)   : {f1_macro:.4f}")
    print(f"F1 (weighted): {f1_weighted:.4f}")
    print(f"MCC          : {mcc:.4f}")

    print("\nClassification report:")
    print(classification_report(y_test, y_pred, target_names=label_names, digits=4))

    cm = confusion_matrix(y_test, y_pred)
    print("Confusion matrix (rows=true, cols=pred):")
    print(cm)

    return {
        "acc": acc,
        "f1_macro": f1_macro,
        "f1_weighted": f1_weighted,
        "mcc": mcc,
        "cm": cm,
        "y_pred": y_pred,
        "probs": probs
    }


In [16]:
# Example variable names — replace with yours:
# X_test_Baby2020 = X_val_Baby2020 or X_test_split_Baby2020
# y_test_Baby2020 = y_val_Baby2020 or y_test_split_Baby2020

baby_results = evaluate_keras_model(
    baby_model,
    X_train0,
    y_train0,
    label_names=BABY2020_LABELS,
    name="Baby2020"
)


I0000 00:00:1767555659.015342 1347469 cuda_dnn.cc:529] Loaded cuDNN version 90800



✅ Baby2020 — Test Results
Accuracy     : 0.9669
F1 (macro)   : 0.9661
F1 (weighted): 0.9670
MCC          : 0.9491

Classification report:
              precision    recall  f1-score   support

      Sleepy     0.9756    0.9664    0.9710       952
      Hungry     0.9963    0.9450    0.9700       564
      Wakeup     0.9275    0.9895    0.9575       569

    accuracy                         0.9669      2085
   macro avg     0.9665    0.9670    0.9661      2085
weighted avg     0.9681    0.9669    0.9670      2085

Confusion matrix (rows=true, cols=pred):
[[920   1  31]
 [ 18 533  13]
 [  5   1 563]]


In [17]:
# Example variable names — replace with yours:
# X_test_Baby2020 = X_val_Baby2020 or X_test_split_Baby2020
# y_test_Baby2020 = y_val_Baby2020 or y_test_split_Baby2020

baby_results = evaluate_keras_model(
    baby_model,
    X_test0,
    y_test0,
    label_names=BABY2020_LABELS,
    name="Baby2020"
)


✅ Baby2020 — Test Results
Accuracy     : 0.6857
F1 (macro)   : 0.6540
F1 (weighted): 0.6767
MCC          : 0.5311

Classification report:
              precision    recall  f1-score   support

      Sleepy     0.7843    0.8333    0.8081        48
      Hungry     0.7500    0.4167    0.5357        36
      Wakeup     0.5000    0.8095    0.6182        21

    accuracy                         0.6857       105
   macro avg     0.6781    0.6865    0.6540       105
weighted avg     0.7157    0.6857    0.6767       105

Confusion matrix (rows=true, cols=pred):
[[40  3  5]
 [ 9 15 12]
 [ 2  2 17]]


In [18]:
china_results = evaluate_keras_model(
    china_model,
    X_train_Chinese_babyCry,
    y_train_Chinese_babyCry,
    label_names=CHINESE_LABELS,
    name="Chinese BabyCry"
)



✅ Chinese BabyCry — Test Results
Accuracy     : 0.9971
F1 (macro)   : 0.9972
F1 (weighted): 0.9971
MCC          : 0.9957

Classification report:
               precision    recall  f1-score   support

       Diaper     1.0000    1.0000    1.0000       107
Uncomfortable     0.9914    1.0000    0.9957       115
       Sleepy     1.0000    0.9922    0.9961       128

     accuracy                         0.9971       350
    macro avg     0.9971    0.9974    0.9972       350
 weighted avg     0.9972    0.9971    0.9971       350

Confusion matrix (rows=true, cols=pred):
[[107   0   0]
 [  0 115   0]
 [  0   1 127]]


In [19]:
china_results = evaluate_keras_model(
    china_model,
    X_val_Chinese_babyCry,
    y_val_Chinese_babyCry,
    label_names=CHINESE_LABELS,
    name="Chinese BabyCry"
)


✅ Chinese BabyCry — Test Results
Accuracy     : 0.7273
F1 (macro)   : 0.7251
F1 (weighted): 0.7256
MCC          : 0.5928

Classification report:
               precision    recall  f1-score   support

       Diaper     0.7000    0.7778    0.7368        27
Uncomfortable     0.7826    0.6207    0.6923        29
       Sleepy     0.7143    0.7812    0.7463        32

     accuracy                         0.7273        88
    macro avg     0.7323    0.7266    0.7251        88
 weighted avg     0.7324    0.7273    0.7256        88

Confusion matrix (rows=true, cols=pred):
[[21  2  4]
 [ 5 18  6]
 [ 4  3 25]]


In [20]:
y_train_Chinese_babyCry

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,

# Test Baby2020 model on Chinese dataset and vise versa before Ensemble mood

In [21]:
china_results = evaluate_keras_model(
    china_model,
    X_test0,
    y_test0,
    label_names=CHINESE_LABELS,
    name="Chinese BabyCry"
)


ValueError: as_list() is not defined on an unknown TensorShape.

In [None]:
baby_results = evaluate_keras_model(
    baby_model,
    X_train_Chinese_babyCry ,
    y_train_Chinese_babyCry ,
    label_names=BABY2020_LABELS,
    name="Baby2020"
)

# Temperature scaling (optimize scalar T)

### This learns one scalar T per model using its validation set (D_m^val).

In [None]:
# @torch.no_grad()
# def collect_logits_and_labels(model, loader, device):
#     model.eval()
#     all_logits, all_y = [], []
#     for xb, yb in loader:
#         xb = xb.to(device)
#         yb = yb.to(device)
#         logits = model(xb)
#         all_logits.append(logits.detach().cpu())
#         all_y.append(yb.detach().cpu())
#     return torch.cat(all_logits, dim=0), torch.cat(all_y, dim=0)

# class TemperatureScaler(nn.Module):
#     def __init__(self, init_T=1.0):
#         super().__init__()
#         # optimize logT for positivity: T = exp(logT)
#         self.logT = nn.Parameter(torch.tensor([math.log(init_T)], dtype=torch.float32))

#     def forward(self, logits):
#         T = torch.exp(self.logT)
#         return logits / T

# def fit_temperature(logits, labels, max_iter=2000, lr=0.05, verbose=True):
#     """
#     logits: [N, C] (CPU tensor)
#     labels: [N]    (CPU tensor) with class indices in that model's label space
#     """
#     scaler = TemperatureScaler(init_T=1.0)
#     optimizer = torch.optim.LBFGS(scaler.parameters(), lr=lr, max_iter=max_iter)
#     nll = nn.CrossEntropyLoss()

#     logits = logits.float()
#     labels = labels.long()

#     def closure():
#         optimizer.zero_grad()
#         loss = nll(scaler(logits), labels)
#         loss.backward()
#         return loss

#     loss = optimizer.step(closure)
#     T = torch.exp(scaler.logT).item()
#     if verbose:
#         print(f"✅ Learned temperature T = {T:.4f} | NLL = {loss.item():.4f}")
#     return T


# Fit T_B and T_C from validation loaders

### I must provide two loaders:

### baby_val_loader (Baby2020 val set)

### china_val_loader (Chinese val set)

## They should yield (xb, yb) where yb is in the model’s own label indices.

## Create a simple Dataset wrapper

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class NumpyDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


## Build calibration loaders (THIS is what you asked for)

In [None]:
# ---------------------------
# Chinese calibration loader
# ---------------------------
china_val_dataset = NumpyDataset(
    X_val_Chinese_babyCry,
    y_val_Chinese_babyCry
)

china_val_loader = DataLoader(
    china_val_dataset,
    batch_size=32,
    shuffle=False   # IMPORTANT: never shuffle calibration data
)

# ---------------------------
# Baby calibration loader
# ---------------------------
baby_val_dataset = NumpyDataset(
    X_test0,
    y_test0
)

baby_val_loader = DataLoader(
    baby_val_dataset,
    batch_size=32,
    shuffle=False
)


# Collect logits + labels from a Keras model

In [None]:
import numpy as np
import tensorflow as tf

def collect_logits_and_labels_keras(logits_model, X, y, batch_size=32):
    """
    logits_model: Keras model that outputs logits (pre-softmax), shape [N,C]
    X: numpy array
    y: numpy int labels, shape [N]
    """
    y = np.asarray(y).astype(np.int64)
    logits = logits_model.predict(X, batch_size=batch_size, verbose=0)
    return logits.astype(np.float32), y


# Fit temperature in TensorFlow (minimize NLL)

In [None]:
def fit_temperature_keras(logits, y, max_iter=200, lr=0.01, verbose=True):
    """
    logits: numpy [N,C] (pre-softmax)
    y: numpy [N] int labels in [0..C-1]
    returns: scalar temperature T > 0
    """
    logits_tf = tf.convert_to_tensor(logits, dtype=tf.float32)
    y_tf = tf.convert_to_tensor(y, dtype=tf.int32)

    # optimize log_T so T = exp(log_T) > 0 always
    log_T = tf.Variable(0.0, dtype=tf.float32)
    opt = tf.keras.optimizers.Adam(learning_rate=lr)

    best = np.inf
    for i in range(max_iter):
        with tf.GradientTape() as tape:
            T = tf.exp(log_T)
            loss = tf.reduce_mean(
                tf.nn.sparse_softmax_cross_entropy_with_logits(
                    labels=y_tf,
                    logits=logits_tf / T
                )
            )
        grads = tape.gradient(loss, [log_T])
        opt.apply_gradients(zip(grads, [log_T]))

        if float(loss.numpy()) < best:
            best = float(loss.numpy())

        if verbose and (i % 25 == 0 or i == max_iter - 1):
            print(f"iter {i:03d} | NLL={loss.numpy():.5f} | T={tf.exp(log_T).numpy():.5f}")

    return float(tf.exp(log_T).numpy())


# Use YOUR calibration

In [None]:
def ensure_4d_np(X):
    X = np.asarray(X)
    if X.ndim == 3:
        X = np.expand_dims(X, axis=-1)
    return X

# Ensure shapes
X_val_Chinese_babyCry = ensure_4d_np(X_val_Chinese_babyCry)
X_test0               = ensure_4d_np(X_test0)

# Collect logits (use logits models!)
baby_logits, baby_y = collect_logits_and_labels_keras(
    baby_logits_model,   # <-- IMPORTANT: logits wrapper
    X_test0,
    y_test0,
    batch_size=32
)

china_logits, china_y = collect_logits_and_labels_keras(
    china_logits_model,  # <-- IMPORTANT: logits wrapper
    X_val_Chinese_babyCry,
    y_val_Chinese_babyCry,
    batch_size=32
)

print("Baby logits:", baby_logits.shape, "labels:", baby_y.shape)
print("China logits:", china_logits.shape, "labels:", china_y.shape)

# Fit temperatures
T_B = fit_temperature_keras(baby_logits, baby_y, max_iter=200, lr=0.01, verbose=True)
T_C = fit_temperature_keras(china_logits, china_y, max_iter=200, lr=0.01, verbose=True)

print(f"\n✅ Temperature Baby2020: T_B = {T_B:.4f}")
print(f"✅ Temperature Chinese : T_C = {T_C:.4f}")


# Safety Checks

In [None]:
print("unique baby_y :", np.unique(baby_y), "C_baby:", baby_logits.shape[1])
print("unique china_y:", np.unique(china_y), "C_china:", china_logits.shape[1])


# Define label spaces + mappings (union logits)

In [None]:
import numpy as np
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score

# ----------------------------
# UNION LABEL SPACE (final output)
# ----------------------------
UNION_LABELS = ["Diaper", "Uncomfortable", "Sleepy", "Hungry", "Wakeup"]
U = len(UNION_LABELS)
union_idx = {name:i for i,name in enumerate(UNION_LABELS)}

# ----------------------------
# BABY model output mapping (3-class) -> union indices
# Update these if your Baby2020 labels are different
# Example: ["Sleepy", "Hungry", "Wakeup"]  (you had this earlier)
# ----------------------------
BABY_LABELS_3 = ["Sleepy", "Hungry", "Wakeup"]
baby_to_union = np.array([union_idx[c] for c in BABY_LABELS_3], dtype=int)  # length 3

# ----------------------------
# CHINESE model 6-class outputs (we keep 1,4,5) -> union indices
# 6-class index meaning (your assumption)
CHINESE_LABELS_6 = ["Hungry","Diaper","Pain","Awake","Sleepy","Uncomfortable"]
keep_idx_6 = [1,4,5]  # Diaper, Sleepy, Uncomfortable
CHINESE_KEEP_NAMES = ["Diaper","Sleepy","Uncomfortable"]
china_keep_to_union = np.array([union_idx[c] for c in CHINESE_KEEP_NAMES], dtype=int)  # length 3

print("baby_to_union:", baby_to_union, "=>", [UNION_LABELS[i] for i in baby_to_union])
print("china_keep_to_union:", china_keep_to_union, "=>", [UNION_LABELS[i] for i in china_keep_to_union])


# Helpers: logits → calibrated probs, entropy, weights

In [None]:
def softmax_temp(logits, T=1.0):
    z = logits / float(T)
    z = z - np.max(z, axis=1, keepdims=True)  # stability
    e = np.exp(z)
    return e / (np.sum(e, axis=1, keepdims=True) + 1e-12)

def entropy(p):
    # p: [N,C]
    return -np.sum(p * np.log(p + 1e-12), axis=1)  # [N]

def entropy_weights(p_baby, p_china, tau=1.0):
    Hb = entropy(p_baby)
    Hc = entropy(p_china)
    wb = np.exp(-tau * Hb)
    wc = np.exp(-tau * Hc)
    s = wb + wc + 1e-12
    return wb/s, wc/s  # each [N]


# Fusion function (exactly as algorithm)

In [None]:
def fuse_batch_union_logits(
    baby_logits_3,   # [N,3] logits (pre-softmax)
    china_logits_6,  # [N,6] logits (pre-softmax)
    T_B, T_C,
    baby_to_union,
    keep_idx_6,
    china_keep_to_union,
    tau=1.0,
):
    N = baby_logits_3.shape[0]

    # ---- calibrated posteriors for entropy weighting ----
    p_b = softmax_temp(baby_logits_3, T_B)           # [N,3]
    p_c6 = softmax_temp(china_logits_6, T_C)         # [N,6]
    p_c = p_c6[:, keep_idx_6]                        # [N,3] (Diaper/Sleepy/Uncomfortable)
    p_c = p_c / (np.sum(p_c, axis=1, keepdims=True) + 1e-12)

    # ---- entropy-gated weights per sample ----
    w_b, w_c = entropy_weights(p_b, p_c, tau=tau)    # each [N]

    # ---- initialize union logits ----
    z_union = np.full((N, U), -1e30, dtype=np.float32)  # -inf approx

    # ---- fill Baby logits into union ----
    for k in range(3):
        z_union[:, baby_to_union[k]] = baby_logits_3[:, k] / float(T_B)

    # ---- fill Chinese kept logits into union (but DON'T finalize sleepy yet) ----
    # Chinese kept order: [Diaper, Sleepy, Uncomfortable]
    # Map them to union indices [Diaper, Sleepy, Uncomfortable]
    # We'll temporarily write them; then overwrite sleepy with fused version.
    china_logits_kept = (china_logits_6[:, keep_idx_6] / float(T_C))  # [N,3]
    for k in range(3):
        z_union[:, china_keep_to_union[k]] = china_logits_kept[:, k]

    # ---- fuse shared label: Sleepy ----
    sleepy_u = union_idx["Sleepy"]

    # calibrated sleepy logits from each model
    # Baby sleepy is where in BABY_LABELS_3? (usually index 0 if ["Sleepy","Hungry","Wakeup"])
    baby_sleepy_k = BABY_LABELS_3.index("Sleepy")
    zB = baby_logits_3[:, baby_sleepy_k] / float(T_B)

    # Chinese sleepy is 6-class index 4
    zC = china_logits_6[:, 4] / float(T_C)

    # log( wC*exp(zC) + wB*exp(zB) )
    z_union[:, sleepy_u] = np.log(w_c * np.exp(zC) + w_b * np.exp(zB) + 1e-12)

    # ---- final posterior over UNION ----
    # softmax union logits
    z = z_union - np.max(z_union, axis=1, keepdims=True)
    p_union = np.exp(z) / (np.sum(np.exp(z), axis=1, keepdims=True) + 1e-12)  # [N,5]

    y_pred_union = np.argmax(p_union, axis=1)

    return p_union, y_pred_union, (w_b, w_c)


# Run ensemble on a dataset (Baby test OR Chinese test)

Baby test truth mapping

In [None]:
def map_baby_y_to_union(y_baby_012):
    # y_baby_012: {0,1,2} indexing BABY_LABELS_3
    return np.array([union_idx[BABY_LABELS_3[i]] for i in y_baby_012], dtype=int)


Chinese test truth mapping (3-of-6)

In [None]:
def map_china3_y_to_union(y_china_012):
    # 0->Diaper, 1->Sleepy, 2->Uncomfortable
    names = ["Diaper","Sleepy","Uncomfortable"]
    return np.array([union_idx[names[i]] for i in y_china_012], dtype=int)


# Collect logits for BOTH models on the SAME inputs

In [None]:
def get_logits_keras(logits_model, X, batch_size=32):
    return logits_model.predict(X, batch_size=batch_size, verbose=0).astype(np.float32)


# Evaluate on Baby test set

# Evaluate on Chinese test set

In [None]:
import numpy as np
import tensorflow as tf
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix

# ----------------------------
# Label spaces
# ----------------------------
UNION_LABELS  = ["Diaper", "Uncomfortable", "Sleepy", "Hungry", "Wakeup"]
BABY_LABELS_3 = ["Hungry", "Sleepy",  "Wakeup"]
CHINA_LABELS_3 = ["Diaper", "Sleepy", "Uncomfortable"]  # <-- your choice B ✅

U = len(UNION_LABELS)
union_idx = {n:i for i,n in enumerate(UNION_LABELS)}

def softmax_temp(logits, T=1.0):
    z = logits / float(T)
    z = z - np.max(z, axis=1, keepdims=True)  # stability
    e = np.exp(z)
    return e / (np.sum(e, axis=1, keepdims=True) + 1e-12)

def entropy(p):
    return -np.sum(p * np.log(p + 1e-12), axis=1)

def entropy_weights(p_b, p_c, tau=1.0):
    Hb = entropy(p_b)
    Hc = entropy(p_c)
    wb = np.exp(-tau * Hb)
    wc = np.exp(-tau * Hc)
    s = wb + wc + 1e-12
    return wb/s, wc/s

def get_logits_keras(logits_model, X, batch_size=32):
    return logits_model.predict(X, batch_size=batch_size, verbose=0).astype(np.float32)


def fuse_union_3v3(
    baby_logits_3,   # [N,3] (Sleepy, Hungry, Wakeup)
    china_logits_3,  # [N,3] (Diaper, Sleepy, Uncomfortable)
    T_B, T_C,
    tau=1.0
):
    N = baby_logits_3.shape[0]
    z_union = np.full((N, U), -1e30, dtype=np.float32)  # -inf approx

    # 1) Calibrated probs for entropy weights
    p_b = softmax_temp(baby_logits_3, T_B)   # [N,3]
    p_c = softmax_temp(china_logits_3, T_C)  # [N,3]

    w_b, w_c = entropy_weights(p_b, p_c, tau=tau)  # each [N]

    Hb = entropy(p_b)
    Hc = entropy(p_c)
    
    print("Mean entropy baby :", Hb.mean())
    print("Mean entropy china:", Hc.mean())
    print("Mean w_b:", w_b.mean(), "Mean w_c:", w_c.mean())
    print("Frac w_c > 0.5:", np.mean(w_c > 0.5))


    # 2) Calibrated logits
    zB = baby_logits_3 / float(T_B)   # [N,3]
    zC = china_logits_3 / float(T_C)  # [N,3]

    # 3) Fill disjoint labels directly into union logits
    # Baby: Sleepy, Hungry, Wakeup
    for k, name in enumerate(BABY_LABELS_3):
        z_union[:, union_idx[name]] = zB[:, k]

    # China: Diaper, Sleepy, Uncomfortable
    for k, name in enumerate(CHINA_LABELS_3):
        z_union[:, union_idx[name]] = zC[:, k]

    # 4) Fuse shared class Sleepy with weighted log-sum-exp
    sleepy_u = union_idx["Sleepy"]
    zB_sleepy = zB[:, BABY_LABELS_3.index("Sleepy")]      # baby sleepy logit
    zC_sleepy = zC[:, CHINA_LABELS_3.index("Sleepy")]     # china sleepy logit

    z_union[:, sleepy_u] = np.log(w_c * np.exp(zC_sleepy) + w_b * np.exp(zB_sleepy) + 1e-12)

    # 5) Final softmax over union
    z = z_union - np.max(z_union, axis=1, keepdims=True)
    p_union = np.exp(z) / (np.sum(np.exp(z), axis=1, keepdims=True) + 1e-12)
    y_pred_union = np.argmax(p_union, axis=1)

    return p_union, y_pred_union, (w_b, w_c)

def map_baby_y_to_union(y_baby_012):
    y_baby_012 = np.asarray(y_baby_012).astype(int)
    return np.array([union_idx[BABY_LABELS_3[i]] for i in y_baby_012], dtype=int)

def map_china_y_to_union(y_china_012):
    y_china_012 = np.asarray(y_china_012).astype(int)
    return np.array([union_idx[CHINA_LABELS_3[i]] for i in y_china_012], dtype=int)


def evaluate_union_subset(y_true_union, y_pred_union, subset_names, name=""):
    subset_ids = [union_idx[n] for n in subset_names]

    acc = accuracy_score(y_true_union, y_pred_union)
    f1m = f1_score(y_true_union, y_pred_union, average="macro", labels=subset_ids)
    f1w = f1_score(y_true_union, y_pred_union, average="weighted", labels=subset_ids)

    print("\n" + "="*80)
    print(f"✅ {name}")
    print("="*80)
    print("Subset:", subset_names)
    print(f"Accuracy: {acc:.4f} | F1-macro: {f1m:.4f} | F1-weighted: {f1w:.4f}")

    print("\nReport:")
    print(classification_report(
        y_true_union, y_pred_union,
        labels=subset_ids,
        target_names=subset_names,
        digits=4,
        zero_division=0
    ))

    print("Confusion matrix (rows=true, cols=pred):")
    print(confusion_matrix(y_true_union, y_pred_union, labels=subset_ids))

    return {"acc": acc, "f1_macro": f1m, "f1_weighted": f1w}


In [None]:
# ----------------------------
# BABY TEST
# ----------------------------
X_test_baby = X_test0
y_test_baby = y_test0   # {0,1,2} => BABY_LABELS_3

if X_test_baby.ndim == 3:
    X_test_baby = np.expand_dims(X_test_baby, axis=-1)

baby_logits = get_logits_keras(baby_logits_model, X_test_baby)
china_logits = get_logits_keras(china_logits_model, X_test_baby)

print("baby_logits:", baby_logits.shape, "china_logits:", china_logits.shape)

p_union_baby, y_pred_union_baby, (wb_baby, wc_baby) = fuse_union_3v3(
    baby_logits, china_logits,
    T_B=T_B, T_C=T_C,
    tau=1.0
)

y_true_union_baby = map_baby_y_to_union(y_test_baby)

baby_results = evaluate_union_subset(
    y_true_union_baby,
    y_pred_union_baby,
    subset_names=["Hungry","Sleepy","Wakeup"],
    name="Ensemble on BABY test"
)


In [121]:
print("X_test_baby:", X_test_baby.shape)
print("y_test_baby:", np.asarray(y_test_baby).shape)
print("baby_logits:", baby_logits.shape)
print("china_logits:", china_logits.shape)
print("y_pred_union_baby:", np.asarray(y_pred_union_baby).shape)
print("p_union_baby:", np.asarray(p_union_baby).shape)


X_test_baby: (105, 100, 280, 1)
y_test_baby: (105,)
baby_logits: (105, 3)
china_logits: (105, 3)
y_pred_union_baby: (105,)
p_union_baby: (105, 5)


# Energy gated Fusion

In [22]:
def energy_from_logits(z, T=1.0):
    """
    z: [N,C] logits
    returns energy: [N]
    """
    zT = z / float(T)
    m = np.max(zT, axis=1, keepdims=True)
    lse = m + np.log(np.sum(np.exp(zT - m), axis=1, keepdims=True) + 1e-12)
    return -lse.squeeze(1)   # [N]

def energy_weights(zB, zC, T_B, T_C, gamma=1.0):
    """
    zB: [N,3] baby logits (raw logits BEFORE dividing by T)|
    zC: [N,3] china logits (raw logits BEFORE dividing by T)
    returns wb,wc in [N]
    """
    Eb = energy_from_logits(zB, T_B)
    Ec = energy_from_logits(zC, T_C)
    wb = np.exp(-gamma * Eb)
    wc = np.exp(-gamma * Ec)
    s = wb + wc + 1e-12
    return wb/s, wc/s, Eb, Ec


In [23]:
def fuse_union_3v3_openworld(
    baby_logits_3,
    china_logits_3,
    T_B, T_C,
    gamma=1.0
):
    N = baby_logits_3.shape[0]
    z_union = np.full((N, U), -1e30, dtype=np.float32)

    # calibrated logits
    zB = baby_logits_3 / float(T_B)
    zC = china_logits_3 / float(T_C)

    # weights from ENERGY (not entropy)
    wb, wc, Eb, Ec = energy_weights(baby_logits_3, china_logits_3, T_B, T_C, gamma=gamma)

    print("Mean energy baby :", Eb.mean())
    print("Mean energy china:", Ec.mean())
    print("Mean w_b:", wb.mean(), "Mean w_c:", wc.mean())
    print("Frac w_c > 0.5:", np.mean(wc > 0.5))

    # fill disjoint
    for k, name in enumerate(BABY_LABELS_3):
        z_union[:, union_idx[name]] = zB[:, k]
    for k, name in enumerate(CHINA_LABELS_3):
        z_union[:, union_idx[name]] = zC[:, k]

    # fuse shared Sleepy
    sleepy_u = union_idx["Sleepy"]
    zB_sleepy = zB[:, BABY_LABELS_3.index("Sleepy")]
    zC_sleepy = zC[:, CHINA_LABELS_3.index("Sleepy")]
    z_union[:, sleepy_u] = np.log(wc*np.exp(zC_sleepy) + wb*np.exp(zB_sleepy) + 1e-12)

    # softmax over union
    z = z_union - np.max(z_union, axis=1, keepdims=True)
    p_union = np.exp(z) / (np.sum(np.exp(z), axis=1, keepdims=True) + 1e-12)
    y_pred_union = np.argmax(p_union, axis=1)

    return p_union, y_pred_union, (wb, wc)


In [24]:
p_union_baby, y_pred_union_baby, (wb_baby, wc_baby) = fuse_union_3v3_openworld(
    baby_logits, china_logits,
    T_B=T_B, T_C=T_C,
    gamma=2.0   # try 1.0, 2.0, 5.0
)


NameError: name 'baby_logits' is not defined

# Masked Fusion evaluation

In [25]:
from sklearn.metrics import (
    accuracy_score, f1_score, matthews_corrcoef,
    confusion_matrix, classification_report, roc_auc_score
)

def eval_ensemble_on_baby_fold0(
    y_test0,                 # shape [N], integers like 0/1/2 in fold mapping
    label_to_class,          # dict: {0:'Hungry',1:'Sleepy',2:'Wakeup'}
    p_union_baby,            # [N,5] union probs
    y_pred_union_baby,       # [N] union argmax
    UNION_LABELS=("Diaper","Uncomfortable","Sleepy","Hungry","Wakeup"),
):
    N = len(y_test0)
    y_test0 = np.asarray(y_test0).astype(int)
    y_pred_union_baby = np.asarray(y_pred_union_baby).astype(int)
    p_union_baby = np.asarray(p_union_baby)

    assert p_union_baby.shape[0] == N, f"p_union_baby N mismatch: {p_union_baby.shape[0]} vs {N}"
    assert y_pred_union_baby.shape[0] == N, f"y_pred_union_baby N mismatch: {y_pred_union_baby.shape[0]} vs {N}"

    union_idx = {n:i for i,n in enumerate(UNION_LABELS)}

    # ---- map y_true (fold ids) -> union ids using label_to_class ----
    y_true_names = np.array([label_to_class[int(i)] for i in y_test0], dtype=object)
    y_true_union = np.array([union_idx[name] for name in y_true_names], dtype=int)

    # ---- restrict predictions to baby classes only (safety) ----
    baby_names = ["Hungry", "Sleepy", "Wakeup"]
    baby_union_ids = np.array([union_idx[n] for n in baby_names], dtype=int)

    # Convert union prediction -> baby local index (0/1/2) by:
    # 1) take probs on baby union ids only
    p_baby = p_union_baby[:, baby_union_ids]                 # [N,3] in order [Hungry,Sleepy,Wakeup]
    y_pred_local = np.argmax(p_baby, axis=1)                 # 0..2 in that order
    # map local -> class name
    id2name_local = {0:"Hungry", 1:"Sleepy", 2:"Wakeup"}
    y_pred_names = np.array([id2name_local[int(i)] for i in y_pred_local], dtype=object)

    # Now map y_true_names to same local space for metrics
    name2id_local = {"Hungry":0,"Sleepy":1,"Wakeup":2}
    y_true_local = np.array([name2id_local[n] for n in y_true_names], dtype=int)

    # ---- metrics in Baby local space ----
    acc = accuracy_score(y_true_local, y_pred_local)
    f1_macro = f1_score(y_true_local, y_pred_local, average="macro")
    f1_weighted = f1_score(y_true_local, y_pred_local, average="weighted")
    mcc = matthews_corrcoef(y_true_local, y_pred_local)
    cm = confusion_matrix(y_true_local, y_pred_local, labels=[0,1,2])

    # ---- AUC (macro OVR) ----
    y_true_onehot = np.eye(3)[y_true_local]  # [N,3]
    auc_macro = roc_auc_score(y_true_onehot, p_baby, average="macro", multi_class="ovr")

    print("\n" + "="*80)
    print("✅ Ensemble on Baby2020 (Fold0) — evaluated correctly")
    print("="*80)
    print(f"N            : {N}")
    print(f"Accuracy     : {acc:.4f}")
    print(f"F1 (macro)   : {f1_macro:.4f}")
    print(f"F1 (weighted): {f1_weighted:.4f}")
    print(f"MCC          : {mcc:.4f}")
    print(f"AUC (macro)  : {auc_macro:.4f}")

    print("\nClassification report (Baby classes):")
    print(classification_report(
        y_true_local, y_pred_local,
        labels=[0,1,2],
        target_names=["Hungry","Sleepy","Wakeup"],
        digits=4,
        zero_division=0
    ))

    print("Confusion matrix (rows=true, cols=pred) order=[Hungry,Sleepy,Wakeup]:")
    print(cm)

    return {
        "acc": acc, "f1_macro": f1_macro, "f1_weighted": f1_weighted,
        "mcc": mcc, "auc_macro": auc_macro, "cm": cm,
        "y_true_local": y_true_local, "y_pred_local": y_pred_local,
        "p_baby": p_baby
    }


In [26]:

class_to_label= {'Hungry': 1, 'Sleepy': 0, 'Wakeup': 2}
label_to_class= {1: 'Hungry', 0: 'Sleepy', 2: 'Wakeup'}


baby_metrics = eval_ensemble_on_baby_fold0(
    y_test0=y_test0,
    label_to_class=label_to_class,   # <- from your fold0 split output
    p_union_baby=p_union_baby,
    y_pred_union_baby=y_pred_union_baby
)


NameError: name 'p_union_baby' is not defined

# Print the weights to see if Chinese dominates on Baby data

## If wc_baby is often high on Baby test, that explains the degradation.

In [27]:
print("Mean w_b (baby weight):", np.mean(wb_baby))
print("Mean w_c (china weight):", np.mean(wc_baby))
print("Fraction wc>0.5:", np.mean(wc_baby > 0.5))


NameError: name 'wb_baby' is not defined

# Now test Ensemble posterior Fusion on Chinese baby

In [28]:
# ----------------------------
# CHINESE TEST
# ----------------------------
X_test_china = X_val_Chinese_babyCry
y_test_china = y_val_Chinese_babyCry  # {0,1,2} => CHINA_LABELS_3

if X_test_china.ndim == 3:
    X_test_china = np.expand_dims(X_test_china, axis=-1)

baby_logits = get_logits_keras(baby_logits_model, X_test_china)
china_logits = get_logits_keras(china_logits_model, X_test_china)

print("baby_logits:", baby_logits.shape, "china_logits:", china_logits.shape)

p_union_china, y_pred_union_china, (wb_china, wc_china) = fuse_union_3v3(
    baby_logits, china_logits,
    T_B=T_B, T_C=T_C,
    tau=1.0
)

y_true_union_china = map_china_y_to_union(y_test_china)

china_results = evaluate_union_subset(
    y_true_union_china,
    y_pred_union_china,
    subset_names=["Diaper","Sleepy","Uncomfortable"],
    name="Ensemble on CHINESE test"
)


NameError: name 'get_logits_keras' is not defined

# Domain-gate head that learns to predict “this input looks like Baby2020-domain vs Chinese-domain

In [29]:
import numpy as np
import tensorflow as tf

# ---------- helpers ----------
def softmax_np(z):
    z = z - np.max(z, axis=1, keepdims=True)
    e = np.exp(z)
    return e / (np.sum(e, axis=1, keepdims=True) + 1e-12)

def entropy_np(p):
    return -np.sum(p * np.log(p + 1e-12), axis=1)

def energy_np(logits, T=1.0):
    z = logits / float(T)
    m = np.max(z, axis=1, keepdims=True)
    lse = m + np.log(np.sum(np.exp(z - m), axis=1, keepdims=True) + 1e-12)
    return -lse.squeeze(1)  # [N]

def top2_margin_np(logits):
    # margin between top1 and top2 logits
    part = np.partition(logits, -2, axis=1)
    top2 = part[:, -2]
    top1 = part[:, -1]
    return top1 - top2

def get_logits_keras(logits_model, X, batch_size=64):
    return logits_model.predict(X, batch_size=batch_size, verbose=0).astype(np.float32)

def logits_stats_features(logits, T=1.0):
    """
    logits: [N,C]
    returns features: [N,F]
    """
    p = softmax_np(logits / float(T))
    H = entropy_np(p)                          # [N]
    mp = np.max(p, axis=1)                     # [N]
    margin = top2_margin_np(logits / float(T)) # [N]
    E = energy_np(logits, T=T)                 # [N]
    l2 = np.linalg.norm(logits, axis=1)        # [N]
    zmax = np.max(logits, axis=1)              # [N]
    zmean = np.mean(logits, axis=1)            # [N]
    zstd = np.std(logits, axis=1)              # [N]

    # stack
    return np.stack([H, mp, margin, E, l2, zmax, zmean, zstd], axis=1).astype(np.float32)


In [30]:
def build_gate_dataset(
    X_baby, X_china,
    baby_logits_model, china_logits_model,
    T_B=1.0, T_C=1.0,
    batch_size=64
):
    """
    Returns:
      X_gate: [N, F]  features for gate
      y_dom : [N]     domain labels (0=baby, 1=china)
    """
    # ensure 4D
    if X_baby.ndim == 3:  X_baby  = np.expand_dims(X_baby, axis=-1)
    if X_china.ndim == 3: X_china = np.expand_dims(X_china, axis=-1)

    # ---- logits on baby-domain data ----
    zB_on_baby = get_logits_keras(baby_logits_model,  X_baby,  batch_size=batch_size)
    zC_on_baby = get_logits_keras(china_logits_model, X_baby,  batch_size=batch_size)

    fB_on_baby = logits_stats_features(zB_on_baby, T=T_B)
    fC_on_baby = logits_stats_features(zC_on_baby, T=T_C)

    # include difference features (helps a lot)
    fDiff_baby = (fB_on_baby - fC_on_baby)

    X_gate_baby = np.concatenate([fB_on_baby, fC_on_baby, fDiff_baby], axis=1)
    y_dom_baby = np.zeros(len(X_gate_baby), dtype=np.int64)

    # ---- logits on china-domain data ----
    zB_on_china = get_logits_keras(baby_logits_model,  X_china, batch_size=batch_size)
    zC_on_china = get_logits_keras(china_logits_model, X_china, batch_size=batch_size)

    fB_on_china = logits_stats_features(zB_on_china, T=T_B)
    fC_on_china = logits_stats_features(zC_on_china, T=T_C)
    fDiff_china = (fB_on_china - fC_on_china)

    X_gate_china = np.concatenate([fB_on_china, fC_on_china, fDiff_china], axis=1)
    y_dom_china = np.ones(len(X_gate_china), dtype=np.int64)

    # ---- combine ----
    X_gate = np.concatenate([X_gate_baby, X_gate_china], axis=0)
    y_dom  = np.concatenate([y_dom_baby,  y_dom_china],  axis=0)

    # shuffle
    idx = np.random.permutation(len(X_gate))
    return X_gate[idx], y_dom[idx]


In [31]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report

def train_domain_gate(X_gate, y_dom):
    Xtr, Xva, ytr, yva = train_test_split(
        X_gate, y_dom, test_size=0.25, random_state=42, stratify=y_dom
    )

    gate = LogisticRegression(max_iter=2000, class_weight="balanced")
    gate.fit(Xtr, ytr)

    p_va = gate.predict_proba(Xva)[:, 1]  # P(domain=china)
    yhat = (p_va >= 0.5).astype(int)

    print("\n" + "="*80)
    print("✅ Domain gate validation")
    print("="*80)
    print("Acc:", accuracy_score(yva, yhat))
    print("AUC:", roc_auc_score(yva, p_va))
    print("\nReport:")
    print(classification_report(yva, yhat, digits=4))

    return gate


In [32]:
def gate_weights_from_proba(p_china, alpha=1.0):
    """
    p_china: [N] probability input is China-domain
    alpha: >1 makes gate more decisive, <1 makes softer
    """
    p = np.clip(p_china, 1e-6, 1-1e-6)
    # logit transform + scaling
    logit = np.log(p/(1-p))
    p2 = 1.0 / (1.0 + np.exp(-alpha * logit))
    w_c = p2
    w_b = 1.0 - p2
    return w_b.astype(np.float32), w_c.astype(np.float32)


In [33]:
UNION_LABELS  = ["Diaper", "Uncomfortable", "Sleepy", "Hungry", "Wakeup"]
BABY_LABELS_3 = ["Hungry", "Sleepy",  "Wakeup"]
CHINA_LABELS_3 = ["Diaper", "Sleepy", "Uncomfortable"]

U = len(UNION_LABELS)
union_idx = {n:i for i,n in enumerate(UNION_LABELS)}

def fuse_union_with_domain_gate(
    baby_logits_3, china_logits_3,
    T_B, T_C,
    wb, wc
):
    """
    wb,wc: [N] from domain gate
    returns p_union [N,5], y_pred_union [N]
    """
    N = baby_logits_3.shape[0]
    z_union = np.full((N, U), -1e30, dtype=np.float32)

    # calibrated logits
    zB = baby_logits_3 / float(T_B)
    zC = china_logits_3 / float(T_C)

    # fill disjoint
    for k, name in enumerate(BABY_LABELS_3):
        z_union[:, union_idx[name]] = zB[:, k]
    for k, name in enumerate(CHINA_LABELS_3):
        z_union[:, union_idx[name]] = zC[:, k]

    # fuse shared Sleepy only (log-sum-exp)
    sleepy_u = union_idx["Sleepy"]
    zB_sleepy = zB[:, BABY_LABELS_3.index("Sleepy")]
    zC_sleepy = zC[:, CHINA_LABELS_3.index("Sleepy")]
    z_union[:, sleepy_u] = np.log(wc*np.exp(zC_sleepy) + wb*np.exp(zB_sleepy) + 1e-12)

    # softmax over union
    z = z_union - np.max(z_union, axis=1, keepdims=True)
    p_union = np.exp(z) / (np.sum(np.exp(z), axis=1, keepdims=True) + 1e-12)
    y_pred_union = np.argmax(p_union, axis=1)

    return p_union, y_pred_union


In [34]:
# Build gate training features using unlabeled domain splits
X_gate, y_dom = build_gate_dataset(
    X_baby=X_test0,                 # or a baby val split if you have one
    X_china=X_val_Chinese_babyCry,   # china val split
    baby_logits_model=baby_logits_model,
    china_logits_model=china_logits_model,
    T_B=T_B, T_C=T_C
)

gate = train_domain_gate(X_gate, y_dom)


NameError: name 'T_B' is not defined

In [35]:
# logits on baby test
X_baby = X_test0
if X_baby.ndim == 3: X_baby = np.expand_dims(X_baby, axis=-1)

zB = get_logits_keras(baby_logits_model,  X_baby)
zC = get_logits_keras(china_logits_model, X_baby)

# gate features for this batch
fB = logits_stats_features(zB, T=T_B)
fC = logits_stats_features(zC, T=T_C)
fD = fB - fC
Xg = np.concatenate([fB, fC, fD], axis=1)

# gate proba + weights
p_china = gate.predict_proba(Xg)[:, 1]      # P(china-domain)
wb, wc = gate_weights_from_proba(p_china, alpha=2.0)

print("Mean wc (china weight) on BABY test:", wc.mean())
print("Frac wc>0.5:", np.mean(wc > 0.5))

# fuse open-world (5 classes)
p_union_baby, y_pred_union_baby = fuse_union_with_domain_gate(
    baby_logits_3=zB, china_logits_3=zC,
    T_B=T_B, T_C=T_C,
    wb=wb, wc=wc
)




NameError: name 'T_B' is not defined

# More powerful gate

In [36]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, f1_score, matthews_corrcoef, confusion_matrix,
    classification_report, roc_auc_score
)


UNION_LABELS  = ["Diaper", "Uncomfortable", "Sleepy", "Hungry", "Wakeup"]
BABY_LABELS_3 = ["Hungry", "Sleepy", "Wakeup"]
CHINA_LABELS_3 = ["Diaper", "Sleepy", "Uncomfortable"]  # choice B

U = len(UNION_LABELS)
union_idx = {n:i for i,n in enumerate(UNION_LABELS)}

BABY_UNION_IDS  = [union_idx[n] for n in BABY_LABELS_3]
CHINA_UNION_IDS = [union_idx[n] for n in CHINA_LABELS_3]

def map_baby_y_to_union(y012):
    y012 = np.asarray(y012).astype(int)
    return np.array([union_idx[BABY_LABELS_3[i]] for i in y012], dtype=int)

def map_china_y_to_union(y012):
    y012 = np.asarray(y012).astype(int)
    return np.array([union_idx[CHINA_LABELS_3[i]] for i in y012], dtype=int)


def softmax_np(z):
    z = z - np.max(z, axis=1, keepdims=True)
    e = np.exp(z)
    return e / (np.sum(e, axis=1, keepdims=True) + 1e-12)

def entropy_np(p):
    return -np.sum(p * np.log(p + 1e-12), axis=1)

def energy_np(logits, T=1.0):
    z = logits / float(T)
    m = np.max(z, axis=1, keepdims=True)
    lse = m + np.log(np.sum(np.exp(z - m), axis=1, keepdims=True) + 1e-12)
    return -lse.squeeze(1)

def top2_margin_np(logits):
    part = np.partition(logits, -2, axis=1)
    top2 = part[:, -2]
    top1 = part[:, -1]
    return top1 - top2

def get_logits_keras(logits_model, X, batch_size=64):
    return logits_model.predict(X, batch_size=batch_size, verbose=0).astype(np.float32)

def logits_stats_features(logits, T=1.0):
    """
    logits: [N,C]
    returns: [N,8]
    """
    z = logits / float(T)
    p = softmax_np(z)
    H = entropy_np(p)
    mp = np.max(p, axis=1)
    margin = top2_margin_np(z)
    E = energy_np(logits, T=T)
    l2 = np.linalg.norm(logits, axis=1)
    zmax = np.max(logits, axis=1)
    zmean = np.mean(logits, axis=1)
    zstd = np.std(logits, axis=1)
    return np.stack([H, mp, margin, E, l2, zmax, zmean, zstd], axis=1).astype(np.float32)

def build_gate_features_for_batch(zB, zC, T_B=1.0, T_C=1.0):
    fB = logits_stats_features(zB, T=T_B)
    fC = logits_stats_features(zC, T=T_C)
    fD = fB - fC
    return np.concatenate([fB, fC, fD], axis=1).astype(np.float32)  # [N, 24]

def build_gate_dataset(
    X_baby, X_china,
    baby_logits_model, china_logits_model,
    T_B=1.0, T_C=1.0,
    batch_size=64
):
    if X_baby.ndim == 3:  X_baby  = np.expand_dims(X_baby, axis=-1)
    if X_china.ndim == 3: X_china = np.expand_dims(X_china, axis=-1)

    zB_b = get_logits_keras(baby_logits_model,  X_baby,  batch_size=batch_size)
    zC_b = get_logits_keras(china_logits_model, X_baby,  batch_size=batch_size)
    Xg_b = build_gate_features_for_batch(zB_b, zC_b, T_B=T_B, T_C=T_C)
    yg_b = np.zeros(len(Xg_b), dtype=np.int64)

    zB_c = get_logits_keras(baby_logits_model,  X_china, batch_size=batch_size)
    zC_c = get_logits_keras(china_logits_model, X_china, batch_size=batch_size)
    Xg_c = build_gate_features_for_batch(zB_c, zC_c, T_B=T_B, T_C=T_C)
    yg_c = np.ones(len(Xg_c), dtype=np.int64)

    Xg = np.concatenate([Xg_b, Xg_c], axis=0)
    yg = np.concatenate([yg_b, yg_c], axis=0)

    idx = np.random.permutation(len(Xg))
    return Xg[idx], yg[idx]


def make_mlp_gate(input_dim, hidden=(64, 32), dropout=0.2, lr=1e-3):
    inp = tf.keras.Input(shape=(input_dim,))
    x = tf.keras.layers.LayerNormalization()(inp)

    for h in hidden:
        x = tf.keras.layers.Dense(h, activation="relu")(x)
        x = tf.keras.layers.Dropout(dropout)(x)

    # output: P(domain=china)
    out = tf.keras.layers.Dense(1, activation="sigmoid")(x)

    model = tf.keras.Model(inp, out, name="domain_gate_mlp")
    model.compile(
        optimizer=tf.keras.optimizers.Adam(lr),
        loss="binary_crossentropy",
        metrics=[
            tf.keras.metrics.BinaryAccuracy(name="acc"),
            tf.keras.metrics.AUC(name="auc")
        ]
    )
    return model


def train_domain_gate_mlp(Xg, yg, epochs=50, batch_size=64):
    Xtr, Xva, ytr, yva = train_test_split(
        Xg, yg, test_size=0.25, random_state=42, stratify=yg
    )

    gate = make_mlp_gate(input_dim=Xg.shape[1], hidden=(64, 32), dropout=0.2, lr=1e-3)

    es = tf.keras.callbacks.EarlyStopping(
        monitor="val_auc", mode="max", patience=8, restore_best_weights=True, verbose=1
    )

    gate.fit(
        Xtr, ytr,
        validation_data=(Xva, yva),
        epochs=epochs,
        batch_size=batch_size,
        verbose=2,
        callbacks=[es]
    )

    pva = gate.predict(Xva, batch_size=batch_size, verbose=0).squeeze()
    yhat = (pva >= 0.5).astype(int)

    print("\n" + "="*80)
    print("✅ Domain gate (MLP) validation")
    print("="*80)
    print("Acc:", accuracy_score(yva, yhat))
    print("AUC:", roc_auc_score(yva, pva))
    print(classification_report(yva, yhat, digits=4))
    return gate


def gate_weights_from_pchina(p_china, alpha=2.0):
    """
    alpha>1 => more decisive gate, alpha<1 => softer
    """
    p = np.clip(p_china, 1e-6, 1 - 1e-6)
    logit = np.log(p/(1-p))
    p2 = 1.0 / (1.0 + np.exp(-alpha * logit))
    wc = p2.astype(np.float32)          # china weight
    wb = (1.0 - p2).astype(np.float32)  # baby weight
    return wb, wc

def fuse_union_openworld(
    baby_logits_3, china_logits_3,
    T_B, T_C,
    wb, wc
):
    N = baby_logits_3.shape[0]
    z_union = np.full((N, U), -1e30, dtype=np.float32)

    zB = baby_logits_3 / float(T_B)
    zC = china_logits_3 / float(T_C)

    # fill disjoint
    for k, name in enumerate(BABY_LABELS_3):
        z_union[:, union_idx[name]] = zB[:, k]
    for k, name in enumerate(CHINA_LABELS_3):
        z_union[:, union_idx[name]] = zC[:, k]

    # fuse Sleepy only
    sleepy_u = union_idx["Sleepy"]
    zB_sleepy = zB[:, BABY_LABELS_3.index("Sleepy")]
    zC_sleepy = zC[:, CHINA_LABELS_3.index("Sleepy")]
    z_union[:, sleepy_u] = np.log(wc*np.exp(zC_sleepy) + wb*np.exp(zB_sleepy) + 1e-12)

    # final softmax over 5 classes
    z = z_union - np.max(z_union, axis=1, keepdims=True)
    p_union = np.exp(z) / (np.sum(np.exp(z), axis=1, keepdims=True) + 1e-12)
    y_pred = np.argmax(p_union, axis=1)
    return p_union, y_pred


def eval_gate_on_dataset(
    X, true_domain,   # true_domain: 0 for baby, 1 for china
    baby_logits_model, china_logits_model,
    gate_model,
    T_B, T_C,
    alpha=2.0,
    name=""
):
    if X.ndim == 3: X = np.expand_dims(X, axis=-1)

    zB = get_logits_keras(baby_logits_model, X)
    zC = get_logits_keras(china_logits_model, X)
    Xg = build_gate_features_for_batch(zB, zC, T_B=T_B, T_C=T_C)

    p_china = gate_model.predict(Xg, batch_size=64, verbose=0).squeeze()
    yhat_dom = (p_china >= 0.5).astype(int)

    wb, wc = gate_weights_from_pchina(p_china, alpha=alpha)

    print("\n" + "="*80)
    print(f"✅ Gate check: {name}")
    print("="*80)
    print(f"True domain={true_domain}  |  Pred-dom acc={accuracy_score(np.full_like(yhat_dom,true_domain), yhat_dom):.4f}")
    print(f"Mean wb={wb.mean():.4f}  Mean wc={wc.mean():.4f}  Frac wc>0.5={np.mean(wc>0.5):.4f}")

    return zB, zC, wb, wc, p_china, yhat_dom

def eval_ensemble_subset(y_true_union, y_pred_union, subset_union_ids, subset_names, name=""):
    acc = accuracy_score(y_true_union, y_pred_union)
    f1m = f1_score(y_true_union, y_pred_union, labels=subset_union_ids, average="macro")
    f1w = f1_score(y_true_union, y_pred_union, labels=subset_union_ids, average="weighted")
    mcc = matthews_corrcoef(y_true_union, y_pred_union)

    print("\n" + "="*80)
    print(f"✅ Ensemble metrics: {name}")
    print("="*80)
    print(f"Accuracy(all union preds vs union true): {acc:.4f}")
    print(f"F1-macro(subset): {f1m:.4f}")
    print(f"F1-weighted(subset): {f1w:.4f}")
    print(f"MCC: {mcc:.4f}")

    print("\nReport (subset labels only):")
    print(classification_report(
        y_true_union, y_pred_union,
        labels=subset_union_ids,
        target_names=subset_names,
        digits=4,
        zero_division=0
    ))

    print("Confusion matrix (subset):")
    print(confusion_matrix(y_true_union, y_pred_union, labels=subset_union_ids))

    return {"acc": acc, "f1_macro": f1m, "f1_weighted": f1w, "mcc": mcc}



In [37]:
Xg, yg = build_gate_dataset(
    X_baby=X_test0,
    X_china=X_val_Chinese_babyCry,
    baby_logits_model=baby_logits_model,
    china_logits_model=china_logits_model,
    T_B=T_B, T_C=T_C
)

gate_mlp = train_domain_gate_mlp(Xg, yg, epochs=200, batch_size=64)


NameError: name 'T_B' is not defined

In [38]:
# --- gate behavior on baby test ---
zB_b, zC_b, wb_b, wc_b, pch_b, ydom_b = eval_gate_on_dataset(
    X=X_test0,
    true_domain=0,
    baby_logits_model=baby_logits_model,
    china_logits_model=china_logits_model,
    gate_model=gate_mlp,
    T_B=T_B, T_C=T_C,
    alpha=2.0,
    name="BABY test"
)

# --- fuse open-world ---
p_union_b, y_pred_union_b = fuse_union_openworld(
    baby_logits_3=zB_b, china_logits_3=zC_b,
    T_B=T_B, T_C=T_C,
    wb=wb_b, wc=wc_b
)

# --- map baby true labels into union ---
y_true_union_b = map_baby_y_to_union(y_test0)

# evaluate only on baby subset labels
baby_metrics = eval_ensemble_subset(
    y_true_union_b, y_pred_union_b,
    subset_union_ids=BABY_UNION_IDS,
    subset_names=BABY_LABELS_3,
    name="Ensemble on BABY test (subset=Baby classes)"
)


NameError: name 'gate_mlp' is not defined

In [39]:
# --- gate behavior on china test ---
zB_c, zC_c, wb_c, wc_c, pch_c, ydom_c = eval_gate_on_dataset(
    X=X_val_Chinese_babyCry,
    true_domain=1,
    baby_logits_model=baby_logits_model,
    china_logits_model=china_logits_model,
    gate_model=gate_mlp,
    T_B=T_B, T_C=T_C,
    alpha=2.0,
    name="CHINA test"
)

# --- fuse open-world ---
p_union_c, y_pred_union_c = fuse_union_openworld(
    baby_logits_3=zB_c, china_logits_3=zC_c,
    T_B=T_B, T_C=T_C,
    wb=wb_c, wc=wc_c
)

# --- map china true labels into union ---
y_true_union_c = map_china_y_to_union(y_val_Chinese_babyCry)

# evaluate only on china subset labels
china_metrics = eval_ensemble_subset(
    y_true_union_c, y_pred_union_c,
    subset_union_ids=CHINA_UNION_IDS,
    subset_names=CHINA_LABELS_3,
    name="Ensemble on CHINA test (subset=China classes)"
)


NameError: name 'gate_mlp' is not defined

# The End

# Entropy-gated fusion on the union label space

### Implements exactly:

### calibrated posteriors p_m(y|x,T_m)

### union logits initialized to -inf

### fill disjoint classes directly

### fuse shared sleepy with log-sum-exp with weights

### weights from entropy: 
𝑤
𝑚
∝
exp
⁡
(
−
𝜏
𝐻
(
𝑝
𝑚
)
)
w
m​
∝exp(−τH(p
m
))

In [63]:
def entropy(p, eps=1e-12):
    # p: [C]
    p = torch.clamp(p, eps, 1.0)
    return -(p * torch.log(p)).sum()

@torch.no_grad()
def calibrated_probs(model, x, T, device):
    model.eval()
    x = x.to(device)
    logits = model(x)  # [B, C]
    probs = F.softmax(logits / T, dim=-1)
    return logits, probs

@torch.no_grad()
def fuse_two_models_union(
    x, baby_model, china_model, T_B, T_C,
    tau=1.0, device="cpu"
):
    """
    x: [B, ...] batch
    Returns:
      p_star: [B, |UNION|]
      z_union: [B, |UNION|] union logits pre-softmax
      weights: dict with wB, wC per sample
    """
    # Get calibrated logits & probs
    zB, pB = calibrated_probs(baby_model, x, T_B, device)    # [B,3]
    zC, pC = calibrated_probs(china_model, x, T_C, device)   # [B,3]

    B = x.shape[0]
    z_union = torch.full((B, len(UNION_LABELS)), -1e9, device=device)  # approx -inf

    # Entropy gated weights per sample
    wB_list, wC_list = [], []
    for i in range(B):
        HB = entropy(pB[i])
        HC = entropy(pC[i])
        aB = torch.exp(-tau * HB)
        aC = torch.exp(-tau * HC)
        s = aB + aC
        wB = aB / s
        wC = aC / s
        wB_list.append(wB)
        wC_list.append(wC)

    wB = torch.stack(wB_list).to(device)  # [B]
    wC = torch.stack(wC_list).to(device)  # [B]

    # Fill disjoint labels into union logits:
    # Baby2020 contributes: hug, uncomfortable, sleepy
    z_union[:, union2idx["hug"]] = zB[:, baby2idx["hug"]]
    z_union[:, union2idx["uncomfortable"]] = zB[:, baby2idx["uncomfortable"]]

    # Chinese contributes: hungry, awake, sleepy
    z_union[:, union2idx["hungry"]] = zC[:, china2idx["hungry"]]
    z_union[:, union2idx["awake"]]  = zC[:, china2idx["awake"]]

    # Fuse shared class "sleepy" using: log( wC * exp(zC_sleepy) + wB * exp(zB_sleepy) )
    z_sleepy = torch.log(
        wC * torch.exp(zC[:, CHINA_SLEEPY_IDX]) + wB * torch.exp(zB[:, BABY_SLEEPY_IDX])
    )
    z_union[:, UNION_SLEEPY_IDX] = z_sleepy

    # Final posterior over union
    p_star = F.softmax(z_union, dim=-1)

    return p_star, z_union, {"wB": wB, "wC": wC}


# Run fusion on a test loader (example)

### Your test loader should yield (xb, y_union) where y_union is indexed in UNION_LABELS.
### If you don’t have union-ground-truth for both datasets, evaluate per test set accordingly.

In [64]:
@torch.no_grad()
def evaluate_union_fusion(test_loader, tau=1.0, device="cpu"):
    correct = 0
    total = 0
    for xb, y_union in test_loader:
        xb = xb.to(device)
        y_union = y_union.to(device)

        p_star, z_union, w = fuse_two_models_union(
            xb, baby_model, china_model, T_B, T_C, tau=tau, device=device
        )
        pred = torch.argmax(p_star, dim=-1)
        correct += (pred == y_union).sum().item()
        total += y_union.numel()

    return correct / max(total, 1)

# acc = evaluate_union_fusion(union_test_loader, tau=1.0, device=device)
# print("Fusion accuracy:", acc)
