# Negative Sampling  : 네거티브 샘플링

### Embedding과 EmbeddingDot 계층

In [1]:
import numpy as np

# nn_layers.py에 추가하여 놓는다

# Embedding 계층
class Embedding :
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.idx = None
    
    # 순전파
    def forward(self,idx):
        W, = self.params
        self.idx = idx
        out = W[idx]
        return out
    
    # 역전파
    def backward(self, dout):  # 중복 인덱스가 있어도 올바르게 처리, 속도가 빠름
        dW, = self.grads
        dW[...] = 0
        np.add.at(dW, self.idx, dout)  
        return None       

In [2]:
# EmbeddingDot 계층
# nn_layers.py에 추가하여 놓는다

class EmbeddingDot:
    def __init__(self,W):
        self.embed = Embedding(W)
        self.params = self.embed.params
        self.grads = self.embed.grads
        self.cache = None
        
    def forward(self,h,idx):
        target_W = self.embed.forward(idx)
        out = np.sum(target_W*h,axis=1)   # 1차원 출력
        self.cache = (h, target_W)
        return out
    
    def backward(self, dout):
        h, target_W = self.cache
        dout = dout.reshape(dout.shape[0],1) # 2차원으로 변환
        
        dtarget_W = dout*h  # sum <--> repeat, 브로드캐스트
        self.embed.backward(dtarget_W)
        
        dh = dout*target_W  # 브로드캐스트
        return dh

In [3]:
# EmbeddingDot 클래스 테스트 
W = np.arange(21).reshape(7,3)
print('W:\n',W)

idx = np.array([0,3,1])
print('idx:\n',idx)

h = W[[0,1,2]]
print('h:\n',h)

embed_dot = EmbeddingDot(W)
out = embed_dot.forward(h,idx)
print('out:\n',out)

print('h:\n',embed_dot.cache[0])
print('target_W:\n',embed_dot.cache[1])

W:
 [[ 0  1  2]
 [ 3  4  5]
 [ 6  7  8]
 [ 9 10 11]
 [12 13 14]
 [15 16 17]
 [18 19 20]]
idx:
 [0 3 1]
h:
 [[0 1 2]
 [3 4 5]
 [6 7 8]]
out:
 [  5 122  86]
h:
 [[0 1 2]
 [3 4 5]
 [6 7 8]]
target_W:
 [[ 0  1  2]
 [ 9 10 11]
 [ 3  4  5]]


In [4]:
# EmbeddingDot 계층의 forward 함수내에서 변수의 값 변화 정보
idx = np.array([0,3,1])
embed = Embedding(W)
target_W = embed.forward(idx) # W에서 임베딩 처리
print('target_W:\n',target_W)

h = W[[0,1,2]]
print('h:\n',h)

temp = target_W * h
print('temp:\n',temp)

out = np.sum(temp,axis=1)
print('out:\n',out)

target_W:
 [[ 0  1  2]
 [ 9 10 11]
 [ 3  4  5]]
h:
 [[0 1 2]
 [3 4 5]
 [6 7 8]]
temp:
 [[ 0  1  4]
 [27 40 55]
 [18 28 40]]
out:
 [  5 122  86]


### 네거티브 샘플링 기법
 https://ddiri01.tistory.com/310

In [5]:
# 0에서 9까지의 숫자 중 하나를 무작위로 샘플링
np.random.choice(10)

1

In [6]:
# words에서만 하나만 무작위로 샘플링
words = ['you','say','goodbye','i','hello','.']
np.random.choice(words)

'you'

In [7]:
# 5개만 무작위로 샘플링 (중복 허용)
np.random.choice(words,size=5)

array(['you', '.', 'i', 'hello', '.'], dtype='<U7')

In [8]:
# 5개만 무작위로 샘플링 (중복 금지)
np.random.choice(words,size=5,replace=False)

array(['goodbye', 'you', 'i', '.', 'hello'], dtype='<U7')

In [9]:
# 주어진 확률 분포에 따라 샘플링
p = [0.5,0.1,0.05,0.2,0.05,0.1]  # 합이 1.0
print(sum(p))

np.random.choice(words,p = p)

1.0000000000000002


'say'

