In [1]:
import os
os.environ['GALILEO_AUTH_METHOD'] = 'email'

In [2]:
"""
Part 0. Log in to Galileo!
"""

import dataquality

dataquality.login()

🔭 Logging you into Galileo

👀 Found auth method email set via env, skipping prompt.
🚀 You're logged in to Galileo as ben@rungalileo.io!


In [3]:
dataquality.config

Config(api_url='http://localhost:8000', minio_url='127.0.0.1:9000', minio_access_key='minioadmin', minio_secret_key='minioadmin', auth_method=<AuthMethod.email: 'email'>, token='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJiZW5AcnVuZ2FsaWxlby5pbyIsImV4cCI6MTYzNDc2MjA1M30.3hQAXn2db8p1OBvjuteCpyRA2fDHO671tzjXBEzyfqY', current_user='ben@rungalileo.io', current_project_id=UUID('dd4a071f-9e7b-4476-a39e-d229d9398880'), current_run_id=UUID('fda2133f-aec5-4707-8261-fb87b2275195'), labels=None)

In [4]:
"""
Part 0.1 Create your first project!
"""

dataquality.init()

✨ Initializing project square_olive_beaver
🏃‍♂️ Starting run labour_olive_ptarmigan
🛰 Created project, square_olive_beaver, and new run, labour_olive_ptarmigan.


In [5]:
dataquality.config

Config(api_url='http://localhost:8000', minio_url='127.0.0.1:9000', minio_access_key='minioadmin', minio_secret_key='minioadmin', auth_method=<AuthMethod.email: 'email'>, token='eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJiZW5AcnVuZ2FsaWxlby5pbyIsImV4cCI6MTYzNDc2MjA1M30.3hQAXn2db8p1OBvjuteCpyRA2fDHO671tzjXBEzyfqY', current_user='ben@rungalileo.io', current_project_id='dd4a071f-9e7b-4476-a39e-d229d9398880', current_run_id='fda2133f-aec5-4707-8261-fb87b2275195', labels=None)

In [6]:
"""
Part 0.2 Install some dependencies for this workflow exercise.
"""

%pip install -q torch sklearn transformers pandas numpy pytorch_lightning torchmetrics

Note: you may need to restart the kernel to use updated packages.


In [7]:
"""
Part 1.

Log your datasets with Galileo.

Create the Newsgroup dataset class. Using huggingface Bert Tokenizer.

We are introducing some noise to these datasets because 
the newsgroup dataset is already well labeled.
"""

import torch
from sklearn.datasets import fetch_20newsgroups
from transformers import DistilBertTokenizerFast
import pandas as pd
import numpy as np

# Use the GalileoModelConfig and GalileoDataConfig to keep track of Galileo metrics for logging
from dataquality.core.integrations.config import GalileoModelConfig, GalileoDataConfig


def introduce_label_errors(df: pd.DataFrame, column: str, shuffle_percent: int) -> pd.DataFrame:
    arr = df[column].values
    shuffle = np.random.choice(
        np.arange(arr.shape[0]), 
        round(arr.shape[0] * shuffle_percent / 100), 
        replace=False)
    arr[np.sort(shuffle)] = arr[shuffle]
    df[column] = arr
    return df
    

class NewsgroupDataset(torch.utils.data.Dataset):
    def __init__(self, split: str) -> None:
        newsgroups = fetch_20newsgroups(subset="train" if split == "training" else "test", 
                                        remove=('headers', 'footers', 'quotes'))

        self.dataset = pd.DataFrame()
        self.dataset["text"] = newsgroups.data
        self.dataset["label"] = newsgroups.target
        self.dataset = self.dataset[:23]
        
        # Shuffle some percentage of the training dataset 
        # to force create mislabeled samples
        if split == "training":
            self.dataset = introduce_label_errors(self.dataset, "label", 11)

        #
        # 🔭 Logging Inputs with Galileo!
        #
        self.gconfig = GalileoDataConfig(text=self.dataset['text'], labels=self.dataset['label'])

#         for i in range(len(self.dataset)):
#             dataquality.log_input_data({
#                 "id": i,
#                 "text": self.dataset["text"][i],
#                 "gold": str(self.dataset["label"][i]),
#                 "split": split})

        tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
        self.encodings = tokenizer(self.dataset["text"].tolist(), truncation=True, padding=True)
    
    def __getitem__(self, idx):
        x = torch.tensor(self.encodings["input_ids"][idx])
        attention_mask = torch.tensor(self.encodings["attention_mask"][idx])
        y = self.dataset["label"][idx]
        return idx, x, attention_mask, y

    def __len__(self):
        return len(self.dataset)

In [8]:
"""
Part 2.

Log model outputs with Galileo.

We are using a DistilBERT pytorch lightning class for text classification.
"""

import pytorch_lightning as pl
from transformers import DistilBertForSequenceClassification, AdamW, DistilBertConfig, AutoModel
import torch.nn.functional as F
import torchmetrics


class LightningDistilBERT(pl.LightningModule):

    def __init__(self):
        super().__init__()
        self.model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', config=DistilBertConfig(num_labels=20))
        self.feature_extractor = AutoModel.from_pretrained('distilbert-base-uncased')
        self.train_acc = torchmetrics.Accuracy()
        self.val_acc = torchmetrics.Accuracy()
        self.test_acc = torchmetrics.Accuracy()

    def forward(self, x, attention_mask, x_idxs, epoch, split):
        out = self.model(x, attention_mask=attention_mask)
        log_probs = F.log_softmax(out.logits, dim=1)
        probs = F.softmax(out.logits, dim=1)
        encoded_layers = self.feature_extractor(x, return_dict=False)[0]
        
        # Logging with Galileo!
        self.g_model_config = GalileoModelConfig(emb=[i[0] for i in encoded_layers.tolist()], probs=probs.tolist(), ids=x_idxs.tolist())
        
