In [1]:
%%bash
# Activate the azureml_py38_PT_TF environment so the packages below are
# installed into it (presumably the environment the kernel runs in - confirm).
source activate azureml_py38_PT_TF
# Pin DeepSpeed to the version shown in this notebook's saved output (0.5.10).
# The notebook calls deepspeed.utils.groups.initialize(), which later DeepSpeed
# releases are understood to have deprecated - verify before upgrading.
pip install deepspeed==0.5.10
pip install mpi4py
pip install ipywidgets



In [2]:
# Set up an MLflow experiment to log metrics
from azureml.core import Workspace
import mlflow
from mlflow.tracking import MlflowClient

# Load the Azure ML workspace from the local configuration.
ws = Workspace.from_config()

# Point MLflow at the workspace's tracking server.
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())

# set_experiment() creates the experiment when it does not exist yet and reuses
# it otherwise, so this cell survives "Restart & Run All". A separate
# create_experiment() call raises as soon as the experiment already exists.
mlflow.set_experiment("deepspeed-notebook")
mlflow_run = mlflow.start_run()

In [3]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import argparse
import deepspeed

# Bring up the distributed backend first: the rank queries below depend on it.
deepspeed.init_distributed()

# Preprocessing: convert images to tensors, then normalise each of the three
# channels with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)

is_rank_zero = torch.distributed.get_rank() == 0

# Every rank except 0 waits here so that rank 0 can download CIFAR-10 first.
if not is_rank_zero:
    torch.distributed.barrier()

trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform
)

# Rank 0 has the data on disk now; joining the barrier releases the other ranks.
if is_rank_zero:
    torch.distributed.barrier()

trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=16, shuffle=True, num_workers=2
)

# Held-out split, read in fixed order with small batches.
testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform
)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=4, shuffle=False, num_workers=2
)

[2022-02-11 01:43:11,179] [INFO] [distributed.py:36:init_distributed] Not using the DeepSpeed or torch.distributed launchers, attempting to detect MPI environment...
[2022-02-11 01:43:13,558] [INFO] [distributed.py:83:mpi_discovery] Discovered MPI settings of world_rank=0, local_rank=0, world_size=1, master_addr=10.0.0.5, master_port=29500
[2022-02-11 01:43:13,559] [INFO] [distributed.py:46:init_distributed] Initializing torch distributed with backend: nccl
Files already downloaded and verified
Files already downloaded and verified


In [4]:
# DeepSpeed runtime configuration; passed to deepspeed.initialize() as `config`.
ds_config = dict(
    train_batch_size=16,
    steps_per_print=2000,
    # Adam optimizer with a linear warm-up schedule.
    optimizer=dict(
        type="Adam",
        params=dict(lr=0.001, betas=[0.8, 0.999], eps=1e-8, weight_decay=3e-7),
    ),
    scheduler=dict(
        type="WarmupLR",
        params=dict(warmup_min_lr=0, warmup_max_lr=0.001, warmup_num_steps=1000),
    ),
    gradient_clipping=1.0,
    prescale_gradients=False,
    # Mixed precision; the saved training log shows dynamic loss scaling
    # starting at 32768 (2 ** initial_scale_power).
    fp16=dict(
        enabled=True,
        fp16_master_weights_and_grads=False,
        loss_scale=0,
        loss_scale_window=500,
        hysteresis=2,
        min_loss_scale=1,
        initial_scale_power=15,
    ),
    wall_clock_breakdown=False,
    # ZeRO settings; stage 0 is selected here.
    zero_optimization=dict(
        stage=0,
        allgather_partitions=True,
        reduce_scatter=True,
        allgather_bucket_size=50000000,
        reduce_bucket_size=50000000,
        overlap_comm=True,
        contiguous_gradients=True,
        cpu_offload=False,
    ),
)

In [5]:
# Command-line style options for the run. Nothing is read from the real command
# line: the parser is fed an empty argument list, so every option keeps its
# default.
parser = argparse.ArgumentParser(description="CIFAR")

