In [41]:
# Building blocks for the Embedding layer
import numpy as np
W = np.arange(21).reshape(7,3)
W  # weight matrix: the integers 0..20 arranged as 7 rows of 3 values

array([[ 0,  1,  2],
       [ 3,  4,  5],
       [ 6,  7,  8],
       [ 9, 10, 11],
       [12, 13, 14],
       [15, 16, 17],
       [18, 19, 20]])

In [42]:
W[2]  # select a single row by giving its index

array([6, 7, 8])

In [43]:
W[5]  # select a single row by giving its index

array([15, 16, 17])

In [44]:
# Extract several rows of the weight matrix W at once via an index array
idx = np.array([1,0,3,0])
idx

array([1, 0, 3, 0])

In [45]:
W[idx]  # one output row per entry of idx; index 0 appears twice, so row 0 is returned twice

array([[ 3,  4,  5],
       [ 0,  1,  2],
       [ 9, 10, 11],
       [ 0,  1,  2]])

In [46]:
# np.zeros_like builds an all-zero array with the same shape as its argument
sample_weights = [[1,2],[3,4]]
np.zeros_like(sample_weights)

array([[0, 0],
       [0, 0]])

In [47]:
# The trailing comma unpacks a one-element list into a single name
a, = [1]
a

1

word2vec 구현에서 입력 측 MatMul 계층을 Embedding 계층으로 전환

In [48]:
#Embedding 계층의 forward() 메서드를 구현
class Embedding:
    """Lookup layer that selects rows of a weight matrix by index.

    Attributes:
        params: One-element list holding the weight matrix ``W``.
        grads: One-element list holding the gradient buffer (same shape and
            dtype as ``W``).
        idx: Row index/indices passed to the most recent ``forward`` call.
    """

    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.idx = None

    def forward(self, idx):
        """Return the rows of ``W`` selected by ``idx``.

        Args:
            idx: Row index or array of row indices into ``W``.

        Returns:
            ``W[idx]``.
        """
        W, = self.params
        self.idx = idx  # remembered so backward() knows which rows were read
        return W[idx]

    def backward(self, dout):
        """Accumulate the upstream gradient into the rows used by ``forward``.

        Args:
            dout: Upstream gradient, broadcastable to the shape of ``W[idx]``.

        Returns:
            None. The result is written into ``self.grads[0]``.
        """
        dW, = self.grads
        # Reset in place so the buffer object stored in self.grads is reused.
        dW[...] = 0

        # Plain assignment (dW[self.idx] = dout) would be wrong: when the same
        # row index occurs more than once, later rows would overwrite earlier
        # ones instead of summing. np.add.at performs unbuffered addition, so
        # every occurrence contributes. It also handles scalar and
        # multi-dimensional idx, which a Python loop over idx does not.
        np.add.at(dW, self.idx, dout)
        return None

In [49]:
class EmbeddingDot:
    """Embedding lookup followed by a row-wise dot product with ``h``.

    Attributes:
        embed: The wrapped ``Embedding`` layer.
        params: Parameter list shared with ``embed``.
        grads: Gradient list shared with ``embed``.
        cache: ``(h, target_W)`` saved by ``forward`` for use in ``backward``.
    """

    def __init__(self, W):
        embedding = Embedding(W)
        self.embed = embedding
        # Share (not copy) the embedding layer's parameter and gradient lists.
        self.params = embedding.params
        self.grads = embedding.grads
        self.cache = None

    def forward(self, h, idx):
        """Return the dot product of each row of ``h`` with row ``idx`` of W."""
        rows = self.embed.forward(idx)
        scores = np.sum(rows * h, axis=1)  # element-wise product summed per row
        self.cache = (h, rows)
        return scores

    def backward(self, dout):
        """Back-propagate ``dout``; returns the gradient with respect to ``h``."""
        h, rows = self.cache
        # Column vector so it broadcasts across the feature axis.
        dout_col = dout.reshape(dout.shape[0], 1)

        self.embed.backward(dout_col * h)
        return dout_col * rows

In [50]:
import numpy as np

