In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os


# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [5]:
import torch
import torch.nn as nn
from tqdm import tqdm
from IPython.display import clear_output

import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader, random_split
import os
import random
import numpy as np
from PIL import Image
import torch.nn.functional as F

# Define transformation for spectrogram images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# Dataset to load spectrograms and apply masking
class SpectrogramDataset(Dataset):
    def __init__(self, folder_path, transform=None, mask_ratio=0.5, patch_size=16):
        self.folder_path = folder_path
        self.transform = transform
        self.mask_ratio = mask_ratio
        self.patch_size = patch_size
        self.image_paths = self.image_paths = [
            os.path.join(root, f) 
            for root, _, files in os.walk(folder_path) 
            for f in files if f.endswith('.png')
        ]

    import torch.nn.functional as F

    import torch.nn.functional as F

    def mask_spectrogram(self, img):
        """Randomly mask patches of the spectrogram"""
        c, h, w = img.shape
        num_patches_h = h // self.patch_size
        num_patches_w = w // self.patch_size
    
        # Create binary mask with patches
        mask = torch.ones((num_patches_h, num_patches_w))
        num_masked = int(self.mask_ratio * num_patches_h * num_patches_w)
        masked_indices = random.sample(range(num_patches_h * num_patches_w), num_masked)
    
        for idx in masked_indices:
            i, j = divmod(idx, num_patches_w)
            mask[i, j] = 0  # Set masked patches to 0
    
        # Resize mask to match image size
        mask = mask.unsqueeze(0).unsqueeze(0)  # Shape (1,1,H,W)
        mask = F.interpolate(mask, size=(h, w), mode="bilinear", align_corners=False)  # Resize smoothly
        mask = mask.squeeze(0).squeeze(0)  # Remove extra dimensions
    
        return img * mask, mask



    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        img = Image.open(img_path).convert("L")  # Convert to grayscale
        if self.transform:
            img = self.transform(img)
        masked_img, mask = self.mask_spectrogram(img)
        return masked_img, img, mask  # Masked spectrogram, original spectrogram, mask

# Define EfficientNet-B0 based encoder
class EfficientNetMaskedModel(nn.Module):
    def __init__(self, pretrained=True):
        super(EfficientNetMaskedModel, self).__init__()
        self.encoder = models.efficientnet_b0(pretrained=pretrained)
        
        # Modify first convolution layer to accept 1-channel input
        self.encoder.features[0][0] = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1, bias=False)
        
        self.encoder.classifier = nn.Identity()  # Remove classification head
        
        self.decoder = nn.Sequential(
            nn.Conv2d(1280, 512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(512, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(256, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),  # 7x7 → 14x14
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=4, stride=2, padding=1),  # 14x14 → 28x28
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, kernel_size=4, stride=2, padding=1),   # 28x28 → 56x56
            nn.ReLU(),
            nn.ConvTranspose2d(8, 4, kernel_size=4, stride=2, padding=1),    # 56x56 → 112x112
            nn.ReLU(),
            nn.ConvTranspose2d(4, 1, kernel_size=4, stride=2, padding=1)     # 112x112 → 224x224
        )


    def forward(self, x):
        encoded = self.encoder.features(x)  # Extract features
        reconstructed = self.decoder(encoded)  # Reconstruct masked spectrogram
        return reconstructed



# Training setup with validation loop
def train_model(data_folder, epochs=10, batch_size=16, lr=1e-3, val_split=0.2):
    dataset = SpectrogramDataset(data_folder, transform)
    
    # Split dataset into train and validation sets
    train_size = int((1 - val_split) * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # model = EfficientNetMaskedModel(pretrained=True).cuda()
    model = trained_model
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr)

    best_val_loss = float('inf')

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", leave=True)
        for masked_spectrograms, original_spectrograms, _ in progress_bar:
            masked_spectrograms, original_spectrograms = masked_spectrograms.cuda(), original_spectrograms.cuda()
            optimizer.zero_grad()
            reconstructed = model(masked_spectrograms)
            # print(reconstructed.shape,original_spectrograms.shape)
            loss = criterion(reconstructed, original_spectrograms)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            progress_bar.set_postfix({"Batch Loss": loss.item()})

        train_loss /= len(train_loader)

        # Validation phase
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for masked_spectrograms, original_spectrograms, _ in val_loader:
                masked_spectrograms, original_spectrograms = masked_spectrograms.cuda(), original_spectrograms.cuda()
                reconstructed = model(masked_spectrograms)
                loss = criterion(reconstructed, original_spectrograms)
                val_loss += loss.item()

        val_loss /= len(val_loader)
        clear_output()
        if (epoch%3==0) and (epoch!=0):
            print(f"Saving Checkpoint for {epoch}")
            torch.save(model.state_dict(), f"best_model_{epoch}.pth")
        # Save best model
        # if val_loss < best_val_loss:
        #     best_val_loss = val_loss
        #     torch.save(model.state_dict(), "best_model.pth")
        #     print(f"Best model saved at epoch {epoch + 1}")
        
        print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

    return model

# Run training
data_folder = "/kaggle/input/spectograms-5-sec-cut/"
trained_model = train_model(data_folder,batch_size = 128)


Saving Checkpoint for 9
Epoch 10/10, Train Loss: 0.006944, Val Loss: 0.006803
