In [7]:
# import
from os import environ

# Attempt multi-threading.
# The thread-count variables below are read by the OpenMP/BLAS runtimes when
# those native libraries are first loaded, so they must be set BEFORE numpy and
# cv2 are imported. If either package was already imported in this kernel
# session, restart the kernel for the setting to take effect.
N_THREADS = '8'
for _thread_var in (
    'OMP_NUM_THREADS',
    'OPENBLAS_NUM_THREADS',
    'MKL_NUM_THREADS',
    'VECLIB_MAXIMUM_THREADS',
    'NUMEXPR_NUM_THREADS',
):
    environ[_thread_var] = N_THREADS

import numpy as np
import cv2

In [8]:
import glob

path = './data_homework/G1020/Images/'

# glob returns files in arbitrary order; sort so the sample order is
# deterministic and can line up with the (separately loaded) mask list.
# NOTE(review): assumes each image and its mask share a file stem - confirm.
target = sorted(glob.glob(path + '*.jpg'))
if not target:
    raise FileNotFoundError(f"no .jpg images found under {path}")

X = []
for idx, img_path in enumerate(target):
    print(f"{idx+1} / {len(target)}", end='\r')
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable/missing file by returning None.
        raise FileNotFoundError(f"cv2.imread could not read {img_path}")
    # Resize to 128x128 and scale the 8-bit pixel values into [0, 1].
    img = cv2.resize(img, dsize=(128, 128), interpolation=cv2.INTER_CUBIC) / 255.0
    # Convert HWC -> CHW by moving the channel axis to the front.
    # (ndarray.resize / reshape would only reinterpret the flat buffer and
    # scramble the pixels instead of reordering the axes.)
    img = np.ascontiguousarray(img.transpose(2, 0, 1))
    X.append(img)

print(X[0].shape)

(3, 128, 128)


In [9]:
import glob

path = './data_homework/G1020/Masks/'

# glob returns files in arbitrary order; sort so the mask order is
# deterministic and can line up with the (separately loaded) image list.
# NOTE(review): assumes each mask and its image share a file stem - confirm.
target = sorted(glob.glob(path + '*.png'))
if not target:
    raise FileNotFoundError(f"no .png masks found under {path}")

Y = []
for idx, mask_path in enumerate(target):
    print(f"{idx+1} / {len(target)}", end='\r')
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        # cv2.imread signals an unreadable/missing file by returning None.
        raise FileNotFoundError(f"cv2.imread could not read {mask_path}")
    # Masks hold discrete class ids, so use nearest-neighbour resizing:
    # cubic interpolation would blend neighbouring ids into values that do
    # not correspond to the true class at region borders.
    mask = cv2.resize(mask, dsize=(128, 128), interpolation=cv2.INTER_NEAREST)
    # Add a leading singleton channel axis: (128, 128) -> (1, 128, 128).
    mask = mask.reshape(1, 128, 128)
    Y.append(mask)

print(Y[0].shape, np.unique(Y[0]))

(1, 128, 128) [0 1 2]


In [10]:
from lib.conv2d import Conv2D
from lib.maxpool import MaxPool2D
from lib.bce import BCE
from lib.batch_norm import BatchNorm2d
from lib.relu import ReLU
from lib.sigmoid import Sigmoid
from lib.dropout import Dropout
from lib.flatten import Flatten
from lib.affine import Affine
from lib.adam import Adam
from lib.softmax import Softmax