# --- general training options ---
# NOTE(review): store_true combined with default=True means this flag is True
# whether or not it is passed.
parser.add_argument(
    "--with_cuda",
    action="store_true",
    default=True,
    help="use CPU in case there's no GPU support",
)
parser.add_argument(
    "--use_ema",
    action="store_true",
    default=False,
    help="whether use exponential moving average",
)
parser.add_argument(
    "-e", "--epochs",
    type=int,
    default=30,
    help="number of total epochs (default: 30)",
)
parser.add_argument(
    "--local_rank",
    type=int,
    default=-1,
    help="local rank passed from distributed launcher",
)
parser.add_argument(
    "--log-interval",
    type=int,
    default=500,
    help="output logging information at a given interval",
)

# --- mixture-of-experts options ---
parser.add_argument(
    "--ep-world-size",
    type=int,
    default=4,
    help="(moe) expert parallel world size",
)
parser.add_argument(
    "--num-experts",
    type=int,
    default=8,
    help="(moe) number of total experts",
)
parser.add_argument(
    "--top-k",
    type=int,
    default=1,
    help="(moe) gating top 1 and 2 supported",
)
parser.add_argument(
    "--min-capacity",
    type=int,
    default=0,
    help="(moe) minimum capacity of an expert regardless of the capacity_factor",
)
parser.add_argument(
    "--noisy-gate-policy",
    type=str,
    default=None,
    help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
)
parser.add_argument(
    "--moe-param-group",
    action="store_true",
    default=False,
    help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
)

# Let DeepSpeed register its own configuration arguments on the parser.
parser = deepspeed.add_config_arguments(parser)

# Parse an empty argument list: inside a notebook there is no command line.
args = parser.parse_args([])

In [6]:
# Record the main hyperparameters of this run in MLflow, one parameter per call.
for param_name, param_value in (
    ("ema", args.use_ema),
    ("epochs", args.epochs),
    ("experts", args.num_experts),
    ("noisy gate", args.noisy_gate_policy),
):
    mlflow.log_param(param_name, param_value)


In [7]:
# Initialise DeepSpeed's process groups, requesting an expert-parallel size of
# args.ep_world_size (the --ep-world-size option).
deepspeed.utils.groups.initialize(ep_size=args.ep_world_size)

[2022-02-11 01:44:42,211] [INFO] [logging.py:69:log_dist] [Rank 0] initializing deepspeed groups
[2022-02-11 01:44:42,212] [INFO] [logging.py:69:log_dist] [Rank 0] initializing deepspeed model parallel group with size 1
[2022-02-11 01:44:42,215] [INFO] [logging.py:69:log_dist] [Rank 0] initializing deepspeed expert parallel group with size 4
[2022-02-11 01:44:42,216] [INFO] [logging.py:69:log_dist] [Rank 0] creating expert data parallel process group with ranks: [0]
[2022-02-11 01:44:42,217] [INFO] [logging.py:69:log_dist] [Rank 0] creating expert parallel process group with ranks: [0]


