<a href="https://colab.research.google.com/github/june-oh/cseg109/blob/main/LAB4_CNN_Solution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import urllib.request

# Download the sample corpus from the soynlp tutorials directory and save it
# in the working directory as "2016-10-20.txt".
urllib.request.urlretrieve("https://raw.githubusercontent.com/lovit/soynlp/master/tutorials/2016-10-20.txt", filename="2016-10-20.txt")

('2016-10-20.txt', <http.client.HTTPMessage at 0x7c96593f9930>)

In [None]:
# Load the downloaded corpus as a list of whitespace-stripped lines.
# The context manager closes the file handle deterministically, and the
# explicit encoding keeps the Korean text readable on platforms whose default
# locale encoding is not UTF-8.
with open('2016-10-20.txt', encoding='utf-8') as corpus_file:
    texts = [line.strip() for line in corpus_file]

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import re

# Use the first CUDA device when one is available; otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


# Character-level dataset definition
class CharDataset(Dataset):
    """Sliding-window next-character dataset built from a single string.

    Every sample is a pair ``(inputs, targets)`` of integer-id arrays, each
    ``sequence_length`` long, where ``targets`` is ``inputs`` shifted one
    position to the right in the source text.

    Attributes:
        sequence_length: Window length actually sliced from the text, i.e. the
            requested ``sequence_length`` plus one for the final target.
        characters: Sorted list of the distinct characters in ``text``.
        char_to_int: Mapping from character to its index in ``characters``.
        int_to_char: Inverse mapping of ``char_to_int``.
        int_text: ``text`` encoded as a NumPy array of integer ids.
        data: List of ``(inputs, targets)`` pairs, one per window.
    """

    def __init__(self, text, sequence_length=32):
        """Build the vocabulary from ``text`` and precompute all windows.

        Args:
            text: Source string; every distinct character becomes a token.
            sequence_length: Number of input characters per sample.
        """
        self.sequence_length = sequence_length + 1  # +1 to hold the last target
        self.characters = sorted(set(text))
        self.char_to_int = {c: i for i, c in enumerate(self.characters)}
        self.int_to_char = dict(enumerate(self.characters))
        # int64 ids regardless of the platform's default NumPy integer width.
        self.int_text = np.array(
            [self.char_to_int[char] for char in text], dtype=np.int64
        )
        self.data = self._create_dataset()

    def _create_dataset(self):
        """Return every full-length window as an ``(inputs, targets)`` pair.

        The last valid window starts at ``len(int_text) - sequence_length``;
        that start index is included so the final character of the text is
        used as a target. Texts shorter than one window yield an empty list.
        """
        window = self.sequence_length
        last_start = len(self.int_text) - window
        return [
            (self.int_text[i:i + window - 1], self.int_text[i + 1:i + window])
            for i in range(last_start + 1)
        ]

    def __len__(self):
        """Return the number of precomputed windows."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(inputs, targets)`` pair stored at position ``idx``."""
        return self.data[idx]

# Concatenate every sentence into one space-separated string, then collapse
# each run of whitespace into a single space.
full_text = re.sub(r'\s+', ' ', ' '.join(line.strip() for line in texts))

In [None]:
import torch.nn as nn


