# Load Datasets

In [None]:
# Mount Google Drive at /content/drive; later cells read archives and CSV files
# from /content/drive/MyDrive and write results back to it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Extract the datasets_train archive from Drive into /content/datasets.
# Alternative archives (currently disabled):
# !unzip "/content/drive/MyDrive/Colab Notebooks/brainwave_image/train_image.zip" -d "/content/datasets"
# !unzip "/content/drive/MyDrive/Colab Notebooks/brainwave_image/test_image.zip" -d "/content/datasets"
!unzip "/content/drive/MyDrive/Model-EXP/datasets_train.zip" -d "/content/datasets"

# Train & Validation

In [None]:
import pandas as pd

# Label table; the cells below read its 'directory' and 'label' columns.
TRAIN_CSV = "/content/drive/MyDrive/Colab Notebooks/datasets/train.csv"
df = pd.read_csv(TRAIN_CSV)
df

In [None]:
# Sanity check: number of label-table rows that refer to this one recording file.
df[df['directory'] == 's2_d3_p014_001_data_time_series.npy']['directory'].count()

In [None]:
import glob

# Every PNG that sits directly inside the folder (sub-folders are not searched).
# `names` is the lookup list consulted when images are paired with labels.
folder_path = '/content/datasets/signal_train'
file_names = glob.glob(f"{folder_path}/*.png")
names = [png_path for png_path in file_names]

len(names)

In [None]:
# Pair each row of the label table with its rendered image, when that image exists.
#
# The k-th row (0-based) of a recording `<stem>.npy` is expected at
# `<image_dir><stem>_<k>.png`. The label is taken from the same row as the path,
# so a missing image only drops that one row instead of shifting every later
# label and stalling the per-recording counter.
#
# NOTE(review): `names` is globbed from /content/datasets/signal_train, while the
# paths checked here live under /content/datasets/content/train_image/ -- confirm
# that `names` was built from the folder that actually holds these images.
image_dir = "/content/datasets/content/train_image/"
known_paths = set(names)  # O(1) membership instead of scanning the list per row

# Position of each row inside its own recording (0, 1, 2, ...), computed in one
# pass instead of re-counting the whole table for every row.
row_in_recording = df.groupby('directory').cumcount()

data = []
labels = []
for directory, label, k in zip(df['directory'], df['label'], row_in_recording):
    # directory[:-4] drops the 4-character ".npy" suffix.
    image_path = f"{image_dir}{directory[:-4]}_{k}.png"
    if image_path in known_paths:
        data.append(image_path)
        labels.append(label)

len(data), len(labels)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the image paths for validation, keeping the label proportions
# equal in both parts; the fixed random_state makes the split repeatable.
X_train, X_val, y_train, y_val = train_test_split(
    data,
    labels,
    test_size=0.1,
    random_state=42,
    stratify=labels,
)
len(X_train), len(X_val)

In [None]:
import os
from PIL import Image
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import numpy as np

output_folder = '/content/datasets/signal_train/'
os.makedirs(output_folder, exist_ok=True)

# Re-render every training image as a borderless figure, filed under a
# sub-folder named after its label: <output_folder>/<label>/<index>.png
for i, (image_path, label) in enumerate(tqdm(zip(X_train, y_train), total=len(X_train))):
    label_folder = os.path.join(output_folder, str(label))
    os.makedirs(label_folder, exist_ok=True)
    file_path = os.path.join(label_folder, str(i) + '.png')

    # Image.open keeps the file handle open; the context manager releases it as
    # soon as the pixels have been copied into an array.
    with Image.open(image_path) as source_image:
        pixels = np.abs(source_image)

    fig = plt.figure(frameon=False)
    ax = plt.Axes(fig, [0., 0., 1., 1.])  # axes span the whole canvas
    ax.set_axis_off()
    fig.add_axes(ax)
    ax.imshow(pixels, aspect="auto")
    fig.savefig(file_path)
    fig.clear()
    plt.close(fig)  # release the figure so the loop does not accumulate them

In [None]:
output_folder = '/content/datasets/signal_validation/'
os.makedirs(output_folder, exist_ok=True)

