# **INIT (RUN FIRST)**

In [None]:
import pandas as pd
import numpy as np
from brainflow.board_shim import BoardShim, BrainFlowInputParams, LogLevels, BoardIds
from brainflow.data_filter import DataFilter, FilterTypes
import matplotlib
import os
import torch
import pytorch_lightning as pl
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor
from pytorch_lightning.loggers import WandbLogger
import torchinfo

import cv2
from torch.utils.data import DataLoader
import wandb

# Select matplotlib's non-interactive 'Agg' backend before pyplot is imported.
# NOTE(review): the %matplotlib inline magic below also selects a backend, so
# this call is presumably superseded inside the notebook - confirm which one
# is actually wanted.
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
%matplotlib inline
# Module-level scaler instance; it is not referenced by any of the visible cells.
scaler = StandardScaler()

In [None]:
# Authenticate with Weights & Biases. wandb.login() is called without an
# explicit key, so it relies on wandb's own credential lookup (which is
# documented to honour the WANDB_API_KEY environment variable).
wandb.login()
# SECURITY: a W&B API key used to be hard-coded on this line. Treat that key as
# leaked and revoke it. The value is now read from the environment instead and
# is None when WANDB_API_KEY is not set.
api_key = os.environ.get('WANDB_API_KEY')
# BrainFlow board identifier (numeric id; presumably a Muse headset - confirm
# against brainflow.BoardIds).
board_id = 38
# Presumably the EEG sampling frequency in Hz - TODO confirm.
sf = 256

# **DATASET AND LOADER**

In [None]:
# This dataset is built from spectrogram images.
class AutoencoderSpectrogramImage(Dataset):
    """Dataset yielding one stack of per-channel spectrogram images per sample.

    Each row of the annotation CSV holds, in its first column, a path relative
    to ``dir``. The last four characters of that path are dropped (presumably a
    four-character extension such as ``.csv`` - confirm) and the remainder is
    used as a folder containing one grayscale PNG per name in ``eeg_names``.

    Args:
        annotations_file: Path or file-like object readable by
            ``pandas.read_csv``; read without a header row.
        dir: Root folder that the CSV entries are relative to.
        transform: Optional callable applied to the stacked ``numpy`` array.
    """

    def __init__(self, annotations_file, dir, transform=None):
        self.file_lists = pd.read_csv(annotations_file, header=None)
        self.dir = dir
        self.transform = transform
        # One PNG per channel is expected in every sample folder.
        self.eeg_names = ['TP9', 'Fp1', 'Fp2', 'TP10']

    def __len__(self):
        """Return the number of rows in the annotation CSV."""
        return len(self.file_lists)

    def _sample_dir(self, idx):
        """Return the folder (with trailing separator) holding sample ``idx``."""
        # Normalise both separator styles to the platform's own so the same
        # annotation file works on Windows and POSIX alike.
        relative = self.file_lists.iloc[idx, 0]
        relative = relative.replace('/', os.sep).replace('\\', os.sep)
        # Drop the last four characters of the entry to get the folder name.
        return os.path.join(self.dir, relative)[:-4] + os.sep

    def __getitem__(self, idx):
        """Load every channel image of sample ``idx`` and stack them.

        Returns:
            The stacked images as a ``numpy`` array with the channel axis
            first, or - when ``transform`` is set - the transform output with
            its axes reordered by ``permute(1, 2, 0)``.

        Raises:
            FileNotFoundError: If a channel image is missing or unreadable.
        """
        sample_dir = self._sample_dir(idx)

        channel_images = []
        for channel in self.eeg_names:
            image_path = sample_dir + channel + '.png'
            image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than deep inside the
            # transform.
            if image is None:
                raise FileNotFoundError(
                    f'Could not read spectrogram image: {image_path}'
                )
            channel_images.append(image)
        spectrograms = np.array(channel_images)

        if self.transform:
            spectrograms = self.transform(spectrograms)
            # With torchvision's ToTensor the (C, H, W) array is interpreted as
            # H x W x C and comes back with its axes rotated; this permute
            # restores channel-first order. NOTE(review): assumes the transform
            # behaves like ToTensor - confirm for any other transform.
            spectrograms = spectrograms.permute(1, 2, 0)
        return spectrograms

