# Chapter 5: Building Your First DataPipe

In [None]:
# Torchdata isn't part of Colab's default environment
!pip install torchdata

In [None]:
# Required for PyTorch 2.3
import torch
torch.utils.data.datapipes.utils.common.DILL_AVAILABLE = torch.utils._import_utils.dill_available()

## 5.2 Learning Objectives

By the end of this chapter, you should be able to:
- build and use data pipes
- understand the role of batch normalization in deep learning models
- assess different alternatives for training models using higher-level libraries

## 5.3 A New Dataset

In this chapter, and in the second lab, we'll use a different dataset: [100,000 UK Used Car Dataset](https://www.kaggle.com/datasets/adityadesai13/used-car-dataset-ford-and-mercedes) from Kaggle. It contains scraped data of used car listings, split into CSV files by manufacturer: Audi, BMW, Ford, Hyundai, Mercedes, Skoda, Toyota, Vauxhall, and VW. It also contains a few extra files for particular models (`cclass.csv`, `focus.csv`, `unclean cclass.csv`, and `unclean focus.csv`) that we won't be using.

Each file has nine columns with the car's attributes: model, year, price, transmission, mileage, fuel type, road tax, fuel consumption (mpg), and engine size. Model, transmission, fuel type, and year are discrete/categorical attributes; the others are continuous (in this chapter's code, year is nonetheless handled as a continuous attribute). Our goal here is to predict the car's price based on its other attributes.

In [None]:
!wget https://github.com/dvgodoy/assets/raw/main/PyTorchInPractice/data/100KUsedCar/car_prices.zip
!unzip car_prices.zip -d car_prices

### 5.3.1 DataPipes

Our goal is to build a datapipe that produces a dictionary with three keys in it: `label` (containing the prices we want to predict), `cont_X` (an array of the continuous attributes), and `cat_X` (an array of sequentially-encoded categorical attributes).
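To make the target concrete, here is a sketch of what a single item produced by the finished datapipe could look like. Plain Python lists stand in for the arrays, and the values are those of the first Audi row that appears in the outputs of this chapter; the integer codes in `cat_X` depend on the encoders built in section 5.3.1.2.

```python
# One item as the datapipe is meant to produce it (lists stand in for arrays)
item = {
    # price, the value we want to predict
    'label': [12500.0],
    # year, mileage, road tax, mpg, engine size
    'cont_X': [2017.0, 15735.0, 150.0, 55.4, 1.4],
    # sequentially encoded model, transmission, fuel type, manufacturer
    'cat_X': [0, 0, 0, 0],
}

for key, values in item.items():
    print(key, len(values), values)
```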

In [36]:
import torchdata.datapipes as dp

datapipe = dp.iter.FileLister('./car_prices')

In [37]:
from torch.utils.data import DataLoader
next(iter(DataLoader(dataset=datapipe, batch_size=16)))

['./car_prices/audi.csv',
 './car_prices/bmw.csv',
 './car_prices/cclass.csv',
 './car_prices/focus.csv',
 './car_prices/ford.csv',
 './car_prices/hyundi.csv',
 './car_prices/merc.csv',
 './car_prices/skoda.csv',
 './car_prices/toyota.csv',
 './car_prices/unclean cclass.csv',
 './car_prices/unclean focus.csv',
 './car_prices/vauxhall.csv',
 './car_prices/vw.csv']

In [38]:
def filter_for_data(filename):
    return ("unclean" not in filename) and ("focus" not in filename) and ("cclass" not in filename) and filename.endswith(".csv")

datapipe = datapipe.filter(filter_fn=filter_for_data)

In [39]:
next(iter(DataLoader(dataset=datapipe, batch_size=16)))

['./car_prices/audi.csv',
 './car_prices/bmw.csv',
 './car_prices/ford.csv',
 './car_prices/hyundi.csv',
 './car_prices/merc.csv',
 './car_prices/skoda.csv',
 './car_prices/toyota.csv',
 './car_prices/vauxhall.csv',
 './car_prices/vw.csv']

**IMPORTANT**: we're chaining operations one after the other and reassigning the result to the original variable, `datapipe`. For this reason, please do not run these cells out of order or more than once: your datapipe may behave in unpredictable ways, or outright raise an exception. If you must, make sure to re-run the code from the top of section "5.3.1 DataPipes".
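The hazard of re-running a cell can be reproduced without torchdata. In the sketch below, a plain list stands in for the datapipe and `add_manufacturer` is a hypothetical step that expects `(path, data)` pairs; applying it a second time to its own output fails because the items no longer have that shape.

```python
def add_manufacturer(item):
    # Expects a (path, data) pair and returns the data with the name appended
    path, data = item
    return data + [path.upper()]

source = [('audi', ['A1', '2017']), ('bmw', ['X5', '2019'])]

pipe = source
pipe = [add_manufacturer(item) for item in pipe]      # the "cell", run once
print(pipe)

rerun_failed = False
try:
    pipe = [add_manufacturer(item) for item in pipe]  # the same "cell", run again
except ValueError as err:
    rerun_failed = True
    print('Re-running the step failed:', err)
```

Since the failed reassignment never completes, `pipe` still holds the result of the first run; with a real datapipe, the safest recovery is to rebuild it from the first step.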

#### 5.3.1.1 Loading CSV Files

![](https://raw.githubusercontent.com/dvgodoy/assets/main/PyTorchInPractice/images/ch0/data_step1.png)

In [40]:
datapipe = datapipe.open_files(mode='rt')
datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1, return_path=True)

In [41]:
next(iter(DataLoader(dataset=datapipe, batch_size=4)))

[('./car_prices/audi.csv',
  './car_prices/audi.csv',
  './car_prices/audi.csv',
  './car_prices/audi.csv'),
 [(' A1', ' A6', ' A1', ' A4'),
  ('2017', '2016', '2016', '2017'),
  ('12500', '16500', '11000', '16800'),
  ('Manual', 'Automatic', 'Manual', 'Automatic'),
  ('15735', '36203', '29946', '25952'),
  ('Petrol', 'Diesel', 'Petrol', 'Diesel'),
  ('150', '20', '30', '145'),
  ('55.4', '64.2', '55.4', '67.3'),
  ('1.4', '2.0', '1.4', '2.0')]]
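The batch above is organized by column rather than by row because the `DataLoader`'s default collation transposes sequences: values from the same position in each sample are grouped together. The sketch below mimics that transposition in plain Python for two shortened rows; `collate_rows` is a hypothetical helper, not part of PyTorch, and it only covers this string-only case.

```python
def collate_rows(batch):
    """Group (path, row) samples by position, like the output shown above."""
    paths, rows = zip(*batch)     # separate the paths from the row data
    columns = list(zip(*rows))    # transpose the rows into per-column tuples
    return [paths, columns]

batch = [
    ('./car_prices/audi.csv', [' A1', '2017', '12500']),
    ('./car_prices/audi.csv', [' A6', '2016', '16500']),
]
collated = collate_rows(batch)
print(collated[0])
print(collated[1])
```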

In [42]:
import os

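def get_manufacturer(content):
    # content is a (path, row) tuple, as produced by parse_csv(return_path=True)
    path, data = content
    # The manufacturer is the file name without its extension: 'audi.csv' -> 'AUDI'
    manuf = os.path.splitext(os.path.basename(path))[0].upper()
    data.append(manuf)
    return data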

datapipe = datapipe.map(get_manufacturer)

In [43]:
next(iter(DataLoader(dataset=datapipe, batch_size=4)))

[(' A1', ' A6', ' A1', ' A4'),
 ('2017', '2016', '2016', '2017'),
 ('12500', '16500', '11000', '16800'),
 ('Manual', 'Automatic', 'Manual', 'Automatic'),
 ('15735', '36203', '29946', '25952'),
 ('Petrol', 'Diesel', 'Petrol', 'Diesel'),
 ('150', '20', '30', '145'),
 ('55.4', '64.2', '55.4', '67.3'),
 ('1.4', '2.0', '1.4', '2.0'),
 ('AUDI', 'AUDI', 'AUDI', 'AUDI')]

#### 5.3.1.2 Encoding Categorical Attributes

![](https://raw.githubusercontent.com/dvgodoy/assets/main/PyTorchInPractice/images/ch0/data_step3.png)

In [44]:
import pandas as pd

colnames = ['model', 'year', 'price', 'transmission', 'mileage', 'fuel_type', 'road_tax', 'mpg', 'engine_size', 'manufacturer']
df = pd.DataFrame(list(datapipe), columns=colnames)
N_ROWS = len(df)

In [47]:
cont_attr = ['year', 'mileage', 'road_tax', 'mpg', 'engine_size']
cat_attr = ['model', 'transmission', 'fuel_type', 'manufacturer']

def gen_encoder_dict(series):
    values = series.unique()
    return dict(zip(values, range(len(values))))

dropdown_encoders = {col: gen_encoder_dict(df[col]) for col in cat_attr}

In [48]:
dropdown_encoders['fuel_type']

{'Petrol': 0, 'Diesel': 1, 'Hybrid': 2, 'Other': 3, 'Electric': 4}
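Sequential encoding simply numbers the categories in order of first appearance. The following sketch reproduces the idea without pandas (`dict.fromkeys` plays the role of `unique()` here) and also builds the reverse mapping, which is handy for decoding. The `encode_in_order` name and the sample values are made up for illustration.

```python
def encode_in_order(values):
    """Map each distinct value to an integer, numbered by first appearance."""
    distinct = list(dict.fromkeys(values))   # keeps order, drops repeats
    return {value: code for code, value in enumerate(distinct)}

fuel = ['Petrol', 'Diesel', 'Petrol', 'Hybrid', 'Diesel']
encoder = encode_in_order(fuel)
decoder = {code: value for value, code in encoder.items()}

codes = [encoder[value] for value in fuel]
print(encoder)
print(codes)
print([decoder[code] for code in codes])
```

Note that an encoder built this way only knows the categories it has seen: looking up a value that was not in the data raises a `KeyError`.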

#### 5.3.1.3 Row Output

In [49]:
import numpy as np

def preproc(row):
    colnames = ['model', 'year', 'price', 'transmission', 'mileage', 'fuel_type', 'road_tax', 'mpg', 'engine_size', 'manufacturer']
    
    cat_attr = ['model', 'transmission', 'fuel_type', 'manufacturer']
    cont_attr = ['year', 'mileage', 'road_tax', 'mpg', 'engine_size']
    target = 'price'
    
    vals = dict(zip(colnames, row))
    cont_X = [float(vals[name]) for name in cont_attr]
    cat_X = [dropdown_encoders[name][vals[name]] for name in cat_attr]
            
    return {'label': np.array([float(vals[target])], dtype=np.float32),
            'cont_X': np.array(cont_X, dtype=np.float32), 
            'cat_X': np.array(cat_X, dtype=int)}

In [50]:
datapipe = datapipe.map(preproc)

In [51]:
next(iter(DataLoader(dataset=datapipe, batch_size=4)))

{'label': tensor([[12500.],
         [16500.],
         [11000.],
         [16800.]]),
 'cont_X': tensor([[2.0170e+03, 1.5735e+04, 1.5000e+02, 5.5400e+01, 1.4000e+00],
         [2.0160e+03, 3.6203e+04, 2.0000e+01, 6.4200e+01, 2.0000e+00],
         [2.0160e+03, 2.9946e+04, 3.0000e+01, 5.5400e+01, 1.4000e+00],
         [2.0170e+03, 2.5952e+04, 1.4500e+02, 6.7300e+01, 2.0000e+00]]),
 'cat_X': tensor([[0, 0, 0, 0],
         [1, 1, 1, 0],
         [0, 0, 0, 0],
         [2, 1, 1, 0]])}

#### 5.3.1.4 The Full DataPipe and Splits

![](https://raw.githubusercontent.com/dvgodoy/assets/main/PyTorchInPractice/images/ch0/data_step4.png)

In [52]:
datapipe = dp.iter.FileLister('./car_prices')
datapipe = datapipe.filter(filter_fn=filter_for_data)
datapipe = datapipe.open_files(mode='rt')
datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1, return_path=True)
datapipe = datapipe.map(get_manufacturer)
datapipe = datapipe.map(preproc)

In [53]:
datapipes = {}
datapipes['train'] = datapipe.random_split(total_length=N_ROWS, weights={"train": 0.8, "val": 0.1, "test": 0.1}, seed=11, target='train')
datapipes['val'] = datapipe.random_split(total_length=N_ROWS, weights={"train": 0.8, "val": 0.1, "test": 0.1}, seed=11, target='val')
datapipes['test'] = datapipe.random_split(total_length=N_ROWS, weights={"train": 0.8, "val": 0.1, "test": 0.1}, seed=11, target='test')

datapipes['train'] = datapipes['train'].shuffle(buffer_size=100000)
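Each call above uses the same `seed` and `weights`, so the three splits are drawn from the same random assignment of rows. The sketch below illustrates that principle in plain Python: every row index is assigned to exactly one split by a seeded random draw, and repeating the assignment with the same seed gives the same result. It is only an illustration; `assign_splits` is a hypothetical helper and does not reproduce torchdata's actual sampling.

```python
import random

def assign_splits(n_rows, weights, seed):
    """Assign each row index to one split, using a seeded random draw."""
    rng = random.Random(seed)
    names = list(weights)
    probs = [weights[name] for name in names]
    return [rng.choices(names, weights=probs)[0] for _ in range(n_rows)]

weights = {'train': 0.8, 'val': 0.1, 'test': 0.1}
first = assign_splits(1000, weights, seed=11)
second = assign_splits(1000, weights, seed=11)

counts = {name: first.count(name) for name in weights}
print(first == second)   # same seed, same assignment
print(counts)            # roughly proportional to the weights
```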

![](https://raw.githubusercontent.com/dvgodoy/assets/main/PyTorchInPractice/images/ch0/data_step5.png)

In [54]:
dataloaders = {}
dataloaders['train'] = DataLoader(dataset=datapipes['train'], batch_size=128, drop_last=True, shuffle=True)
dataloaders['val'] = DataLoader(dataset=datapipes['val'], batch_size=128)
dataloaders['test'] = DataLoader(dataset=datapipes['test'], batch_size=128)

### 5.3.2 BatchNorm for Continuous Attributes

![](https://raw.githubusercontent.com/dvgodoy/assets/main/PyTorchInPractice/images/ch0/model_step1.png)

In [55]:
import torch.nn as nn

batch = next(iter(dataloaders['train']))
batch['cont_X'].mean(axis=0), batch['cont_X'].std(axis=0, unbiased=False)

(tensor([2.0174e+03, 2.1038e+04, 1.1512e+02, 5.3813e+01, 1.6766e+00]),
 tensor([1.5904e+00, 1.7717e+04, 5.7773e+01, 1.1183e+01, 5.1135e-01]))

In [56]:
bn_layer = nn.BatchNorm1d(num_features=len(cont_attr))

normalized_cont = bn_layer(batch['cont_X'])
normalized_cont.mean(axis=0), normalized_cont.std(axis=0, unbiased=False)

(tensor([ 4.0997e-05,  3.3528e-08,  3.7253e-08, -2.4214e-07, -1.0058e-07],
        grad_fn=<MeanBackward1>),
 tensor([1.0000, 1.0000, 1.0000, 1.0000, 1.0000], grad_fn=<StdBackward0>))

In [57]:
bn_layer.state_dict()

OrderedDict([('weight', tensor([1., 1., 1., 1., 1.])),
             ('bias', tensor([0., 0., 0., 0., 0.])),
             ('running_mean',
              tensor([2.0174e+02, 2.1038e+03, 1.1512e+01, 5.3813e+00, 1.6766e-01])),
             ('running_var',
              tensor([1.1549e+00, 3.1638e+07, 3.3730e+02, 1.3504e+01, 9.2635e-01])),
             ('num_batches_tracked', tensor(1))])
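The numbers in the state dictionary follow from how `nn.BatchNorm1d` behaves in training mode with its default settings (`momentum=0.1`, `eps=1e-5`). The batch is normalized with its own mean and biased variance, while the running statistics, which start at zero mean and unit variance, move a tenth of the way toward the batch mean and the unbiased batch variance. That is why `running_mean` above is one tenth of the batch mean after a single batch. The pure-Python sketch below works through this arithmetic for one made-up column of values.

```python
import math

column = [2017.0, 2016.0, 2016.0, 2017.0, 2019.0]   # made-up batch, one feature
n = len(column)
momentum, eps = 0.1, 1e-5                            # BatchNorm1d defaults

mean = sum(column) / n
biased_var = sum((x - mean) ** 2 for x in column) / n
unbiased_var = biased_var * n / (n - 1)

# Normalization uses the batch mean and the biased variance
normalized = [(x - mean) / math.sqrt(biased_var + eps) for x in column]

# Running statistics start at mean 0 and variance 1
running_mean = (1 - momentum) * 0.0 + momentum * mean
running_var = (1 - momentum) * 1.0 + momentum * unbiased_var

print(mean, biased_var, unbiased_var)
print(running_mean, running_var)
```

In evaluation mode, the layer uses the running statistics instead of the batch statistics, which is why they need many batches to become good estimates.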

## 5.4 Lab 2: Price Prediction

## 5.5 Tour of High-Level Libraries

So far, we've been implementing everything ourselves, including a lot of boilerplate code such as the training loop and early stopping.

However, there are several high-level libraries built on top of PyTorch whose goal is, in general, to remove boilerplate and/or allow users to more easily leverage advanced capabilities such as mixed precision, distributed training, and more. 

Let's take a quick look at the most popular available libraries: HuggingFace Accelerate, Ignite, Catalyst, PyTorch Lightning, fast.ai, and Skorch.
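As a reminder of the kind of boilerplate these libraries take over, here is a minimal sketch of patience-based early stopping in plain Python. The `EarlyStopper` class and the list of validation losses are hypothetical and only for illustration; this is not necessarily how early stopping was implemented in previous chapters.

```python
class EarlyStopper:
    """Signal a stop after `patience` epochs without a better validation loss."""
    def __init__(self, patience=2, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best = float('inf')
        self.bad_epochs = 0

    def step(self, val_loss):
        # An improvement resets the counter; anything else increments it
        if val_loss < self.best - self.min_delta:
            self.best = val_loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience

val_losses = [0.90, 0.70, 0.65, 0.66, 0.67, 0.60]   # made-up values
stopper = EarlyStopper(patience=2)
stopped_at = None
for epoch, loss in enumerate(val_losses):
    if stopper.step(loss):
        stopped_at = epoch
        break

print(stopped_at, stopper.best)
```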

### 5.5.1 HuggingFace Accelerate

[HuggingFace Accelerate](https://huggingface.co/docs/accelerate/index) is a library that allows you to leverage parallelization and distributed training with only a few lines of extra code added to your existing PyTorch workflow.

Here is a short example from its documentation (the plus signs indicate the lines added to the original code):

```python
+ from accelerate import Accelerator
+ accelerator = Accelerator()

+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+     model, optimizer, training_dataloader, scheduler
+ )

  for batch in training_dataloader:
      optimizer.zero_grad()
      inputs, targets = batch
      inputs = inputs.to(device)
      targets = targets.to(device)
      outputs = model(inputs)
      loss = loss_function(outputs, targets)
+     accelerator.backward(loss)
      optimizer.step()
      scheduler.step()
```

For more details, check Accelerate's [migration](https://huggingface.co/docs/accelerate/basic_tutorials/migration) documentation.

### 5.5.2 Ignite

[PyTorch Ignite](https://pytorch-ignite.ai/) is a library focused on three high-level features: an engine and event system, out-of-the-box metrics for evaluation, and built-in handlers for composing pipelines, saving artifacts, and logging. Since it focuses on the training and validation pipelines, your models, datasets, and optimizers remain in pure PyTorch.

Here's a short example from its documentation:

```python
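from ignite.engine import Engine, Events, create_supervised_evaluator
from ignite.metrics import Accuracy

# Setup training engine:
def train_step(engine, batch):
    # Users can do whatever they need on a single iteration
    # Eg. forward/backward pass for any number of models, optimizers, etc
    # ...
    pass

trainer = Engine(train_step)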

# Setup single model evaluation engine
evaluator = create_supervised_evaluator(model, metrics={"accuracy": Accuracy()})

def validation():
    state = evaluator.run(validation_data_loader)
    # print computed metrics
    print(trainer.state.epoch, state.metrics)

# Run model's validation at the end of each epoch
trainer.add_event_handler(Events.EPOCH_COMPLETED, validation)

# Start the training
trainer.run(training_data_loader, max_epochs=100)
```

For more details, check Ignite's [migration](https://pytorch-ignite.ai/how-to-guides/02-convert-pytorch-to-ignite/) documentation and [code generator](https://code-generator.pytorch-ignite.ai/).

### 5.5.3 Catalyst

[Catalyst](https://catalyst-team.com/) focuses on reproducibility and rapid experimentation. It removes boilerplate code, improves readability, and offers scalability to any hardware without code changes. It is a deep learning framework and its basic building block is the Runner class, which takes care of the training loop.

Here's a short example from its documentation:
```python
from catalyst import dl

runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)

# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.AccuracyCallback(input_key="logits", target_key="targets", topk=(1, 3, 5)),
        dl.PrecisionRecallF1SupportCallback(input_key="logits", target_key="targets"),
    ],
    logdir="./logs",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)
```

For more details, check Catalyst's [quick start](https://catalyst-team.github.io/catalyst/getting_started/quickstart.html) documentation.

### 5.5.4 PyTorch Lightning

[PyTorch Lightning](https://www.pytorchlightning.ai/index.html) takes care of the engineering aspects of building and training a model in PyTorch. It is a framework in itself, and its basic building block is the `LightningModule` class, which inherits from the typical PyTorch `Module` class and acts as a model "recipe" that specifies all training details. This means that, if you already have an implemented PyTorch workflow, your code will need to be refactored.

Here is a short example from its documentation:

```python
import torch
import torch.nn.functional as F
import pytorch_lightning as pl
from torch import nn

class LitAutoEncoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 64),
            nn.ReLU(),
            nn.Linear(64, 3))
        self.decoder = nn.Sequential(
            nn.Linear(3, 64),
            nn.ReLU(),
            nn.Linear(64, 28 * 28))

    def forward(self, x):
        embedding = self.encoder(x)
        return embedding

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        x, y = train_batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('val_loss', loss)
```

For more details, check [this](https://github.com/Lightning-AI/lightning#pytorch-lightning-train-and-deploy-pytorch-at-scale) example of refactoring native PyTorch code into PyTorch Lightning.

### 5.5.5 fast.ai

[Fast.ai](https://docs.fast.ai/) is a library that provides both high- and low-level components, so that practitioners can be rapidly productive and researchers can hack and configure it. Its high-level components include data loaders and learners, and fast.ai applications follow the same basic steps: creating data loaders, creating a learner, calling its `fit()` method, and making predictions.

Here is a short example from its documentation:

```python
from fastai.vision.all import *

path = untar_data(URLs.PETS)/'images'

def is_cat(x): return x[0].isupper()
dls = ImageDataLoaders.from_name_func(
    path, get_image_files(path), valid_pct=0.2, seed=42,
    label_func=is_cat, item_tfms=Resize(224))

learn = vision_learner(dls, resnet34, metrics=error_rate)
learn.fine_tune(1)

img = PILImage.create('images/cat.jpg')
is_cat,_,probs = learn.predict(img)
print(f"Is this a cat?: {is_cat}.")
print(f"Probability it's a cat: {probs[1].item():.6f}")
```

For more details, check fast.ai's [migration](https://docs.fast.ai/#migrating-from-other-libraries) documentation.

### 5.5.6 Skorch

[Skorch](https://github.com/skorch-dev/skorch) is a Scikit-Learn-compatible wrapper for PyTorch models. Its goal is to make it possible to use PyTorch with Scikit-Learn. It offers classes such as `NeuralNetClassifier` and `NeuralNetRegressor` to wrap your models, which can then be used and trained like any other Scikit-Learn model.

Here's a short example from its documentation:
```python
from skorch import NeuralNetClassifier

net = NeuralNetClassifier(
    MyModule,
    max_epochs=10,
    lr=0.1,
    # Shuffle training data on each epoch
    iterator_train__shuffle=True,
)

net.fit(X, y)
y_proba = net.predict_proba(X)
```

For more details, check Skorch's [documentation](https://skorch.readthedocs.io/en/latest/).