In [None]:
# NSMC 데이터 로딩 및 전처리
# 데이터 로딩
from ratsnlp.nlpbook.generation import NsmcCorpus

corpus = NsmcCorpus()

# 데이터 전처리
from ratsnlp.nlpbook.generation import GenerationDataset

train_dataset = GenerationDataset(
    args=args,
    corpus=corpus,
    tokenizer=tokenizer,
    mode="train",
)

In [None]:
# NSMCCORPUS 클래스
import os, csv
from ratsnlp.nlpbook.generation.corpus import GenerationExample

class NsmcCorpus:

    def __init__(self):
        pass

    def _read_corpus(cls, input_file, quotechar='"'):
        with open(input_file, "r", encoding="utf-8") as f:
            return list(csv.reader(f, delimiter="\t", quotechar=quotechar))

    def _create_examples(self, lines):
        examples = []
        
        for (i, line) in enumerate(lines):
            if i == 0:
                continue
                
            _, review_sentence, sentiment = line
            sentiment = "긍정" if sentiment == "1" else "부정"
            text = sentiment + " " + review_sentence
            examples.append(GenerationExample(text=text))
            
        return examples

    def get_examples(self, data_root_path, mode):
        data_fpath = os.path.join(data_root_path, f"ratings_{mode}.txt")
        logger.info(f"loading {mode} data... LOOKING AT {data_fpath}")
        
        return self._create_examples(self._read_corpus(data_fpath))

In [None]:
# 커스텀 말뭉치 클래스
import os
from ratsnlp.nlpbook.generation.corpus import GenerationExample

class NewsCorpus:

    def __init__(self):
        pass

    def get_examples(self, data_root_path, mode):
        data_fpath = os.path.join(data_root_path, f"{mode}.txt")
        lines = open(data_fpath, "r", encoding="utf-8").readlines()
        examples = []
        
        for (i, line) in enumerate(lines):
            if i == 0:
                continue
                
            text, label = line
            sentence = label + " " + text 
            examples.append(GenerationExample(text=sentence))
            
        return examples

In [None]:
# 커스텀 데이터 로딩 및 전처리
from ratsnlp.nlpbook.generation import GenerationDataset

corpus = NewsCorpus()

train_dataset = GenerationDataset(
    args=args,
    corpus=corpus,
    tokenizer=tokenier,
    mode="train",
)

In [None]:
# GENERATIONDATASET 클래스
from torch.utils.data.dataset import Dataset
from transformers import PreTrainedTokenizer
from ratsnlp.nlpbook.generation.arguments import GenerationTrainArguments
from ratsnlp.nlpbook.generation.corpus import _convert_examples_to_generation_features

class GenerationDataset(Dataset):

    def __init__(
            self,
            args: GenerationTrainArguments,
            tokenizer: PreTrainedTokenizerFast,
            corpus,
            mode: Optional[str] = "train",
            convert_examples_to_features_fn=_convert_examples_to_generation_features,
    ):
            self.corpus = corpus
            
                examples = self.corpus.get_examples(corpus_path, mode)
                self.features = convert_examples_to_features_fn(
                    examples,
                    tokenizer,
                    args,
                    label_list=self.corpus.get_labels(),
                )

    def __len__(self):
        return len(self.features)

    def __getitem__(self, i):
        return self.features[i]

In [None]:
# 다른 모델 사용하기
from ratsnlp.nlpbook.generation import GenerationTrainArguments
from transformers import GPT2LMHeadModel, GPT2Tokenizer

args = GenerationTrainArguments(
    pretrained_model_name="gpt2",
)

tokenizer = GPT2Tokenizer.from_pretrained(
    args.pretrained_model_name,
)

model = GPT2LMHeadModel.from_pretrained(
    args.pretrained_model_name,
)

In [None]:
from ratsnlp.nlpbook.generation import GenerationTask

task = GenerationTask(model, args)

In [None]:
# 문장 생성 태스크 클래스
from transformers import PreTrainedModel
from transformers.optimization import AdamW
from pytorch_lightning import LightningModule
from ratsnlp.nlpbook.generation.arguments import GenerationTrainArguments
from torch.optim.lr_scheduler import ExponentialLR, CosineAnnealingWarmRestarts

class GenerationTask(LightningModule):

    def __init__(self,
                 model: PreTrainedModel,
                 args: GenerationTrainArguments,
    ):
        super().__init__()
        self.model = model
        self.args = args

    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=self.args.learning_rate)
        scheduler = ExponentialLR(optimizer, gamma=0.9)
        
        return {
            'optimizer': optimizer,
            'scheduler': scheduler,
        }

    def training_step(self, inputs, batch_idx):
        # outputs: CausalLMOutputWithCrossAttentions
        outputs = self.model(**inputs)
        self.log("loss", outputs.loss, prog_bar=False, logger=True, on_step=True, on_epoch=False)
        return outputs.loss

    def validation_step(self, inputs, batch_idx):
        # outputs: CausalLMOutputWithCrossAttentions
        outputs = self.model(**inputs)
        self.log("val_loss", outputs.loss, prog_bar=True, logger=True, on_step=False, on_epoch=True)
        return outputs.loss

In [None]:
# GPT2LMHEADMODEL
class GPT2LMHeadModel(GPT2PreTrainedModel):

    def __init__(self, config):
        super().__init__(config)
        self.transformer = GPT2Model(config)
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

        self.init_weights()

        self.model_parallel = False

    def forward(
        self,
        input_ids=None,
        past_key_values=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        encoder_hidden_states=None,
        encoder_attention_mask=None,
        labels=None,
        use_cache=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        
        transformer_outputs = self.transformer(
            input_ids,
            past_key_values=past_key_values,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_attention_mask,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = transformer_outputs[0]

        # Set device for model parallelism
        if self.model_parallel:
            torch.cuda.set_device(self.transformer.first_device)
            hidden_states = hidden_states.to(self.lm_head.weight.device)

        lm_logits = self.lm_head(hidden_states)

        loss = None
        
        if labels is not None:
            # Shift so that tokens < n predict n
            shift_logits = lm_logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            # Flatten the tokens
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

        return CausalLMOutputWithCrossAttentions(
            loss=loss,
            logits=lm_logits,
            past_key_values=transformer_outputs.past_key_values,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
            cross_attentions=transformer_outputs.cross_attentions,
        )