### [Fashion MNIST & IRIS ] 모델 구현

-  커스텀 데이터셋 생성

- 커스텀 모델 생성

- 학습/테스트 함수 생성

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optima
from torch.utils.data import DataLoader, Dataset


from torchinfo import summary
from torchmetrics.classification import F1Score
from sklearn.model_selection import train_test_split

In [2]:
TRAIN_FILE = '../data/fashion-mnist_train.csv'
TEST_FILE  = '../data/fashion-mnist_test.csv'

In [3]:
# random_state: 33
torch.manual_seed(33)

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

print("Notice")
print(f"DEVICE: {DEVICE}")
print(f"torch ver: {torch.__version__}")


Notice
DEVICE: cpu
torch ver: 2.4.1


- User Defined Datasets Class
---
- class function: base on file path, datasets
- parents class: torch.utils.data.Dataset
- class name: FileDataSet
- parameter: file_path
---

---
- function skill: data type transform DataFrame
- function name: cover_dataframe
- parameter: file_path, header=0
- function return: DataFrame
---

In [4]:
def cover_dataframe(file_path,header=0):
    ext = file_path.rsplit('.')[-1]
    
    if ext == 'csv':
        return pd.read_csv(file_path, header=header)
    elif ext == 'json':
        return pd.read_json(file_path, header=header)
    elif ext in ['xlsx', 'xls']:
        return pd.read_excel(file_path, header=header)
    else:
        return pd.read_table(file_path, header=header)

In [46]:
class TrainFileDataSet(Dataset):
    def __init__(self, file_path):
        super().__init__()
        data_df = cover_dataframe(file_path)

        self.feature_df = data_df[data_df.columns[1:]]
        self.label_df = data_df[[data_df.columns[0]]]
        # if self.label_df[self.label_df.columns[0]].dtype == 'object':
        #     label_df = pd.get_dummies(self.label_df)
        #     label_df = label_df.astype('int64')
        #     self.label_df = label_df
            
        # random_state: 33
        # stratify: self.label_df
        # train : val = 7 : 3
        self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(self.feature_df, self.label_df, random_state=33, stratify=self.label_df)
        # self.X_val = pd.get_dummies(self.X_val).astype('int64')
        # self.y_val = pd.get_dummies(self.y_val).astype('int64')
        
        self.n_row = self.feature_df.shape[0]
        self.features = self.feature_df.columns
        self.n_features = self.feature_df.shape[1]
        self.labels = data_df.iloc[:, 0].unique()
        self.n_labels = len(self.labels)
        
    def __len__(self):
        return self.n_row
    
    def __getitem__(self, idx):
        
        X_train = torch.FloatTensor(self.X_train.iloc[idx].values).to(DEVICE)
        X_test = torch.FloatTensor(self.X_val.iloc[idx].values).to(DEVICE)
        y_train = torch.FloatTensor(self.y_train.iloc[idx].values).to(DEVICE)
        y_test = torch.FloatTensor(self.y_val.iloc[idx].values).to(DEVICE)
        return X_train, X_test, y_train, y_test
        

In [47]:
class TestFileDataSet(Dataset):
    def __init__(self, file_path):
        super().__init__()
        data_df = cover_dataframe(file_path)

        self.feature_df = data_df[data_df.columns[1:]]
        self.label_df = data_df[[data_df.columns[0]]]
        # if self.label_df[self.label_df.columns[0]].dtype == 'object':
        #     label_df = pd.get_dummies(self.label_df)
        #     label_df = label_df.astype('int64')
        #     self.label_df = label_df
        
        self.n_row = self.feature_df.shape[0]
        self.features = self.feature_df.columns
        self.n_features = self.feature_df.shape[1]
        self.labels = data_df.iloc[:, 0].unique()
        self.n_labels = len(self.labels)
        
    def __len__(self):
        return self.n_row
    
    def __getitem__(self, idx):
       
            feature_ts = torch.FloatTensor(self.feature_df.iloc[idx].values).to(DEVICE)
            label_ts = torch.FloatTensor(self.label_df.iloc[idx].values).to(DEVICE)
            return feature_ts, label_ts

- Model Instance
---
- ANN classification model
    - input layer: input feature
    - output layer: predict label
    - hidden layer: dynamic? or fixed?
- parents class: nn.Module