# Re-render every validation image as a borderless figure, filed under a
# sub-folder named after its label: <output_folder>/<label>/<index>.png
for i, (image_path, label) in enumerate(tqdm(zip(X_val, y_val), total=len(X_val))):
    label_folder = os.path.join(output_folder, str(label))
    os.makedirs(label_folder, exist_ok=True)
    file_path = os.path.join(label_folder, str(i) + '.png')

    # Image.open keeps the file handle open; the context manager releases it as
    # soon as the pixels have been copied into an array.
    with Image.open(image_path) as source_image:
        pixels = np.abs(source_image)

    fig = plt.figure(frameon=False)
    ax = plt.Axes(fig, [0., 0., 1., 1.])  # axes span the whole canvas
    ax.set_axis_off()
    fig.add_axes(ax)
    ax.imshow(pixels, aspect="auto")
    fig.savefig(file_path)
    fig.clear()
    plt.close(fig)  # release the figure so the loop does not accumulate them

In [None]:
import shutil

import os

folder_to_zip = "/content/datasets"
output_filename = "datasets_exp"

# make_archive appends ".zip" itself, so only a trailing extension is removed
# (a plain replace("zip", "") would also eat "zip" in the middle of a name).
archive_stem = output_filename[:-len(".zip")] if output_filename.endswith(".zip") else output_filename

# Absolute base name: the archive is later moved from /content/<name>.zip, so it
# must not depend on the notebook's current working directory.
shutil.make_archive(os.path.join("/content", archive_stem), "zip", folder_to_zip)

In [None]:
# Move the freshly built archive from the Colab disk onto Google Drive.
source = "/content/datasets_exp.zip"
destination = "/content/drive/MyDrive/Colab Notebooks/datasets_exp.zip"

shutil.move(source, destination)

# Load Datasets From Hugging Face

In [None]:
from datasets import load_dataset, DatasetDict
import numpy as np

dataset = load_dataset('Expss4/img_train_band_spec')
split_ratio = 0.9

# The intermediate result is deliberately NOT called `train_test_split`: that name
# belongs to the scikit-learn function imported earlier in this notebook, and
# rebinding it would break that cell on any re-run.
# A fixed seed keeps the train/validation membership identical between runs.
hf_split = dataset['train'].train_test_split(test_size=1 - split_ratio, seed=42)

# Hugging Face calls the held-out part 'test'; expose it as 'validation'.
dataset_split = DatasetDict({
    'train': hf_split['train'],
    'validation': hf_split['test']
})
dataset_split

In [None]:
# Display the 'img_spec' field of the first training row.
dataset['train'][0]['img_spec']

In [None]:
# Display the 'label' field of the first training row.
dataset['train'][0]['label']

In [None]:
# Disabled alternative: a stratified scikit-learn split over the Hugging Face
# columns. NOTE(review): nothing below uses it -- delete if it is no longer needed.
# from sklearn.model_selection import train_test_split

# X_train, X_val, y_train, y_val = train_test_split(dataset['train'][:]['img_spec'], dataset['train'][:]['label'], test_size=0.1, random_state=42, stratify=dataset['train']['label'])
# len(X_train), len(X_val)

In [None]:
import os
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import numpy as np

output_folder = '/content/datasets/signal_train/'
os.makedirs(output_folder, exist_ok=True)

# Render every training row as a borderless figure under
# <output_folder>/<label>/<index>.png
train_rows = dataset_split['train']
for i in tqdm(range(len(train_rows))):
    row = train_rows[i]  # fetch the row once instead of once per field
    img = row['img_spec']
    # Same crop box as the validation and test cells. The validation rows come
    # from this very dataset, so leaving training images uncropped would train
    # and evaluate the model on differently framed pictures.
    crop_box = (85, 60, img.width - 65, img.height - 55)
    train = img.crop(crop_box)
    label = row['label']
    label_folder = os.path.join(output_folder, str(label))
    os.makedirs(label_folder, exist_ok=True)
    file_path = os.path.join(label_folder, str(i) + '.png')
    fig = plt.figure(frameon=False)
    ax = plt.Axes(fig, [0., 0., 1., 1.])  # axes span the whole canvas
    ax.set_axis_off()
    fig.add_axes(ax)
    ax.imshow(np.abs(train), aspect="auto")
    fig.savefig(file_path)
    fig.clear()
    plt.close(fig)  # release the figure so the loop does not accumulate them

In [None]:
import os
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import numpy as np

output_folder = '/content/datasets/signal_validation/'
os.makedirs(output_folder, exist_ok=True)