# Randomly sample one number from 0 through 9
np.random.choice(10)

0

In [51]:
np.random.choice(10)  # another independent draw from 0 through 9

0

In [52]:
# Randomly sample a single element from words
words = ['you', 'say', 'goodbye', 'I', 'hello', '.']
np.random.choice(words)

'goodbye'

In [53]:
# Randomly sample 5 elements (duplicates allowed)
np.random.choice(words, size = 5)

array(['goodbye', '.', 'goodbye', '.', 'say'], dtype='<U7')

In [54]:
# Randomly sample 5 elements (no duplicates)
np.random.choice(words, size = 5, replace = False)

array(['hello', 'you', 'say', '.', 'goodbye'], dtype='<U7')

In [55]:
# Sample according to a probability distribution (one probability per word)
p = [0.5, 0.1, 0.05, 0.2, 0.05, 0.1]
np.random.choice(words, p = p)

'you'

In [56]:
# Raise each probability to the 0.75 power, then renormalize so they sum to 1.
# In the printed result the smallest probability rises (0.01 -> ~0.027)
# and the largest falls (0.7 -> ~0.64).
p = [0.7, 0.29, 0.01]
new_p = np.power(p, 0.75)
new_p /= np.sum(new_p)
print(new_p)

[0.64196878 0.33150408 0.02652714]


In [57]:
#negative_sampling_layer.py
# coding: utf-8
import sys
sys.path.append('..')
from common.np import *  # import numpy as np
from common.layers import Embedding, SigmoidWithLoss
import collections


class EmbeddingDot:
    """Looks up rows of ``W`` by index and dots each one with a row of ``h``."""

    def __init__(self, W):
        self.embed = Embedding(W)
        # Expose the embedding layer's own lists so updates are shared.
        self.params, self.grads = self.embed.params, self.embed.grads
        self.cache = None

    def forward(self, h, idx):
        """Compute one score per sample: ``sum(W[idx] * h, axis=1)``."""
        selected = self.embed.forward(idx)
        product = selected * h
        result = np.sum(product, axis=1)

        # Saved for backward().
        self.cache = (h, selected)
        return result

    def backward(self, dout):
        """Propagate ``dout`` to the embedding rows and return the gradient for ``h``."""
        h, selected = self.cache
        n_rows = dout.shape[0]
        column = dout.reshape(n_rows, 1)  # broadcast over the feature axis

        grad_selected = column * h
        self.embed.backward(grad_selected)
        grad_h = column * selected
        return grad_h


class UnigramSampler:
    """Samples negative word IDs from a power-smoothed unigram distribution.

    Attributes:
        sample_size: Number of negative samples drawn per target.
        vocab_size: Number of distinct word IDs found in the corpus.
        word_p: Sampling probability for each word ID ``0..vocab_size-1``.
    """

    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size

        # Occurrence count of every word ID in the corpus.
        counts = collections.Counter(corpus)

        vocab_size = len(counts)
        self.vocab_size = vocab_size

        # Frequencies are read for IDs 0..vocab_size-1; an ID absent from the
        # corpus contributes a count of 0.
        frequencies = np.array([counts[word_id] for word_id in range(vocab_size)], dtype=float)

        smoothed = np.power(frequencies, power)
        self.word_p = smoothed / np.sum(smoothed)

    def get_negative_sample(self, target):
        """Draw ``sample_size`` negative word IDs for every entry of ``target``.

        Returns:
            Array of shape ``(target.shape[0], sample_size)``.
        """
        batch_size = target.shape[0]

        if GPU:
            # When computing on the GPU (cupy), speed takes priority:
            # the negative samples may include the target itself.
            return np.random.choice(self.vocab_size, size=(batch_size, self.sample_size),
                                    replace=True, p=self.word_p)

        samples = np.zeros((batch_size, self.sample_size), dtype=np.int32)
        for row in range(batch_size):
            probs = self.word_p.copy()
            probs[target[row]] = 0  # the target itself must never be drawn
            probs /= probs.sum()
            samples[row, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=probs)

        return samples


