<a href="https://colab.research.google.com/github/tomonari-masada/course2023-nlp/blob/main/10_decoding_methods_in_language_models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# テキスト生成におけるdecoding
* その他の参考になる資料
 * https://www.modeldifferently.com/en/2021/12/generaci%C3%B3n-de-fake-news-con-gpt-2/
* 可視化に関して参考にした資料
 * https://mlabonne.github.io/blog/decoding/
 * この資料は、アルゴリズムの説明は正確でないため、注意。


## 言語モデルを使ったテキスト生成
* 今回は、言語モデルをテキスト生成に使う。
* テキストを生成するアルゴリズム（decodingのアルゴリズム）は複数あることを学ぶ。


### テキストはどのように生成されるか？
* 生成に使う言語モデルの最終的な出力は、語彙集合上に定義された確率分布。
 * つまり、各単語に数値が割り振られていて、それらを合計すると1になる。
 * これらの数値は、次のトークンがその単語のトークンになる確率を表している。
* しかし、次の一つのトークンを生成するために、この確率分布をどのように使うのだろうか？
 * 確率最大の単語を次々に選ぶというアルゴリズムは、良いアルゴリズムだろうか？
* 次の一つのトークンを選ぶアルゴリズムに、複数あることを今日は学ぶ。
 * いずれのアルゴリズムを使うかで、生成されるテキストの様子が違ってくる。

**ランタイムのタイプはGPUにしておいてください。**

## 準備

### インストール

* graphvizをインストールしようとして、utf8関係のエラーが出たら、以下のセルを実行してからインストールする。

In [None]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

* graphvizとtransformersのインストール

In [None]:
!sudo apt-get install graphviz graphviz-dev
!pip install transformers pygraphviz

### インポート

In [None]:
import time
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.colors import LinearSegmentedColormap
import torch

torch.manual_seed(42)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

## 言語モデル
* 今回はGPT2を使う。
 * 最初だけモデルのダウンロードに時間がかかる。
* GPT2LMHeadModelとは
"The GPT2 Model transformer with a language modeling head on top (linear layer with weights tied to the input embeddings)."
 * https://huggingface.co/docs/transformers/model_doc/gpt2#transformers.GPT2LMHeadModel

* テキストを生成させるだけなので`eval()`メソッドを呼ぶ。

### モデルの取得

In [None]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer

model = GPT2LMHeadModel.from_pretrained('gpt2').to(device)
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model.eval()

* 語彙サイズを調べる。

In [None]:
tokenizer.vocab_size

* 適当な単語のidを調べる。

In [None]:
tokenizer.get_vocab()["hello"]

* 適当なテキストをidの列に変換してみる。

In [None]:
text = "I work as a data scientist"
input_ids = tokenizer.encode(text, return_tensors='pt').to(device)
input_ids

* デフォルトの設定のもとでテキストの続きを生成させてみる。

In [None]:
outputs = model.generate(
    input_ids,
    pad_token_id=tokenizer.eos_token_id,
    max_length=100,
    )
outputs

* 生成されたid列をトークン列に変換する。

In [None]:
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(f"Generated text:\n\n{generated_text}")

### モデルが出力する予測logitsの確認

In [None]:
text = "I work as a data scientist"
input_ids = tokenizer.encode(text, return_tensors='pt').to(device)
input_ids

* テキストを生成させるのではなく、モデルが直接出力するデータを調べる。

In [None]:
outputs = model(input_ids)
outputs.keys()

* `logit`は確率に変換される前の値。
 * その`shape`は[シーケンス数, トークン数, 語彙サイズ]

In [None]:
outputs.logits.shape

* 次のサブワードの予測logitを取得する。

In [None]:
logits = outputs.logits[0,-1]
logits.shape

* 確率最大のサブワードを調べる。
 * 確率最大のサブワードを次のトークンに選ぶのが、greedy search。

In [None]:
torch.argmax(logits)

In [None]:
tokenizer.decode(torch.argmax(logits))

## decodingの可視化の準備

### 対数確率を求めるヘルパ関数

In [None]:
def get_log_prob(logits, token_id):
  log_probabilities = torch.nn.functional.log_softmax(logits, dim=-1)
  token_log_probability = log_probabilities[token_id].item()
  return token_log_probability

### グラフ構造を可視化するヘルパ関数

