<a href="https://colab.research.google.com/github/ktichola/ML-and-DL-to-Gravitational-waves-analysis/blob/main/%CE%91%CE%BD%CF%84%CE%AF%CE%B3%CF%81%CE%B1%CF%86%CE%BF_Kyriaki_3_Gravity_Spy_Vision_Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

# Number of bytes requested from the HTTP response per read() call.
CHUNK_SIZE = 40960
# Comma-separated list of '<directory>:<percent-encoded download URL>' entries;
# the code below splits it on ',' and ':' and un-quotes the URL part.
# NOTE(review): the URL is a signed link carrying X-Goog-Date=20240506T082340Z
# and X-Goog-Expires=259200, so it has presumably expired — regenerate it from
# Kaggle if the download fails.
DATA_SOURCE_MAPPING = 'gravity-spy-gravitational-waves:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F433366%2F823552%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240506%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240506T082340Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D0376ccb18e1bcbad8ed8fa106fb3f1b782af807dda0d2074d0647b6779f45a415d3a9ffb93a1fc3d12b7a678e5dc9b685e2d0249fc4f80c101627cd66ee0766e6a7554ea9941a793ec32177de2378aa8b9b7fdc748b205bac010544ad76856f6aa06c7648f1b60d183257429863aad076ab0e0f7481fc47245ec6b900538dd6606b9e5c7403c628be6c43e09461a801c51ffea622a5e9cf692c2cff66154ce5399d70911ac86d23010bb8f5e60331493f46b274a9641959c255e35e90111c5aebf56c219ad6233474ba34b767d0fdb2e1533f7bb9be0d1cf7388b43f447f4acf85a8e1fa7bdcce95962520dc63f49df4ae9e409b1fe28af1765b329778b42481'

# Directories that mirror Kaggle's filesystem layout inside this environment.
KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'  # NOTE(review): not referenced in the visible part of this file

# Start from a clean input directory: try to unmount whatever is mounted there
# (shell errors are discarded), remove leftovers, then (re)create both directories.
!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

# Expose the directories as ../input and ../working so that relative paths of
# the form '../input/...' resolve; a link that already exists is left untouched.
try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every configured data source and unpack it below KAGGLE_INPUT_PATH.
# Each entry of DATA_SOURCE_MAPPING has the form '<directory>:<percent-encoded URL>'.
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    # Split on the first ':' only, so a stray ':' in the URL part cannot break unpacking.
    directory, download_url_encoded = data_source_mapping.split(':', 1)
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            # The header may be missing (or zero); in that case no progress
            # ratio can be computed and the bar simply stays empty.
            expected_bytes = int(total_length) if total_length else 0
            dl = 0
            # read() returns b'' at end of stream, which terminates the iterator.
            for data in iter(lambda: fileres.read(CHUNK_SIZE), b''):
                dl += len(data)
                tfile.write(data)
                done = min(50, int(50 * dl / expected_bytes)) if expected_bytes else 0
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
            # Push buffered bytes to disk and rewind so the archive readers
            # see the complete file from its beginning.
            tfile.flush()
            tfile.seek(0)
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Bind the archive to its own name: reusing `tarfile` here would
                # overwrite the module and break every later tar source.
                with tarfile.open(fileobj=tfile) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError:
        # HTTPError is an OSError subclass, so it must be handled first.
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


In [None]:
pip install einops

Collecting einops
  Downloading einops-0.8.0-py3-none-any.whl (43 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 43.2/43.2 kB 2.9 MB/s eta 0:00:00
Installing collected packages: einops
Successfully installed einops-0.8.0
Note: you may need to restart the kernel to use updated packages.


In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn

from einops import repeat
from einops.layers.torch import Rearrange

In [None]:
# Locations of the three dataset splits, reached through the ../input link.
train_image_dir = '../input/gravity-spy-gravitational-waves/train/train/'
val_image_dir = '../input//gravity-spy-gravitational-waves/validation/validation/'
test_image_dir = '../input//gravity-spy-gravitational-waves/test/test/'

# Pre-processing shared by all splits: resize to 32x32, convert to a tensor,
# then normalise with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(32, 32)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,)),
    ]
)



In [None]:
# One ImageFolder dataset per split, all using the same pre-processing.
train_set = torchvision.datasets.ImageFolder(train_image_dir, transform=transform)
validation_set = torchvision.datasets.ImageFolder(val_image_dir, transform=transform)
test_set = torchvision.datasets.ImageFolder(test_image_dir, transform=transform)

# Only the training loader shuffles; the test loader yields one sample per batch.
train_loader = torch.utils.data.DataLoader(
    dataset=train_set,
    batch_size=32,
    shuffle=True,
)
validation_loader = torch.utils.data.DataLoader(
    dataset=validation_set,
    batch_size=16,
    shuffle=False,
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_set,
    batch_size=1,
    shuffle=False,
)


