In [2]:
import polars as pl
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import vocab
from collections import Counter
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F

# Path to the parallel-corpus CSV; its first two columns are read below as the
# Japanese (source) and English (target) sides.
FILE_PATH = "./wiki_corpus_2.01/kyoto_lexicon.csv"
# spaCy-backed torchtext tokenizers: Japanese for the source, English for the target.
tokenizer_src = get_tokenizer('spacy', language='ja_core_news_sm')
tokenizer_tgt = get_tokenizer('spacy', language='en_core_web_sm')
# One row of the CSV could not be parsed, so rows whose field count does not match
# are handled with truncate_ragged_lines=True in pl.read_csv (original data: 51983 rows).

class datasets(Dataset):
    """Map-style dataset pairing a source item with the target item at the same index.

    Args:
        text: Indexable collection of source (Japanese) items.
        label: Indexable collection of target (English) items.
    """

    def __init__(self, text, label):
        self.jp_datas, self.en_datas = text, label

    def __len__(self):
        # The length is taken from the source side only.
        source_items = self.jp_datas
        return len(source_items)

    def __getitem__(self, index):
        # Returns a (source, target) tuple for the requested position.
        return self.jp_datas[index], self.en_datas[index]

class DataLoaderCreater:
    """Builds vocabularies and train/valid/test DataLoaders from a two-column CSV.

    Args:
        file_path: Path of the CSV file; column 0 is used as the source text and
            column 1 as the target text.
        src_tokenizer: Callable turning a source string into a list of tokens.
        tgt_tokenizer: Callable turning a target string into a list of tokens.

    After ``create_dataloader()`` the instance exposes ``vocab_src`` / ``vocab_tgt``
    (token -> index dicts), ``vocab_src_index_to_word`` / ``vocab_tgt_index_to_word``
    (index -> token lists) and ``len_src_vocab`` / ``len_tgt_vocab``.
    """

    def __init__(self, file_path, src_tokenizer, tgt_tokenizer):
        self.file_path = file_path
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer

    def build_vocab(self, texts, tokenizer):
        """Count every token of ``texts`` and return a torchtext vocab.

        The special tokens ``<unk>``, ``<pad>``, ``<start>``, ``<end>`` are passed
        as ``specials`` and ``<unk>`` is set as the default index.
        """
        counter = Counter()
        for text in texts:
            counter.update(tokenizer(text))
        specials = ['<unk>', '<pad>', '<start>', '<end>']
        v = vocab(counter, specials=specials, min_freq=1)
        v.set_default_index(v['<unk>'])
        return v

    def convert_text_to_indexes(self, text, vocab, tokenizer):
        """Encode ``text`` as ``[<start>, token ids..., <end>]``.

        Tokens missing from ``vocab`` map to ``<unk>``. Newline characters at
        either end of ``text`` are stripped before tokenizing.
        """
        unk_index = vocab['<unk>']
        body = [
            vocab[token] if token in vocab else unk_index
            for token in tokenizer(text.strip("\n"))
        ]
        return [vocab['<start>']] + body + [vocab['<end>']]

    def _encode_and_pad(self, texts, stoi, tokenizer):
        """Encode every text and right-pad the batch with ``<pad>`` to a common length."""
        encoded = [
            torch.tensor(self.convert_text_to_indexes(text, stoi, tokenizer=tokenizer))
            for text in texts
        ]
        return pad_sequence(encoded, batch_first=True, padding_value=stoi["<pad>"])

    def create_dataloader(self, batch_size=128, seed=None):
        """Read the CSV, build both vocabularies and return three DataLoaders.

        Args:
            batch_size: Batch size shared by all three loaders (default 128).
            seed: Optional integer; when given, the random split is driven by a
                dedicated ``torch.Generator`` so it is reproducible. ``None``
                keeps the unseeded split.

        Returns:
            ``(train_dataloader, test_dataloader, valid_dataloader)`` -- note the
            order: test comes before valid, which is how the caller unpacks it.
        """
        df = pl.read_csv(self.file_path, separator=",", encoding="utf-8", has_header=True, truncate_ragged_lines=True)
        df_selected = df.select([df.columns[0], df.columns[1]])
        jp_list = df_selected[:, 0].to_list()
        en_list = df_selected[:, 1].to_list()

        # Use the tokenizers injected into this instance (not module-level globals),
        # so the vocabulary is built with the same tokenizer that encodes the text.
        src_vocab_obj = self.build_vocab(jp_list, self.src_tokenizer)
        tgt_vocab_obj = self.build_vocab(en_list, self.tgt_tokenizer)
        self.vocab_src_index_to_word = src_vocab_obj.get_itos()
        self.vocab_tgt_index_to_word = tgt_vocab_obj.get_itos()
        # Only the plain token -> index dicts are kept on the instance.
        self.vocab_src = src_vocab_obj.get_stoi()
        self.vocab_tgt = tgt_vocab_obj.get_stoi()
        self.len_src_vocab = len(self.vocab_src)
        self.len_tgt_vocab = len(self.vocab_tgt)

        src_data = self._encode_and_pad(jp_list, self.vocab_src, self.src_tokenizer)
        tgt_data = self._encode_and_pad(en_list, self.vocab_tgt, self.tgt_tokenizer)

        dataset = datasets(src_data, tgt_data)

        # 80% train, 10% validation, remainder test.
        dataset_length = len(dataset)
        train_size = int(0.8 * dataset_length)
        val_size = int(0.1 * dataset_length)
        test_size = dataset_length - train_size - val_size
        split_sizes = [train_size, val_size, test_size]

        # Random split of the dataset; seeded only when the caller asks for it.
        if seed is None:
            train_dataset, valid_dataset, test_dataset = random_split(dataset, split_sizes)
        else:
            generator = torch.Generator().manual_seed(seed)
            train_dataset, valid_dataset, test_dataset = random_split(dataset, split_sizes, generator=generator)

        train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
        valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size)
        test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

        return train_dataloader, test_dataloader, valid_dataloader