class CNN:
    """Convolutional network built from the project's ``lib`` layers.

    The stack is four ``Conv2D -> BatchNorm2d -> ReLU`` stages
    (3 -> 16 -> 32 -> 32 -> 16 channels) followed by a ``Conv2D`` down to
    3 channels and a ``Sigmoid``. Every convolution is 3x3 with stride 1 and
    padding 1 (presumably preserving the spatial size - confirm in lib.conv2d).

    NOTE(review): the Sigmoid output is fed to a ``Softmax`` loss layer; if
    that layer normalises its input itself, the Sigmoid squashes the logits
    first - confirm this is intended against lib.softmax.
    """

    def __init__(self):
        """Create the layer stack, the loss layer and the Adam optimizer."""
        self.layers = [
            Conv2D(3, 16, 3, stride=1, padding=1),
            BatchNorm2d(16),
            ReLU(),
            Conv2D(16, 32, 3, stride=1, padding=1),
            BatchNorm2d(32),
            ReLU(),
            Conv2D(32, 32, 3, stride=1, padding=1),
            BatchNorm2d(32),
            ReLU(),
            Conv2D(32, 16, 3, stride=1, padding=1),
            BatchNorm2d(16),
            ReLU(),
            Conv2D(16, 3, 3, stride=1, padding=1),
            Sigmoid()
        ]
        self.loss_layer = Softmax()
        self.initial_lr = 0.005
        self.lr = self.initial_lr
        self.optimizer = Adam(learning_rate=self.initial_lr)

    def forward(self, X, is_training=True):
        """Run ``X`` through every layer in order and return the final output.

        ``is_training`` is forwarded only to ``Dropout`` layers; all other
        layers (including ``BatchNorm2d``) are called without a mode flag.
        NOTE(review): if BatchNorm2d distinguishes train/eval mode, it is not
        being told which one applies here - confirm against lib.batch_norm.
        """
        for layer in self.layers:
            if isinstance(layer, Dropout):
                X = layer.forward(X, is_training=is_training)
            else:
                X = layer.forward(X)
        return X

    def backward(self, d_out):
        """Propagate ``d_out`` through the layers in reverse order.

        ``BatchNorm2d.backward`` additionally returns the gamma/beta gradients,
        which are applied immediately through the optimizer. Returns the
        gradient that comes out of the first layer.
        """
        for layer in reversed(self.layers):
            if isinstance(layer, BatchNorm2d):
                d_out, dgamma, dbeta = layer.backward(d_out)
                self.optimizer.update_batchnorm_params(layer, dgamma, dbeta)
            else:
                d_out = layer.backward(d_out)

        return d_out

    def train_step(self, X, y):
        """Do one optimisation step on the batch ``(X, y)`` and return its loss."""
        # Forward pass
        y_pred = self.forward(X, is_training=True)

        loss = self.loss_layer.forward(y_pred, y)

        # Backward pass
        d_out = self.loss_layer.backward(y)
        self.backward(d_out)

        # Update every layer that carries a weight attribute ``W``.
        for layer in self.layers:
            if hasattr(layer, 'W'):
                self.optimizer.update(layer)

        return loss

    def predict(self, X):
        """Return the arg-max index along axis 1 of the network output for ``X``.

        Inference runs with ``is_training=False`` so that any ``Dropout`` layer
        in the stack is not applied in training mode while predicting.
        """
        logits = self.forward(X, is_training=False)
        return np.argmax(logits, axis=1)

# Build the model instance that the training cell passes to train_and_test().
nn = CNN()

In [11]:
import time
import datetime
def train_and_test(nn, num_epochs, batch_size, decay_rate=0.95, verbose=False):
    """Train ``nn`` on the notebook-level ``X_train`` / ``Y_train`` lists.

    Despite the name, only training is performed; no pass over a test split
    is run by this function.

    Args:
        nn: model exposing ``train_step(X_batch, y_batch)`` returning the
            batch loss.
        num_epochs: number of passes over the training data.
        batch_size: samples per mini-batch; the last batch of an epoch may be
            smaller when the data size is not a multiple of ``batch_size``.
        decay_rate: accepted for interface compatibility but currently unused
            (no learning-rate decay is applied here).
        verbose: when True, print per-batch progress on a single line.

    Raises:
        ValueError: if ``batch_size`` is not positive or the training set is
            empty.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    num_samples = len(X_train)
    if num_samples == 0:
        raise ValueError("X_train is empty; nothing to train on")

    # Ceil division: the loop below also processes the trailing partial batch,
    # so it must be counted when averaging the loss and reporting progress.
    num_batches = (num_samples + batch_size - 1) // batch_size

    for epoch in range(num_epochs):
        begin = time.time()

        total_loss = 0

        for i in range(0, num_samples, batch_size):
            b_begin = time.time()
            X_batch = np.array(X_train[i:i+batch_size])
            y_batch = np.array(Y_train[i:i+batch_size])

            loss = nn.train_step(X_batch, y_batch)
            total_loss += loss

            if verbose:
                b_end = time.time()
                elapsed = b_end - b_begin
                print(f"\rBatch {i//batch_size+1}/{num_batches}, Current Loss: {loss:.4f}, Time: {elapsed}", end='')

        avg_loss = total_loss / num_batches

        end = time.time()
        elapsed = end - begin
        print(f"\nEpoch {epoch+1}/{num_epochs}, Avg Loss: {avg_loss:.4f}, Time: {elapsed:.2f}s")


In [12]:
# Sequential 70/30 hold-out split: the leading 70% of the samples is used for
# training and the remainder for testing (no shuffling is applied).
pivot = int(0.7 * len(X))
X_train = X[:pivot]
X_test = X[pivot:]
Y_train = Y[:pivot]
Y_test = Y[pivot:]

In [13]:
# Training configuration
num_epochs, batch_size = 5, 64

# Train the network, printing per-batch progress
print("CNN training start")
train_and_test(nn, num_epochs=num_epochs, batch_size=batch_size, verbose=True)

CNN training start


KeyboardInterrupt: 