## 1. Setup Environment

In [None]:
# Check GPU availability
# `torch` is imported here and reused by later cells (device selection,
# checkpoint save/load) without being re-imported, so run this cell first.
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # The details below are only queried when a CUDA device is visible;
    # index 0 refers to the first device.
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    # total_memory is scaled by 1e9 so it can be displayed in GB.
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

In [None]:
# Mount Google Drive
# The mount point '/content/drive' is the prefix used by the optional
# (commented-out) Drive paths elsewhere in this notebook.
# NOTE(review): `google.colab` is presumably only importable inside a Colab
# runtime — skip this cell when running elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Clone repository or upload project files
# Option 1: Clone from GitHub
!git clone https://github.com/topspeed69/Music-Classification-by-Spectogram.git
%cd Music-Classification-by-Spectogram

# Option 2: If files are in Google Drive
# import os
# os.chdir('/content/drive/MyDrive/Music-Classification-by-Spectogram')

In [None]:
# Install dependencies
!pip install -q -r requirements.txt

# Additional Colab-specific installations
!pip install -q tensorboard

## 2. Data Preparation

In [None]:
# Locate the raw audio dataset and report how many tracks it contains.
# Nothing is downloaded here: if the directory is missing, the cell only
# prints where the FMA dataset can be obtained.
import os
from pathlib import Path

# Expected locations of the raw audio and of the spectrogram output
data_dir = Path('AudioToSpectogram/fma_small_dataset')
output_dir = Path('AudioToSpectogram/output')

if data_dir.exists():
    print(f"Found data directory: {data_dir}")
    # Recursive search: every MP3 first, then every WAV
    audio_files = [
        audio_path
        for pattern in ('*.mp3', '*.wav')
        for audio_path in data_dir.rglob(pattern)
    ]
    print(f"Number of audio files: {len(audio_files)}")
else:
    print("Data directory not found. Please upload your audio files or download the FMA dataset.")
    print("You can download FMA small dataset from: https://github.com/mdeff/fma")

In [None]:
# Generate spectrograms from audio files (if not already done)
import sys

# Make the converter module importable; the membership test keeps sys.path
# from growing every time this cell is re-run.
if 'AudioToSpectogram' not in sys.path:
    sys.path.append('AudioToSpectogram')

from audio_to_spectogram_mel import AudioToMelSpectrogram
from pathlib import Path

# Configuration
input_dir = 'AudioToSpectogram/fma_small_dataset'
output_dir = 'AudioToSpectogram/output_mel'

# Set to True to rebuild the spectrograms even when PNGs already exist.
# (This flag used to be overwritten unconditionally further down, so the
# "Set regenerate=True" hint printed to the user could never take effect.)
regenerate = False

# Check if spectrograms already exist
output_path = Path(output_dir)
existing_spectrograms = list(output_path.rglob('*.png')) if output_path.exists() else []

if existing_spectrograms and not regenerate:
    print(f"Found {len(existing_spectrograms)} existing spectrograms. Skipping generation.")
    print("Set regenerate=True to generate spectrograms again.")
else:
    # Nothing on disk yet, or regeneration was explicitly requested.
    regenerate = True

if regenerate:
    print("Generating mel spectrograms...")
    converter = AudioToMelSpectrogram(
        input_dir=input_dir,
        output_dir=output_dir,
        sr=22050,
        n_mels=128,
        duration=3.0  # 3-second segments
    )
    converter.process_all_audio()
    print("Spectrogram generation complete!")

# Count spectrograms (re-scanned so freshly generated files are included)
spectrograms = list(output_path.rglob('*.png'))
print(f"\nTotal spectrograms available: {len(spectrograms)}")

In [None]:
# Split data into train/validation sets
from sklearn.model_selection import train_test_split
import shutil
from pathlib import Path
from tqdm import tqdm

def _copy_preserving_layout(files, source_root, dest_root, label):
    """Copy ``files`` below ``dest_root``, keeping each path relative to ``source_root``."""
    print(f"Copying {len(files)} files to {label}...")
    for file in tqdm(files):
        dest = dest_root / file.relative_to(source_root)
        dest.parent.mkdir(parents=True, exist_ok=True)
        # Files already present (e.g. from an earlier run) are left untouched.
        if not dest.exists():
            shutil.copy2(file, dest)


def create_train_val_split(source_dir, train_dir, val_dir, val_split=0.2, seed=42):
    """
    Split spectrograms into train and validation sets.

    Every ``*.png`` found recursively under ``source_dir`` is assigned to
    either the train or the validation set and copied to the matching
    destination, preserving its sub-directory layout.

    Args:
        source_dir: Directory searched recursively for ``*.png`` files.
        train_dir: Destination root for the training files.
        val_dir: Destination root for the validation files.
        val_split: Passed to ``train_test_split`` as ``test_size``.
        seed: Passed to ``train_test_split`` as ``random_state``.

    Returns:
        Tuple ``(number of train files, number of validation files)``.

    Raises:
        ValueError: If no ``*.png`` file exists under ``source_dir``.

    NOTE(review): the split is made per file. If several spectrograms are
    segments of the same track, segments of one track can end up in both
    sets — confirm this is acceptable for the intended evaluation.
    """
    source_path = Path(source_dir)
    train_path = Path(train_dir)
    val_path = Path(val_dir)

    # Sort before splitting: rglob yields files in filesystem order, so
    # without a stable ordering the same seed could produce different splits.
    all_files = sorted(source_path.rglob('*.png'))
    if not all_files:
        # Checked before creating any directory so that a failed run does not
        # leave empty train/val folders behind.
        raise ValueError(f"No .png spectrograms found under '{source_path}'")

    # Create directories
    train_path.mkdir(parents=True, exist_ok=True)
    val_path.mkdir(parents=True, exist_ok=True)

    # Split
    train_files, val_files = train_test_split(
        all_files, test_size=val_split, random_state=seed
    )

    _copy_preserving_layout(train_files, source_path, train_path, "train")
    _copy_preserving_layout(val_files, source_path, val_path, "validation")

    return len(train_files), len(val_files)

# Create split
source_dir = 'AudioToSpectogram/output_mel'
train_dir = 'data/train'
val_dir = 'data/val'

# Check if split already exists. The PNGs are counted instead of only testing
# for the directories: empty folders (for example left behind by a failed
# run) must not be mistaken for a finished split.
train_count = len(list(Path(train_dir).rglob('*.png'))) if Path(train_dir).exists() else 0
val_count = len(list(Path(val_dir).rglob('*.png'))) if Path(val_dir).exists() else 0

if train_count > 0 and val_count > 0:
    print(f"Train/val split already exists: {train_count} train, {val_count} val")
else:
    train_count, val_count = create_train_val_split(source_dir, train_dir, val_dir, val_split=0.2)
    print(f"\nSplit complete: {train_count} train, {val_count} val")

## 3. Configuration

In [None]:
# Load configuration files
import yaml
from pathlib import Path

def load_config(config_path):
    """Load a YAML configuration file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The parsed document, as returned by ``yaml.safe_load``.
    """
    # Explicit encoding: open()'s default depends on the platform locale,
    # which could mis-decode non-ASCII characters in the config files.
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)

# Read the three configuration files (model, training, data — in that order)
model_config, training_config, data_config = map(
    load_config,
    (
        'configs/model_config.yaml',
        'configs/training_config.yaml',
        'configs/data_config.yaml',
    ),
)

# Combine them into one dictionary. This is a shallow merge: when the same
# top-level key appears in more than one file, the later file wins.
config = dict(model_config)
config.update(training_config)
config.update(data_config)

print("Configuration loaded successfully!")
print(f"\nModel: {config['model']['name']}")
print(f"Embedding dimension: {config['model']['projection_head']['embedding_dim']}")
print(f"Batch size: {config['training']['batch_size']}")
print(f"Epochs: {config['training']['epochs']}")
print(f"Learning rate: {config['training']['optimizer']['learning_rate']}")

In [None]:
# Point the data section at the train/val folders used in this notebook
config['data'].update(train_dir='data/train', val_dir='data/val')

# Colab-specific loader settings:
#   batch_size  - lower it further if you encounter OOM errors
#   num_workers - number of data-loading workers
config['training'].update(batch_size=32, num_workers=2)

print("Configuration updated for Colab environment")

## 4. Model Setup

In [None]:
# Import model components
import sys
sys.path.append('CNN')

from CNN.models import build_model
from CNN.augmentation import get_augmentation_pipeline
from CNN.data import create_dataloaders
from CNN.training import get_contrastive_loss
from CNN.utils.metrics import AverageMeter

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# Construct the network from the merged config and move it to the device
print("\nBuilding model...")
model = build_model(config).to(device)

# Gather (element count, trainable?) for every parameter in a single pass
param_stats = [(p.numel(), p.requires_grad) for p in model.parameters()]
total_params = sum(count for count, _ in param_stats)
trainable_params = sum(count for count, is_trainable in param_stats if is_trainable)

# Report the totals; the size estimate assumes 4 bytes per parameter (float32)
print(f"\nModel Summary:")
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
print(f"Model size: {total_params * 4 / 1e6:.2f} MB (float32)")

## 5. Data Loading

In [None]:
# One augmentation pipeline per phase (training vs. validation)
train_transform = get_augmentation_pipeline(config, training=True)
val_transform = get_augmentation_pipeline(config, training=False)

# Build both loaders from the merged config
print("Creating dataloaders...")
loaders = create_dataloaders(config, train_transform, val_transform)
train_loader, val_loader = loaders

# Summary: batch counts first, then sample counts
print(
    f"Train batches: {len(train_loader)}",
    f"Val batches: {len(val_loader)}",
    f"Train samples: {len(train_loader.dataset)}",
    f"Val samples: {len(val_loader.dataset)}",
    sep="\n",
)

In [None]:
# Visualize a batch of spectrograms
import matplotlib.pyplot as plt
import numpy as np

def visualize_batch(loader, num_samples=4):
    """
    Visualize augmented spectrogram pairs.

    Takes the first batch from ``loader`` and draws view 1 on the top row and
    view 2 on the bottom row, one column per sample.

    Args:
        loader: Iterable whose batches unpack into ``(view1, view2)``.
            Each view is indexed per sample and transposed with
            ``(1, 2, 0)``, i.e. samples are assumed to be channel-first
            tensors — TODO confirm against the dataset.
        num_samples: Maximum number of pairs to show. Clamped to the batch
            size so a small batch cannot cause an IndexError.
    """
    view1, view2 = next(iter(loader))

    # Never index past the end of the batch.
    num_samples = min(num_samples, len(view1), len(view2))
    if num_samples < 1:
        print("No samples available to visualize.")
        return

    # squeeze=False keeps `axes` two-dimensional even for a single column,
    # so axes[row, col] is valid for every num_samples >= 1.
    fig, axes = plt.subplots(2, num_samples, figsize=(15, 6), squeeze=False)
    fig.suptitle('Augmented Spectrogram Pairs (View 1 and View 2)', fontsize=14)

    for row, (label, views) in enumerate((('View 1', view1), ('View 2', view2))):
        for i in range(num_samples):
            # Move channels last, the layout imshow expects for image arrays.
            img = views[i].cpu().numpy().transpose(1, 2, 0)
            ax = axes[row, i]
            ax.imshow(img)
            ax.axis('off')
            ax.set_title(f'{label} - Sample {i+1}')

    plt.tight_layout()
    plt.show()

print("Visualizing augmented pairs...")
visualize_batch(train_loader, num_samples=4)

## 6. Training Setup

In [None]:
import torch.optim as optim
from pathlib import Path

# Create loss function
contrastive_config = training_config['training']['contrastive']
criterion = get_contrastive_loss(
    loss_type=contrastive_config['loss_type'],
    temperature=contrastive_config['temperature'],
    use_cosine_similarity=contrastive_config['use_cosine_similarity']
)

# Create optimizer
# NOTE(review): Adam is hard-coded; optimizer_config['type'] is only printed
# below and does not select the optimizer class — confirm the two agree.
optimizer_config = training_config['training']['optimizer']
optimizer = optim.Adam(
    model.parameters(),
    lr=optimizer_config['learning_rate'],
    weight_decay=optimizer_config['weight_decay']
)

# Create learning rate scheduler
scheduler_config = training_config['training']['scheduler']
if scheduler_config['type'] == 'cosine':
    scheduler = optim.lr_scheduler.CosineAnnealingLR(
        optimizer,
        T_max=training_config['training']['epochs'],
        eta_min=scheduler_config['min_lr']
    )
else:
    # Fail here with a clear message. Otherwise `scheduler` would be unbound
    # (NameError in the training loop) or, worse, still refer to a scheduler
    # from an earlier run that is attached to a different optimizer.
    raise ValueError(
        f"Unsupported scheduler type: {scheduler_config['type']!r} "
        "(this notebook only implements 'cosine')"
    )

# Create checkpoint directory
checkpoint_dir = Path('checkpoints')
checkpoint_dir.mkdir(parents=True, exist_ok=True)

print("Training setup complete!")
print(f"Optimizer: {optimizer_config['type']}")
print(f"Learning rate: {optimizer_config['learning_rate']}")
print(f"Loss function: {contrastive_config['loss_type']}")
print(f"Temperature: {contrastive_config['temperature']}")

In [None]:
# Setup TensorBoard
%load_ext tensorboard

from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

# Each run logs into its own folder, runs/<YYYYmmdd_HHMMSS>
run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
log_dir = Path('runs', run_stamp)
log_dir.mkdir(parents=True, exist_ok=True)

# Writer used by the training loop to record scalars
writer = SummaryWriter(log_dir)

print(f"TensorBoard log directory: {log_dir}")
print("Launch TensorBoard with: %tensorboard --logdir runs")

## 7. Training Loop

In [None]:
# Training functions
from tqdm.notebook import tqdm

def train_epoch(model, train_loader, criterion, optimizer, device, epoch):
    """
    Run one training pass over ``train_loader``.

    Each batch supplies two views; both are passed through the model and the
    resulting outputs are compared by ``criterion``. One optimizer step is
    taken per batch.

    Args:
        model: Network being trained (put into train mode here).
        train_loader: Iterable yielding ``(view1, view2)`` batches.
        criterion: Callable taking the two model outputs and returning the loss.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.
        epoch: Epoch number, used only for the progress-bar label.

    Returns:
        ``avg`` of the AverageMeter that accumulated the batch losses.
    """
    model.train()
    losses = AverageMeter()

    progress = tqdm(train_loader, desc=f"Epoch {epoch}")
    for view1, view2 in progress:
        view1, view2 = view1.to(device), view2.to(device)

        # Forward both views (view 1 first) and score the pair
        loss = criterion(model(view1), model(view2))

        # Clear old gradients, backpropagate, update the weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight the running average by the batch size
        losses.update(loss.item(), view1.size(0))
        progress.set_postfix({'loss': f'{losses.avg:.4f}'})

    return losses.avg


@torch.no_grad()
def validate(model, val_loader, criterion, device):
    """
    Compute the loss over ``val_loader`` without updating the model.

    The whole function runs with gradient tracking disabled and the model in
    eval mode.

    Args:
        model: Network to evaluate.
        val_loader: Iterable yielding ``(view1, view2)`` batches.
        criterion: Callable taking the two model outputs and returning the loss.
        device: Device the batches are moved to.

    Returns:
        ``avg`` of the AverageMeter that accumulated the batch losses.
    """
    model.eval()
    losses = AverageMeter()

    for view1, view2 in tqdm(val_loader, desc="Validation"):
        view1, view2 = view1.to(device), view2.to(device)

        # Forward both views (view 1 first) and score the pair
        batch_loss = criterion(model(view1), model(view2))
        losses.update(batch_loss.item(), view1.size(0))

    return losses.avg

print("Training functions defined")

In [None]:
# Main training loop
import time

print("Starting training...\n")
best_val_loss = float('inf')
epochs = training_config['training']['epochs']
train_losses = []
val_losses = []


def _save_checkpoint(path, epoch, val_loss):
    """Write model, optimizer and scheduler state plus run metadata to ``path``."""
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'val_loss': val_loss,
        'config': config
    }, path)


start_time = time.time()

try:
    for epoch in range(1, epochs + 1):
        # Train
        train_loss = train_epoch(model, train_loader, criterion, optimizer, device, epoch)
        train_losses.append(train_loss)

        # Validate
        val_loss = validate(model, val_loader, criterion, device)
        val_losses.append(val_loss)

        # Step scheduler (once per epoch), then read back the resulting LR
        scheduler.step()
        current_lr = optimizer.param_groups[0]['lr']

        # Log to tensorboard
        writer.add_scalar('Loss/train', train_loss, epoch)
        writer.add_scalar('Loss/val', val_loss, epoch)
        writer.add_scalar('LR', current_lr, epoch)

        # Print epoch summary
        print(f"\n{'='*60}")
        print(f"Epoch {epoch}/{epochs}")
        print(f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")
        print(f"Learning Rate: {current_lr:.6f}")

        # Save best model (overwritten whenever validation loss improves)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            checkpoint_path = checkpoint_dir / 'best_model.pth'
            _save_checkpoint(checkpoint_path, epoch, val_loss)
            print(f"✓ Saved best model (val_loss: {val_loss:.4f})")

        # Save periodic checkpoint every 10th epoch
        if epoch % 10 == 0:
            checkpoint_path = checkpoint_dir / f'checkpoint_epoch_{epoch}.pth'
            _save_checkpoint(checkpoint_path, epoch, val_loss)
            print(f"✓ Saved checkpoint at epoch {epoch}")

        print(f"{'='*60}")
finally:
    # Close the TensorBoard writer even when training is interrupted or
    # fails, so it is not left open.
    writer.close()

# Training complete
total_time = time.time() - start_time
print(f"\n{'='*60}")
print("Training completed!")
print(f"Total time: {total_time/3600:.2f} hours")
print(f"Best validation loss: {best_val_loss:.4f}")
print(f"{'='*60}")

## 8. Training Visualization

In [None]:
# Plot training curves
import matplotlib.pyplot as plt

# Two panels: the loss curves on the left, the val-train gap on the right
fig, (loss_ax, gap_ax) = plt.subplots(1, 2, figsize=(12, 4))

# Epochs are numbered from 1 in the progress output, in TensorBoard and in
# the summary printed below, so the x axis is built explicitly as 1..N
# instead of relying on the implicit 0-based index.
# Loss plot
loss_ax.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss', linewidth=2)
loss_ax.plot(range(1, len(val_losses) + 1), val_losses, label='Val Loss', linewidth=2)
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.legend()
loss_ax.grid(True, alpha=0.3)

# Loss difference (positive values: validation loss above training loss)
loss_diff = [v - t for t, v in zip(train_losses, val_losses)]
gap_ax.plot(range(1, len(loss_diff) + 1), loss_diff, label='Val - Train', linewidth=2, color='orange')
gap_ax.set_xlabel('Epoch')
gap_ax.set_ylabel('Loss Difference')
gap_ax.set_title('Overfitting Monitor')
gap_ax.axhline(y=0, color='r', linestyle='--', alpha=0.5)
gap_ax.legend()
gap_ax.grid(True, alpha=0.3)

fig.tight_layout()
fig.savefig('training_curves.png', dpi=150, bbox_inches='tight')
plt.show()

print(f"Final train loss: {train_losses[-1]:.4f}")
print(f"Final val loss: {val_losses[-1]:.4f}")
print(f"Best val loss: {min(val_losses):.4f} (epoch {val_losses.index(min(val_losses))+1})")

In [None]:
# Launch TensorBoard
# Points at `runs`, the parent of the timestamped log directory created for
# the SummaryWriter; the `tensorboard` extension is loaded in that same
# setup cell, which therefore has to run before this one.
%tensorboard --logdir runs

## 9. Model Evaluation

In [None]:
# Load best model
checkpoint_path = checkpoint_dir / 'best_model.pth'
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU runtime can also be restored on a CPU-only one.
checkpoint = torch.load(checkpoint_path, map_location=device)

model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

print(f"Loaded best model from epoch {checkpoint['epoch']}")
print(f"Validation loss: {checkpoint['val_loss']:.4f}")

In [None]:
# Extract embeddings for the validation set
from CNN.embeddings import extract_embeddings

# Run the trained model over the validation directory
print("Extracting embeddings...")
extraction_args = dict(
    model=model,
    data_dir=config['data']['val_dir'],
    device=device,
    batch_size=64,
)
embeddings, file_paths = extract_embeddings(**extraction_args)

# Report what came back
print(
    f"\nExtracted embeddings shape: {embeddings.shape}",
    f"Number of samples: {len(file_paths)}",
    sep="\n",
)

In [None]:
# Visualize embeddings using t-SNE
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import numpy as np

print("Computing t-SNE...")
# t-SNE requires perplexity < number of samples; clamp the default of 30 so
# the cell also works for small validation sets.
n_embeddings = len(embeddings)
if n_embeddings < 2:
    raise ValueError(f"t-SNE needs at least 2 embeddings, got {n_embeddings}")
tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, n_embeddings - 1))
embeddings_2d = tsne.fit_transform(embeddings)

# Scatter plot of the 2-D projection (one point per embedding)
plt.figure(figsize=(12, 10))
plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], alpha=0.6, s=50)
plt.title('t-SNE Visualization of Learned Embeddings', fontsize=14)
plt.xlabel('t-SNE 1')
plt.ylabel('t-SNE 2')
plt.grid(True, alpha=0.3)
plt.savefig('embeddings_tsne.png', dpi=150, bbox_inches='tight')
plt.show()

print("Embeddings visualization saved!")

## 10. Export Model

In [None]:
# Export the current model weights for inference
export_dir = Path('exported_models')
export_dir.mkdir(exist_ok=True)

# Bundle the weights with the config used for this run and the best
# validation loss recorded by the training loop
model_path = export_dir / 'music_encoder.pth'
export_payload = {
    'model_state_dict': model.state_dict(),
    'config': config,
    'best_val_loss': best_val_loss
}
torch.save(export_payload, model_path)

print(f"Model exported to: {model_path}")
print(f"Model size: {model_path.stat().st_size / 1e6:.2f} MB")

# Save to Google Drive (optional)
# import shutil
# drive_path = '/content/drive/MyDrive/models/music_encoder.pth'
# shutil.copy(model_path, drive_path)
# print(f"Model also saved to Google Drive: {drive_path}")

In [None]:
# Download trained model and artifacts
from google.colab import files
import zipfile

# Bundle the exported model and any available artifacts into one zip file
zip_path = 'music_classification_artifacts.zip'

# (file on disk, name inside the archive) — each is added only if it exists
optional_artifacts = [
    (Path('training_curves.png'), 'training_curves.png'),    # training curves
    (Path('embeddings_tsne.png'), 'embeddings_tsne.png'),    # t-SNE visualization
    (checkpoint_dir / 'best_model.pth', 'best_model.pth'),   # best checkpoint
]

with zipfile.ZipFile(zip_path, 'w') as zipf:
    # The exported model is always included
    zipf.write(model_path, 'music_encoder.pth')

    for artifact, archive_name in optional_artifacts:
        if artifact.exists():
            zipf.write(artifact, archive_name)

print(f"Created artifact package: {zip_path}")
print(f"Package size: {Path(zip_path).stat().st_size / 1e6:.2f} MB")

# Download
# files.download(zip_path)

## 11. Inference Example

In [None]:
# Test inference on a single spectrogram
from PIL import Image
import torchvision.transforms as transforms

def predict_embedding(model, image_path, device, image_size=(128, 128)):
    """
    Extract embedding for a single spectrogram.

    Args:
        model: Trained network; switched to eval mode by this function.
        image_path: Path of the spectrogram image to embed.
        device: Device on which the forward pass runs.
        image_size: Size passed to ``transforms.Resize``. Defaults to
            ``(128, 128)``, the value that was previously hard-coded.

    Returns:
        The model output for the image as a NumPy array; the input is given
        a leading batch dimension of 1 before the forward pass.

    NOTE(review): preprocessing here is resize + ToTensor only. Confirm it
    matches the validation transform used during training (e.g. any
    normalization), otherwise embeddings will not be comparable.
    """
    transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
    ])

    # Load and preprocess image; the context manager closes the file once the
    # pixels have been converted to RGB.
    with Image.open(image_path) as source_image:
        image = source_image.convert('RGB')

    image_tensor = transform(image).unsqueeze(0).to(device)

    # Extract embedding
    model.eval()
    with torch.no_grad():
        embedding = model(image_tensor)

    return embedding.cpu().numpy()

# Test on a sample. Sorting makes the chosen file independent of filesystem
# listing order; an empty directory is reported explicitly.
sample_paths = sorted(Path(config['data']['val_dir']).rglob('*.png'))
if not sample_paths:
    raise FileNotFoundError(f"No .png spectrograms found under '{config['data']['val_dir']}'")
sample_path = sample_paths[0]
embedding = predict_embedding(model, sample_path, device)

print(f"Sample: {sample_path.name}")
print(f"Embedding shape: {embedding.shape}")
print(f"Embedding (first 10 values): {embedding[0, :10]}")

## 12. Summary

### Files Generated:
- `checkpoints/best_model.pth` - Best model checkpoint
- `exported_models/music_encoder.pth` - Exported model for inference
- `training_curves.png` - Training visualization
- `embeddings_tsne.png` - Embedding visualization

### To use this model:
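```python
import torch
from CNN.models import build_model

# Load the exported file and rebuild the architecture from the saved config
checkpoint = torch.load('exported_models/music_encoder.pth', map_location='cpu')
model = build_model(checkpoint['config'])
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# Extract embeddings
with torch.no_grad():
    embedding = model(spectrogram_tensor)
```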