# 2.0 Env

In [None]:
import os
import argparse
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F

from tqdm.auto import tqdm
from transformers import (
    T5TokenizerFast,
    AutoTokenizer
)
import wandb

In [None]:
# Keep autograd enabled globally (note: this call turns gradients ON, not off).
torch.set_grad_enabled(True)
# Project root used by the %cd cells; machine-specific, adjust to your checkout.
work_dir = '/Users/cchyun/Workspace/nlp_ws/nlp-practice'

In [None]:
# Switch the notebook's working directory to the project root, then print it to confirm.
%cd {work_dir}
!pwd

# 2.1 RNN

In [None]:
# Load the T5 fast tokenizer from "data/kowiki_32k"
# (presumably a local directory relative to the working directory — confirm).
tokenizer = T5TokenizerFast.from_pretrained("data/kowiki_32k")

In [None]:
# Two example input sentences (one short, one longer) used throughout this section.
sentences = [
    '어린 시절.',
    '지미 카터는 조지아주 마을에서 태어났다.'
]

In [None]:
# Encode every sentence into a 1-D tensor of token ids.
# torch.tensor builds the tensor directly (no NumPy round-trip), and the
# explicit dtype keeps the ids int64 on every platform.
tensors = [
    torch.tensor(tokenizer.encode(line), dtype=torch.long)
    for line in sentences
]
tensors

In [None]:
# Stack the variable-length id tensors into one (bs, max_len) batch,
# filling the shorter sequences with the tokenizer's pad id.
mini_batch = nn.utils.rnn.pad_sequence(
    tensors, batch_first=True, padding_value=tokenizer.pad_token_id
)
mini_batch

In [None]:
# Embedding table with one 4-dim row per tokenizer id;
# padding_idx marks the pad row so it receives no gradient updates.
embedding = nn.Embedding(
    num_embeddings=len(tokenizer),
    embedding_dim=4,
    padding_idx=tokenizer.pad_token_id,
)
embedding

In [None]:
# Embedding lookup: (bs, seq_len) token ids -> (bs, seq_len, 4) vectors.
# The lookup table itself is (len(tokenizer), 4).
z = embedding(mini_batch)
z

## RNN: Step-by-Step

### RNN (1 layer)

In [None]:
# Single-layer, unidirectional tanh RNN: 4-dim inputs -> 3-dim hidden states.
rnn = nn.RNN(
    input_size=4,
    hidden_size=3,
    num_layers=1,
    nonlinearity="tanh",
    bias=True,
    dropout=0.0,
    bidirectional=False,
    batch_first=True,  # tensors are laid out as (batch, seq_len, feature)
)

In [None]:
# Run the RNN directly on the padded batch (no packing, so pad positions are processed too).
# (bs, seq_len, 4) -> hidden: (bs, seq_len, 3), because hidden_size=3
hidden, state = rnn(z)
hidden, state

In [None]:
# Number of real (non-pad) tokens in each sequence.
# Compare against the tokenizer's pad id — the value mini_batch was padded
# with — rather than a hard-coded 0.
lengths = (mini_batch != tokenizer.pad_token_id).sum(dim=-1)
lengths

In [None]:
# Pack the padded embeddings so the RNN skips pad positions;
# enforce_sorted=False lets the batch keep its original order.
packed_z = nn.utils.rnn.pack_padded_sequence(
    z, lengths, batch_first=True, enforce_sorted=False
)
# Run the RNN on the packed input.
packed_h, state = rnn(packed_z)
# Unpack into a normal padded tensor (plus the per-sequence lengths).
hidden, lengths = nn.utils.rnn.pad_packed_sequence(packed_h, batch_first=True)
hidden, state, lengths

### RNN (n layer)

In [None]:
# Two stacked unidirectional tanh RNN layers: 4-dim inputs -> 3-dim hidden states.
rnn = nn.RNN(
    input_size=4,
    hidden_size=3,
    num_layers=2,
    nonlinearity="tanh",
    bias=True,
    dropout=0.0,
    bidirectional=False,
    batch_first=True,  # tensors are laid out as (batch, seq_len, feature)
)

In [None]:
# Number of real (non-pad) tokens in each sequence.
# Compare against the tokenizer's pad id — the value mini_batch was padded
# with — rather than a hard-coded 0.
lengths = (mini_batch != tokenizer.pad_token_id).sum(dim=-1)
lengths

