<a href="https://colab.research.google.com/github/vijaygwu/IntroToDeepLearning/blob/main/GPURayParallelizedCNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Explanation of the Optimized Code with Ray on GPUs**

This code applies several optimizations to improve the performance of training a neural network on the MNIST dataset using **PyTorch** and **Ray**. The optimizations include **parallel data loading**, **GPU acceleration**, and **mixed precision training**. Let's go step by step to explain each part of the code in detail.




### **Key Optimizations**:

1. **Parallel Data Loading with Ray**:
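   - Ray tasks are used to route each batch through Ray's worker processes. In this code the remote function returns its input unchanged, so the batches themselves are produced by the `DataLoader` workers; Ray would only speed up data preparation if the remote function performed real work such as decoding or augmentation.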

2. **GPU Acceleration**:
   - By utilizing a GPU (if available), the model can perform operations much faster than on a CPU. Moving the model and data to the GPU can result in significant speed improvements, especially for larger models and datasets.

3. **Mixed Precision Training**:
   - Mixed precision allows for faster computation and lower memory usage by using half-precision floating point (FP16) for parts of the model. It is especially useful when training on modern GPUs that support this feature.
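
To make the FP16 trade-off concrete, here is a small sketch that uses only Python's standard `struct` module (not PyTorch) to round values to half precision. The helper name `to_fp16` is hypothetical and exists only for this illustration.

```python
import struct

def to_fp16(x: float) -> float:
    """Round a Python float to the nearest IEEE 754 half-precision value."""
    return struct.unpack("<e", struct.pack("<e", x))[0]

# Storage per value: FP16 uses 2 bytes, FP32 uses 4 bytes.
fp16_bytes = struct.calcsize("<e")
fp32_bytes = struct.calcsize("<f")
print(fp16_bytes, fp32_bytes)

# Precision: 0.1 is stored only approximately in FP16.
tenth = to_fp16(0.1)
print(tenth)

# Range: the largest finite FP16 value is 65504; larger values do not fit.
largest = to_fp16(65504.0)
try:
    to_fp16(70000.0)
    overflowed = False
except OverflowError:
    overflowed = True
print(largest, overflowed)
```

Halving the bytes per value is where the memory savings come from, while the reduced precision and range are the reason only parts of the computation are run in FP16.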

---


### **1. Importing Required Libraries**

```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import time
import ray
```

- **PyTorch Libraries**:
  - `torch`: Core PyTorch library for tensor operations.
  - `nn`: Provides modules to build neural networks, such as `nn.Linear`.
  - `optim`: Contains optimizers like Stochastic Gradient Descent (SGD).
  - `F`: Contains functions like `relu` and `cross_entropy` used in forward passes.
  - `DataLoader`: A utility to load and batch datasets.
  - `datasets` and `transforms`: Part of `torchvision`, used for loading and transforming popular datasets like MNIST.

- **Other Libraries**:
  - `time`: Used to record training time.
  - `ray`: A library for distributed computing, used here for parallel data loading and potentially distributing workloads.

---





In [21]:
!pip install ray



In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import time
import ray


### **2. Initialize Ray and Check Device Availability**

```python
# Initialize Ray
ray.init(ignore_reinit_error=True)

# Check if GPU is available and use it if possible
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
```

- **Ray Initialization**: `ray.init()` initializes Ray, which allows you to run tasks in parallel across multiple cores. Here, `ignore_reinit_error=True` ensures that the script continues even if Ray is already initialized.

- **GPU Check**: `torch.cuda.is_available()` reports whether a CUDA-capable GPU can be used. If it returns `True`, `device` is set to `cuda`; otherwise it falls back to `cpu`. The `device` variable is used later to move the model and each batch to the selected device.

---


In [23]:
# Initialize Ray
ray.init(ignore_reinit_error=True)

# Check if GPU is available and use it if possible
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


2024-09-04 20:13:57,885	INFO worker.py:1783 -- Started a local Ray instance.


Using device: cuda


### **3. MNIST Dataset and DataLoader Setup**

```python
# Define the data transformations: Convert to tensor and normalize
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load the MNIST dataset using torchvision
train_dataset = datasets.MNIST(root='./mnist_data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./mnist_data', train=False, download=True, transform=transform)

# Define the DataLoader for batching, with num_workers for parallel data loading
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True, num_workers=4, pin_memory=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False, num_workers=4, pin_memory=True)
```

- **Transforms**:
  - `ToTensor()`: Converts PIL images to PyTorch tensors and scales pixel values from the integer range [0, 255] to floats in [0.0, 1.0].
  - `Normalize((0.1307,), (0.3081,))`: Subtracts the MNIST mean (0.1307) and divides by the MNIST standard deviation (0.3081), so inputs are roughly zero-centered with unit variance. This usually helps gradient-based training converge faster.

- **Datasets**:
  - `datasets.MNIST`: Automatically downloads and loads the MNIST dataset. The dataset is transformed using `transform` (converted to tensors and normalized).

- **DataLoader Optimizations**:
  - `batch_size=64`: Batches the dataset into groups of 64 images, which helps optimize GPU processing.
  - `num_workers=4`: Starts 4 worker processes (not threads) that load and transform batches in parallel, so the training loop spends less time waiting for data.
  - `pin_memory=True`: Places loaded batches in page-locked (pinned) host memory, which speeds up copies from the host (CPU) to the device (GPU).
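
The arithmetic behind `Normalize` and `batch_size=64` can be checked by hand. The sketch below is plain Python rather than torchvision code; `normalize` is a hypothetical helper that applies the same `(x - mean) / std` formula to a single pixel, and the dataset sizes are the standard MNIST split of 60,000 training and 10,000 test images.

```python
import math

MNIST_MEAN, MNIST_STD = 0.1307, 0.3081   # values passed to transforms.Normalize

def normalize(pixel: float) -> float:
    """Apply (x - mean) / std to one pixel already scaled to [0, 1]."""
    return (pixel - MNIST_MEAN) / MNIST_STD

black, white = normalize(0.0), normalize(1.0)
print(f"black -> {black:.4f}, white -> {white:.4f}")

# Batches per epoch with batch_size=64 (the last batch is smaller).
train_batches = math.ceil(60_000 / 64)
test_batches = math.ceil(10_000 / 64)
last_train_batch = 60_000 - 64 * (train_batches - 1)
print(train_batches, test_batches, last_train_batch)
```

The batch count matters later: `len(train_loader)` is the number of batches per epoch, which is the divisor used when the average training loss is printed.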

---

In [24]:
###############################################
# MNIST Dataset and Transformations
###############################################

# Define the data transformations: Convert to tensor and normalize
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load the MNIST dataset using torchvision
train_dataset = datasets.MNIST(root='./mnist_data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./mnist_data', train=False, download=True, transform=transform)

# Define the DataLoader for batching, with num_workers for parallel data loading
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True, num_workers=4, pin_memory=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False, num_workers=4, pin_memory=True)




### **4. Define the Neural Network Model**

```python
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(28*28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28*28)  # Flatten the input image
        x = F.relu(self.fc1(x))  # First hidden layer with ReLU activation
        x = F.relu(self.fc2(x))  # Second hidden layer with ReLU activation
        x = self.fc3(x)  # Output layer
        return x
```

- **Model Architecture**:
  - A simple feed-forward network is defined with three fully connected (`nn.Linear`) layers:
    - **Input**: Each 28x28 MNIST image is flattened into a vector of 784 values (`28*28`).
    - **Hidden Layers**: `fc1` maps the 784 inputs to 128 units and `fc2` maps those 128 units to 64 units, each followed by a ReLU activation.
    - **Output Layer**: `fc3` outputs 10 raw scores (logits), one for each digit (0-9).
  
- **Activation Function**: `ReLU` (Rectified Linear Unit) is applied to both hidden layers to introduce non-linearity, helping the network learn more complex patterns.
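
As a quick sanity check on the size of this network, the number of trainable parameters can be computed from the layer widths alone. This is a plain-Python sketch; `linear_params` is a hypothetical helper, not a PyTorch function.

```python
layer_sizes = [28 * 28, 128, 64, 10]   # input width, then fc1, fc2, fc3 output widths

def linear_params(n_in: int, n_out: int) -> int:
    """Weights (n_in * n_out) plus one bias per output unit."""
    return n_in * n_out + n_out

per_layer = [linear_params(a, b) for a, b in zip(layer_sizes, layer_sizes[1:])]
total = sum(per_layer)
print(per_layer, total)   # [100480, 8256, 650] 109386
```

Most of the roughly 109,000 parameters sit in `fc1`, so this is a small model by GPU standards.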


In [25]:
###############################################
# Define Neural Network Model
###############################################

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(28*28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28*28)  # Flatten the input image
        x = F.relu(self.fc1(x))  # First hidden layer with ReLU activation
        x = F.relu(self.fc2(x))  # Second hidden layer with ReLU activation
        x = self.fc3(x)  # Output layer
        return x


---

### **5. Parallel Data Loading with Ray**

```python
# Ray task that receives an already-loaded batch and returns it unchanged
@ray.remote
def load_batch(batch):
    return batch

# Iterate the DataLoader, pass every batch through a Ray task, and collect the results
def load_data_in_parallel(data_loader):
    ray_batches = [load_batch.remote(batch) for batch in data_loader]  # Submit one task per batch
    return ray.get(ray_batches)  # Block until every task has returned its batch
```

- **Ray for Parallel Data Loading**:
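  - `@ray.remote`: This decorator turns `load_batch` into a Ray task that can run in a separate worker process. Here the task is an identity function: it returns the batch it receives without doing any loading itself.
  - **What actually runs in parallel**: `load_data_in_parallel` iterates over the `DataLoader`, so every batch is first produced by the `DataLoader` (and its `num_workers` processes). Each batch is then serialized, sent to a Ray worker, returned, and collected with `ray.get()`. The whole epoch is therefore materialized in memory before training starts, and Ray contributes a round trip through its object store rather than faster loading. To benefit from Ray, the remote function would need to do real work, such as decoding or augmenting raw samples.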
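
One detail worth checking in `load_data_in_parallel` is where the loading work happens. The sketch below is a stand-in, not Ray code: it swaps Ray tasks for a standard-library `ThreadPoolExecutor` and the `DataLoader` for a hypothetical generator `fake_loader`, purely to show the control flow of a list comprehension that submits one task per item of an iterator.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

caller = threading.current_thread().name
loaded_in = []   # name of the thread that produced each batch

def fake_loader(num_batches):
    """Hypothetical stand-in for a DataLoader: yields one batch at a time."""
    for i in range(num_batches):
        loaded_in.append(threading.current_thread().name)  # loading happens here
        yield [i] * 4

def identity(batch):
    """Mirrors load_batch: returns its argument unchanged."""
    return batch

with ThreadPoolExecutor(max_workers=4) as pool:
    # Same shape as: [load_batch.remote(batch) for batch in data_loader]
    futures = [pool.submit(identity, batch) for batch in fake_loader(5)]
    batches = [f.result() for f in futures]

print(set(loaded_in) == {caller})   # every batch was produced by the caller
print(len(batches), batches[0])
```

Every batch is produced in the calling thread while the comprehension iterates the loader; the pool only receives finished batches. The same reasoning applies to the Ray version, where the tasks receive batches that the `DataLoader` has already built.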

---



In [26]:
###############################################
# Ray-based Parallel Data Loading
###############################################

# Ray task that receives an already-loaded batch and returns it unchanged
@ray.remote
def load_batch(batch):
    return batch

# Iterate the DataLoader, pass every batch through a Ray task, and collect the results
def load_data_in_parallel(data_loader):
    ray_batches = [load_batch.remote(batch) for batch in data_loader]  # Submit one task per batch
    return ray.get(ray_batches)  # Block until every task has returned its batch


### **6. Training Function with GPU and Mixed Precision Support**

```python
def train_model(model, optimizer, criterion, train_loader, epochs):
    scaler = torch.cuda.amp.GradScaler()  # Use automatic mixed precision (optional, only if supported by hardware)
    
    for epoch in range(epochs):
        model.train()  # Set the model to training mode
        running_loss = 0.0
        
        # Load data in parallel using Ray
        batches = load_data_in_parallel(train_loader)
        
        for data, target in batches:  # Iterate over parallel-loaded batches
            data, target = data.to(device), target.to(device)  # Move data and target to the GPU if available
            
            optimizer.zero_grad()  # Clear previous gradients
            
            # Mixed precision training (optional)
            with torch.cuda.amp.autocast():
                output = model(data)  # Forward pass through the network
                loss = criterion(output, target)  # Compute loss
            
            scaler.scale(loss).backward()  # Backward pass to compute gradients with scaling for mixed precision
            scaler.step(optimizer)  # Update the weights using the scaled optimizer
            scaler.update()  # Update the scaling factor for mixed precision
            
            running_loss += loss.item()  # Track running loss for the epoch
        
        # Print the loss after each epoch
        print(f'Epoch {epoch+1}, Training Loss: {running_loss / len(train_loader):.4f}')
```

- **Mixed Precision Training**:
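  - **`torch.cuda.amp.GradScaler()`**: Multiplies the loss by a scale factor before the backward pass so that small gradients do not underflow to zero in FP16 (16-bit floating point). `scaler.step(optimizer)` unscales the gradients before the update and skips the step if they contain infs or NaNs, and `scaler.update()` adjusts the scale factor for the next iteration.
  - **`torch.cuda.amp.autocast()`**: Runs eligible operations, such as the matrix multiplications in `nn.Linear`, in FP16 while keeping precision-sensitive operations in FP32, which reduces memory use and speeds up computation with little loss of accuracy. Recent PyTorch releases deprecate both `torch.cuda.amp` entry points in favor of `torch.amp.GradScaler("cuda")` and `torch.amp.autocast("cuda")`.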

- **Training Loop**:
  - **Forward Pass**: The data is passed through the model to get the predicted outputs.
  - **Loss Calculation**: The loss is computed by comparing the model's predictions with the actual labels.
  - **Backward Pass**: The gradients of the loss are computed, and the model parameters are updated using the optimizer.
  - **GPU Acceleration**: Both the data and the model are moved to the GPU (if available) using `data.to(device)` and `model.to(device)`.
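
The reason a gradient scaler is paired with FP16 can be shown with a single number. This sketch uses Python's `struct` module to emulate FP16 rounding instead of calling PyTorch; `to_fp16` is a hypothetical helper and `tiny_grad` is an illustrative value chosen to fall below FP16's representable range. The scale of `2**16` matches `GradScaler`'s default initial scale.

```python
import struct

def to_fp16(x: float) -> float:
    """Round a Python float to the nearest IEEE 754 half-precision value."""
    return struct.unpack("<e", struct.pack("<e", x))[0]

SCALE = 2.0 ** 16      # GradScaler's default initial scale
tiny_grad = 1e-8       # illustrative gradient, smaller than FP16 can represent

unscaled = to_fp16(tiny_grad)          # stored directly: underflows to zero
scaled = to_fp16(tiny_grad * SCALE)    # stored after scaling: representable
recovered = scaled / SCALE             # unscaled again in higher precision

print(unscaled)
print(recovered)
```

In real training the scale is applied to the loss, and the chain rule carries it into every gradient; the gradients are divided by the same factor before the optimizer uses them.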

---

In [27]:
###############################################
# Training Function with Ray-Parallelized Data Loading and GPU Support
###############################################

def train_model(model, optimizer, criterion, train_loader, epochs):
    scaler = torch.cuda.amp.GradScaler()  # Use automatic mixed precision (optional, only if supported by hardware)

    for epoch in range(epochs):
        model.train()  # Set the model to training mode
        running_loss = 0.0

        # Load data in parallel using Ray
        batches = load_data_in_parallel(train_loader)

        for data, target in batches:  # Iterate over parallel-loaded batches
            data, target = data.to(device), target.to(device)  # Move data and target to the GPU if available

            optimizer.zero_grad()  # Clear previous gradients

            # Mixed precision training (optional)
            with torch.cuda.amp.autocast():
                output = model(data)  # Forward pass through the network
                loss = criterion(output, target)  # Compute loss

            scaler.scale(loss).backward()  # Backward pass to compute gradients with scaling for mixed precision
            scaler.step(optimizer)  # Update the weights using the scaled optimizer
            scaler.update()  # Update the scaling factor for mixed precision

            running_loss += loss.item()  # Track running loss for the epoch

        # Print the loss after each epoch
        print(f'Epoch {epoch+1}, Training Loss: {running_loss / len(train_loader):.4f}')



### **7. Testing Function**

```python
def test_model(model, test_loader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    
    # No gradient computation during evaluation
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)  # Move data and target to the GPU if available
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    # Calculate and print accuracy
    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')
```

- **Evaluation Mode**: `model.eval()` switches layers such as dropout and batch normalization to their inference behavior. This network contains neither, so the call has no effect on its outputs here, but it is good practice to include it.
- **No Gradient Calculation**: `torch.no_grad()` prevents the computation of gradients, which saves memory and speeds up inference.
- **Accuracy Calculation**: The model’s predictions are compared to the actual labels, and accuracy is computed.
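
The accuracy bookkeeping can be traced on a toy example. The sketch below is plain Python with made-up scores for a hypothetical 3-class problem; `argmax` plays the role that `torch.max(outputs, 1)` plays in `test_model`, picking the index of the highest score in each row.

```python
def argmax(scores):
    """Index of the largest score in a list."""
    return max(range(len(scores)), key=scores.__getitem__)

# Made-up model outputs (one row of class scores per sample) and true labels.
outputs = [
    [0.1, 2.3, -1.0],
    [1.5, 0.2, 0.3],
    [0.0, 0.1, 0.9],
    [0.4, 0.3, 0.2],
]
targets = [1, 0, 2, 1]

predicted = [argmax(row) for row in outputs]
correct = sum(p == t for p, t in zip(predicted, targets))
accuracy = 100 * correct / len(targets)
print(predicted, f"{accuracy:.2f}%")   # [1, 0, 2, 0] 75.00%
```

Only the last sample is misclassified, so 3 of 4 predictions match and the accuracy is 75%.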

---


In [28]:

###############################################
# Testing Function to Evaluate Model Performance
###############################################

def test_model(model, test_loader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    # No gradient computation during evaluation
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)  # Move data and target to the GPU if available
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    # Calculate and print accuracy
    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy:.2f}%')

### **8. Main Function**

```python
if __name__ == "__main__":
    # Initialize the neural network, loss function, and optimizer
    model = Net().to(device)  # Move the model to GPU if available
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)

    # Record start time for training
    start_time = time.time()

    # Train the model with Ray-parallelized data loading
    train_model(model, optimizer, criterion, train_loader, epochs=5)

    # Record end time and print training time
    end_time = time.time()
    print(f"Training Time with Ray and Optimizations: {end_time - start_time:.2f} seconds")

    # Test the model on the test set
    test_model(model, test_loader)

    # Shutdown Ray
    ray.shutdown()
```

- **Model Training**: The `train_model` function is called to train the model for 5 epochs.
- **Timing**: The total training time is calculated using `time.time()` and printed after the training loop finishes.
- **Model Testing**: After training, the model’s performance is evaluated on the test dataset.
- **Ray Shutdown**: Finally, `ray.shutdown()` is called to terminate Ray once the work is done.
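
A wall-clock total is easier to compare across runs when converted to throughput. The sketch below takes the 40.02 seconds reported by the run recorded in this notebook and derives images per second and time per optimizer step. The measured interval covers everything inside `train_model`, including the Ray round trip, so these figures describe the whole pipeline rather than GPU compute alone.

```python
epochs = 5
train_images = 60_000
batch_size = 64
elapsed_s = 40.02          # "Training Time" reported by the recorded run

steps_per_epoch = -(-train_images // batch_size)   # ceiling division
steps = epochs * steps_per_epoch
images_per_s = epochs * train_images / elapsed_s
ms_per_step = 1000 * elapsed_s / steps

print(steps, f"{images_per_s:.0f} images/s", f"{ms_per_step:.2f} ms/step")
```

Timings like this vary with hardware and with whatever else is running on the machine, so they are best read as a rough baseline.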

---


In [29]:
###############################################
# Main Function to Train and Test the Model
###############################################

if __name__ == "__main__":
    # Initialize the neural network, loss function, and optimizer
    model = Net().to(device)  # Move the model to GPU if available
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)

    # Record start time for training
    start_time = time.time()

    # Train the model with Ray-parallelized data loading
    train_model(model, optimizer, criterion, train_loader, epochs=5)

    # Record end time and print training time
    end_time = time.time()
    print(f"Training Time with Ray and Optimizations: {end_time - start_time:.2f} seconds")

    # Test the model on the test set
    test_model(model, test_loader)

    # Shutdown Ray
    ray.shutdown()



Epoch 1, Training Loss: 0.8079
Epoch 2, Training Loss: 0.3131
Epoch 3, Training Loss: 0.2586
Epoch 4, Training Loss: 0.2213
Epoch 5, Training Loss: 0.1932
Training Time with Ray and Optimizations: 40.02 seconds
Test Accuracy: 94.65%

