In [15]:
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms

In [16]:
# MNIST 데이터셋 로드 함수
def load_mnist_data():
  # Define a transform to convert the images to tensors and normalize them
    transform = transforms.Compose([
        transforms.ToTensor()         # Convert images to tensor
    ])

    # Load the MNIST dataset (train set for example)
    mnist_data = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)

    # Create a DataLoader to fetch images
    dataloader = torch.utils.data.DataLoader(mnist_data, batch_size=2, shuffle=True)
    # Get a batch of images
    data_iter = iter(dataloader)
    x_train, labels = next(data_iter)  # Corrected this line
    
    # x_train을 (batch_size, 28*28) 형식으로 변환
    if not isinstance(x_train, torch.Tensor):
      raise TypeError("x_train은 Tensor 타입이어야 합니다.")
    
    #size? 텐서의 각 차원의 크기를 반환
    #unpacking? size객체에서 값을 각각의변수에 할당.
    batch_size, channels, height, width = x_train.size()

    print("x_tran.size() ", x_train.size())
    #dim? tensor의 차원수 반환 
    #flatten(펼침)? 다차원 데이터를 1차원 벡터로 변환. 
    #Fully Connected Layer, Dense Layer은 1차원 벡터 형태의 데이터를 요구.
    #view? 데이터 구성은 그대로, tensor의 size 차원를 변환 
    if x_train.dim() >= 2:
      x_train = x_train.view(batch_size, -1) #-1? 첫번째 차원은 유지, 나머지 차원은 하나의 차원으로 flatten.
    
    inputs = x_train.numpy()
    labels = labels.numpy()  # 레이블을 numpy 배열로 변환    

    return inputs, labels

In [17]:
x_train, labels = load_mnist_data()
x_train.shape

x_tran.size()  torch.Size([2, 1, 28, 28])


(2, 784)

In [18]:
labels.shape
labels

array([1, 1])

input shape(batch_size, pixcel_size of image) 일때, weight(pixcel_size)와의 dot 연산의 비교 예제

In [19]:
import torch

# batch_size가 없을 때, 픽셀 3개인 이미지 1개 (1D 벡터)
pixcel_1D = torch.tensor([1, 2, 3])  # 크기: (3,)

# batch_size = 2일 때, 픽셀 3개인 이미지 2개 (2D 행렬)
pixcel_2D = torch.tensor([[1, 2, 3],   # 크기: (2, 3)
                     [4, 5, 6]])

# weight 벡터 (3,) 3개의 가중치가 존재 
weights = torch.tensor([7, 8, 9])  # 크기: (3,)

# 1. A_1D와 B_2D의 dot 연산 (내적 연산)
result_1D_dot = torch.dot(pixcel_1D, weights)

# 2. A_2D와 B_2D의 mm() 연산 (행렬 곱셈)
# A_2D의 각 이미지에 대해 B_2D와 곱셈을 수행
result_2D_mm = torch.matmul(pixcel_2D, weights)

# 결과 출력
print("pixcel_1D dot weights 결과\n", result_1D_dot)  # 출력: 스칼라 값
print("pixcel_2D mm weights 결과\n", result_2D_mm)  # 출력: (2,) 크기의 벡터

pixcel_1D dot weights 결과
 tensor(50)
pixcel_2D mm weights 결과
 tensor([ 50, 122])


In [20]:
import torch

def v_dot(A, B):
    if isinstance(A, np.ndarray) and isinstance(B, np.ndarray):
        if A.ndim == 1 and B.ndim == 1:
            assert A.shape[0] == B.shape[0], "Size mismatch for 1D dot product"
            return np.dot(A, B)  # NumPy의 dot 연산
        else:
            print("A is 1st param (input)", A.shape)
            print("B is 2nd param (weight)", B.shape)
            return np.matmul(A, B)  # NumPy의 matmul 연산

    elif isinstance(A, torch.Tensor) and isinstance(B, torch.Tensor):
        if A.dim() == 1 and B.dim() == 1:
            assert A.size(0) == B.size(0), "Size mismatch for 1D dot product"
            return torch.dot(A, B)  # PyTorch의 dot 연산
        else:
            return torch.matmul(A, B)  # PyTorch의 matmul 연산
    else:
        raise ValueError("Both inputs must be either NumPy arrays or PyTorch tensors")