class NegativeSamplingLoss:
    """Loss layer combining one positive example with sampled negative examples.

    Layer 0 of ``loss_layers`` / ``embed_dot_layers`` handles the positive
    example; layers ``1..sample_size`` handle the negative examples.
    """

    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)

        layer_count = sample_size + 1  # one positive + sample_size negatives
        self.loss_layers = [SigmoidWithLoss() for _ in range(layer_count)]
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(layer_count)]

        # Flatten the parameters and gradients of all EmbeddingDot layers.
        self.params = [param for layer in self.embed_dot_layers for param in layer.params]
        self.grads = [grad for layer in self.embed_dot_layers for grad in layer.grads]

    def forward(self, h, target):
        """Return the summed loss of the positive and all negative examples."""
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target)

        # Forward pass for the positive example (label 1).
        positive_score = self.embed_dot_layers[0].forward(h, target)
        positive_label = np.ones(batch_size, dtype=np.int32)
        loss = self.loss_layers[0].forward(positive_score, positive_label)

        # Forward pass for the negative examples (label 0).
        negative_label = np.zeros(batch_size, dtype=np.int32)
        for k in range(1, self.sample_size + 1):
            negative_score = self.embed_dot_layers[k].forward(h, negative_sample[:, k - 1])
            loss += self.loss_layers[k].forward(negative_score, negative_label)

        return loss

    def backward(self, dout=1):
        """Return the gradient with respect to ``h``, summed over all layers."""
        dh = 0
        for loss_layer, dot_layer in zip(self.loss_layers, self.embed_dot_layers):
            dh += dot_layer.backward(loss_layer.backward(dout))

        return dh

In [58]:
corpus = np.array([0,1,2,3,4,1,2,3])  # list of word IDs
power = 0.75  # exponent applied to the probability distribution
sample_size = 2  # number of negative examples to sample per target

sampler = UnigramSampler(corpus, power, sample_size)
target = np.array([1,3,0])
negative_sample = sampler.get_negative_sample(target)
print(negative_sample)  # mini-batch of 3 targets, with 2 negative examples sampled for each

[[3 0]
 [2 1]
 [2 1]]


### 네거티브 샘플링 구현


In [59]:
class NegativeSamplingLoss:
    """Negative-sampling loss: one positive example plus sampled negatives.

    Layer 0 of ``loss_layers`` / ``embed_dot_layers`` handles the positive
    example; layers ``1..sample_size`` handle the negative examples.
    """

    def __init__(self, W, corpus, power=0.75, sample_size=5):
        """Build the sampler and the per-example layers.

        Args:
            W: Output-side weights.
            corpus: Corpus as a list of word IDs.
            power: Exponent applied to the word probability distribution.
            sample_size: Number of negative examples sampled per target.
        """
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
        # One EmbeddingDot per loss layer. Without this list the loop below
        # (and forward/backward) fails with AttributeError.
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)]

        self.params, self.grads = [], []
        for layer in self.embed_dot_layers:
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, h, target):
        """Return the total loss for the positive and negative examples.

        Args:
            h: Hidden-layer activations, one row per sample.
            target: Array of positive (correct) word IDs; ``target.shape[0]``
                is taken as the batch size.

        Returns:
            Sum of the positive-example loss and all negative-example losses.
        """
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target)

        # Forward pass for the positive example (label 1).
        score = self.embed_dot_layers[0].forward(h, target)
        correct_label = np.ones(batch_size, dtype=np.int32)
        loss = self.loss_layers[0].forward(score, correct_label)

        # Forward pass for the negative examples (label 0). The labels get
        # their own name so the sampled IDs in negative_sample are not
        # overwritten.
        negative_label = np.zeros(batch_size, dtype=np.int32)
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i]
            score = self.embed_dot_layers[1 + i].forward(h, negative_target)
            loss += self.loss_layers[1 + i].forward(score, negative_label)

        return loss

    def backward(self, dout=1):
        """Back-propagate through every layer pair and sum the gradients for ``h``.

        Args:
            dout: Upstream gradient of the loss (default 1).

        Returns:
            Gradient with respect to ``h``.
        """
        dh = 0
        for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):
            dscore = l0.backward(dout)
            dh += l1.backward(dscore)

        return dh

