In [None]:
!pip install tensorflow[cuda-and]==2.15.1
!pip install keras-cv-attention-models
!pip install pandas
!pip install keras-cv
!pip install scikit-learn
!pip install tqdm glob
!pip install silence_tensorflow

In [1]:
import os
import warnings
warnings.filterwarnings("ignore")

from silence_tensorflow import silence_tensorflow
silence_tensorflow()

import numpy as np
import pandas as pd
import keras
import keras_cv
import tensorflow as tf
import keras.backend as K

from tqdm import tqdm
from keras_cv_attention_models import swin_transformer_v2
from glob import glob
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import OneHotEncoder
from IPython.display import clear_output


Using TensorFlow backend


In [2]:
# --- Experiment configuration -------------------------------------------
SEED, VERSION = 42, "v4"          # RNG seed and the tag used in output paths
IMG_SIZE, NUM_CLASSES = 256, 25   # square input side and number of labels
BATCH_SIZE, EPOCHS = 32, 100
AUTOTUNE = tf.data.AUTOTUNE

# --- Reproducibility ------------------------------------------------------
# Seed through Keras first, then export the seed / cuDNN switches as
# environment variables.
keras.utils.set_random_seed(SEED)
os.environ.update({
    "PYTHONHASHSEED": str(SEED),
    "TF_CUDNN_DETERMINISTIC": "1",
    "TF_USE_CUDNN": "true",
})

In [4]:
# Load the training table and re-root every image path under ./open/.
df_train = pd.read_csv("./open/train.csv")
df_train["img_path"] = [
    path.replace("./train", "./open/train") for path in df_train["img_path"]
]

In [5]:
# Image paths (X) and their labels (y) as plain NumPy arrays.
X, y = (df_train[column].to_numpy() for column in ("img_path", "label"))