In [10]:
# 네거티브 샘플링에서는 기본 확률 분포에 0.75를 제곱해준다
# 확률분포 값들에 모두 0.75제곱 처리 ==> 원래 확률이 낮은 단어를 버리지 않기 위해서
p = [0.7,0.29,0.01]
print(sum(p))
new_p = np.power(p,0.75)
print(new_p)
new_p /= np.sum(new_p) # 다시 전체 요소의 합으로나누어 각 요소의 합이 1.0이 나오도록 새로운 확률 분포를 만든다
print(new_p)
print(sum(new_p))

1.0
[0.76528558 0.39518322 0.03162278]
[0.64196878 0.33150408 0.02652714]
1.0


In [11]:
print(2**4, np.power(2,4) ) # 2^4

16 16


In [12]:
# 3개의 긍정 unigram 타겟을 주면  부정 맥락 2개씩 3개를 샘플링하여 생성해주는 클래스  
# nn_layers.py에 추가하여 놓는다

import collections
class UnigramSampler:    
    # 생성자 : corpus를 사용하여 단어의 0.75제곱 처리한 확률 분포를 구함
    def __init__(self, corpus, power, sample_size): # power= 0.75, sample_size = 2
        self.sample_size = sample_size
        self.vocab_size = None
        self.word_p = None

        # corpus 내의 단어별 발생횟수를 구함    
        counts = collections.Counter()  
        for word_id in corpus:   # corpus: [0 1 2 3 4 1 5 6], 
            counts[word_id] += 1

        vocab_size = len(counts)
        self.vocab_size = vocab_size  # 7

        self.word_p = np.zeros(vocab_size)  # (7,)
        for i in range(vocab_size):  # 7 
            self.word_p[i] = counts[i]  # [1, 2, 1, 1, 1, 1, 1] ,단어 발생 횟수

        self.word_p = np.power(self.word_p, power) # 0.75제곱
        self.word_p /= np.sum(self.word_p)  # 전체의 합으로 나누어 확률을 구함


    def get_negative_sample(self, target):   # target = np.array([1, 3, 0]), (3,)
        batch_size = target.shape[0]  # 3
        
        negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)  # (3,2)

        for i in range(batch_size):  # 3회
            p = self.word_p.copy()
            target_idx = target[i]  # 1,3,0
            p[target_idx] = 0  # p[1]=0,p[3]=0,p[0]=0 ,부정 단어로 target이 선택되지 않도록 확률 값을 0으로 설정 
            p /= p.sum()
            negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)
            
        return negative_sample


In [13]:
# collections.Counter어떤 단어가 주어졌을 때 단어에 포함된 각 알파벳의 글자 수를 dict로 세어주는 클래스
# https://www.daleseo.com/python-collections-counter/
import collections
print(collections.Counter('hello world'))
print(collections.Counter('aaaabbbccdeeeffffggg'))

corpus = np.array([0, 1, 2, 3, 4, 1, 5, 6])
print(collections.Counter(corpus))

# corpus 내의 단어별 발생횟수를 구함
counts = collections.Counter()
print('corpus:',corpus)
corpus = np.array([0, 1, 2, 3, 4, 1, 5, 6])
for word_id in corpus:
    print('**before:',word_id,counts[word_id])
    counts[word_id] += 1
    print('   after:',word_id,counts[word_id])
counts

Counter({'l': 3, 'o': 2, 'h': 1, 'e': 1, ' ': 1, 'w': 1, 'r': 1, 'd': 1})
Counter({'a': 4, 'f': 4, 'b': 3, 'e': 3, 'g': 3, 'c': 2, 'd': 1})
Counter({1: 2, 0: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1})
corpus: [0 1 2 3 4 1 5 6]
**before: 0 0
   after: 0 1
**before: 1 0
   after: 1 1
**before: 2 0
   after: 2 1
**before: 3 0
   after: 3 1
**before: 4 0
   after: 4 1
**before: 1 1
   after: 1 2
**before: 5 0
   after: 5 1
**before: 6 0
   after: 6 1


Counter({0: 1, 1: 2, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1})

In [14]:
corpus= np.array([0, 1, 2, 3, 4, 1, 2, 3]) 
power = 0.75
sample_size = 2 # 부정적 예 2개를 샘플링

sampler = UnigramSampler(corpus,power,sample_size)

target = np.array([2,4,3])
negative_sample = sampler.get_negative_sample(target)

