In [1]:
# train_gillam_rnn.py
# C) SimpleRNN 기반 SLI/TD 분류기 (프레임워크 없이, 단일 파일)

import os
from pathlib import Path
from collections import Counter

import numpy as np
import pandas as pd

from utils import extract_utterances  # clean_text 포함된 Utterance 제공


# ============================================================
# 0. 공통 유틸 (softmax, cross-entropy 등)
# ============================================================

def softmax(x):
    """Return the softmax of ``x`` computed in a numerically stable way.

    A 2-D input is normalised independently along each row (axis 1).
    Any other rank is normalised over all of its elements at once.
    The maximum is subtracted before exponentiating to avoid overflow.
    """
    if x.ndim != 2:
        exps = np.exp(x - np.max(x))
        return exps / np.sum(exps)

    shifted = x - x.max(axis=1, keepdims=True)
    exps = np.exp(shifted)
    exps /= exps.sum(axis=1, keepdims=True)
    return exps


def cross_entropy_error(y, t):
    """Return the mean cross-entropy loss over a batch.

    Args:
        y: softmax probabilities, either one sample (1-D) or a batch (2-D).
        t: targets, given as one-hot rows or as integer class indices.

    Returns:
        The negative log-probability of the target classes averaged over
        the batch. A small epsilon (1e-7) is added inside the log so that a
        zero probability does not produce ``-inf``.
    """
    # Promote a single sample to a batch of one.
    if y.ndim == 1:
        y, t = y.reshape(1, -1), t.reshape(1, -1)

    # Same element count as y means t is one-hot; reduce it to class indices.
    labels = t.argmax(axis=1) if t.size == y.size else t

    n_samples = y.shape[0]
    picked = y[np.arange(n_samples), labels]
    return -np.sum(np.log(picked + 1e-7)) / n_samples


# ============================================================
# 1. 레이어들 (Embedding, SimpleRNN, Affine, SoftmaxWithLoss)
# ============================================================

class Embedding:
    """Lookup-table layer mapping integer token ids to dense vectors."""

    def __init__(self, W):
        # W is the embedding table of shape (V, D).
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.idx = None  # ids seen in the last forward pass

    def forward(self, idx):
        """Return the embedding rows for ``idx`` of shape (N, T) as (N, T, D)."""
        self.idx = idx
        table = self.params[0]
        return table[idx]

    def backward(self, dout):
        """Write the gradient of the table into ``self.grads[0]``.

        ``dout`` has shape (N, T, D). The gradient buffer is reset and then
        filled; ``np.add.at`` is used so that ids occurring several times
        have all of their contributions summed. Nothing is propagated
        further back, so the method returns ``None``.
        """
        table_grad = self.grads[0]
        table_grad.fill(0)

        n_batch, n_steps, n_dim = dout.shape
        flat_ids = self.idx.reshape(n_batch * n_steps)
        flat_dout = dout.reshape(n_batch * n_steps, n_dim)
        np.add.at(table_grad, flat_ids, flat_dout)
        return None


