### 다중 분류에서 이진 분류로(구현)

In [1]:
import sys
sys.path.append('..')
from common.np import *  # import numpy as np
from common.layers import Embedding, SigmoidWithLoss
import collections

class EmbeddingDot:
    """Embedding lookup followed by a row-wise dot product with ``h``.

    Wraps an ``Embedding`` layer built on ``W`` and exposes that layer's
    ``params`` and ``grads`` lists as its own. The ``print`` calls are
    trace output for the notebook walkthrough.
    """

    def __init__(self, W):
        """Build the inner ``Embedding`` layer on the weight matrix ``W``."""
        embedding = Embedding(W)
        self.embed = embedding
        self.params, self.grads = embedding.params, embedding.grads
        # Filled by forward() with (h, looked-up rows) for use in backward().
        self.cache = None

    def forward(self, h, idx):
        """Return one score per sample: sum over axis 1 of ``embed(idx) * h``.

        Assumes ``h`` has one row per entry of ``idx`` (e.g. (2, 3) with two
        indices) -- TODO confirm against callers.
        """
        picked_rows = self.embed.forward(idx)
        print(picked_rows)
        # Element-wise product, then sum along axis 1: (N, D) -> (N,)
        products = picked_rows * h
        scores = np.sum(products, axis=1)

        self.cache = (h, picked_rows)
        return scores

    def backward(self, dout):
        """Back-propagate ``dout`` (one value per sample) and return ``dh``.

        The gradient for the looked-up rows is handed to the inner embedding
        layer's ``backward``; the gradient with respect to ``h`` is returned.
        """
        h, picked_rows = self.cache
        # Turn dout into a column so it broadcasts across each row.
        column = dout.reshape(dout.shape[0], 1)

        grad_rows = column * h
        print(grad_rows)
        self.embed.backward(grad_rows)
        print(self.embed.grads[0])
        return column * picked_rows

In [2]:
# Demo: forward pass of EmbeddingDot with a single sample.
a = np.arange(7).reshape(-1,1) # stand `a` up as a column, shape (7, 1)
W_out = np.repeat(a, 3, axis=1) # repeat along the column axis -> shape (7, 3)
print(W_out)

h = np.array([[3,4,5]])   # (1,3)

idx = np.array([1])       # index of the correct word 'say'
ed = EmbeddingDot(W_out)      # (1,3)(3,1) => (1,1)  sum([[3,4,5]]*[[1,1,1]],axis=1)=>[12]

out = ed.forward(h, idx)
print(out)
print(ed.cache)

[[0 0 0]
 [1 1 1]
 [2 2 2]
 [3 3 3]
 [4 4 4]
 [5 5 5]
 [6 6 6]]
[[1 1 1]]
[12]
(array([[3, 4, 5]]), array([[1, 1, 1]]))


In [3]:
# Demo: backward pass for the single-sample case, reusing `ed` from the
# previous cell.
dout = np.array([3])
dh = ed.backward(dout)
print(ed.embed.grads[0]) # gradient values held by the embedding layer
print("dh = ",dh)

[[ 9 12 15]]
[[ 0  0  0]
 [ 9 12 15]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]]
[[ 0  0  0]
 [ 9 12 15]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]]
dh =  [[3 3 3]]


In [4]:
# Forward pass with two samples
h = np.array([[3,4,5],
              [6,7,8]])   # (2,3)
idx = np.array([1,2])     # positions of the correct words
ed = EmbeddingDot(W_out)
ed.forward(h, idx)

[[1 1 1]
 [2 2 2]]


array([12, 42])

In [5]:
# Backward pass (gradients) for the two-sample case
dout = np.array([3,4])
dh = ed.backward(dout)
print(dh)

[[ 9 12 15]
 [24 28 32]]
[[ 0  0  0]
 [ 9 12 15]
 [24 28 32]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]
 [ 0  0  0]]
[[3 3 3]
 [8 8 8]]


### 네거티브 샘플링의 샘플링 기법

In [8]:
import numpy as np

# Draw one random integer from 0..9
np.random.choice(10)

1

In [9]:
# Calling it again gives a fresh random draw from 0..9
np.random.choice(10)