In [None]:
class Patching(nn.Module):
    """Cut an image batch into flattened, non-overlapping square patches."""

    def __init__(self, patch_size):
        super().__init__()
        # (b, c, h*ph, w*pw) -> (b, h*w patches, ph*pw*c values per patch)
        pattern = "b c (h ph) (w pw) -> b (h w) (ph pw c)"
        self.net = Rearrange(pattern, ph = patch_size, pw = patch_size)

    def forward(self, x):
        """Return the sequence of flattened patches for the image batch ``x``."""
        return self.net(x)


class LinearProjection(nn.Module):
    """Project flattened patches of width ``patch_dim`` to width ``dim``."""

    def __init__(self, patch_dim, dim):
        super().__init__()
        self.net = nn.Linear(in_features=patch_dim, out_features=dim)

    def forward(self, x):
        """Apply the affine projection to the last axis of ``x``."""
        projected = self.net(x)
        return projected

In [None]:
class Embedding(nn.Module):
    """Prepend a learnable [class] token and add learnable position embeddings.

    Args:
        dim: Width of every token.
        n_patches: Number of patch tokens per image, i.e. the sequence length
            before the [class] token is prepended.
    """

    def __init__(self, dim, n_patches):
        super().__init__()
        # [class] token, shared by every sample of a batch
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))

        # one position embedding per token, including the [class] token
        self.pos_embedding = nn.Parameter(torch.randn(1, n_patches + 1, dim))

    def forward(self, x):
        """Return ``x`` with the [class] token prepended and positions added.

        Args:
            x: Tensor of shape [batch_size, n_patches, dim].

        Returns:
            A new tensor of shape [batch_size, n_patches + 1, dim]; the input
            tensor is not modified.
        """
        # Unpacking three values keeps the requirement that x is 3-dimensional.
        batch_size, _, _ = x.shape

        # Broadcast the single [class] token over the batch without copying it.
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)

        # x.shape : [batch_size, n_patches, dim] -> [batch_size, n_patches + 1, dim]
        x = torch.cat([cls_tokens, x], dim = 1)

        # Out-of-place add: the position embedding broadcasts over the batch axis.
        return x + self.pos_embedding