class SimpleRNNLayer:
    """
    Single tanh RNN layer unrolled over the whole sequence.

    Input:  x (N, T, D)
    Output: h_last (N, H), the hidden state after the final time step
    """

    def __init__(self, Wx, Wh, b):
        # Wx: (D, H) input weights, Wh: (H, H) recurrent weights, b: (H,)
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(p) for p in (Wx, Wh, b)]

        # Caches filled by forward() and consumed by backward().
        self.xs = None      # list of per-step inputs, each (N, D)
        self.hs = None      # list of per-step hidden states, each (N, H)
        self.h0 = None      # initial hidden state (N, H), all zeros

    def forward(self, x):
        """Run the recurrence over all T steps and return the last hidden state."""
        Wx, Wh, b = self.params
        n_batch, n_steps, _ = x.shape
        n_hidden = Wh.shape[0]

        self.h0 = np.zeros((n_batch, n_hidden), dtype=np.float32)
        self.xs = [x[:, step, :] for step in range(n_steps)]
        self.hs = []

        state = self.h0
        for x_t in self.xs:
            state = np.tanh(x_t.dot(Wx) + state.dot(Wh) + b)
            self.hs.append(state)

        # Only the final hidden state is handed to the next layer.
        return self.hs[-1]

    def backward(self, dh_last):
        """Backpropagate through time from the gradient of the last hidden state.

        Parameter gradients are accumulated over all time steps into
        ``self.grads`` (reset at the start of each call).

        Returns:
            Gradient with respect to the input sequence, shape (N, T, D).
        """
        Wx, Wh, _ = self.params
        n_steps = len(self.xs)

        grad_Wx, grad_Wh, grad_b = self.grads
        for buf in (grad_Wx, grad_Wh, grad_b):
            buf[...] = 0

        input_grads = [None] * n_steps
        upstream = dh_last  # (N, H) gradient flowing into h_t

        for step in range(n_steps - 1, -1, -1):
            h_t = self.hs[step]
            h_before = self.hs[step - 1] if step > 0 else self.h0

            # d tanh(a) / da = 1 - tanh(a)^2
            pre_act_grad = upstream * (1.0 - h_t ** 2)   # (N, H)

            grad_Wx += self.xs[step].T.dot(pre_act_grad)  # (D, H)
            grad_Wh += h_before.T.dot(pre_act_grad)       # (H, H)
            grad_b += pre_act_grad.sum(axis=0)            # (H,)

            input_grads[step] = pre_act_grad.dot(Wx.T)    # (N, D)
            upstream = pre_act_grad.dot(Wh.T)             # (N, H)

        return np.stack(input_grads, axis=1)  # (N, T, D)


class Affine:
    """Fully connected layer computing ``x.dot(W) + b``."""

    def __init__(self, W, b):
        self.params = [W, b]
        self.grads = [np.zeros_like(p) for p in (W, b)]
        self.x = None  # input cached by forward() for use in backward()

    def forward(self, x):
        """Cache ``x`` and return the affine transform of it."""
        weight, bias = self.params
        self.x = x
        return x.dot(weight) + bias

    def backward(self, dout):
        """Store the gradients of W and b and return the gradient of the input."""
        weight = self.params[0]
        grad_input = dout.dot(weight.T)
        grad_weight = self.x.T.dot(dout)
        grad_bias = dout.sum(axis=0)

        # Write in place so references held to these gradient buffers stay valid.
        self.grads[0][...] = grad_weight
        self.grads[1][...] = grad_bias
        return grad_input


class SoftmaxWithLoss:
    """Softmax activation fused with the cross-entropy loss."""

    def __init__(self):
        # The layer has no trainable parameters.
        self.params = []
        self.grads = []
        self.y = None  # softmax output of the last forward pass
        self.t = None  # targets of the last forward pass

    def forward(self, x, t):
        """Return the cross-entropy loss of scores ``x`` against targets ``t``."""
        self.t = t
        self.y = softmax(x)
        return cross_entropy_error(self.y, self.t)

    def backward(self, dout=1):
        """Return the gradient of the loss with respect to the input scores.

        Handles both one-hot targets (same element count as the softmax
        output) and integer class-index targets. The result is averaged over
        the batch and scaled by ``dout``.
        """
        n_samples = self.t.shape[0]

        if self.t.size != self.y.size:
            # Integer labels: subtract 1 at the target class of each row.
            grad = self.y.copy()
            grad[np.arange(n_samples), self.t] -= 1
            grad /= n_samples
        else:
            grad = (self.y - self.t) / n_samples

        return grad * dout


# ============================================================
# 2. Optimizer (SGD)
# ============================================================

class SGD:
    """Plain stochastic gradient descent with a fixed learning rate."""

    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, params, grads):
        """Apply one in-place step ``param -= lr * grad`` to every parameter."""
        step_size = self.lr
        for weight, gradient in zip(params, grads):
            # In-place update so layers sharing these arrays see the new values.
            weight -= step_size * gradient


# ============================================================
# 3. RNNClassifier
# ============================================================