In [21]:
class MultiInputNeuron:
    def __init__(self, number_of_inputs: int):
        self.weights:np.ndarray = np.random.rand(number_of_inputs)
        self.bias:float = np.random.rand(1)

    def activate(self, inputs: np.ndarray, activation_fn=None) -> float:
        weighted_sum = v_dot(inputs, self.weights) + self.bias
        self.activation = activation_fn(weighted_sum) if activation_fn else weighted_sum
        
        print(f"[MultiInputNeuron.activate]inputs {inputs}, weighted_sum {weighted_sum}")
        print(f"[MultiInputNeuron.activate]activation {self.activation}")
        return self.activation
    
    def update(self, weights:np.ndarray, bias:float) : 
        self.weights = weights
        self.bias = bias
        
        print(f"[MultiInputNeuron.update]weights {weights}, bias {bias}")
    
    ''' softmax(self, z: np.ndarray) -> np.ndarray
        출력값이 범위 0 <= softmax(z) <= 1
        Softmax(z_i) = e^(z_i - z_max) / ∑(j=1 to n) e^(z_j - z_max)
    '''
    @staticmethod
    def softmax(z: np.ndarray) -> np.ndarray:
        print("activation function is softmax")
        if z is None or np.any(z == None):
            raise ValueError("Invalid input: None values detected in softmax input.")
        
        z_max = np.max(z, axis=-1, keepdims=True)
        exp_for_z = np.exp(z - z_max) 
        sum_exp_for_z = np.sum(exp_for_z, axis=-1, keepdims=True)  
        return exp_for_z / sum_exp_for_z

    @staticmethod
    def sigmoid(z: np.ndarray) -> np.ndarray:
        return 1 / (1 + np.exp(-z))
    
    @staticmethod
    def sigmoid_derivative(z: np.ndarray) -> np.ndarray:
        return z * (1 - z)

    @staticmethod
    def relu(z: np.ndarray) -> np.ndarray:
        print("activation function is relu")
        return np.maximum(0, z)
    
    @staticmethod
    def relu_partial_derivative(z: np.ndarray) -> np.ndarray :
        print("relu partial derivative")
        return 0

In [22]:
class Layer:
    def __init__(self, name: str, number_of_neurons: int, number_of_inputs: int):
        self.name = name  
        self.number_of_neurons = number_of_neurons
        self.number_of_inputs = number_of_inputs
        self.neurons = [MultiInputNeuron(number_of_inputs) for _ in range(number_of_neurons)]

        print(f"hiddenlayer name is", self.name)
        print(f"number_of_neurons {number_of_neurons}", "number_of_inputs {number_of_inputs}")
        
    def forward(self, inputs: np.ndarray) -> np.ndarray:
        raise NotImplementedError("forward 메서드는 하위 클래스에서 구현되어야 합니다.")
    
    def back(self, errors: np.ndarray) -> np.ndarray:
        raise NotImplementedError("back 메서드는 하위 클래스에서 구현되어야 합니다.")
    
    def get_name(self) :
        return self.name
    def get_number_of_neurons(self) :
        return self.number_of_neurons
    def get_neurons(self) :
        return self.neurons
    def get_number_of_inputs(self) :
        return self.number_of_inputs