class Causal_Conv1d(nn.Module):
    """1-D convolution followed by ReLU whose outputs do not see future steps.

    The convolution pads both ends of the time axis with
    ``(kernel_size - 1) * dilation`` positions, and ``forward`` then drops that
    many trailing outputs, so output step ``t`` depends only on input steps
    ``<= t``.

    Args:
        in_channel: Number of input channels.
        out_channel: Number of output channels.
        kernel_size: Convolution kernel width.
        stride: Convolution stride.
        dilation: Spacing between kernel taps.

    NOTE(review): trimming ``causal_padding`` trailing outputs lines up with
    the input steps when ``stride == 1`` (the only value used in this file);
    confirm the intended behaviour before using another stride.
    """

    def __init__(self, in_channel, out_channel, kernel_size=2, stride=1, dilation=1):
        super(Causal_Conv1d, self).__init__()
        self.causal_padding = (kernel_size - 1) * dilation
        self.conv = nn.Conv1d(
            in_channel,
            out_channel,
            kernel_size,
            stride=stride,
            padding=self.causal_padding,
            dilation=dilation,
        )
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the convolution, trim the look-ahead outputs, then ReLU.

        Args:
            x: Input passed straight to ``nn.Conv1d`` (channels on dim 1,
                time on the last dim).

        Returns:
            The activated convolution output with the trailing
            ``causal_padding`` time steps removed.
        """
        x = self.conv(x)
        # With kernel_size == 1 the padding is 0, and ``x[..., :-0]`` would
        # select nothing; only trim when there is something to remove.
        if self.causal_padding > 0:
            x = x[:, :, :-self.causal_padding]
        return self.relu(x)

# 1-D CNN character model definition
class CharCNN(nn.Module):
    """Character-level model: embedding, four causal conv layers, linear head.

    The causal layers use dilations 1, 2, 4 and 8 and keep the channel count
    at ``hidden_size``. ``self.dilation`` and ``self.stride`` are stored but
    not read anywhere in this class.

    Args:
        vocab_size: Number of distinct token ids (embedding rows and output
            logits per step).
        hidden_size: Embedding width and channel count of every conv layer.
    """

    def __init__(self, vocab_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.dilation = 1
        self.stride = 1
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        conv_stack = []
        for exponent in range(4):
            conv_stack.append(
                Causal_Conv1d(hidden_size, hidden_size, dilation=2 ** exponent)
            )
        self.causal_convs = nn.ModuleList(conv_stack)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        """Return per-step logits over the vocabulary.

        Args:
            x: Integer token ids; the permutes below require the embedded
                tensor to be 3-D, so presumably ``(batch, sequence)``.
            hidden: Accepted but not used by this model.

        Returns:
            The output of the final linear layer, with the vocabulary on the
            last dimension.
        """
        # Conv1d expects channels on dim 1, so swap the last two axes.
        features = self.embedding(x).permute(0, 2, 1)
        for layer in self.causal_convs:
            features = layer(features)
        # Swap back so the linear head acts on the channel axis.
        logits = self.fc(features.permute(0, 2, 1))
        return logits


In [None]:
# Build a standalone CharCNN with a fixed 256-entry vocabulary and 128 hidden
# channels.
vocab_size = 256
cnn_model = CharCNN(vocab_size=vocab_size,hidden_size=128)

In [None]:
# Evaluate the bare name so the notebook prints the module structure.
cnn_model

CharCNN(
  (embedding): Embedding(256, 128)
  (causal_convs): ModuleList(
    (0): Causal_Conv1d(
      (conv): Conv1d(128, 128, kernel_size=(2,), stride=(1,), padding=(1,))
      (relu): ReLU()
    )
    (1): Causal_Conv1d(
      (conv): Conv1d(128, 128, kernel_size=(2,), stride=(1,), padding=(2,), dilation=(2,))
      (relu): ReLU()
    )
    (2): Causal_Conv1d(
      (conv): Conv1d(128, 128, kernel_size=(2,), stride=(1,), padding=(4,), dilation=(4,))
      (relu): ReLU()
    )
    (3): Causal_Conv1d(
      (conv): Conv1d(128, 128, kernel_size=(2,), stride=(1,), padding=(8,), dilation=(8,))
      (relu): ReLU()
    )
  )
  (fc): Linear(in_features=128, out_features=256, bias=True)
)

In [None]:
# Quick test setup: build the dataset from only the first 100 characters of
# the corpus and batch it in order (no shuffling).
dataset = CharDataset(full_text[:100]) # test-sized slice
data_loader = DataLoader(dataset, batch_size=128, shuffle=False)

In [None]:
from tqdm.notebook import tqdm
# Size the model from the dataset's vocabulary and place it on the selected device.
vocab_size = len(dataset.characters)
hidden_size = 128
model = CharCNN(vocab_size, hidden_size).to(device)
def train(model, data_loader, epochs=10, lr=0.001):
    """Train ``model`` with Adam and cross-entropy loss, showing tqdm progress.

    Args:
        model: Module called on a batch of integer token ids and returning
            logits with the classes on the last dimension.
        data_loader: Sized iterable yielding ``(seq_in, seq_out)`` batches of
            token ids, where ``seq_out`` holds the target id for each
            position of ``seq_in``.
        epochs: Number of passes over ``data_loader``.
        lr: Learning rate passed to ``torch.optim.Adam``.

    Returns:
        None. The model is updated in place and left in training mode.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # Send batches to wherever the model's parameters live instead of relying
    # on a module-level ``device`` variable.
    model_device = next(model.parameters()).device
    model.train()
    pbar = tqdm(range(epochs))
    for _ in pbar:
        for seq_in, seq_out in tqdm(data_loader, total=len(data_loader), leave=False):
            seq_in = seq_in.to(model_device).long()
            seq_out = seq_out.to(model_device).long()
            optimizer.zero_grad()

            output = model(seq_in)
            # Flatten to (positions, classes) vs. (positions,). The class
            # count is read from the logits themselves, so a stale global
            # ``vocab_size`` cannot silently mis-shape the loss input.
            loss = criterion(output.reshape(-1, output.size(-1)), seq_out.reshape(-1))
            loss.backward()
            pbar.set_postfix({'loss': loss.item()})

            optimizer.step()