In [None]:
def plot_graph(graph, length, beams, score_type):
  fig, ax = plt.subplots(figsize=(3+1.2*beams**length, max(5, 2+length)), dpi=300, facecolor='white')

  # ノードのレイアウトを確定させる
  pos = nx.nx_agraph.graphviz_layout(graph, prog="dot")

  # 確率に応じてノードの色を決める
  assert score_type in ['token', 'sequence']
  scores = [
      data[score_type + 'score']
      for _, data in graph.nodes(data=True)
      if data['token'] is not None
      ]
  vmin = min(scores)
  vmax = max(scores)
  norm = mcolors.Normalize(vmin=vmin, vmax=vmax)
  cmap = LinearSegmentedColormap.from_list('rg', ["r", "y", "g"], N=256)

  # ノードを描く
  nx.draw_networkx_nodes(
      graph, pos, node_size=2000, node_shape='o',
      alpha=1, linewidths=4,
      node_color=scores, cmap=cmap,
      )

  # エッジを描く
  nx.draw_networkx_edges(graph, pos)

  # ラベルを描く
  suffix = "%" if score_type == "token" else ""
  labels = {
      node: data['token'].split('_')[0] + f"\n{data[score_type + 'score']:.2f}" + suffix
      for node, data in graph.nodes(data=True)
      if data['token'] is not None
      }
  nx.draw_networkx_labels(graph, pos, labels=labels, font_size=10)
  plt.box(False)

  # 確率の高低を表すカラーバーを追加する
  sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
  sm.set_array([])
  if score_type == 'token':
    label = 'Token probability (%)'
  elif score_type == 'sequence':
    label = 'Sequence score'
  fig.colorbar(sm, ax=ax, orientation='vertical', pad=0, label=label)

  plt.show()

## 🏃‍♂️ Greedy Search
* その都度確率最大のトークンを選ぶという、最もシンプルなdecoding。

### データとパラメータの設定

In [None]:
# デモンストレーション用のデータ
text = "I work as a data scientist"
input_ids = tokenizer.encode(text, return_tensors='pt').to(device)

# 可視化のパラメータ
length = 5

### decodeの実行

In [None]:
# ノードが一つのグラフを作成
graph = nx.DiGraph()
graph.add_node(graph.number_of_nodes())
node = graph.number_of_nodes() - 1

# グラフの根ノードが持つ情報を初期化する
graph.nodes[node]['tokenscore'] = 100
graph.nodes[node]['token'] = text

# 根ノードを出発点に設定する
node = 0

for n in range(length):

  # モデルが出力するlogitsを得る
  with torch.no_grad():
    outputs = model(input_ids)
  predictions = outputs.logits

  # 次のトークンを予測するlogitだけを取り出す
  logits = predictions[0][-1]

  # 確率最大のサブワードを得て、その確率の対数をとったものを求める。
  token_id = torch.argmax(logits).unsqueeze(0)
  token_score = get_log_prob(logits, token_id)

  # 可視化用のグラフに選ばれたトークンの情報を追加する。
  next_token = tokenizer.decode(token_id, skip_special_tokens=True)
  graph.add_node(graph.number_of_nodes())
  new_node = graph.number_of_nodes() - 1
  graph.add_edge(node, new_node)
  node = new_node
  graph.nodes[node]['tokenscore'] = np.exp(token_score) * 100
  graph.nodes[node]['token'] = next_token + f"_{length}"

  # 選ばれたサブワードを入力のトークン列に継ぎ足す。
  input_ids = torch.cat([input_ids, token_id.unsqueeze(0)], dim=-1)

output = tokenizer.decode(input_ids.squeeze().tolist(), skip_special_tokens=True)
print(f"Generated text: {output}")

### グラフの描画

In [None]:
plot_graph(graph, length, 1.5, 'token')

## ⚖️ Beam Search
* 複数の候補を残しつつトークンを生成していく。
* ある程度生成を進めたところでベストなトークン列を選ぶ。

### テキスト生成のヘルパ関数
* ほとんどの処理が全てのアルゴリズムに共通している。
* サンプリングの方法だけが異なる。

