<a href="https://colab.research.google.com/github/kjinb1212/Falldown-detection-KD/blob/main/teacher_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import numpy as np
from torch.utils.data.sampler import SubsetRandomSampler
from sklearn.model_selection import train_test_split

from glob import glob
import os
import sys
from PIL import Image

from efficientnet_pytorch import EfficientNet
import pandas as pd
from torchsummary import summary


In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Three-class image dataset read from per-class sub-directories.

    ``root_dir`` is expected to contain the folders ``normal``, ``falldown``
    and ``background``; every ``*.jpg`` inside them is labelled 0, 1 and 2
    respectively. Images whose shorter side is below ``min_side`` pixels are
    left out.

    Args:
        root_dir: Directory holding the three class folders.
        input_size: Every image is resized to ``(input_size, input_size)``.
        train: When true, random horizontal flip and random rotation are
            applied before resizing.
        padding: Accepted for interface compatibility; currently unused.
        normalize: Accepted for interface compatibility; currently unused.
        bright_ness: Currently unused (colour jitter is not applied).
        hue: Currently unused (colour jitter is not applied).
        contrast: Currently unused (colour jitter is not applied).
        random_Hflip: Probability of the horizontal flip (training only).
        rotate_deg: Rotation range in degrees (training only).
        min_side: Minimum accepted length, in pixels, of an image's
            shorter side.
    """

    # Sub-directory of each class, listed in label order (0, 1, 2).
    CLASS_DIRS = ('normal', 'falldown', 'background')

    def __init__(self, root_dir, input_size, train = True, padding = True, normalize = False,
                 bright_ness = 0.2, hue = 0.15, contrast = 0.15, random_Hflip = 0.3, rotate_deg = 20,
                 min_side = 32):
        self.total_paths = []
        self.labels = []
        for label, class_dir in enumerate(self.CLASS_DIRS):
            candidates = glob(os.path.join(root_dir, class_dir) + '/*.jpg')
            kept = self._filter_small_images(candidates, min_side)
            self.total_paths.extend(kept)
            self.labels.extend([label] * len(kept))

        steps = []
        if train:
            # Augmentation is applied to training samples only.
            steps.append(torchvision.transforms.RandomHorizontalFlip(p=random_Hflip))
            steps.append(torchvision.transforms.RandomRotation(degrees=rotate_deg))
        steps.append(torchvision.transforms.Resize((input_size, input_size)))
        steps.append(torchvision.transforms.ToTensor())
        self.transform = torchvision.transforms.Compose(steps)

    @staticmethod
    def _filter_small_images(paths, min_side):
        """Return the paths whose image has both sides >= ``min_side``."""
        kept = []
        for path in paths:
            # The context manager closes the file right after the size is
            # read, so scanning a large folder does not pile up open handles.
            with Image.open(path) as img:
                width, height = img.size
            if min(width, height) >= min_side:
                kept.append(path)
        return kept

    def __len__(self):
        """Return the number of images that passed the size filter."""
        return len(self.total_paths)

    def __getitem__(self, index):
        """Return ``(image_tensor, label)`` for the sample at ``index``."""
        with Image.open(self.total_paths[index]) as raw:
            # Force three channels so grayscale/CMYK JPEGs yield tensors of
            # the same shape as RGB ones; convert() also loads the pixel
            # data, so the file can be closed safely afterwards.
            img = raw.convert('RGB')
        img = self.transform(img)
        return img, self.labels[index]

In [None]:
def train(model, train_loader, test_loader, criterion, optimizer, epochs, save_name):
    """Train ``model`` with early stopping and checkpoint the best epoch.

    After every epoch the model is evaluated on ``test_loader``. Whenever the
    evaluation loss improves, the weights are written to
    ``3_model_weights/<save_name>.pth``. Training stops early once the loss
    has failed to improve for more than 7 consecutive epochs. The learning
    rate is reduced by ``ReduceLROnPlateau`` (patience 4) driven by the
    evaluation loss.

    Args:
        model: Network to train; batches are moved to the device of its
            parameters.
        train_loader: Iterable of ``(data, label)`` training batches.
        test_loader: Iterable of ``(data, label)`` evaluation batches.
        criterion: Loss function called as ``criterion(output, label)``.
        optimizer: Optimizer updating ``model``'s parameters.
        epochs: Maximum number of epochs.
        save_name: File stem of the saved checkpoint.

    Returns:
        ``(best_loss, best_acc, history)`` where ``best_loss`` is the lowest
        evaluation loss, ``best_acc`` is the accuracy (in percent) measured
        at that same epoch, and ``history`` maps ``'loss'`` / ``'acc'`` to
        the per-epoch evaluation values.
    """
    # Use the device the model already lives on instead of relying on a
    # module-level ``device`` variable being defined before this call.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # torch.save does not create missing directories.
    weights_dir = '3_model_weights'
    os.makedirs(weights_dir, exist_ok=True)
    weights_path = os.path.join(weights_dir, save_name + '.pth')

    # ``verbose`` is deprecated in newer PyTorch releases, so learning-rate
    # reductions are reported explicitly after each scheduler step instead.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, patience=4)
    best_loss = None
    best_acc = None
    patience = 0

    history = {'loss': [], 'acc': []}

    for epoch in range(epochs):

        model.train()
        train_losses = []
        for data, label in train_loader:
            data = data.to(device)
            label = label.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, label)
            train_losses.append(loss.item())
            loss.backward()
            optimizer.step()
            torch.cuda.empty_cache()
        train_loss = np.average(train_losses)

        model.eval()
        test_losses = []
        correct = 0
        total = 0
        with torch.no_grad():
            for data, label in test_loader:
                data = data.to(device)
                label = label.to(device)

                output = model(data)
                loss = criterion(output, label)
                test_losses.append(loss.item())
                predict = output.argmax(dim=1)
                correct += (predict == label).sum().item()
                total += label.size(0)
        test_loss = np.average(test_losses)
        test_acc = 100 * correct / total
        if (epoch + 1) % 5 == 0:
            print("--------- epoch : {} ------------".format(epoch+1))
            print("train loss: {}, \ttest loss: {}, \ttest acc: {}%".format(train_loss, test_loss, test_acc))

        history['loss'].append(test_loss)
        history['acc'].append(test_acc)

        if (best_loss is None) or (best_loss > test_loss):
            # New best epoch: checkpoint it and reset the early-stop counter.
            best_loss = test_loss
            best_acc = test_acc
            torch.save(model.state_dict(), weights_path)
            print('epoch: {}\t Best loss: {}'.format(epoch + 1, best_loss))
            patience = 0
        else:
            patience += 1

        if patience > 7:
            print("early stop at {} epoch".format(epoch + 1))
            break

        previous_lrs = [group['lr'] for group in optimizer.param_groups]
        scheduler.step(metrics=test_loss)
        for group_index, group in enumerate(optimizer.param_groups):
            if group['lr'] < previous_lrs[group_index]:
                print('epoch: {}\t reducing learning rate of group {} to {:.4e}'.format(
                    epoch + 1, group_index, group['lr']))

    print("best loss: {},\t best acc: {}%\n\n".format(best_loss, best_acc))
    return best_loss, best_acc, history

In [None]:
# Experiment configuration shared by the dataset and loader cells.
INPUT_SIZE = 128   # images are resized to INPUT_SIZE x INPUT_SIZE
PADDING = False
NORMALIZE = False
BATCHSIZE = 128
NUMEPOCH = 100

# hue was written as ``01.5`` (== 1.5), which lies outside the [0, 0.5] range
# torchvision's ColorJitter accepts; 0.15 matches the CustomDataset default.
train_data = CustomDataset(
    root_dir='train',
    input_size=INPUT_SIZE, train=True, padding=PADDING, normalize=NORMALIZE,
    bright_ness=0, hue=0.15, contrast=0.15, random_Hflip=0, rotate_deg=0)

test_data = CustomDataset(
    root_dir='validation',
    input_size=INPUT_SIZE, train=False, padding=PADDING, normalize=NORMALIZE,
    bright_ness=0, hue=0.15, contrast=0.15, random_Hflip=0, rotate_deg=0)

In [None]:
# Cap the worker count at the number of CPU cores so the loaders do not
# oversubscribe smaller machines (70 is kept as the upper bound).
NUM_WORKERS = min(70, os.cpu_count() or 1)

train_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCHSIZE, shuffle=True, num_workers=NUM_WORKERS, drop_last=True)
# Keep the last partial batch during evaluation: dropping it would exclude
# samples from the reported loss/accuracy and leaves the loader empty when the
# validation set is smaller than one batch.
test_loader = torch.utils.data.DataLoader(test_data, batch_size=BATCHSIZE, shuffle=False, num_workers=NUM_WORKERS, drop_last=False)

# train every model together

In [None]:
models = ['efficientnet-b0']
logs = []

# Prefer GPU index 2 when the machine has it, otherwise fall back to the first
# GPU, and to the CPU when CUDA is unavailable.
if torch.cuda.is_available():
    gpu_index = 2 if torch.cuda.device_count() > 2 else 0
    device = torch.device("cuda:{}".format(gpu_index))
else:
    device = torch.device("cpu")
print(device)

# Neither torch.save nor DataFrame.to_csv creates missing directories.
os.makedirs('3_model_weights', exist_ok=True)
os.makedirs('3_history', exist_ok=True)

for model_name in models:
    torch.cuda.empty_cache()
    teacher_model = EfficientNet.from_pretrained(model_name, num_classes=3).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(teacher_model.parameters(), weight_decay=1e-4, lr=0.0001)

    loss, acc, history = train(model=teacher_model, train_loader=train_loader, test_loader=test_loader,
          criterion = criterion, optimizer=optimizer, epochs=NUMEPOCH, save_name=model_name)

    # One summary line per model, printed by the final cell.
    s = 'model: ' + model_name + '\t loss: {}\t acc: {}'.format(loss, acc)
    logs.append(s)

    # Persist the per-epoch evaluation loss/accuracy of this model.
    df = pd.DataFrame(history)
    df.to_csv("3_history/"+ model_name+ "_history.csv", mode='w')
    

In [None]:
# Print the per-model summary lines collected during training, one per line.
if logs:
    print('\n'.join(logs))