In [8]:
class Net(nn.Module):
    """Small convolutional classifier with a mixture-of-experts layer.

    Two conv + max-pool stages feed two fully connected layers, a DeepSpeed
    MoE layer whose expert is an 84 -> 84 linear layer, and a final 10-way
    linear layer. The 16 * 5 * 5 flatten size matches 3-channel 32x32 inputs.
    The MoE settings (expert count, top-k, minimum capacity, noisy gate
    policy) are read from the module-level ``args``.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor; the pooling layer is shared.
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        # Dense head.
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        # The plain linear layer serves only as the expert of the MoE layer
        # that is stored under the name fc3.
        expert = nn.Linear(84, 84)
        self.fc3 = deepspeed.moe.layer.MoE(
            hidden_size=84,
            expert=expert,
            num_experts=args.num_experts,
            k=args.top_k,
            min_capacity=args.min_capacity,
            noisy_gate_policy=args.noisy_gate_policy,
        )
        self.fc4 = nn.Linear(84, 10)

    def forward(self, x):
        """Return ``(outputs, gate_loss)`` for a batch of images ``x``.

        ``outputs`` is the result of the final linear layer; ``gate_loss`` is
        the second value returned by the MoE layer. The MoE layer's third
        return value is discarded.
        """
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        flat = features.view(-1, 16 * 5 * 5)
        hidden = F.relu(self.fc2(F.relu(self.fc1(flat))))
        moe_out, gate_loss, _ = self.fc3(hidden)
        return self.fc4(moe_out), gate_loss


# Build the network; Net.__init__ reads its MoE settings from the global `args`.
net = Net()


def create_moe_param_groups(model):
    """Build optimizer parameter groups that separate MoE parameters.

    Wraps all of ``model``'s parameters in a single param group named
    ``'parameters'`` and hands it to DeepSpeed's
    ``split_params_into_different_moe_groups_for_optimizer``.

    Args:
        model: module whose ``parameters()`` are to be grouped.

    Returns:
        Whatever ``split_params_into_different_moe_groups_for_optimizer``
        returns for that group.
    """
    from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer

    # model.parameters() is a one-shot generator. Materialise it so the group
    # can be iterated more than once without silently coming up empty on the
    # second pass.
    parameters = {'params': list(model.parameters()), 'name': 'parameters'}

    return split_params_into_different_moe_groups_for_optimizer(parameters)


# Choose what the optimizer sees: MoE-aware parameter groups when
# --moe-param-group is set, otherwise just the trainable parameters.
if args.moe_param_group:
    parameters = create_moe_param_groups(net)
else:
    parameters = (p for p in net.parameters() if p.requires_grad)

# Wrap the network in a DeepSpeed engine. This rebinds `trainloader` to the
# loader DeepSpeed builds from `trainset`; the fourth return value is unused.
model_engine, optimizer, trainloader, __ = deepspeed.initialize(
    args=args,
    model=net,
    model_parameters=parameters,
    training_data=trainset,
    config=ds_config,
)

# Remember whether the engine runs in fp16 so inputs can be cast to match.
fp16 = model_engine.fp16_enabled()
print(f'fp16={fp16}')

[2022-02-11 01:44:42,466] [INFO] [logging.py:69:log_dist] [Rank 0] num_experts: 8 | num_local_experts: 8 | expert_parallel_size: 1
[2022-02-11 01:44:42,604] [INFO] [logging.py:69:log_dist] [Rank 0] DeepSpeed info: version=0.5.10, git-hash=unknown, git-branch=unknown
[2022-02-11 01:44:42,740] [INFO] [engine.py:275:__init__] DeepSpeed Flops Profiler Enabled: False
Installed CUDA version 11.2 does not match the version torch was compiled with 11.1 but since the APIs are compatible, accepting this combination
Using /home/azureuser/.cache/torch_extensions/py38_cu111 as PyTorch extensions root...
Detected CUDA files, patching ldflags
Emitting ninja build file /home/azureuser/.cache/torch_extensions/py38_cu111/fused_adam/build.ninja...
Building extension module fused_adam...
Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N)
Loading extension module fused_adam...
Time to load fused_adam op: 0.4612245559692383 seconds
[2022-02-11 0

In [9]:
# Number of CUDA devices PyTorch reports as available.
torch.cuda.device_count()

4

In [10]:
# World size as reported by the DeepSpeed engine.
model_engine.world_size

1

In [11]:
# Number of experts as reported by the DeepSpeed engine.
model_engine.num_experts

8

In [12]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()

# Number of batches processed so far across all epochs. Used as the MLflow step
# so that successive metric values are ordered along the x-axis.
global_step = 0

for epoch in range(args.epochs):  # loop over the dataset multiple times

    # Sums over the current logging interval; both are reset after each report.
    running_loss = 0.0
    running_gate_loss = 0.0
    for i, data in enumerate(trainloader):
        # data is a list of [inputs, labels]; move both to this process's device
        inputs = data[0].to(model_engine.local_rank)
        labels = data[1].to(model_engine.local_rank)
        if fp16:
            # the engine runs in half precision, so the inputs must match
            inputs = inputs.half()
        outputs, gate_loss = model_engine(inputs)
        # NOTE(review): gate_loss is only logged, it is not part of the
        # optimised loss - confirm that this is intended.
        loss = criterion(outputs, labels)

        model_engine.backward(loss)
        model_engine.step()
        global_step += 1

        # accumulate statistics
        running_loss += loss.item()
        running_gate_loss += gate_loss.item()
        if i % args.log_interval == (args.log_interval - 1):  # report every log_interval mini-batches
            print('training loss [%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / args.log_interval))
            mlflow.log_metric("train loss", running_loss / args.log_interval, step=global_step)
            # Mean gate loss over the interval. Dividing a single batch's value
            # by log_interval would not be an average of anything.
            mlflow.log_metric("gate loss", running_gate_loss / args.log_interval, step=global_step)
            running_loss = 0.0
            running_gate_loss = 0.0

print('Finished Training')

[2022-02-11 01:45:02,473] [INFO] [fused_optimizer.py:339:_update_scale] 
Grad overflow on iteration 453
[2022-02-11 01:45:02,475] [INFO] [fused_optimizer.py:340:_update_scale] Reducing dynamic loss scale from 32768 to 16384.0
[2022-02-11 01:45:02,477] [INFO] [logging.py:69:log_dist] [Rank 0] Overflow detected. Skipping step. Attempted loss scale: 32768, reducing to 16384.0
training loss [1,   500] loss: 2.286
[2022-02-11 01:45:09,417] [INFO] [fused_optimizer.py:349:_update_scale] No Grad overflow for 500 iterations
[2022-02-11 01:45:09,418] [INFO] [fused_optimizer.py:351:_update_scale] Increasing dynamic loss scale from 16384.0 to 32768.0
training loss [1,  1000] loss: 2.271
[2022-02-11 01:45:16,006] [INFO] [fused_optimizer.py:349:_update_scale] No Grad overflow for 500 iterations
[2022-02-11 01:45:16,008] [INFO] [fused_optimizer.py:351:_update_scale] Increasing dynamic loss scale from 32768.0 to 65536.0
[2022-02-11 01:45:16,019] [INFO] [fused_optimizer.py:339:_update_scale] 
Grad over

In [None]:
#test accuracy
# Class names come from the dataset itself, so they always line up with the
# integer labels it yields.
classes = testset.classes
num_classes = len(classes)

correct = 0
total = 0
class_correct = [0.0] * num_classes
class_total = [0.0] * num_classes

# One pass over the test set collects both the overall and the per-class counts.
with torch.no_grad():
    for images, labels in testloader:
        if fp16:
            # match the precision the network was trained in
            images = images.half()
        labels = labels.to(model_engine.local_rank)
        outputs, gate_loss = net(images.to(model_engine.local_rank))
        _, predicted = torch.max(outputs, 1)
        hits = predicted == labels

        total += labels.size(0)
        correct += hits.sum().item()
        # Walk the actual batch rather than assuming a fixed batch size, so a
        # short or single-element final batch is handled as well.
        for label, hit in zip(labels.tolist(), hits.tolist()):
            class_correct[label] += hit
            class_total[label] += 1

mlflow.log_metric("test accuracy", 100*correct/total)

print('Accuracy of the network on the 10000 test images: %d %%' %
      (100 * correct / total))

for i in range(num_classes):
    print('Accuracy of %5s : %2d %%' %
          (classes[i], 100 * class_correct[i] / class_total[i]))

In [None]:
# Close the MLflow run that was opened with mlflow.start_run() in the setup cell.
mlflow.end_run()