In [5]:
%%writefile main.py

import torch
import numpy as np
from torchvision import datasets
import torchvision.transforms as transforms
import torch.utils.data as data
import torch.nn as nn
import torch.nn.functional as F

from tqdm.notebook import tqdm, trange


import copy
import random
import time
import os
import json

from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
import torch.multiprocessing as mp

SEED = 1234
ROOT = "."
MODEL_NAME = "ConvAutoencoder"
SENARIO = "1GPU"
EPOCHS = 10
BATCH_SIZE = 64

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

"""# 2. Initialize the DDP Environment"""

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'  # Change this to the master node's IP address if using multiple machines
    os.environ['MASTER_PORT'] = '12345'  # Pick a free port on the master node
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

"""# 3. Define a Model."""

class ConvAutoencoder(nn.Module):
    def __init__(self):
        super(ConvAutoencoder, self).__init__()
        ## encoder layers ##
        # conv layer (depth from 1 --> 16), 3x3 kernels
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        # conv layer (depth from 16 --> 4), 3x3 kernels
        self.conv2 = nn.Conv2d(16, 4, 3, padding=1)
        # pooling layer to reduce x-y dims by two; kernel and stride of 2
        self.pool = nn.MaxPool2d(2, 2)

        ## decoder layers ##
        ## a kernel of 2 and a stride of 2 will increase the spatial dims by 2
        self.t_conv1 = nn.ConvTranspose2d(4, 16, 2, stride=2)
        self.t_conv2 = nn.ConvTranspose2d(16, 1, 2, stride=2)


    def forward(self, x):
        ## encode ##
        # add hidden layers with relu activation function
        # and maxpooling after
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        # add second hidden layer
        x = F.relu(self.conv2(x))
        x = self.pool(x)  # compressed representation

        ## decode ##
        # add transpose conv layers, with relu activation function
        x = F.relu(self.t_conv1(x))
        # output layer (with sigmoid for scaling from 0 to 1)
        x = F.sigmoid(self.t_conv2(x))

        return x


def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

def create_model():
    model = ConvAutoencoder()

    print(f'The model has {count_parameters(model):,} trainable parameters')

    return model

"""# 4. Create a Dummy Dataset"""

def create_dataloader(rank, world_size, batch_size=32, root = ROOT):
    ## defined transforms
    transform = transforms.ToTensor()

    ## load the data with
    outdir = f'{root}/data'
    if rank == 0 and not os.path.exists(outdir):
        train_data = datasets.MNIST(outdir, train=True,
                                   download=True, transform=transform)
        test_data = datasets.MNIST(outdir, train=False,
                                  download=True, transform=transform)

    dist.barrier()  # Ensure all processes wait for the dataset to be downloaded
    train_data = datasets.MNIST(outdir, train=True,
                                   download=True, transform=transform)
    test_data = datasets.MNIST(outdir, train=False,
                                  download=True, transform=transform)

    ## create the validation split
    VALID_RATIO = 0.9

    n_train_examples = int(len(train_data) * VALID_RATIO)
    n_valid_examples = len(train_data) - n_train_examples

    train_data, valid_data = data.random_split(train_data,
                                              [n_train_examples, n_valid_examples])


    ## ensure the validation data uses the test transforms
    #valid_data = copy.deepcopy(valid_data)
    #valid_data.dataset.transform = test_transforms

    ## print
    print(f'Number of training examples: {len(train_data)}')
    print(f'Number of validation examples: {len(valid_data)}')
    print(f'Number of testing examples: {len(test_data)}')



    train_sampler = DistributedSampler(train_data, num_replicas=world_size, rank=rank, shuffle=True)
    val_sampler = DistributedSampler(valid_data, num_replicas=world_size, rank=rank)

    train_dataloader = data.DataLoader(train_data, batch_size=batch_size, sampler=train_sampler, pin_memory=True) #use num_workers > 0 for better performance
    val_dataloader = data.DataLoader(valid_data, batch_size=batch_size, sampler=val_sampler, pin_memory=True) #use num_workers > 0 for better performance
    test_dataloader = data.DataLoader(test_data, batch_size=batch_size, shuffle=False, pin_memory=True) #no sampling for test dataset


    return train_dataloader, val_dataloader, test_dataloader

"""# 5. Implement the Training Loop

## a. Help function
"""

RESULTS_FILE = f"{ROOT}/{MODEL_NAME}_{EPOCHS}epochs_{SENARIO}.json"

def log_results(scenario, results):
    """
    Save results to a JSON file for comparison across scenarios.
    """
    if os.path.exists(RESULTS_FILE):
        with open(RESULTS_FILE, 'r') as f:
            all_results = json.load(f)
    else:
        all_results = {}

    all_results[scenario] = results

    with open(RESULTS_FILE, 'w') as f:
        json.dump(all_results, f, indent=4)

def calculate_accuracy(y_pred, y):
    top_pred = y_pred.argmax(1, keepdim=True)
    correct = top_pred.eq(y.view_as(top_pred)).sum()
    acc = correct.float() / y.shape[0]
    return acc

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

"""## b. train function"""

def train(model, iterator, optimizer, criterion, rank):

    epoch_loss = 0
    epoch_acc = 0

    model.train()
    i = 0
    for data in tqdm(iterator, desc= f"Training on the rank {rank}", leave=False):
        images, _ = data
        images = images.to(rank)
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass: compute predicted outputs by passing inputs to the model
        outputs = model(images)
        # calculate the loss
        loss = criterion(outputs, images)
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # update running training loss
        epoch_loss += loss.item()

        if i % 50 == 0 and rank == 0:
            print(f"- {i} was passed over {len(iterator)}")

        i+=1
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

