# 3_C_train_sgns
- author: Eu-Bin KIM
- 20th of August 2021
- tlrndk123@gmail.com

## 목표
- `SGNS.forward()` 구현
- `SGNS.training_step()` 구현


In [1]:
# pytorch 라이브러리 설치
# https://pytorch.org/docs/stable/index.html
!pip3 install torch  



In [2]:
import torch
from torch import nn  # 하나의 신경망을 구현하기 위해.
from torch.nn import functional as F  # 여러 로스함수 등을 사용하기 위해.
from torch.nn import init  # 랜덤하게 가중치를 설정하기 위해.
from typing import Tuple, List, Dict 
from keras_preprocessing.text import Tokenizer
from torch.nn import functional as F  # 여러 로스함수 등을 사용하기 위해.
from torch.utils.data import Dataset, DataLoader  # 데이터 로더와 함께 쓰기 위해.
import torch  # 각종 타이핑을 위해
from tensorflow.keras.preprocessing.sequence import skipgrams
import torch.optim as optim
import logging
from sys import stdout
logging.basicConfig(stream=stdout, level=logging.INFO)

In [3]:
# 이번숙제의 목표는, pytorch framework를 활용하여 SGNS를 직접 구현해보는 것.
# Negative Sampling 로직은 제가 직접 구현합니다. 여러분은 이 클래스만 구현하면 됩니다!
class SGNS(nn.Module):
    """
    A pytorch implementation of SkipGram with Negative Sampling (SGNS).
    """
    def __init__(self, vocab_size: int, embed_size: int, word2idx: Dict[str, int]):
        super(SGNS, self).__init__()
        self.vocab_size = vocab_size  # 어휘의 크기.
        self.embed_size = embed_size  # 임베딩 벡터의 크기.
        self.input_embeddings = nn.Embedding(vocab_size, embed_size)
        self.context_embeddings = nn.Embedding(vocab_size, embed_size)
        self.word2idx = word2idx
        # 각 임베딩 테이블을 랜덤 초기화 해준다.
        init.uniform_(self.input_embeddings.weight.data)
        init.constant_(self.context_embeddings.weight.data, 0)

    def forward(self, X_input: torch.Tensor, X_context: torch.Tensor):
        """
        :param X_input: (N,). 정수인코딩된 입력 단어들 (고유 아이디가 담긴 리스트).
        :param X_context: (N,). 정수인코딩된 맥락 단어들 (고유 아이디가 담긴 리스트).
        :return: 입력과 맥락의 유사도 벡터 (N,) (범위 = [0, 1])
        """
        # --- TODO 1 --- #
        # 대문자로 시작하는 변수: 행렬.
        # 소문자로 시작하는 변수: 벡터.
        # 먼저, 입력과 맥락에 해당하는 모든 단어의 임베딩 벡터를 불러온다.
        Embed_input = self.input_embeddings(X_input) # (N,) -> (N, embed_size)
        Embed_context = self.context_embeddings(X_context) # (N,) -> (N, embed_size)
        # 입력 임베딩, 맥락 임베딩 사이의 유사도 점수(벡터의 내적)를 구해준다.
        sims = torch.sum(torch.mul(Embed_input, Embed_context), dim=1) # (N, embed_size), (N, embed_size) -> (N, )
        # sims 의 치역은 모든 실수이므로, [0, 1] 사이에 값이 놓이도록 시그모이드 함수로 값을 정규화 해준다.
        sims_normalized = F.sigmoid(sims) # (N, ) -> [0, 1]로 정규화 -> (N, )
        return sims_normalized
        # ------------- #

    def training_step(self,
                      X_input: torch.Tensor,
                      X_context: torch.Tensor,
                      y: torch.Tensor) -> torch.Tensor:
        """
        :param X_input (N, ) 정수인코딩된 입력 단어들 (고유 아이디가 담긴 리스트).
        :param X_context (N, ) 정수인코딩된 맥락 단어들 (고유 아이디가 담긴 리스트).
        :param y:  (N, ) 이웃=1/이웃x=0, 이진분류를 위한 레이블.
        :return: 해당 batch로부터 계산된 로스함수. (1,)
        """
        # --- TODO 2 --- #
        sims_normalized = self.forward(X_input, X_context)  # (N,)
        loss = F.binary_cross_entropy(sims_normalized, y)  # criterion((N,), (N,)) -> (1,)
        return loss
        # -------------- #
    
    def cos_similarity(self, word_1: str, word_2: str) -> float:
      word_1_idx = self.word2idx[word_1]
      word_2_idx = self.word2idx[word_2]
      # just compute the.. sim matrix.d
      word_1_embed = self.input_embeddings(torch.LongTensor([word_1_idx]))
      word_2_embed = self.input_embeddings(torch.LongTensor([word_2_idx]))
      return torch.cosine_similarity(word_1_embed, word_2_embed).item()

In [4]:
# 말뭉치 구성 - 그냥 간단한... 데이터 구성.
CORPUS: List[str] = [
    "They all admire her courage",
    "They all love her courage",
    "He hates his country's enemies"
]


