In [1]:
import torch
from transformers import (
    AdamW,
    AutoModel,
    get_linear_schedule_with_warmup,
    AutoTokenizer,
    AutoConfig
)

In [2]:
import os 
import numpy as np

In [5]:
from transformers import RobertaConfig, RobertaModel, RobertaPreTrainedModel
import torch.nn as nn
import torch
from typing import Any, Dict

class SentRobertaModelConfig(RobertaConfig):
    """RoBERTa configuration extended with a sentence-pooling setting.

    Args:
        pooling_option: Name of the pooling strategy, stored on the config
            as ``pooling_option`` (defaults to ``'mean'``).
        **kwargs: Passed through unchanged to ``RobertaConfig.__init__``.
    """

    model_type = "sent_roberta"

    def __init__(self, pooling_option: str = 'mean', **kwargs):
        # Parent initialisation runs first with the remaining keyword options;
        # the extra field is attached to the instance afterwards.
        super(SentRobertaModelConfig, self).__init__(**kwargs)
        self.pooling_option = pooling_option


class SentRobertaModel(RobertaPreTrainedModel):
    """RoBERTa backbone that returns one pooled vector per input sequence.

    The pooling strategy is taken from ``config.pooling_option`` and must be
    one of ``POOLING_OPTIONS``:

    * ``'mean'``  - attention-mask-weighted average of the last hidden states.
    * ``'first'`` - element at index 1 of the backbone output (the pooled
      output produced by the backbone's pooling layer).
    """
    config_class = SentRobertaModelConfig
    POOLING_OPTIONS = ['mean', 'first']

    def __init__(self, config, **kwargs):
        """Build the backbone and validate the configured pooling option.

        Args:
            config: Configuration exposing ``pooling_option``,
                ``classifier_dropout`` and ``hidden_dropout_prob``.
            **kwargs: Accepted for signature compatibility; not used.

        Raises:
            AssertionError: If ``config.pooling_option`` is not listed in
                ``POOLING_OPTIONS``.
        """
        super().__init__(config)
        self.config = config
        self.pooling_option = config.pooling_option
        assert self.pooling_option in self.POOLING_OPTIONS, f'Check the pooling options [{", ".join(self.POOLING_OPTIONS)}]'

        self.roberta = RobertaModel(config)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        # NOTE: forward() never applies this layer; it is kept so the module
        # layout stays exactly as it was.
        self.dropout = nn.Dropout(classifier_dropout)

        self.post_init()

    def mean_pooling(self, last_hidden_state, attention_mask=None):
        """Average the hidden states along dim 1, ignoring masked positions.

        Args:
            last_hidden_state: Hidden states to pool; the mask is broadcast
                over the last dimension.
            attention_mask: Mask with one entry per position (non-zero means
                "keep"). When ``None`` every position is included, i.e. a
                plain mean is computed.

        Returns:
            The masked mean over dim 1, computed in float32.
        """
        if attention_mask is None:
            # The signature advertises None as valid; treat it as "no padding"
            # instead of failing on None.unsqueeze.
            attention_mask = last_hidden_state.new_ones(last_hidden_state.shape[:-1])

        input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
        summed = torch.sum(last_hidden_state * input_mask_expanded, 1)
        # Clamp so a fully masked row divides by a tiny number, not by zero.
        counts = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        return summed / counts

    def forward(self, input_ids: torch.Tensor = None, attention_mask: torch.Tensor = None, **kwargs):
        """Encode the inputs and return the pooled sentence embedding.

        Args:
            input_ids: Token ids forwarded to the backbone.
            attention_mask: Attention mask forwarded to the backbone and used
                for mean pooling. May be ``None``.
            **kwargs: Accepted for call compatibility; ignored.

        Returns:
            The mean-pooled hidden states when ``pooling_option == 'mean'``,
            otherwise element 1 of the backbone output.

        Raises:
            NotImplementedError: If the backbone output is neither a tuple nor
                an object with a ``last_hidden_state`` attribute.
        """
        model_outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)

        if isinstance(model_outputs, tuple):
            # Index instead of unpacking into two names: the tuple has more
            # than two entries when hidden states or attentions are returned.
            last_hidden_state = model_outputs[0]
        elif hasattr(model_outputs, 'last_hidden_state'):
            last_hidden_state = model_outputs.last_hidden_state
        else:
            raise NotImplementedError(f'Cannot support model output type: {type(model_outputs)}')

        if self.pooling_option == 'mean':
            logits = self.mean_pooling(last_hidden_state, attention_mask)
        else:
            logits = model_outputs[1]

        return logits


# Model Load

In [6]:
# Checkpoint directory holding the fine-tuned model and tokenizer files.
# Set the SENT_ROBERTA_MODEL_PATH environment variable to load from another
# location; the original hard-coded path is used when it is not set.
model_path = os.environ.get(
    'SENT_ROBERTA_MODEL_PATH',
    '/home/x1112436/final_result/faq/klue-faq-large-sent_robert/train_MNR_Triple',
)

In [10]:
# Both artifacts are restored from the same checkpoint directory.
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = SentRobertaModel.from_pretrained(model_path)

# Device Setting

In [11]:
# Prefer the GPU when CUDA is usable, otherwise stay on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Inference Example

In [12]:
# Move the model's parameters and buffers onto the selected device.
model = model.to(device=device)

In [13]:
# Sample sentence to embed. NOTE: this name shadows the built-in input();
# it is kept because the tokenization cell reads it.
input = "이것은 테스트"

In [21]:
# Tokenize through the tokenizer's __call__ interface; encode_plus is
# deprecated in favour of it and yields the same encoding for a single string.
model_inputs = tokenizer(input, return_tensors='pt')

In [24]:
# Pull the two tensors the model consumes and place them on the target device.
input_ids, attention_mask = (
    model_inputs[key].to(device) for key in ('input_ids', 'attention_mask')
)

In [30]:
# Call the module itself rather than .forward() so registered hooks run, and
# disable autograd: inference needs no gradients, and tracking them would keep
# the computation graph alive alongside the result.
with torch.no_grad():
    embedding = model(input_ids=input_ids, attention_mask=attention_mask)

In [33]:
# Detach and move to host memory regardless of the device: .cpu() is a no-op
# for tensors already on the CPU, and detaching on every path guarantees the
# result never carries autograd state (previously only the CUDA path detached).
embedding = embedding.detach().cpu()