In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
import cv2

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

# GPU 사용 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
class EarlyStopping:
    """조기 종료를 위한 클래스"""
    def __init__(self, patience=15, verbose=False):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_loss = float('inf')
        self.early_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping Counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

In [None]:
class CNN(nn.Module):
    def __init__(self, num_classes, input_shape):
        super(CNN, self).__init__()
        self.num_classes = num_classes
        self.input_shape = input_shape
        self.model = models.resnet50(pretrained=False)
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)

    def forward(self, x):
        return self.model(x)

    def train_model(self, X, Y, patience, device='cuda'):
        self.to(device)  # 모델을 지정된 디바이스로 이동
        dataset = TensorDataset(X.to(device), Y.to(device))
        train_loader = DataLoader(dataset, batch_size=16, shuffle=True)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(self.parameters(), lr=0.01)

        early_stopping = EarlyStopping(patience=patience, verbose=True)

        self.train()
        for epoch in range(100000):
            total_loss = 0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = self(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(train_loader)
            print(f'Epoch {epoch+1}: Loss: {avg_loss:.4f}')

            early_stopping(avg_loss)
            if early_stopping.early_stop:
                print("Early stopping")
                break


    def remove_class(self, idx_to_remove):
        num_features = self.model.fc.in_features
        num_classes = self.model.fc.out_features - 1  # 한 클래스 제거

        # 새로운 가중치 텐서 생성
        new_weight = torch.empty((num_classes, num_features))
        new_bias = torch.empty(num_classes)

        # 기존 가중치에서 제거할 클래스를 제외하고 복사
        old_weight = self.model.fc.weight.data
        old_bias = self.model.fc.bias.data

        if idx_to_remove > 0:
            new_weight[:idx_to_remove] = old_weight[:idx_to_remove]
            new_bias[:idx_to_remove] = old_bias[:idx_to_remove]

        if idx_to_remove < num_classes:
            new_weight[idx_to_remove:] = old_weight[idx_to_remove + 1:]
            new_bias[idx_to_remove:] = old_bias[idx_to_remove + 1:]

        # 새로운 가중치로 레이어 업데이트
        self.model.fc = nn.Linear(num_features, num_classes)
        self.model.fc.weight.data = new_weight
        self.model.fc.bias.data = new_bias

    def add_class(self):
        num_features = self.model.fc.in_features
        num_classes = self.model.fc.out_features + 1  # 한 클래스 추가

        # 새로운 가중치 텐서 생성
        new_weight = torch.empty((num_classes, num_features))
        new_bias = torch.empty(num_classes)

        # 기존 가중치 복사
        old_weight = self.model.fc.weight.data
        old_bias = self.model.fc.bias.data

        new_weight[:-1] = old_weight
        new_bias[:-1] = old_bias

        # 새 클래스를 위한 가중치 초기화 (예: 무작위 초기화)
        new_weight[-1].normal_(0, 0.01)
        new_bias[-1].zero_()

        # 새로운 가중치로 레이어 업데이트
        self.model.fc = nn.Linear(num_features, num_classes)
        self.model.fc.weight.data = new_weight
        self.model.fc.bias.data = new_bias


In [None]:
class NodeList(object):
    def __init__(self, label, values, nodes, class_position):
        self.label = label
        self.values = values
        self.nodes = nodes
        self.class_position = class_position

    def __str__(self):
        return f"NodeList(label={self.label}, values={self.values}, nodes={self.nodes}, class_position={self.class_position})"

    def __repr__(self):
        return f"NodeList(label={self.label!r}, values={self.values!r}, nodes={self.nodes!r}, class_position={self.class_position!r})"

# 사용 예시
node_list = NodeList('Node1', [1, 2, 3], ['Node2', 'Node3'], 0)
print(node_list)


In [None]:
class CnnNode(object):
    def __init__(self, num_classes, labels=[], input_shape=(28, 28, 1), max_leafes=10):
        self.net = CNN(num_classes, input_shape)
        self.num_classes = num_classes
        self.childrens = [label for label in labels]
        self.childrens_leaf = [True for _ in range(num_classes)]
        self.labels = labels
        self.max_leafes = max_leafes
        self.labels_transform = {}
        for nc in range(num_classes):
            if labels:  # labels 리스트가 주어진 경우에만 실행
                self.labels_transform[labels[nc]] = []
                self.labels_transform[labels[nc]].append(labels[nc])
        
    def get_num_leafnodes(self):
        count = 0
        for is_leaf in self.childrens_leaf:
            if is_leaf:
                count += 1
        return count

    def remove_leaf(self, label):
        if label in self.labels_transform:
            del self.labels_transform[label]
            position_in_net = self.labels.index(label)
            self.labels.remove(label)
            self.childrens.remove(label)
            self.childrens_leaf.pop(position_in_net)
            self.net.remove_class(position_in_net)
            self.num_classes -= 1

    def add_leaf(self, label):
        self.childrens.append(label)
        self.childrens_leaf.append(True)
        self.labels.append(label)
        self.labels_transform[label] = [label]
        self.net.add_class()
        self.num_classes += 1
    
    def predict(self, imgs):
        if not isinstance(imgs, torch.Tensor):
            imgs = torch.tensor(imgs, dtype=torch.float32)  # 데이터 타입 명시
        imgs = imgs.to(next(self.net.parameters()).device)  # 모델과 같은 디바이스로 이동
        self.net.eval()
        with torch.no_grad():
            outputs = self.net(imgs)
        return outputs

    def inference(self, imgs):
        vector_output = self.predict(imgs)
        probs = F.softmax(vector_output, dim=1)
        out = torch.argmax(probs, dim=1)
        output = -1 * torch.ones(imgs.shape[0], dtype=torch.int32).to(imgs.device)  # 디바이스 일치
        for i, o in enumerate(out):
            if self.childrens_leaf[o]:
                output[i] = self.labels[o]
        return output.cpu().numpy()  # GPU 사용 시 CPU로 이동

    def train(self, X, Y):
        self.net.train()
        dataset = TensorDataset(X, Y)
        loader = DataLoader(dataset, batch_size=64, shuffle=True)
        optimizer = torch.optim.Adam(self.net.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        for epoch in range(10):  # 예시로 10 에포크 설정
            for inputs, labels in loader:
                optimizer.zero_grad()
                outputs = self.net(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
            print(f'Epoch {epoch+1}, Loss: {loss.item()}')

In [None]:
class TreeCNN:
    def __init__(self, initial_labels, alpha=0.1, beta=0.1, input_shape=(28, 28, 1), max_leafnodes=1000):
        self.root = CnnNode(len(initial_labels), initial_labels, input_shape)
        self.alpha = alpha
        self.beta = beta
        self.max_leafnodes = max_leafnodes

    def train(self, X, Y):
        self.root.train(X, Y)

    def inference(self, X):
        return self.root.inference(X)

    def growTreeCNN(self, operation_node, imgs_of_classes=[], labels=[]):
        def get_Oavg_matrix(node, imgs_of_classes_, labels_):
            Oavg = torch.tensor([], dtype=torch.float32).to('cuda')  # 빈 텐서 초기화 및 GPU에 할당
            for imgs, label in zip(imgs_of_classes_, labels_):
                imgs_tensor = torch.tensor(imgs, dtype=torch.float32).to('cuda')  # 이미지를 텐서로 변환하고 GPU로 이동
                net_out = node.predict(imgs_tensor)  # 모델 예측
                Oavg_i = torch.mean(net_out, dim=0, keepdim=True)  # 평균 출력 계산
                Oavg = torch.cat((Oavg, Oavg_i), dim=1)  # 결과 텐서에 연결
            return Oavg

        def get_loglikelihood_matrix(Oavg):
            exp_Oavg = torch.exp(Oavg)  # Oavg의 지수 계산
            llh = exp_Oavg / torch.sum(exp_Oavg, dim=0, keepdim=True)  # 소프트맥스 계산과 유사한 방법으로 정규화
            return llh

    # Oavg와 llh 계산
    Oavg = get_Oavg_matrix(operation_node, imgs_of_classes, labels)
    llh = get_loglikelihood_matrix(Oavg)