In [6]:
import numpy as np

class Embedding:
    """Lookup layer: selects entries of a weight array by index.

    Attributes:
        params: one-element list holding the weight array ``w``.
        grads: one-element list holding a gradient buffer shaped like ``w``.
        idx: the indices used by the most recent ``forward`` call.
    """

    def __init__(self, w):
        self.idx = None
        self.params = [w]
        self.grads = [np.zeros_like(w)]

    def forward(self, idx):
        """Remember ``idx`` and return ``w[idx]``."""
        weights = self.params[0]
        self.idx = idx
        return weights[idx]

    def backward(self, dout):
        """Write the gradient for the last ``forward`` into ``grads[0]``.

        The buffer is cleared first, then ``dout`` is scattered into the
        positions selected by the stored indices. ``np.add.at`` is used so
        that an index appearing several times accumulates every contribution
        instead of keeping only the last one.

        Returns:
            None -- nothing is propagated further upstream.
        """
        grad = self.grads[0]
        grad.fill(0)
        np.add.at(grad, self.idx, dout)
        return None

In [5]:
class EmbeddingDot:
    """Embedding lookup followed by a row-wise dot product with ``h``.

    ``params`` and ``grads`` are the very same list objects as the wrapped
    ``Embedding`` layer's, so updates through either are shared.
    """

    def __init__(self, w):
        self.embed = Embedding(w)
        self.params = self.embed.params
        self.grads = self.embed.grads
        self.cache = None

    def forward(self, h, idx):
        """Return one score per row: ``sum(w[idx] * h, axis=1)``.

        Args:
            h: array multiplied element-wise with the looked-up rows; the
                product is summed over axis 1.
            idx: indices handed to the embedding layer, one per row of ``h``.

        Returns:
            Array with axis 1 reduced away (one value per row).
        """
        target_w = self.embed.forward(idx)
        out = np.sum(target_w * h, axis=1)
        # Cache the looked-up rows themselves: backward needs them to form
        # the gradient with respect to h.
        self.cache = (h, target_w)
        return out

    def backward(self, dout):
        """Backpropagate through the dot product.

        Args:
            dout: gradient of the loss with respect to the scores returned by
                ``forward`` (one value per row).

        Returns:
            Gradient with respect to ``h``, shaped like the cached rows.
        """
        h, target_w = self.cache
        # forward() reduced axis 1, so dout has one value per row. Turn it
        # into a column so it scales each row instead of being broadcast
        # along the feature axis.
        dout = np.asarray(dout).reshape(-1, 1)
        self.embed.backward(h * dout)
        return target_w * dout

array([[ 0,  1,  2],
       [ 3,  4,  5],
       [ 7,  8,  9],
       [10, 11, 12],
       [13, 14, 15],
       [15, 16, 17],
       [18, 19, 20]])

In [6]:
import collections
class UnigramSampler:
    """Draws negative word ids from a power-smoothed unigram distribution.

    Args:
        corpus: iterable of word ids. Counts are read back by the ids
            ``0 .. vocab_size - 1``, so the ids are expected to be contiguous
            and to start at zero.
        power: exponent applied to the raw counts before normalising.
        sample_size: number of negative ids drawn for each target.

    Attributes:
        sample_size: as passed in.
        vocab_size: number of distinct ids found in ``corpus``.
        word_p: sampling probability of each id (sums to 1).
    """

    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size

        frequency = collections.Counter(corpus)
        self.vocab_size = len(frequency)

        raw_counts = np.array(
            [frequency[word_id] for word_id in range(self.vocab_size)],
            dtype=np.float64,
        )
        smoothed = np.power(raw_counts, power)
        self.word_p = smoothed / np.sum(smoothed)

    def get_negative_sample(self, target):
        """Draw ``sample_size`` distinct negative ids for every target.

        For each row, the target's own probability is zeroed and the rest is
        renormalised, so a row never contains its own target. Ids are drawn
        without replacement.

        Args:
            target: array of target word ids; its first axis is the batch.

        Returns:
            ``int32`` array of shape ``(batch, sample_size)``.
        """
        n_rows = target.shape[0]
        drawn = np.zeros((n_rows, self.sample_size), dtype=np.int32)

        for row, word_id in enumerate(target):
            probs = self.word_p.copy()
            probs[word_id] = 0
            probs /= probs.sum()
            drawn[row, :] = np.random.choice(
                self.vocab_size, size=self.sample_size, replace=False, p=probs
            )
        return drawn

In [7]:
import numpy as np
# Small demo of UnigramSampler on a toy corpus of word ids (ids 0-4).
corpus = np.array([0, 1, 2, 3, 4, 1, 2, 3])
# Exponent applied to the raw counts before they are normalised.
power = 0.75
# Number of negative ids drawn per target.
sample_size = 2

sampler = UnigramSampler(corpus, power, sample_size)
# One target id per batch row; a row's own target is excluded from its draw.
target = np.array([1, 3, 0])
negative_sample = sampler.get_negative_sample(target)
# No random seed is set here, so the ids displayed differ between runs.
negative_sample

