In [1]:
"""
Part 0. Log in to Galileo!
"""

import dataquality

# Interactive login: the recorded output shows a prompt for the login method
# followed by a confirmation of the logged-in user.
dataquality.login()

🔭 Logging you into Galileo

🔐 How would you like to login? 
Enter one of the following: email
email
🚀 You're logged in to Galileo as ben@rungalileo.io!


In [2]:
# Display the active dataquality configuration.
# NOTE(review): the repr includes the auth token and MinIO keys — clear this
# cell's output before sharing or committing the notebook.
dataquality.config

Config(api_url='http://localhost:8000', minio_url='127.0.0.1:9000', minio_access_key='minioadmin', minio_secret_key='[REDACTED]', auth_method=<AuthMethod.email: 'email'>, token='[REDACTED]', current_user='ben@rungalileo.io', current_project_id=UUID('6b35e739-899e-4cc2-8e46-83ed58ca7b21'), current_run_id=UUID('8538646f-bb04-4dd3-8bdf-0efa92259800'))

In [3]:
"""
Part 0.1 Create your first project!
"""

# Called without arguments; the recorded output shows a new project and a new
# run being created with auto-generated names.
dataquality.init()

✨ Initializing project dirty_amber_ant
🏃‍♂️ Starting run still_black_puma
🛰 Created project, dirty_amber_ant, and new run, still_black_puma.


In [4]:
# Display the configuration again after init(); the recorded output shows new
# project and run ids.
# NOTE(review): the repr includes the auth token and MinIO keys — clear this
# cell's output before sharing or committing the notebook.
dataquality.config

Config(api_url='http://localhost:8000', minio_url='127.0.0.1:9000', minio_access_key='minioadmin', minio_secret_key='[REDACTED]', auth_method=<AuthMethod.email: 'email'>, token='[REDACTED]', current_user='ben@rungalileo.io', current_project_id='4501cd41-24c1-4eff-bdf4-685345e34213', current_run_id='f2d1e5fb-a4ad-4cad-9667-25f9a9013b5b')

In [5]:
"""
Part 0.2 Install some dependencies for this workflow exercise.
"""

%pip install -q torch sklearn transformers pandas numpy pytorch_lightning torchmetrics

Note: you may need to restart the kernel to use updated packages.


In [5]:
"""
Part 1.

Log your datasets with Galileo.

Create the Newsgroup dataset class, using the Hugging Face DistilBERT tokenizer.

We are introducing some noise to these datasets because 
the newsgroup dataset is already well labeled.
"""

import torch
from sklearn.datasets import fetch_20newsgroups
from transformers import DistilBertTokenizerFast
import pandas as pd
import numpy as np


def introduce_label_errors(df: pd.DataFrame, column: str, shuffle_percent: int, seed: int = None) -> pd.DataFrame:
    """Simulate label noise by permuting a random subset of ``df[column]``.

    A random ``shuffle_percent`` percent of the rows is selected and the
    values of ``column`` at those rows are permuted among themselves. The
    overall multiset of values is unchanged, but some rows end up holding a
    value that belonged to another row.

    Args:
        df: Frame holding the column to corrupt. It is modified in place.
        column: Name of the column whose values are shuffled.
        shuffle_percent: Percentage (0-100) of rows taking part in the shuffle.
        seed: Optional seed for a private random generator, for reproducible
            noise. When ``None`` (the default) the global NumPy random state
            is used.

    Returns:
        The same ``df`` object, with ``column`` replaced by the shuffled values.

    Raises:
        ValueError: If ``shuffle_percent`` is outside the range 0-100.
    """
    if not 0 <= shuffle_percent <= 100:
        raise ValueError(f"shuffle_percent must be between 0 and 100, got {shuffle_percent}")

    # Work on a private copy: ``.values`` can hand back a view of the frame's
    # own data (read-only under pandas copy-on-write), which must not be
    # written to directly.
    arr = df[column].to_numpy(copy=True)
    rng = np.random if seed is None else np.random.RandomState(seed)
    shuffle = rng.choice(
        np.arange(arr.shape[0]),
        round(arr.shape[0] * shuffle_percent / 100),
        replace=False)
    # ``shuffle`` is in random order; writing the values found at those
    # positions back to the same positions in sorted order permutes the
    # selected rows among themselves.
    arr[np.sort(shuffle)] = arr[shuffle]
    df[column] = arr
    return df
    

