In [1]:

# Data 전처리
## 숫자 벡터 사전 만들기 
## 데이터 불러오기
## 데이터 x t로 나누기
## 데이터 라벨링=정수로 바꾸기(신경망 안에 벡터 사전이 있을 때) or 벡터화=1차원 배열(신경망 안에 벡터 사진이 없을 때)
### 참고로 우리는 아직 <sos>와 <eos>를 쓰지 않을 겁니다.

# AI 만들기
## AI 학습
### DataLoader에 싣기 (x, t 입력)
### Encoder와 Decoder 구현 하기 (함수 만들기)
### y = F(x) (순전파)
### 손실함수
### 역전파

In [2]:
# 필요한 모듈 import
import os
import sys
sys.path.append(os.path.join(os.path.dirname(""), ".."))
import custom
import numpy as np

# Data 전처리 모듈 : custom과 numpy만으로 가능함
from tqdm import tqdm #자료가 많으니 진행상황은 봅시다

# AI 만들기 모듈
import torch
import torch.nn as nn #클래스 형식의 신경망 함수
import torch.nn.functional as F #함수 형식의 신경망 함수
from torch.utils.data import DataLoader #데이터 batch 나눠주는 역할의 class

KeyboardInterrupt: 

In [None]:
## 숫자 벡터 사전 만들기
## ' ',1,2,3,4,5,6,7,8,9,0,+ : 12개
## ' '(pad 문자) 는 0행렬
## 나머지 11개는 원 핫 인코딩

#벡터 값 만들기
vector_list = np.zeros((1,11)) # np.array의 백터 값 저장할 pad값만 저장된 행렬
vector_list = np.append(vector_list, np.eye(11), axis = 0) # 대각선 값은 1이고 나머지는 0인 정사각 행렬
print(vector_list)
print(vector_list.shape)

#숫자-라벨링 사전
num_dict = {}
num_dict[' '] = 0

for i in range(1,10) : #1~9까지 반복문으로 넣기
    num_dict[str(i)] = i
num_dict['0'] = 10
num_dict['+'] = 11
print(num_dict)

In [None]:
## 데이터 불러오기
file_name = 'addition.txt'
with open(file_name, mode = "r") as f:
    data = f.readlines()

print(len(data))
print(data[10])

In [None]:
## 데이터 x, t로 나누기
data_x = []
data_t = []

for d in data :
    d = d.split("_")
    data_x.append(d[0])
    data_t.append(d[1].replace('\n','')) #개행문자 제거

print("x의 갯수 :",len(data_x))
print("t의 갯수 :",len(data_t))
print("10번째 x :",data_x[10])
print("10번째 t :",data_t[10])
print("10번째 x 길이 :",len(data_x[10]))
print("10번째 t 길이 :",len(data_t[10]))

In [None]:
## 데이터 라벨링=정수로 바꾸기(신경망 안에 벡터 사전이 있을 때) or 벡터화=1차원 배열(신경망 안에 벡터 사진이 없을 때)
## 라벨링 하겠습니다 (= 신경망 안에 벡터 list 넣겠습니다)

for i in range(len(data_x)) :
    temp = data_x[i]
    temp = list(temp) #문장을 문자 하나하나씩 끊어서 list화 #역산 하려면 "".join() 쓰면 됩니다
    vector = custom.word_vectorize(temp, num_dict, 7)
    data_x[i] = vector
data_x = np.array(data_x)

for i in range(len(data_t)) :
    temp = data_t[i]
    temp = list(temp)
    vector = custom.word_vectorize(temp, num_dict, 4)
    data_t[i] = vector
data_t = np.array(data_t)

print(data_x.shape)
print(data_t.shape)


In [None]:
## AI 학습
### DataLoader에 싣기 (x, t 입력)
device = "cuda" if torch.cuda.is_available() else "cpu"

tensor_x = torch.tensor(data_x, dtype = torch.long, device = device)
tensor_t = torch.tensor(data_t, dtype = torch.long, device = device)

### 앞의 40000번째 까지는 train, 뒤의 10000개는 test(val)
s = 40000
train_zip = list(zip(tensor_x[:s], tensor_t[:s])) 
test_zip = list(zip(tensor_x[s:], tensor_t[s:]))

train_dataloader = DataLoader(train_zip, batch_size=100, shuffle=True)
test_dataloader = DataLoader(test_zip, batch_size=1000, shuffle=False)
print(len(train_dataloader))
print(len(test_dataloader))

In [None]:
### Encoder와 Decoder 구현 하기 (함수 만들기)

class Encoder(nn.Module) :
    def __init__(self, vector) :
        super().__init__()
        self.embedding = nn.Embedding.from_pretrained(vector, freeze = True, padding_idx=0) # 벡터화 함수, b값이 없는 선형함수, 차원 = (단어갯수, 벡터갯수)
        #.from_pretrained : W값을 해당 벡터로 만들겠다
        self.rnn = nn.LSTM(vector.shape[1], vector.shape[1], batch_first = True, bidirectional=True)# RNN 함수
    def forward(self, x) :
        x = self.embedding(x)
        y, hc = self.rnn(x) #LSTM은 hc
        return y, hc