# Crop each validation image, then save it as a borderless figure under
# <output_folder>/<label>/<index>.png
for i, row in enumerate(tqdm(dataset_split['validation'])):
    img = row['img_spec']
    width, height = img.size
    # Crop box is (left, upper, right, lower) in pixels.
    train = img.crop((85, 60, width - 65, height - 55))
    label = row['label']

    label_folder = os.path.join(output_folder, str(label))
    os.makedirs(label_folder, exist_ok=True)
    file_path = os.path.join(label_folder, f"{i}.png")

    fig = plt.figure(frameon=False)
    ax = plt.Axes(fig, [0., 0., 1., 1.])
    ax.set_axis_off()
    fig.add_axes(ax)
    ax.imshow(np.abs(train), aspect="auto")
    fig.savefig(file_path)
    fig.clear()
    plt.close(fig)

In [None]:
from datasets import load_dataset

# Download the test images; the cells below read them from the 'train' split of
# this dataset, using its 'img_spec' and 'id' fields.
test = load_dataset('Expss4/img_test_band_spec')
test

In [None]:
# Display the first test image. Indexing the row first avoids selecting the
# whole 'img_spec' column just to look at one element.
test['train'][0]['img_spec']

In [None]:
import os
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import numpy as np

output_folder = '/content/datasets/signal_test/'
os.makedirs(output_folder, exist_ok=True)

test_rows = test['train']
# Rows [first_row, last_row) are rendered. The upper bound is clamped to the
# split size so a split shorter than 500 rows no longer raises an IndexError.
first_row, last_row = 0, min(500, len(test_rows))

for i in tqdm(range(first_row, last_row)):
    # One row lookup per image; selecting test['train']['img_spec'] and
    # test['train']['id'] inside the loop touches the whole column every time.
    row = test_rows[i]
    img = row['img_spec']
    crop_box = (85, 60, img.width - 65, img.height - 55)  # (left, upper, right, lower)
    train = img.crop(crop_box)
    # The file is named after the row's id so predictions can be matched back.
    file_path = os.path.join(output_folder, str(row['id']) + '.png')
    fig = plt.figure(frameon=False)
    ax = plt.Axes(fig, [0., 0., 1., 1.])  # axes span the whole canvas
    ax.set_axis_off()
    fig.add_axes(ax)
    ax.imshow(np.abs(train), aspect="auto")
    fig.savefig(file_path)
    fig.clear()
    plt.close(fig)  # release the figure so the loop does not accumulate them

In [None]:
import shutil

import os

folder_to_zip = "/content/datasets"
output_filename = "datasets_test_1"

# make_archive appends ".zip" itself, so only a trailing extension is removed
# (a plain replace("zip", "") would also eat "zip" in the middle of a name).
archive_stem = output_filename[:-len(".zip")] if output_filename.endswith(".zip") else output_filename

# Absolute base name: the archive is later moved from /content/<name>.zip, so it
# must not depend on the notebook's current working directory.
shutil.make_archive(os.path.join("/content", archive_stem), "zip", folder_to_zip)

In [None]:
# Move the freshly built archive from the Colab disk onto Google Drive.
source = "/content/datasets_test_1.zip"
destination = "/content/drive/MyDrive/Colab Notebooks/datasets_test_1.zip"

shutil.move(source, destination)

# Modeling

In [None]:
!pip install torch transformers datasets evaluate
!pip install git+https://github.com/rwightman/pytorch-image-models.git

In [None]:
import os
from getpass import getpass

from huggingface_hub import login

# An access token must never be stored in the notebook: read it from the HF_TOKEN
# environment variable, or ask for it interactively without echoing it.
# NOTE(review): the token that used to be hard-coded here has been exposed and
# should be revoked in the Hugging Face account settings.
hf_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face access token: ")
login(token=hf_token)

In [None]:
import glob
import torch
import torch.nn as nn
import torchvision.transforms as T
from torch.utils.data import DataLoader

train_files = glob.glob("/content/datasets/datasets_train/signal_train/**/*.png")
val_files = glob.glob("/content/datasets/datasets_train/signal_validation/**/*.png")
# test_files = glob.glob("/kaggle/working/signal_test/*.png")


def _make_pipeline():
    """Build one preprocessing pipeline: bicubic resize to 384x384, tensor conversion, per-channel normalisation."""
    return T.Compose([
        T.Resize((384, 384), interpolation=T.InterpolationMode.BICUBIC),
        T.ToTensor(),
        T.Normalize(
            mean=torch.tensor([0.4850, 0.4560, 0.4060]),
            std=torch.tensor([0.2290, 0.2240, 0.2250]),
        ),
    ])


# The "train" and "test" entries are separate but identically configured pipelines.
transforms = {split: _make_pipeline() for split in ("train", "test")}