4

In [12]:
# Draw one random element from a list of words
words = ['you','say','goodbye','I','hello','.']
np.random.choice(words)

'I'

In [15]:
# Draw 5 words with replacement (duplicates allowed; this is the default)
np.random.choice(words, size=5)

array(['goodbye', 'I', 'I', 'say', 'you'], dtype='<U7')

In [16]:
# Draw 5 words without replacement (no duplicates): replace=False
np.random.choice(words, size=5, replace=False)

array(['hello', 'you', 'I', 'say', '.'], dtype='<U7')

In [28]:
# Draw according to an explicit probability distribution
# (one probability per word, in the same order as `words`)
p = [0.5, 0.1, 0.05, 0.2, 0.05, 0.1]
np.random.choice(words, p=p)

'you'

In [29]:
# Words with a very low frequency may almost never be drawn as negative
# samples: after normalisation the rare word's probability is tiny.
# `p` is created as a float ndarray so the normalisation is an explicit
# in-place array division (a plain list only worked because `/=` fell back
# to the NumPy scalar's reflected division and rebound `p` to an ndarray).
p = np.array([1000, 1010, 10], dtype=float)
p /= p.sum()
print(p)

[0.4950495 0.5       0.0049505]


In [31]:
# Correct for very low probabilities
p = [1000, 1010, 10]
new_p = np.power(p, 0.75) # the 0.75 power raises the relative weight of small counts
new_p /= np.sum(new_p)    # renormalise so the values sum to 1
print(new_p)

[0.4904092  0.49408269 0.0155081 ]


In [32]:
# collections package: Counter tallies occurrences per category
import collections

corpus = np.array([1,2,3,4,5,2,3,4,1,1,4,2,2,2])
counts = collections.Counter()
print(len(counts))  # empty Counter -> 0

# Missing keys start at 0, so each word ID can simply be incremented.
for word_id in corpus:
    counts[word_id] += 1
    
print(len(counts))  # number of distinct word IDs
print(counts) # displayed in descending order of frequency

0
5
Counter({2: 5, 1: 3, 4: 3, 3: 2, 5: 1})


In [33]:
GPU = False


class UnigramSampler:
    """Draw negative samples from a smoothed unigram distribution.

    Word frequencies are counted from ``corpus``, raised to ``power`` and
    normalised into ``word_p``. ``get_negative_sample`` then draws
    ``sample_size`` word IDs per target from that distribution.

    Args:
        corpus: iterable of non-negative integer word IDs.
        power: exponent applied to the raw counts (e.g. 0.75) before
            normalising; values below 1 lift the share of rare words.
        sample_size: number of negative samples drawn per target.

    Attributes:
        vocab_size: largest word ID in ``corpus`` plus one (0 if empty).
        word_p: array of length ``vocab_size`` holding the sampling
            probability of each word ID.
    """

    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size

        counts = collections.Counter(corpus)
        print(counts)  # trace output for the notebook walkthrough

        # Size the table by the largest ID, not by the number of distinct
        # IDs: with len(counts), a corpus whose IDs are not exactly
        # 0..V-1 would silently lose its highest IDs. IDs that never occur
        # simply get probability 0.
        vocab_size = (int(max(counts)) + 1) if counts else 0
        self.vocab_size = vocab_size

        # Raw frequencies; Counter returns 0 for IDs that never occur.
        self.word_p = np.array([counts[i] for i in range(vocab_size)],
                               dtype=float)

        # Smooth with the exponent, then normalise to a distribution.
        print(self.word_p)
        self.word_p = np.power(self.word_p, power)
        self.word_p /= np.sum(self.word_p)
        print(self.word_p)

    def get_negative_sample(self, target):
        """Return an array of shape ``(len(target), sample_size)`` of word IDs.

        On the non-GPU path each row is drawn without replacement and never
        contains that row's target ID. NumPy raises ``ValueError`` if fewer
        than ``sample_size`` other words have non-zero probability.

        On the GPU path the whole batch is drawn in one call with
        replacement, so a row may contain duplicates or the target itself.
        """
        batch_size = target.shape[0]

        if GPU:
            # When computing on GPU (cupy), speed takes priority:
            # the negative samples may include the target.
            return np.random.choice(self.vocab_size,
                                    size=(batch_size, self.sample_size),
                                    replace=True, p=self.word_p)

        negative_sample = np.zeros((batch_size, self.sample_size),
                                   dtype=np.int32)
        for i in range(batch_size):
            # Zero out the target's probability and renormalise so the
            # target can never be drawn as its own negative sample.
            p = self.word_p.copy()
            p[target[i]] = 0
            p /= p.sum()
            # Random draw according to the adjusted probabilities.
            negative_sample[i, :] = np.random.choice(
                self.vocab_size, size=self.sample_size, replace=False, p=p)

        return negative_sample