In [23]:
# HiddenLayer 클래스 정의 (Layer 상속)
class HiddenLayer(Layer):
    def __init__(self, name: str, number_of_neurons: int, number_of_inputs: int, activation_fn=None):
        super().__init__(name, number_of_neurons, number_of_inputs)
        self.activation_fn = activation_fn
        self.activations:np.ndarray = np.zeros(number_of_neurons)
        print(f"activation_fn {self.activation_fn}", "partial_derivative {self.partial_derivative}")

    '''
        if inputs(batch_size=2, pixcel=784), 
            1st hidden layer의 출력의 형태는 (batch_size, hidden layer의 neurons의 수)가 되어야함.
            첫 번째 차원은 배치 크기 2를 나타내며, 각각 2개의 입력 데이터를 처리한 결과를 나타냅니다.
            두 번째 차원은 각각의 입력 데이터에 대한 16개의 뉴런 활성화 값을 나타냅니다.
     '''
    def forward(self, inputs: np.ndarray) -> np.ndarray:
        print(f"hiddenlayer name is {self.name}", "forward")
        self.activations = np.stack(
                    np.array(
                        [neuron.activate(inputs, self.activation_fn) for neuron in self.neurons]
                    ), axis=1)
        return self.activations
    
    def back(self, weights_changes:np.ndarray, bias_changes:np.ndarray) -> np.ndarray:
        print(f"hiddenlayer name is {self.name}", "back")
        
        try :
            for neuron in enumerate(self.neurons):
                weights = neuron.weights - weights_changes
                bias = neuron.bias - bias_changes
                neuron.update(weights, bias)
            
        except Exception as e:
            print(e)
        finally:
            self.activations = None

    def get_activations(self) :
        return self.activations

In [24]:
# OutputLayer 클래스 정의 (Layer 상속)
class OutputLayer(Layer):
    def __init__(self, number_of_neurons: int, number_of_inputs: int, activation_fn=None):
        super().__init__(
            name="output_layer"
            , number_of_neurons=number_of_neurons
            , number_of_inputs=number_of_inputs)
        self.activation_fn = activation_fn
        self.activations:np.ndarray =  np.zeros(number_of_neurons)
        
    def forward(self, inputs: np.ndarray) -> np.ndarray:
        print(f"hiddenlayer name is {self.name}", "forward")

        #output layer에서는 각각의 neuron은 가중합(weighted sum)만을 계산한다.
        #output layer에서는 전체 weighted sum에 대해서 activation funcion을 적용한다.
        self.activations = self.activation_fn (
            np.stack(
                np.array(
                    [neuron.activate(inputs, None) for neuron in self.neurons]
                )
            , axis=1)
        )
        return self.activations
        
    def back(self, weights_changes:np.ndarray, bias_changes:np.ndarray) -> None : 
        print(f"hiddenlayer name is {self.name}", "back")
        
        try :
            for i, neuron in enumerate(self.neurons):
                weights = neuron.weights - weights_changes[i]
                bias = neuron.bias - bias_changes[i]
                neuron.update(weights, bias)
        except Exception as e:
            print(e)
            raise e
            
    def get_activations(self) -> np.ndarray:
        return self.activations

In [25]:
# FCNN 클래스 정의
class FCNN:
    def __init__(self, hidden_layers: list, output_layer: OutputLayer):
        self.hidden_layers = hidden_layers
        self.output_layer = output_layer

    def forward_propagation(self, inputs: np.ndarray) -> np.ndarray:
        for layer_index, hidden_layer in enumerate(self.hidden_layers):
            if layer_index == 0:
                hidden_layer_outputs = hidden_layer.forward(inputs)
            else :
                hidden_layer_outputs = hidden_layer.forward(hidden_layer_outputs)
        self.output_layer.forward(hidden_layer_outputs)

    def get_output_layer(self) :
        return self.output_layer
    def get_hidden_layers(self) :
        return self.hidden_layers

    @staticmethod
    def calculate_number_of_inputs(image_shape: tuple) -> int:
        return np.prod(image_shape)  # (28, 28) -> 784