print('긍정(Yes) : ', target)
print('부정(No)  : \n', negative_sample)  # 실행 시마다 다름

긍정(Yes) :  [2 4 3]
부정(No)  : 
 [[1 3]
 [3 1]
 [2 1]]


### SigmoidWithLoss

In [15]:
# SigmoidWithLoss 클래스 사용 
# nn_layers.py 에 추가한다

class SigmoidWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.loss = None
        self.y = None  # sigmoid의 출력
        self.t = None  # 정답 데이터

    def cross_entropy_error(self,y, t):   # softmax 와 동일
        if y.ndim == 1:
            t = t.reshape(1, t.size)
            y = y.reshape(1, y.size)

        # 정답 데이터가 원핫 벡터일 경우 정답 레이블 인덱스로 변환
        if t.size == y.size:
            t = t.argmax(axis=1)

        batch_size = y.shape[0]

        return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size
        
    def forward(self, x, t):
        self.t = t
        self.y = 1 / (1 + np.exp(-x))   # sigmoid , 예측값

        self.loss = self.cross_entropy_error(np.c_[1 - self.y, self.y], self.t)

        return self.loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]

        dx = (self.y - self.t) * dout / batch_size
        return dx

In [16]:
# np.c_()  # c_: column으로 합치기
# https://rfriend.tistory.com/tag/np.c_%20%ED%95%A8%EC%88%98

import numpy as np

N = 3
A = np.eye(N)
print('A:\n', A)
B = np.c_[A, A[2]]
print('B:\n', B)

A = [1,2,3]
B = [4,5,6]
C = np.c_[A,B]
print(C)

R = np.r_[A,B]  # r_: row로 합치기
print(R)

A:
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
B:
 [[1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 1.]]
[[1 4]
 [2 5]
 [3 6]]
[1 2 3 4 5 6]


### 네거티브 샘플링 구현

In [17]:
# NegativeSamplingLoss 클래스
# nn_layers.py 에 추가한다

class NegativeSamplingLoss:
    def __init__(self,W,corpus,power=0.75,sample_size=5): #  sample_size : 부정 단어 샘플링 수 (2개)
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus,power,sample_size)
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)] # 긍정 1개 + 부정 2개
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]   # 긍정 1개 + 부정 2개
        
        self.params, self.grads = [],[]
        for layer in self.embed_dot_layers:
            self.params += layer.params
            self.grads += layer.grads
            
    def forward(self,h,target) : # target은 긍정단어의 index
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target) # 부정 단어 샘플
        
        # 긍정단어 순전파
        score = self.embed_dot_layers[0].forward(h,target)  # target-->index
        correct_label = np.ones(batch_size, dtype=np.int32) # 값이 모두 1: 긍정
        loss = self.loss_layers[0].forward(score,correct_label)        
        
        # 부정단어 순전파
        negative_label = np.zeros(batch_size,dtype-np.int32) # 값이 모두 0 : 부정 
        
        for i in range(self.sample_size):
            negative_target = negative_sample[:,i]
            score = self.embed_dot_layers[i + 1].forward(h,negative_target)
            loss += self.loss_layers[i + 1].forward(score,negative_label) # loss의 누적 합
            
        return loss
     
    def backward(self,dout=1)  : # 입력값을 각 계층의 backward만 호출하여 전달
        dh = 0
        for l0,l1 in zip(self.loss_layers, self.embed_dot_layers):  # 역전파이므로  los_layer가 먼저 호출된다
            dscore = l0.backward(dout)   # SigmoidWithLoss 계층
            dh += l1.backward(dscore)   # EmbeddingDot 계층   
        return dh

In [21]:
W = np.random.randn(7,3)
neg = NegativeSamplingLoss(W,corpus,0.75,2)
neg.loss_layers, neg.embed_dot_layers  # 각각 3개씩 생성 되었다

([<__main__.SigmoidWithLoss at 0x1f176c1a548>,
  <__main__.SigmoidWithLoss at 0x1f176c1a648>,
  <__main__.SigmoidWithLoss at 0x1f176c1a588>],
 [<__main__.EmbeddingDot at 0x1f176bc9448>,
  <__main__.EmbeddingDot at 0x1f176c1a148>,
  <__main__.EmbeddingDot at 0x1f176c1a188>])