class MLP(nn.Module):
    """Two-layer feed-forward block: ``dim -> hidden_dim -> dim`` with GELU."""

    def __init__(self, dim, hidden_dim):
        super().__init__()
        # Layers are created in application order so parameter names
        # (net.0, net.2) and initialisation order stay as they were.
        layers = [
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through expand -> GELU -> contract."""
        return self.net(x)


In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values, split into
    ``n_heads`` heads of width ``dim // n_heads``, attended per head, and the
    heads are concatenated again. No output projection is applied after the
    concatenation.

    Args:
        dim: Width of the input and output tokens.
        n_heads: Number of attention heads; must be positive and divide ``dim``.

    Raises:
        ValueError: If ``n_heads`` is not positive or does not divide ``dim``.
    """

    def __init__(self, dim, n_heads):
        super().__init__()
        # Fail here rather than at the first forward pass: with a remainder the
        # head width would be silently floored and the head split would only
        # fail later, inside the rearrange, with a far less obvious message.
        if n_heads <= 0 or dim % n_heads != 0:
            raise ValueError(
                f"dim ({dim}) must be divisible by a positive n_heads ({n_heads})"
            )

        self.n_heads = n_heads
        self.dim_heads = dim // n_heads

        self.W_q = nn.Linear(dim, dim)
        self.W_k = nn.Linear(dim, dim)
        self.W_v = nn.Linear(dim, dim)

        # [batch, tokens, heads * d] -> [batch, heads, tokens, d]
        self.split_into_heads = Rearrange("b n (h d) -> b h n d", h = self.n_heads)

        self.softmax = nn.Softmax(dim = -1)

        # [batch, heads, tokens, d] -> [batch, tokens, heads * d]
        self.concat = Rearrange("b h n d -> b n (h d)", h = self.n_heads)

    def forward(self, x):
        """Return the self-attention output for ``x``; the shape is unchanged."""
        q = self.W_q(x)
        k = self.W_k(x)
        v = self.W_v(x)

        q = self.split_into_heads(q)
        k = self.split_into_heads(k)
        v = self.split_into_heads(v)

        # Logit[i] = Q[i] @ K[i]^T / sqrt(dim_heads)   (i = 1, ..., n_heads)
        # AttentionWeight[i] = Softmax(Logit[i])       (i = 1, ..., n_heads)
        logit = torch.matmul(q, k.transpose(-1, -2)) * (self.dim_heads ** -0.5)
        attention_weight = self.softmax(logit)

        # Head[i] = AttentionWeight[i] @ V[i]          (i = 1, ..., n_heads)
        # Output = concat[Head[1], ..., Head[n_heads]]
        output = torch.matmul(attention_weight, v)
        output = self.concat(output)
        return output



In [None]:
class TransformerEncoder(nn.Module):
    """Stack of ``depth`` pre-norm Transformer encoder layers.

    Every layer owns its own attention block, its own MLP and two LayerNorms
    (one in front of each sub-block). Previously a single attention block, a
    single MLP and a single LayerNorm were applied ``depth`` times, so all
    layers shared one set of weights and increasing ``depth`` added no
    parameters.

    NOTE: parameter names are now ``layers.<i>.<sub-module>.*``; state dicts
    saved from the weight-shared variant cannot be loaded into this module.

    Args:
        dim: Width of every token.
        n_heads: Number of attention heads per layer.
        mlp_dim: Hidden width of the per-layer MLP.
        depth: Number of encoder layers.
    """

    def __init__(self, dim, n_heads, mlp_dim, depth):
        super().__init__()
        self.depth = depth

        # One independent set of sub-modules per layer.
        self.layers = nn.ModuleList([
            nn.ModuleDict({
                "attn_norm": nn.LayerNorm(dim),
                "attention": MultiHeadAttention(dim = dim, n_heads = n_heads),
                "mlp_norm": nn.LayerNorm(dim),
                "mlp": MLP(dim = dim, hidden_dim = mlp_dim),
            })
            for _ in range(depth)
        ])

    def forward(self, x):
        """Apply all layers in order; the shape of ``x`` is unchanged."""
        for layer in self.layers:
            # Pre-norm residual blocks: x + f(norm(x))
            x = layer["attention"](layer["attn_norm"](x)) + x
            x = layer["mlp"](layer["mlp_norm"](x)) + x

        return x


class MLPHead(nn.Module):
    """Classification head: LayerNorm followed by a linear layer to ``out_dim``."""

    def __init__(self, dim, out_dim):
        super().__init__()
        normalise = nn.LayerNorm(dim)
        classify = nn.Linear(dim, out_dim)
        # Kept as a Sequential so the parameter names stay net.0.* / net.1.*
        self.net = nn.Sequential(normalise, classify)

    def forward(self, x):
        """Return the ``out_dim`` outputs of the head for the features ``x``."""
        return self.net(x)


In [None]:
class ViT(nn.Module):
    """Vision Transformer classifier assembled from the building blocks above.

    Pipeline: patching -> linear projection -> [class] token + position
    embedding -> transformer encoder -> MLP head on the [class] token.
    """

    def __init__(self, image_size, patch_size, n_classes, dim, depth, n_heads, channels = 3, mlp_dim = 256):
        super().__init__()
        self.depth = depth

        # Derived sizes
        patches_per_side = image_size // patch_size
        n_patches = patches_per_side ** 2
        patch_dim = channels * patch_size * patch_size

        # Sub-modules, created in the order in which forward() applies them
        self.patching = Patching(patch_size = patch_size)
        self.linear_projection_of_flattened_patches = LinearProjection(
            patch_dim = patch_dim,
            dim = dim,
        )
        self.embedding = Embedding(dim = dim, n_patches = n_patches)
        self.transformer_encoder = TransformerEncoder(
            dim = dim,
            n_heads = n_heads,
            mlp_dim = mlp_dim,
            depth = depth,
        )
        self.mlp_head = MLPHead(dim = dim, out_dim = n_classes)

    def forward(self, img):
        """Map an image batch to per-class outputs of shape [batch_size, n_classes]."""
        # Shapes after each stage:
        #   patching            [batch_size, n_patches, channels * (patch_size ** 2)]
        #   linear projection   [batch_size, n_patches, dim]
        #   embedding           [batch_size, n_patches + 1, dim]
        #   transformer encoder [batch_size, n_patches + 1, dim]
        stages = (
            self.patching,
            self.linear_projection_of_flattened_patches,
            self.embedding,
            self.transformer_encoder,
        )
        tokens = img
        for stage in stages:
            tokens = stage(tokens)

        # Only the [class] token (index 0) is fed to the classification head.
        cls_state = tokens[:, 0]
        return self.mlp_head(cls_state)

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

print(device)

cpu


In [None]:
# ViT for 32x32 inputs cut into 4x4 patches, with 22 output classes.
net = ViT(
    image_size=32, patch_size=4,
    dim=256, depth=3, n_heads=4, mlp_dim = 256,
    n_classes=22,
)
# Move the parameters to the selected device.
net = net.to(device)

In [None]:
import torch.nn as nn
import torch.optim as optim

# SGD with momentum over all parameters of the network.
optimizer = optim.SGD(net.parameters(), momentum=0.9, lr=0.01)
# Cross-entropy between the network outputs and the integer class labels.
criterion = nn.CrossEntropyLoss()

In [None]:
import time

# Training length and per-epoch history. The accuracy histories hold 0-dim
# tensors (they are accumulated from tensor means); the loss histories hold floats.
epochs = 10
accuracy, losses = [], []
val_accuracy, val_losses = [], []

sta = time.time()
for epoch in range(epochs):
    # Running averages for this epoch: every batch contributes 1/len(loader).
    epoch_train_loss = epoch_train_acc = 0
    epoch_val_loss = epoch_val_acc = 0

    # ---- training pass ----
    net.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        epoch_train_loss += loss.item()/len(train_loader)
        acc = (outputs.argmax(dim=1) == labels).float().mean()
        epoch_train_acc += acc/len(train_loader)

        # Drop the references to this batch before the next one is loaded.
        del inputs, outputs, loss
    accuracy.append(epoch_train_acc)
    losses.append(epoch_train_loss)

    # ---- validation pass (no gradients) ----
    net.eval()
    with torch.no_grad():
        for inputs, labels in validation_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            epoch_val_loss += loss.item()/len(validation_loader)
            val_acc = (outputs.argmax(dim=1) == labels).float().mean()
            epoch_val_acc += val_acc/len(validation_loader)
    val_accuracy.append(epoch_val_acc)
    val_losses.append(epoch_val_loss)

    print(
        ("epoch: {}, loss: {}, acc: {}    "
         "val_epoch: {}, val_loss: {}, val_acc: {}").format(
            epoch+1, epoch_train_loss, epoch_train_acc,
            epoch+1, epoch_val_loss, epoch_val_acc,
        )
    )


# Total wall-clock time of the whole training run, in seconds.
end= time.time()
print(end-sta)

In [None]:
import matplotlib.pyplot as plt

# Training and validation loss, one point per epoch.
for series, label in ((losses, 'train loss'), (val_losses, 'validation loss')):
    plt.plot(series, label=label)
plt.legend()

In [None]:
# Turn the per-epoch accuracy entries into plain Python numbers for plotting.
accslist = [epoch_acc.item() for epoch_acc in accuracy]
val_accslist = [epoch_acc.item() for epoch_acc in val_accuracy]

# Training and validation accuracy, one point per epoch.
plt.plot(accslist, label='train acc')
plt.plot(val_accslist, label='validation acc')
plt.legend()

In [None]:
import time

# Evaluate the network on the test set and collect predictions / ground truth.
pred=[]   # predicted class index of every test sample
ans=[]    # true class index of every test sample
test_loss_sum = 0.0   # sum over samples of the loss
n_correct = 0         # number of correctly classified samples
n_seen = 0            # number of samples evaluated

start=time.time()
# Make inference mode explicit instead of relying on the state left by an earlier cell.
net.eval()
with torch.no_grad():
    for data in test_loader:
        inputs, labels = data[0].to(device), data[1].to(device)
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        pre = outputs.argmax(dim=1)

        # Weight by the batch size so that the metrics are exact per-sample
        # averages for any batch size (assumes the criterion returns the mean
        # over the batch, as the default CrossEntropyLoss does).
        batch_len = labels.size(0)
        test_loss_sum += loss.item() * batch_len
        n_correct += (pre == labels).sum().item()
        n_seen += batch_len

        # tolist() works for batches of any size; item() only for size 1.
        pred.extend(pre.tolist())
        ans.extend(labels.tolist())

# An empty loader yields 0 for both metrics instead of a division by zero.
epoch_test_loss = test_loss_sum / n_seen if n_seen else 0
epoch_test_acc = n_correct / n_seen if n_seen else 0

end = time.time()
print(end-start)
print("test_loss: {}, test_acc: {}".format(epoch_test_loss, epoch_test_acc))

In [None]:
from sklearn.metrics import f1_score

# F1 score over the test predictions, using the 'weighted' averaging mode.
weighted_f1 = f1_score(ans, pred, average='weighted')
print(weighted_f1)

In [None]:
import os

# Class names used to label the confusion matrix. torchvision's ImageFolder
# numbers the classes by the sorted names of the sub-directories of its root,
# so only directories are kept and the list is sorted *before* it is printed:
# position i in the printed list is then the name of class index i.
files = sorted(
    entry
    for entry in os.listdir(test_image_dir)
    if os.path.isdir(os.path.join(test_image_dir, entry))
)
print(files)


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Confusion matrix of the test predictions (rows: true class, columns: predicted class).
# The label set is pinned to one index per entry of `files`: without it, a class
# that occurs in neither `ans` nor `pred` is dropped from the matrix, whose shape
# then no longer matches the row/column names and the DataFrame construction fails.
cm = confusion_matrix(ans, pred, labels=list(range(len(files))))
cm = pd.DataFrame(data=cm, index=files, columns=files)
plt.figure(figsize = (10,7))
sns.heatmap(cm,annot=True,cmap='Blues',fmt='d')
