# Dataloading 01

In this notebook, we'll figure out how to use PyTorch's DataLoader class to load our massive files without reading the entirety of them into memory

In [1]:
import comet_ml
import dask.dataframe as dd
import pandas as pd 
import torch
import linecache 
import csv
import numpy as np
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
import torch.nn.functional as F
import sys, os
from pathlib import Path

here = Path().cwd()

We'll first design a custom dataset to use with PyTorch's `DataLoader` class

In [2]:
class GeneExpressionData(Dataset):
    def __init__(self, filename, labelname):
        self._filename = filename
        self._labelname = labelname
        self._total_data = 0
        
        with open(filename, "r") as f:
            self._total_data = len(f.readlines()) - 1
    
    def __getitem__(self, idx):        
        line = linecache.getline(self._filename, idx + 2)
        label = linecache.getline(self._labelname, idx + 2)
        
        csv_data = csv.reader([line])
        csv_label = csv.reader([label])
        
        data = [x for x in csv_data][0]
        label = [x for x in csv_label][0]
        return torch.from_numpy(np.array([float(x) for x in data])).float(), [int(float(x)) for x in label][0]
    
    def __len__(self):
        return self._total_data
    
    def num_labels(self):
        return pd.read_csv(self._labelname)['# label'].nunique()
    
    def num_features(self):
        return len(self.__getitem__(0)[0])

Since PyTorch loss functions require classes in $[0, C]$, we'll first add $1$ to the labels and re-write it out so we can use it for training

In [3]:
def fix_labels(file):
    labels = pd.read_csv(file)
    labels['# label'] = labels['# label'].astype(int) + 1
    labels.to_csv('fixed_' + file.split('/')[-1], index=False)

fix_labels('../data/processed/labels/primary_labels_neighbors_50_components_100_clust_size_100.csv')

Great, we now continue as normal

In [4]:
t = GeneExpressionData(
    filename=os.path.join(here, '../data/processed/umap/primary_reduction_neighbors_100_components_3.csv'),
    labelname=os.path.join(here, 'fixed_primary_labels_neighbors_50_components_50_clust_size_100.csv')
)
t.num_labels()

16

Let's see how fast it takes to load a minibatch of data

In [5]:
%%time 

for i in range(64):
    t.__getitem__(i)

CPU times: user 20.5 ms, sys: 4.69 ms, total: 25.2 ms
Wall time: 24.8 ms


Before we train our model, we need to split our data into training and testing sets, in order to get an unbiased evaluation of our model's performance. Likely, we will initially overfit the training set since we provide no regularization.

In [6]:
train_size = int(0.8 * len(t))
test_size = len(t) - train_size

train, test = torch.utils.data.random_split(t, [train_size, test_size])

In [7]:
traindata = DataLoader(train, batch_size = 8, num_workers = 0)
valdata = DataLoader(test, batch_size = 8, num_workers = 0)

For clarity, let's define a data generation method that simply returns the train and test split from our gene expression dataset

In [16]:
def data_gen():
    t = GeneExpressionData(
        filename=os.path.join(here, '../data/processed/umap/primary_reduction_neighbors_100_components_3.csv'),
        labelname=os.path.join(here, 'fixed_primary_labels_neighbors_50_components_50_clust_size_100.csv')
    )

    train_size = int(0.8 * len(t))
    test_size = len(t) - train_size

    train, test = torch.utils.data.random_split(t, [train_size, test_size])
    
    return train, test

Now that we've defined our `DataLoader`, let's test it when training a simple Neural Network

## Using PyTorch Lightning

PyTorch lightning seems nicer than Ignite, especially for GPU training. Let's test it out 

In [17]:
from torchmetrics import Accuracy, ConfusionMatrix
from sklearn.utils.class_weight import compute_class_weight