In [61]:
import sys
sys.path.append('..')
import numpy as np
from common.layers import Embedding

class CBOW:
    """CBOW model built from Embedding input layers and a negative-sampling loss.

    Attributes:
        in_layers: ``2 * window_size`` Embedding layers that all share ``W_in``.
        ns_loss: ``NegativeSamplingLoss`` layer built on ``W_out``.
        params: Parameters of every layer, collected into one list.
        grads: Gradients of every layer, collected into one list.
        word_vecs: The input-side weights, kept as the word vectors.
    """

    def __init__(self, vocab_size, hidden_size, window_size, corpus):
        """Initialize weights and layers.

        Args:
            vocab_size: Number of words in the vocabulary.
            hidden_size: Number of hidden-layer neurons.
            window_size: Context window size; ``2 * window_size`` input
                layers are created.
            corpus: List of word IDs, passed to the negative-sampling loss.
        """
        V, H = vocab_size, hidden_size

        # Initialize the weights (both matrices have shape (V, H)).
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(V, H).astype('f')

        # Create the layers; every input layer shares the same W_in.
        self.in_layers = []
        for i in range(2 * window_size):
            layer = Embedding(W_in)
            self.in_layers.append(layer)
        self.ns_loss = NegativeSamplingLoss(W_out, corpus, power=0.75, sample_size=5)

        # Collect all weights and gradients into flat lists.
        layers = self.in_layers + [self.ns_loss]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layer.grads

        # Keep the distributed word representations as an instance variable.
        self.word_vecs = W_in

    def forward(self, contexts, target):
        """Compute the loss for a batch.

        Args:
            contexts: 2-D array of word IDs; column ``i`` is fed to input
                layer ``i``.
            target: Word IDs of the targets.

        Returns:
            The loss returned by the negative-sampling layer.
        """
        h = 0
        for i, layer in enumerate(self.in_layers):
            h += layer.forward(contexts[:, i])
        h *= 1 / len(self.in_layers)  # average over the input layers
        loss = self.ns_loss.forward(h, target)
        return loss

    def backward(self, dout=1):
        """Back-propagate ``dout`` through the loss and every input layer.

        Returns:
            None. Gradients are stored by the individual layers.
        """
        dout = self.ns_loss.backward(dout)
        dout *= 1 / len(self.in_layers)  # gradient of the averaging in forward()
        # The input layers live in self.in_layers; no self.layers attribute
        # is ever created, so iterating it would raise AttributeError.
        for layer in self.in_layers:
            layer.backward(dout)
        return None

In [62]:
# Training code for the CBOW model
import sys
sys.path.append('..')
import numpy as np
from common import config
# To run on the GPU, uncomment the line below (requires CuPy)
# =======================================
# NOTE(review): the flag below is currently active, not commented out.
# NOTE(review): `from common.np import *` already ran in an earlier cell (In [57]),
# before this flag is set; presumably the names it exported (e.g. GPU) were bound
# at that point and will not pick up this change — confirm the intended cell order.
config.GPU = True  # run on the GPU

In [64]:
import pickle
from common.trainer import Trainer
from common.optimizer import Adam

from common.util import create_contexts_target, to_cpu, to_gpu
from dataset import ptb

# Hyperparameter settings
window_size = 5  # window size
hidden_size = 100  # number of hidden-layer neurons
batch_size = 100
max_epoch = 10

# Load the data (the 'train' split of the PTB dataset module)
corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)

Downloading ptb.train.txt ... 
Done


In [None]:
# Build the (contexts, target) training pairs from the corpus for the chosen window size
contexts, target = create_contexts_target(corpus, window_size)
# When the GPU flag is set, pass both arrays through to_gpu
# (presumably converts them to GPU arrays — confirm against common.util)
if config.GPU:
    contexts, target = to_gpu(contexts), to_gpu(target)