In [None]:
# Pack the padded embeddings so the stacked RNN skips pad positions;
# enforce_sorted=False lets the batch keep its original order.
packed_z = nn.utils.rnn.pack_padded_sequence(
    z, lengths, batch_first=True, enforce_sorted=False
)
# Run the RNN on the packed input.
packed_h, state = rnn(packed_z)
# Unpack into a normal padded tensor (plus the per-sequence lengths).
hidden, lengths = nn.utils.rnn.pad_packed_sequence(packed_h, batch_first=True)
hidden, state, lengths

### RNN (bidirectional, n-layer)

In [None]:
# Two stacked bidirectional tanh RNN layers: 4-dim inputs, 3-dim hidden state per direction.
rnn = nn.RNN(
    input_size=4,
    hidden_size=3,
    num_layers=2,
    nonlinearity="tanh",
    bias=True,
    dropout=0.0,
    bidirectional=True,
    batch_first=True,  # tensors are laid out as (batch, seq_len, feature)
)

In [None]:
# Number of real (non-pad) tokens in each sequence.
# Compare against the tokenizer's pad id — the value mini_batch was padded
# with — rather than a hard-coded 0.
lengths = (mini_batch != tokenizer.pad_token_id).sum(dim=-1)
lengths

In [None]:
# Pack the padded embeddings so the bidirectional RNN skips pad positions;
# enforce_sorted=False lets the batch keep its original order.
packed_z = nn.utils.rnn.pack_padded_sequence(
    z, lengths, batch_first=True, enforce_sorted=False
)
# Run the RNN on the packed input.
packed_h, state = rnn(packed_z)
# Unpack into a normal padded tensor (plus the per-sequence lengths).
hidden, lengths = nn.utils.rnn.pad_packed_sequence(packed_h, batch_first=True)
hidden, state, lengths

## LSTM: 더 긴 입출력 다루기

In [None]:
# Two stacked bidirectional LSTM layers: 4-dim inputs, 3-dim hidden state per direction.
lstm = nn.LSTM(
    input_size=4,
    hidden_size=3,
    num_layers=2,
    bias=True,
    dropout=0.0,
    bidirectional=True,
    batch_first=True,  # tensors are laid out as (batch, seq_len, feature)
)

In [None]:
# Number of real (non-pad) tokens in each sequence.
# Compare against the tokenizer's pad id — the value mini_batch was padded
# with — rather than a hard-coded 0.
lengths = (mini_batch != tokenizer.pad_token_id).sum(dim=-1)
lengths

In [None]:
# Pack the padded embeddings so the LSTM skips pad positions;
# enforce_sorted=False lets the batch keep its original order.
packed_z = nn.utils.rnn.pack_padded_sequence(
    z, lengths, batch_first=True, enforce_sorted=False
)
# Run the LSTM; its state is a (hidden, cell) pair.
packed_h, (state_h, state_c) = lstm(packed_z)
# Unpack into a normal padded tensor (plus the per-sequence lengths).
hidden, lengths = nn.utils.rnn.pad_packed_sequence(packed_h, batch_first=True)
hidden, state_h, state_c, lengths

In [None]:
# Two stacked bidirectional GRU layers: 4-dim inputs, 3-dim hidden state per direction.
gru = nn.GRU(
    input_size=4,
    hidden_size=3,
    num_layers=2,
    bias=True,
    dropout=0.0,
    bidirectional=True,
    batch_first=True,  # tensors are laid out as (batch, seq_len, feature)
)

In [None]:
# Pack the padded embeddings so the GRU skips pad positions
# (lengths is reused from the cells above);
# enforce_sorted=False lets the batch keep its original order.
packed_z = nn.utils.rnn.pack_padded_sequence(
    z, lengths, batch_first=True, enforce_sorted=False
)
# Run the GRU on the packed input.
packed_h, state = gru(packed_z)
# Unpack into a normal padded tensor (plus the per-sequence lengths).
hidden, lengths = nn.utils.rnn.pad_packed_sequence(packed_h, batch_first=True)
hidden, state, lengths

# 2.2 RNN Text Classifier

In [None]:
# Move into src/tc under the project root (the relative paths below assume it), then print the directory.
%cd {work_dir}/src/tc
!pwd

## preprocess

