First try, PoC PyTorchLightning #323

Open · wants to merge 1 commit into base: develop
7 changes: 5 additions & 2 deletions .gitignore
@@ -2,7 +2,6 @@
.idea/
# C extensions
*.so

# Packages
PKG-INFO
*.egg
@@ -56,7 +55,11 @@ launch.json

# Data:

examples/trainer/lightning/lightning_logs


# Large datasets
*.lib
*.pbz2
*.csv
data_from_minio/
data_from_minio/
Empty file.
45 changes: 45 additions & 0 deletions examples/trainer/lightning/log_reg_model.py
@@ -0,0 +1,45 @@
import torch
import torch.nn as nn
import torch.nn.functional as F

from knodle.trainer.lightning.base_model import KnodleLightningModule


class LitModel(KnodleLightningModule):
def __init__(
self,
max_features=4000,
num_classes=2,
**kwargs
):
super().__init__(**kwargs)
self.l1 = nn.Linear(max_features, max_features // 2)
self.l2 = nn.Linear(max_features // 2, max_features // 4)
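        # only l3 is used in the current forward(); if l1/l2 are re-enabled,
        # l3 must take max_features // 4 input features instead of max_features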
self.l3 = nn.Linear(max_features, num_classes)

def forward(self, x):
# x = torch.relu(self.l1(x))
# x = torch.relu(self.l2(x))
x = torch.relu(self.l3(x))
return F.softmax(x, dim=-1)

def training_step(self, batch, batch_idx):
x = batch["x"].float()
y = batch["labels"].long()

        y_hat = self(x)
        # forward() already returns softmax probabilities, so train with NLL on
        # their log; F.cross_entropy expects unnormalized logits
        loss = F.nll_loss(torch.log(y_hat), y)
self.log("loss", loss)
self.log_metrics("train", y_hat, y)

return loss

def test_step(self, batch, batch_idx):
x = batch[0].float()
y = batch[1].long()
y_hat = self(x)

self.log_metrics("test", y_hat, y)

def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=0.01)
1 change: 1 addition & 0 deletions examples/trainer/lightning/requirements.txt
@@ -0,0 +1 @@
pytorch-lightning
73 changes: 73 additions & 0 deletions examples/trainer/lightning/run_distilbert.py
@@ -0,0 +1,73 @@
import os

from minio import Minio
from tqdm.auto import tqdm

import torchmetrics

from knodle.trainer.lightning.data_module import KnodleDataModule
from knodle.trainer.lightning.base_model import KnodleLightningModule
from knodle.trainer.lightning.majority_trainer import LightningMajorityTrainer
from knodle.trainer.lightning.snorkel_trainer import LightningSnorkelTrainer

from utils import get_tfidf_features, get_transformer_features
from log_reg_model import LitModel


def main(dataset: str):

imdb_data_dir = os.path.join(os.getcwd(), "datasets", dataset)
processed_data_dir = os.path.join(imdb_data_dir, "processed")
os.makedirs(processed_data_dir, exist_ok=True)

client = Minio("knodle.cc", secure=False)
files = [
"df_train.csv", "df_test.csv",
"train_rule_matches_z.pbz2", "test_rule_matches_z.pbz2",
"mapping_rules_labels_t.lib",
"test_labels.pbz2",
# "df_dev.csv", "dev_rule_matches_z.pbz2", "dev_labels.pbz2"
]

for file in tqdm(files):
client.fget_object(
bucket_name="knodle",
object_name=os.path.join("datasets", dataset, "processed_unified_format", file),
file_path=os.path.join(processed_data_dir, file),
)

max_features = 3800

    tfidf_data_module = KnodleDataModule(
        data_dir=processed_data_dir,
        batch_size=256,
        preprocessing_fct=get_tfidf_features,
        preprocessing_kwargs={"max_features": max_features},
        dataloader_train_keys=["x"],
    )
tfidf_data_module.setup()


    num_classes = 6 if dataset == "trec" else 2
    train_metrics = {
        "accuracy": torchmetrics.Accuracy(num_classes=num_classes)
    }

    test_metrics = {
        "accuracy": torchmetrics.Accuracy(),
        "f1": torchmetrics.F1(num_classes=num_classes, average="macro")
    }

    model = LitModel(
        max_features=max_features, num_classes=num_classes,
        train_metrics=train_metrics, test_metrics=test_metrics
    )

trainer = LightningMajorityTrainer(max_epochs=20)
# trainer = LightningSnorkelTrainer(max_epochs=20)
    trainer.fit(model=model, datamodule=tfidf_data_module)



if __name__ == '__main__':
    main(dataset="imdb")  # "trec" is also handled (see num_classes above)
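Note: get_transformer_features is imported but only the TF-IDF path is exercised. A minimal sketch of the transformer variant, continuing inside main() after the TF-IDF run; the dataloader_train_keys values and the placeholder total_steps are assumptions, and TransformerLitModel would need to be imported from transformer_model.py:

    # requires: from transformer_model import TransformerLitModel
    bert_data_module = KnodleDataModule(
        data_dir=processed_data_dir,
        batch_size=32,
        preprocessing_fct=get_transformer_features,
        preprocessing_kwargs={"transformer_name": "distilbert-base-uncased"},
        dataloader_train_keys=["input_ids", "attention_mask"],  # assumed key names
    )
    bert_data_module.setup()

    bert_model = TransformerLitModel(
        model_name_or_path="distilbert-base-uncased",
        num_labels=num_classes,
    )
    bert_model.total_steps = 1000  # placeholder; normally computed in a setup() hook

    LightningMajorityTrainer(max_epochs=2).fit(model=bert_model, datamodule=bert_data_module)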
Empty file.
117 changes: 117 additions & 0 deletions examples/trainer/lightning/transformer_model.py
@@ -0,0 +1,117 @@
from typing import Optional

import torch

from transformers import (
    AdamW,
    AutoConfig,
    AutoModelForSequenceClassification,
    get_linear_schedule_with_warmup,
)

from knodle.trainer.lightning.base_model import KnodleLightningModule


class TransformerLitModel(KnodleLightningModule):
def __init__(
self,
model_name_or_path: str,
num_labels: int,
learning_rate: float = 2e-5,
adam_epsilon: float = 1e-8,
warmup_steps: int = 0,
weight_decay: float = 0.0,
train_batch_size: int = 32,
eval_batch_size: int = 32,
eval_splits: Optional[list] = None,
**kwargs,
):
        super().__init__(**kwargs)

self.save_hyperparameters()

self.config = AutoConfig.from_pretrained(model_name_or_path, num_labels=num_labels)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name_or_path, config=self.config)

def forward(self, **inputs):
return self.model(**inputs)

def training_step(self, batch, batch_idx):
outputs = self(
input_ids=batch.get("input_ids"), attention_mask=batch.get("attention_mask"), labels=batch.get("labels")
)
loss = outputs[0]
        self.log("loss", loss)
return loss

    def test_step(self, batch, batch_idx, dataloader_idx=0):
        outputs = self(**batch)
        loss, logits = outputs[:2]

        if self.hparams.num_labels > 1:
            preds = torch.argmax(logits, dim=1)
        elif self.hparams.num_labels == 1:
            preds = logits.squeeze()

        labels = batch["labels"]
        self.log("loss", loss)
        # the base module only registers "train" and "test" metrics, so log under "test"
        self.log_metrics("test", logits, labels)

def configure_optimizers(self):
"""Prepare optimizer and schedule (linear warmup and decay)"""
model = self.model
no_decay = ["bias", "LayerNorm.weight"]
optimizer_grouped_parameters = [
{
"params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
"weight_decay": self.hparams.weight_decay,
},
{
"params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
"weight_decay": 0.0,
},
]
optimizer = AdamW(optimizer_grouped_parameters, lr=self.hparams.learning_rate, eps=self.hparams.adam_epsilon)
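        # NOTE: self.total_steps is expected to be set before training (e.g. by a
        # setup() hook such as the commented-out one below); in recent PyTorch Lightning
        # versions, self.trainer.estimated_stepping_batches could be used instead.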

scheduler = get_linear_schedule_with_warmup(
optimizer,
num_warmup_steps=self.hparams.warmup_steps,
num_training_steps=self.total_steps,
)
scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1}
return [optimizer], [scheduler]

# def validation_epoch_end(self, outputs):
# if self.hparams.task_name == "mnli":
# for i, output in enumerate(outputs):
# # matched or mismatched
# split = self.hparams.eval_splits[i].split("_")[-1]
# preds = torch.cat([x["preds"] for x in output]).detach().cpu().numpy()
# labels = torch.cat([x["labels"] for x in output]).detach().cpu().numpy()
# loss = torch.stack([x["loss"] for x in output]).mean()
# self.log(f"val_loss_{split}", loss, prog_bar=True)
# split_metrics = {
# f"{k}_{split}": v for k, v in self.metric.compute(predictions=preds, references=labels).items()
# }
# self.log_dict(split_metrics, prog_bar=True)
# return loss
#
# preds = torch.cat([x["preds"] for x in outputs]).detach().cpu().numpy()
# labels = torch.cat([x["labels"] for x in outputs]).detach().cpu().numpy()
# loss = torch.stack([x["loss"] for x in outputs]).mean()
# self.log("val_loss", loss, prog_bar=True)
# self.log_dict(self.metric.compute(predictions=preds, references=labels), prog_bar=True)
# return loss
#
# def setup(self, stage=None) -> None:
# if stage != "fit":
# return
# # Get dataloader by calling it - train_dataloader() is called after setup() by default
# train_loader = self.train_dataloader()
# print(train_loader.batch_size)
#
# # Calculate total steps
# tb_size = self.hparams.train_batch_size * max(1, self.trainer.gpus)
# ab_size = self.trainer.accumulate_grad_batches * float(self.trainer.max_epochs)
# self.total_steps = (len(train_loader.dataset) // tb_size) // ab_size
37 changes: 37 additions & 0 deletions examples/trainer/lightning/utils.py
@@ -0,0 +1,37 @@
from typing import Dict, Optional

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

from transformers import AutoTokenizer


def get_tfidf_features(
        data: Dict, max_features: Optional[int] = None
) -> Dict:

vectorizer = TfidfVectorizer(max_features=max_features)

data["train_x"] = vectorizer.fit_transform(data["train_df"]["sample"]).toarray()
if data["test_df"] is not None:
data["test_x"] = vectorizer.transform(data["test_df"]["sample"]).toarray()
if data["dev_df"] is not None:
data["dev_x"] = vectorizer.transform(data["dev_df"]["sample"]).toarray()

return data


def get_transformer_features(
        data: Dict, transformer_name: str = 'distilbert-base-uncased', max_length: int = 512
) -> Dict:

tokenizer = AutoTokenizer.from_pretrained(transformer_name)
splits = ["train", "test"]

for split in splits:
tokenized = tokenizer(data[f"{split}_df"]["sample"].tolist(), max_length=max_length, truncation=True, padding=True)
data[f"{split}_input_ids"] = np.array(tokenized["input_ids"])
data[f"{split}_attention_mask"] = np.array(tokenized["attention_mask"])

return data
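For clarity, a small standalone illustration of the dict these helpers operate on (hand-built here; in the PR the dict is presumably assembled by KnodleDataModule before preprocessing_fct is applied):

import pandas as pd

data = {
    "train_df": pd.DataFrame({"sample": ["a good movie", "a bad movie"]}),
    "test_df": None,
    "dev_df": None,
}
data = get_tfidf_features(data, max_features=4000)
print(data["train_x"].shape)  # (2, vocabulary size, at most max_features)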
Empty file.
39 changes: 39 additions & 0 deletions knodle/trainer/lightning/base_model.py
@@ -0,0 +1,39 @@
from typing import Optional, Dict

import torch
import torchmetrics

import pytorch_lightning as pl
from pytorch_lightning.utilities.types import STEP_OUTPUT


class KnodleLightningModule(pl.LightningModule):
    def __init__(
            self,
            train_metrics: Optional[Dict] = None,
            test_metrics: Optional[Dict] = None,
            **kwargs
    ):
        super().__init__(**kwargs)

        # build the default metric per instance instead of sharing one mutable
        # default dict (and one Accuracy object) across every module
        if test_metrics is None:
            test_metrics = {"accuracy": torchmetrics.Accuracy()}

        self.set_metrics("train", train_metrics)
        self.set_metrics("test", test_metrics)

    def set_metrics(self, split: str, metrics: Optional[Dict] = None):
        metrics = {} if metrics is None else metrics

        setattr(self, f"{split}_metric_names", list(metrics.keys()))
        for name, metric in metrics.items():
            # torchmetrics.Metric is an nn.Module, so setattr registers it as a submodule
            setattr(self, f"{split}_{name}", metric)

    def log_metrics(self, split: str, logits: torch.Tensor, truth: torch.Tensor):
        for name in getattr(self, f"{split}_metric_names"):
            metric = getattr(self, f"{split}_{name}")
            metric(logits, truth)
            self.log(f"{split}_{name}", metric)

def training_step(self, *args, **kwargs) -> STEP_OUTPUT:
raise NotImplementedError("Please implement training_step")

def validation_step(self, *args, **kwargs) -> Optional[STEP_OUTPUT]:
raise NotImplementedError("Please implement validation_step")
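For reference, a minimal hypothetical subclass (not part of the PR) mirroring the batch layouts used in log_reg_model.py and filling in the validation_step that neither example model overrides:

import torch
import torch.nn.functional as F

class TinyLitModel(KnodleLightningModule):
    def __init__(self, in_features: int = 16, num_classes: int = 2, **kwargs):
        super().__init__(**kwargs)
        self.linear = torch.nn.Linear(in_features, num_classes)

    def training_step(self, batch, batch_idx):
        x, y = batch["x"].float(), batch["labels"].long()
        logits = self.linear(x)
        loss = F.cross_entropy(logits, y)
        self.log("loss", loss)
        self.log_metrics("train", logits, y)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch["x"].float(), batch["labels"].long()
        self.log("val_loss", F.cross_entropy(self.linear(x), y))

    def test_step(self, batch, batch_idx):
        x, y = batch[0].float(), batch[1].long()
        self.log_metrics("test", self.linear(x), y)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)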