In [None]:
# Dataset locations on disk (Windows paths, each ending in a separator).
# Earlier dataset location, kept for reference:
#   dataset_dir     = 'D:\\Nicko\\TUGAS_AKHIR\\Dataset\\Dataset_TA\\'
#   spectrogram_dir = 'D:\\Nicko\\TUGAS_AKHIR\\Dataset\\Dataset_TA_img\\'
spectrogram_dir = 'D:\\Nicko\\TUGAS_AKHIR\\Dataset_new_12\\Dataset_TA_img\\'
dataset_dir = 'D:\\Nicko\\TUGAS_AKHIR\\Dataset_new_12\\Dataset_TA\\'

# CSV index files for the training and testing splits, both inside dataset_dir.
training_file, testing_file = (
    f'{dataset_dir}{csv_name}'
    for csv_name in ('training_dir.csv', 'testing_dir.csv')
)

In [None]:
# Build the image datasets: one for the training index file and one for the
# testing index file, both rooted at spectrogram_dir and each with its own
# ToTensor transform.
datasetTrain, datasetTest = (
    AutoencoderSpectrogramImage(
        annotations_file=index_file,
        dir=spectrogram_dir,
        transform=ToTensor(),
    )
    for index_file in (training_file, testing_file)
)

In [None]:
# Number of samples in the test dataset.
test = len(datasetTest)

In [None]:
# Display the test-set size as the cell output.
test

In [None]:
# Hold out part of the training data for validation: 80% stays in the training
# subset, the remaining samples (about 20%) become the validation subset.
train_set_size = int(len(datasetTrain) * 0.8)
valid_set_size = len(datasetTrain) - train_set_size

# A generator with a fixed seed makes the random split reproducible.
seed = torch.Generator().manual_seed(42)
train_set, valid_set = torch.utils.data.random_split(
    datasetTrain,
    [train_set_size, valid_set_size],
    generator=seed,
)

# Batches of 25 samples everywhere; only the training loader shuffles.
train_dataloader = DataLoader(
    train_set,
    batch_size=25,
    shuffle=True,
)
validation_dataloader = DataLoader(
    valid_set,
    batch_size=25,
)
test_dataloader = DataLoader(
    datasetTest,
    batch_size=25,
)

# **AUTOENCODER MODEL**