In [None]:
# Create the data/nsmc directory two levels up from the current directory; no error if it already exists.
os.makedirs("../../data/nsmc", exist_ok=True)

In [None]:
# Download the NSMC ratings_train.txt file and save it as data/nsmc/train.tsv.
!wget https://github.com/e9t/nsmc/raw/master/ratings_train.txt \
    -O ../../data/nsmc/train.tsv

In [None]:
# Download the NSMC ratings_test.txt file and save it as data/nsmc/test.tsv.
!wget https://github.com/e9t/nsmc/raw/master/ratings_test.txt \
    -O ../../data/nsmc/test.tsv

In [None]:
# run src/tc/preprocess.sh

## rnn tc tutorial

### inputs & labels

In [None]:
# Two example reviews and their integer class ids (same order).
# NOTE(review): presumably 1 = positive and 0 = negative as in NSMC — confirm.
input_text = [
    "강력 추천합니다.",
    "나중에 집에서 보는게 딱 좋은영화"
]
label_id = [1, 0]

In [None]:
# Replace the tokenizer with the one published as "klue/bert-base".
tokenizer = AutoTokenizer.from_pretrained("klue/bert-base")

In [None]:
# Tokenize both sentences into one padded batch of PyTorch tensors;
# anything beyond 128 tokens is truncated.
inputs = tokenizer(input_text,
                   padding=True,
                   truncation=True,
                   max_length=128,
                   return_tensors="pt")
inputs

In [None]:
# Class ids as a tensor, one entry per input sentence.
labels = torch.tensor(label_id)
# |labels| = (batch_size,)
labels

### Model

In [None]:
# Toy hyperparameters, kept tiny so the printed tensors stay readable.
n_layers = 2
embedding_dim = 3
hidden_dim = 4
output_dim = 2  # one logit per class
# len(tokenizer) also counts added tokens (tokenizer.vocab_size does not), so
# every id the tokenizer can emit has an embedding row; this matches how the
# other embedding tables in this notebook are sized.
vocab_size = len(tokenizer)
pad_idx = tokenizer.pad_token_id

In [None]:
# Token ids -> dense vectors; the pad row is excluded from gradient updates.
embedding = nn.Embedding(
    num_embeddings=vocab_size,
    embedding_dim=embedding_dim,
    padding_idx=pad_idx,
)

# Stacked bidirectional LSTM over the embedded sequence.
lstm = nn.LSTM(
    input_size=embedding_dim,
    hidden_size=hidden_dim,
    num_layers=n_layers,
    bidirectional=True,
    batch_first=True,  # with False the input would be (seq_len, batch_size, input_size)
)

# Classifier head; its input is twice hidden_dim because both directions are concatenated.
fc = nn.Linear(in_features=hidden_dim * 2, out_features=output_dim)

### embedding & lstm

In [None]:
# Look up an embedding vector for every token id in the batch.
embed = embedding(inputs['input_ids'])
# |embed| = (batch_size, seq_len, embedding_dim)
embed

In [None]:
# Run the bidirectional LSTM over the padded embeddings.
# NOTE(review): the batch is not packed here, so pad positions are processed
# as well — fine for a shape walkthrough, confirm before reusing elsewhere.
output, (hidden_l, cell_l) = lstm(embed)
# |output| = (batch_size, seq_len, hidden_dim * 2)
# |hidden_l| = (n_layers * 2, batch_size, hidden_dim)
# |cell_l| = (n_layers * 2, batch_size, hidden_dim)
output, hidden_l, cell_l

In [None]:
# The last two entries of hidden_l are the top layer's forward and backward
# final states; join them along the feature axis into one sentence vector.
hidden = torch.cat((hidden_l[-2], hidden_l[-1]), dim=-1)
# |hidden| = (batch_size, hidden_dim * 2)
hidden

### linear & softmax

In [None]:
# Project the sentence vector to one unnormalised score per class.
logits = fc(hidden)
# |logits| = (batch_size, output_dim)
logits

In [None]:
# Normalise the logits into class probabilities (each row sums to 1).
prob = F.softmax(logits, dim=-1)
# |prob| = (batch_size, output_dim)
prob

### loss

In [None]:
# Cross-entropy loss; it takes raw logits (not softmax probabilities) and class-index targets.
criterion = torch.nn.CrossEntropyLoss()