# Run 20 epochs over the small test dataset.
train(model, data_loader, epochs=20)

  0%|          | 0/20 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
def generate_text(model, start_string, generation_length=100, char_dataset=None):
    """Greedily extend ``start_string`` one character at a time with ``model``.

    At every step the whole sequence so far is fed to the model on the CPU,
    and the highest-scoring id at the last position is appended. The model's
    device and training/eval mode are restored before returning, even if
    generation raises.

    Args:
        model: Module mapping a ``(1, length)`` LongTensor of token ids to
            logits with the vocabulary on the last dimension.
        start_string: Prompt text; every character must be in the vocabulary.
        generation_length: Number of characters to generate.
        char_dataset: Object exposing ``char_to_int`` and ``int_to_char``
            mappings. Defaults to the module-level ``dataset``.

    Returns:
        ``start_string`` followed by the generated characters.

    Raises:
        KeyError: If ``start_string`` contains a character that is not in the
            vocabulary.
        ValueError: If ``start_string`` is empty while characters are
            requested (there is no last position to predict from).
    """
    vocab = dataset if char_dataset is None else char_dataset

    unknown = sorted({c for c in start_string if c not in vocab.char_to_int})
    if unknown:
        raise KeyError(
            f"start_string contains characters missing from the vocabulary: {unknown!r}"
        )
    if generation_length > 0 and not start_string:
        raise ValueError("start_string must contain at least one character")

    # Remember where the model lives and which mode it is in, so both can be
    # put back afterwards.
    original_device = next(model.parameters()).device
    was_training = model.training

    input_eval = torch.LongTensor(
        [vocab.char_to_int[c] for c in start_string]
    ).unsqueeze(0)
    text_generated = []

    try:
        # Run inference on the CPU.
        model.to('cpu')
        model.eval()
        with torch.no_grad():
            for _ in range(generation_length):
                output = model(input_eval)
                # Greedy choice: best id at the last position of the only batch row.
                predicted_id = torch.argmax(output, dim=-1)[0][-1]
                input_eval = torch.cat((input_eval, predicted_id.view(1, 1)), dim=-1)
                text_generated.append(vocab.int_to_char[predicted_id.item()])
    finally:
        # Return the model to its original mode and device.
        model.train(was_training)
        model.to(original_device)

    return start_string + ''.join(text_generated)


# Text generation example
print(generate_text(model, start_string="경찰 관계자들이 19일 오후 서울 강북구", generation_length=60))


경찰 관계자들이 19일 오후 서울 강북구                                                            
