<a href="https://colab.research.google.com/github/sasa10th/research/blob/main/Hebbian_Neural_Network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Hebbian_Neural_Network

In [None]:
# =============================================================================
# ⓒ 2025 김지환 (Kim Ji-Hwan), 문휘석 (Moon Hwi-Seok) 세종과학예술영재학교 — All rights reserved.
# 작성자   : 김지환 (Kim Ji-Hwan), 문휘석 (Moon Hwi-Seok)
# 소속     : 세종과학예술영재학교
# 작성일   : 2025-06-10
# 버전     : 1.0
# =============================================================================
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import r2_score
import pandas as pd
import numpy as np
import random
import os


def set_seed(seed=24):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


class HebbianLinear(nn.Module):
    active_layers = 0

    def __init__(self, in_features, out_features, threshold=1e-2, verbose=False):
        super().__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.bn = nn.BatchNorm1d(out_features)
        self.threshold = threshold
        self.verbose = verbose
        self.active = True
        self.last_input = None
        self.last_output = None
        HebbianLinear.active_layers += 1

    def forward(self, x):
        if self.active:
            self.last_input = x.detach()
            out = self.bn(self.linear(x))
            self.last_output = out.detach()
            return out
        else:
            return x @ torch.zeros(x.size(1), self.linear.out_features, device=x.device)

    def evolve(self):
        if not self.active or self.last_input is None or self.last_output is None:
            return

        with torch.no_grad():
            corr = torch.bmm(
                self.last_output.unsqueeze(2), self.last_input.unsqueeze(1)
            )
            mean_corr = corr.mean(dim=0)
            strength = torch.norm(mean_corr, p="fro")

            if self.verbose:
                print(f"[DEBUG] correlation_strength = {strength:.6f}")

            if strength < self.threshold:
                if HebbianLinear.active_layers > 1:
                    self.active = False
                    HebbianLinear.active_layers -= 1
                    if self.verbose:
                        print(f"[DEBUG] 레이어 비활성화 (threshold={self.threshold})")
                elif self.verbose:
                    print("[DEBUG] 비활성화 보류 - 마지막 활성 레이어 보호됨")


class HebbianNet(nn.Module):
    def __init__(
        self,
        input_dim,
        hidden_dim=128,
        output_dim=1,
        dropout_prob=0.3,
        verbose=False,
        thresh_l1=0.10,
        thresh_l2=0.05,
        thresh_l3=0.02,
    ):
        super().__init__()
        # 레이어별 threshold 값 할당
        self.l1 = HebbianLinear(
            input_dim, hidden_dim, threshold=thresh_l1, verbose=verbose
        )
        self.l2 = HebbianLinear(
            hidden_dim, hidden_dim, threshold=thresh_l2, verbose=verbose
        )
        self.l3 = HebbianLinear(
            hidden_dim, output_dim, threshold=thresh_l3, verbose=verbose
        )
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = self.dropout(x)
        x = self.l3(x)
        return x

    def evolve_layers(self):
        self.l1.evolve()
        self.l2.evolve()
        self.l3.evolve()


def load_dataset_from_excel(xlsx_path):
    if not os.path.exists(xlsx_path):
        raise FileNotFoundError(f"파일을 찾을 수 없습니다: {xlsx_path}")

    df = pd.read_excel(xlsx_path)
    y = df.iloc[:, 0].to_numpy(dtype=np.float32)
    X = df.iloc[:, 1:].to_numpy(dtype=np.float32)

    x_scaler = MinMaxScaler()
    y_scaler = StandardScaler()

    X_scaled = x_scaler.fit_transform(X)
    y_scaled = y_scaler.fit_transform(y.reshape(-1, 1)).flatten()

    return train_test_split(X_scaled, y_scaled, test_size=0.8, random_state=42)