In [None]:
# Flatten to (N, output_dim) logits and (N,) targets before computing the loss.
# Both tensors already have that shape here, so the views change nothing.
loss = criterion(logits.view(-1, logits.size(-1)), labels.view(-1,))
loss

## rnn train

In [None]:
# Run src/tc/train_rnn.sh with "cchyun-rnn-tc" as its only argument
# (presumably the run/checkpoint name — confirm in the script).
!sh train_rnn.sh "cchyun-rnn-tc"

## rnn classify

In [None]:
# Run src/tc/classify_rnn.sh, passing the path of a saved checkpoint.
# The timestamp in the file name is specific to one training run; update it for yours.
!sh classify_rnn.sh "../../checkpoints/cchyun-rnn-tc-20240321-152704.pt"

## rnn infer

In [None]:
from rnn import LSTMClassifier

In [None]:
# Inference runs on the CPU.
device = (
    torch.device("cpu")
)

# Checkpoint file to load; the timestamp is specific to one training run.
model_fn = "../../checkpoints/cchyun-rnn-tc-20240321-152704.pt"

# NOTE(review): torch.load unpickles the file — only load checkpoints you trust.
# NOTE(review): data["config"] is read via attribute access below, so it is
# presumably a pickled object; newer PyTorch releases may need weights_only=False — confirm.
data = torch.load(model_fn, map_location=device)
# The checkpoint bundles the training config and the label <-> index mappings.
train_config = data["config"]
label2idx = data["label2idx"]
idx2label = data["idx2label"]

In [None]:
# Load the tokenizer named in the checkpoint's training config.
tokenizer = AutoTokenizer.from_pretrained(train_config.tokenizer)

In [None]:
# Rebuild the classifier from the hyperparameters stored in the checkpoint
# config, then restore its saved weights.
model = LSTMClassifier(
    vocab_size=len(tokenizer),
    embedding_dim=train_config.embedding_dim,
    hidden_dim=train_config.hidden_dim,
    output_dim=len(label2idx),
    n_layers=train_config.n_layers,
    dropout=train_config.dropout,
    pad_idx=tokenizer.pad_token_id,
)
model.load_state_dict(data["model"])
# Evaluation mode: layers such as dropout behave deterministically.
model.eval()
model.to(device)

In [None]:
# Interactive loop: classify each typed line; an empty line ends the loop.
while True:
    line = input("input> ")
    if not line:
        break

    x = tokenizer(
        line,
        truncation=True,
        max_length=train_config.max_length,
        return_tensors="pt",
    )["input_ids"].to(device)
    # |x| = (1, seq_len): a single sentence per call

    # Inference only: skip autograd bookkeeping (grad mode is enabled
    # globally at the top of this notebook).
    with torch.no_grad():
        # NOTE(review): assumes model(x) returns (batch_size, output_dim)
        # logits, so [0] selects the only row — confirm in LSTMClassifier.
        logit = model(x)[0]
        prob = F.softmax(logit, dim=-1)
    # |prob| = (output_dim,)

    # Index of the most probable class (0-dim tensor).
    y = prob.argmax(dim=-1)

    # Print: predicted label, its probability, and the input line.
    print(f"{idx2label[y.item()]}\t{prob[y].item():.4f}\t{line}")

# 2.3 CNN Text Classifier

In [None]:
# Move into src/tc under the project root (the relative paths below assume it), then print the directory.
%cd {work_dir}/src/tc
!pwd

## CNN

### Inputs

In [None]:
# Load the tokenizer published as "klue/bert-base".
tokenizer = AutoTokenizer.from_pretrained("klue/bert-base")

In [None]:
# Two example input sentences (one short, one longer) used in this section.
sentences = [
    '어린 시절.',
    '지미 카터는 조지아주 마을에서 태어났다.'
]

In [None]:
# Encode every sentence into a 1-D tensor of token ids.
# torch.tensor builds the tensor directly (no NumPy round-trip), and the
# explicit dtype keeps the ids int64 on every platform.
tensors = [
    torch.tensor(tokenizer.encode(line), dtype=torch.long)
    for line in sentences
]
tensors

In [None]:
# Stack the variable-length id tensors into one (bs, max_len) batch,
# filling the shorter sequences with the tokenizer's pad id.
mini_batch = nn.utils.rnn.pad_sequence(
    tensors, batch_first=True, padding_value=tokenizer.pad_token_id
)
mini_batch

