<a href="https://colab.research.google.com/github/sol-sun/Generative_Deep_Learning_2nd_Pytorch/blob/main/09_transformer/01_gpt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 🚀 GPT

本ノートブックでは，ワインのレビューデータセットで独自のGPTモデルを実装し訓練する．

In [None]:
# Install the third-party packages imported by the next cell.
# Each command is piped through `tail -n 1`, so only the last line of pip's output is shown.
!pip install datasets | tail -n 1
!pip install japanize_matplotlib | tail -n 1
!pip install kagglehub | tail -n 1
!pip install torchinfo | tail -n 1

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
gcsfs 2024.10.0 requires fsspec==2024.10.0, but you have fsspec 2024.9.0 which is incompatible.[0m[31m
[0mSuccessfully installed datasets-3.1.0 dill-0.3.8 fsspec-2024.9.0 multiprocess-0.70.16 xxhash-3.5.0
Successfully installed japanize_matplotlib-1.1.3
Successfully installed torchinfo-1.8.0


In [None]:
import copy
import json
import os
import random
import re
import string

import matplotlib.pyplot as plt
import japanize_matplotlib
import kagglehub
import numpy as np
import torch
import torch.nn as nn
import torchinfo
from datasets import Dataset as HFDataset
from datasets import load_dataset
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, GenerationMixin, PretrainedConfig, PreTrainedModel, pipeline
from transformers.modeling_outputs import CausalLMOutput
from transformers.models.auto.modeling_auto import MODEL_FOR_CAUSAL_LM_MAPPING_NAMES

import sys
print(sys.version)
print(torch.__version__)

3.10.12 (main, Nov  6 2024, 20:22:13) [GCC 11.4.0]
2.5.1+cu121


In [None]:
def set_seed(seed):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch's
    default generator with the same value, then applies backend-specific
    settings depending on which accelerator is available.

    Args:
        seed: Integer seed passed unchanged to each generator.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if torch.backends.mps.is_available():
        # Setting used when MPS is available.
        # NOTE(review): this only assigns an attribute on torch.backends.mps;
        # confirm that PyTorch actually reads it.
        torch.backends.mps.deterministic = True
        return

    if not torch.cuda.is_available():
        return

    # CUDA is available: seed the current device and all devices, then ask
    # cuDNN for deterministic kernels and disable its auto-tuner.
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def save_model(model, path):
    """Save a model's ``state_dict`` to ``path``.

    The parent directory of ``path`` is created when it does not exist. The
    model is switched to evaluation mode before saving, and it stays in that
    mode after the call.

    Args:
        model: ``torch.nn.Module`` whose parameters are saved.
        path: Destination file path. It may be a bare file name with no
            directory component.
    """
    parent_dir = os.path.dirname(path)
    # os.makedirs('') raises FileNotFoundError, so only create a directory
    # when the path actually contains one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    model.eval()
    torch.save(model.state_dict(), path)

def load_model(model, path, device):
    """Load a saved ``state_dict`` into ``model`` and prepare it for inference.

    Args:
        model: ``torch.nn.Module`` with the same architecture as the checkpoint.
        path: Path to a file written with ``torch.save(model.state_dict(), ...)``.
        device: Device the tensors are mapped to and the model is moved to.

    Returns:
        The same model object, moved to ``device`` and set to evaluation mode.
    """
    # weights_only=True restricts unpickling to tensors and plain containers.
    # That is all a state_dict needs, and it avoids executing arbitrary
    # pickled code from a downloaded checkpoint.
    state_dict = torch.load(path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model = model.to(device)
    model.eval()
    print(f"Model loaded from {path}")
    return model

def save_metrics(metrics_per_epoch, filename):
    """Write the collected training metrics to ``filename`` as indented JSON.

    Args:
        metrics_per_epoch: JSON-serialisable object, e.g. a dict mapping a
            metric name to a list with one value per epoch.
        filename: Destination file path. It may be a bare file name with no
            directory component.
    """
    parent_dir = os.path.dirname(filename)
    # Create the directory if it does not exist. os.makedirs('') raises
    # FileNotFoundError, so skip this for bare file names.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    # Save the metrics to the file.
    with open(filename, "w") as f:
        json.dump(metrics_per_epoch, f, indent=2)

# Set the seed value
set_seed(1234)

# Pick the compute device in priority order: CUDA, then Apple MPS, then CPU
device = torch.device(
    'cuda' if torch.cuda.is_available()
    else 'mps' if torch.backends.mps.is_available()
    else 'cpu'
)

print(device)

cpu


## 0. パラメータ

In [None]:
# Hyperparameters and run configuration for the notebook.
# VOCAB_SIZE = 10000  # disabled; VOCAB_SIZE is assigned from the tokenizer in the data-preparation cell
MAX_LEN = 80  # model sequence length; the tokenizer pads/truncates to MAX_LEN + 1 before x/y are sliced
EMBEDDING_DIM = 128  # embedding size passed to the model (original note: 256)
KEY_DIM = 128  # original note: 256; not passed to GPT(...) in the model-construction cell
N_HEADS = 2  # number of attention heads
FEED_FORWARD_DIM = 128  # width of the feed-forward layer (original note: 256)
VALIDATION_SPLIT = 0.2  # NOTE(review): not referenced elsewhere in the visible notebook
SEED = 42  # NOTE(review): set_seed(1234) is what is actually called above; this constant looks unused
LOAD_MODEL = False  # NOTE(review): not referenced elsewhere in the visible notebook
BATCH_SIZE = 64  # DataLoader batch size (original note: 32)
EPOCHS = 20  # number of training epochs (original note: 5)

## 1. データ準備・トークン化

In [None]:
def review_create(x):
    """Add a ``wine_review`` text field built from one record's metadata.

    The text has the form
    ``"wine review : <country> : <province> : <variety> : <description>[SEP]"``,
    where ``[SEP]`` is used as the end-of-sequence marker.

    Args:
        x: Mapping with string values under ``country``, ``province``,
            ``variety`` and ``description``.

    Returns:
        The same mapping ``x``, modified in place with the new key.
    """
    fields = (x["country"], x["province"], x["variety"], x["description"])
    # '[SEP]' serves as EOS.
    x["wine_review"] = "wine review : " + " : ".join(fields) + '[SEP]'
    return x

# テキストをベクトル化して、xとyを作成する関数
def tokenize_and_shift(batch, tokenizer, max_len=None):
    """Tokenize a batch of reviews and build next-token-prediction pairs.

    Every text is padded or truncated to ``max_len + 1`` token ids. The input
    ``x`` drops the last id and the target ``y`` drops the first, so
    ``y[:, t]`` is the token that follows ``x[:, t]``.

    Args:
        batch: Mapping with a ``"wine_review"`` entry holding the texts.
        tokenizer: Callable that accepts the keyword arguments used below and
            returns a mapping whose ``"input_ids"`` is a 2-D tensor.
        max_len: Length of ``x`` and ``y``. Defaults to the notebook-level
            ``MAX_LEN`` constant.

    Returns:
        ``{"x": ..., "y": ...}``, two tensors of shape ``(len(texts), max_len)``.
        The original version computed both tensors but returned nothing, so
        ``Dataset.map`` never added the columns the training loop reads.
    """
    if max_len is None:
        max_len = MAX_LEN
    texts = list(batch["wine_review"])
    encoding = tokenizer(
        texts,
        padding="max_length",
        truncation=True,
        max_length=max_len + 1,
        return_tensors="pt",
        add_special_tokens=False,  # do not add special tokens
    )
    # To inspect the special-token ids while debugging, e.g.:
    # print(tokenizer.encode(tokenizer.pad_token, add_special_tokens=True))
    # print(tokenizer.encode(tokenizer.sep_token, add_special_tokens=True))
    # print(tokenizer.encode(tokenizer.cls_token, add_special_tokens=True))

    input_ids = encoding["input_ids"]
    return {"x": input_ids[:, :-1], "y": input_ids[:, 1:]}


In [None]:
# Download the Kaggle dataset "zynicide/wine-reviews"; `path` is the location returned by kagglehub
path = kagglehub.dataset_download("zynicide/wine-reviews")
print("Path to dataset files:", path)

# Load the dataset from the JSON file
trainset = load_dataset("json", data_files=os.path.join(path, 'winemag-data-130k-v2.json'), split="train")
# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
# Treat SEP as EOS (end of sentence)
tokenizer.eos_token_id = tokenizer.sep_token_id
# The vocabulary size comes from the tokenizer rather than from a hard-coded constant
VOCAB_SIZE = tokenizer.vocab_size
print(VOCAB_SIZE)

# Filter: keep only rows where all four fields used by review_create are present
trainset = trainset.filter(lambda x: x['country'] is not None and x['province'] is not None
                                        and x['variety'] is not None and x['description'] is not None,
                            )

trainset = trainset.map(review_create)  # create the wine_review column
trainset = trainset.map(tokenize_and_shift, batched=True,
                        fn_kwargs={'tokenizer': tokenizer},  # create x and y
                        remove_columns=['points', 'title', 'description', 'taster_name', 'taster_twitter_handle', 'price', 'designation', 'variety', 'region_1', 'region_2', 'province', 'country', 'winery'])
# Have the dataset return torch tensors when indexed
trainset = trainset.with_format("torch")

print(trainset)
n_wines = len(trainset)
# NOTE(review): the message says "recipes" although the rows are wine reviews — likely a leftover string
print(f"{n_wines} recipes loaded")

# Create the data loader
# NOTE(review): pin_memory=True is set unconditionally; presumably only useful with a CUDA device — confirm
trainloader = DataLoader(
    trainset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    pin_memory=True,
)

Downloading from https://www.kaggle.com/api/v1/datasets/download/zynicide/wine-reviews?dataset_version_number=4...


100%|██████████| 50.9M/50.9M [00:00<00:00, 75.5MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/zynicide/wine-reviews/versions/4


Generating train split: 0 examples [00:00, ? examples/s]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

30522


Filter:   0%|          | 0/129971 [00:00<?, ? examples/s]

Map:   0%|          | 0/129907 [00:00<?, ? examples/s]

Map:   0%|          | 0/129907 [00:00<?, ? examples/s]

Dataset({
    features: ['points', 'title', 'description', 'taster_name', 'taster_twitter_handle', 'price', 'designation', 'variety', 'region_1', 'region_2', 'province', 'country', 'winery', 'wine_review'],
    num_rows: 129907
})
129907 recipes loaded


In [None]:
# Inspect the dataset: print the generated `wine_review` text of the row at index 25
example_data = trainset[25]["wine_review"]
print(example_data)

wine review : US : California : Pinot Noir : Oak and earth intermingle around robust aromas of wet forest floor in this vineyard-designated Pinot that hails from a high-elevation site. Small in production, it offers intense, full-bodied raspberry and blackberry steeped in smoky spice and smooth texture.[SEP]


In [None]:
# Display vocabulary entries with their indices:
# the 10 tokens with the highest ids, sorted by id in descending order
vocab = tokenizer.get_vocab()
for word, i in sorted(vocab.items(), key=lambda x:x[1], reverse=True)[:10]:
    print(f"{i} : {word}")

30521 : ##～
30520 : ##？
30519 : ##：
30518 : ##／
30517 : ##．
30516 : ##－
30515 : ##，
30514 : ##）
30513 : ##（
30512 : ##！


## 2. トランスフォーマーブロックの作成

In [None]:
class TransformerBlock(nn.Module):
    """One Transformer decoder block with causal self-attention.

    The block applies multi-head self-attention under a causal mask, then
    dropout, a residual connection and LayerNorm. This is followed by a
    two-layer ReLU feed-forward network with its own dropout, residual
    connection and LayerNorm.

    Args:
        num_heads: Number of attention heads.
        key_dim: Stored as ``self.key_dim`` for interface compatibility only.
            ``nn.MultiheadAttention``'s ``kdim``/``vdim`` give the feature size
            of the key/value *inputs*. This block always feeds ``x`` as query,
            key and value, so that size is always ``embed_dim``. Passing
            ``key_dim`` there made ``forward`` fail whenever
            ``key_dim != embed_dim``.
        embed_dim: Feature size of the input and output vectors.
        ff_dim: Hidden size of the feed-forward network.
        dropout_rate: Dropout probability for attention and both dropout layers.
    """

    def __init__(self, num_heads, key_dim, embed_dim, ff_dim, dropout_rate=0.1):
        super().__init__()
        self.num_heads = num_heads  # number of heads in the multi-head attention layer
        self.key_dim = key_dim  # kept as an attribute; not forwarded to the attention layer
        self.embed_dim = embed_dim  # feature size of the input vectors
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate
        # kdim/vdim default to embed_dim. For key_dim == embed_dim this builds
        # exactly the same parameters as before, so existing checkpoints load.
        self.attn = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout_rate,
                                          batch_first=True)
        self.dropout_1 = nn.Dropout(dropout_rate)
        self.ln_1 = nn.LayerNorm(eps=1e-6, normalized_shape=embed_dim)
        self.ffn_1 = nn.Linear(in_features=embed_dim, out_features=ff_dim)
        self.ffn_2 = nn.Linear(in_features=ff_dim, out_features=embed_dim)
        self.dropout_2 = nn.Dropout(dropout_rate)
        self.ln_2 = nn.LayerNorm(eps=1e-6, normalized_shape=embed_dim)

        self.relu = nn.ReLU()

    @staticmethod
    def generate_square_subsequent_mask(size, dtype, device=None):
        """Build a ``(size, size)`` causal mask.

        Entries above the main diagonal are ``-inf`` and all others are ``0``
        before the cast to ``dtype``. For ``torch.bool`` this yields ``True``
        above the diagonal (positions that must not be attended to) and
        ``False`` elsewhere.
        """
        mask = torch.triu(torch.full(size=(size, size), fill_value=float('-inf'), device=device), diagonal=1)
        return mask.to(dtype)

    def forward(self, x):
        """Apply the block to ``x`` of shape ``(batch, seq_len, embed_dim)``.

        Returns:
            A tuple ``(output, attention_scores)``. ``output`` has the same
            shape as ``x``. ``attention_scores`` are the weights returned by
            ``nn.MultiheadAttention``.
        """
        # Unpacking three values also rejects inputs that are not 3-D.
        _, seq_len, _ = x.shape
        causal_mask = self.generate_square_subsequent_mask(size=seq_len, dtype=torch.bool, device=x.device)
        attention_output, attention_scores = self.attn(x, x, x, attn_mask=causal_mask, is_causal=True)
        attention_output = self.dropout_1(attention_output)
        out1 = self.ln_1(x + attention_output)
        ffn_hidden = self.relu(self.ffn_1(out1))
        ffn_output = self.dropout_2(self.ffn_2(ffn_hidden))
        return (self.ln_2(out1 + ffn_output)), attention_scores


class TokenAndPositionEmbedding(nn.Module):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: Number of rows in the position table, i.e. the longest
            sequence that can be embedded.
        vocab_size: Number of rows in the token table.
        embed_dim: Size of each embedding vector.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.maxlen, self.vocab_size, self.embed_dim = maxlen, vocab_size, embed_dim
        # Both tables use padding_idx=0, so row 0 is initialised to zeros and
        # receives no gradient.
        # NOTE(review): for pos_emb this means position 0 always maps to the
        # zero vector — confirm this is intended.
        self.token_emb = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.pos_emb = nn.Embedding(maxlen, embed_dim, padding_idx=0)

    def forward(self, x):
        """Embed token ids ``x``; the last axis of ``x`` is the sequence axis.

        Returns:
            ``token_emb(x)`` plus the embeddings of positions
            ``0 .. seq_len - 1``, broadcast over the leading axes.
        """
        seq_len = x.shape[-1]
        position_ids = torch.arange(seq_len, dtype=torch.long, device=x.device)
        return self.token_emb(x) + self.pos_emb(position_ids)

## 3. GPTモデルの作成

In [None]:
class GPT(PreTrainedModel, GenerationMixin):
    """Minimal GPT-style language model.

    The model is a token and position embedding, one causal Transformer block,
    and a linear projection to vocabulary logits.

    ``GenerationMixin`` is inherited explicitly, after ``PreTrainedModel``.
    From transformers v4.50 onwards ``PreTrainedModel`` no longer provides
    ``generate`` on its own, and the text-generation pipeline below needs it.

    Args:
        config: Hugging Face configuration object passed to ``PreTrainedModel``.
        max_len: Maximum sequence length, i.e. the size of the position table.
        vocab_size: Vocabulary size.
        embed_dim: Embedding and hidden size.
        num_heads: Number of attention heads.
        ff_dim: Hidden size of the feed-forward network.
        dropout_rate: Dropout probability used inside the Transformer block.
    """

    def __init__(self, config, max_len, vocab_size, embed_dim, num_heads, ff_dim, dropout_rate=0.1):
        super().__init__(config)
        self.token_and_position_embedding = TokenAndPositionEmbedding(maxlen=max_len, vocab_size=vocab_size, embed_dim=embed_dim)
        self.transformer = TransformerBlock(num_heads=num_heads, key_dim=embed_dim, embed_dim=embed_dim, ff_dim=ff_dim, dropout_rate=dropout_rate)
        self.fc = nn.Linear(in_features=embed_dim, out_features=vocab_size)

    def forward(self, input_ids, attention_mask=None, return_dict=False, token_type_ids=None):
        """Compute next-token logits for ``input_ids``.

        ``attention_mask`` and ``token_type_ids`` are accepted so callers may
        pass them, but they are not used.

        Returns:
            ``(logits, attention_scores)`` when ``return_dict`` is false,
            otherwise a ``CausalLMOutput`` carrying only ``logits``.
        """
        hidden_states = self.token_and_position_embedding(input_ids)
        hidden_states, attention_scores = self.transformer(hidden_states)
        logits = self.fc(hidden_states)
        if not return_dict:
            return logits, attention_scores
        return CausalLMOutput(logits=logits)

    def prepare_inputs_for_generation(self, input_ids, **kwargs):
        """Build the model inputs for one generation step.

        Only the last ``max_len`` tokens are kept. The position table has
        ``max_len`` rows, so a longer sequence (prompt plus generated tokens)
        would index past its end. Shorter sequences pass through unchanged.
        """
        max_len = self.token_and_position_embedding.maxlen
        return {"input_ids": input_ids[:, -max_len:]}

In [None]:
# Minimal Hugging Face config: flag the model as a decoder and reuse the tokenizer's EOS id
config = PretrainedConfig()
config.is_decoder = True
config.eos_token_id = tokenizer.eos_token_id

# Build the model from the notebook-level hyperparameters (KEY_DIM is not passed here)
model = GPT(config=config, max_len=MAX_LEN, vocab_size=VOCAB_SIZE, embed_dim=EMBEDDING_DIM, num_heads=N_HEADS,
                ff_dim=FEED_FORWARD_DIM, dropout_rate=0.1)
print(model)
# model.eval() puts the model in evaluation mode before it is summarised
# with integer inputs of shape (32, MAX_LEN)
torchinfo.summary(model.eval(), input_size=(32, MAX_LEN), dtypes=[torch.long])

GPT has generative capabilities, as `prepare_inputs_for_generation` is explicitly overwritten. However, it doesn't directly inherit from `GenerationMixin`. From 👉v4.50👈 onwards, `PreTrainedModel` will NOT inherit from `GenerationMixin`, and this model will lose the ability to call `generate` and other related functions.
  - If you are the owner of the model architecture code, please modify your model class such that it inherits from `GenerationMixin` (after `PreTrainedModel`, otherwise you'll get an exception).
  - If you are not the owner of the model architecture class, please contact the model code owner to update it.


GPT(
  (token_and_position_embedding): TokenAndPositionEmbedding(
    (token_emb): Embedding(30522, 128, padding_idx=0)
    (pos_emb): Embedding(80, 128, padding_idx=0)
  )
  (transformer): TransformerBlock(
    (attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
    )
    (dropout_1): Dropout(p=0.1, inplace=False)
    (ln_1): LayerNorm((128,), eps=1e-06, elementwise_affine=True)
    (ffn_1): Linear(in_features=128, out_features=128, bias=True)
    (ffn_2): Linear(in_features=128, out_features=128, bias=True)
    (dropout_2): Dropout(p=0.1, inplace=False)
    (ln_2): LayerNorm((128,), eps=1e-06, elementwise_affine=True)
    (relu): ReLU()
  )
  (fc): Linear(in_features=128, out_features=30522, bias=True)
)


Layer (type:depth-idx)                   Output Shape              Param #
GPT                                      [32, 80, 30522]           --
├─TokenAndPositionEmbedding: 1-1         [32, 80, 128]             --
│    └─Embedding: 2-1                    [80, 128]                 10,240
│    └─Embedding: 2-2                    [32, 80, 128]             3,906,816
├─TransformerBlock: 1-2                  [32, 80, 128]             --
│    └─MultiheadAttention: 2-3           [32, 80, 128]             66,048
│    └─Dropout: 2-4                      [32, 80, 128]             --
│    └─LayerNorm: 2-5                    [32, 80, 128]             256
│    └─Linear: 2-6                       [32, 80, 128]             16,512
│    └─ReLU: 2-7                         [32, 80, 128]             --
│    └─Linear: 2-8                       [32, 80, 128]             16,512
│    └─Dropout: 2-9                      [32, 80, 128]             --
│    └─LayerNorm: 2-10                   [32, 80, 128]       

## 4. GPTモデルの学習

In [None]:
# Mean cross-entropy loss; targets equal to 0 are excluded (ignore_index=0).
# Presumably 0 is the tokenizer's padding id — verify against tokenizer.pad_token_id.
criterion = nn.CrossEntropyLoss(reduction="mean", ignore_index=0)
# Adam over all model parameters with a learning rate of 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Training loop, left commented out: the notebook loads a trained checkpoint in a later cell instead.
# model = model.to(device)
# model.train()
# metrics_per_epoch = {
#     "loss": [],
# }
# for epoch in range(EPOCHS):
#     epoch_metrics = {metric: 0 for metric in metrics_per_epoch}
#     batch_count = 0

#     for i, data in enumerate(trainloader, 0):
#         inputs, labels = data["x"], data["y"]
#         inputs, labels = inputs.to(device), labels.to(device)
#         optimizer.zero_grad()
#         outputs, attention_scores = model(inputs, return_dict=False)
#         loss = criterion(outputs.permute(0, 2, 1), labels)  # compare the predictions with the shifted targets to compute the loss
#         loss.backward()  # backpropagation
#         optimizer.step()  # update the model parameters

#         for key in epoch_metrics:
#             epoch_metrics[key] += loss.item()
#         batch_count += 1

#     for key in epoch_metrics:
#         epoch_metrics[key] /= batch_count
#         metrics_per_epoch[key].append(epoch_metrics[key])

#     # mean loss at the end of the epoch
#     print("Epoch {}:".format(epoch + 1), epoch_metrics)

In [None]:
# # save the model
# save_path = f"./models/gpt_epoch_{EPOCHS}.pth"
# save_model(model, save_path)

# # save the metrics (save_metrics requires the destination file name as its second argument)
# metrics_save_path = f"./metrics/gpt_metrics.json"
# save_metrics(metrics_per_epoch, metrics_save_path)

### 学習済みのモデルのロードは以下

In [None]:
import urllib.request

# URL of the trained model weights
model_url = "https://github.com/sol-sun/Generative_Deep_Learning_2nd_Pytorch/raw/main/09_transformer/models/gpt_epoch_20.pth"
# NOTE(review): the local file name says "cgan" although the downloaded file is gpt_epoch_20.pth —
# likely a leftover name; the name is also echoed by load_model's log line
local_model_path = "cgan_model_epoch_20.pth"

# Download the model weights to the local path
urllib.request.urlretrieve(model_url, local_model_path)

# Move the model to the device and load it
model = load_model(model, local_model_path, device)

Model loaded from cgan_model_epoch_20.pth


  model.load_state_dict(torch.load(path, map_location=device))


## 5. GPTモデルを使った文章の生成

In [None]:
from pprint import pprint
# Register the custom class name in the causal-LM mapping
# (presumably so the text-generation pipeline accepts this model — confirm)
MODEL_FOR_CAUSAL_LM_MAPPING_NAMES["gpt"] = model.__class__.__name__  # "GPT"

# Pipeline that samples with temperature 1.0
generator_sample = pipeline(
   'text-generation',
    model = model,
    temperature = 1.0,
    # repetition_penalty=2.0
    device=device,
    tokenizer=tokenizer, # tokenizer
    max_new_tokens=80, # maximum number of tokens to generate
    do_sample=True, # sample during generation
    num_return_sequences=3, # number of sequences to generate
    add_special_tokens=False # do not add special tokens around the input text
    )

# Pipeline with temperature 0.05. Despite the name, do_sample=True is still set,
# so outputs are sampled rather than decoded greedily.
generator_deterministic = pipeline(
   'text-generation',
    model = model,
    device=device,
    temperature = 0.05,
    # repetition_penalty=2.0
    tokenizer=tokenizer, # tokenizer
    max_new_tokens=80, # maximum number of tokens to generate
    do_sample=True, # sample during generation
    num_return_sequences=3, # number of sequences to generate
    add_special_tokens=False # do not add special tokens around the input text
    )

In [None]:
# Generate three continuations of the prompt with the temperature-1.0 pipeline
print("temperature: 1.0")
pprint(generator_sample("wine review : italy"))

temperature: 1.0
[{'generated_text': 'wine review : italy : washington : sangiovese : the way '
                    'to the nose that has a mix of wood aginged oaky herb. '
                    "this feels solid, it's not much like a minerality and "
                    'spicy dishes. but shows through 2018. the rich black '
                    "cherry. drink from 2016. it's a wine for another year."},
 {'generated_text': 'wine review : italy : washington : pinot noir : tannat : '
                    'a great price and spicy, but it shows the nose to - '
                    "production. this is fruit. with ripe blackberry, it's "
                    'juicy, with coffee bean and black liqueurboulee, with a '
                    'wine.'},
 {'generated_text': 'wine review : italy : champagne : cabernet sauvignon - '
                    'fruity wine is a generous in black - fermented pinot '
                    'noir. spicy pepper, tang, and rich in the nose on the '
                    'fi

In [None]:
# Generate three continuations of the same prompt with the temperature-0.05 pipeline
print("temperature: 0.05")
pprint(generator_deterministic("wine review : italy"))

temperature: 0.05
[{'generated_text': 'wine review : italy : california : pinot noir : this is a '
                    'fine, this is a bit thorny wine with a whiff of the nose. '
                    "the palate, the palate, the palate, the palate, it's a "
                    'bit more.'},
 {'generated_text': 'wine review : italy : california : pinot noir : this is a '
                    'fine, this is a bit thorny wine with a whiff of the nose. '
                    'the palate, the palate, the palate, the palate, with a '
                    "hint of the palate, it's a bit more."},
 {'generated_text': 'wine review : italy : california : pinot noir : this is a '
                    'fine, this is a bit thorny wine with a whiff of this '
                    'wine. the palate, the palate, with a bit more fruit '
                    'flavors of the palate is a wine.'}]


## 6. アテンションスコアを表示する

In [None]:
from IPython.display import HTML
model.eval()
prompt = "wine review : US : California : Pinot Noir : "
# Encode the prompt on the same device as the model. load_model moved the model
# to `device`, so a CPU tensor would fail on CUDA/MPS.
prompt_ids = tokenizer.encode(prompt, return_tensors="pt", add_special_tokens=False)
prompt_ids = prompt_ids.to(next(model.parameters()).device)
# Inference only: no autograd graph is needed.
with torch.no_grad():
    logits, attention_scores = model(prompt_ids, return_dict=False)

vocab = tokenizer.get_vocab()
vocab = {v: k for k, v in vocab.items()}  # index -> token lookup

# Probability of every vocabulary entry at each position.
# Tensors are moved to the CPU first because .numpy() only works on CPU tensors.
probs = nn.functional.softmax(logits, dim=-1).detach().cpu().numpy()
# Build info
info = [{
    "prompt": prompt,  # input text
    # Attention weights with the batch axis removed.
    # NOTE(review): presumably (query position, key position), averaged over heads — confirm.
    "atts": attention_scores.squeeze(0).detach().cpu().numpy(),
    "word_probs": probs[0, -1]  # probabilities for the token following the prompt
}]

# print_probs 関数
def print_probs(info, vocab, top_k=5):
    """Display attention-highlighted prompts and the most probable next tokens.

    For every entry in ``info`` the prompt is split on whitespace. Each word is
    shown with a background whose opacity is its attention score divided by the
    largest score. The ``top_k`` most probable tokens are then printed with
    their probabilities in percent.

    Args:
        info: Iterable of dicts with keys ``"prompt"`` (str), ``"atts"``
            (2-D array, averaged over axis 0 here) and ``"word_probs"``
            (1-D array indexed by token id).
        vocab: Mapping from token id (int) to token string.
        top_k: Number of candidate tokens to print.

    Note:
        Words come from ``str.split`` while the scores are per model token.
        ``zip`` stops at the shorter of the two, so they only line up when
        every word is a single token.
    """
    for entry in info:
        # Loop-invariant: average the attention once and find its peak once.
        mean_att = np.mean(entry["atts"], axis=0)
        peak = mean_att.max() if mean_att.size else 0
        # Guard against dividing by zero when every score is zero.
        scale = peak if peak > 0 else 1.0

        spans = [
            '<span style="background-color:rgba(135,206,250,'
            + str(att_score / scale)
            + ');">'
            + word
            + "</span>"
            for word, att_score in zip(entry["prompt"].split(), mean_att)
        ]
        display(HTML(" ".join(spans)))

        word_probs = entry["word_probs"]
        # One argsort gives the token ids in descending probability order.
        top_ids = np.argsort(word_probs)[::-1][:top_k]
        for idx in top_ids:
            print(f"{vocab[int(idx)]}:   \t{np.round(100*word_probs[idx],2)}%")
        print("--------\n")

# Call the function: show the highlighted prompt and the top candidate tokens
print_probs(info, vocab)

this:   	10.67%
a:   	7.12%
the:   	4.18%
cab:   	2.55%
from:   	2.23%
--------

