In [1]:
from importlib.metadata import version

pkgs = [
    "numpy",       # 파이토치와 텐서플로 의존성
    "matplotlib",  # 그래프 라이브러리
    "tiktoken",    # 토크나이저b
    "torch",       # 딥러닝 라이브러리
    "tqdm",        # 진행 표시줄
    "tensorflow",  # OpenAI에서 사전 훈련된 가중치를 로드하기 위해
]
for p in pkgs:
    print(f"{p} 버전: {version(p)}")

numpy 버전: 2.0.2
matplotlib 버전: 3.10.0
tiktoken 버전: 0.12.0
torch 버전: 2.9.0+cu126
tqdm 버전: 4.67.1
tensorflow 버전: 2.19.0


In [2]:
import json
import os
import requests


def download_and_load_file(file_path, url):
    if not os.path.exists(file_path):
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        text_data = response.text
        with open(file_path, "w", encoding="utf-8") as file:
            file.write(text_data)

    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    return data

file_path = "instruction-data.json"
url = (
    "https://raw.githubusercontent.com/rasbt/LLMs-from-scratch"
    "/main/ch07/01_main-chapter-code/instruction-data.json"
)

data = download_and_load_file(file_path, url)
print("샘플 개수:", len(data))

# 책과 다르다 urllib는 VPN을 사용하는 경우 문제가 있을 수 있음 책코드는 책 참고

샘플 개수: 1100


In [3]:
print("샘플 예시:\n", data[50])

샘플 예시:
 {'instruction': 'Identify the correct spelling of the following word.', 'input': 'Ocassion', 'output': "The correct spelling is 'Occasion.'"}


In [4]:
print("다른 샘플:\n", data[999])

다른 샘플:
 {'instruction': "What is an antonym of 'complicated'?", 'input': '', 'output': "An antonym of 'complicated' is 'simple'."}


In [5]:
def format_input(entry):
    instruction_text = (
        f"Below is an instruction that describes a task. "
        f"Write a response that appropriately completes the request."
        f"\n\n### Instruction:\n{entry['instruction']}"
    )

    input_text = f"\n\n### Input:\n{entry['input']}" if entry["input"] else ""

    return instruction_text + input_text

In [6]:
model_input = format_input(data[50])
desired_response = f"\n\n### Response:\n{data[50]['output']}"

print(model_input + desired_response)

Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
Identify the correct spelling of the following word.

### Input:
Ocassion

### Response:
The correct spelling is 'Occasion.'


In [7]:
model_input = format_input(data[999])
desired_response = f"\n\n### Response:\n{data[999]['output']}"

print(model_input + desired_response)

Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
What is an antonym of 'complicated'?

### Response:
An antonym of 'complicated' is 'simple'.


In [8]:
train_portion = int(len(data) * 0.85) # 훈련을 위한 85%
test_portion = int(len(data) * 0.1) # 테스트을 위한 10%
val_portion = len(data) - train_portion - test_portion # 나머지 5%는 검증용

train_data = data[:train_portion]
test_data = data[train_portion:train_portion + test_portion]
val_data = data[train_portion + test_portion:]

In [9]:
print("훈련 세트 길이:", len(train_data))
print("검증 세트 길이:", len(val_data))
print("테스트 세트 길이:", len(test_data))

훈련 세트 길이: 935
검증 세트 길이: 55
테스트 세트 길이: 110


In [10]:
import torch
from torch.utils.data import Dataset

class InstructionDataset(Dataset):
  def __init__(self, data, tokenizer):
    self.data = data

    # 텍스트 토큰화
    self.encoded_texts = []
    for entry in data:
      instruction_plus_input = format_input(entry)
      response_text = f"\n\n### Response:\n{entry['output']}"
      full_text = instruction_plus_input + response_text
      self.encoded_texts.append(
          tokenizer.encode(full_text)
      )
  def __getitem__(self, index):
    return self.encoded_texts[index]

  def __len__(self):
    return len(self.data)

In [11]:
import tiktoken
tokenizer = tiktoken.get_encoding("gpt2")

print(tokenizer.encode("<|endoftext|>", allowed_special={"<|endoftext|>"}))

[50256]


In [12]:
def custom_collate_draft_1(
    batch,
    pad_token_id=50256,
    device="cpu"
):
  # 배치에서 가장 긴 시퀀스 찾기
  # 그리고 최대 길이를 +1씩 증가시켜 아래에서 패딩 토큰을 하나 추가
  batch_max_length = max(len(item)+1 for item in batch)

  # 입력 패딩 및 준비
  inputs_lst = []

  for item in batch:
    new_item = item.copy()
    # <|endoftext|> 토큰 추가
    new_item += [pad_token_id]
    # batch_max_length까지 시퀀스 패딩
    padded = (
        new_item + [pad_token_id] * (batch_max_length - len(new_item))
    )
    # padded[:-1]를 통해 batch_max_lenth의 +1 설정을 통해 추가된
    # 추가 패딩 토큰을 제거
    # (추가 패딩 토큰은 이후 코드에서 관련이 있다)
    inputs = torch.tensor(padded[:-1])
    inputs_lst.append(inputs)

  # 입력 리스트를 텐서로 변환하고 타깃 장치로 전송
  inputs_tensor = torch.stack(inputs_lst).to(device)
  return inputs_tensor

