In [1]:
# Distributed topology for this notebook instance. Both values are handed to
# the process-group initialisation and to the training-set sampler below.
world_size = 3  # total number of participating processes
rank = 0        # index of this process within the group

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallelCPU as DDP

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = '10.0.1.121'
    os.environ['MASTER_PORT'] = '8890'
    os.environ['GLOO_SOCKET_IFNAME'] = 'ens3'

    # initialize the process group
    dist.init_process_group(backend='gloo', 
                            init_method='env://', rank=rank, world_size=world_size)

    # Explicitly setting seed to make sure that models created in two processes
    # start from same random weights and biases.
    torch.manual_seed(42)


def cleanup():
    """Destroy the default process group if one is currently active.

    Safe to call more than once, or before any group was initialised: in
    those cases the call is a no-op instead of raising.
    """
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()

# Initialise the process group for this process before the model and the
# data pipeline are built.
setup(rank, world_size)

import torch 
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

import torch.nn.functional as F

# Input geometry (MNIST images are 28 x 28).
input_size = 28
sequence_length = 28

# Hyper-parameters
learning_rate = 0.1
num_epochs = 10
batch_size = 100

# Layer widths: two hidden fully-connected layers and the 10-way output.
out_size = 10
hid2 = 84
hid1 = 120

class Cnn(nn.Module):
    """LeNet-style convolutional classifier for 28x28 images.

    Two 5x5 convolutions, each followed by tanh and 2x2 average pooling,
    feed three fully-connected layers. With a 28x28 input the feature map
    after the second pooling step is 16 x 4 x 4 = 256 values, which is the
    fixed input width of ``fc1``.

    Args:
        in_channels: Number of channels of the input image (default 1).
        hid1: Width of the first fully-connected layer (default 120).
        hid2: Width of the second fully-connected layer (default 84).
        out_size: Number of output logits / classes (default 10).
    """

    def __init__(self, in_channels=1, hid1=120, hid2=84, out_size=10):
        super(Cnn, self).__init__()

        # Layers are created in the same order as before so that, for a given
        # RNG seed, the default model gets the same initial weights.
        self.conv1 = nn.Conv2d(
            in_channels=in_channels, out_channels=6,
            kernel_size=5, stride=1
        )

        self.conv2 = nn.Conv2d(
            in_channels=6, out_channels=16,
            kernel_size=5, stride=1
        )

        self.pool = nn.AvgPool2d(
            kernel_size=2, stride=2
        )

        self.flat = nn.Flatten()

        self.fc1 = nn.Linear(
            in_features=256,  # 16 channels * 4 * 4 for a 28x28 input
            out_features=hid1
        )

        self.fc2 = nn.Linear(
            in_features=hid1,
            out_features=hid2
        )

        self.fc3 = nn.Linear(
            in_features=hid2,
            out_features=out_size
        )

    def forward(self, img):
        """Return raw class logits of shape (batch, out_size).

        Args:
            img: Batch of images shaped (batch, in_channels, 28, 28).
        """
        out = torch.tanh(self.conv1(img))
        out = self.pool(out)

        out = torch.tanh(self.conv2(out))
        out = self.pool(out)

        # Collapse (batch, 16, 4, 4) into (batch, 256) once; the output of a
        # Linear layer is already 2-D, so no further flattening is needed.
        out = self.flat(out)

        out = torch.tanh(self.fc1(out))
        out = torch.tanh(self.fc2(out))

        # No softmax here: the logits are consumed by CrossEntropyLoss.
        return self.fc3(out)

# MNIST dataset: the training split (with download enabled) and the held-out
# test split, both stored under ../data/ and converted to tensors on access.
train_dataset = torchvision.datasets.MNIST(
    root='../data/',
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)

test_dataset = torchvision.datasets.MNIST(
    root='../data/',
    train=False,
    transform=transforms.ToTensor(),
)

# Shard the training set across the process group: this rank draws only the
# indices assigned to it out of `world_size` replicas.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=world_size, rank=rank
)

# Data loaders. The training loader takes its indices from the distributed
# sampler; the test loader has no sampler, so every rank walks the complete
# test set in its stored order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    sampler=train_sampler,
)

test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

# Build the network and wrap it for distributed data-parallel training.
model = Cnn()

model = DDP(model)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
# Use the `learning_rate` hyper-parameter instead of repeating the literal, so
# changing it in the hyper-parameter section actually affects training.
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

def train():
    """Train the module-level `model` for `num_epochs` passes over `train_loader`.

    Runs forward pass, loss, backward pass and an optimizer step for every
    batch, and prints the current loss every 100 steps.
    """
    # test() switches the model to eval mode; make sure training always runs
    # in train mode regardless of call order.
    model.train()

    total_step = len(train_loader)
    sampler = train_loader.sampler
    for epoch in range(num_epochs):
        # A DistributedSampler derives its shuffle order from the epoch number;
        # without this call every epoch would replay the same ordering.
        if hasattr(sampler, 'set_epoch'):
            sampler.set_epoch(epoch)

        for i, (images, labels) in enumerate(train_loader):
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (i+1) % 100 == 0:
                print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                       .format(epoch+1, num_epochs, i+1, total_step, loss.item()))

def test():
    """Evaluate the module-level `model` on `test_loader` and print its accuracy.

    The model is put in eval mode and gradients are disabled for the whole
    pass. Accuracy is the percentage of samples whose highest-scoring class
    equals the label.
    """
    model.eval()

    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for images, labels in test_loader:
            logits = model(images)
            # Index of the largest logit along the class dimension.
            predicted = logits.max(dim=1)[1]
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * n_correct / n_seen))



In [2]:
def main():
    """Profile a full training run on CPU, print the profile, then evaluate."""
    profiler = torch.autograd.profiler.profile(use_cuda=False)
    with profiler:
        train()

    # Per-operator averages, ordered by self CPU time.
    print(profiler.key_averages().table(sort_by="self_cpu_time_total"))

    test()
    

# Entry point: run the profiled training and the evaluation when this code is
# executed as the main module.
if __name__ == "__main__":
    main()

-----------------------------------  ---------------  ---------------  ---------------  ---------------  ---------------  ---------------  
Name                                 Self CPU total %  Self CPU total   CPU total %      CPU total        CPU time avg     Number of Calls  
-----------------------------------  ---------------  ---------------  ---------------  ---------------  ---------------  ---------------  
mkldnn_convolution_backward          40.99%           831.094ms        40.99%           831.094ms        415.547ms        2                
mkldnn_convolution                   23.44%           475.346ms        23.44%           475.346ms        237.673ms        2                
avg_pool2d_backward                  10.46%           212.096ms        10.46%           212.096ms        106.048ms        2                
avg_pool2d                           8.05%            163.123ms        8.05%            163.123ms        81.561ms         2                
tanh               