# Build the loaders once; create_dataloader() returns them in (train, test, valid) order.
dataloader_creater = DataLoaderCreater(FILE_PATH, tokenizer_src, tokenizer_tgt)
train_dataloader, test_dataloader, valid_dataloader = dataloader_creater.create_dataloader()
# Vocabulary sizes, used later to size the embedding and output layers.
src_vocab_size = dataloader_creater.len_src_vocab
tgt_vocab_size = dataloader_creater.len_tgt_vocab
# token -> index dictionaries.
vocab_src = dataloader_creater.vocab_src
vocab_tgt = dataloader_creater.vocab_tgt
# index -> token lists, used to turn index sequences back into strings.
# NOTE(review): "vacab" is a misspelling of "vocab"; the names are kept because the
# translation cell further down references them with this spelling.
vacab_src_index_to_word = dataloader_creater.vocab_src_index_to_word
vacab_tgt_index_to_word = dataloader_creater.vocab_tgt_index_to_word



In [27]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import math
# EncodingとEmbeddingの違い
# 学習を行わないのがPositional Encoding
# 学習を行うのがPositional Embedding

class PositionalEncoding():
    """Fixed (non-learned) sinusoidal position table.

    Args:
        embedding_dim: Width of each position vector.
        len_sequence: Number of positions (rows) in the table.

    NOTE(review): the frequency exponent here is ``k / len_sequence``, whereas the
    formulation in "Attention Is All You Need" uses ``2k / d_model``. The formula is
    deliberately left unchanged: presumably the saved checkpoint was trained with
    exactly these values -- confirm before altering it.
    """

    def __init__(self, embedding_dim, len_sequence):
        self.embedding_dim = embedding_dim
        self.len_sequence = len_sequence
        # ((len_sequence, embedding_dim), table) once the table has been built.
        self._pe_cache = None

    def get_sin(self, i, k):
        """Sine component for position ``i`` and frequency index ``k``."""
        return math.sin(i/(10000)**(k/self.len_sequence))

    def get_cos(self, i, k):
        """Cosine component for position ``i`` and frequency index ``k``."""
        return math.cos(i/(10000)**(k/self.len_sequence))

    def get_positional_vector(self):
        """Return a fresh ``(len_sequence, embedding_dim)`` float tensor.

        Even columns hold the sine terms and odd columns the cosine terms; with an
        odd ``embedding_dim`` the last column stays zero. The table is built once
        and cached (it used to be rebuilt with a Python double loop on every call);
        the cache is invalidated if ``len_sequence`` or ``embedding_dim`` change.
        A clone is returned so callers can never corrupt the cached copy.
        """
        key = (self.len_sequence, self.embedding_dim)
        cached = getattr(self, "_pe_cache", None)
        if cached is None or cached[0] != key:
            pe = torch.zeros(self.len_sequence, self.embedding_dim)
            for pos in range(self.len_sequence):
                for i in range(0, int(self.embedding_dim/2)):
                    pe[pos, 2*i] = self.get_sin(pos, i)
                    pe[pos, 2*i+1] = self.get_cos(pos, i)
            cached = (key, pe)
            self._pe_cache = cached
        return cached[1].clone()


