In [45]:
!pip install konlpy



In [46]:
import tensorflow as tf
import random
from konlpy.tag import Okt #openkorean
#konlpy : 한국어처리, okt는 트위터에서 만든 형태소 분석 패키지

In [47]:
EPOCHS = 200
NUM_WORDS = 2000

#Encoder

In [48]:
class Encoder(tf.keras.Model):
    def __init__(self):
        super(Encoder,self).__init__()
        self.emb = tf.keras.layers.Embedding(NUM_WORDS,64)
        self.lstm = tf.keras.layers.LSTM(512,return_state = True) #return state가 들어가야 hidden과 cell state
    def __call__(self, x, training = False, maks = None):
        x = self.emb(x)
        _, h, c = self.lstm(x)
        return h,c

#Decoder

In [49]:
class Decoder(tf.keras.Model):
    def __init__(self):
        super(Decoder,self).__init__()
        self.emb = tf.keras.layers.Embedding(NUM_WORDS,64)
        self.lstm = tf.keras.layers.LSTM(512,return_sequences=True,return_state=True)#return_sequence가 true여서 dense에 들어가는게 문장 전체로 들어가게 된다.
        self.dense = tf.keras.layers.Dense(NUM_WORDS, activation='softmax')
    
    def __call__(self,inputs, training =False,mask =None):
        x,h,c=inputs#shifted output, hidden, cellstate
        x = self.emb(x)
        x,h,c = self.lstm(x,initial_state=[h,c])#맨처음에 encoder에서 나온걸 넣어주고 나머진 shift
        return self.dense(x), h, c

#Seq2seq 모델 정의


In [50]:
class Seq2seq(tf.keras.Model):
    def __init__(self, sos, eos):
        super(Seq2seq, self).__init__()
        self.enc = Encoder()
        self.dec = Decoder()
        self.sos = sos#start of sequence
        self.eos = eos#end of sequence

    def __call__(self, inputs,training = False, maks = None):
        #autograph
        if training is True:
            x, y =inputs #output을 dec에 shifted 출력을 넣어야 하니까.
            h, c = self.enc(x) #hidden 과 cell state를 저장
            y, _, _ = self.dec((y,h,c))
            return y #y는 decorder결과라 전체 문장이 된다.

        #shifted 된걸 training에선 feeding하기 때문에 더 쉬운데
        #test땐 마지막 출력을 feeding해야하기 때문에 좀 더 복잡해진다.
        else:
            x = inputs
            h, c = self.enc(x) #context계산까진 똑같다.
            y = tf.convert_to_tensor(self.sos)
            y = tf.reshape(y,(1,1))

            seq = tf.TensorArray(tf.int32, 64)#최대 64의 길이로
            #autograph가 됨 tf for문이됨 다 tf로 쓰면
            for idx in tf.range(64):
                y, h, c = self.dec([y,h,c])# h,c는 encorder에서 받아온 hidden, cell state다. for 문 을 통해 이전 타임스텝의 값을 shift해서 얻게 된다.
                y = tf.cast(tf.argmax(y, axis = -1),dtype=tf.int32)#one-hot vector로 바꿔준다.sparse하게
                y = tf.reshape(y,(1,1)) #dimension이 하나가 되는데, batch하나의 값이라는 걸 표현하기 위해 고쳐준다.
                seq = seq.write(idx,y)

                if y == self.eos:
                    break
            return tf.reshape(seq.stack(),(1,64))#seq에 만들어둔걸 쌓아두는 작업이다.그리고 (배치,문장 최대길이)로 리쉐이프해서 출력해준다.

#루프 정의


In [51]:
@tf.function
def train_step(model,inputs,labels,loss_object,optimizer,train_loss,train_accuracy):
    output_labels = labels[:,1:]#맨앞은 sos라 뺀게 포함이 되고,
    shifted_labels = labels[:,:-1]#학습시 input으로 같이 넣어준다. seq2seq의 y이다. sos포함 eos빠짐. 즉, 오른쪽으로 shift된거
    with tf.GradientTape() as tape:
        predictions = model([inputs,shifted_labels],training = True)
        loss = loss_object(output_labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients,model.trainable_variables))
    train_loss(loss)
    train_accuracy(output_labels,predictions)
  
