# 트랜스포머 seq2seq DKTC 자연어처리

## step0. 라이브러리 준비

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
import re
from collections import Counter
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

## step1. 데이터 로드 및 확인

In [4]:
train_data_path ="/content/new_train.csv"
train = pd.read_csv(train_data_path)
train.head()

Unnamed: 0,idx,class,conversation
0,0,협박 대화,지금 너 스스로를 죽여달라고 애원하는 것인가?\n 아닙니다. 죄송합니다.\n 죽을 ...
1,1,협박 대화,길동경찰서입니다.\n9시 40분 마트에 폭발물을 설치할거다.\n네?\n똑바로 들어 ...
2,2,기타 괴롭힘 대화,너 되게 귀여운거 알지? 나보다 작은 남자는 첨봤어.\n그만해. 니들 놀리는거 재미...
3,3,갈취 대화,어이 거기\n예??\n너 말이야 너. 이리 오라고\n무슨 일.\n너 옷 좋아보인다?...
4,4,갈취 대화,저기요 혹시 날이 너무 뜨겁잖아요? 저희 회사에서 이 선크림 파는데 한 번 손등에 ...


In [5]:
test_data_path ="/content/new_train.csv"
test = pd.read_csv(test_data_path)
test.head()

Unnamed: 0,idx,class,conversation
0,0,협박 대화,지금 너 스스로를 죽여달라고 애원하는 것인가?\n 아닙니다. 죄송합니다.\n 죽을 ...
1,1,협박 대화,길동경찰서입니다.\n9시 40분 마트에 폭발물을 설치할거다.\n네?\n똑바로 들어 ...
2,2,기타 괴롭힘 대화,너 되게 귀여운거 알지? 나보다 작은 남자는 첨봤어.\n그만해. 니들 놀리는거 재미...
3,3,갈취 대화,어이 거기\n예??\n너 말이야 너. 이리 오라고\n무슨 일.\n너 옷 좋아보인다?...
4,4,갈취 대화,저기요 혹시 날이 너무 뜨겁잖아요? 저희 회사에서 이 선크림 파는데 한 번 손등에 ...


In [6]:
test = test.drop(test.columns[1], axis=1)
test.head(5)

Unnamed: 0,idx,conversation
0,0,지금 너 스스로를 죽여달라고 애원하는 것인가?\n 아닙니다. 죄송합니다.\n 죽을 ...
1,1,길동경찰서입니다.\n9시 40분 마트에 폭발물을 설치할거다.\n네?\n똑바로 들어 ...
2,2,너 되게 귀여운거 알지? 나보다 작은 남자는 첨봤어.\n그만해. 니들 놀리는거 재미...
3,3,어이 거기\n예??\n너 말이야 너. 이리 오라고\n무슨 일.\n너 옷 좋아보인다?...
4,4,저기요 혹시 날이 너무 뜨겁잖아요? 저희 회사에서 이 선크림 파는데 한 번 손등에 ...


In [7]:
train.isnull().sum()

Unnamed: 0,0
idx,0
class,0
conversation,0


# step2. 데이터 전처리

라벨 인코딩

In [8]:
CLASS_NAMES = ['협박 대화', '갈취 대화', '직장 내 괴롭힘 대화', '기타 괴롭힘 대화', '일반']

label_size = len(CLASS_NAMES)

encoder = LabelEncoder()
encoder.fit(CLASS_NAMES)

train['class'] = encoder.transform(train['class'])
train.head()

Unnamed: 0,idx,class,conversation
0,0,4,지금 너 스스로를 죽여달라고 애원하는 것인가?\n 아닙니다. 죄송합니다.\n 죽을 ...
1,1,4,길동경찰서입니다.\n9시 40분 마트에 폭발물을 설치할거다.\n네?\n똑바로 들어 ...
2,2,1,너 되게 귀여운거 알지? 나보다 작은 남자는 첨봤어.\n그만해. 니들 놀리는거 재미...
3,3,0,어이 거기\n예??\n너 말이야 너. 이리 오라고\n무슨 일.\n너 옷 좋아보인다?...
4,4,0,저기요 혹시 날이 너무 뜨겁잖아요? 저희 회사에서 이 선크림 파는데 한 번 손등에 ...


konlpy 형태소 분석기

In [9]:
!pip install konlpy # 형태소 분석