class RNNClassifier:
    """
    Sequence classifier:
    Embedding -> SimpleRNN (last hidden state) -> Affine -> SoftmaxWithLoss
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        def small_normal(*shape):
            # Gaussian initialisation scaled by 0.01, stored as float32.
            return 0.01 * np.random.randn(*shape).astype(np.float32)

        # The random draws happen in this order: embedding, Wx, Wh, affine W.
        embed_W = small_normal(vocab_size, embedding_dim)
        Wx = small_normal(embedding_dim, hidden_size)
        Wh = small_normal(hidden_size, hidden_size)
        b_rnn = np.zeros(hidden_size, dtype=np.float32)
        W_aff = small_normal(hidden_size, output_size)
        b_aff = np.zeros(output_size, dtype=np.float32)

        self.embed = Embedding(embed_W)
        self.rnn = SimpleRNNLayer(Wx, Wh, b_rnn)
        self.affine = Affine(W_aff, b_aff)
        self.loss_layer = SoftmaxWithLoss()

        # Flat lists of all trainable arrays and their gradient buffers,
        # in layer order, for the optimizer.
        trainable = (self.embed, self.rnn, self.affine)
        self.params = [p for layer in trainable for p in layer.params]
        self.grads = [g for layer in trainable for g in layer.grads]

    def predict(self, x_ids):
        """Return unnormalised class scores (N, O) for token ids ``x_ids``."""
        embedded = self.embed.forward(x_ids)       # (N, T, D)
        last_hidden = self.rnn.forward(embedded)   # (N, H)
        return self.affine.forward(last_hidden)    # (N, O)

    def forward(self, x_ids, t):
        """Return the loss of the batch ``x_ids`` against targets ``t``."""
        return self.loss_layer.forward(self.predict(x_ids), t)

    def backward(self, dout=1):
        """Backpropagate through all layers, filling every gradient buffer."""
        grad_scores = self.loss_layer.backward(dout)    # (N, O)
        grad_hidden = self.affine.backward(grad_scores)  # (N, H)
        grad_embedded = self.rnn.backward(grad_hidden)   # (N, T, D)
        self.embed.backward(grad_embedded)               # fills the embedding gradient
        return None


# ============================================================
# 4. Gillam 데이터 로더 (CHA → 텍스트)
# ============================================================

def load_child_text(cha_path: Path, speakers=('CHI',)) -> str:
    """Return one space-joined string of the cleaned utterances in a CHA file.

    Utterances are obtained from ``extract_utterances`` for the requested
    speakers; those whose ``clean_text`` is empty are skipped.
    """
    path_str = str(cha_path)
    utterances = extract_utterances(path_str, list(speakers))
    return " ".join(u.clean_text for u in utterances if u.clean_text)


def load_split_csv(csv_path: Path, base_dir: Path, speakers=('CHI',)):
    """Load the texts and labels listed in one split CSV.

    The CSV must provide a ``filename`` column (path of a CHA file relative
    to ``base_dir``, e.g. 'gillam/SLI/5m/xxx.cha') and a ``group`` column
    holding "SLI" or "TD".

    Returns:
        (texts, labels): a list of per-file text strings and an int64 array
        with 0 for SLI and 1 for TD. Files that yield no text are reported
        with a warning and left out of both outputs.
    """
    label_to_idx = {"SLI": 0, "TD": 1}
    records = pd.read_csv(csv_path).to_dict(orient="records")

    texts = []
    labels = []

    for record in records:
        cha_path = (base_dir / record["filename"]).resolve()
        text = load_child_text(cha_path, speakers=speakers)

        if not text.strip():
            print(f"[WARN] Empty utterance: {cha_path}")
            continue

        texts.append(text)
        labels.append(label_to_idx[record["group"]])

    return texts, np.array(labels, dtype=np.int64)


# ============================================================
# 5. Vocab + 시퀀스 변환
# ============================================================

def build_vocab(texts, min_freq=1, max_size=None):
    """Build a frequency-ordered vocabulary from whitespace-tokenised texts.

    Tokens are lower-cased. ``vocab[0]`` is always ``<pad>`` and ``vocab[1]``
    is always ``<unk>``; the remaining entries are corpus words ordered from
    most to least frequent.

    Args:
        texts: iterable of strings.
        min_freq: words occurring fewer times than this are left out.
        max_size: upper bound on ``len(vocab)`` including the two reserved
            tokens, or ``None`` for no limit. The reserved tokens are always
            kept, so the result has at least two entries.

    Returns:
        (vocab, word_to_id): the list of words and the inverse mapping from
        word to its index in ``vocab``.
    """
    reserved = ["<pad>", "<unk>"]

    counter = Counter()
    for text in texts:
        counter.update(text.lower().split())

    vocab = list(reserved)
    for word, freq in counter.most_common():
        # Check the cap before appending so max_size is never exceeded.
        if max_size is not None and len(vocab) >= max_size:
            break
        # most_common() is sorted by descending count, so nothing after the
        # first too-rare word can qualify.
        if freq < min_freq:
            break
        # A corpus token spelled like a reserved token must not be added a
        # second time; that would remap <pad>/<unk> away from ids 0 and 1.
        if word in reserved:
            continue
        vocab.append(word)

    word_to_id = {w: i for i, w in enumerate(vocab)}
    return vocab, word_to_id


def text_to_ids(text, word_to_id, max_len):
    """Encode ``text`` as a fixed-length int64 array of token ids.

    The text is lower-cased and split on whitespace. Words missing from
    ``word_to_id`` map to the ``<unk>`` id. Sequences longer than ``max_len``
    are cut at the end; shorter ones are right-padded with the ``<pad>`` id.
    """
    pad_id = word_to_id["<pad>"]
    unk_id = word_to_id["<unk>"]

    kept_tokens = text.lower().split()[:max_len]
    ids = [word_to_id.get(token, unk_id) for token in kept_tokens]
    ids.extend([pad_id] * (max_len - len(ids)))

    return np.array(ids, dtype=np.int64)


def build_rnn_input(texts, labels, word_to_id, max_len):
    """Turn texts and integer labels into model inputs.

    Returns:
        (X_ids, T): ``X_ids`` is the (N, max_len) matrix of token ids, one row
        per text; ``T`` holds the float32 one-hot encoding of ``labels`` over
        two classes.
    """
    encoded_rows = [text_to_ids(text, word_to_id, max_len) for text in texts]
    id_matrix = np.stack(encoded_rows)  # (N, T)

    # Row k of the 2x2 identity is the one-hot vector for class k.
    identity = np.eye(2, dtype=np.float32)
    return id_matrix, identity[labels]


# ============================================================
# 6. 평가
# ============================================================

def evaluate_accuracy(model, X_ids, T):
    """Score ``model`` on a one-hot labelled set.

    Args:
        model: object whose ``predict(X_ids)`` returns per-class scores.
        X_ids: inputs passed unchanged to ``model.predict``.
        T: one-hot targets, one row per sample.

    Returns:
        (accuracy, y_true, y_pred): the fraction of correct predictions and
        the arrays of true and predicted class indices.
    """
    predicted = np.argmax(model.predict(X_ids), axis=1)
    actual = np.argmax(T, axis=1)
    accuracy = np.mean(actual == predicted)
    return accuracy, actual, predicted


def print_classification_metrics(y_true, y_pred, label_names=("SLI", "TD")):
    """Print accuracy, confusion matrix, per-class and macro-averaged metrics.

    Args:
        y_true: true class indices, where index ``i`` refers to
            ``label_names[i]``.
        y_pred: predicted class indices, same length as ``y_true``.
        label_names: display name of each class. Both the confusion matrix
            and the per-class table are sized and labelled from this
            sequence, so any number of classes is supported.

    Precision, recall and F1 are reported as 0.0 whenever their denominator
    is zero. The function only prints; it returns ``None``.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    num_classes = len(label_names)

    # Overall accuracy.
    acc = (y_true == y_pred).mean()

    # confusion[i][j] = number of samples of true class i predicted as class j.
    confusion = [
        [int(np.sum((y_true == i) & (y_pred == j))) for j in range(num_classes)]
        for i in range(num_classes)
    ]

    row_labels = [f"true_{name}" for name in label_names]
    col_labels = [f"pred_{name}" for name in label_names]
    row_width = max((len(label) for label in row_labels), default=0)

    print("\nConfusion Matrix (rows = true, cols = pred):")
    # Header offset and separators are chosen so the default ("SLI", "TD")
    # labels print exactly the historical two-class layout.
    print(" " * (row_width + 2) + "   ".join(col_labels))
    for row_label, counts in zip(row_labels, confusion):
        cells = "".join(f"   {count:7d}" for count in counts)
        print(f"{row_label:<{row_width}}{cells}")

    # Per-class metrics (one-vs-rest).
    per_class = []
    for i, label in enumerate(label_names):
        tp = np.sum((y_true == i) & (y_pred == i))
        fp = np.sum((y_true != i) & (y_pred == i))
        fn = np.sum((y_true == i) & (y_pred != i))
        support = np.sum(y_true == i)

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        if (precision + recall) > 0:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0.0

        per_class.append((label, precision, recall, f1, support))

    # Macro average: unweighted mean over classes.
    macro_precision = np.mean([entry[1] for entry in per_class])
    macro_recall = np.mean([entry[2] for entry in per_class])
    macro_f1 = np.mean([entry[3] for entry in per_class])

    print("\nPer-class metrics:")
    print("Class   Precision   Recall   F1-score   Support")
    for label, p, r, f1, sup in per_class:
        print(f"{label:5s}  {p:9.4f}  {r:7.4f}  {f1:9.4f}   {sup:7d}")

    print("\nMacro-averaged:")
    print(f"Precision: {macro_precision:.4f}, Recall: {macro_recall:.4f}, F1: {macro_f1:.4f}")
    print(f"\nOverall Accuracy: {acc:.4f}")