class Decoder(nn.Module) :
    def __init__(self, vector, max_len) :
        super().__init__()
        self.embedding = nn.Embedding.from_pretrained(vector, freeze = True, padding_idx=0) #벡터화 함수
        self.rnn = nn.LSTM(vector.shape[1], vector.shape[1], batch_first = True) # RNN 함수
        self.f = nn.Linear(vector.shape[1], vector.shape[0]) #벡터 값을 다시 단어 라벨값으로 역산하는 함수
        self.max_len = max_len #문장 단어의 최대 갯수
    def forward(self, encoder_output, encoder_hc, t = None) :
        decoder_y_list = [] #출력값

        decoder_x = torch.zeros((encoder_output.shape[0],1)).type(torch.long).to(encoder_output.device) #첫번째 입력값, 라벨값이기 때문에 차원 = (문장 갯수,1), 문장 갯수는 encoder_output에서 가져옵니다
        #.type(torch.long) 은 형변환, .to(encoder_output.device) 은 기기 배정
        encoder_h_for = encoder_hc[0][::2]
        encoder_h_back= encoder_hc[0][1::2]
        encoder_h = encoder_h_for + encoder_h_back
        encoder_c_for = encoder_hc[1][::2]
        encoder_c_back= encoder_hc[1][1::2]
        encoder_c = encoder_c_for + encoder_c_back
        decoder_hc = (encoder_h, encoder_c) #첫번째 hc값      

        for i in range(self.max_len) :
            decoder_y, decoder_hc = self.forward_cal(decoder_x, decoder_hc) # RNN 계산 한블록 합니다.

            if t is not None : #teaching force, t가 존재할 때
                decoder_x = t[:,i:i+1]  # i번째 t의 단어 가져옴
            else : # greedy, t가 존재하지 않을 때
                decoder_x = torch.argmax(decoder_y, dim = -1).detach() #출력 값 중에 가장 큰값(라벨값) 가져오고, 해당 값은 미분 대상이 아님 (.detach())

            decoder_y_list.append(decoder_y) #출력값에 단어 하나하나 씩 저장

        decoder_y_list = torch.cat(decoder_y_list, dim = 1) #list를 tensor로 묶기 위해서 cat 함수 쓰는 것 decoder_y.shape = (문장 갯수, 단어 갯수(1개), 벡터 갯수)
        
        return decoder_y_list, decoder_hc, None # decoder layer 2개 이상 묶어서 쓰기 위해서 hc값과, attention값도 return하기 위해 3개의 return 값
    def forward_cal(self, x, hc) : #rnn 블록하나 계산하는 함수
        x = self.embedding(x) # 벡터로 변환하고
        x, hc = self.rnn(x, hc) # rnn 계산, hc값은 이전 hc계산 결과, hc값도 따로 입력합니다
        x = self.f(x) # 벡터 계산값 역산해서 다시 라벨로 변환
        return x, hc
        
class Encoder_n_Decoder(nn.Module) : #encoder와 decoder를 하나로 묶는 새로운 신경망
    def __init__(self, encoder, decoder) :
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
    def forward(self, x, t = None) :
        y, hc = self.encoder(x)
        y, _, _ = self.decoder(y, hc, t)
        return y

In [None]:
encoder_file_name = 'data/encoder3.pt'
decoder_file_name = 'data/decoder3.pt'

tensor_vector = torch.tensor(vector_list, dtype = torch.float)
encoder = Encoder(tensor_vector).to(device)
decoder = Decoder(tensor_vector, 4).to(device)
loss_func = nn.CrossEntropyLoss()
encoder_optim = torch.optim.Adam(encoder.parameters(), lr = 0.02)
decoder_optim = torch.optim.Adam(decoder.parameters(), lr = 0.02)

epoch = 100
prev_acc = 0
cnt = 0

for e in range(epoch) :
    loss_sum = 0
    encoder.train() #dropout 켜주기
    decoder.train()
    for x, t in train_dataloader :
### y = F(x) (순전파)
        y, hc = encoder(x)
        y, _, _ = decoder(y, hc)
### 손실함수
        loss = loss_func(y.reshape(-1, y.shape[-1]) , t.reshape(-1)) #3차원을 2차원으로 펴주기
        loss_sum += loss.item()
### 역전파
        loss.backward()
        decoder_optim.step()
        encoder_optim.step()
        decoder_optim.zero_grad()
        encoder_optim.zero_grad()
    loss_sum /= len(train_dataloader) #평균 구하기
### 중간 점검
    correct = 0
    total = 0
    encoder.eval() #dropout 꺼주기
    decoder.eval()
    for x, t in test_dataloader :
        with torch.no_grad() :
            y, hc = encoder(x)
            y, _, _ = decoder(y, hc)
            correct += (y.argmax(dim = -1) == t).sum().item()
        total += len(x) * 4
    acc = correct / total
### earlystopper
    if acc <= prev_acc :
        cnt += 1
    else :
        cnt = 0
        prev_acc = acc
        torch.save(encoder, encoder_file_name) #중간 저장
        torch.save(decoder, decoder_file_name)
    print(f"epoch {e+1} | loss {loss_sum} | acc {acc} | cnt {cnt}")
    if cnt >= 5 :
        print("train stopped")
        break

In [None]:
# onnx로 변환하기 쉽게 하기 위해 다시 불러와서 cpu로 변환 한 다음에 하나로 묶는 작업

encoder = torch.load(encoder_file_name, weights_only=False, map_location='cpu') #cpu로 불러오기 
decoder = torch.load(decoder_file_name, weights_only=False, map_location='cpu')

F = Encoder_n_Decoder(encoder, decoder)
print(F)
print(F.state_dict()['encoder.embedding.weight']) #.state_dict() : 신경망 내의 W, b 값 불러오는 함수



In [None]:
# 묶은 신경망을 다시 저장

torch.save(F, "data/add_ai3.pt")