In [None]:
from google.colab import drive

# Mount Google Drive inside the Colab VM; the next cell changes into
# /content/drive/MyDrive, which lives under this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
cd /content/drive/MyDrive

In [None]:
!pip install gluonnlp pandas tqdm
!pip install sentencepiece
!pip install transformers
!pip install torch
!pip install 'git+https://github.com/SKTBrain/KoBERT.git#egg=kobert_tokenizer&subdirectory=kobert_hf'

In [None]:
import pandas as pd

import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm import tqdm, tqdm_notebook
from kobert_tokenizer import KoBERTTokenizer
from transformers import BertModel

from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup

In [None]:
# Emotional-dialogue corpus spreadsheets, resolved relative to the current
# working directory.
TRAIN_XLSX = '감성대화말뭉치(원시데이터)_Training.xlsx'
VAL_XLSX = '감성대화말뭉치(원시데이터)_Validation.xlsx'

original_train = pd.read_excel(TRAIN_XLSX)
original_val = pd.read_excel(VAL_XLSX)

In [None]:
def _with_joined_sentences(frame):
    """Return a copy of ``frame`` with NaNs replaced by "" and a "사람문장" column.

    The three utterance columns "사람문장1" .. "사람문장3" are cast to ``str``
    and concatenated in order, with no separator between them.
    """
    filled = frame.fillna("")
    first, second, third = (filled[f"사람문장{turn}"].astype(str) for turn in (1, 2, 3))
    filled["사람문장"] = first + second + third
    return filled


original_train = _with_joined_sentences(original_train)
original_val = _with_joined_sentences(original_val)

# Stack training and validation rows; the original row indices are kept.
df_concat = pd.concat([original_train, original_val])

In [None]:
# Keep only the joined utterance and its emotion category, renamed to English
# column names in a single step.
chatbot_data = df_concat[["사람문장", "감정_대분류"]].rename(
    columns={"사람문장": "Sentence", "감정_대분류": "Emotion"}
)

# Strip surrounding whitespace so the labels match the exact-string comparisons
# used when they are encoded as integers.
chatbot_data["Emotion"] = chatbot_data["Emotion"].apply(str.strip)

In [None]:
# Fall back to the CPU so the notebook still runs on a runtime without a GPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
bert_model = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False)

# The vocabulary is built from the tokenizer's SentencePiece file, so the
# tokenizer has to exist here. It used to be created only in a later cell,
# which made this cell fail with NameError on a fresh kernel.
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
vocab = nlp.vocab.BERTVocab.from_sentencepiece(tokenizer.vocab_file, padding_token='[PAD]')

In [None]:
# Emotion name -> integer class id. Rows are rewritten one emotion at a time,
# in this order.
EMOTION_TO_ID = {
    "불안": 0,
    "분노": 1,
    "상처": 2,
    "슬픔": 3,
    "당황": 4,
    "기쁨": 5,
}
for emotion_name, emotion_id in EMOTION_TO_ID.items():
    chatbot_data.loc[chatbot_data['Emotion'] == emotion_name, 'Emotion'] = emotion_id

# One [sentence, label-as-string] record per row.
data_list = [
    [q, str(label)]
    for q, label in zip(chatbot_data['Sentence'], chatbot_data['Emotion'])
]

print(data_list[:10])

In [None]:
from sklearn.model_selection import train_test_split

# Shuffled 80/20 train/test split with a fixed seed so the split is repeatable.
TEST_FRACTION = 0.2
SPLIT_SEED = 32

dataset_train, dataset_test = train_test_split(
    data_list,
    test_size=TEST_FRACTION,
    shuffle=True,
    random_state=SPLIT_SEED,
)

In [None]:
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
# BERTSPTokenizer's first argument is the path of the SentencePiece model file,
# not a tokenizer object, so pass the same file the vocabulary is built from.
tok = nlp.data.BERTSPTokenizer(tokenizer.vocab_file, vocab, lower = False)

In [None]:
class BERTDataset(Dataset):
    """Dataset of BERT-encoded sentences paired with integer labels."""

    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len, pad, pair):
        """Encode every record of ``dataset`` up front.

        Args:
            dataset: Iterable of indexable records.
            sent_idx: Position of the sentence inside each record.
            label_idx: Position of the label inside each record; the label
                must be convertible with ``np.int32``.
            bert_tokenizer, vocab, max_len, pad, pair: Forwarded to
                ``nlp.data.BERTSentenceTransform`` (``max_len`` as
                ``max_seq_length``).
        """
        encode = nlp.data.BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, vocab=vocab, pad=pad, pair=pair
        )

        # Encode all sentences first, then convert all labels.
        self.sentences = []
        for record in dataset:
            self.sentences.append(encode([record[sent_idx]]))

        self.labels = []
        for record in dataset:
            self.labels.append(np.int32(record[label_idx]))

    def __getitem__(self, i):
        """Return the encoded sentence tuple for item ``i`` with its label appended."""
        encoded = self.sentences[i]
        target = self.labels[i]
        return encoded + (target,)

    def __len__(self):
        """Return the number of records."""
        return len(self.labels)