Collecting konlpy
  Downloading konlpy-0.6.0-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting JPype1>=0.7.0 (from konlpy)
  Downloading JPype1-1.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading konlpy-0.6.0-py2.py3-none-any.whl (19.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.4/19.4 MB[0m [31m54.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading JPype1-1.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (488 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m488.6/488.6 kB[0m [31m28.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: JPype1, konlpy
Successfully installed JPype1-1.5.0 konlpy-0.6.0


In [10]:
from konlpy.tag import Okt

# 형태소 분석기와 불용어 목록 정의
okt = Okt()

# 불용어 리스트 정의 (필요에 따라 확장 가능)
stopwords = ["이", "그", "저", "가", "을", "를", "에", "의", "와", "과", "들"]

# 전처리 함수 정의
def preprocess_sentence(sentence):
    sentence = sentence.lower()     # 1. 소문자 변환 (한국어에는 큰 영향을 미치지 않지만 영문 대비용)
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)    # 2. 구두점 양쪽에 공백 추가
    sentence = re.sub(r'[" "]+', " ", sentence)    # 3. 중복된 공백 제거
    sentence = re.sub(r"[^가-힣ㄱ-ㅎㅏ-ㅣa-zA-Z0-9.,!? ]+", " ", sentence)    # 4. 한글과 구두점 이외의 문자 제거
    sentence = re.sub(r"\s+", " ", sentence)

    tokens = okt.morphs(sentence, stem=True)     # 5. 형태소 분석-어간추출

    tokens = [word for word in tokens if word not in stopwords]    # 6. 불용어 제거

    sentence = " ".join(tokens)    # 7. 최종 문장 생성
    sentence = sentence.strip()    # 8. 좌우 공백 제거

    return sentence

# 전처리 적용
train['conversation'] = train['conversation'].apply(preprocess_sentence)
test['conversation'] = test['conversation'].apply(preprocess_sentence)

# 전처리된 데이터 확인
print(train['conversation'].head())
print(test['conversation'].head())

0    지금 너 스스로 죽이다 달라 고 애원 하다 것 인가 ? 아니다 . 죄송하다 . 죽 ...
1    길동 경찰서 이다 . 9시 40분 마트 폭발물 설치 하다 . 네 ? 똑바로 들다 한...
2    너 되게 귀엽다 알 지 ? 나 보다 작다 남자 는 첨 보다 . 그만하다 . 니 놀리...
3    어이 거기 예 ? ? 너 말 이야 너 . 이리 오라 고 무슨 일 . 너 옷 좋다 보...
4    저기 요 혹시 날 너무 뜨겁다 ? 저희 회사 에서 선크림 팔다 하다 번 손등 발라 ...
Name: conversation, dtype: object
0    지금 너 스스로 죽이다 달라 고 애원 하다 것 인가 ? 아니다 . 죄송하다 . 죽 ...
1    길동 경찰서 이다 . 9시 40분 마트 폭발물 설치 하다 . 네 ? 똑바로 들다 한...
2    너 되게 귀엽다 알 지 ? 나 보다 작다 남자 는 첨 보다 . 그만하다 . 니 놀리...
3    어이 거기 예 ? ? 너 말 이야 너 . 이리 오라 고 무슨 일 . 너 옷 좋다 보...
4    저기 요 혹시 날 너무 뜨겁다 ? 저희 회사 에서 선크림 팔다 하다 번 손등 발라 ...
Name: conversation, dtype: object


In [11]:
# 데이터 정제
clean_data=[]

for x in train['conversation']:
    clean_data.append(preprocess_sentence(x))

len(clean_data)

12092

In [12]:
# 정제된 데이터 conversation에 저장
train['conversation'] = clean_data

단어 사전 정의

In [13]:
from collections import Counter
from konlpy.tag import Okt

# Okt 형태소 분석기 인스턴스 생성
okt = Okt()

# 토큰화 실행 함수 (KoNLPy 사용)
def tokenizer(data):
    tokens = []

    # 토큰화
    for sentence in data:
        result = okt.morphs(sentence)  # 'morphs()'는 형태소만 추출
        tokens.extend(result)  # 토큰 리스트에 결과 추가

    return tokens

# 트랜스포머류 단어 사전 정의 함수
def build_vocab(tokens):
    # 단어의 빈도수 계산
    word_counts = Counter(tokens)

    # 고유한 단어 리스트 만들기 (빈도수 기준으로 정렬)
    vocab = {word: i for i, (word, _) in enumerate(word_counts.most_common())}

    vocab_size = len(vocab)

    return vocab, vocab_size


In [14]:
# 토큰화
tokens = tokenizer(train['conversation'])

# 상황에 맞게 단어 사전 생성

vocab, vocab_size = build_vocab(tokens)
# 시작 토큰과 종료 토큰에 고유한 정수를 부여
START_TOKEN, END_TOKEN = [vocab_size], [vocab_size + 1]
vocab_size = vocab_size+2

print("단어 사전:", vocab)
print("단어 수:", vocab_size)

단어 사전: {'.': 0, '?': 1, '하다': 2, '!': 3, '그렇다': 4, ',': 5, '다': 6, '있다': 7, '아니다': 8, '너': 9, '내': 10, '도': 11, '안': 12, '거': 13, '나': 14, '네': 15, '보다': 16, '야': 17, '없다': 18, '왜': 19, '아': 20, '뭐': 21, '늘다': 22, '말': 23, '은': 24, '요': 25, '좀': 26, '알다': 27, '되다': 28, '못': 29, '자다': 30, '돼다': 31, '같다': 32, '니': 33, '오다': 34, '만': 35, '가다': 36, '게': 37, '지금': 38, '좋다': 39, '사람': 40, '한테': 41, '돈': 42, '어떻다': 43, '이다': 44, '것': 45, '제': 46, '고': 47, '우리': 48, '진짜': 49, '로': 50, '주다': 51, '무슨': 52, '먹다': 53, '죄송하다': 54, '으로': 55, '님': 56, '들다': 57, '일': 58, '그냥': 59, '이야': 60, '않다': 61, '에서': 62, '어': 63, '모르다': 64, '여기': 65, '생각': 66, '그렇게': 67, '지다': 68, '까지': 69, '더': 70, '오늘': 71, '해': 72, '수': 73, '정말': 74, '싶다': 75, '때': 76, '집': 77, '너무': 78, '받다': 79, '알': 80, '랑': 81, '나다': 82, '이렇다': 83, '맞다': 84, '해주다': 85, '죽다': 86, '서다': 87, '엄마': 88, '거야': 89, '말다': 90, '이렇게': 91, '그게': 92, '나오다': 93, '난': 94, '싫다': 95, '애': 96, '줄': 97, '니까': 98, '전': 99, '면': 100, '하나': 101, '이르다': 102, '

정수 인코딩

In [15]:
# 문장 -> 숫자 시퀀스 변환(시작 / 종료 토큰 추가)

def sentence_to_sequence(wordlist):
    return [vocab[word] for word in wordlist if word in vocab]

def tokenize_input(data):

    tokenized_inputs = []

    sequences = list(map(sentence_to_sequence, data))

    for sentence in sequences:
        # 정수 인코딩 과정에서 시작 토큰과 종료 토큰을 추가
        sentence = START_TOKEN + sentence + END_TOKEN
        tokenized_inputs.append(sentence)

    return tokenized_inputs

In [16]:
# 숫자 시퀀스 변환로 변환
train_conversation= tokenize_input(train['conversation'])
train_conversation

[[17171,
  3683,
  1345,
  9,
  5208,
  5208,
  50,
  132,
  6,
  279,
  167,
  47,
  96,
  556,
  106,
  6,
  45,
  193,
  1,
  20,
  33,
  6,
  0,
  696,
  8604,
  106,
  6,
  0,
  132,
  13,
  100,
  3688,
  233,
  132,
  3683,
  1592,
  635,
  3683,
  417,
  123,
  3474,
  23,
  635,
  37,
  72,
  1,
  1394,
  132,
  6,
  2607,
  635,
  6,
  6,
  0,
  693,
  23,
  233,
  6,
  29,
  106,
  6,
  0,
  9,
  825,
  5000,
  106,
  6,
  0,
  9,
  132,
  6,
  15,
  4266,
  132,
  622,
  307,
  6,
  0,
  696,
  8604,
  106,
  6,
  0,
  693,
  23,
  233,
  6,
  29,
  106,
  6,
  0,
  9,
  37,
  10283,
  825,
  5000,
  1299,
  6,
  0,
  825,
  5000,
  29,
  106,
  6,
  9,
  15,
  4266,
  3683,
  1315,
  1242,
  635,
  132,
  6,
  2607,
  635,
  6,
  0,
  825,
  5000,
  29,
  106,
  6,
  0,
  16486,
  182,
  35,
  11,
  307,
  6,
  0,
  4913,
  2650,
  6,
  132,
  6,
  2607,
  635,
  6,
  0,
  6,
  1,
  46,
  841,
  11,
  307,
  6,
  0,
  17172],
 [17171,
  617,
  1483,
  3166,
  11684,
  7145

In [17]:
# 정수 인코딩된 데이터 conversation에 저장
train['conversation'] = train_conversation

MAX_LENGTH 이하 샘플링

In [18]:
# max_len 길이를 선택했을 때, 얼마나 많은 샘플들을 자르지 않고
# 포함할 수 있는지 통계로 확인하는 함수
def below_threshold_len(max_len, nested_list):
    cnt = 0
    for s in nested_list:
        if(len(s) <= max_len):
            cnt = cnt + 1
    print('전체 샘플 중 길이가 %s 이하인 샘플의 비율: %s'%(max_len, (cnt / len(nested_list))))

In [19]:
# 샘플의 최대 허용 길이 또는 패딩 후의 최종 길이
MAX_LENGTH = 180

In [20]:
# 통계 확인
below_threshold_len(MAX_LENGTH, train['conversation'])

전체 샘플 중 길이가 180 이하인 샘플의 비율: 0.9120079391333112


In [21]:
# MAX_LENGTH 이하 샘플링
train_data = train[train['conversation'].apply(lambda x: len(x) <= MAX_LENGTH)]

print(len(train))

12092


MAX_LENGTH로 패딩

In [22]:
# MAX_LENGTH  180으로 모든 데이터셋을 패딩
train_conversation = tf.keras.preprocessing.sequence.pad_sequences(train['conversation'],
                                                                   maxlen=MAX_LENGTH,
                                                                   padding='post')

label(target) -> list

In [23]:
# 리스트화
train_label = np.array(list(train['class']))

데이터셋 분리

In [24]:
x_train, x_test, y_train, y_test = train_test_split(train_conversation, train_label, test_size=0.25, random_state=14561)

print("x_train lenght :", len(x_train))
print("x_test lenght :", len(x_test))
print("y_train lenght :", len(y_train))
print("y_test lenght :", len(y_test))

x_train lenght : 9069
x_test lenght : 3023
y_train lenght : 9069
y_test lenght : 3023


In [25]:
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=14561)

print("x_train lenght :", len(x_train))
print("x_val lenght :", len(x_val))
print("y_train lenght :", len(y_train))
print("y_val lenght :", len(y_val))

x_train lenght : 7255
x_val lenght : 1814
y_train lenght : 7255
y_val lenght : 1814


In [26]:
# input 데이터와 차원을 맞추기 위해 라벨 증식
y_train_expanded = np.repeat(y_train[:, np.newaxis], 180, axis=1)
y_val_expanded = np.repeat(y_val[:, np.newaxis], 180, axis=1)
y_test_expanded = np.repeat(y_test[:, np.newaxis], 180, axis=1)

데이터셋 객체 생성 (교사 강요 사용)

In [27]:
BATCH_SIZE = 64
BUFFER_SIZE = 20000

train_dataset = tf.data.Dataset.from_tensor_slices((
        x_train[:, :-1],
        y_train_expanded[:, 1:]
))
train_dataset = train_dataset.cache()
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)

In [28]:
val_dataset = tf.data.Dataset.from_tensor_slices((
        x_val[:, :-1],
        y_val_expanded[:, 1:]
))
val_dataset = val_dataset.cache()
val_dataset = val_dataset.shuffle(BUFFER_SIZE)
val_dataset = val_dataset.batch(BATCH_SIZE)
val_dataset = val_dataset.prefetch(tf.data.experimental.AUTOTUNE)

In [29]:
test_dataset = tf.data.Dataset.from_tensor_slices((
        x_test[:, :-1],
        y_test_expanded[:, 1:]
))
test_dataset = test_dataset.cache()
test_dataset = test_dataset.shuffle(BUFFER_SIZE)
test_dataset = test_dataset.batch(BATCH_SIZE)
test_dataset = test_dataset.prefetch(tf.data.experimental.AUTOTUNE)

# step3. 트랜스포머 모델링

트랜스포머에 필요한 함수들 정의 -> 여기선 인코더만 사용

In [30]:
import tensorflow as tf

class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, pos, i, d_model):
        angle_rates = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return pos * angle_rates

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            pos=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model)

        # apply sin to even indices in the array; 2i
        sines = tf.math.sin(angle_rads[:, 0::2])

        # apply cos to odd indices in the array; 2i+1
        cosines = tf.math.cos(angle_rads[:, 1::2])

        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]

        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        # SparseTensor일 경우 dense로 변환
        if isinstance(inputs, tf.SparseTensor):
            inputs = tf.sparse.to_dense(inputs)

        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

In [31]:
# 스케일드 닷 프로덕트 어텐션 함수
def scaled_dot_product_attention(query, key, value, mask):
  # 어텐션 가중치는 Q와 K의 닷 프로덕트
  matmul_qk = tf.matmul(query, key, transpose_b=True)

  # 가중치를 정규화
  depth = tf.cast(tf.shape(key)[-1], tf.float32)
  logits = matmul_qk / tf.math.sqrt(depth)

  # 패딩에 마스크 추가
  if mask is not None:
    logits += (mask * -1e9)

  # softmax적용
  attention_weights = tf.nn.softmax(logits, axis=-1)

  # 최종 어텐션은 가중치와 V의 닷 프로덕트
  output = tf.matmul(attention_weights, value)
  return output

In [32]:
# 멀티헤드 어텐션
class MultiHeadAttention(tf.keras.layers.Layer):

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    self.depth = d_model // self.num_heads

    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    self.dense = tf.keras.layers.Dense(units=d_model)

  def split_heads(self, inputs, batch_size):
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(inputs, perm=[0, 2, 1, 3])

  def call(self, inputs):
    query, key, value, mask = inputs['query'], inputs['key'], inputs[
        'value'], inputs['mask']
    batch_size = tf.shape(query)[0]

    # Q, K, V에 각각 Dense를 적용합니다
    query = self.query_dense(query)
    key = self.key_dense(key)
    value = self.value_dense(value)

    # 병렬 연산을 위한 머리를 여러 개 만듭니다
    query = self.split_heads(query,batch_size)
    key = self.split_heads(key,batch_size)
    value = self.split_heads(value,batch_size)

    # 스케일드 닷 프로덕트 어텐션 함수
    scaled_attention = scaled_dot_product_attention(query, key, value, mask)

    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    # 어텐션 연산 후에 각 결과를 다시 연결(concatenate)합니다
    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    # 최종 결과에도 Dense를 한 번 더 적용합니다
    outputs = self.dense(concat_attention)

    return outputs

In [33]:
# 마스크 패딩
def create_padding_mask(x):
  mask = tf.cast(tf.math.equal(x, 0), tf.float32)
  # (batch_size, 1, 1, sequence length)
  return mask[:, tf.newaxis, tf.newaxis, :]

In [34]:
# 인코더 하나의 레이어를 함수로 구현.
# 이 하나의 레이어 안에는 두 개의 서브 레이어가 존재합니다.
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")

  # 패딩 마스크 사용
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # 첫 번째 서브 레이어 : 멀티 헤드 어텐션 수행 (셀프 어텐션)
  attention = MultiHeadAttention(
      d_model, num_heads, name="attention")({
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': padding_mask
      })

  # 어텐션의 결과는 Dropout과 Layer Normalization이라는 훈련을 돕는 테크닉을 수행
  attention = tf.keras.layers.Dropout(rate=dropout)(attention)
  attention = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(inputs + attention)

  # 두 번째 서브 레이어 : 2개의 완전연결층
  outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)

  # 완전연결층의 결과는 Dropout과 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention + outputs)

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)

In [35]:
# 인코더 정의 함수
def encoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            name="encoder"):
  inputs = tf.keras.Input(shape=(None,), name="inputs")

  # 패딩 마스크 사용
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # 임베딩 레이어
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))

  # 포지셔널 인코딩
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)

  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # num_layers만큼 쌓아올린 인코더의 층.
  for i in range(num_layers):
    outputs = encoder_layer(
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
        name="encoder_layer_{}".format(i),
    )([outputs, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)

In [36]:
# 트랜스포머 모델 정의 함수
def transformer(vocab_size,
                num_layers,
                units,
                d_model,
                num_heads,
                dropout,
                name="transformer"):
  inputs = tf.keras.Input(shape=(None,), name="inputs")

  # 인코더에서 패딩을 위한 마스크
  enc_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='enc_padding_mask')(inputs)

  # 인코더
  enc_outputs = encoder(
      vocab_size=vocab_size,
      num_layers=num_layers,
      units=units,
      d_model=d_model,
      num_heads=num_heads,
      dropout=dropout,
  )(inputs=[inputs, enc_padding_mask])

  # 완전연결층
  outputs = tf.keras.layers.Dense(units=label_size, activation='softmax', name="outputs")(enc_outputs)

  return tf.keras.Model(inputs=inputs, outputs=outputs, name=name)

