In [None]:
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from tqdm import tqdm
import pandas as pd
import gzip
import numpy as np
import tensorflow as tf
from keras.models import Model
from keras.layers import Conv2D, AveragePooling2D, Flatten, Dense, Dropout, ELU, BatchNormalization, PReLU, Add, Input, GlobalAveragePooling2D, ReLU
from keras.optimizers import Adam
from keras.callbacks import ReduceLROnPlateau
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

train_file_path = '../../../../mnt/sda/suhohan/emnist/emnist-balanced-train.csv'
test_file_path = '../../../../mnt/sda/suhohan/emnist/emnist-balanced-test.csv'

chunk_size = 10000
train_data_iter = pd.read_csv(train_file_path, chunksize=chunk_size)
train_data = pd.concat([chunk for chunk in tqdm(train_data_iter, desc='Loading training data')])
test_data_iter = pd.read_csv(test_file_path, chunksize=chunk_size)
test_data = pd.concat([chunk for chunk in tqdm(test_data_iter, desc='Loading test data')])

print(train_data.shape, test_data.shape)

# 데이터의 차원과 크기를 정확히 파악
num_train_samples = train_data.shape[0]
num_test_samples = test_data.shape[0]

# 데이터 준비
x_train = train_data.iloc[:, 1:].to_numpy().reshape((num_train_samples, 28, 28, 1))
x_test = test_data.iloc[:, 1:].to_numpy().reshape((num_test_samples, 28, 28, 1))
y_train = train_data.iloc[:, 0].to_numpy()
y_test = test_data.iloc[:, 0].to_numpy()

x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.1, random_state=42)

print("x_train shape:", x_train.shape)
print("x_test shape:", x_test.shape)
print("x_valid shape:", x_valid.shape)
print("y_train shape:", y_train.shape)
print("y_test shape:", y_test.shape)
print("y_valid shape:", y_valid.shape)

In [None]:
def residual_block(x, filters, kernel_size=3, stride=1):
    y = Conv2D(filters, kernel_size, strides=stride, padding='same')(x)
    y = BatchNormalization()(y)
    y = PReLU()(y)
    y = Conv2D(filters, kernel_size, strides=1, padding='same')(y)
    y = BatchNormalization()(y)

    if stride != 1 or x.shape[-1] != filters:
        x = Conv2D(filters, 1, strides=stride, padding='same')(x)
        x = BatchNormalization()(x)

    out = Add()([x, y])
    out = PReLU()(out)
    return out


input = Input(shape=(28, 28, 1))
x = Conv2D(32, (3, 3), padding='same')(input)
x = BatchNormalization()(x)
x = PReLU()(x)
x = Dropout(0.33)(x)
x = AveragePooling2D(pool_size=2, strides=2, padding='valid')(x)

x = residual_block(x, 64)
x = Dropout(0.33)(x)
x = AveragePooling2D(pool_size=2, strides=2, padding='valid')(x)

x = residual_block(x, 128)
x = Dropout(0.33)(x)
x = AveragePooling2D(pool_size=2, strides=2, padding='valid')(x)

x = residual_block(x, 256)
x = Dropout(0.33)(x)
x = AveragePooling2D(pool_size=2, strides=2, padding='valid')(x)

x = Flatten()(x)
x = Dense(512)(x)
x = BatchNormalization()(x)
x = PReLU()(x)
x = Dropout(0.6)(x)

output = Dense(47, activation='softmax')(x)

model = Model(inputs=input, outputs=output)

In [None]:
model.summary()

In [None]:
optimizer = Adam(learning_rate=0.001)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=0.000001)

# 모델 컴파일
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# 모델 학습
history = model.fit(x_train, y_train, validation_data=(x_valid, y_valid),
                    epochs=50, callbacks=[reduce_lr], batch_size=500)

# 모델 평가
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f'Test accuracy: {test_acc}')

In [None]:
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.ylim([0, 1])
plt.legend(loc='lower right')
plt.show()

test_loss, test_acc = model.evaluate(x_test, y_test, verbose=2)
print(f"Test accuracy: {test_acc}")