array([[2, 3],
       [2, 4],
       [3, 2]])

In [1]:
from common.layers import SigmoidWithLoss

class NegativeSamplingLoss:
    """Loss over one positive target plus ``sample_size`` sampled negatives.

    Layer 0 of ``embed_dot_layers`` / ``loss_layers`` handles the positive
    target; layers ``1 .. sample_size`` handle the negative samples.

    Args:
        w: weight array shared by every ``EmbeddingDot`` layer.
        corpus: word ids used to build the negative sampler.
        power: exponent forwarded to ``UnigramSampler``.
        sample_size: number of negatives drawn per target.

    Attributes:
        params: flat list of the parameter arrays of all embedding-dot layers.
        grads: flat list of the matching gradient arrays.
    """

    def __init__(self, w, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
        self.embed_dot_layers = [EmbeddingDot(w) for _ in range(sample_size + 1)]

        self.params, self.grads = [], []
        for layer in self.embed_dot_layers:
            # Concatenate rather than nest: anything that walks params/grads
            # (e.g. a model that does `params += layer.params`) expects a
            # flat list of arrays, not a list of per-layer lists.
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, h, target):
        """Return the summed loss of the positive and all negative examples.

        Args:
            h: hidden array passed to every embedding-dot layer.
            target: array of positive word ids, one per batch row.

        Returns:
            Sum of the values returned by the individual loss layers.
        """
        negative_sample = self.sampler.get_negative_sample(target)

        # Positive example: scored against the true target, labelled 1.
        positive_target = np.ones_like(target, dtype=np.int32)
        score = self.embed_dot_layers[0].forward(h, target)
        loss = self.loss_layers[0].forward(score, positive_target)

        # Negative examples: one column of sampled ids per layer, labelled 0.
        negative_target = np.zeros_like(negative_sample, dtype=np.int32)
        for i in range(self.sample_size):
            score = self.embed_dot_layers[i + 1].forward(h, negative_sample[:, i])
            loss += self.loss_layers[i + 1].forward(score, negative_target[:, i])
        return loss

    def backward(self, dout=1):
        """Backpropagate through every loss / embedding-dot pair.

        Args:
            dout: upstream gradient handed to each loss layer.

        Returns:
            Sum of the gradients with respect to ``h`` from all layers.
        """
        dh = 0
        for loss_layer, embed_dot in zip(self.loss_layers, self.embed_dot_layers):
            dscore = loss_layer.backward(dout)
            dh += embed_dot.backward(dscore)
        return dh

In [None]:
from common.np import *
from common.layers import Embedding
from common.negative_sampling_layer import NegativeSamplingLoss

class CBOW:
    """Continuous bag-of-words model trained with a negative-sampling loss.

    Args:
        vocab_size: number of rows of the input and output weight arrays.
        hidden_size: number of columns of the weight arrays.
        window_size: context words on each side of the target; the model
            builds ``2 * window_size`` input layers.
        corpus: word ids forwarded to ``NegativeSamplingLoss``.

    Attributes:
        in_layers: the input ``Embedding`` layers, all sharing one weight array.
        ns_loss: the ``NegativeSamplingLoss`` output layer.
        params, grads: parameters and gradients collected from every layer.
        word_vecs: the shared input weight array.
    """

    def __init__(self, vocab_size, hidden_size, window_size, corpus):
        weight_shape = (vocab_size, hidden_size)
        w_in = np.random.randn(*weight_shape).astype('f')
        w_out = np.random.randn(*weight_shape).astype('f')

        # Every context position looks up the same input weights.
        self.in_layers = [Embedding(w_in) for _ in range(2 * window_size)]
        self.ns_loss = NegativeSamplingLoss(w_out, corpus)

        self.params, self.grads = [], []
        for layer in [*self.in_layers, self.ns_loss]:
            self.params.extend(layer.params)
            self.grads.extend(layer.grads)

        self.word_vecs = w_in

    def forward(self, contexts, target):
        """Average the context embeddings and return the loss.

        Args:
            contexts: 2-D array of word ids; column ``i`` feeds input layer ``i``.
            target: target word ids passed on to the loss layer.

        Returns:
            Whatever ``ns_loss.forward`` returns for the averaged hidden array.
        """
        n_inputs = len(self.in_layers)
        h = 0
        for position in range(n_inputs):
            h += self.in_layers[position].forward(contexts[:, position])
        h *= 1 / n_inputs
        return self.ns_loss.forward(h, target)

    def backward(self, dout=1):
        """Backpropagate from the loss into every input layer.

        The gradient from the loss layer is scaled by ``1 / len(in_layers)``
        (undoing the averaging in ``forward``) and the same array is handed
        to each input layer.

        Returns:
            None.
        """
        scale = 1 / len(self.in_layers)
        dh = self.ns_loss.backward(dout)
        dh *= scale
        for embedding in self.in_layers:
            embedding.backward(dh)
        return None

In [None]:
from common.trainer import Trainer
from common.optimizer import Adam
from common.util import crate_contexts
import ptb

window_size = 5
hidden_size = 100
batch_size = 100
max_epoch = 10

corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)

contexts, 