트랜스포머 모델 정의

In [37]:
tf.keras.backend.clear_session()

# 하이퍼파라미터
NUM_LAYERS = 2 # 인코더와 디코더의 층의 개수
D_MODEL = 256 # 인코더와 디코더 내부의 입, 출력의 고정 차원
NUM_HEADS = 8 # 멀티 헤드 어텐션에서의 헤드 수
UNITS = 512 # 피드 포워드 신경망의 은닉층의 크기
DROPOUT = 0.1 # 드롭아웃의 비율

model = transformer(
    vocab_size=vocab_size,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT)

model.summary()

손실 함수 정의

In [38]:
def loss_function(y_true, y_pred):
    y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))

    loss = tf.keras.losses.SparseCategoricalCrossentropy(reduction='none')(y_true, y_pred)

    return tf.reduce_mean(loss)

커스텀 된 학습률 정의

In [39]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):

    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()

        self.d_model = d_model
        self.d_model = tf.cast(self.d_model, tf.float32)

        self.warmup_steps = warmup_steps

    def __call__(self, step):
        # step을 float32로 변환
        step = tf.cast(step, tf.float32)

        arg1 = tf.math.rsqrt(step)  # 부동소수점 연산을 위해 변환
        arg2 = step * (self.warmup_steps**-1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

모델 컴파일

In [40]:
learning_rate = CustomSchedule(D_MODEL)

optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

def accuracy(y_true, y_pred):
    y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
    return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)

