# Distributed PyTorch training With Azure ML

<b> Objective : To create a multiclass classification model with PyTorch on Azure ML</b>

In [1]:
%%capture

!mkdir ./newsDataset
!wget https://raw.githubusercontent.com/mhjabreel/CharCnn_Keras/master/data/ag_news_csv/train.csv -P ./newsDataset
!wget https://raw.githubusercontent.com/mhjabreel/CharCnn_Keras/master/data/ag_news_csv/test.csv -P ./newsDataset

In [3]:
from azureml.core import Workspace

ws = Workspace.from_config()
dstore = ws.get_default_datastore()

dstore.upload('./newsDataset', '/news-dataset', show_progress = True)

In [None]:
#Registering the dataset
from azureml.core import Dataset

trainDataset =  Dataset.Tabular.from_delimited_files(dstore.path('news-dataset/train.csv'))
testDataset =  Dataset.Tabular.from_delimited_files(dstore.path('news-dataset/test.csv'))

trainDataset.register(workspace=ws, name='news-train-dataset', description='news train dataset')
testDataset.register(workspace=ws, name='news-test-dataset', description='news test dataset')

In [6]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# choose a name for your cluster
cluster_name = 'pytorch-cluster'

try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target.')
except ComputeTargetException:
    print('Creating a new compute target...')
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_D11_v2',
                                                           max_nodes=2) #Not enough quota on trial account

    # create the cluster
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)

    compute_target.wait_for_completion(show_output=True)

# use get_status() to get a detailed status for the current AmlCompute. 
print(compute_target.get_status().serialize())

Found existing compute target.
{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2022-09-14T13:11:08.250000+00:00', 'errors': None, 'creationTime': '2022-09-14T11:05:19.257863+00:00', 'modifiedTime': '2022-09-14T11:05:22.841283+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 2, 'nodeIdleTimeBeforeScaleDown': 'PT1800S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D11_V2'}


In [4]:
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies
import json

with open('./modelTrain/SP.json') as f:
    sp = json.load(f)

env = Environment(name = 'Pytorch-train')
conda_dep = CondaDependencies()

packages = ['torchtext', 'more-itertools', 'pandas']
for package in packages:
    conda_dep.add_pip_package(package)

env.environment_variables = {'clientId': sp['clientId'], 'clientSecret' : sp['clientSecret'], 'tenantId' : sp['tenantId']}
    
# Adds dependencies to PythonSection of myenv
env.python.conda_dependencies = conda_dep

env.register(ws)

Property environment_variables is deprecated. Use RunConfiguration.environment_variables to set runtime variables.