In [None]:
def decoding(ids, length, num_beams, sampling, temperature=0.1, top_k=20, nucleus_p=0.5):

  input_ids = ids

  # ノードが一つのグラフを作成
  graph = nx.DiGraph()
  graph.add_node(graph.number_of_nodes())
  node = graph.number_of_nodes() - 1

  # グラフの根ノードが持つ情報を初期化する
  graph.nodes[node]['tokenscore'] = 100
  graph.nodes[node]['score'] = 0
  graph.nodes[node]['sequencescore'] = 0
  graph.nodes[node]['token'] = text

  # 根ノードを出発点に設定する
  nodes = [node]

  for n in range(length):
    # モデルが出力するlogitsを得る
    with torch.no_grad():
      outputs = model(input_ids)
    predictions = outputs.logits

    # 次のトークンを予測するlogitだけを取り出す
    logits = predictions[:,-1]

    if sampling == 'greedy':
      top_token_ids = torch.topk(logits, num_beams, dim=-1).indices
    elif sampling == 'top_k':
      top_token_ids = top_k_sampling(logits, top_k, num_beams, temperature=temperature)
    elif sampling == 'nucleus':
      top_token_ids = nucleus_sampling(logits, nucleus_p, num_beams, temperature=temperature)

    assert len(nodes) == input_ids.shape[0]

    # 各シーケンスの末尾に選ばれたトークンを追加する
    node_scores = list()
    for i in range(input_ids.shape[0]):
      node = nodes[i]
      for j, token_id in enumerate(top_token_ids[i]):

        # 予測されたトークンの対数確率を求めて、ここまでのシーケンスの対数確率に加算する
        token_score = get_log_prob(logits[i], token_id)

        graph.add_node(graph.number_of_nodes())
        new_node = graph.number_of_nodes() - 1
        graph.add_edge(node, new_node)

        cumulative_score = graph.nodes[node]['score'] + token_score
        graph.nodes[new_node]['score'] = cumulative_score
        graph.nodes[new_node]['tokenscore'] = np.exp(token_score) * 100
        graph.nodes[new_node]['sequencescore'] = cumulative_score / (input_ids.shape[1] + 1)
        token = tokenizer.decode(token_id, skip_special_tokens=True)
        graph.nodes[new_node]['token'] = token + f"_{n}_{i}"
        node_scores.append((new_node, cumulative_score, i, token_id, token_score))

    # スコアの降順にノードをソート
    node_scores.sort(key=lambda a: a[1], reverse=True)

    new_nodes = list()
    new_input_ids = list()
    for i, node_score in enumerate(node_scores[:num_beams]):
      new_node, _, sequence_id, token_id, token_score = node_score
      print(f"node id: {new_node}, token score:{token_score:.3f},",
            "token:", tokenizer.decode(token_id, skip_special_tokens=True))
      # 予測されたトークンを、ここまでのシーケンスの末尾に繋げる。
      new_nodes.append(new_node)
      new_input_ids.append(
          torch.cat([input_ids[sequence_id], token_id.unsqueeze(0)],
                    dim=-1)
          )

    nodes = new_nodes
    input_ids = torch.stack(new_input_ids)

  return graph

### データとパラメータの設定

In [None]:
# デモンストレーション用のデータ
text = "I work as a data scientist"
input_ids = tokenizer.encode(text, return_tensors='pt').to(device)

# 可視化のパラメータ
length = 5

# ビームサーチのパラメータ
num_beams = 2

### decodeの実行

In [None]:
graph = decoding(input_ids, length, num_beams, 'greedy')

### スコア最大のトークン列を取得するヘルパ関数

In [None]:
def get_best_sequence(graph):
  depths = nx.shortest_path_length(graph, 0)
  max_depth = max(depths.values())
  leaf_nodes = [node for node in graph.nodes if depths[node] == max_depth]
  max_score_node = None
  max_score = float('-inf')
  for node in leaf_nodes:
    if graph.nodes[node]['sequencescore'] > max_score:
      max_score = graph.nodes[node]['sequencescore']
      max_score_node = node
    path = nx.shortest_path(graph, source=0, target=max_score_node)
    sequence = "".join([graph.nodes[node]['token'].split('_')[0] for node in path])
  return sequence, max_score


In [None]:
sequence, max_score = get_best_sequence(graph)
print(f"Generated text: {sequence}")

### グラフの描画
* 示されているスコアは、そのノードに至るまでのトークン列のスコア。
* ビームサーチは、最もスコアの大きいトークン列を選ぶ。

In [None]:
plot_graph(graph, length, num_beams, 'sequence')

## 🎲 Top-k sampling
* top-kサンプリングは、確率が高いk個のサブワードから・・・
* それらの確率に従ってランダムに次のトークンを選ぶ。

### サンプリングを行う関数

In [None]:
def top_k_sampling(logits, top_k, num_beams, temperature=1.0, plot=False):
  assert top_k >= 1
  assert num_beams <= top_k

  logit_k = torch.topk(logits, top_k, dim=-1).values[:,-1]
  indices_to_remove = logits < logit_k.unsqueeze(-1)
  new_logits = torch.clone(logits)
  new_logits[indices_to_remove] = float('-inf')

  # logitを確率に変換する
  probabilities = torch.nn.functional.softmax(new_logits / temperature, dim=-1)

  # top-kのサブワードからランダムにbeams個を選ぶ
  next_tokens = torch.multinomial(probabilities, num_beams)

  if plot:
    # サブワードを選ぶときに使った確率分布を描画する
    total_prob = torch.nn.functional.softmax(logits / temperature, dim=-1)
    plot_prob_distribution(total_prob, next_tokens, 'top_k', top_k)

  return next_tokens

### サブワードの確率分布を描画するヘルパ関数