In [None]:
# Training hyper-parameters, read by name in the cells below.
max_len = 64  # passed to BERTDataset, which forwards it as max_seq_length
batch_size = 64  # mini-batch size of the DataLoaders
warmup_ratio = 0.1  # fraction of all optimizer steps used as scheduler warm-up
num_epochs = 5  # number of passes over the training DataLoader
max_grad_norm = 1  # threshold passed to clip_grad_norm_
log_interval = 200  # print the running loss/accuracy every this many batches
learning_rate = 5e-6  # lr given to the AdamW optimizer

In [None]:
# Alternative tokenizer callable, kept from the original for reference:
# tok = tokenizer.tokenize

# Encode both splits. Every record is [sentence, label], hence sent_idx=0 and
# label_idx=1; sequences are padded and treated as single sentences.
data_train = BERTDataset(
    dataset_train,
    sent_idx=0,
    label_idx=1,
    bert_tokenizer=tok,
    vocab=vocab,
    max_len=max_len,
    pad=True,
    pair=False,
)
data_test = BERTDataset(
    dataset_test,
    sent_idx=0,
    label_idx=1,
    bert_tokenizer=tok,
    vocab=vocab,
    max_len=max_len,
    pad=True,
    pair=False,
)

In [None]:
# Wrap the encoded datasets in DataLoaders that yield mini-batches of
# `batch_size` items.
# The training loader reshuffles on every epoch (it previously replayed the
# same batch order each time); the evaluation order stays fixed.
train_dataloader = torch.utils.data.DataLoader(data_train, batch_size = batch_size, shuffle = True, num_workers = 5)
test_dataloader = torch.utils.data.DataLoader(data_test, batch_size = batch_size, num_workers = 5)

### KoBERT 모델 구현

In [None]:
class BERTClassifier(nn.Module):
    """Sentence classifier: a BERT encoder, optional dropout and one linear layer.

    Args:
        bert: Encoder module. It is called with ``input_ids``,
            ``token_type_ids``, ``attention_mask`` and ``return_dict=False``
            and must return a pair whose second element is the pooled output.
        hidden_size: Width of the pooled encoder output fed to the linear layer.
        num_classes: Number of output logits.
        dr_rate: Dropout probability applied to the pooled output. A falsy
            value (``0`` or ``None``) disables dropout.
    """

    def __init__(self,
                 bert,
                 hidden_size = 768,
                 num_classes = 6,
                 dr_rate = 0.1,
                 ):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size , num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p = dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        """Build a float mask with 1.0 on the first ``valid_length[i]`` positions of row ``i``.

        Args:
            token_ids: 2-D tensor; only its shape and device are used.
            valid_length: One length per row of ``token_ids`` (tensor or sequence).

        Returns:
            Float tensor with the shape and device of ``token_ids``.
        """
        # Vectorised comparison instead of a Python loop over the batch.
        positions = torch.arange(token_ids.size(1), device = token_ids.device)
        lengths = torch.as_tensor(valid_length, device = token_ids.device)
        return (positions.unsqueeze(0) < lengths.unsqueeze(1)).float()

    def forward(self, token_ids, valid_length, segment_ids):
        """Return class logits for a batch.

        Args:
            token_ids: Token id tensor, one row per sentence.
            valid_length: Number of non-padding tokens per row.
            segment_ids: Segment (token type) ids, same shape as ``token_ids``.
        """
        attention_mask = self.gen_attention_mask(token_ids, valid_length)

        # Pooler: final hidden state of the [CLS] token.
        _, pooler = self.bert(
            input_ids = token_ids,
            token_type_ids = segment_ids.long(),
            attention_mask = attention_mask,
            return_dict = False
        )

        # With dropout disabled the pooled output is used as is. Previously
        # `out` was never assigned in that case and the return line raised
        # UnboundLocalError.
        out = self.dropout(pooler) if self.dr_rate else pooler

        return self.classifier(out)

In [None]:
# Classifier on top of the pretrained encoder, moved to the target device.
model = BERTClassifier(bert_model, dr_rate = 0.5).to(device)