"""## c. Validation function"""

def evaluate(model, iterator, criterion, rank):

    epoch_loss = 0
    epoch_acc = 0

    model.eval()
    i=0
    with torch.no_grad():

        for (images, labels) in tqdm(iterator, desc=f"Evaluating on the rank {rank}", leave=False):

            images = images.to(rank)
            labels = labels.to(rank)

            # get sample outputs
            output = model(images)

            loss = criterion(output, images)

            # acc = calculate_accuracy(y_pred, y)

            epoch_loss += loss.item()
            # epoch_acc += acc.item()
            if i % 50 == 0 and rank == 0:
                print(f"- {i} was passed over {len(iterator)}")
            i+=1

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

outdir = f'{ROOT}/model/'
if not os.path.exists(outdir):
    os.makedirs(outdir)

def main_train(rank, world_size, root = outdir, num_epochs = EPOCHS, model_name = MODEL_NAME):
    ## a. Set up the distributed process groups
    setup(rank, world_size)
    print(f"Process {rank} initialized.")

    ## b. Create Model, DataLoader
    model = create_model().to(rank)  # Move model to GPU
    train_dataloader, val_dataloader, test_dataloader = create_dataloader(rank, world_size)

    ## c. Wrap the model with DistributedDataParallel
    ddp_model = DDP(model, device_ids=[rank])

    ## d. Loss and Optimizer
    criterion = nn.MSELoss().to(rank) # Move loss to GPU
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    ## e. Training Loop
    best_valid_loss = float('inf')
    training_times = []
    train_losses = []
    train_accurcy = []
    validation_times = []
    validation_losses = []
    validation_accurcy = []

    epoch_times = []

    for epoch in trange(num_epochs, desc="Epochs"):
        start_epoch_time = time.monotonic()
        start_time = time.monotonic()

        train_loss, train_acc = train(ddp_model, train_dataloader, optimizer, criterion, rank)
        train_time = time.monotonic() - start_time
        training_times.append(train_time)
        train_losses.append(train_loss)
        train_accurcy.append(train_acc)

        start_time = time.monotonic()
        valid_loss, valid_acc = evaluate(ddp_model, val_dataloader, criterion, rank)
        val_time = time.monotonic() - start_time
        validation_times.append(val_time)
        validation_losses.append(valid_loss)
        validation_accurcy.append(valid_acc)

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(ddp_model.state_dict(), f'{root}tut4-model.pt')

        end_time = time.monotonic()
        e_time = end_time - start_epoch_time
        epoch_times.append(e_time)
        epoch_mins, epoch_secs = epoch_time(start_epoch_time, end_time)

        print(f'--------------|     On process {rank}      |----------------')
        print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
        print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

    ## f. test after train
    ddp_model.load_state_dict(torch.load(f'{root}tut4-model.pt'))
    start_time = time.monotonic()
    test_loss, test_acc = evaluate(ddp_model, test_dataloader, criterion, rank)
    test_time = time.monotonic() - start_time
    print(f'Test results on process {rank}: Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

    # Log results
    results = {
        "world_size": world_size,
        "rank": rank,
        "training_times": training_times,
        "train_losses": train_losses,
        "train_accurcy": train_accurcy,
        "validation_times": validation_times,
        "validation_losses": validation_losses,
        "validation_accurcy": validation_accurcy,
        "test_time": test_time,
        "test_loss": test_loss,
        "test_acc": test_acc,
        "epoch_times": epoch_times
     }

    scenario = f"model_{model_name}_epochs_{num_epochs}_{world_size}_GPUs_rank_{rank}"
    log_results(scenario, results)
    dist.barrier()

    cleanup()
    print(f'Process {rank} finished training.')

"""# 6. Main Execution"""
if __name__ == "__main__":

    def main():
        world_size = torch.cuda.device_count()
        print(f'Total number of devices detected: {world_size}')

        if world_size >= 1:
            #start the training process on all available GPUs
            if world_size > 1:
                #start the training process on all available GPUs
                mp.spawn(
                    main_train,
                    args=(world_size,),
                    nprocs=world_size,
                    join=True
                )
            else:
                #run training on single GPU
                main_train(rank=0, world_size=1)

        else:
            print('no GPUs found. Please make sure you have configured CUDA correctly')

    main()

Overwriting main.py


In [6]:
!python main.py

Total number of devices detected: 1
Process 0 initialized.
The model has 1,077 trainable parameters
[rank0]:[W109 15:57:55.492580377 ProcessGroupNCCL.cpp:4115] [PG ID 0 PG GUID 0 Rank 0]  using GPU 0 to perform barrier as devices used by this process are currently unknown. This can potentially cause a hang if this rank to GPU mapping is incorrect.Specify device_ids in barrier() to force use of a particular device,or call init_process_group() with a device_id.
Number of training examples: 54000
Number of validation examples: 6000
Number of testing examples: 10000
Epochs:   0%|          | 0/10 [00:00<?, ?it/s]
Training on the rank 0:   0%|          | 0/1688 [00:00<?, ?it/s]
- 0 was passed over 1688
- 50 was passed over 1688
- 100 was passed over 1688
- 150 was passed over 1688
- 200 was passed over 1688
- 250 was passed over 1688
- 300 was passed over 1688
- 350 was passed over 1688
- 400 was passed over 1688
- 450 was passed over 1688
- 500 was passed over 1688
- 550 was passed over 168