In [26]:
class CostFunction() :
    def __init__(self, labels:np.ndarray, number_of_inputs:int):
        self.labels:np.ndarray = labels
        self.accuracy:float = 0.0
        self.errors:np.ndarray = np.zeros(number_of_inputs)
        self.loss:float = 0.0
        
    def calculate(self, output_layer:OutputLayer) -> np.ndarray:
        self.erros = self._calculate_errors(output_layer)
        self.accuracy = self._calculate_accuracy(output_layer.get_activations())
        self.loss = self._calculate_cross_entropy_loss(output_layer.get_activations())
            
    def get_errors(self) -> np.ndarray:
        return self.erros 
    
    def get_accuracy(self) -> float:
        return self.accuracy
    
    def get_loss(self) -> float:
        return self.loss
    
    def _calculate_errors(self, output_layer:OutputLayer) ->np.ndarray:
        self.errors = output_layer.get_activations() - self._get_one_hot_coding(output_layer.get_number_of_neurons())

    def _get_one_hot_coding(self, number_of_neurons:int) -> np.ndarray:
        return np.array([np.eye(number_of_neurons)[label] 
                    for label in self.labels])
    
    ''' def _calculate_cross_entropy_loss(self) 
        L = - (1/N) * Σ (from i=1 to N) [log(p_true^i + ε)]
            N은 배치 크기,
            Σ는 합(sum)을 의미하며, i=1부터 N까지의 합을 구합니다,
            p_true^i는 i번째 샘플의 정답 클래스에 대한 예측 확률,
            ε는 작은 수로, 로그에서 0으로 나누는 것을 방지하기 위한 값입니다.
    '''
    def _calculate_cross_entropy_loss(self, probability_values: np.ndarray) -> float :
        p = self._get_probility_values_for_labels(probability_values)
        return self._average_after_sigma(self._loss(p))
    
    def _get_probility_values_for_labels(self, probability_values: np.ndarray) -> np.ndarray:
        batch_size = probability_values.shape[0]
        dimension = probability_values.ndim
        if (dimension == 1) :
            probability_for_labels = probability_values[self.labels]
        else :
            probability_for_labels = probability_values[np.arange(batch_size), self.labels]
            return probability_for_labels
        
    def _loss(self, p:np.ndarray) -> float:
        # 0 < x < 1 => 음수
        # 로그를 사용하여 확률 값을 손실 값으로 변환
        epsilon = 1e-15 # log(0)의 오류 방지를 위해서
        minusone = -1   # 음수에서 양수로 전환하여 부호에 상관없이 값의 크기를 증폭
        return np.log(p + epsilon) * minusone
    
    def _average_after_sigma(self, operand) -> float:
        return np.mean(operand) # == 1/n * sigma(operand)
    
    def _calculate_accuracy(self, probability_values:np.ndarray) -> np.ndarray:
        if probability_values is None :
            raise ValueError("Cannot calculate accuracy: probability_values is None.")
        
        max_values_indexs = np.argmax(probability_values, axis=-1) 
        self.accuracy = np.sum(max_values_indexs == self.labels) / self.labels.size


In [27]:
from abc import ABC, abstractmethod

class Optimizer(ABC) :      
    @abstractmethod
    def step(self, model:FCNN, cost:CostFunction) -> float:
        pass
    
class GradientDescent(Optimizer) :
    def __init__(self):
        self.learning_rate:float = 0.01
        self.step_count:int = 0
        
    def step(self, model:FCNN, cost:CostFunction) -> float:
        try :
            weights_changes = self._get_weights_changes(model, cost)
            bias_changes = self._get_bias_changes(model, cost)
            
            model.get_output_layer().back(weights_changes, bias_changes)
            
           # for hidden_layer in enumerate(model.get_hidden_layers()):
           #     hidden_layer.back()
            self.step_count += 1
        except Exception as e:
            print(e)
            
    '''
    # weight의 변화량
        ΔW_output = learning_rate * output_error * hidden_layer_output.T
            learning_rate는 학습률 
            output_error는 출력층에서 계산된 오차 
            hidden_layer_output.T는 은닉층의 출력값의 전치행렬 
    '''
    def _get_weights_changes(self, model:FCNN, cost:CostFunction) -> np.ndarray:
        return self.learning_rate * np.dot(model.get_hidden_layers()[-1].get_activations().T, cost.get_errors())
    
    def _get_bias_changes(self, model:FCNN, cost:CostFunction) -> np.ndarray :
        return self.learning_rate * np.sum(cost.get_errors(), axis=0)
    
    def get_step_count(self) :
        return self.step_count