class GeneClassifier(pl.LightningModule):
    def __init__(self, N_features, N_labels, weights, config):
        """
        Initialize the gene classifier neural network

        Parameters:
        N_features: Number of features in the inpute matrix 
        N_labels: Number of classes 
        """
        
        self.train_data, self.test_data = data_gen()
        
        # Params for optimizer 
        self.lr = config['lr']
        self.momentum = config['momentum']
        self.weight_decay = config['weight_decay']
        
        super(GeneClassifier, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(N_features, 512),
            nn.ReLU(),
            nn.Linear(512, N_labels),
        )
        
        self.accuracy = Accuracy()
        self.confusion = ConfusionMatrix(N_labels)
        self.weights = weights

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

    def configure_optimizers(self):
        optimizer = torch.optim.SGD(
            self.parameters(), 
            lr=self.lr, 
            momentum=self.momentum,
            weight_decay=self.weight_decay
            
        )
        return optimizer

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.cross_entropy(y_hat, y, weight=self.weights)
        acc = self.accuracy(y_hat.softmax(dim=-1), y)
        matrix = self.confusion(y_hat.softmax(dim=-1), y)
        
        self.log("train_loss", loss, on_step=False, on_epoch=True, logger=True)
        self.log("train_accuracy", acc, on_step=False, on_epoch=True, logger=True)
        self.log("train_confusion_mat", matrix, on_step=False, on_epoch=True, logger=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        val_loss = F.cross_entropy(y_hat, y, weight=self.weights)
        acc = self.accuracy(y_hat.softmax(dim=-1), y)
        matrix = self.confusion(y_hat.softmax(dim=-1), y)
        
        self.log("val_loss", val_loss, on_step=False, on_epoch=True, logger=True)
        self.log("val_accuracy", acc, on_step=False, on_epoch=True, logger=True)
        self.log("val_confusion_mat", matrix, on_step=False, on_epoch=True, logger=True)
        return val_loss
    
    def train_dataloader(self):
        return DataLoader(self.train_data, batch_size=8, num_workers=0)

    def val_dataloader(self):
        return DataLoader(self.test_data, batch_size=8, num_workers=0)


In [18]:
from sklearn.utils.class_weight import compute_class_weight

def class_weights(label_df):
    label_df = pd.read_csv(label_df)
    
    weights = compute_class_weight(
        class_weight='balanced', 
        classes=np.unique(label_df), 
        y=label_df.values.reshape(-1)
    )

    weights = torch.from_numpy(weights)
    return weights.float()

weights = class_weights('fixed_primary_labels_neighbors_50_components_50_clust_size_100.csv')

Now let's set up RayTune

In [19]:
class UploadCallback(pl.callbacks.Callback):
    def __init__(self, path, WIDTH, LAYERS) -> None:
        super().__init__()
        self.path = path 
        self.width = WIDTH
        self.layers = LAYERS

    def on_train_epoch_end(self, trainer, pl_module):
        epoch = trainer.current_epoch
        trainer.save_checkpoint(f'checkpoints/checkpoint-{epoch}-width-{self.width}-layers-{self.layers}.ckpt')
        print(os.listdir('checkpoints'))
        print ('Uploading file...')

uploadcallback = UploadCallback('checkpoints', 10, 10)

Now let's set up RayTune

In [20]:
from ray.tune.integration.pytorch_lightning import TuneReportCallback
import ray.tune as tune 

raytunecallback = TuneReportCallback(
    {
        "loss": "val_loss", 
        "mean_accuracy": "val_accuracy"
    },
    on="validation_end"
)

In [23]:
def train_with_tune(config, max_epochs):
    model = GeneClassifier(t.num_features(), t.num_labels(), weights, config)
    
    trainer = pl.Trainer(
        max_epochs=max_epochs, 
        callbacks=[
            raytunecallback,
            uploadcallback,
        ]
    )
    
    trainer.fit(model)
    
def model_search(num_samples=10, max_epochs=10):
    config = {
        "lr" : tune.loguniform(1e-4, 1e-1),
        "momentum" : tune.loguniform(0.1, 0.8),
        "weight_decay" : tune.uniform(1e-4, 1e-1)
    }

    scheduler = ASHAScheduler(
        max_t=max_epochs,
        grace_period=1,
        reduction_factor=2)

    reporter = CLIReporter(
        parameter_columns = ["lr", "momentum", "weight_decay"],
        metric_columns=["loss", "mean_accuracy", "training_iteration"])

    train_fn_with_parameters = tune.with_parameters(train_with_tune, max_epochs=max_epochs)
                                            
    resources_per_trial = {"cpu": 1}

    analysis = tune.run(train_fn_with_parameters,
        resources_per_trial=resources_per_trial,
        metric="loss",
        mode="min",
        config=config,
        num_samples=num_samples,
        scheduler=scheduler,
        progress_reporter=reporter,
        name="model_search"
    )

    print("Best hyperparameters found were: ", analysis.best_config)

model_search()

== Status ==
Current time: 2021-12-15 15:56:45 (running for 00:00:00.16)
Memory usage on this node: 14.7/16.0 GiB: ***LOW MEMORY*** less than 10% of the memory on this node is available for use. This can cause unexpected crashes. Consider reducing the memory used by your application or reducing the Ray object store size by setting `object_store_memory` when calling `ray.init`.
Using AsyncHyperBand: num_stopped=0
Bracket: Iter 8.000: None | Iter 4.000: None | Iter 2.000: None | Iter 1.000: None
Resources requested: 0/10 CPUs, 0/0 GPUs, 0.0/8.53 GiB heap, 0.0/4.26 GiB objects
Result logdir: /Users/julian/ray_results/model_search
Number of trials: 10/10 (10 PENDING)
+-----------------------------+----------+-------+-------------+------------+----------------+
| Trial name                  | status   | loc   |          lr |   momentum |   weight_decay |
|-----------------------------+----------+-------+-------------+------------+----------------|
| train_with_tune_a8cac_00000 | PENDING  | 

[2m[36m(ImplicitFunc pid=41022)[0m GPU available: False, used: False
[2m[36m(ImplicitFunc pid=41022)[0m TPU available: False, using: 0 TPU cores
[2m[36m(ImplicitFunc pid=41022)[0m IPU available: False, using: 0 IPUs
[2m[36m(ImplicitFunc pid=41022)[0m   rank_zero_deprecation(
[2m[36m(ImplicitFunc pid=41022)[0m 
[2m[36m(ImplicitFunc pid=41022)[0m   | Name              | Type            | Params
[2m[36m(ImplicitFunc pid=41022)[0m ------------------------------------------------------
[2m[36m(ImplicitFunc pid=41022)[0m 0 | flatten           | Flatten         | 0     
[2m[36m(ImplicitFunc pid=41022)[0m 1 | linear_relu_stack | Sequential      | 10.3 K
[2m[36m(ImplicitFunc pid=41022)[0m 2 | accuracy          | Accuracy        | 0     
[2m[36m(ImplicitFunc pid=41022)[0m 3 | confusion         | ConfusionMatrix | 0     
[2m[36m(ImplicitFunc pid=41022)[0m ------------------------------------------------------
[2m[36m(ImplicitFunc pid=41022)[0m 10.3 K    Trai

[2m[36m(ImplicitFunc pid=41017)[0m   rank_zero_warn(
[2m[36m(ImplicitFunc pid=41017)[0m 2021-12-15 15:56:48,815	ERROR function_runner.py:268 -- Runner Thread raised error.
[2m[36m(ImplicitFunc pid=41017)[0m Traceback (most recent call last):
[2m[36m(ImplicitFunc pid=41017)[0m   File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/function_runner.py", line 262, in run
[2m[36m(ImplicitFunc pid=41017)[0m     self._entrypoint()
[2m[36m(ImplicitFunc pid=41017)[0m   File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/function_runner.py", line 330, in entrypoint
[2m[36m(ImplicitFunc pid=41017)[0m     return self._trainable_func(self.config, self._status_reporter,
[2m[36m(ImplicitFunc pid=41017)[0m   File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 451, in _resume_span
[2m[36m(ImplicitFunc pid=41017)[0m     return met

[2m[36m(ImplicitFunc pid=41022)[0m   File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/pytorch_lightning/loops/dataloader/evaluation_loop.py", line 139, in on_run_end
[2m[36m(ImplicitFunc pid=41022)[0m     self._on_evaluation_end()
[2m[36m(ImplicitFunc pid=41022)[0m   File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/pytorch_lightning/loops/dataloader/evaluation_loop.py", line 201, in _on_evaluation_end
[2m[36m(ImplicitFunc pid=41022)[0m     self.trainer.call_hook("on_validation_end", *args, **kwargs)
[2m[36m(ImplicitFunc pid=41022)[0m   File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/pytorch_lightning/trainer/trainer.py", line 1490, in call_hook
[2m[36m(ImplicitFunc pid=41022)[0m     callback_fx(*args, **kwargs)
[2m[36m(ImplicitFunc pid=41022)[0m   File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/pytorch_lightning/trainer/callback_hook

2021-12-15 15:56:48,994	ERROR trial_runner.py:958 -- Trial train_with_tune_a8cac_00007: Error processing event.
Traceback (most recent call last):
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/trial_runner.py", line 924, in _process_trial
    results = self.trial_executor.fetch_result(trial)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 787, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TuneError): [36mray::ImplicitFunc.train_buffered()[39m (pid=41015, ip=127.0.0.1, repr=<ray.

2021-12-15 15:56:49,006	ERROR trial_runner.py:958 -- Trial train_with_tune_a8cac_00006: Error processing event.
Traceback (most recent call last):
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/trial_runner.py", line 924, in _process_trial
    results = self.trial_executor.fetch_result(trial)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 787, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TuneError): [36mray::ImplicitFunc.train_buffered()[39m (pid=41016, ip=127.0.0.1, repr=<ray.

2021-12-15 15:56:49,027	ERROR trial_runner.py:958 -- Trial train_with_tune_a8cac_00004: Error processing event.
Traceback (most recent call last):
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/trial_runner.py", line 924, in _process_trial
    results = self.trial_executor.fetch_result(trial)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 787, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TuneError): [36mray::ImplicitFunc.train_buffered()[39m (pid=41021, ip=127.0.0.1, repr=<ray.

[2m[36m(ImplicitFunc pid=41018)[0m Validation sanity check: 0it [00:00, ?it/s]Validation sanity check:   0%|          | 0/2 [00:00<?, ?it/s]
[2m[36m(ImplicitFunc pid=41016)[0m Validation sanity check: 0it [00:00, ?it/s]Validation sanity check:   0%|          | 0/2 [00:00<?, ?it/s]
[2m[36m(ImplicitFunc pid=41022)[0m Validation sanity check: 0it [00:00, ?it/s]Validation sanity check:   0%|          | 0/2 [00:00<?, ?it/s]
[2m[36m(ImplicitFunc pid=41015)[0m Validation sanity check: 0it [00:00, ?it/s]Validation sanity check:   0%|          | 0/2 [00:00<?, ?it/s]Validation sanity check:  50%|█████     | 1/2 [00:00<00:00,  7.96it/s]
[2m[36m(ImplicitFunc pid=41017)[0m Validation sanity check: 0it [00:00, ?it/s]Validation sanity check:   0%|          | 0/2 [00:00<?, ?it/s]Validation sanity check:  50%|█████     | 1/2 [00:00<00:00,  7.46it/s]
[2m[36m(ImplicitFunc pid=41014)[0m Validation sanity check: 0it [00:00, ?it/s]Validation sanity check:   0%|          | 0/2

2021-12-15 15:56:49,043	ERROR trial_runner.py:958 -- Trial train_with_tune_a8cac_00002: Error processing event.
Traceback (most recent call last):
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/trial_runner.py", line 924, in _process_trial
    results = self.trial_executor.fetch_result(trial)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 787, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/julian/miniconda3/envs/base-data-science/lib/python3.9/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TuneError): [36mray::ImplicitFunc.train_buffered()[39m (pid=41020, ip=127.0.0.1, repr=<ray.



Result for train_with_tune_a8cac_00002:
  date: 2021-12-15_15-56-48
  experiment_id: bbd8789f6724464d95bc856f64c3f828
  hostname: MacBook-Pro.local
  node_ip: 127.0.0.1
  pid: 41020
  timestamp: 1639612608
  trial_id: a8cac_00002
  
Result for train_with_tune_a8cac_00000:
  date: 2021-12-15_15-56-48
  experiment_id: 4ad9589641f84c6589b47615d88292b6
  hostname: MacBook-Pro.local
  node_ip: 127.0.0.1
  pid: 41019
  timestamp: 1639612608
  trial_id: a8cac_00000
  
== Status ==
Current time: 2021-12-15 15:56:49 (running for 00:00:03.43)
Memory usage on this node: 14.5/16.0 GiB: ***LOW MEMORY*** less than 10% of the memory on this node is available for use. This can cause unexpected crashes. Consider reducing the memory used by your application or reducing the Ray object store size by setting `object_store_memory` when calling `ray.init`.
Using AsyncHyperBand: num_stopped=0
Bracket: Iter 8.000: None | Iter 4.000: None | Iter 2.000: None | Iter 1.000: None
Resources requested: 0/10 CPUs, 0/0

TuneError: ('Trials did not complete', [train_with_tune_a8cac_00000, train_with_tune_a8cac_00001, train_with_tune_a8cac_00002, train_with_tune_a8cac_00003, train_with_tune_a8cac_00004, train_with_tune_a8cac_00005, train_with_tune_a8cac_00006, train_with_tune_a8cac_00007, train_with_tune_a8cac_00008, train_with_tune_a8cac_00009])

In [None]:
t[0]

In [None]:
t = torch.from_numpy(a)
l = torch.from_numpy(l)
t.softmax(dim=-1)

In [None]:
t.softmax(dim=-1)