<a href="https://colab.research.google.com/github/kstyle2198/NLP_TIPS/blob/main/QM_fewshot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', 60000)
pd.set_option('display.max_columns', 1000)
pd.set_option('display.max_rows', 1000)

import warnings
warnings.filterwarnings(action='ignore')

import numpy as np

from tqdm import tqdm
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from tensorflow.keras.utils import to_categorical

In [None]:
import nltk
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package wordnet to /root/nltk_data...


True

[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

# Data Load

In [None]:
from google.colab import files
uploaded = files.upload()

KeyboardInterrupt: 

In [None]:
# Read the raw data from the Excel workbook (relative path), then show the
# first row and the frame's shape.
df = pd.read_excel("dataset02.xlsx")
df.head(1)
df.shape

# Preprocessing

In [None]:
# Restrict the frame to a fixed subset of columns.
df = df[["호선", "코멘트번호", "제목", "문제내용", "처리부서", "처리과", "회신내용", "구역", "LOCATION", "BLOCK"]]
df.shape

In [None]:
# Drop every row whose reply text ('회신내용') contains one of these keywords.
# na=False makes a missing reply count as "no match", so such rows are kept.
text_to_drop = ['이관', "재지정", "재 지정"]
for drop_word in text_to_drop:
    df = df[~df['회신내용'].str.contains(drop_word, regex=True, na=False)]
df.shape

In [None]:
def contains_word(main_string, target_word):
    """Return True when ``target_word`` occurs inside ``main_string``."""
    is_present = target_word in main_string
    return is_present


# Keywords looked up in a department name, in scan order.
# Currently left out of the list: "의장", "설계", "선행"
기준직능 = [
    "선장", "기장", "전장", "선실", "엔진", "기관", "기계",
    "도장", "건조", "시운전", "발판", "보냉", "공사",
]


def 직능구하기(string):
    """Map a value to a keyword of ``기준직능`` that it contains.

    The value is converted with ``str()`` first. When several keywords occur
    in it, the one listed latest in ``기준직능`` is returned; when none occurs
    the result is "일반".
    """
    text = str(string)
    hits = [직능 for 직능 in 기준직능 if contains_word(text, 직능)]
    return hits[-1] if hits else "일반"

# Derive the trade label of every row from its department name ('처리과').
# The filled result is assigned back to the column instead of calling
# fillna(..., inplace=True) on the selected column: pandas 2.2 warns about that
# chained in-place form, and it does not update the frame under copy-on-write.
df["직능"] = df["처리과"].apply(직능구하기).fillna("일반")

df["직능"].value_counts()

In [None]:
# Fold the '보냉' and '발판' labels into '도장'.
df['직능'] = df['직능'].str.replace("보냉", '도장', regex=True)
df['직능'] = df['직능'].str.replace("발판", '도장', regex=True)

In [None]:
# Remove rows labelled '일반', showing the shape before and after.
df.shape
df = df[df["직능"]!="일반"]
df.shape

In [None]:
# Keep only the first row for each comment number ('코멘트번호').
df = df.drop_duplicates(subset='코멘트번호', keep='first')
df.shape

In [None]:
import functools # not required, but helps in production
def unpack_df_columns(func):
    """Decorator that lets ``func`` receive a row's values as positional arguments.

    The wrapped function is called with a row object exposing ``.values``
    (for example the Series handed over by ``DataFrame.apply(..., axis=1)``);
    those values are unpacked into ``func``. Any further positional or keyword
    arguments are forwarded to ``func`` instead of being silently dropped.
    """

    @functools.wraps(func)
    def _unpack_df_columns(series, *args, **kwargs):
        return func(*series.values, *args, **kwargs)

    return _unpack_df_columns

def jaccard_simil(a, b):
    """Return the Jaccard similarity of the character sets of ``a`` and ``b``.

    Both arguments are converted with ``str()`` and compared as sets of
    characters, so character order and repetition are ignored. Two empty
    strings are treated as identical (1.0) instead of raising
    ZeroDivisionError.
    """
    chars_a, chars_b = set(str(a)), set(str(b))
    union = chars_a | chars_b
    if not union:
        # Both inputs are empty: nothing differs between them.
        return 1.0
    return len(chars_a & chars_b) / float(len(union))

@unpack_df_columns
def 문장병합(제목, 문제내용):
    """Merge a title and a problem description into a single text.

    When the two are near-duplicates (Jaccard similarity above the cut-off),
    only the longer one is kept; otherwise they are joined with a space.
    If one side is missing the other side is returned alone, and None is
    returned when both are missing so that the row can be dropped afterwards.
    """
    thresh = 0.9  # similarity cut-off

    title_missing = pd.isna(제목)
    body_missing = pd.isna(문제내용)
    if title_missing and body_missing:
        return None
    # Avoid producing texts such as "nan <body>" from a missing cell.
    if title_missing:
        return 문제내용
    if body_missing:
        return 제목

    if jaccard_simil(제목, 문제내용) > thresh:
        # Near-duplicates: keep the longer text, as the stated intent requires
        # (the previous code returned the shorter one).
        if len(str(제목)) > len(str(문제내용)):
            return 제목
        return 문제내용
    return f"{제목} {문제내용}"

In [None]:
# Build the 'comment' text row by row from the title and problem description,
# then drop rows whose 'comment' is missing.
df['comment'] = df[['제목',"문제내용"]].apply(문장병합, axis=1)
df.dropna(subset=["comment"], inplace=True)

In [None]:
# Remove every row whose comment contains Korean characters
# (code points U+3131-U+3163 or U+AC00-U+D7A3).
# NOTE(review): no na= argument is passed here; a non-string comment would make
# str.contains yield NaN for that row — confirm 'comment' is always a string.
korean_pattern = '[\u3131-\u3163\uac00-\ud7a3]+'
df = df[~df['comment'].str.contains(korean_pattern, regex=True)]

In [None]:
# Two-column working frame (text, label) with a fresh 0..n-1 index.
df1 =df[["comment", "직능"]]
df1.columns = ["text", "label"]
df1 = df1.reset_index(drop=True)
df1.head(2)

In [None]:
# Encode each trade name as an integer class id.
new_labels = {'기관':0, '기장':1, '선장':2, '전장':3, '선실':4, '건조':5, '시운전':6, '도장':7, '엔진':8, '기계':9, '공사':10}
df1["label"] = df1["label"].replace(to_replace=new_labels)
df1["label"]

In [None]:
# Class names as a list, in the key order of new_labels.
classes = [*new_labels]
classes

# Train, Val, Test Dataset 분리

In [None]:
from sklearn.model_selection import train_test_split
df1.shape
# Seeded, label-stratified split: 70% goes to train, 30% is held out as val.
train, val= train_test_split(df1, test_size=0.3, random_state=123, shuffle=True, stratify=df1["label"])
train.shape, val.shape

In [None]:
# Split the held-out part again (seeded, label-stratified): 70% stays in val,
# 30% becomes test.
val, test= train_test_split(val, test_size=0.3, random_state=123, shuffle=True, stratify=val["label"])

val.shape, test.shape

In [None]:
!pip install datasets transformers accelerate setfit

In [None]:
from datasets import Dataset, DatasetDict

In [None]:
# Convert each pandas split into a datasets.Dataset.
train1 = Dataset.from_pandas(train)
val1 = Dataset.from_pandas(val)
test1  = Dataset.from_pandas(test)

In [None]:
# Empty container that will hold the named splits.
dataset = DatasetDict()

In [None]:
# Register the three splits under their names.
dataset["train"] = train1
dataset["validation"] = val1
dataset["test"] = test1

In [None]:
# Short names for the three splits.
train_dataset = dataset["train"]
val_dataset = dataset["validation"]
test_dataset = dataset["test"]

In [None]:
train_dataset

In [None]:
val_dataset

In [None]:
test_dataset

In [None]:
def get_train_dataset(dataset, N):
    """Draw a few-shot training subset with at most ``N`` examples per label.

    The 'train' split of ``dataset`` is shuffled with a fixed seed (41) and
    scanned once; an example is kept while fewer than ``N`` examples of its
    label have been kept so far.

    Args:
        dataset: mapping with a 'train' entry that supports ``shuffle(seed=...)``,
            iteration over examples having a 'label' key, and ``select(indices)``.
        N: maximum number of examples kept per label. Values <= 0 keep nothing
            (previously one example per label was still kept in that case).

    Returns:
        The result of ``select`` on the shuffled split for the kept positions.
    """
    shuffled = dataset['train'].shuffle(seed=41)
    kept_ids = []
    kept_per_label = {}
    for idx, example in enumerate(shuffled):
        label = example['label']
        already_kept = kept_per_label.get(label, 0)
        if already_kept >= N:
            continue
        kept_per_label[label] = already_kept + 1
        kept_ids.append(idx)
    return shuffled.select(kept_ids)

In [None]:
# Few-shot setting: keep at most N examples per label.
# Note: this rebinds train_dataset, replacing the full training split.
N = 5
train_dataset = get_train_dataset(dataset, N)
train_dataset

# SetFit Few-Shot

In [None]:
from setfit import SetFitModel, Trainer, TrainingArguments
from sklearn.metrics import classification_report

In [None]:
# Load a SetFitModel from the "BAAI/bge-base-en-v1.5" checkpoint.
model = SetFitModel.from_pretrained("BAAI/bge-base-en-v1.5")

# One epoch with batch size 32; the per-epoch evaluation / checkpointing
# options below are currently disabled.
args1 = TrainingArguments(
    batch_size=32,
    num_epochs=1,
    # evaluation_strategy="epoch",
    # save_strategy="epoch",
    # load_best_model_at_end=True,
)

# Train on the few-shot subset and evaluate accuracy on the validation split.
# The column mapping is an identity mapping for the 'text' and 'label' columns.
trainer1 = Trainer(
    model=model,
    args=args1,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    metric="accuracy",
    column_mapping={"text": "text", "label": "label"}
)

In [None]:
trainer1.train()

In [None]:
# Evaluate through trainer1 and display the returned metrics.
metrics = trainer1.evaluate()
metrics

In [None]:
# trainer1.model._save_pretrained(save_directory="./model/")

In [None]:
# Predict labels for the test texts with the model that was passed to trainer1.
preds = model.predict(test_dataset['text'])
preds

In [None]:
# print(classification_report(test_dataset['label'], preds, target_names=classes, digits=4))

In [None]:
# Classification report on the test split; no target_names are passed, so
# classes are reported by their integer ids.
print(classification_report(test_dataset['label'], preds, digits=4))

# Comprehend Few-Shot

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Checkpoint used for the second few-shot approach.
model_name = 'knowledgator/comprehend_it-base'

tokenizer = AutoTokenizer.from_pretrained(model_name)

# Note: this rebinds 'model', which previously held the SetFit model.
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model

In [None]:
from transformers import TrainingArguments, Trainer
from transformers import DataCollatorWithPadding
from datasets import Dataset
import random
import torch
import evaluate
import numpy as np

In [None]:
# Accuracy metric loaded through the evaluate library; read by compute_metrics.
accuracy = evaluate.load("accuracy")
accuracy

In [None]:
# Expand the few-shot samples into (text, class-name) pairs: every text yields
# len(classes) - 1 positive pairs and the same number of negative pairs.

def transform_dataset(dataset, classes, template = '{}'):
    """Build a pair dataset (text, class-name hypothesis, +1/-1) from labelled texts.

    For every text, ``len(classes) - 1`` positive pairs are emitted with the
    text's own class name (label 1.0), followed by the same number of negative
    pairs (label -1.0). Negative classes are drawn at random among the labels
    present in ``dataset``, weighted by their frequency, excluding any whose
    class name equals the true one. Drawing uses NumPy's global random state,
    so seed it beforehand if reproducible output is needed.

    Differences from the previous implementation:
      * negatives are sampled directly from the admissible labels instead of
        by rejection, so a dataset holding a single label no longer loops
        forever (such texts simply get no negative pairs);
      * the per-iteration dumps of the whole accumulated dataset were removed
        (their output grew quadratically); only short summaries are printed.

    Args:
        dataset: object whose ``['text']`` and ``['label']`` give parallel
            sequences of texts and integer label ids indexing ``classes``.
        classes: sequence of class names, indexed by label id.
        template: format string applied to a class name to build the target.

    Returns:
        ``Dataset`` with columns 'sources', 'targets' and 'labels'.
    """
    texts = dataset['text']
    labels = dataset['label']

    # Label frequencies, used as sampling weights for the negatives.
    label2count = {}
    for label_id in labels:
        label2count[label_id] = label2count.get(label_id, 0) + 1
    print(f"label2count: {label2count}")

    n_copies = len(classes) - 1
    new_dataset = {'sources': [], 'targets': [], 'labels': []}

    for text, label_id in zip(texts, labels):
        label = classes[label_id]

        # Positive pairs: the text with its own class name.
        new_dataset['sources'].extend([text] * n_copies)
        new_dataset['targets'].extend([template.format(label)] * n_copies)
        new_dataset['labels'].extend([1.] * n_copies)

        # Negative pairs: labels whose class name differs from the true one.
        neg_ids = [lbl for lbl in label2count if classes[lbl] != label]
        if not neg_ids:
            continue
        weights = np.array([label2count[lbl] for lbl in neg_ids], dtype=float)
        drawn = np.random.choice(neg_ids, size=n_copies, p=weights / weights.sum())
        for neg_id in drawn:
            new_dataset['sources'].append(text)
            new_dataset['targets'].append(template.format(classes[neg_id]))
            new_dataset['labels'].append(-1.)

    result = Dataset.from_dict(new_dataset)
    print(f"result : {result}")
    return result

In [None]:
def compute_metrics(eval_pred):
    """Score evaluation outputs with the module-level ``accuracy`` metric.

    ``eval_pred`` unpacks into (predictions, labels); the predicted class of
    each row is the argmax over axis 1 of the predictions.
    """
    scores, references = eval_pred
    predicted_ids = np.argmax(scores, axis=1)
    return accuracy.compute(predictions=predicted_ids, references=references)

In [None]:
def tokenize_and_align_label(example):
    """Tokenize one (source, target) pair and attach its integer class id.

    The source text and the target hypothesis are handed to the tokenizer as a
    sentence pair, so the tokenizer inserts its own separator between them.
    Previously the two strings were concatenated with no separator at all,
    which merged the last word of the text with the class name.

    No padding is applied here: inputs are only truncated to 512 tokens, and
    padding is left to the padding data collator used for batching, instead of
    storing every example padded to the maximum length.

    Pair label to class id: 1.0 -> 1, 0.0 -> 2, anything else (e.g. -1.0) -> 0.
    NOTE(review): presumably these ids follow the checkpoint's
    entailment / neutral / contradiction order — confirm against
    ``model.config.id2label``.
    """
    tokenized_input = tokenizer(
        example["sources"],
        example["targets"],
        truncation=True,
        max_length=512,
    )

    pair_label = example['labels']
    if pair_label == 1.0:
        class_id = 1
    elif pair_label == 0.0:
        class_id = 2
    else:
        class_id = 0
    tokenized_input['label'] = class_id
    return tokenized_input

In [None]:
# Use CUDA when torch reports it as available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Padding collator built from the tokenizer; passed to the Trainer later on.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
data_collator

In [None]:
# Expand the few-shot training subset into pair data.
# Note: this rebinds 'dataset', which previously held the DatasetDict of splits.
dataset = transform_dataset(train_dataset, classes)

In [None]:
# Apply tokenize_and_align_label to every example of the pair data.
tokenized_dataset = dataset.map(tokenize_and_align_label)

In [None]:
# Hold out 10% of the pair data for evaluation. The seed makes the split
# reproducible across runs (same value as the sklearn splits of df1).
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.1, seed=123)
tokenized_dataset