class NewsgroupDataset(torch.utils.data.Dataset):
    """20 Newsgroups samples tokenized with the DistilBERT tokenizer.

    The raw rows are kept in ``self.dataset`` (a DataFrame with ``"text"`` and
    ``"label"`` columns) and the tokenizer output in ``self.encodings``.
    Input logging to Galileo is not done here; ``DataQualityCallback`` reads
    ``self.dataset`` for that purpose.
    """

    def __init__(self, split: str, max_samples: int = 23) -> None:
        """Fetch, truncate, optionally corrupt and tokenize one split.

        Args:
            split: ``"training"`` loads the newsgroups ``train`` subset; any
                other value (e.g. ``"validation"``, ``"test"``) loads the
                ``test`` subset.
            max_samples: Number of leading rows to keep. Defaults to 23, a
                small cap that keeps the demo fast; pass ``None`` to keep
                every row.
        """
        newsgroups = fetch_20newsgroups(subset="train" if split == "training" else "test",
                                        remove=('headers', 'footers', 'quotes'))

        frame = pd.DataFrame({"text": newsgroups.data, "label": newsgroups.target})
        # reset_index returns an independent frame with a 0..n-1 index, so the
        # label shuffling below never writes into a slice of the full frame.
        self.dataset = frame.iloc[:max_samples].reset_index(drop=True)

        # Shuffle some percentage of the training labels to force-create
        # mislabeled samples (the newsgroup data is otherwise well labeled).
        if split == "training":
            self.dataset = introduce_label_errors(self.dataset, "label", 11)

        tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
        self.encodings = tokenizer(self.dataset["text"].tolist(), truncation=True, padding=True)

    def __getitem__(self, idx):
        """Return ``(idx, input_ids, attention_mask, label)`` for one sample."""
        x = torch.tensor(self.encodings["input_ids"][idx])
        attention_mask = torch.tensor(self.encodings["attention_mask"][idx])
        # Positional lookup: does not depend on the frame's index labels.
        y = self.dataset["label"].iloc[idx]
        return idx, x, attention_mask, y

    def __len__(self):
        """Number of samples kept for this split."""
        return len(self.dataset)

In [12]:
"""
Part 2.

Log model outputs with Galileo.

We are using a DistilBERT pytorch lightning class for text classification.
"""

import pytorch_lightning as pl
from transformers import DistilBertForSequenceClassification, AdamW, DistilBertConfig, AutoModel
import torch.nn.functional as F
import torchmetrics