In [None]:
# Model 5. Original note: "32640 param (64 x 10 x 51)" - 64 * 10 * 51 = 32640,
# presumably the size of the latent tensor for the input size in use (confirm).
class LitAutoEncoder5(pl.LightningModule):
    """Convolutional autoencoder for 4-channel inputs, trained with MSE loss.

    ``forward`` returns only the encoder output (the latent embedding); the
    decoder is applied explicitly in the train/validation/test steps and by
    callers that want a reconstruction.
    """

    def __init__(self):
        super().__init__()
        nn = torch.nn

        # Encoder: 4 -> 8 -> 16 -> 32 -> 64 -> 64 channels. The layer order
        # (including where BatchNorm sits relative to ReLU) must not change,
        # because checkpoint keys depend on each layer's index.
        self.encoder = nn.Sequential(
            nn.Conv2d(4, 8, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
            nn.Conv2d(8, 16, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=0),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(inplace=True),
        )

        # Decoder: mirrors the encoder with transposed convolutions,
        # 64 -> 64 -> 32 -> 16 -> 8 -> 4 channels, no activation on the output.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=1, padding=0, output_padding=0),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=0, output_padding=0),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(16),
            nn.ConvTranspose2d(16, 8, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(8, 4, kernel_size=3, stride=2, padding=1, output_padding=1),
        )

    def _reconstruction_loss(self, x):
        """Encode then decode ``x`` and return the MSE against ``x`` itself."""
        reconstruction = self.decoder(self.encoder(x))
        return torch.nn.functional.mse_loss(reconstruction, x)

    def forward(self, x):
        """Return the latent embedding of ``x`` (encoder output only)."""
        return self.encoder(x)

    def training_step(self, batch, batch_idx):
        """Compute, log and return the reconstruction loss for one batch.

        This step does not go through ``forward``; the batch itself is both
        the input and the reconstruction target.
        """
        loss = self._reconstruction_loss(batch)
        # Sent to the Trainer's logger, additionally aggregated per epoch.
        self.log("train_loss", loss, logger=True, on_epoch=True)
        return loss

    def validation_step(self, val_batch, batch_idx):
        """Log the reconstruction loss of one validation batch as ``val_loss``."""
        val_loss = self._reconstruction_loss(val_batch)
        self.log("val_loss", val_loss, logger=True, on_epoch=True)

    def test_step(self, batch, batch_idx):
        """Log the reconstruction loss of one test batch as ``test_loss``."""
        test_loss = self._reconstruction_loss(batch)
        self.log("test_loss", test_loss, logger=True)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters (learning rate 1e-3)."""
        return torch.optim.Adam(self.parameters(), lr=1e-3)

In [None]:
# Create a fresh (untrained) model; the bare name on the last line makes the
# notebook display the module structure.
autoencoder = LitAutoEncoder5()
autoencoder

In [None]:
# Weights & Biases logger for the 'autoencoder' project; run files are written
# under the given save_dir.
wandb_logger = WandbLogger(
    project='autoencoder',
    save_dir='D:\\Nicko\\TUGAS_AKHIR\\AutoEncoder\\model_6_4channel',
)

In [None]:
# Checkpoint callback that monitors the logged "val_loss" metric.
# NOTE(review): the filename prefix says "classifier" although this notebook
# trains an autoencoder, and val_loss is formatted with only two decimals -
# confirm both are intended.
checkpoint_callback = pl.callbacks.ModelCheckpoint(
    monitor="val_loss",
    dirpath="D:\\Nicko\\TUGAS_AKHIR\\AutoEncoder\\model_6_4channel\\autoencoder\\run_1",
    filename="classifier-{epoch:02d}-{val_loss:.2f}",
)

In [None]:
# Train on a single GPU for at most 5000 epochs, logging every 9 steps to W&B
# and saving checkpoints through checkpoint_callback.
trainer = pl.Trainer(
    max_epochs=5000,
    devices=1,
    accelerator='gpu',
    log_every_n_steps=9,
    logger=wandb_logger,
    callbacks=[checkpoint_callback],
)
trainer.fit(
    autoencoder,
    train_dataloader,
    validation_dataloader,
)

In [None]:
# Restore a trained model from a checkpoint file. load_from_checkpoint is a
# classmethod that builds and returns a new model, so it is called on the class
# itself: calling it on a throwaway instance wastes an initialisation and is
# rejected outright by newer Lightning releases.
# NOTE(review): the file name says "2channel" while LitAutoEncoder5 takes
# 4-channel input - confirm this is the right checkpoint.
autoencoder_load = LitAutoEncoder5.load_from_checkpoint(
    'D:\\Nicko\\TUGAS_AKHIR\\Classifier\\model_latent_32640_2channel.ckpt'
)

In [None]:
# Evaluate on the test loader with a logger-less Trainer (single GPU).
# NOTE(review): this tests `autoencoder` (the in-memory model), not
# `autoencoder_load` restored from the checkpoint - confirm which is intended.
trainer = pl.Trainer(
    max_epochs=5000,
    devices=1,
    accelerator='gpu',
    log_every_n_steps=9,
    logger=False,
)
trainer.test(
    autoencoder,
    test_dataloader,
)

In [None]:
# Take a single test sample (index 10) for a visual sanity check.
data_test = datasetTest[10]

In [None]:
# Prepend a leading axis of size 1 so the single sample looks like a batch.
data_test = data_test[None, :]

In [None]:
# Display the shape of the batched sample as the cell output.
data_test.shape

In [None]:
# Remove the size-1 axes again and draw the slice at index 0 (the first channel
# of the sample) with an inverted grayscale colour map.
data_squeezed = data_test.squeeze()
plt.imshow(
    data_squeezed[0].numpy(),
    cmap='gray_r',
)

In [None]:
# Switch to evaluation mode before inference (the model contains BatchNorm).
autoencoder.eval()

# Gradients are not needed for a forward-only pass.
with torch.no_grad():
    # Calling the model runs forward(), which yields the latent embedding only.
    result = autoencoder(data_test)
    # Decode the embedding explicitly to obtain the reconstruction.
    decoded_result = autoencoder.decoder(result)

print(decoded_result.shape)

In [None]:
# Drop the size-1 axes from the reconstruction and draw its slice at index 0,
# using the same colour map as the input plot for comparison.
decoded_squeezed = decoded_result.squeeze()
plt.imshow(
    decoded_squeezed[0].detach().numpy(),
    cmap='gray_r',
)