In [4]:
!cp -r /kaggle/input/webnlg-parallel .

In [5]:
%cd webnlg-parallel/

/kaggle/working/webnlg-parallel/webnlg-parallel


In [6]:
!ls

multigpu.py  webnlg


In [12]:
%%writefile multigpu.py
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
# from datautils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os


from transformers import BartTokenizer, BartForConditionalGeneration, AdamW, get_scheduler, MBartTokenizer
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from transformers import T5ForConditionalGeneration, T5Tokenizer
from tqdm import tqdm


def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

class WebNLGDataset(Dataset):
    def __init__(self, dset_path, dsetType: str, tokenizer):
        if dsetType == 'train':
            files = ['train.source', 'train.target']
        elif dsetType == 'val':
            files = ['val.source', 'val.target']
        else:
            files = ['test_both.source', 'test_both.target']
        
        with open(dset_path + files[0], 'r') as f:
            source_raw = f.readlines()

        with open(dset_path + files[1], 'r') as f:
            target_raw = f.readlines()

        self.source_tok = tokenizer(source_raw, truncation=True, padding='max_length', max_length=384, return_tensors = 'pt')
        self.target_tok = tokenizer(target_raw, truncation=True, padding='max_length', max_length=384, return_tensors = 'pt')
        
    def __len__(self): # returns the total number of samples in the dataset
        assert self.source_tok['input_ids'].shape[0] == self.target_tok['input_ids'].shape[0]
        return self.source_tok['input_ids'].shape[0]
        
    def __getitem__(self, idx):
        return {'input_ids': self.source_tok['input_ids'][idx], 
                'attention_mask': self.source_tok['attention_mask'][idx],
                'labels': self.target_tok['input_ids'][idx],            
               }

class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        val_data: DataLoader,
        tokenizer: BartTokenizer,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
    ) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.val_data = val_data
        self.tokenizer = tokenizer
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(model, device_ids=[gpu_id])

    def _run_batch(self, input_ids, attention_mask, labels, run_type='train', eval_loss=None):
        if run_type == 'train':
            self.optimizer.zero_grad()
            # print(input_ids)
            # print(labels)
            # print(input_ids.shape, attention_mask.shape, labels.shape)
            output = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            # print(output)
            # loss = F.cross_entropy(output, targets)
            loss = output.loss
            # print(loss)
            ### TODO: Add lr_scheduler ###
            loss.backward()

            self.optimizer.step()
            return output, eval_loss
        elif run_type == 'validate':
            with torch.no_grad():
                output = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                eval_loss += output.loss.item()
            return output, eval_loss

    def _run_epoch(self, epoch, epoch_type='train'):

        if epoch_type == 'train':
            print("------ Training! ------")
            data = self.train_data
            eval_loss = None

        elif epoch_type == 'validate':
            print("------ Validating! ------")
            data = self.val_data
            eval_loss = 0

        # b_sz = len(next(iter(self.train_data))[0])
        # b_sz = len(next(iter(data))[0])
        # print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Steps: {len(data)}")
        data.sampler.set_epoch(epoch)
        # for source, targets in self.train_data:
        #     source = source.to(self.gpu_id)
        #     targets = targets.to(self.gpu_id)
        #     self._run_batch(source, targets)
        for batch_idx, batch in tqdm(enumerate(data), total=len(data)):
            input_ids = batch['input_ids'].to(self.gpu_id)
            attention_mask = batch['attention_mask'].to(self.gpu_id)
            labels = batch['labels'].to(self.gpu_id)

            if epoch_type == 'train':
                _, __ = self._run_batch(input_ids=input_ids, attention_mask=attention_mask, labels=labels, eval_loss=eval_loss, run_type='train')
            elif epoch_type == 'validate':
                _, eval_loss = self._run_batch(input_ids=input_ids, attention_mask=attention_mask, labels=labels, eval_loss=eval_loss, run_type='validate')
        if epoch_type == 'validate':
            print(f"Epoch {epoch+1}: Evaluation Loss = {eval_loss / len(self.val_data)}")

    def _save_checkpoint(self, epoch):
        ckp = self.model.module.state_dict()
        PATH = "."
        #         torch.save(ckp, PATH)
        self.model.module.save_pretrained(PATH)
        self.tokenizer.save_pretrained(PATH)

        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch=epoch, epoch_type='train')
            self._run_epoch(epoch=epoch, epoch_type='validate')
            if self.gpu_id == 0 and (epoch + 1) % self.save_every == 0:
                self._save_checkpoint(epoch)
                
    def generate_predictions(self, texts):
        model = self.model.module
        model.to('cuda')
        model.eval()
        predictions = []
    
        for text in tqdm(texts):
            inputs = self.tokenizer(text, return_tensors="pt", padding=True, truncation=True)
            with torch.no_grad():
                output_sequences = model.generate(**inputs.to('cuda'), num_beams=3, max_new_tokens=300, pad_token_id=model.config.eos_token_id)
        
            decoded_preds = self.tokenizer.batch_decode(output_sequences, skip_special_tokens=True)
            predictions.extend(decoded_preds)
    
        return predictions 