In [28]:
class FCNNMultiClassClassifier:
    def __init__(self):
        self.x_train, self.labels = load_mnist_data()
        self.fcnn = self.createFCNN()

    def createFCNN(self):
        # x_train으로부터 이미지 크기 추출 및 입력 뉴런 수 계산
        image_shape = self.x_train.shape[1:]  # (784) 차원 추출
        number_of_inputs = FCNN.calculate_number_of_inputs(image_shape) 

        hidden_layer1 = HiddenLayer(
            name="hidden_layer_1"
            , number_of_neurons=16
            , number_of_inputs=number_of_inputs
            , activation_fn=lambda x: MultiInputNeuron.relu(x))
        
        hidden_layer2 = HiddenLayer(
            name="hidden_layer_2"
            , number_of_neurons=16
            , number_of_inputs=16
            , activation_fn=lambda x: MultiInputNeuron.relu(x))
        
        hidden_layers = [hidden_layer1, hidden_layer2]

        output_layer = OutputLayer(
            number_of_neurons=10
            , number_of_inputs=16
            , activation_fn=lambda x: MultiInputNeuron.softmax(x))

        return FCNN(hidden_layers=hidden_layers, output_layer=output_layer)

    def train(self, inputs: np.ndarray, labels: np.ndarray, epochs: int):

        for epoch in range(epochs):
            self.fcnn.forward_propagation(inputs)
            
            cost = CostFunction(labels, self.fcnn.get_output_layer().get_number_of_neurons())
            cost.calculate(self.fcnn.get_output_layer())
            
            gradientDescent = GradientDescent()
            gradientDescent.step(self.fcnn, cost)
            
            if epoch % 1000 == 0:
                print(f'Epoch {epoch + 1}, Loss: {cost.get_loss()}, accuracy: {cost.get_accuracy()}')
                
    def train_fcnn(self):
        # FCNN 학습
        self.train(self.x_train, self.labels, epochs=10)

    def predict(self, inputs):
        # 예측 함수 호출
        return self.fcnn.forward_propagation(inputs)

    @staticmethod
    def main():

        # FCNNMultiClassClassifier 객체 생성
        multiClassClassifier = FCNNMultiClassClassifier()
        # 모델 학습
        multiClassClassifier.train_fcnn()

        # 테스트 입력 데이터 불러오기 (MNIST 데이터)
        #test_input, _ = load_mnist_data()

        # 예측 수행
        #prediction = fcnn_predicter.predict(test_input)

        # 예측 결과 출력
        #print(f"Prediction: {prediction}")

# FCNNPredicter main 함수 호출
if __name__ == "__main__":
    FCNNMultiClassClassifier.main()


x_tran.size()  torch.Size([2, 1, 28, 28])
hiddenlayer name is hidden_layer_1
number_of_neurons 16 number_of_inputs {number_of_inputs}
activation_fn <function FCNNMultiClassClassifier.createFCNN.<locals>.<lambda> at 0x0000021B340C6C00> partial_derivative {self.partial_derivative}
hiddenlayer name is hidden_layer_2
number_of_neurons 16 number_of_inputs {number_of_inputs}
activation_fn <function FCNNMultiClassClassifier.createFCNN.<locals>.<lambda> at 0x0000021B34274900> partial_derivative {self.partial_derivative}
hiddenlayer name is output_layer
number_of_neurons 10 number_of_inputs {number_of_inputs}
hiddenlayer name is hidden_layer_1 forward
A is 1st param (input) (2, 784)
B is 2nd param (weight) (784,)
activation function is relu
[MultiInputNeuron.activate]inputs [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]], weighted_sum [61.49349891 57.73635543]
[MultiInputNeuron.activate]activation [61.49349891 57.73635543]
A is 1st param (input) (2, 784)
B is 2nd param (weight) (784,)
activa