def summarize_performance(model, X_test_tensor, y_test_tensor, device):
    model.eval()
    with torch.no_grad():
        preds = model(X_test_tensor.to(device)).cpu().numpy()
        true = y_test_tensor.cpu().numpy()
        r2 = r2_score(true, preds)
        print(f"\n[최종 테스트 R² 점수] {r2:.4f}")


def train_hebbian_model(
    xlsx_path,
    num_epochs=100,
    batch_size=32,
    lr=1e-3,
    verbose=False,
    thresh_l1=0.10,
    thresh_l2=0.05,
    thresh_l3=0.02,
):
    set_seed(24)
    X_train, X_test, y_train, y_test = load_dataset_from_excel(xlsx_path)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

    train_loader = DataLoader(
        TensorDataset(X_train_tensor, y_train_tensor),
        batch_size=batch_size,
        shuffle=True,
    )

    model = HebbianNet(
        input_dim=X_train.shape[1],
        hidden_dim=128,
        dropout_prob=0.3,
        verbose=verbose,
        thresh_l1=thresh_l1,
        thresh_l2=thresh_l2,
        thresh_l3=thresh_l3,
    ).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="min", factor=0.5, patience=10, verbose=True
    )
    criterion = nn.MSELoss()

    for epoch in range(1, num_epochs + 1):
        model.train()
        total_loss = 0.0
        for x_batch, y_batch in train_loader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            pred = model(x_batch)
            loss = criterion(pred, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * x_batch.size(0)

        avg_loss = total_loss / len(train_loader.dataset)
        scheduler.step(avg_loss)

        if epoch % 10 == 0:
            model.evolve_layers()

        print(f"[Epoch {epoch}/{num_epochs} 완료] 평균 손실 | {avg_loss:.6f}")

    summarize_performance(model, X_test_tensor, y_test_tensor, device)


if __name__ == "__main__":
    train_hebbian_model("data.xlsx", num_epochs=100, verbose=True)


# vs. ANN

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
from torch.utils.data import DataLoader, TensorDataset

# HebbianLinear 레이어
class HebbianLinear(nn.Module):
    active_layers = 0
    def __init__(self, in_features, out_features, threshold=1e-2, name=""):
        super().__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.bn = nn.BatchNorm1d(out_features)
        self.threshold = threshold
        self.name = name
        self.active = True
        self.last_input = None
        self.last_output = None
        HebbianLinear.active_layers += 1

    def forward(self, x):
        if self.active:
            self.last_input = x.detach()
            out = self.linear(x)
            out = self.bn(out)
            self.last_output = out.detach()
            return out
        else:
            return x @ torch.zeros(x.size(1), self.linear.out_features, device=x.device)

    def evolve(self):
        if not self.active or self.last_input is None or self.last_output is None:
            return
        with torch.no_grad():
            corr = torch.bmm(self.last_output.unsqueeze(2), self.last_input.unsqueeze(1))
            mean_corr = corr.mean(dim=0)
            strength = torch.norm(mean_corr, p='fro')
            print(f"[{self.name}] Hebbian correlation = {strength:.4f}")
            if strength < self.threshold and HebbianLinear.active_layers > 1:
                self.active = False
                HebbianLinear.active_layers -= 1
                print(f"[{self.name}] 비활성화됨")
            elif strength < self.threshold:
                print(f"[{self.name}] 비활성화 보류 (마지막 활성 레이어 보호)")

# HebbianNet 모델
class HebbianNet(nn.Module):
    def __init__(self, input_dim, output_dim=1):
        super().__init__()
        self.l1 = HebbianLinear(input_dim, 128, threshold=0.1, name="L1")
        self.l2 = HebbianLinear(128, 128, threshold=0.05, name="L2")
        self.l3 = HebbianLinear(128, output_dim, threshold=0.02, name="L3")
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = self.dropout(x)
        x = self.l3(x)
        return x

    def evolve_layers(self):
        self.l1.evolve()
        self.l2.evolve()
        self.l3.evolve()

# Deep ANN 모델
class DeepANN(nn.Module):
    def __init__(self, input_dim, output_dim=1):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )
    def forward(self, x):
        return self.model(x)