In [6]:
def read_image(image_path, label=None):
    """Read a JPEG file and decode it into a 3-channel float32 tensor.

    Args:
        image_path: Path of the image file to read.
        label: Optional target; when given it is cast to float32 and
            returned alongside the image.

    Returns:
        The decoded image, or an ``(image, label)`` tuple when ``label``
        is provided.
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.cast(tf.image.decode_jpeg(raw_bytes, channels=3), tf.float32)

    if label is not None:
        return (pixels, tf.cast(label, tf.float32))
    return pixels

def resize(image, label=None, size=IMG_SIZE):
    """Resize ``image`` to a ``size`` x ``size`` square with bicubic interpolation.

    Args:
        image: Image tensor to resize.
        label: Optional target, passed through unchanged.
        size: Side length of the square output (defaults to ``IMG_SIZE``).

    Returns:
        The resized image, or ``(image, label)`` when ``label`` is provided.
    """
    resized = tf.image.resize(image, (size, size), method="bicubic")
    return resized if label is None else (resized, label)

# The two zoom variants cover opposite factor ranges; one of them is picked
# at random whenever the zoom augmentation fires.
zoom_out = keras_cv.layers.RandomZoom((0.1, 0.4))
zoom_in = keras_cv.layers.RandomZoom((-0.4, -0.1))


def _sometimes(layer):
    """Wrap ``layer`` in ``RandomApply`` so it is applied at random, not always."""
    return keras_cv.layers.RandomApply(layer=layer)


# Augmentations, in the order they are applied to a training image.
aug_layers = [
    _sometimes(keras_cv.layers.RandomChoice([zoom_out, zoom_in])),
    _sometimes(keras_cv.layers.RandomRotation(factor=(-0.2, 0.2))),
    _sometimes(keras_cv.layers.RandomBrightness(factor=0.2)),
    _sometimes(keras_cv.layers.RandomContrast(value_range=(0, 255), factor=0.2)),
    _sometimes(keras_cv.layers.RandomShear(0.2, 0.2)),
]

def apply_augment(image, label=None):
    """Run ``image`` through every layer in ``aug_layers``, in list order.

    Args:
        image: Image tensor to augment.
        label: Optional target, passed through unchanged.

    Returns:
        The augmented image, or ``(image, label)`` when ``label`` is provided.
    """
    augmented = image
    for augmentation in aug_layers:
        augmented = augmentation(augmented)

    return augmented if label is None else (augmented, label)

In [7]:
# Dense one-hot encoder fitted on the labels; the encoder expects a 2-D
# column, hence the extra axis.
ohe = OneHotEncoder(sparse_output=False)
ohe.fit(y[:, np.newaxis])

In [8]:
def build_model():
    """Build the classifier: pretrained Swin Transformer V2 backbone + softmax head.

    Side effect: switches the global Keras dtype policy to ``mixed_float16``.

    Returns:
        An uncompiled ``keras.Model`` mapping ``(IMG_SIZE, IMG_SIZE, 3)`` images
        to ``NUM_CLASSES`` softmax probabilities.
    """
    keras.mixed_precision.set_global_policy("mixed_float16")

    input_shape = (IMG_SIZE, IMG_SIZE, 3)
    image_in = keras.Input(input_shape)

    # num_classes=0: presumably drops the backbone's own classifier head so the
    # feature map can be pooled below -- confirm against keras_cv_attention_models.
    swin = swin_transformer_v2.SwinTransformerV2Base_window16(
        input_shape=input_shape,
        num_classes=0,
        pretrained="imagenet22k",
    )

    # x / 127.5 - 1: pixel values in 0..255 land in [-1, 1].
    features = keras.layers.Rescaling(scale=1./127.5, offset=-1.)(image_in)
    features = swin(features)
    features = keras.layers.GlobalAveragePooling2D()(features)

    # The head is pinned to float32 even though the global policy is mixed_float16.
    probabilities = keras.layers.Dense(
        NUM_CLASSES, activation="softmax", dtype="float32"
    )(features)

    return keras.Model(image_in, probabilities)

class DisplayCallback(keras.callbacks.Callback):
    """Clears the notebook cell output after epochs 0, 5, 10, ..."""

    def on_epoch_end(self, epoch, logs=None):
        # Any epoch index that is not a multiple of five leaves the output alone.
        if epoch % 5:
            return
        clear_output(wait=True)

In [None]:
class TrainingProgressCallback(keras.callbacks.Callback):
    """Prints how far the current epoch has progressed, about every 5%.

    Args:
        steps_per_epoch: Number of training batches in one epoch.
    """

    def __init__(self, steps_per_epoch):
        super(TrainingProgressCallback, self).__init__()
        self.steps_per_epoch = steps_per_epoch
        # Report every 5% of progress. Clamp to at least one step: with fewer
        # than 20 steps the integer division yields 0, which would never
        # advance the threshold and would print on every single batch.
        self.interval = max(1, steps_per_epoch // 20)
        self.next_interval = self.interval

    def on_epoch_begin(self, epoch, logs=None):
        # Restart the threshold so every epoch is reported, not only the first.
        self.next_interval = self.interval

    def on_train_batch_end(self, batch, logs=None):
        # `batch` is the zero-based index of the batch that just finished.
        completed = batch + 1
        if self.steps_per_epoch <= 0 or completed < self.next_interval:
            return

        percentage = (completed / self.steps_per_epoch) * 100
        print(f"Training progress: {percentage:.2f}% complete.")

        # Move past every threshold already reached so a jump of several
        # steps between calls results in one message, not a backlog.
        while self.next_interval <= completed:
            self.next_interval += self.interval

In [9]:
# 5-fold stratified cross-validation: one model is trained per fold and the
# best checkpoint (by validation macro-F1) is saved for each.
skf = StratifiedKFold(n_splits=5, random_state=SEED, shuffle=True)
for fold, (train_idx, valid_idx) in enumerate(skf.split(X, y)):
    X_train, y_train = X[train_idx], y[train_idx]
    X_valid, y_valid = X[valid_idx], y[valid_idx]

    # One-hot targets, as required by the categorical loss and F1 metric below.
    y_train = ohe.transform(y_train.reshape(-1, 1))
    y_valid = ohe.transform(y_valid.reshape(-1, 1))

    # Training pipeline. Decoded images are cached once; shuffling comes AFTER
    # the cache, because a cache replays its elements in the same order on
    # every iteration and would otherwise freeze the first epoch's order.
    # Resize and augmentation stay after the cache so they are recomputed
    # each epoch.
    ds_train = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    ds_train = ds_train.map(read_image, num_parallel_calls=AUTOTUNE).cache()
    ds_train = ds_train.shuffle(len(y_train))
    ds_train = ds_train.map(resize, num_parallel_calls=AUTOTUNE)
    ds_train = ds_train.map(apply_augment, num_parallel_calls=AUTOTUNE)
    ds_train = ds_train.batch(BATCH_SIZE).prefetch(AUTOTUNE)

    # Validation pipeline: deterministic, so the finished batches are cached;
    # prefetch goes last so it overlaps with the consumer.
    ds_valid = tf.data.Dataset.from_tensor_slices((X_valid, y_valid))
    ds_valid = ds_valid.map(read_image, num_parallel_calls=AUTOTUNE)
    ds_valid = ds_valid.map(resize, num_parallel_calls=AUTOTUNE)
    ds_valid = ds_valid.batch(BATCH_SIZE).cache().prefetch(AUTOTUNE)

    steps_per_epoch = len(ds_train)

    callbacks = [
        DisplayCallback(),
        keras.callbacks.TensorBoard(log_dir=f"../../logs/keras/{VERSION}/fold_{fold}"),
        keras.callbacks.EarlyStopping(monitor="val_f1", mode="max", verbose=0, patience=5),
        keras.callbacks.ModelCheckpoint(f"../../ckpts/keras/{VERSION}/fold_{fold}.keras", monitor="val_f1", mode="max", save_best_only=True),
        # F1 is a higher-is-better metric: with mode="min" the learning rate
        # would be cut while the score is still improving.
        keras.callbacks.ReduceLROnPlateau(monitor="val_f1", mode="max", factor=0.8, patience=3),
        TrainingProgressCallback(steps_per_epoch)
    ]

    optimizer = keras.optimizers.AdamW(1e-5)
    loss = keras.losses.CategoricalFocalCrossentropy(from_logits=False)
    f1 = keras.metrics.F1Score(average="macro", name="f1")

    model = build_model()
    model.compile(optimizer=optimizer, loss=loss, metrics=[f1])
    model.fit(ds_train, validation_data=ds_valid, epochs=EPOCHS, callbacks=callbacks)

    # Reset Keras' global state before the next fold builds a fresh model.
    K.clear_session()

Downloading data from https://github.com/leondgarse/keras_cv_attention_models/releases/download/swin_transformer_v2/swin_transformer_v2_base_window16_256_imagenet22k.h5
>>>> Load pretrained from: C:\Users\Seo\.keras\models\swin_transformer_v2_base_window16_256_imagenet22k.h5
Epoch 1/100


In [None]:
# Test images in sorted filename order.
X_test = sorted(glob("./open/test/*.jpg"))
if not X_test:
    raise FileNotFoundError("No test images found under ./open/test/*.jpg")

# Same preprocessing as validation (no augmentation). The batched tensors are
# cached because the dataset is iterated once per fold checkpoint; prefetch
# goes last so it overlaps with the consumer.
ds_test = tf.data.Dataset.from_tensor_slices(X_test)
ds_test = ds_test.map(read_image, num_parallel_calls=AUTOTUNE)
ds_test = ds_test.map(resize, num_parallel_calls=AUTOTUNE)
ds_test = ds_test.batch(BATCH_SIZE*2).cache().prefetch(AUTOTUNE)

# Fail early with a clear message instead of summing an empty list below.
ckpt_paths = sorted(glob(f"../../ckpts/keras/{VERSION}/fold_*.keras"))
if not ckpt_paths:
    raise FileNotFoundError(
        f"No fold checkpoints found under ../../ckpts/keras/{VERSION}/"
    )

# Soft-voting ensemble: collect each fold model's class probabilities.
y_preds = []
for ckpt in tqdm(ckpt_paths):
    best_model = keras.models.load_model(ckpt, compile=False)
    y_preds.append(best_model.predict(ds_test, verbose=0))

    K.clear_session()

# Sum the per-fold probabilities and map each row back to a label.
# NOTE(review): this relies on inverse_transform picking the largest entry of
# each row for non-binary input -- confirm against the scikit-learn docs.
y_preds = np.sum(np.array(y_preds), axis=0)
y_preds = ohe.inverse_transform(y_preds).reshape(-1)

In [None]:
# Fill the sample submission with the ensemble predictions.
# NOTE(review): assumes the rows of sample_submission.csv are in the same
# order as the sorted test file names used for prediction -- confirm.
df_submission = pd.read_csv("./open/sample_submission.csv")
df_submission["label"] = y_preds

# to_csv does not create missing directories, so make sure the target folder
# exists before writing.
submission_path = f"../../results/keras/{VERSION}.csv"
os.makedirs(os.path.dirname(submission_path), exist_ok=True)
df_submission.to_csv(submission_path, index=False)