# Optimizer parameter groups: parameters whose name contains one of the
# `no_decay` substrings get weight_decay 0.0, all others 0.01.
no_decay = ['bias', 'LayerNorm.weight']


def _is_no_decay(param_name):
    """Return True if ``param_name`` contains any of the ``no_decay`` substrings."""
    return any(nd in param_name for nd in no_decay)


optimizer_grouped_parameters = [
    {
        'params': [p for n, p in model.named_parameters() if not _is_no_decay(n)],
        'weight_decay': 0.01,
    },
    {
        'params': [p for n, p in model.named_parameters() if _is_no_decay(n)],
        'weight_decay': 0.0,
    },
]

optimizer = AdamW(optimizer_grouped_parameters, lr = learning_rate)
loss_fn = nn.CrossEntropyLoss()

# Cosine schedule with warm-up: one scheduler step per batch, with the first
# `warmup_ratio` share of all steps used for warm-up.
t_total = len(train_dataloader) * num_epochs
warmup_step = int(t_total * warmup_ratio)

scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps = warmup_step,
    num_training_steps = t_total,
)

def calc_accuracy(X,Y):
    """Return the fraction of rows of ``X`` whose arg-max column equals ``Y``.

    Args:
        X: 2-D tensor of scores, one row per sample.
        Y: 1-D tensor of target class indices.

    Returns:
        Number of matching rows divided by the number of rows (NumPy scalar).
    """
    _, predicted = X.max(dim=1)
    n_correct = (predicted == Y).sum().data.cpu().numpy()
    n_rows = predicted.size()[0]
    return n_correct / n_rows

# Bare expression: as the last line of the cell it only displays the
# DataLoader's repr.
train_dataloader

In [None]:
# Train for `num_epochs` epochs; after each epoch report the mean per-batch
# accuracy on the training and test loaders.
for e in range(num_epochs):
    train_acc = 0.0
    test_acc = 0.0
    model.train()

    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(train_dataloader)):
        optimizer.zero_grad()  # reset gradients left over from the previous step

        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        label = label.long().to(device)

        out = model(token_ids, valid_length, segment_ids)

        # Compute the loss, back-propagate, clip the gradient norm, then update.
        loss = loss_fn(out, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        scheduler.step()  # LR scheduler update

        train_acc += calc_accuracy(out, label)  # accumulate per-batch accuracy

        if batch_id % log_interval == 0:
            print("epoch {} batch id {} loss {} train acc {}".format(e+1, batch_id+1, loss.data.cpu().numpy(), train_acc / (batch_id+1)))

    print("epoch {} train acc {}".format(e+1, train_acc / (batch_id+1)))

    model.eval()  # switch to evaluation mode (disables dropout)
    # No gradients are needed for evaluation; skipping autograd bookkeeping
    # saves memory and time.
    with torch.no_grad():
        for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(test_dataloader)):
            token_ids = token_ids.long().to(device)
            segment_ids = segment_ids.long().to(device)
            label = label.long().to(device)

            out = model(token_ids, valid_length, segment_ids)  # forward pass only

            test_acc += calc_accuracy(out, label)  # accumulate per-batch accuracy

    print("epoch {} test acc {}".format(e+1, test_acc / (batch_id+1)))

# Predict 함수 정의


In [None]:
def predict(predict_sentence):
    """Classify one sentence and return a percentage per emotion.

    Args:
        predict_sentence: Raw input text.

    Returns:
        dict mapping each of the six emotion names to its softmax probability
        in percent, rounded to two decimals. Values are built-in ``float``
        objects so the result can be JSON-encoded by the API endpoint; the
        previous ``numpy.float32`` values could not be.
    """
    # Order must match the integer class ids used for training.
    emotion_labels = ["불안", "분노", "상처", "슬픔", "당황", "기쁨"]

    # BERTDataset expects [sentence, label] records; '0' is a placeholder label
    # that is not used for prediction.
    dataset_another = [[predict_sentence, '0']]

    another_test = BERTDataset(dataset_another, 0, 1, tok, vocab, max_len, True, False)
    # num_workers=0: starting worker processes for a single sentence only adds
    # latency to every call.
    test_dataloader = torch.utils.data.DataLoader(another_test, batch_size=batch_size, num_workers=0)

    model.eval()
    emotion_percentages = {}

    # Inference only, so no autograd graph is needed.
    with torch.no_grad():
        for token_ids, valid_length, segment_ids, _ in test_dataloader:
            token_ids = token_ids.long().to(device)
            segment_ids = segment_ids.long().to(device)

            out = model(token_ids, valid_length, segment_ids)

            # torch.softmax is numerically stable, unlike the hand-written
            # exp/sum it replaces; .tolist() yields plain Python floats.
            percent_rows = (torch.softmax(out.float(), dim=1) * 100).cpu().tolist()
            for percent_row in percent_rows:
                for emotion, percent in zip(emotion_labels, percent_row):
                    emotion_percentages[emotion] = round(percent, 2)

    print("prediction done.")
    return emotion_percentages