class LightningDistilBERT(pl.LightningModule):
    """DistilBERT sequence classifier (20 labels) wrapped as a LightningModule.

    ``feature_extractor`` is a second, plain DistilBERT model. ``forward``
    does not use it; it is kept as an attribute because
    ``DataQualityCallback`` calls it to obtain embeddings for logging.
    """

    def __init__(self):
        super().__init__()
        self.model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', config=DistilBertConfig(num_labels=20))
        self.feature_extractor = AutoModel.from_pretrained('distilbert-base-uncased')
        self.train_acc = torchmetrics.Accuracy()
        self.val_acc = torchmetrics.Accuracy()
        self.test_acc = torchmetrics.Accuracy()

    def forward(self, x, attention_mask, x_idxs=None, epoch=None, split=None):
        """Return per-class log-probabilities for a batch.

        Args:
            x: Token ids passed to the classifier.
            attention_mask: Attention mask passed to the classifier.
            x_idxs, epoch, split: Accepted for call compatibility; not used
                here (output logging happens outside ``forward``).

        Returns:
            ``log_softmax`` of the classifier logits over dimension 1.
        """
        out = self.model(x, attention_mask=attention_mask)
        return F.log_softmax(out.logits, dim=1)

    def _shared_step(self, batch, split, accuracy, metric_name):
        """Run one batch: forward pass, NLL loss, accuracy update and logging."""
        x_idxs, x, attention_mask, y = batch
        log_probs = self(x=x, attention_mask=attention_mask, x_idxs=x_idxs, epoch=self.current_epoch, split=split)
        loss = F.nll_loss(log_probs, y)
        accuracy(torch.argmax(log_probs, 1), y)
        self.log(metric_name, accuracy, prog_bar=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Model training step."""
        return self._shared_step(batch, "training", self.train_acc, "train_acc")

    def validation_step(self, batch, batch_idx):
        """Model validation step."""
        return self._shared_step(batch, "validation", self.val_acc, "val_acc")

    def test_step(self, batch, batch_idx):
        """Model test step."""
        return self._shared_step(batch, "test", self.test_acc, "test_acc")

    def configure_optimizers(self):
        """Model optimizers: AdamW (lr=1e-5) over all trainable parameters."""
        return torch.optim.AdamW(filter(lambda p: p.requires_grad, self.parameters()), lr=1e-5)

In [21]:
# Inspect the dataset object wrapped by the training DataLoader.
# NOTE(review): `train_dataloader` is first assigned in a later cell, so this
# cell fails on a fresh top-to-bottom run.
train_dataloader.dataset

<__main__.NewsgroupDataset at 0x152393d60>

In [9]:
from pytorch_lightning.callbacks import Callback
import pytorch_lightning as pl
from pytorch_lightning.callbacks import Callback
from torch.utils.data.dataloader import DataLoader
from dataquality import config
import dataquality
from typing import Optional, Sequence, Union, Any, Dict
from pytorch_lightning.utilities.types import STEP_OUTPUT
import torch.nn.functional as F

class DataQualityCallback(Callback):
    """Lightning callback that sends input data and model outputs to Galileo.

    Input rows are logged when training, validation and testing start; model
    outputs (probabilities, embeddings, predictions) are logged at the end of
    every batch; ``dataquality.finish()`` is called when testing ends.

    ``checkpoint_data`` holds the callback's own epoch counter and is the
    state saved to and restored from checkpoints.
    """

    def __init__(self, dataloader_config: Dict[str,str] = None):
        """Create the callback.

        Args:
            dataloader_config: Accepted but not used by this class.
        """
        self.checkpoint_data = {'epoch_start': False, 'epoch':0}

    @property
    def epoch(self) -> int:
        """Number of epochs this callback has counted so far."""
        return self.checkpoint_data['epoch']

    def on_load_checkpoint(self, trainer, pl_module, callback_state):
        """Restore the epoch bookkeeping saved by ``on_save_checkpoint``."""
        self.checkpoint_data = callback_state

    def on_save_checkpoint(self, trainer, pl_module, checkpoint):
        """Return a copy of the epoch bookkeeping for the checkpoint."""
        return self.checkpoint_data.copy()

    @staticmethod
    def _assert_initialized() -> None:
        """Fail fast when dataquality has no active project or run."""
        assert config.current_project_id, "You must initialize dataquality before invoking a callback!"
        assert config.current_run_id, "You must initialize dataquality before invoking a callback!"

    def _log_dataquality_input_data(self, split: str, dataloader: Union[DataLoader, Sequence[DataLoader]]):
        """Log every row of the loader's underlying data for ``split``.

        The wrapped dataset must expose its rows as a ``dataset`` or ``data``
        attribute that can be indexed as ``data['text'][i]`` and
        ``data['label'][i]``.
        """
        #
        # 🔭 Logging Inputs with Galileo!
        #
        print(f'Logging data input for split {split} of epoch {self.epoch}')
        loaders = dataloader if isinstance(dataloader, Sequence) else [dataloader]
        for loader in loaders:
            # NOTE(review): for training the loader's dataset is unwrapped via
            # ``.datasets`` — presumably Lightning's combined-loader wrapper;
            # confirm against the installed pytorch_lightning version.
            dataset = loader.dataset if split in ('test', 'validation') else loader.dataset.datasets
            assert hasattr(dataset, 'dataset') or hasattr(dataset, 'data'), "Your Dataloader's Dataset must have a 'data' or 'dataset' attribute with your data!"
            data = dataset.dataset if hasattr(dataset, 'dataset') else dataset.data
            for i in range(len(data)):
                dataquality.log_input_data({
                    "id": i,
                    "text": data['text'][i],
                    "gold": str(data['label'][i]),
                    "split": split})

    def _log_model_outputs(self, trainer: pl.Trainer, batch: Any, split: str) -> None:
        """Re-run the model on ``batch`` and log one output record per sample."""
        x_idxs, x, attention_mask, y = batch
        # The outputs are only read for logging, so skip building an autograd graph.
        with torch.no_grad():
            out = trainer.model.model(x, attention_mask=attention_mask)
            probs = F.softmax(out.logits, dim=1)
            encoded_layers = trainer.model.feature_extractor(x, return_dict=False)[0]
        print(f'Logging model outputs for split {split} epoch {self.epoch}')
        if x_idxs is None:
            return
        for i in range(len(x_idxs)):
            prob = probs[i].detach().cpu().numpy().tolist()
            # Hidden state at sequence position 0 (presumably the [CLS] token).
            emb = encoded_layers[i, 0].detach().cpu().numpy().tolist()
            #
            # 🔭 Logging outputs with Galileo!
            #
            dataquality.log_model_output({
                "id": int(x_idxs[i]),
                "epoch": self.epoch,
                "split": split,
                "emb": emb,
                "prob": prob,
                "pred": str(int(np.argmax(prob)))})

    def on_train_start(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
        """Log the training inputs once training begins."""
        self._assert_initialized()
        print('Starting train!')
        self._log_dataquality_input_data('training', trainer.train_dataloader)

    def on_test_start(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
        """Log the test inputs once testing begins."""
        self._assert_initialized()
        print('Starting test!')
        self._log_dataquality_input_data('test', trainer.test_dataloaders)

    def on_validation_start(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
        """Log the validation inputs each time validation begins."""
        self._assert_initialized()
        print('Starting validation!')
        self._log_dataquality_input_data('validation', trainer.val_dataloaders)

    def on_epoch_start(self, *args, **kwargs):
        """Mark that an epoch has begun so ``on_epoch_end`` counts it once."""
        self.checkpoint_data['epoch_start'] = True

    def on_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
        """Increment the epoch counter, but only once per started epoch."""
        print('Ending epoch')
        if self.checkpoint_data['epoch_start']:
            self.checkpoint_data['epoch'] += 1
        self.checkpoint_data['epoch_start'] = False

    def on_train_batch_end(
        self,
        trainer: "pl.Trainer",
        pl_module: "pl.LightningModule",
        outputs: STEP_OUTPUT,
        batch: Any,
        batch_idx: int,
        dataloader_idx: int,
    ) -> None:
        """Log model outputs for a finished training batch."""
        self._log_model_outputs(trainer, batch, 'training')

    def on_validation_batch_end(
        self,
        trainer: "pl.Trainer",
        pl_module: "pl.LightningModule",
        outputs: Optional[STEP_OUTPUT],
        batch: Any,
        batch_idx: int,
        dataloader_idx: int,
    ) -> None:
        """Log model outputs for a finished validation batch."""
        self._log_model_outputs(trainer, batch, 'validation')

    def on_test_batch_end(
        self,
        trainer: "pl.Trainer",
        pl_module: "pl.LightningModule",
        outputs: Optional[STEP_OUTPUT],
        batch: Any,
        batch_idx: int,
        dataloader_idx: int,
    ) -> None:
        """Log model outputs for a finished test batch."""
        self._log_model_outputs(trainer, batch, 'test')

    def on_test_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -> None:
        """Finish the dataquality run once testing is over."""
        print('done!')
        dataquality.finish()


In [16]:
class SimpleCallback(Callback):
    """Minimal callback used to observe how often the epoch hooks fire.

    It keeps the same ``checkpoint_data`` bookkeeping dict as the full
    callback but only prints; it never increments the epoch counter.
    """

    def __init__(self):
        self.checkpoint_data = dict(epoch_start=False, epoch=0)

    def on_load_checkpoint(self, trainer, pl_module, callback_state):
        """Adopt the state stored in the checkpoint."""
        self.checkpoint_data = callback_state

    def on_save_checkpoint(self, trainer, pl_module, checkpoint):
        """Hand a copy of the bookkeeping dict to the checkpoint."""
        snapshot = self.checkpoint_data.copy()
        return snapshot

    def on_epoch_start(self, *args, **kwargs):
        """Flag that an epoch is in progress."""
        self.checkpoint_data.update(epoch_start=True)

    def on_epoch_end(self, *args, **kwargs):
        """Print on every call, and once more when an epoch was actually started."""
        print('double print')
        epoch_was_started = self.checkpoint_data['epoch_start']
        # Clear the flag so a second end-hook call for the same epoch is not counted.
        self.checkpoint_data['epoch_start'] = False
        if epoch_was_started:
            print('ending epoch count properly')

In [17]:
"""
Part 3.

Instantiate a model and train it with PyTorch Lightning.
"""

model = LightningDistilBERT()

# One shuffled loader (batch size 8) per split, built in training, validation, test order.
train_dataloader, validation_dataloader, test_dataloader = (
    torch.utils.data.DataLoader(NewsgroupDataset(split_name), batch_size=8, shuffle=True)
    for split_name in ("training", "validation", "test")
)

# Alternative that logs to Galileo:
# trainer = pl.Trainer(max_epochs=2, num_sanity_val_steps=0, callbacks=[DataQualityCallback()])
trainer = pl.Trainer(
    max_epochs=2,
    num_sanity_val_steps=0,
    callbacks=[SimpleCallback()],
)

trainer.fit(model, train_dataloader, validation_dataloader)
trainer.test(model, test_dataloader)

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertForSequenceClassification: ['vocab_transform.bias', 'vocab_transform.weight', 'vocab_projector.bias', 'vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_projector.weight']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier

Training: -1it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

double print
ending epoch count properly
double print


Validating: 0it [00:00, ?it/s]

double print
ending epoch count properly
double print


Testing: 0it [00:00, ?it/s]

double print
ending epoch count properly
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_acc': 0.0}
--------------------------------------------------------------------------------


[{'test_acc': 0.0}]

In [54]:
from torchnlp.datasets import wmt_dataset

import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt


# Load the WMT train split.
training_data = wmt_dataset(
    train=True,
)

# Load the WMT split selected by train=False.
test_data = wmt_dataset(
    train=False,
)

from torch.utils.data import DataLoader

# NOTE(review): this rebinds `train_dataloader`, replacing the NewsgroupDataset
# loader created in the training cell; later cells that use the name see this
# WMT loader instead. Consider a distinct name.
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
# test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)

In [28]:
# Build a NewsgroupDataset training loader (batch size 8, shuffled) under its own name.
custom_train_dataloader = torch.utils.data.DataLoader(NewsgroupDataset("training"), batch_size=8, shuffle=True)


In [32]:
# Inspect the attributes stored on the NewsgroupDataset instance behind the loader.
custom_train_dataloader.dataset.__dict__

{'dataset':                                                  text  label
 0   I was wondering if anyone out there could enli...     10
 1   A fair number of brave souls who upgraded thei...      4
 2   well folks, my mac plus finally gave up the gh...      4
 3   \nDo you have Weitek's address/phone number?  ...      7
 4   From article <C5owCB.n3p@world.std.com>, by to...     14
 5   \n\n\n\n\nOf course.  The term must be rigidly...     16
 6   There were a few people who responded to my re...     13
 7                                                 ...      3
 8   I have win 3.0 and downloaded several icons an...      2
 9   \n\n\nI've had the board for over a year, and ...      4
 10  I have a line on a Ducati 900GTS 1978 model wi...      8
 11  \nYep, that's pretty much it. I'm not a Jew bu...     19
 12                                               --\n      4
 13  \n   {Description of "External Tank" option fo...     14
 14  Reduced Prices! \nI have a list of things fors...     

In [33]:
# Inspect the attributes of whatever dataset `train_dataloader` currently wraps.
# NOTE(review): the recorded output shows fields such as `root`, `transform`
# and a `data` tensor, which do not match any `train_dataloader` assignment
# visible in this notebook — possibly stale kernel state; confirm.
train_dataloader.dataset.__dict__

{'root': 'data',
 'transform': ToTensor(),
 'target_transform': None,
 'transforms': StandardTransform
 Transform: ToTensor(),
 'train': True,
 'data': tensor([[[0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          ...,
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0]],
 
         [[0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          ...,
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0]],
 
         [[0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          ...,
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0]],
 
         ...,
 
         [[0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          [0, 0, 0,  ..., 0, 0, 0],
          ...,
        

In [22]:
# Show the id of the run dataquality is currently configured with.
dataquality.config.current_run_id

'cde89c9f-440d-48ff-abc2-527f8d305aba'

In [23]:
# Finish the run; the recorded output reports a data upload followed by cleanup.
dataquality.finish()

☁️ Uploading Data
🧹 Cleaning up


In [24]:
# Re-check the run id after finish(); the recorded output shows it is unchanged.
dataquality.config.current_run_id

'cde89c9f-440d-48ff-abc2-527f8d305aba'

In [25]:
# Start a new project and run (names are auto-generated, per the recorded output).
dataquality.init()

✨ Initializing project dear_tan_wildcat
🏃‍♂️ Starting run increasing_silver_crayfish
🛰 Created project, dear_tan_wildcat, and new run, increasing_silver_crayfish.


In [26]:
# NOTE(review): second consecutive init(); the recorded output shows it creates
# yet another project and run — likely an accidental re-execution.
dataquality.init()

✨ Initializing project injured_aquamarine_earthworm
🏃‍♂️ Starting run dangerous_lavender_galliform
🛰 Created project, injured_aquamarine_earthworm, and new run, dangerous_lavender_galliform.
