In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd

import os
import random
from tqdm import tqdm

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras.initializers import TruncatedNormal
from tensorflow.keras.callbacks import EarlyStopping

import time
import pickle


from PIL import Image
import random

from collections import Counter

import tensorflow_addons as tfa


def my_seed_everywhere(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    tf.random.set_seed(seed)
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)


my_seed_everywhere(42)

In [None]:
# Image Model

train = pd.read_csv('train.csv')
test = pd.read_csv('test.csv')

In [None]:
tf.config.run_functions_eagerly(False)

class SAMModel(tf.keras.Model):
    def __init__(self, my_model, rho=0.05):
        """
        p, q = 2 for optimal results as suggested in the paper
        (Section 2)
        """
        super(SAMModel, self).__init__()
        self.my_model = my_model
        self.rho = rho

    def train_step(self, data):
        (text, labels) = data
        e_ws = []
        with tf.GradientTape() as tape:
            predictions = self.my_model(text)
            loss = self.compiled_loss(labels, predictions)
        trainable_params = self.my_model.trainable_variables
        gradients = tape.gradient(loss, trainable_params)
        grad_norm = self._grad_norm(gradients)
        scale = self.rho / (grad_norm + 1e-10)

        for (grad, param) in zip(gradients, trainable_params):
            e_w = grad * scale
            param.assign_add(e_w)
            e_ws.append(e_w)

        with tf.GradientTape() as tape:
            predictions = self.my_model(text)
            loss = self.compiled_loss(labels, predictions)    
        
        sam_gradients = tape.gradient(loss, trainable_params)
        for (param, e_w) in zip(trainable_params, e_ws):
            param.assign_sub(e_w)
        
        self.optimizer.apply_gradients(
            zip(sam_gradients, trainable_params))
        
        self.compiled_metrics.update_state(labels, predictions)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        (text, labels) = data
        predictions = self.my_model(text, training=False)
        loss = self.compiled_loss(labels, predictions)
        self.compiled_metrics.update_state(labels, predictions)
        return {m.name: m.result() for m in self.metrics}

    def _grad_norm(self, gradients):
        norm = tf.norm(
            tf.stack([
                tf.norm(grad) for grad in gradients if grad is not None
            ])
        )
        return norm  
    
    def call(self, inputs):
        """Forward pass of SAM.
        SAM delegates the forward pass call to the wrapped model.
        Args:
          inputs: Tensor. The model inputs.
        Returns:
          A Tensor, the outputs of the wrapped model for given `inputs`.
        """
        return self.my_model(inputs)

In [None]:
# resize, Padding

import cv2


def padding(img, set_size):

    try:
        h,w,c = img.shape
    except:
        print('파일을 확인후 다시 시작하세요.')
        raise

    if h < w:
        new_width = set_size
        new_height = int(new_width * (h/w))
    else:
        new_height = set_size
        new_width = int(new_height * (w/h))

    if max(h, w) < set_size:
        img = cv2.resize(img, (new_width, new_height), cv2.INTER_CUBIC)
    else:
        img = cv2.resize(img, (new_width, new_height), cv2.INTER_AREA)

 
    try:
        h,w,c = img.shape
    except:
        print('파일을 확인후 다시 시작하세요.')
        raise

    delta_w = set_size - w
    delta_h = set_size - h
    top, bottom = delta_h//2, delta_h-(delta_h//2)
    left, right = delta_w//2, delta_w-(delta_w//2)

    new_img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=[0, 0, 0])

    return new_img

In [None]:
#생성한 함수를 활용하여 학습데이터 생성

IMAGE_SIZE = [224, 224]

Train_img_ls = train['img_path'].to_list()
Test_Train_img_ls = test['img_path'].to_list()


x_train = []

for i in tqdm(range(len(Train_img_ls))) :
  img = cv2.imread(Train_img_ls[i], cv2.IMREAD_COLOR)
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  pad_img = padding(img,IMAGE_SIZE[0])
  x_train.append(pad_img)
 
x_train = np.array(x_train)




x_test = []

for i in tqdm(range(len(Test_Train_img_ls))) :
  img = cv2.imread(Test_Train_img_ls[i], cv2.IMREAD_COLOR)
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  pad_img = padding(img,IMAGE_SIZE[0])
  x_test.append(pad_img)

x_test = np.array(x_test)

In [None]:
encoder = LabelEncoder()
y_train = train['cat3']
y_train = encoder.fit_transform(y_train)

In [None]:
# 다른 사전학습 모델로 변경하여 학습 가능

pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = efn.EfficientNetB7(weights='noisy-student', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.DenseNet201(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.Xception(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.inception_v3.InceptionV3(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.VGG16(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
# pretrained_model = tf.keras.applications.mobilenet.MobileNet(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = False # False = transfer learning, True = fine-tuning


len(pretrained_model.layers)

In [None]:
# 이미지 모델 학습 함수 생성

def train_inference(x_train, y_train, x_test, pretrained_model,open=-50,dim=64,dropout=0.2,optimizer='Adam',epocs=100,batch_size=32,sam=False):
    for layer in pretrained_model.layers[:open]:      #전이학습 개방정도
        layer.trainable = False	



    model = tf.keras.Sequential([
        pretrained_model,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(dim,activation='relu'),          
        tf.keras.layers.Dropout(dropout),                   
        tf.keras.layers.Dense(128, activation='softmax')
    ])

    if sam == True :    
        model = SAMModel(model)    
    model.compile(
    optimizer=optimizer,
    loss = 'sparse_categorical_crossentropy',
    metrics=['accuracy']
    )
    early = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=3,restore_best_weights=True)
    model.fit(x_train, y_train, epochs=epocs, batch_size=batch_size, validation_split=0.1,callbacks=[early])

    pred = model.predict(x_test, batch_size=batch_size) 
    
    del model
    tf.keras.backend.clear_session()

    return pred   

In [None]:
optimizer = tf.keras.optimizers.Nadam(learning_rate=5e-6)

pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = False
pred11 = train_inference(x_train, y_train, x_test, pretrained_model,open=-1,dim=64,dropout=0.2,optimizer=optimizer,epocs=500,batch_size=32,sam=True)


pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = True
pred12 = train_inference(x_train, y_train, x_test, pretrained_model,open=-50,dim=128,dropout=0.3,optimizer=optimizer,epocs=500,batch_size=32,sam=False)


pretrained_model = tf.keras.applications.RegNetY120(weights='imagenet', include_top=False ,input_shape=[*IMAGE_SIZE, 3])
pretrained_model.trainable = True
pred13 = train_inference(x_train, y_train, x_test, pretrained_model,open=-100,dim=128,dropout=0.3,optimizer=optimizer,epocs=500,batch_size=32,sam=False)

In [None]:
def mode(list):
    count = 0
    mode = 0
    for x in list: 
        if list.count(x) > count:
            count = list.count(x)
            mode = x

    return mode



pred_ls = []
for i in range(len(pred1)):
  index = mode([
      pred11[i].argmax(),
      pred12[i].argmax(),
      pred13[i].argmax(),
      ])
  pred_ls.append(encoder.classes_[index])
y_pre2= encoder.transform(pred_ls)

In [None]:
test['cat3'] = y_pre2

In [None]:
submission = test[['id','cat3']]
submission.to_csv('submission_image.csv', index=False, encoding='utf-8')