In [None]:
# Interactive prompt: classify sentences until the user enters "0".
while True:
    sentence = input("하고싶은 말을 입력해주세요 : ")
    if sentence == "0" :  # entering 0 ends the loop
        break
    # predict() only returns its result, so print it; previously the scores
    # were computed and then discarded.
    print(predict(sentence))
    print("\n")

In [None]:
!ngrok authtoken 2YwRAwF8qWKpH0iujbxVPIAyDlm_6K8m29hYitmvTtuSFMhem

In [None]:
!pip install nest-asyncio pyngrok uvicorn kaleido python-multipart
!pip install fastapi==0.103.2 typing-extensions

In [None]:
import uvicorn

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pyngrok import ngrok
from pydantic import BaseModel
import nest_asyncio
import shutil

# Request body schema for the emotion API. It declares the same field as the
# definition in the app cell, so the model is valid whichever definition was
# executed last (it used to be an empty, tab-indented placeholder).
class EmotionInput(BaseModel):
    sentence: str

In [None]:
app = FastAPI()
# CORS: any origin may call the API. A wildcard origin must not be combined
# with credentials, because that would let every website send credentialed
# requests. The endpoints in this notebook only read the JSON body, so
# credentials are switched off.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Request body schema for POST /predict-emotion.
class EmotionInput(BaseModel):
    sentence: str  # text handed to predict()

@app.get('/')
async def root():
    # Fixed banner payload identifying the service.
    banner = {'message': 'Emotion Analysis API'}
    return banner

@app.post("/predict-emotion")
async def predict_emotion(request: EmotionInput):
    # Run the classifier on the submitted sentence. Any failure is reported as
    # HTTP 500 with the exception text as the detail.
    try:
        response = predict(request.sentence)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    else:
        return {"result" : response}

In [None]:
# SECURITY: never store the ngrok authtoken in the notebook. The token that was
# committed here must be treated as leaked and revoked in the ngrok dashboard.
# Read it from the NGROK_AUTHTOKEN environment variable, or prompt for it once.
import os
from getpass import getpass

if not os.environ.get("NGROK_AUTHTOKEN"):
    os.environ["NGROK_AUTHTOKEN"] = getpass("ngrok authtoken: ")
ngrok.set_auth_token(os.environ["NGROK_AUTHTOKEN"])

In [None]:
!ngrok config add-authtoken 2YwRAwF8qWKpH0iujbxVPIAyDlm_6K8m29hYitmvTtuSFMhem

In [None]:
# The tunnel and the server must use the same local port.
SERVER_PORT = 8000

# Open the public tunnel and show its URL.
ngrok_tunnel = ngrok.connect(SERVER_PORT)
print('Public URL:', ngrok_tunnel.public_url)

# Apply the nest_asyncio patch before starting uvicorn from inside the notebook.
nest_asyncio.apply()
uvicorn.run(app, port=SERVER_PORT)

In [None]:
# Disconnect every ngrok tunnel that is currently open.
tunnels = ngrok.get_tunnels()

for public_url in (tunnel.public_url for tunnel in tunnels):
    ngrok.disconnect(public_url)

In [None]:
# Save the trained weights (state_dict only), then download the file through
# the Colab helper.
MODEL_PATH = 'emotion_model.pt'
torch.save(model.state_dict(), MODEL_PATH)

from google.colab import files
files.download(MODEL_PATH)

In [None]:
# Disabled ONNX export, kept as bare string literals: nothing inside the two
# strings below is executed. To enable it, turn the strings back into code.
'''!pip install onnx'''

'''import torch.onnx
import torch

# 더미 입력을 GPU로 이동
dummy_input = (
    torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]).to(device),  # token_ids
    torch.LongTensor([10]).to(device),  # valid_length
    torch.LongTensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]).to(device),  # segment_ids
)

# 모델을 ONNX 형식으로 내보냅니다.
onnx_path = 'emotion_model.onnx'
torch.onnx.export(model, dummy_input, onnx_path, verbose=True, input_names=['input_ids', 'valid_length', 'segment_ids'], output_names=['output'])

# ONNX 파일 다운로드
from google.colab import files
files.download('emotion_model.onnx')
'''