In [None]:
# Fine-tuning configuration: 3 epochs, batch size 8 for training and evaluation,
# evaluation after every epoch; outputs go under the 'comprehendo' directory.
training_args = TrainingArguments(
   output_dir='comprehendo',
   learning_rate=3e-5,
   per_device_train_batch_size=8,
   per_device_eval_batch_size=8,
   num_train_epochs=3,
   weight_decay=0.01,
   evaluation_strategy="epoch",
)

In [None]:
# Trainer for the pair-classification fine-tuning: trains on the 'train' part
# of the pair data and evaluates on its 'test' part with compute_metrics.
trainer = Trainer(
   model=model,
   args=training_args,
   train_dataset=tokenized_dataset["train"],
   eval_dataset=tokenized_dataset['test'],
   tokenizer=tokenizer,
   data_collator=data_collator,
   compute_metrics=compute_metrics,
)

In [None]:
trainer.train()

In [None]:
# Save the fine-tuned model under the 'comprehender' directory.
trainer.save_model('comprehender')

In [None]:
from transformers import pipeline
from sklearn.metrics import classification_report
from tqdm import tqdm

# Load the fine-tuned model from the directory written by
# trainer.save_model('comprehender'). 'comprehendo' is only the Trainer's
# output_dir, not the location the final model was saved to.
classifier = pipeline("zero-shot-classification",
                      model='comprehender', tokenizer=tokenizer, device=device)

In [None]:
preds = []
label2idx = {label: idx for idx, label in enumerate(classes)}

# The training pairs were built with the bare '{}' template, so the same
# hypothesis template is used here instead of the pipeline's default sentence
# template; otherwise inference hypotheses differ from the ones trained on.
for example in tqdm(test_dataset):
    ranked = classifier(example['text'], classes, hypothesis_template='{}')
    best_label = ranked['labels'][0]  # first entry of the returned label list
    preds.append(label2idx[best_label])

print(classification_report(test_dataset['label'], preds,
                            target_names=classes, digits=4))