- Model structure
---
- learning method: supervised learninf, multi-classification
- learning algorithm: 모델이 없나? linear

    - input layer: input 1784, output 1200, activation function: ReLU
    - hidden layer: input 1200, output 700, activation function: ReLU
    - hidden layer: input 700, output 200, activation function: ReLU
    - hidden layer: input 200, output 50, activation function: ReLU
    - output layer: input 50, output 10, activation function: None (지원안함. nn.CrossEntropyLoss 알아서 계산)

In [48]:
class ClassificationModel(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.input_layer = nn.Linear(1784, 1200)
        self.hidden_layer_1 = nn.Linear(1200, 700)
        self.hidden_layer_2 = nn.Linear(700, 200)
        self.hidden_layer_3 = nn.Linear(200, 50)
        self.output_layer = nn.Linear(50, 10)
        
    def forward(self, x):
        y = F.relu(self.input_layer(x))
        y = F.relu(self.hidden_layer_1(y))
        y = F.relu(self.hidden_layer_2(y))
        y = F.relu(self.hidden_layer_3(y))
        y = self.output_layer(y)
        return y

- validation & test function
---
- function name: testing
- parameter: model, x, y
- function return: loss, score
- `must not update weight & bais`

In [49]:
def testing(model, X_ts, y_ts):
    with torch.no_grad():
        y_ts = y_ts.reshape(-1).long()
        pred = model(X_ts)
        
        loss = nn.CrossEntropyLoss()(pred, y_ts)
        
        score = F1Score(task='multiclass', num_classes=10)(pred, y_ts)
    
    return loss, score

- model learning
---
- function name: training
- parameter: model, epochs, lr, batch_size, dataset
- function return: loss, score

In [50]:
def tarining(dataset, model, epochs, lr, batch):
    train_val_loss = {'train':[], 'val':[]}
    train_val_score = {'train':[], 'val':[]}
    optmizer = optima.Adam(model.parameters(), lr=lr)
    data_dl = DataLoader(dataset, batch_size=batch)
    
    for epoch in range(1, epochs+1):
        total_train_loss, total_train_score = 0, 0
        total_val_loss, total_val_score = 0, 0
        for X_train, X_val, y_train, y_val in data_dl:
            batch_cnt = 60000 / batch
            y_train = y_train.reshape(-1).long()
            
            pred = model(X_train)
            
            loss = nn.CrossEntropyLoss()(pred, y_train)
            total_train_loss += loss
            
            score = F1Score(task='multiclass', num_classes=10)(pred, y_train)
            total_train_score += score
            
            optmizer.zero_grad()
            loss.backward()
            optmizer.step()
            
            val_loss, val_score = testing(model, X_val, y_val)
            total_val_loss += val_loss
            total_val_score += val_score
        
        loss_train = (total_train_loss/batch_cnt).item()
        score_train = (total_train_score/batch_cnt).item()
        loss_val = (total_val_loss/batch_cnt).item()
        score_val = (total_val_score/batch_cnt).itrm()
        
        train_val_loss['train'].append(loss_train)
        train_val_score['train'].append(score_train)
        train_val_loss['val'].append(loss_val)
        train_val_score['val'].append(score_val)
        
        print(f"[{epoch:5}/{epochs:5}]  [Train]      loss: {loss_train:.6f}, score: {score_train:.6f}")
        print(f"[{epoch:5}/{epochs:5}]  [Validation] loss: {loss_val:.6f}, score: {score_val:.6f}")
    
    return train_val_loss, train_val_score

In [51]:
train_ds = TrainFileDataSet(TRAIN_FILE)
test_ds = TestFileDataSet(TEST_FILE)

In [52]:
model = ClassificationModel()

In [53]:
print(model, end='\n\n')
summary(model)

ClassificationModel(
  (input_layer): Linear(in_features=1784, out_features=1200, bias=True)
  (hidden_layer_1): Linear(in_features=1200, out_features=700, bias=True)
  (hidden_layer_2): Linear(in_features=700, out_features=200, bias=True)
  (hidden_layer_3): Linear(in_features=200, out_features=50, bias=True)
  (output_layer): Linear(in_features=50, out_features=10, bias=True)
)



Layer (type:depth-idx)                   Param #
ClassificationModel                      --
├─Linear: 1-1                            2,142,000
├─Linear: 1-2                            840,700
├─Linear: 1-3                            140,200
├─Linear: 1-4                            10,050
├─Linear: 1-5                            510
Total params: 3,133,460
Trainable params: 3,133,460
Non-trainable params: 0

In [55]:

# train_dl = DataLoader(train_ds, batch_size=32)
loss_dict, score_dict = tarining(train_ds, model, 1000, 0.001, 64)

RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x784 and 1784x1200)