
Commit

add airflow example and update torch
Paul Yang authored and Paul Yang committed Aug 8, 2024
1 parent 4d392a3 commit 4d7a4d1
Showing 7 changed files with 305 additions and 18 deletions.
55 changes: 55 additions & 0 deletions examples/airflow-torch-training/airflow_example_torch_train.py
@@ -0,0 +1,55 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Import the functions from the training script
from callables import bring_up_cluster_callable, access_data_callable, train_model_callable, down_cluster

import runhouse as rh

default_args = {
'owner': 'paul',
'depends_on_past': False,
'start_date': datetime(2024, 8, 6),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG(
'pytorch_training_pipeline_example',
default_args=default_args,
description='A simple PyTorch training DAG with multiple steps',
schedule=timedelta(days=1),
)


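# Each task below is a thin wrapper around a callable from callables.py; the heavy lifting runs on remote compute via Runhouse.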
bring_up_cluster_task = PythonOperator(
task_id='bring_up_cluster_task',
python_callable=bring_up_cluster_callable,
dag=dag,
)

access_data_task = PythonOperator(
task_id='access_data_task',
python_callable=access_data_callable,
dag=dag,
)


train_model_task = PythonOperator(
task_id='train_model_task',
python_callable=train_model_callable,
dag=dag,
)


down_cluster_task = PythonOperator(
task_id='down_cluster_task',
python_callable=down_cluster,
dag=dag,
)


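# Define the task order: bring up the cluster, download the data, train the model, then tear down the cluster.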
bring_up_cluster_task >> access_data_task >> train_model_task >> down_cluster_task
52 changes: 52 additions & 0 deletions examples/airflow-torch-training/callables.py
@@ -0,0 +1,52 @@
import runhouse as rh
import logging

from torch_example_for_airflow import SimpleTrainer, DownloadData

# Configure logging
logger = logging.getLogger(__name__)


def bring_up_cluster_callable(**kwargs):
logger.info("Connecting to remote cluster")
cluster = rh.ondemand_cluster(name="a10g-cluster", instance_type="A10G:1", provider="aws").up_if_not()
    # cluster.save()  # Uncomment if you have a Runhouse Den account to save and monitor the resource.

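# Send the DownloadData function to the cluster and run it there, so the dataset is downloaded on the remote machine.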
def access_data_callable(**kwargs):
env = rh.env(
name="test_env",
reqs=["torch", "torchvision"]
)

cluster = rh.cluster(name="a10g-cluster").up_if_not()

remote_download = rh.function(DownloadData).to(cluster, env=env)
remote_download()

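# Wrap SimpleTrainer as a Runhouse module on the cluster and drive the training loop from the callable:
# each epoch trains, evaluates, and checkpoints the model to S3.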
def train_model_callable(**kwargs):
cluster = rh.cluster(name="a10g-cluster").up_if_not()

env = rh.env(
name="test_env",
reqs=["torch", "torchvision"]
)

remote_torch_example = rh.module(SimpleTrainer).to(cluster, env=env, name="torch-basic-training")

model = remote_torch_example()

batch_size = 64
epochs = 5
learning_rate = 0.01

model.load_train('./data', batch_size)
model.load_test('./data', batch_size)

for epoch in range(epochs):
model.train_model(learning_rate=learning_rate)
model.test_model()
model.save_model(bucket_name="my-simple-torch-model-example", s3_file_path=f'checkpoints/model_epoch_{epoch + 1}.pth')

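# Tear down the cluster once training is complete so no idle compute is left running.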
def down_cluster(**kwargs):
cluster = rh.cluster(name="a10g-cluster")
cluster.teardown()
31 changes: 31 additions & 0 deletions examples/airflow-torch-training/local_testing_of_callables.py
@@ -0,0 +1,31 @@
## You can easily test both the Airflow DAG and the underlying components by calling them locally.

## Because execution is offloaded to remote GPU compute, you can call each step locally or from a notebook.
## You can imagine that a DS or MLE might write a pipeline and debug it interactively from their local machine.
## Then, only once they are confident all the functions work, do they upload the Airflow pipeline, which is minimal.

## Airflow is used to schedule, monitor, and retry jobs while providing observability over runs.
## However, the code that is the substance of the program is not packaged into the Airflow DAG.

import logging
from callables import bring_up_cluster_callable, access_data_callable, train_model_callable, down_cluster

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

if __name__ == "__main__":
logger.info("Starting the pipeline...")

logger.info("Step 1: Bring up cluster")
bring_up_cluster_callable()

logger.info("Step 2: Access data")
access_data_callable()

logger.info("Step 3: Train model")
train_model_callable()

logger.info("Pipeline completed.")

down_cluster()
logger.info("Cluster sucessfully downed.")
14 changes: 14 additions & 0 deletions examples/airflow-torch-training/readme.md
@@ -0,0 +1,14 @@
# Using Airflow with Runhouse
The principal goal of using Runhouse alongside Airflow or other DAG systems is to restore interactive debuggability and fast iteration to developers. Packaging research code into production pipelines easily takes up half of machine learning engineers' time, and this is true even for sophisticated organizations.

**Use Airflow for what Airflow is good for**
* Scheduling
* Ensuring reliable execution
* Observability of runs

The usage pattern for Runhouse with Airflow should be as follows:
* Write Python classes and functions using normal coding best practices. Do not think about DAGs or DSLs at all; just write great code.
* Send the code for remote execution with Runhouse, and figure out whether it works by debugging it interactively. Runhouse lets you dispatch the code in seconds and streams logs back, so you can work on the remote as if it were local.
* Once you are satisfied with your code, write the callables for Airflow PythonOperators, which is the minimal code required to operationalize already-working classes and functions (see the sketch below).
* Then set up your DAG to be **minimal code beyond defining the order in which steps are called**.
* Finally, you can easily iterate further on your code, or test the pipeline end to end locally with no Airflow involvement.
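As a rough sketch (names such as `train_fn` are illustrative; the full working example is in `callables.py` and `airflow_example_torch_train.py` in this folder), a callable that dispatches work with Runhouse and the Airflow task wrapping it might look like this:

```python
# A minimal sketch, assuming `train_fn` is an ordinary Python function you have written
# and `dag` is an Airflow DAG defined as usual.
import runhouse as rh
from airflow.operators.python import PythonOperator


def train_callable(**kwargs):
    # Bring up (or reuse) the cluster and dispatch the function to it; logs stream back to the caller.
    cluster = rh.ondemand_cluster(name="a10g-cluster", instance_type="A10G:1", provider="aws").up_if_not()
    remote_train = rh.function(train_fn).to(cluster, env=rh.env(reqs=["torch", "torchvision"]))
    remote_train()


train_task = PythonOperator(task_id="train_task", python_callable=train_callable, dag=dag)
```

The DAG itself then only chains such tasks together (for example, `download_task >> train_task`), keeping all substantive code in plain Python modules.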
120 changes: 120 additions & 0 deletions examples/airflow-torch-training/torch_example_for_airflow.py
@@ -0,0 +1,120 @@
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms


class TorchExampleBasic(nn.Module):
def __init__(self):
super(TorchExampleBasic, self).__init__()
self.fc1 = nn.Linear(28*28, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, 10)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") #
self.to(self.device)


def forward(self, x):
x = x.view(-1, 28*28) # Flatten the input
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x


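# Download the MNIST train and test splits to the given path; transforms are applied later when the data is loaded.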
def DownloadData(path='./data'):
    train_dataset = datasets.MNIST(path, train=True, download=True)
    test_dataset = datasets.MNIST(path, train=False, download=True)


class SimpleTrainer():
def __init__(self):
super(SimpleTrainer, self).__init__()
self.model = TorchExampleBasic()
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.epoch = 0
self.train_loader = None
self.test_loader = None
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])


def load_train(self, path, batch_size):
data = datasets.MNIST(path, train=True, download=False, transform=self.transform)
self.train_loader = DataLoader(data, batch_size=batch_size, shuffle=True)

def load_test(self, path, batch_size):
data = datasets.MNIST(path, train=False, download=False, transform=self.transform)
self.test_loader = DataLoader(data, batch_size=batch_size, shuffle=True)

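    # Run one epoch over the training set with Adam and cross-entropy loss, logging the running loss every 100 mini-batches.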
def train_model(self, learning_rate=0.001):
self.model.train()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)


running_loss = 0.0

for batch_idx, (data, target) in enumerate(self.train_loader):
data, target = data.to(self.device), target.to(self.device)
optimizer.zero_grad()
output = self.model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()

running_loss += loss.item()

if batch_idx % 100 == 99: # print every 100 mini-batches
print(f'[{self.epoch + 1}, {batch_idx + 1:5d}] loss: {running_loss / 100:.3f}')
running_loss = 0.0

print('Finished Training')

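    # Evaluate the model on the held-out test set, reporting average loss and accuracy.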
def test_model(self):

self.model.eval()
test_loss = 0
correct = 0

with torch.no_grad():
for data, target in self.test_loader:
data, target = data.to(self.device), target.to(self.device)
output = self.model(data)
test_loss += F.cross_entropy(output, target, reduction='sum').item()
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()

test_loss /= len(self.test_loader.dataset)
accuracy = 100. * correct / len(self.test_loader.dataset)

print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(self.test_loader.dataset)} ({accuracy:.2f}%)\n')


def predict(self, data):
self.model.eval()
with torch.no_grad():
data = data.to(self.device)
output = self.model(data)
pred = output.argmax(dim=1, keepdim=True)
return pred.item()

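    # Serialize the model's state dict to an in-memory buffer and upload it to the given S3 bucket as a checkpoint.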
def save_model(self, bucket_name, s3_file_path):
try: ## Avoid failing if you're just trying the example and don't have S3 setup
import io
import boto3

buffer = io.BytesIO()
torch.save(self.model.state_dict(), buffer)
buffer.seek(0) # Rewind the buffer to the beginning

s3 = boto3.client('s3')
s3.upload_fileobj(buffer, bucket_name, s3_file_path)
print('uploaded checkpoint')
        except Exception as e:  # Catch broadly so the example still runs without S3 credentials
            print(f'did not upload checkpoint: {e}')
2 changes: 1 addition & 1 deletion examples/torch-simple-model/README.md
@@ -1,5 +1,5 @@
# Deploy and Train a Model with Torch
This example demonstrates how to use the `SimpleTrainer` class to train and test a machine learning model using PyTorch and the . The `SimpleTrainer` class handles model training, evaluation, and prediction tasks and shows you how you can send model classes to train and execute on remote compute.
This example demonstrates how to use the `SimpleTrainer` class to train and test a machine learning model using PyTorch and the MNIST dataset. The `SimpleTrainer` class handles model training, evaluation, and prediction tasks and shows you how you can send model classes to train and execute on remote compute.

## Setup and Installation

49 changes: 32 additions & 17 deletions examples/torch-simple-model/TorchBasicExample-AWS.py
@@ -39,6 +39,13 @@

import runhouse as rh

# Let's define a function that downloads the data. You can imagine this as a generic function to access data.
def DownloadData(path='./data'):
    train_dataset = datasets.MNIST(path, train=True, download=True)
    test_dataset = datasets.MNIST(path, train=False, download=True)
print('Done with data download')


# Next, we define a model class. We define a very basic feedforward neural network with three fully connected layers.
class TorchExampleBasic(nn.Module):
def __init__(self):
@@ -71,10 +78,18 @@ def __init__(self):
self.train_loader = None
self.test_loader = None

def load_train(self, data, batch_size):
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])


def load_train(self, path, batch_size):
data = datasets.MNIST(path, train=True, download=False, transform=self.transform)
self.train_loader = DataLoader(data, batch_size=batch_size, shuffle=True)

def load_test(self, data, batch_size):
def load_test(self, path, batch_size):
data = datasets.MNIST(path, train=False, download=False, transform=self.transform)
self.test_loader = DataLoader(data, batch_size=batch_size, shuffle=True)

def train_model(self, learning_rate=0.001):
@@ -164,23 +179,25 @@ def save_model(self, bucket_name, s3_file_path):

# Define a cluster type - here we launch an on-demand AWS cluster with 1 NVIDIA A10G GPU.
# You can use any cloud you want, or existing compute
cluster = rh.ondemand_cluster(name="rh-a10x", instance_type="A10G:1", provider="aws").up_if_not()
cluster = rh.ondemand_cluster(name="a10g-jason", instance_type="A10G:1", provider="aws").up_if_not()


# Next, we define the environment for our module. This includes the required dependencies that need
# to be installed on the remote machine, as well as any secrets (not needed here) that need to be synced up from local to remote.
env = rh.env(
name="test_env",
secrets=["huggingface"], # As an example
reqs=["torch", "torchvision"]
)

# Finally, we define our module and run it on the remote cluster. We take our normal Python class SimpleTrainer, and wrap it in rh.module()
# We define our module and run it on the remote cluster. We take our normal Python class SimpleTrainer, and wrap it in rh.module()
# We also take our function DownloadData and send it to the remote cluster
# Then, we use `.to()` to send it to the remote cluster we just defined.
#
# Note that we also pass the `env` object to the `.to()` method, which will ensure that the environment is
# set up on the remote machine before the module is run.
remote_torch_example = rh.module(SimpleTrainer).to(cluster, env=env, name="torch-basic-training")
remote_download = rh.function(DownloadData).to(cluster, env=env)



# ## Calling our remote Trainer
@@ -189,26 +206,18 @@ def save_model(self, bucket_name, s3_file_path):

# Though we could just as easily run identical code locally if the machine is capable of handling it.
#model = SimpleTrainer() # If instantiating a local example


# We set some settings for the model training
batch_size = 64
epochs = 5
learning_rate = 0.01


# We create the datasets locally, and then send them to the remote model / remote .load_train() method. The "preprocessing" happens remotely.
# We download the datasets remotely, and the remote .load_train() / .load_test() methods construct the data loaders. The "preprocessing" happens remotely.
# The loaders become instance variables of the remote Trainer.
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./data', train=False, transform=transform)
remote_download()

model.load_train(train_dataset, batch_size)
model.load_test(test_dataset, batch_size)
model.load_train('./data', batch_size)
model.load_test('./data', batch_size)

# We can train the model per epoch, use the remote .test() method to assess the accuracy, and save it from remote to a S3 bucket.
# All errors, prints, and logs are sent to me as if I were debugging on my local machine, but all the work is done in the cloud.
@@ -222,6 +231,12 @@ def save_model(self, bucket_name, s3_file_path):

# Finally, let's just see one prediction, as an example of using the remote model for inference.
# We have in essence done the research, and in one breath, debugged the production pipeline and deployed a microservice.

transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
test_dataset = datasets.MNIST('./data', train=False, transform=transform)
example_data, example_target = test_dataset[0][0].unsqueeze(0), test_dataset[0][1]
prediction = model.predict(example_data)
print(f'Predicted: {prediction}, Actual: {example_target}')