@tf.function
def test_step(model,inputs,labels,loss_object,test_loss,test_accuracy):
    return model(inputs,training = True)#배치하나만 받아서 시퀀스 출력해준다!

#데이터셋

In [52]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [53]:
dataset_file = '/content/gdrive/My Drive/Colab Notebooks/aop-딥러닝-인공지능-강의자료/chatbot_data.csv'
okt = Okt()
#질문, 답, 질문, 답으로 이루어짐 카페에서의 문답!
with open(dataset_file, 'r') as file:
    lines = file.readlines()
    seq = [''.join(okt.morphs(line))for line in lines]#morphs는 한 줄 한줄 형태소 분석해주는거이다.
    #tokenizer에서 space를 기준으로 하기 때문에 ' '에 join을 해서 넣어준다.

questions = seq[::2]#홀수행
answers = ['\t'+lines for lines in seq[1::2]]#첫번째부터 짝수행을 가져온다. 탭의 역할은 start of sequence그리고 \n은 eos

num_sample = len(questions)

perm = list(range(num_sample))
random.seed(0)
random.shuffle(perm)

train_q = list()
train_a = list()

test_q = list()
test_a = list()

for idx, qna in enumerate(zip(questions, answers)):
    q, a = qna
    if perm[idx] > num_sample//5:#80%training, 
        train_q.append(q)
        train_a.append(a)

    else:#20% test
        test_q.append(q)
        test_a.append(a)

tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=NUM_WORDS,filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~')#빈도수가 높은 num_words개수로 토크나이즈

tokenizer.fit_on_texts(train_q+train_a)


#sparse 인코딩된 단어형태
train_q_seq = tokenizer.texts_to_sequences(train_q)
train_a_seq = tokenizer.texts_to_sequences(train_a)

test_q_seq = tokenizer.texts_to_sequences(test_q)
test_a_seq = tokenizer.texts_to_sequences(test_a)

x_train = tf.keras.preprocessing.sequence.pad_sequences(train_q_seq,
                                                        value=0,
                                                        padding='pre',
                                                        maxlen=64)
y_train = tf.keras.preprocessing.sequence.pad_sequences(train_a_seq,
                                                        value=0,
                                                        padding='post',#결과니까 뒤에다가 0으로 padding한다.
                                                        maxlen=65)#맨뒤의 eos는 떼고 사용하니까
x_test = tf.keras.preprocessing.sequence.pad_sequences(test_q_seq,
                                                        value=0,
                                                        padding='pre',
                                                        maxlen=64)
y_test = tf.keras.preprocessing.sequence.pad_sequences(test_a_seq,
                                                        value=0,
                                                        padding='post',
                                                        maxlen=65)

train_ds = tf.data.Dataset.from_tensor_slices((x_train,y_train)).shuffle(10000).batch(32).prefetch(1024)
test_ds = tf.data.Dataset.from_tensor_slices((x_train,y_train)).batch(1).prefetch(1024)

#학습 환경 정의

In [54]:
model = Seq2seq(sos = tokenizer.word_index['\t'],
                eos = tokenizer.word_index['\n'])

loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

In [56]:
for epoch in range(EPOCHS):
  for seqs,labels in train_ds:
    train_step(model, seqs, labels, loss_object, optimizer, train_loss, train_accuracy)

  template = 'Epoch:{}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
  print(template.format(epoch+1,
                        train_loss.result(),
                        train_accuracy.result()*100,
                        test_loss.result(),
                        test_accuracy.result()*100))
  train_loss.reset_states()
  train_accuracy.reset_states()

KeyboardInterrupt: ignored