In [None]:
import timm
import timm.optim
import timm.scheduler
from timm.data import ImageDataset

# Folder datasets: each sub-folder of the root is one class.
train_dataset = ImageDataset("/content/datasets/datasets_train/signal_train", transform=transforms["train"])
# Validation uses the evaluation pipeline. It is identical to the training one
# today, but it must stay free of any augmentation later added to "train".
val_dataset = ImageDataset("/content/datasets/datasets_train/signal_validation", transform=transforms["test"])
# Only the training batches are shuffled; validation order stays fixed.
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False)

In [None]:
# Run on the GPU when one is visible, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Pretrained backbone from the Hugging Face hub with a 3-class classification head.
model = timm.create_model(
    "hf_hub:timm/resnext101_32x16d.fb_swsl_ig1b_ft_in1k",
    pretrained=True,
    num_classes=3,
)
model = model.to(device)

In [None]:
num_epochs = 3
criterion = nn.CrossEntropyLoss()

# AdamW wrapped in timm's Lookahead; the scheduler is built around the wrapped optimizer.
base_optimizer = timm.optim.create_optimizer_v2(model, opt="AdamW", lr=1e-3)
optimizer = timm.optim.Lookahead(base_optimizer, alpha=0.5, k=8)
scheduler_and_epochs = timm.scheduler.create_scheduler_v2(optimizer, num_epochs=num_epochs)
scheduler = scheduler_and_epochs[0]  # the first element is the scheduler itself

# Per-epoch history filled in by the training loop, plus the best validation score so far.
info = dict(
    metric_train=[],
    metric_val=[],
    train_loss=[],
    val_loss=[],
    best_metric_val=-999,
)
info

In [None]:
from sklearn.metrics import f1_score
from tqdm.auto import tqdm
import numpy as np

# One pass per epoch: train, evaluate on the validation loader, record macro-F1
# and mean losses in `info`, and checkpoint the model to Google Drive.
for epoch in range(num_epochs):
    train_loss_epoch = []
    val_loss_epoch = []
    train_preds = []
    train_targets = []
    val_preds = []
    val_targets = []
    # Optimizer steps completed before this epoch; advanced after every step below.
    num_updates = epoch * len(train_dataloader)

    model.train()
    for batch in tqdm(train_dataloader):
        inputs, targets = batch
        outputs = model(inputs.to(device))
        loss = criterion(outputs, targets.to(device))
        loss.backward()
        optimizer.step()
        # step_update takes the running number of optimizer steps, so the counter
        # has to grow with every batch; passing the same value all epoch long
        # would freeze any per-update schedule.
        num_updates += 1
        scheduler.step_update(num_updates=num_updates)
        optimizer.zero_grad()
        train_loss_epoch.append(loss.item())
        train_preds += outputs.argmax(-1).detach().cpu().tolist()
        train_targets += targets.tolist()
    # Lookahead wrapper: synchronise its weights before the model is evaluated.
    optimizer.sync_lookahead()
    scheduler.step(epoch + 1)

    model.eval()
    with torch.no_grad():
        for batch in tqdm(val_dataloader):
            inputs, targets = batch
            outputs = model(inputs.to(device))
            loss = criterion(outputs, targets.to(device))
            val_loss_epoch.append(loss.item())
            val_preds += outputs.argmax(-1).detach().cpu().tolist()
            val_targets += targets.tolist()

    metric_train = f1_score(train_targets, train_preds, average="macro")
    metric_val = f1_score(val_targets, val_preds, average="macro")
    train_loss = np.average(train_loss_epoch)
    val_loss = np.average(val_loss_epoch)
    info["metric_train"].append(metric_train)
    info["metric_val"].append(metric_val)
    info["train_loss"].append(train_loss)
    info["val_loss"].append(val_loss)

    # "Best" means a new top macro-F1 AND a validation loss no worse than any
    # earlier epoch (this epoch's loss is already in the list, hence <=).
    # The former `!cp checkpoint.pt model/checkpoint.pt` was dropped: nothing in
    # this notebook writes a local checkpoint.pt, the checkpoints go to Drive.
    if metric_val > info["best_metric_val"] and val_loss <= min(info["val_loss"]):
        print("New Best Score have been save!")
        info["best_metric_val"] = metric_val
        torch.save(model, "/content/drive/MyDrive/Model-EXP/checkpoint_best.pt")
    # Always keep the latest state as well, whatever its score.
    torch.save(model, "/content/drive/MyDrive/Model-EXP/checkpoint.pt")

    print(info)
    print(f"Epoch: {epoch} | Metric: {metric_val} | Training Loss: {train_loss} | Validation Loss: {val_loss}")