#         if x_idxs is not None:
#             for i in range(len(x_idxs)):
#                 index = int(x_idxs[i])
#                 prob = probs[i].detach().cpu().numpy().tolist()
#                 emb = encoded_layers[i, 0].detach().cpu().numpy().tolist()
#                 #
#                 # 🔭 Logging outputs with Galileo!
#                 #
#                 dataquality.log_model_output({
#                     "id": int(x_idxs[i]),
#                     "epoch": epoch,
#                     "split": split,
#                     "emb": emb,
#                     "prob": prob,
#                     "pred": str(int(np.argmax(prob)))})
        return log_probs

    def training_step(self, batch, batch_idx):
        """Model training step."""
        x_idxs, x, attention_mask, y = batch
        log_probs = self(x=x, attention_mask=attention_mask, x_idxs=x_idxs, epoch=self.current_epoch, split="training")
        loss = F.nll_loss(log_probs, y)
        self.train_acc(torch.argmax(log_probs, 1), y)
        self.log("train_acc", self.train_acc, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Model validation step."""
        x_idxs, x, attention_mask, y = batch
        log_probs = self(x=x, attention_mask=attention_mask, x_idxs=x_idxs, epoch=self.current_epoch, split="validation")
        loss = F.nll_loss(log_probs, y)
        self.val_acc(torch.argmax(log_probs, 1), y)
        self.log("val_acc", self.val_acc, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx): 
        """Model test step."""
        x_idxs, x, attention_mask, y = batch
        log_probs = self(x=x, attention_mask=attention_mask, x_idxs=x_idxs, epoch=self.current_epoch, split="test")
        loss = F.nll_loss(log_probs, y)
        self.test_acc(torch.argmax(log_probs, 1), y)
        self.log("test_acc", self.test_acc, prog_bar=True)
        return loss

    def configure_optimizers(self):
        """Model optimizers."""
        return torch.optim.AdamW(filter(lambda p: p.requires_grad, self.parameters()), lr=1e-5)

In [9]:
"""
Part 3.

Instantiate a model and train it with PyTorch Lightning.
"""

# Use the PyTorch Lightning Callback to log data to Galileo
from dataquality.core.integrations.lightning import DataQualityCallback

model = LightningDistilBERT()

train_dataloader = torch.utils.data.DataLoader(NewsgroupDataset("training"), batch_size=8, shuffle=True)
validation_dataloader = torch.utils.data.DataLoader(NewsgroupDataset("validation"), batch_size=8, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(NewsgroupDataset("test"), batch_size=8, shuffle=True)

trainer = pl.Trainer(max_epochs=2, num_sanity_val_steps=0, callbacks=[(DataQualityCallback())])

trainer.fit(model, train_dataloader, validation_dataloader)
trainer.test(model, test_dataloader)

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertForSequenceClassification: ['vocab_projector.bias', 'vocab_transform.weight', 'vocab_projector.weight', 'vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_transform.bias']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['pre_classifier.bias', 'pre_classifier.weight', 'classi

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


  rank_zero_warn(
  rank_zero_warn(
  rank_zero_warn(
  rank_zero_warn(


Training: -1it [00:00, ?it/s]

  member_class = getattr(cls, attr)


Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

☁️ Uploading Data


  rank_zero_warn(
  rank_zero_warn(


Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_acc': 0.043478261679410934}
--------------------------------------------------------------------------------
☁️ Uploading Data


[{'test_acc': 0.043478261679410934}]

In [10]:
train_dataloader = torch.utils.data.DataLoader(NewsgroupDataset("training"), batch_size=8, shuffle=True)

list(set(train_dataloader.dataset.dataset['label'].tolist()))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 13, 14, 16, 19]

In [11]:
dataquality.config.labels = list(set(train_dataloader.dataset.dataset['label'].tolist()))

In [14]:
dataquality.finish()

In [35]:
import requests
from dataquality.schemas.pipeline import Pipeline
from dataquality.utils.auth import headers
import json
 
requests.post(
    f'{dataquality.config.api_url}/pipelines',
    json=dict(
        project_id=str(dataquality.config.current_project_id),
        run_id=str(dataquality.config.current_run_id),
        pipeline_name=Pipeline.calculate_metrics,
        pipeline_env_vars=dict(
            labels=str(dataquality.config.labels)
        )
    ),
    headers=headers(dataquality.config.token),
).text

'{"project_id":"dd4a071f-9e7b-4476-a39e-d229d9398880","run_id":"fda2133f-aec5-4707-8261-fb87b2275195","pipeline_name":"simple-jsonl-io-with-metrics-d1e9d0da","pipeline_id":"4bd143b8-4299-4eea-a6a6-d1493b3379db","pipeline_env_vars":{"labels":"[0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 13, 14, 16, 19]"}}'

In [29]:
len('simple-jsonl-io-with-metrics-c333eed0-e508-49e3-87e2-504bdf84efce')

65

In [32]:
from pydantic import BaseModel

class RunPipelineRequest(BaseModel):
    project_id: str
    run_id:str
        
p = RunPipelineRequest(project_id='5',run_id='5')
p.dict().pop('run_id')

'5'

In [33]:
p.dict()

{'project_id': '5', 'run_id': '5'}