In [None]:
# Embedding table with one 4-dim row per tokenizer id;
# padding_idx marks the pad row so it receives no gradient updates.
embedding = nn.Embedding(
    num_embeddings=len(tokenizer),
    embedding_dim=4,
    padding_idx=tokenizer.pad_token_id,
)
embedding

In [None]:
# Embedding lookup: (bs, seq_len) token ids -> (bs, seq_len, 4) vectors.
# The lookup table itself is (len(tokenizer), 4).
z = embedding(mini_batch)
z

### Conv1D

In [None]:
# 1-D convolution over the token axis: 4 input channels -> 5 output channels.
# A width-3 kernel with stride 1 and padding 1 keeps the sequence length unchanged.
conv1 = torch.nn.Conv1d(
    in_channels=4,
    out_channels=5,
    kernel_size=3,
    padding=1,
    stride=1,
)

In [None]:
# Conv1d expects channels before length, so swap the last two axes on the way
# in and swap them back on the way out:
# (batch_size, length, in_channel) -> (batch_size, in_channel, length)
#   -> conv -> (batch_size, out_channel, length) -> (batch_size, length, out_channel)
hidden = conv1(z.transpose(2, 1)).transpose(1, 2)
hidden.shape

### cnn train

In [None]:
# Run src/tc/train_cnn.sh with "cchyun-cnn-tc" as its only argument
# (presumably the run/checkpoint name — confirm in the script).
!sh train_cnn.sh "cchyun-cnn-tc"

### cnn classify

In [None]:
# Run src/tc/classify_cnn.sh, passing the path of a saved checkpoint.
# The timestamp in the file name is specific to one training run; update it for yours.
!sh classify_cnn.sh "../../checkpoints/cchyun-cnn-tc-20240321-153708.pt"

### cnn infer

In [None]:
from cnn import CNNClassifier

In [None]:
# Inference runs on the CPU.
device = (
    torch.device("cpu")
)

# Checkpoint file to load; the timestamp is specific to one training run.
model_fn = "../../checkpoints/cchyun-cnn-tc-20240321-153708.pt"

# NOTE(review): torch.load unpickles the file — only load checkpoints you trust.
# NOTE(review): data["config"] is read via attribute access below, so it is
# presumably a pickled object; newer PyTorch releases may need weights_only=False — confirm.
data = torch.load(model_fn, map_location=device)
# The checkpoint bundles the training config and the label <-> index mappings.
train_config = data["config"]
label2idx = data["label2idx"]
idx2label = data["idx2label"]

In [None]:
# Load the tokenizer named in the checkpoint's training config.
tokenizer = AutoTokenizer.from_pretrained(train_config.tokenizer)

In [None]:
# Rebuild the classifier from the hyperparameters stored in the checkpoint
# config, then restore its saved weights.
model = CNNClassifier(
    vocab_size=len(tokenizer),
    embedding_dim=train_config.embedding_dim,
    hidden_dim=train_config.hidden_dim,
    output_dim=len(label2idx),
    n_layers=train_config.n_layers,
    dropout=train_config.dropout,
    pad_idx=tokenizer.pad_token_id,
)
model.load_state_dict(data["model"])
# Evaluation mode: layers such as dropout behave deterministically.
model.eval()
model.to(device)

In [None]:
# Interactive loop: classify each typed line; an empty line ends the loop.
while True:
    line = input("input> ")
    if not line:
        break

    x = tokenizer(
        line,
        truncation=True,
        max_length=train_config.max_length,
        return_tensors="pt",
    )["input_ids"].to(device)
    # |x| = (1, seq_len): a single sentence per call

    # Inference only: skip autograd bookkeeping (grad mode is enabled
    # globally at the top of this notebook).
    with torch.no_grad():
        # NOTE(review): assumes model(x) returns (batch_size, output_dim)
        # logits, so [0] selects the only row — confirm in CNNClassifier.
        logit = model(x)[0]
        prob = F.softmax(logit, dim=-1)
    # |prob| = (output_dim,)

    # Index of the most probable class (0-dim tensor).
    y = prob.argmax(dim=-1)

    # Print: predicted label, its probability, and the input line.
    print(f"{idx2label[y.item()]}\t{prob[y].item():.4f}\t{line}")