## Model Evaluation

In [None]:
import os

# Stage the checkpoint under /kaggle/working/model/, where the evaluation cell loads it from.
# NOTE(review): this expects a `checkpoint.pt` in the current working directory,
# while the training loop writes its checkpoints under
# /content/drive/MyDrive/Model-EXP/ -- confirm which location applies.
output_folder = '/kaggle/working/model/'
os.makedirs(output_folder, exist_ok=True)
!cp checkpoint.pt /kaggle/working/model/checkpoint.pt

In [None]:
# The checkpoint is a whole pickled module (written with torch.save(model, ...)),
# not a state_dict. weights_only=False is therefore required on PyTorch releases
# whose torch.load defaults to weights-only loading. Unpickling runs arbitrary
# code, so only load checkpoints produced by this notebook.
# map_location places the weights on the active device even when the checkpoint
# was written on a different one (e.g. saved on GPU, evaluated on CPU).
loaded_model = torch.load(
    "/kaggle/working/model/checkpoint.pt",
    map_location=device,
    weights_only=False,
)

predictions = []
references = []

# Collect predicted and true class indices over the validation loader.
loaded_model.eval()
with torch.no_grad():
    for batch in tqdm(val_dataloader):
        inputs, targets = batch
        outputs = loaded_model(inputs.to(device))
        predictions += outputs.argmax(-1).detach().cpu().tolist()
        references += targets.tolist()

## Confusion Matrix

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

# Rows are the true classes, columns the predicted ones.
cm = confusion_matrix(y_true=references, y_pred=predictions)
disp = ConfusionMatrixDisplay(cm)
disp.plot()
plt.show()

# Evaluation & Submission

In [None]:
import glob
import os

from PIL import Image

# `test_files` is not assigned by any active line of this notebook (its only
# definition is commented out), so it is built here. Both locations the notebook
# mentions for rendered test images are tried; the first one holding PNGs wins.
# NOTE(review): confirm which directory holds the test images in your environment.
test_image_dirs = ["/kaggle/working/signal_test", "/content/datasets/signal_test"]
test_files = []
for test_image_dir in test_image_dirs:
    test_files = sorted(glob.glob(os.path.join(test_image_dir, "*.png")))
    if test_files:
        break
if not test_files:
    raise FileNotFoundError(f"No test PNG files found in any of: {test_image_dirs}")

# Maps the file stem (the test id the image was saved under) to the predicted class index.
answers = dict()
loaded_model.eval()
with torch.no_grad():
    for f in tqdm(test_files):
        key = os.path.splitext(os.path.basename(f))[0]
        # Close each file as soon as it has been decoded to RGB.
        with Image.open(f) as raw_image:
            img = raw_image.convert("RGB")
        transformed = transforms["test"](img).unsqueeze(0).to(device)  # add batch dimension
        answers[key] = loaded_model(transformed).argmax(-1).item()
answers

In [None]:
# Load the sample submission; its 'id' column is matched against the predictions below.
# NOTE(review): this rebinds `df`, which earlier held the training label table,
# and relies on `pd` having been imported in the first section of the notebook.
df = pd.read_csv("/kaggle/input/ultra-wide-band-pose-prediction/sample_submission.csv")
df

In [None]:
# Look at one id to see the format that has to match the keys of `answers`.
df['id'][0]

In [None]:
# Fill the 'class' column with the predicted class of every submission id.
# The keys of `answers` are file stems, i.e. strings, so the ids are compared
# as strings; this also works when the CSV stores them as integers.
submission_ids = df['id'].astype(str)

# Fail loudly (as a plain dict lookup would) when a prediction is missing,
# instead of silently writing "nan" into the submission.
unanswered = submission_ids[~submission_ids.isin(list(answers))]
if len(unanswered) > 0:
    raise KeyError(f"No prediction for {len(unanswered)} submission id(s), e.g. {unanswered.iloc[0]!r}")

# One vectorised assignment replaces the row-by-row .loc writes.
df['class'] = submission_ids.map(answers).astype(str)
df

In [None]:
# Write the submission file without the index column.
# NOTE(review): the file name mentions MaxViT while the model created above is a
# ResNeXt -- confirm the intended name.
df.to_csv('MaXViT_BASE.csv', index=False)