In [34]:
# Draw `sample_size` negative samples per target, based on word frequency
corpus = np.array([0, 1, 2, 3, 4, 1, 2, 3, 1, 1, 2])
power = 0.75
sample_size = 2

sampler = UnigramSampler(corpus, power, sample_size)
target = np.array([1, 3, 0])
negative_sample = sampler.get_negative_sample(target) # [P(word 0) = 0.11376918, P(word 1) = 0.32178782, ...]
print(negative_sample) # 1st row: negative samples paired with the 1st target (1), 2nd row: those paired with the 2nd target (3)

Counter({1: 4, 2: 3, 3: 2, 0: 1, 4: 1})
[1. 4. 3. 2. 1.]
[0.11376918 0.32178782 0.25933764 0.19133618 0.11376918]
[[4 2]
 [4 1]
 [3 2]]


In [35]:
# Demo: np.c_ places 1-D arrays side by side as the columns of a 2-D array.
a = np.array([1,2,3])
b = np.array([4,5,6])
c = np.c_[a,b] # column concatenation -> shape (3, 2)
print(c)

[[1 4]
 [2 5]
 [3 6]]


### 네거티브 샘플링 구현

In [None]:
class NegativeSamplingLoss:
    """Loss over one positive target plus ``sample_size`` negative samples.

    Holds ``sample_size + 1`` pairs of (``EmbeddingDot``, ``SigmoidWithLoss``)
    layers, all built on the same ``W``. Pair 0 scores the positive target
    with label 1; pairs 1..sample_size score the sampled negatives with
    label 0. ``params`` and ``grads`` collect the entries of every
    ``EmbeddingDot`` layer in order.
    """

    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)

        # One slot for the positive example plus one per negative sample.
        n_layers = sample_size + 1
        self.loss_layers = [SigmoidWithLoss() for _ in range(n_layers)]
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(n_layers)]

        self.params = []
        self.grads = []
        for dot_layer in self.embed_dot_layers:
            self.params.extend(dot_layer.params)
            self.grads.extend(dot_layer.grads)

    def forward(self, h, target):
        """Return the summed loss of the positive and all negative examples."""
        n_samples = target.shape[0]
        sampled_ids = self.sampler.get_negative_sample(target)

        # Forward pass for the positive example (label = 1)
        positive_score = self.embed_dot_layers[0].forward(h, target)
        positive_label = np.ones(n_samples, dtype=np.int32)
        total_loss = self.loss_layers[0].forward(positive_score, positive_label)

        # Forward pass for the negative examples (label = 0); column k of
        # the sampled IDs goes through layer pair k + 1.
        negative_label = np.zeros(n_samples, dtype=np.int32)
        for k in range(self.sample_size):
            dot_layer = self.embed_dot_layers[k + 1]
            loss_layer = self.loss_layers[k + 1]
            negative_score = dot_layer.forward(h, sampled_ids[:, k])
            total_loss += loss_layer.forward(negative_score, negative_label)

        return total_loss

    def backward(self, dout=1):
        """Back-propagate through every layer pair and return the summed ``dh``."""
        grad_h = 0
        layer_pairs = zip(self.loss_layers, self.embed_dot_layers)
        for loss_layer, dot_layer in layer_pairs:
            grad_score = loss_layer.backward(dout)
            grad_h += dot_layer.backward(grad_score)

        return grad_h