def load_train_objs(preTmodel):
    # train_set = MyTrainDataset(2048)  # load your dataset
    #     model_path = './model_cache/models--facebook--bart-base/snapshots/aadd2ab0ae0c8268c7c9693540e9904811f36177'
    if preTmodel == 'bart':
        tokenizer = BartTokenizer.from_pretrained('facebook/bart-base')
        model = BartForConditionalGeneration.from_pretrained('facebook/bart-base')
    elif preTmodel == 'gpt2':
        tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
        tokenizer.pad_token = tokenizer.eos_token  # Set padding token to EOS (since GPT-2 does not have a dedicated pad token)
        model = GPT2LMHeadModel.from_pretrained("gpt2")
    elif preTmodel == 'T5':
        tokenizer = T5Tokenizer.from_pretrained("t5-small")  # Using t5-base instead of t5-small
        model = T5ForConditionalGeneration.from_pretrained('t5-small')
        # Ensure all necessary special tokens are present
        special_tokens = {
            'pad_token': '[PAD]',
            'eos_token': '</s>',
            'bos_token': '<s>',
        }
        tokenizer.add_special_tokens(special_tokens)
        model.resize_token_embeddings(len(tokenizer))
    new_tokens = ['<H>', '<R>', '<T>']
    tokenizer.add_special_tokens({'additional_special_tokens': new_tokens})
    model.resize_token_embeddings(len(tokenizer))
    dataset_path = './webnlg/'
    train_dataset = WebNLGDataset(dataset_path, 'train', tokenizer)
    val_dataset = WebNLGDataset(dataset_path, 'val', tokenizer)
    # model = torch.nn.Linear(20, 1)  # load your model
    optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
    return train_dataset, val_dataset, model, tokenizer, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    ddp_setup(rank, world_size)
    train_dataset, val_dataset, model, tokenizer, optimizer = load_train_objs("T5")
    train_data = prepare_dataloader(train_dataset, batch_size)
    val_data = prepare_dataloader(val_dataset, batch_size)
    trainer = Trainer(model, train_data, val_data, tokenizer, optimizer, rank, save_every)
    trainer.train(total_epochs)
    with open('./webnlg/'+ 'test_both.source', 'r') as f:
        test_source = f.readlines()
    with open('./webnlg/' + 'test_both.target', 'r') as f:
        test_target = f.readlines()

    print('Calculating BLEU Score')
    predictions = trainer.generate_predictions(test_source)
    from nltk.translate.bleu_score import corpus_bleu
    references = [[ref.split()] for ref in test_target]  # Reference texts should be a list of lists of tokens
    predicted_tokens = [pred.split() for pred in predictions]  # Predictions should also be a list of lists of tokens
    bleu_score = corpus_bleu(references, predicted_tokens)
    print(f"BLEU score: {bleu_score:.4f}")
    destroy_process_group()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=12, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()

    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)






Overwriting multigpu.py


In [14]:
!python multigpu.py 3 2

[W1125 02:58:38.646370313 socket.cpp:697] [c10d] The client socket has failed to connect to [localhost]:12355 (errno: 99 - Cannot assign requested address).
You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this 