# Transformerモデルの定義
class TransformerModel(nn.Module):
    """Encoder-decoder Transformer with fixed sinusoidal position tables.

    Args:
        vocab_size_src: Number of rows of the source embedding.
        vocab_size_tgt: Number of rows of the target embedding and width of the
            output projection.
        embedding_dim: Model width (``d_model``).
        num_heads: Number of attention heads.
        num_layers: Number of encoder layers and of decoder layers.
        device: Stored on ``self.device`` for callers; ``forward`` follows the
            device of its inputs instead.
        dropout: Dropout probability handed to ``nn.Transformer``.
        max_len_src: Number of source positions in the position table (default 33).
        max_len_tgt: Number of target positions in the position table (default 73).
    """

    def __init__(self, vocab_size_src, vocab_size_tgt, embedding_dim, num_heads, num_layers, device, dropout=0.1, max_len_src=33, max_len_tgt=73):
        super().__init__()
        self.device = device
        self.embedding_src = nn.Embedding(vocab_size_src, embedding_dim)
        self.embedding_tgt = nn.Embedding(vocab_size_tgt, embedding_dim)
        # Kept as attributes because code outside the class calls
        # get_positional_vector() on them directly.
        self.pos_embedding_src = PositionalEncoding(embedding_dim, max_len_src)
        self.pos_embedding_tgt = PositionalEncoding(embedding_dim, max_len_tgt)
        # Build the tables once instead of on every forward pass. persistent=False
        # keeps them out of state_dict, so existing checkpoints still load strictly.
        self.register_buffer("_pos_table_src", self.pos_embedding_src.get_positional_vector(), persistent=False)
        self.register_buffer("_pos_table_tgt", self.pos_embedding_tgt.get_positional_vector(), persistent=False)
        self.transformer = nn.Transformer(d_model=embedding_dim, nhead=num_heads, num_encoder_layers=num_layers, num_decoder_layers=num_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(embedding_dim, vocab_size_tgt)

    @staticmethod
    def _positions_for(table, embedded, side):
        """Slice ``table`` to the sequence length of ``embedded`` on its device."""
        seq_len = embedded.shape[1]
        if seq_len > table.shape[0]:
            raise ValueError(
                f"{side} sequence length {seq_len} exceeds the positional table size {table.shape[0]}"
            )
        # Shape (seq_len, dim); broadcasting over the batch replaces the explicit repeat.
        return table[:seq_len].to(embedded.device)

    def forward(self, src, tgt, tgt_mask=None, src_key_padding_mask=None, tgt_key_padding_mask=None):
        """Return logits of shape ``(batch, tgt_len, vocab_size_tgt)``.

        Args:
            src: Source token indices, ``(batch, src_len)`` with ``src_len <= max_len_src``.
            tgt: Target token indices, ``(batch, tgt_len)`` with ``tgt_len <= max_len_tgt``.
            tgt_mask: Optional attention mask for the decoder self-attention
                (e.g. a causal mask). Default ``None`` applies no mask.
            src_key_padding_mask: Optional ``(batch, src_len)`` padding mask; also
                used as the memory padding mask.
            tgt_key_padding_mask: Optional ``(batch, tgt_len)`` padding mask.

        NOTE(review): with the default ``tgt_mask=None`` every target position can
        attend to later target positions. Training code should pass a causal mask
        (``nn.Transformer.generate_square_subsequent_mask``); the default is left
        unmasked only so existing callers keep their current behaviour.
        """
        src = self.embedding_src(src)
        tgt = self.embedding_tgt(tgt)
        # Follow the device of the inputs rather than a module-level global.
        src = src + self._positions_for(self._pos_table_src, src, "source")
        tgt = tgt + self._positions_for(self._pos_table_tgt, tgt, "target")
        output = self.transformer(
            src,
            tgt,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_key_padding_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=src_key_padding_mask,
        )
        output = self.fc_out(output)
        return output



# Model hyperparameters
embedding_dim = 512
num_heads = 8
num_layers = 6
# NOTE(review): GPU index 7 is hard-coded; on a machine with CUDA but fewer than
# eight GPUs this device does not exist -- confirm the intended device.
device = torch.device('cuda:7' if torch.cuda.is_available() else 'cpu')
# Initialize the model
model = TransformerModel(src_vocab_size, tgt_vocab_size, embedding_dim, num_heads, num_layers, device, dropout=0).to(device)

# Load the saved weights
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted
# source (newer PyTorch versions offer weights_only=True for plain state dicts).
model.load_state_dict(torch.load("./model_weight.pth", map_location=device))
model.eval()  # switch the model to evaluation mode


TransformerModel(
  (embedding_src): Embedding(30957, 512)
  (embedding_tgt): Embedding(37917, 512)
  (transformer): Transformer(
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-5): 6 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
          )
          (linear1): Linear(in_features=512, out_features=2048, bias=True)
          (dropout): Dropout(p=0, inplace=False)
          (linear2): Linear(in_features=2048, out_features=512, bias=True)
          (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0, inplace=False)
          (dropout2): Dropout(p=0, inplace=False)
        )
      )
      (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    )
    (decoder): TransformerDecoder(
      (layers): ModuleList(
      

In [30]:
import torch
import torch.nn.functional as F

# インデックスから単語に変換する関数
def indexes_to_sentence(indexes, vocab_index_to_word):
    """Look up each index in ``vocab_index_to_word`` and join the words with single spaces."""
    words = []
    for position in indexes:
        words.append(vocab_index_to_word[position])
    return ' '.join(words)

# 翻訳関数
def translate_sentence(model, sentence, src_vocab, tgt_vocab, src_tokenizer, src_vocab_index_to_word, tgt_vocab_index_to_word, device, max_len=73):
    """Greedily decode a translation of ``sentence``.

    Args:
        model: Object exposing ``embedding_src``, ``embedding_tgt``,
            ``pos_embedding_src``, ``pos_embedding_tgt``, ``transformer.encoder``,
            ``transformer.decoder`` and ``fc_out``.
        sentence: Source-language string.
        src_vocab: Source token -> index dict containing the special tokens.
        tgt_vocab: Target token -> index dict containing the special tokens.
        src_tokenizer: Callable splitting ``sentence`` into tokens.
        src_vocab_index_to_word: Unused; kept for interface compatibility.
        tgt_vocab_index_to_word: Target index -> token sequence.
        device: Device on which the input tensors are created.
        max_len: Maximum number of decoding steps; capped at the size of the
            model's target position table.

    Returns:
        The decoded tokens joined by spaces, including the leading ``<start>``
        and, if it was produced, the trailing ``<end>``.

    NOTE(review): the decoder is called without a target mask and on a sequence
    padded to the full table length, mirroring ``model.forward`` as written. If the
    model is retrained with a causal mask, pass the same mask here.
    """
    model.eval()
    # Inference only: do not record an autograd graph.
    with torch.no_grad():
        # The position tables do not change between steps, so fetch them once;
        # their row counts replace the hard-coded 33 / 73.
        pos_src = model.pos_embedding_src.get_positional_vector().unsqueeze(0).to(device)
        pos_tgt = model.pos_embedding_tgt.get_positional_vector().unsqueeze(0).to(device)
        src_len = pos_src.shape[1]
        tgt_len = pos_tgt.shape[1]

        unk_index = src_vocab['<unk>']
        tokens = [src_vocab['<start>']] + [src_vocab.get(token, unk_index) for token in src_tokenizer(sentence)] + [src_vocab['<end>']]
        # An over-long input is cropped on the right to the table length (the old
        # negative-pad call did the same thing implicitly).
        tokens = tokens[:src_len]
        src_tensor = torch.full((1, src_len), src_vocab['<pad>'], dtype=torch.long, device=device)
        src_tensor[0, :len(tokens)] = torch.tensor(tokens, dtype=torch.long, device=device)

        # Encoder output, computed once.
        memory = model.transformer.encoder(model.embedding_src(src_tensor) + pos_src)

        # Decoder input: <start> followed by padding, filled in as tokens are produced.
        end_index = tgt_vocab['<end>']
        tgt_indexes = [tgt_vocab['<start>']]
        tgt_tensor = torch.full((1, tgt_len), tgt_vocab['<pad>'], dtype=torch.long, device=device)
        tgt_tensor[0, 0] = tgt_indexes[0]

        # Never run more steps than there are target positions.
        for step in range(min(max_len, tgt_len)):
            output = model.transformer.decoder(model.embedding_tgt(tgt_tensor) + pos_tgt, memory)
            # Only the position of the most recent token is needed for the next prediction.
            logits = model.fc_out(output[:, step])
            next_token = logits.argmax(dim=-1).item()
            tgt_indexes.append(next_token)

            if next_token == end_index:
                break
            if step + 1 < tgt_len:
                tgt_tensor[0, step + 1] = next_token

    translated_sentence = indexes_to_sentence(tgt_indexes, tgt_vocab_index_to_word)
    return translated_sentence

# Any Japanese sentence to translate
jp_sentence = "こんにちは、元気ですか？"

# Run the translation
translated_sentence = translate_sentence(model, jp_sentence, vocab_src, vocab_tgt, tokenizer_src, vacab_src_index_to_word, vacab_tgt_index_to_word, device)
print(translated_sentence)



<start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start>