{
    "assetId": "azureml://locations/centralindia/workspaces/a48b0d16-eea3-472f-8972-07820f686339/environments/Pytorch-train/versions/6",
    "databricks": {
        "eggLibraries": [],
        "jarLibraries": [],
        "mavenLibraries": [],
        "pypiLibraries": [],
        "rcranLibraries": []
    },
    "docker": {
        "arguments": [],
        "baseDockerfile": null,
        "baseImage": "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:20220708.v1",
        "baseImageRegistry": {
            "address": null,
            "password": null,
            "registryIdentity": null,
            "username": null
        },
        "buildContext": null,
        "enabled": false,
        "platform": {
            "architecture": "amd64",
            "os": "Linux"
        },
        "sharedVolumes": true,
        "shmSize": null
    },
    "environmentVariables": {
        "clientId": "3f437a83-9aaa-4632-935f-e703b210ca06",
        "clientSecret": "yQp8Q~LatQDoUSnofy3B216YDrMXlO3N6

In [11]:
from azureml.core import Workspace, ScriptRunConfig, Environment, Experiment
from azureml.core.runconfig import MpiConfiguration

curated_env_name = 'Pytorch-train'
pytorch_env = Environment.get(workspace=ws, name=curated_env_name)
distr_config = MpiConfiguration(process_count_per_node = 2, node_count = 2)

args = ['--num_classes', 4, '--embed_size', 128, '--max_width', 35]

run_config = ScriptRunConfig(
  source_directory= './modelTrain',
  arguments = args,
  script='Pytorch_train.py',
  compute_target=compute_target,
  environment=pytorch_env,
  distributed_job_config=distr_config,
)

In [12]:
run = Experiment(ws, "torch-exp2").submit(run_config)

# PyTorch Script - Model with training and evaluation script

In [13]:
!pip install more-itertools
!pip install torchtext

Collecting more-itertools
  Downloading more_itertools-8.14.0-py3-none-any.whl (52 kB)
[K     |████████████████████████████████| 52 kB 376 kB/s eta 0:00:01
[?25hInstalling collected packages: more-itertools
Successfully installed more-itertools-8.14.0


In [1]:
import os
import argparse
import more_itertools as mit
import time
from azureml.core import Workspace, Dataset
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchtext
from torchtext.data.utils import get_tokenizer, ngrams_iterator
from torchtext.vocab import build_vocab_from_iterator

from azureml.core.authentication import ServicePrincipalAuthentication

sp = ServicePrincipalAuthentication(tenant_id= os.environ['tenantId'], # tenantID
                                    service_principal_id= os.environ['clientId'], # clientId
                                    service_principal_password = os.environ['clientSecret']) # clientSecret

global MAX_WIDTH

SEED= 20
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

ws = Workspace.get(name= 'mlops-san',
                   auth=sp,
                   subscription_id= '0d1442c1-d386-4505-9abe-0bedfd63701e',
                   resource_group= 'mlops-san')

train_dataset = Dataset.get_by_name(ws, name='news-train-dataset')
train_df = train_dataset.to_pandas_dataframe()

test_dataset = Dataset.get_by_name(ws, name='news-test-dataset')
test_df = test_dataset.to_pandas_dataframe()

train_text, train_label = train_df.iloc[:, 2].map(lambda x : x.lower()), train_df.iloc[:, 0].map(lambda x : int(x) - 1)
test_text, test_label = test_df.iloc[:, 2].map(lambda x : x.lower()), test_df.iloc[:, 0].map(lambda x : int(x) - 1)

tokenizer = get_tokenizer("basic_english")

def vocabGenerator(lst):
      for data in lst:
        yield list(ngrams_iterator(tokenizer(data), 2))

vocab = build_vocab_from_iterator(vocabGenerator(train_text.values.tolist()), min_freq = 5, specials = ['<unk>', '<pad>'], max_tokens = 30002, special_first = True)
vocab.set_default_index(vocab['<unk>']) #unknown tokens will have this index

def collator(batch):
    text = [list(mit.padded(vocab(tokenizer(i[0])), vocab['<pad>'], MAX_WIDTH))[:MAX_WIDTH] for i in batch]
    label = [i[1] for i in batch]
    return torch.tensor(text), torch.LongTensor(label)

def dataBatcher(batch, bs = 32):
    return DataLoader(batch, batch_size = bs, shuffle = True, collate_fn = collator)

class FastText(nn.Module):
    def __init__(self, vocab_size, embed_dims, pad_index, n_classes):
        
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dims, pad_index)
        self.fc = nn.Linear(embed_dims, n_classes) 
    
    def forward(self, inputs):
        embedded = self.embed(inputs) #batch_size, seq_len, embed_size
        pooled = F.avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1) #returns batch_size, embed_size
        return self.fc(pooled)

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

def accuracy(preds, true):
    _, index = torch.max(preds, dim = 1)
    return (index == true).sum().float() / len(preds)

def train_m(model, iterator, optimizer, l):
    e_loss = 0
    e_acc = 0
    model.train()
    for inputs, labels in iterator:
        optimizer.zero_grad()
        preds = model(inputs)
        acc = accuracy(preds,  labels)
        loss = l(preds.squeeze(1), labels.long())
        loss.backward()
        optimizer.step()
        e_loss += loss.item()
        e_acc += acc.item()
    return e_loss/len(iterator), e_acc/len(iterator)

def evaluate_m(model, iterator, l):
    e_loss = 0
    e_acc = 0
    model.eval()
    with torch.no_grad():
        for inputs, labels in iterator:
            preds = model(inputs)
            loss = l(preds.squeeze(1), labels.long())
            acc = accuracy(preds,  labels)
            e_loss += loss.item()
            e_acc += acc.item()
    return e_loss/len(iterator), e_acc/len(iterator)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_epochs', type=int, default=2, help='Number of epochs to train')
    parser.add_argument('--exp_name', type=str, help='Name of MLFlow experiment')
    parser.add_argument('--max_width', type=int, help='Maximum width including padding')
    parser.add_argument('--num_classes', type=int, help='Number of classes')
    parser.add_argument('--embed_size', type=int, default = 128, help='Embed vector size')
    args = parser.parse_args()
    
    #call from main script
    global MAX_WIDTH
    MAX_WIDTH = args.max_width
    trainDataloader = dataBatcher([[i, j] for i,j in zip(train_text.values.tolist(), train_label.values.tolist())])
    testDataloader = dataBatcher([[i, j] for i,j in zip(test_text.values.tolist(), test_label.values.tolist())])
    #end of call from main script
    
    model = FastText(len(vocab), args.embed_size, vocab['<pad>'], args.num_classes)
    optimizer = optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    model = model.to(device)
    criterion = criterion.to(device)
    N_EPOCHS = args.num_epochs

    best_valid_loss = float('inf')

    for epoch in range(N_EPOCHS):

        start_time = time.time()

        train_loss, train_acc = train_m(model, trainDataloader, optimizer, criterion)
        valid_loss, valid_acc = evaluate_m(model, testDataloader, criterion)

        end_time = time.time()

        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        print(f'Epoch: {epoch+1:02} / {N_EPOCHS} | Epoch Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
        print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

if __name__ == '__main__':
    main()

In [2]:
class FastText(nn.Module):
    def __init__(self, vocab_size, embed_dims, pad_index, n_classes):
        
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dims, pad_index)
        self.fc = nn.Linear(embed_dims, n_classes) 
    
    def forward(self, inputs):
        embedded = self.embed(inputs) #batch_size, seq_len, embed_size
        pooled = F.avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1) #returns batch_size, embed_size
        return self.fc(pooled)

model = FastText(len(vocab), 128, vocab['<pad>'], 4)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
model = model.to(device)
criterion = criterion.to(device)

In [3]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

def accuracy(preds, true):
    _, index = torch.max(preds, dim = 1)
    return (index == true).sum().float() / len(preds)

In [4]:
def train_m(model, iterator, optimizer, l):
    e_loss = 0
    e_acc = 0
    model.train()
    for inputs, labels in iterator:
        optimizer.zero_grad()
        preds = model(inputs)
        acc = accuracy(preds,  labels)
        loss = l(preds.squeeze(1), labels.long())
        loss.backward()
        optimizer.step()
        e_loss += loss.item()
        e_acc += acc.item()
    return e_loss/len(iterator), e_acc/len(iterator)

def evaluate_m(model, iterator, l):
    e_loss = 0
    e_acc = 0
    model.eval()
    with torch.no_grad():
        for inputs, labels in iterator:
            preds = model(inputs)
            loss = l(preds.squeeze(1), labels.long())
            acc = accuracy(preds,  labels)
            e_loss += loss.item()
            e_acc += acc.item()
    return e_loss/len(iterator), e_acc/len(iterator)

In [5]:
N_EPOCHS = 1

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    train_loss, train_acc = train_m(model, trainDataloader, optimizer, criterion)
    valid_loss, valid_acc = evaluate_m(model, testDataloader, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} / {N_EPOCHS} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

Epoch: 01 / 1 | Epoch Time: 0m 45s
	Train Loss: 0.518 | Train Acc: 82.58%
	 Val. Loss: 0.336 |  Val. Acc: 88.34%


In [2]:
%%writefile Pytorch_train.py
import os
import argparse
import more_itertools as mit
import time
from azureml.core import Workspace, Dataset
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchtext
from torchtext.data.utils import get_tokenizer, ngrams_iterator
from torchtext.vocab import build_vocab_from_iterator

from azureml.core.authentication import ServicePrincipalAuthentication

sp = ServicePrincipalAuthentication(tenant_id= os.environ['tenantId'], # tenantID
                                    service_principal_id= os.environ['clientId'], # clientId
                                    service_principal_password = os.environ['clientSecret']) # clientSecret


SEED= 20
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

ws = Workspace.get(name= 'mlops-san',
                   auth=sp,
                   subscription_id= '0d1442c1-d386-4505-9abe-0bedfd63701e',
                   resource_group= 'mlops-san')

train_dataset = Dataset.get_by_name(ws, name='news-train-dataset')
train_df = train_dataset.to_pandas_dataframe()

test_dataset = Dataset.get_by_name(ws, name='news-test-dataset')
test_df = test_dataset.to_pandas_dataframe()

train_text, train_label = train_df.iloc[:, 2].map(lambda x : x.lower()), train_df.iloc[:, 0].map(lambda x : int(x) - 1)
test_text, test_label = test_df.iloc[:, 2].map(lambda x : x.lower()), test_df.iloc[:, 0].map(lambda x : int(x) - 1)

tokenizer = get_tokenizer("basic_english")

def vocabGenerator(lst):
      for data in lst:
        yield list(ngrams_iterator(tokenizer(data), 2))

vocab = build_vocab_from_iterator(vocabGenerator(train_text.values.tolist()), min_freq = 5, specials = ['<unk>', '<pad>'], max_tokens = 30002, special_first = True)
vocab.set_default_index(vocab['<unk>']) #unknown tokens will have this index

def collator(batch):
    text = [list(mit.padded(vocab(tokenizer(i[0])), vocab['<pad>'], MAX_WIDTH))[:MAX_WIDTH] for i in batch]
    label = [i[1] for i in batch]
    return torch.tensor(text), torch.LongTensor(label)

def dataBatcher(batch, bs = 32):
    return DataLoader(batch, batch_size = bs, shuffle = True, collate_fn = collator)

class FastText(nn.Module):
    def __init__(self, vocab_size, embed_dims, pad_index, n_classes):
        
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dims, pad_index)
        self.fc = nn.Linear(embed_dims, n_classes) 
    
    def forward(self, inputs):
        embedded = self.embed(inputs) #batch_size, seq_len, embed_size
        pooled = F.avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1) #returns batch_size, embed_size
        return self.fc(pooled)

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