model.compile(
    optimizer='adam',  # 또는 다른 옵티마이저
    loss='sparse_categorical_crossentropy',  # 또는 다른 손실 함수
    metrics=['accuracy']  # 필요한 메트릭
)

모델 훈련

In [41]:
EPOCHS = 50
model.fit(train_dataset, epochs=EPOCHS, validation_data=val_dataset, verbose=1)

Epoch 1/50
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m403s[0m 3s/step - accuracy: 0.6654 - loss: 1.2369 - val_accuracy: 0.8241 - val_loss: 0.5236
Epoch 2/50
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m387s[0m 3s/step - accuracy: 0.8353 - loss: 0.4682 - val_accuracy: 0.8551 - val_loss: 0.4069
Epoch 3/50
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m396s[0m 3s/step - accuracy: 0.8851 - loss: 0.3185 - val_accuracy: 0.8656 - val_loss: 0.3800
Epoch 4/50
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m391s[0m 3s/step - accuracy: 0.9183 - loss: 0.2417 - val_accuracy: 0.8609 - val_loss: 0.4509
Epoch 5/50
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m397s[0m 3s/step - accuracy: 0.9349 - loss: 0.1848 - val_accuracy: 0.8518 - val_loss: 0.4938
Epoch 6/50
[1m114/114[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m449s[0m 4s/step - accuracy: 0.9494 - loss: 0.1452 - val_accuracy: 0.8646 - val_loss: 0.4808
Epoch 7/50
[1m114/114

<keras.src.callbacks.history.History at 0x7bd030a2d360>

# step4. 평가

예측

In [42]:
y_pred = model.predict(x_test)
y_pred = np.argmax(y_pred, axis=2)

y_pred_probs=[]
for x in y_pred:

    x = np.bincount(x).argmax()
    y_pred_probs.append(x)

[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 512ms/step


In [43]:
# F1 점수 계산
f1 = f1_score(y_test, y_pred_probs, average='weighted')  # weighted는 클래스 비율에 따른 가중 평균
print("F1 Score:", f1)

F1 Score: 0.8538504664833139