In [None]:
def plot_prob_distribution(total_prob, next_tokens, sampling, potential_nb, total_nb=50):
  for i in range(total_prob.shape[0]):
    probabilities = total_prob[i]
    next_token = next_tokens[i]
    # top kのサブワードを取得
    top_k_prob, top_k_indices = torch.topk(probabilities, total_nb)
    top_k_tokens = [tokenizer.decode([idx]) for idx in top_k_indices.tolist()]

    # 次のトークンとして選ばれたサブワードの確率を取得
    next_token_list = [tokenizer.decode([idx]) for idx in next_token.tolist()]
    next_token_prob = probabilities[next_token].tolist()

    # 確率分布のを棒グラフとして描く
    plt.figure(figsize=(0.4*total_nb, 5), dpi=300, facecolor='white')
    plt.rc('axes', axisbelow=True)
    plt.grid(axis='y', linestyle='-', alpha=0.5)
    if potential_nb < total_nb:
      plt.axvline(x=potential_nb-0.5, ls=':', color='grey', label='Sampled tokens')
    plt.bar(top_k_tokens, top_k_prob.tolist(), color='blue')
    plt.bar(next_token_list, next_token_prob, color='red', label='Selected tokens')
    plt.xticks(rotation=45, ha='right', va='top')
    plt.gca().spines['top'].set_visible(False)
    plt.gca().spines['right'].set_visible(False)
    if sampling == 'top_k':
      plt.title('Probability distribution of predicted tokens with top-k sampling')
    elif sampling == 'nucleus':
      plt.title('Probability distribution of predicted tokens with nucleus sampling')
    plt.legend()
    plt.savefig(f'{sampling}_{time.time()}.png', dpi=300)
    plt.close()

### decodeの実行

In [None]:
# デモンストレーション用のデータ
text = "I work as a data scientist"
input_ids = tokenizer.encode(text, return_tensors='pt').to(device)

# 可視化のパラメータ
length = 5

# top-kサンプリングのパラメータ
temperature = 5
top_k = 20
num_beams = 2

In [None]:
graph = decoding(input_ids, length, num_beams, 'top_k', temperature=temperature, top_k=top_k)

In [None]:
sequence, max_score = get_best_sequence(graph)
print(f"Generated text: {sequence}")

### グラフの描画

In [None]:
plot_graph(graph, length, num_beams, 'sequence')

## 🔬 Nucleus sampling
* 参考資料
 * https://www.youtube.com/watch?v=JETxaSaj6_k

In [None]:
def nucleus_sampling(logits, nucleus_p, num_beams, temperature=1.0, top_k=100, plot=False):
  assert nucleus_p > 0
  assert nucleus_p <= 1

  # まずtop kのサブワードへとサンプリングの対象を絞り込む
  logit_k = torch.topk(logits, top_k, dim=-1).values[:,-1]
  indices_to_remove = logits < logit_k.unsqueeze(-1)
  new_logits = torch.clone(logits)
  new_logits[indices_to_remove] = float('-inf')

  # 次にtop pのサブワードへとサンプリングの対象を絞り込む
  sorted_logits, sorted_indices = torch.sort(logits, dim=-1, descending=True)
  probabilities = torch.nn.functional.softmax(sorted_logits / temperature, dim=-1)
  cumulative_probabilities = torch.cumsum(probabilities, dim=-1)

  # 確率の和がnucleus_pを超えるところからのサブワードをサンプリングから除外する
  # ただし、最低でもnum_beams個のサブワードは残す
  mask = cumulative_probabilities >= cumulative_probabilities[:,num_beams]
  mask = torch.logical_and(mask, cumulative_probabilities > nucleus_p)
  for i in range(logits.shape[0]):
    new_logits[i,sorted_indices[i,mask[i]]] = float('-inf')

  mask_p = cumulative_probabilities > nucleus_p

  # logitを確率に変換する
  probabilities = torch.nn.functional.softmax(new_logits / temperature, dim=-1)

  # top pのサブワードからランダムにnum_beams個を選ぶ
  next_tokens = torch.multinomial(probabilities, num_beams)

  # Plot distribution
  if plot:
    total_prob = torch.nn.functional.softmax(logits / temperature, dim=-1)
    plot_prob_distribution(total_prob, next_tokens, 'nucleus', top_p_index_to_keep)

  return next_tokens

In [None]:
# デモンストレーション用のデータ
text = "I work as a data scientist"
input_ids = tokenizer.encode(text, return_tensors='pt').to(device)

# 可視化のパラメータ
length = 5

# top-kサンプリングのパラメータ
temperature = 1
nucleus_p = 0.5
num_beams = 2

In [None]:
graph = decoding(input_ids, length, num_beams, 'nucleus', temperature=temperature, nucleus_p=nucleus_p)

In [None]:
sequence, max_score = get_best_sequence(graph)
print(f"Generated text: {sequence}")

In [None]:
plot_graph(graph, length, num_beams, 'sequence')