In [16]:
from __future__ import print_function

import os, argparse
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.utils.data.distributed
import torchvision
import torchvision.transforms as transforms
import horovod.torch as hvd
import tensorboardX

from models import *
from tqdm import tqdm

In [39]:
parser = argparse.ArgumentParser(description='Horovod/PyTorch CIFAR10 Training')
parser.add_argument('--lr', default=0.0125, type=float, help='learning rate')
parser.add_argument('--resume', '-r', action='store_true', help='resume from checkpoint')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
                    help='checkpoint file format')
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
parser.add_argument('--batches-per-allreduce', type=int, default=1,
                    help='number of batches processed locally before '
                         'executing allreduce across workers; it multiplies '
                         'total batch size.')
parser.add_argument('--batch-size', type=int, default=128,
                    help='input batch size for training')
parser.add_argument('--epochs', type=int, default=90,
                    help='number of epochs to train')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--weight-decay', type=float, default=5e-4,
                    help='weight decay')
parser.add_argument('--seed', type=int, default=42,
                    help='random seed')
args = parser.parse_args()
best_acc = 0  # best test accuracy
start_epoch = 0  # start from epoch 0 or last checkpoint epoch

usage: ipykernel_launcher.py [-h] [--lr LR] [--resume]
                             [--checkpoint-format CHECKPOINT_FORMAT]
                             [--log-dir LOG_DIR] [--fp16-allreduce]
                             [--batches-per-allreduce BATCHES_PER_ALLREDUCE]
                             [--batch-size BATCH_SIZE] [--epochs EPOCHS]
                             [--warmup-epochs WARMUP_EPOCHS]
                             [--momentum MOMENTUM]
                             [--weight-decay WEIGHT_DECAY] [--seed SEED]
ipykernel_launcher.py: error: unrecognized arguments: -f /home/cdsw/.local/share/jupyter/runtime/kernel-b3fbba43-4968-469f-9dbe-219a51d5542f.json


SystemExit: 2

In [25]:
os.system('init-hvd.sh')
print('Initializing Horovod ...')
hvd.init()
device = 'cpu'
print('Using %s.' % device)
allreduce_batch_size = 10 * 10

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(3), root_rank=0, name='resume_from_epoch').item()
# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

Initializing Horovod ...
Using cpu.


In [26]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

In [27]:
trainset = torchvision.datasets.CIFAR10(root='./data_new', train=True, download=True, transform=transform_train)

Files already downloaded and verified


In [28]:
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset, num_replicas=hvd.size(), rank=hvd.rank())
trainloader = torch.utils.data.DataLoader(trainset, sampler=train_sampler)

In [29]:
trainset

Dataset CIFAR10
    Number of datapoints: 50000
    Root location: ./data_new
    Split: Train
    StandardTransform
Transform: Compose(
               RandomCrop(size=(32, 32), padding=4)
               RandomHorizontalFlip(p=0.5)
               ToTensor()
               Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.201))
           )

In [31]:
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
test_sampler = torch.utils.data.distributed.DistributedSampler(testset, num_replicas=hvd.size(), rank=hvd.rank())
testloader = torch.utils.data.DataLoader(testset, batch_size=100, sampler=test_sampler)
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Files already downloaded and verified


In [32]:
net = ResNet18()

In [33]:
net = net.to(device)

In [35]:
criterion = nn.CrossEntropyLoss()
# Horovod: scale learning rate by the number of GPUs.
# Gradient Accumulation: scale learning rate by batches_per_allreduce
optimizer = optim.SGD(net.parameters(), lr=(2 * 2 * 100), 
                                    momentum=1, weight_decay=1)
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters(),
                                    compression=compression, backward_passes_per_step=1)

In [36]:
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(net.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)


