In [None]:
import sagemaker
import torch
import torchvision
import torchvision.transforms as transforms

# IAM role passed as `role=` to every estimator created later in this notebook
role = sagemaker.get_execution_role()

### Download and process data, then upload to S3 for training

In [None]:
# Preprocessing applied to every image: convert to a tensor, then normalise
# with mean 0.5 and std 0.5
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)

# Fetch the FashionMNIST train and test splits into ./data with the transform attached
trainset = torchvision.datasets.FashionMNIST(
    'data', train=True, transform=transform, download=True
)
testset = torchvision.datasets.FashionMNIST(
    'data', train=False, transform=transform, download=True
)

# Serialise each dataset object to a local .pt file
torch.save(trainset, 'train_dataset.pt')
torch.save(testset, 'test_dataset.pt')

# Open a SageMaker session and choose the S3 destination (session default bucket)
session = sagemaker.Session()
bucket = session.default_bucket()
prefix = 'fashion-mnist'

# Push both files to S3, each under its own key prefix
train_data_path = session.upload_data(
    'train_dataset.pt', bucket=bucket, key_prefix=f'{prefix}/train'
)
test_data_path = session.upload_data(
    'test_dataset.pt', bucket=bucket, key_prefix=f'{prefix}/test'
)

print(f"Training data uploaded to: {train_data_path}")
print(f"Test data uploaded to: {test_data_path}")

### Create training job with data as input

In [None]:
from sagemaker.pytorch import PyTorch

# Single-instance training job that runs src/train.py
pytorch_estimator = PyTorch(
    role=role,
    source_dir='src',
    entry_point='train.py',
    framework_version='2.0.1',
    py_version='py310',
    instance_type='ml.m5.xlarge',
    instance_count=1,
    # Keep the instance alive for 300 seconds after the job has finished
    keep_alive_period_in_seconds=300,
)

# Launch the job; the 'training' and 'testing' channels point at the S3
# locations returned by the uploads earlier in the notebook
pytorch_estimator.fit(
    inputs=dict(training=train_data_path, testing=test_data_path)
)

### Create training job with hyperparameters and custom metric definitions

In [None]:
# Metrics extracted from the training job's stdout. Each entry maps a metric
# name to a regex whose single capture group is the metric value.
# Raw strings are used so that `\d` and `\.` reach the regex engine unchanged
# without being treated as (invalid) Python string escapes.
metric_definitions = [
    {'Name': 'train:loss', 'Regex': r'train_loss: (\d+\.\d+)'},
    {'Name': 'test:accuracy', 'Regex': r'test_accuracy: (\d+\.\d+)'},
    {'Name': 'test:loss', 'Regex': r'test_loss: (\d+\.\d+)'},
    {'Name': 'f1_score', 'Regex': r'f1_score: (\d+\.\d+)'}
]

In [None]:
from sagemaker.pytorch import PyTorch

# Single-instance job running src/train_extended.py with hyperparameters and
# custom metric regexes attached
pytorch_estimator = PyTorch(
    role=role,
    source_dir='src',
    entry_point='train_extended.py',
    framework_version='2.0.1',
    py_version='py310',
    instance_type='ml.m5.xlarge',
    instance_count=1,
    # Hyperparameters handed to the training job
    hyperparameters={'epochs': 5, 'batch-size': 64},
    # Regexes used to extract metrics from the job's stdout
    metric_definitions=metric_definitions,
    # Keep the instance alive for 300 seconds after the job has finished
    keep_alive_period_in_seconds=300,
)

# Launch the job; the 'training' and 'testing' channels point at the S3
# locations returned by the uploads earlier in the notebook
pytorch_estimator.fit(
    inputs=dict(training=train_data_path, testing=test_data_path)
)

### Distributed training

In [None]:
# Same stdout-parsing metric regexes as used for the single-instance job,
# repeated here for the distributed job. Each regex has one capture group
# holding the metric value.
# Raw strings are used so that `\d` and `\.` reach the regex engine unchanged
# without being treated as (invalid) Python string escapes.
metric_definitions = [
    {'Name': 'train:loss', 'Regex': r'train_loss: (\d+\.\d+)'},
    {'Name': 'test:accuracy', 'Regex': r'test_accuracy: (\d+\.\d+)'},
    {'Name': 'test:loss', 'Regex': r'test_loss: (\d+\.\d+)'},
    {'Name': 'f1_score', 'Regex': r'f1_score: (\d+\.\d+)'}
]

In [None]:
from sagemaker.pytorch import PyTorch

# Two-instance job running src/train_distributed.py with the PyTorch DDP
# launcher enabled.
# NOTE(review): ml.m5.xlarge is a CPU instance type - confirm that
# train_distributed.py initialises a CPU-capable process-group backend.
pytorch_estimator = PyTorch(
    role=role,
    source_dir='src',
    entry_point='train_distributed.py',
    framework_version='2.1.0',
    py_version='py310',
    instance_type='ml.m5.xlarge',
    instance_count=2,
    # Hyperparameters handed to the training job
    hyperparameters={'epochs': 5, 'batch-size': 64},
    # Regexes used to extract metrics from the job's stdout
    metric_definitions=metric_definitions,
    distribution={"pytorchddp": {"enabled": True}},
)

# Launch the job; the 'training' and 'testing' channels point at the S3
# locations returned by the uploads earlier in the notebook
pytorch_estimator.fit(
    inputs=dict(training=train_data_path, testing=test_data_path)
)