In [13]:
inputs_1 = [0,1,2,3,4]
inputs_2 = [5,6]
inputs_3 = [7,8,9]

batch = (
    inputs_1,
    inputs_2,
    inputs_3,
)

print(custom_collate_draft_1(batch))

tensor([[    0,     1,     2,     3,     4],
        [    5,     6, 50256, 50256, 50256],
        [    7,     8,     9, 50256, 50256]])


In [16]:
def custom_collate_draft_2(
    batch,
    pad_token_id=50256,
    device="cpu"
):
  # 배치에서 가장 긴 시퀀스 찾기
  batch_max_length = max(len(item) + 1 for item in batch)

  # 입력 및 타깃 준비
  inputs_lst, targets_lst = [], []

  for item in batch:
    new_item = item.copy()
    # <|endoftext|> 토큰 추가
    new_item += [pad_token_id]
    # 시퀀스를 max_length까지 패딩
    padded = (
        new_item + [pad_token_id] * (batch_max_length - len(new_item))
    )
    inputs = torch.tensor(padded[:-1]) # 입력을 위해 마지막 토큰 자르기
    targets = torch.tensor(padded[1:]) # 타깃을 위해 오른쪽으로 +1 이동
    inputs_lst.append(inputs)
    targets_lst.append(targets)

  # 입력 리스트를 텐서로 변환하고 타깃 장치로 전송
  inputs_tensor = torch.stack(inputs_lst).to(device)
  targets_tensor = torch.stack(targets_lst).to(device)
  return inputs_tensor, targets_tensor

In [17]:
inputs, targets = custom_collate_draft_2(batch)
print(inputs)
print(targets)

tensor([[    0,     1,     2,     3,     4],
        [    5,     6, 50256, 50256, 50256],
        [    7,     8,     9, 50256, 50256]])
tensor([[    1,     2,     3,     4, 50256],
        [    6, 50256, 50256, 50256, 50256],
        [    8,     9, 50256, 50256, 50256]])


In [20]:
def custom_collate_fn(
    batch,
    pad_token_id=50256,
    ignore_index=-100,
    allowed_max_length=None,
    device="cpu"
):
  # 배치에서 가장 긴 시퀀스 찾기
  batch_max_length = max(len(item) + 1 for item in batch)

  # 입력과 타깃 패딩 및 준비
  inputs_lst, targets_lst = [], []

  for item in batch:
    new_item = item.copy()
    # <|endoftext|> 토큰 추가
    new_item += [pad_token_id]
    # 시퀀스를 max_length까지 패딩
    padded = (
        new_item + [pad_token_id] * (batch_max_length - len(new_item))
    )
    inputs = torch.tensor(padded[:-1]) # 입력을 위해 마지막 토큰 자르기
    targets = torch.tensor(padded[1:]) # 목표를 위해 오른쪽으로 +1 이동

    # 새로 추가: 목표에서 첫 번째 패딩 토큰을 제외한 모든 토큰을 ignore_index로 바꾸기
    mask = targets == pad_token_id
    indices = torch.nonzero(mask).squeeze()
    if indices.numel() > 1:
      targets[indices[1:]] = ignore_index

    # 새로 추가: 최대 시퀀스 길이로 자르기 (선택사항)
    if allowed_max_length is not None:
      inputs = inputs[:allowed_max_length]
      targets = targets[:allowed_max_length]

    inputs_lst.append(inputs)
    targets_lst.append(targets)

  # 입력 및 타깃 리스트를 텐서로 변환하고 타깃 장치로 전송
  inputs_tensor = torch.stack(inputs_lst).to(device)
  targets_tensor = torch.stack(targets_lst).to(device)

  return inputs_tensor, targets_tensor

In [21]:
inputs, targets = custom_collate_fn(batch)
print(inputs)
print(targets)

tensor([[    0,     1,     2,     3,     4],
        [    5,     6, 50256, 50256, 50256],
        [    7,     8,     9, 50256, 50256]])
tensor([[    1,     2,     3,     4, 50256],
        [    6, 50256,  -100,  -100,  -100],
        [    8,     9, 50256,  -100,  -100]])


In [23]:
logits_1 = torch.tensor(
    [
        [-1.0, 1.0], # 첫 번째 훈련 샘플
        [-0.5, 1.5], # 두 번째 훈련 샘플
    ]
)
targets_1 = torch.tensor([0,1])

loss_1 = torch.nn.functional.cross_entropy(logits_1, targets_1)
print(loss_1)

tensor(1.1269)


In [24]:
logits_2 = torch.tensor(
    [
        [-1.0, 1.0], # 첫 번째 훈련 샘플
        [-0.5, 1.5], # 두 번째 훈련 샘플
        [-0.5, 1.5], # 세 번째 훈련 샘플
    ]
)
targets_2 = torch.tensor([0,1, 1])

loss_2 = torch.nn.functional.cross_entropy(logits_2, targets_2)
print(loss_2)

tensor(0.7936)


In [25]:
targets_3 = torch.tensor([0, 1, -100])

loss_3 = torch.nn.functional.cross_entropy(logits_2, targets_3)
print(loss_3)
print("loss_1 == loss_3:", loss_1 == loss_3)

tensor(1.1269)
loss_1 == loss_3: tensor(True)