# 하이퍼 파라미터
WINDOW_SIZE = 3
EMBED_SIZE = 100
VOCAB_SIZE = 40
LR = 0.01
EPOCHS = 50
NEGATIVE_SAMPLES = 3


# --- negative samples를 구축하기 위한 함수 --- #
def build_rows(seqs: List[List[int]]) -> List[Tuple[Tuple[int, int], int]]:
    global VOCAB_SIZE, WINDOW_SIZE
    rows = list()
    for seq in seqs:
        skip_grams = skipgrams(sequence=seq,
                               vocabulary_size=VOCAB_SIZE,
                               window_size=WINDOW_SIZE,
                               negative_samples=NEGATIVE_SAMPLES)
        pairs, labels = skip_grams
        rows += [
            (tuple(pair), label)
            for pair, label in zip(pairs, labels)
        ]
    return rows


# --- builders --- #
def build_X_input(rows: List[Tuple[Tuple[int, int], int]]) -> torch.LongTensor:
    X_input = [
        row[0][0]
        for row in rows
    ]
    return torch.LongTensor(X_input)


def build_X_context(rows: List[Tuple[Tuple[int, int], int]]) -> torch.LongTensor:
    X_context = [
        row[0][1]
        for row in rows
    ]
    return torch.LongTensor(X_context)


def build_y(rows: List[Tuple[Tuple[int, int], int]]) -> torch.FloatTensor:
    y = [
        row[1]
        for row in rows
    ]
    return torch.FloatTensor(y)


# 데이터셋 구성
class SGNSDataset(Dataset):
    def __init__(self, seqs: List[List[int]]):
        rows = build_rows(seqs)
        self.X_input = build_X_input(rows)
        self.X_output = build_X_context(rows)
        self.y = build_y(rows)

    def __len__(self) -> int:
        """
        Returning the size of the dataset
        :return:
        """
        return self.y.shape[0]

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor, torch.LongTensor]:
        """
        Returns features & the label
        :param idx:
        :return:
        """
        return self.X_input[idx], self.X_input[idx], self.y[idx]


In [5]:
# 테스트용 코드 - 모두 이해할필요는 없습니다.
# 로스가 작아지는지만 확인할 수 있으면 됩니다.
# 토크나이즈 & 정수인코딩 된것들.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(texts=CORPUS)
seqs = tokenizer.texts_to_sequences(texts=CORPUS)  
dataset = SGNSDataset(seqs)
sgns = SGNS(vocab_size=VOCAB_SIZE, embed_size=EMBED_SIZE, word2idx=tokenizer.word_index)
optimizer = optim.Adam(params=sgns.parameters(), lr=LR)
dataloader = DataLoader(dataset, batch_size=10)
for e_idx, epoch in enumerate(range(EPOCHS)):
    losses = list() # 로스 저장
    for b_idx, batch in enumerate(dataloader):
        X_input, X_context, y = batch
        loss = sgns.training_step(X_input, X_context, y)
        optimizer.zero_grad()  # resetting the gradients
        loss.backward()  # backpropagation
        optimizer.step()  # gradient step
        # 각 배치의 로스를 저장.
        losses.append(loss.item())
    # 에폭이 끝날 때마다 평균 로스를 출력. = 데이터를 우려먹는 횟수.
    avg_loss = (sum(losses) / len(losses))
    print("{}: {}".format(e_idx, avg_loss))



0: 0.6360471004789526
1: 0.624479849907485
2: 0.6222527758641676
3: 0.6199913891878995
4: 0.5971720584414222
5: 0.5904165601188486
6: 0.5881355066191066
7: 0.5848686668005857
8: 0.5835355967283249
9: 0.5826276892965491
10: 0.5816626833243803
11: 0.5814870243722742
12: 0.5807738832452081
13: 0.5808008191260424
14: 0.5801790004426782
15: 0.5801715783097527
16: 0.5796865400942889
17: 0.5796423757618124
18: 0.579289747910066
19: 0.5792019028555263
20: 0.5788792141459205
21: 0.5787180242213336
22: 0.5784347978505221
23: 0.578304414044727
24: 0.5780907056548379
25: 0.5779733861034567
26: 0.577828284014355
27: 0.577692061662674
28: 0.5774722383780913
29: 0.5773446600545536
30: 0.5771903490478342
31: 0.5770576623353091
32: 0.5769497657364066
33: 0.5768876604058526
34: 0.5767957608808171
35: 0.5766107087785547
36: 0.5765078270977194
37: 0.5764253288507462
38: 0.5762275403196161
39: 0.5761238038539886
40: 0.5760240771553733
41: 0.5759555250406265
42: 0.5760092572732405
43: 0.5759340890429236
44:

In [6]:
# hates 보다 loves 사이의 유사도 높으면 성공입니다!
print(sgns.cos_similarity("admire", "love"))
print(sgns.cos_similarity("admire", "hates"))

0.6550629138946533
0.5786426067352295