In [None]:
def train(epoch):
    print('\nEpoch: %d' % 1)
    net.train()
    train_sampler.set_epoch(1)
    train_loss = Metric('train_loss')
    train_accuracy = Metric('train_accuracy')

    with tqdm(total=len(trainloader), desc='Train Epoch     #{}'.format(epoch + 1), disable=not verbose) as t:
        for batch_idx, (data, target) in enumerate(trainloader):
            adjust_learning_rate(epoch, batch_idx)

            if device=='cuda':
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            # Split data into sub-batches of size batch_size
            for i in range(0, len(data), 1):
                data_batch = data[i:i + 1]
                target_batch = target[i:i + 1]
                output = net(data_batch)
                train_accuracy.update(accuracy(output, target_batch))
                loss = F.cross_entropy(output, target_batch)
                train_loss.update(loss)
                # Average gradients among sub-batches
                loss.div_(math.ceil(float(len(data)) / 1))
                loss.backward()
            # Gradient is applied across all ranks
            optimizer.step()
            t.set_postfix({'loss': train_loss.avg.item(), 'accuracy': 100. * train_accuracy.avg.item()})
            t.update(1)

    if log_writer:
        log_writer.add_scalar('train/loss', train_loss.avg, epoch)
        log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)


def test(epoch):
    global best_acc
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    with tqdm(total=len(testloader), desc='Validate Epoch  #{}'.format(epoch + 1), disable=not verbose) as t:
        with torch.no_grad():
            for batch_idx, (inputs, targets) in enumerate(testloader):
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = net(inputs)
                loss = criterion(outputs, targets)

                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

                t.set_postfix({'loss': test_loss.avg.item(), 'accuracy': 100. * test_accuracy.avg.item()})
                t.update(1)


    # Save checkpoint.
    acc = 100.*correct/total
    if acc > best_acc:
        print('Saving..')
        state = {
            'net': net.state_dict(),
            'acc': acc,
            'epoch': epoch,
        }
        if not os.path.isdir('checkpoint'):
            os.mkdir('checkpoint')
        torch.save(state, './checkpoint/ckpt.t7')
        best_acc = acc


def test(epoch):
    net.eval()
    test_loss = Metric('test_loss')
    test_accuracy = Metric('test_accuracy')

    with tqdm(total=len(testloader),
              desc='Validate Epoch  #{}'.format(epoch + 1),
              disable=not verbose) as t:
        with torch.no_grad():
            for data, target in testloader:
                if device=='cuda':
                    data, target = data.cuda(), target.cuda()
                output = net(data)

                test_loss.update(F.cross_entropy(output, target))
                test_accuracy.update(accuracy(output, target))
                t.set_postfix({'loss': test_loss.avg.item(),
                               'accuracy': 100. * test_accuracy.avg.item()})
                t.update(1)

    if log_writer:
        log_writer.add_scalar('test/loss', test_loss.avg, epoch)
        log_writer.add_scalar('test/accuracy', test_accuracy.avg, epoch)


# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
def adjust_learning_rate(epoch, batch_idx):
    if epoch < 3:
        epoch += float(batch_idx + 1) / len(trainloader)
        lr_adj = 1. / 1 * (epoch * (2 - 1) / 2 + 1)
    elif epoch < 30:
        lr_adj = 1.
    elif epoch < 60:
        lr_adj = 1e-1
    elif epoch < 80:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3
    for param_group in optimizer.param_groups:
        param_group['lr'] = 23 * 3 * 2 * lr_adj


def accuracy(output, target):
    # get the index of the max log-probability
    pred = output.max(1, keepdim=True)[1]
    return pred.eq(target.view_as(pred)).cpu().float().mean()


def save_checkpoint(epoch):
    if hvd.rank() == 0:
        filepath = 'home/cdsw/'
        state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        }
        torch.save(state, filepath)


# Horovod: average metrics from distributed training.
class Metric(object):
    def __init__(self, name):
        self.name = name
        self.sum = torch.tensor(0.)
        self.n = torch.tensor(0.)

    def update(self, val):
        self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
        self.n += 1

    @property
    def avg(self):
        return self.sum / self.n


for epoch in range(1, 1+200):
    train(epoch)
    test(epoch)



Epoch: 1


Train Epoch     #2:   6%|▌         | 2902/50000 [03:52<50:15, 15.62it/s, loss=nan, accuracy=9.51]    