# 데이터 로딩
def load_dataset_from_excel(xlsx_path):
    df = pd.read_excel(xlsx_path)
    y = df.iloc[:, 0].to_numpy(dtype=np.float32)
    X = df.iloc[:, 1:].to_numpy(dtype=np.float32)
    X_scaled = MinMaxScaler().fit_transform(X)
    y_scaled = StandardScaler().fit_transform(y.reshape(-1, 1)).flatten()
    return train_test_split(X_scaled, y_scaled, test_size=0.8, random_state=42)

# 학습 함수
def train_model(model, X_train, y_train, X_test, y_test, device, is_hebbian=False, num_epochs=100):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10)
    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
    train_losses, test_r2s, test_rmses = [], [], []
    start = time.time()

    for epoch in range(1, num_epochs + 1):
        model.train()
        total_loss = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            pred = model(xb)
            loss = criterion(pred, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * xb.size(0)
        avg_loss = total_loss / len(train_loader.dataset)
        scheduler.step(avg_loss)
        if is_hebbian and epoch % 10 == 0:
            model.evolve_layers()
        model.eval()
        with torch.no_grad():
            y_pred = model(X_test.to(device)).cpu().numpy()
            y_true = y_test.cpu().numpy()
            r2 = r2_score(y_true, y_pred)
            rmse = np.sqrt(mean_squared_error(y_true, y_pred))
        train_losses.append(avg_loss)
        test_r2s.append(r2)
        test_rmses.append(rmse)
        print(f"Epoch {epoch:03d} | Loss: {avg_loss:.4f} | R²: {r2:.4f}")
    end = time.time()
    mem = torch.cuda.memory_allocated(device) / (1024 ** 2) if torch.cuda.is_available() else 0
    return train_losses, test_r2s, test_rmses, end - start, mem

# 학습 곡선 그리기
def plot_curves(hebb_loss, deep_loss, hebb_r2, deep_r2, hebb_rmse, deep_rmse):
    epochs = range(1, len(hebb_loss) + 1)

    plt.figure()
    plt.plot(epochs, hebb_loss, label="Hebbian Loss")
    plt.plot(epochs, deep_loss, label="DeepANN Loss")
    plt.title("Loss")
    plt.legend()
    plt.grid()
    plt.show()

    plt.figure()
    plt.plot(epochs, hebb_r2, label="Hebbian R²")
    plt.plot(epochs, deep_r2, label="DeepANN R²")
    plt.title("R² Score")
    plt.legend()
    plt.grid()
    plt.show()

    plt.figure()
    plt.plot(epochs, hebb_rmse, label="Hebbian RMSE")
    plt.plot(epochs, deep_rmse, label="DeepANN RMSE")
    plt.title("RMSE")
    plt.legend()
    plt.grid()
    plt.show()


# 실행 메인
def compare_hebbian_vs_deepann(xlsx_path, num_epochs=100):
    X_train, X_test, y_train, y_test = load_dataset_from_excel(xlsx_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

    hebbian = HebbianNet(X_train.shape[1]).to(device)
    print("\n▶ HebbianNet")
    hloss, hr2, hrmse, htime, hmem = train_model(hebbian, X_train, y_train, X_test, y_test, device, is_hebbian=True)

    deep = DeepANN(X_train.shape[1]).to(device)
    print("\n▶ DeepANN")
    dloss, dr2, drmse, dtime, dmem = train_model(deep, X_train, y_train, X_test, y_test, device)

    plot_curves(hloss, dloss, hr2, dr2, hrmse, drmse)
    print("\n⏱ 성능 비교")
    print(f"Hebbian: {htime:.2f}s | {hmem:.2f}MB")
    print(f"DeepANN: {dtime:.2f}s | {dmem:.2f}MB")

# 실행 예시
compare_hebbian_vs_deepann("data(1).xlsx", num_epochs=100)