def accuracy(preds, true):
    _, index = torch.max(preds, dim = 1)
    return (index == true).sum().float() / len(preds)

def train_m(model, iterator, optimizer, l):
    e_loss = 0
    e_acc = 0
    model.train()
    for inputs, labels in iterator:
        optimizer.zero_grad()
        preds = model(inputs)
        acc = accuracy(preds,  labels)
        loss = l(preds.squeeze(1), labels.long())
        loss.backward()
        optimizer.step()
        e_loss += loss.item()
        e_acc += acc.item()
    return e_loss/len(iterator), e_acc/len(iterator)

def evaluate_m(model, iterator, l):
    e_loss = 0
    e_acc = 0
    model.eval()
    with torch.no_grad():
        for inputs, labels in iterator:
            preds = model(inputs)
            loss = l(preds.squeeze(1), labels.long())
            acc = accuracy(preds,  labels)
            e_loss += loss.item()
            e_acc += acc.item()
    return e_loss/len(iterator), e_acc/len(iterator)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_epochs', type=int, default=2, help='Number of epochs to train')
    parser.add_argument('--exp_name', type=str, help='Name of MLFlow experiment')
    parser.add_argument('--max_width', type=str, help='Maximum width including padding')
    parser.add_argument('--num_classes', type=int, help='Number of classes')
    parser.add_argument('--embed_size', type=int, default = 128, help='Embed vector size')
    args = parser.parse_args()
    
    #call from main script
    MAX_WIDTH = args.max_width
    trainDataloader = dataBatcher([[i, j] for i,j in zip(train_text.values.tolist(), train_label.values.tolist())])
    testDataloader = dataBatcher([[i, j] for i,j in zip(test_text.values.tolist(), test_label.values.tolist())])
    #end of call from main script
    
    model = FastText(len(vocab), args.embed_size, vocab['<pad>'], args.num_classes)
    optimizer = optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    model = model.to(device)
    criterion = criterion.to(device)
    N_EPOCHS = args.num_epochs

    best_valid_loss = float('inf')

    for epoch in range(N_EPOCHS):

        start_time = time.time()

        train_loss, train_acc = train_m(model, trainDataloader, optimizer, criterion)
        valid_loss, valid_acc = evaluate_m(model, testDataloader, criterion)

        end_time = time.time()

        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        print(f'Epoch: {epoch+1:02} / {N_EPOCHS} | Epoch Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
        print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

if __name__ == '__main__':
    main()


Writing Pytorch_train.py