# ============================================================
# 7. 메인
# ============================================================

def main():
    """Train the RNN classifier on the Gillam splits and report test metrics.

    Reads gillam_train.csv / gillam_dev.csv / gillam_test.csv from the
    current working directory, builds the vocabulary from the training texts
    only, trains with mini-batch SGD for a fixed number of epochs, restores
    the weights that scored the highest dev accuracy, and prints the test
    accuracy together with detailed classification metrics.
    """
    base_dir = Path(".").resolve()

    train_csv = base_dir / "gillam_train.csv"
    dev_csv   = base_dir / "gillam_dev.csv"
    test_csv  = base_dir / "gillam_test.csv"

    # 1) Load texts and labels for the train/dev/test splits
    print("Loading train/dev/test splits ...")
    train_texts, train_labels = load_split_csv(train_csv, base_dir, speakers=("CHI",))
    dev_texts, dev_labels     = load_split_csv(dev_csv, base_dir, speakers=("CHI",))
    test_texts, test_labels   = load_split_csv(test_csv, base_dir, speakers=("CHI",))

    print(f"Train subjects: {len(train_texts)}, Dev: {len(dev_texts)}, Test: {len(test_texts)}")

    # 2) Build the vocabulary from the training texts only
    vocab, word_to_id = build_vocab(train_texts, min_freq=1, max_size=None)
    print(f"Vocab size: {len(vocab)}")

    # 3) Convert texts to fixed-length id sequences and one-hot labels
    max_len = 100  # maximum number of tokens kept per child
    X_train_ids, T_train = build_rnn_input(train_texts, train_labels, word_to_id, max_len)
    X_dev_ids, T_dev     = build_rnn_input(dev_texts, dev_labels, word_to_id, max_len)
    X_test_ids, T_test   = build_rnn_input(test_texts, test_labels, word_to_id, max_len)

    # 4) Model configuration
    vocab_size   = len(vocab)
    embedding_dim = 50
    hidden_size   = 100
    output_size   = 2

    print(f"RNN config: vocab={vocab_size}, emb={embedding_dim}, hidden={hidden_size}, max_len={max_len}")

    model = RNNClassifier(vocab_size, embedding_dim, hidden_size, output_size)
    optimizer = SGD(lr=0.01)

    max_epoch = 30
    batch_size = 8
    data_size = X_train_ids.shape[0]
    # Integer division: samples beyond max_iters * batch_size are skipped
    # in a given epoch (a different subset each time because of the shuffle).
    max_iters = max(1, data_size // batch_size)

    # Best-checkpoint tracking: keep a copy of the weights with the highest
    # dev accuracy. Training itself is not cut short; every epoch always runs.
    best_dev = 0.0
    best_params = [p.copy() for p in model.params]

    # 5) Training loop
    for epoch in range(max_epoch):
        # Reshuffle inputs and targets together at the start of each epoch.
        idx = np.random.permutation(data_size)
        X_train_ids = X_train_ids[idx]
        T_train = T_train[idx]

        total_loss, loss_cnt = 0.0, 0

        for it in range(max_iters):
            batch_x = X_train_ids[it * batch_size:(it + 1) * batch_size]
            batch_t = T_train[it * batch_size:(it + 1) * batch_size]

            loss = model.forward(batch_x, batch_t)
            model.backward()
            optimizer.update(model.params, model.grads)

            total_loss += loss
            loss_cnt += 1

        avg_loss = total_loss / max(1, loss_cnt)
        dev_acc, _, _ = evaluate_accuracy(model, X_dev_ids, T_dev)
        print(f"[RNN][Epoch {epoch+1:02d}] loss={avg_loss:.4f}, dev_acc={dev_acc:.4f}")

        # Strict '>' means that on ties the earliest best epoch is kept.
        if dev_acc > best_dev:
            best_dev = dev_acc
            best_params = [p.copy() for p in model.params]

    # Roll back to the best weights; copied in place because the layers
    # hold references to these same arrays.
    for p, bp in zip(model.params, best_params):
        p[...] = bp

    # 6) Final test evaluation and detailed metrics
    test_acc, y_true, y_pred = evaluate_accuracy(model, X_test_ids, T_test)
    print(f"\n[TEST] accuracy = {test_acc:.4f}, best_dev = {best_dev:.4f}")
    print_classification_metrics(y_true, y_pred, label_names=("SLI", "TD"))


# Run the training/evaluation pipeline only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()


Loading train/dev/test splits ...
Train subjects: 540, Dev: 68, Test: 68
Vocab size: 3701
RNN config: vocab=3701, emb=50, hidden=100, max_len=100
[RNN][Epoch 01] loss=0.6628, dev_acc=0.7353
[RNN][Epoch 02] loss=0.6223, dev_acc=0.7353
[RNN][Epoch 03] loss=0.6027, dev_acc=0.7353
[RNN][Epoch 04] loss=0.5891, dev_acc=0.7353
[RNN][Epoch 05] loss=0.5826, dev_acc=0.7353
[RNN][Epoch 06] loss=0.5824, dev_acc=0.7353
[RNN][Epoch 07] loss=0.5790, dev_acc=0.7353
[RNN][Epoch 08] loss=0.5763, dev_acc=0.7353
[RNN][Epoch 09] loss=0.5773, dev_acc=0.7353
[RNN][Epoch 10] loss=0.5769, dev_acc=0.7353
[RNN][Epoch 11] loss=0.5785, dev_acc=0.7353
[RNN][Epoch 12] loss=0.5766, dev_acc=0.7353
[RNN][Epoch 13] loss=0.5766, dev_acc=0.7353
[RNN][Epoch 14] loss=0.5784, dev_acc=0.7353
[RNN][Epoch 15] loss=0.5746, dev_acc=0.7353
[RNN][Epoch 16] loss=0.5765, dev_acc=0.7353
[RNN][Epoch 17] loss=0.5765, dev_acc=0.7353
[RNN][Epoch 18] loss=0.5784, dev_acc=0.7353
[RNN][Epoch 19] loss=0.5783, dev_acc=0.7353
[RNN][Epoch 20] lo