In [19]:
import numpy as np
import pandas as pd

In [20]:
#initialize

class RNN:
    def __init__(self,hidden_size,vocab_size,seq_length,learning_rate):
        #hyper parameters
        self.hidden_size=hidden_size
        self.vocab_size=vocab_size
        self.seq_length=seq_length
        self.learning_rate=learning_rate
        
        #model parameters
        #np.random.uniform(low,high,size): low부터 high까지의 난수를 size 크기 만큼 생성
        #1/vocab_size가 아닌 1./vocab_size인 이유: 1이 아닌 '1.'으로 명시적으로 실수형 연산 지정함
        self.U=np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hiddem_sie, vocab_size))
        self.V=np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.W=np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        
        #bias for hidden layer
        self.b = np.zeros((hidden_size,1)) # bias for hidden layer.
        #bias for output
        self.c = np.zeros((vocab_size,1)) # bias for output.

In [21]:
    def softmax(self, x):
        p = np.exp(x- np.max(x))
        return p / np.sum(p)

In [22]:
#Forward Loss

def forward(self, inputs, hprev):
    xs, hs, os, yacp={}, {}, {}, {}
    hs[-1]=np.copy(hprev)

    for t in range(len(inputs)):
        #xs[t]: 현재 시점 t의 입력 벡터(크기:(vocab_size,1))
        xs[t]=zero_init(self.vocab_size, 1)
        #ont-hot 인코딩
        xs[t][inputs[t]]=1

        #<hidden state>
        #hs[t]: 현재 시점 t의 은닉 상태, np.tanh(): 하이퍼볼릭 탄젠트 함수(비선형 활성화 함수)
        #hs[t-1]: 이전 시점 t-1의 은닉 상태(크기: (hidden_size,1))
        #self.U: 입력 가중치 행렬(크기: (hidden_size, vocab_size))
        #self.W: 은닉 상태 가중치 행렬(크기: (hidden_size, hidden_size))

        #np.dot(self.U, xs[t]): 입력 xs[t]에 대한 선형 변환(입력 가중치 적용) / 현재 입력 xs[t]가 은닉 상태에 어떤 영향을 미치는지 계산
        #np.dot(self.W, hs[t-1]): 이전 은닉 상태 hs[t-1]에 대한 선형 변환(은닉 상태 가중치 적용) / 이전 은닉 상태 hs[t-1]이 현재 은닉 상태에 어떤 영향을 미치는지 계산

        hs[t]=np.tanh(np.dot(self.U, xs[t])+np.dot(self.W, hs[t-1])+self.b)

        #<output> : 은닉 상태 hs[t]가 출력 os[t]로 바뀜
        #os[t]: RNN의 출력층 값
        #self.V: 출력 가중치 행렬(크기: (output_size, hidden_size))
        os[t]=np.dot(self.V, hs[t])+self.c

        #ycap[t]: 최종 확률 분포(예측값), 소프트맥스 함수로 0~1 사이 값으로 변환
        ycap[t]=self.softmax(os[t])

    return xs, hs, os, ycap 

In [23]:
#Compute Loss

def loss(self, ps, targets):
    #loss for sequence
    #cross entropy loss
    """ 
    음의 로그 가능도, Negative Log-Likelihood 계산
    1.모델이 예측한 확률 분포 ps[t]에서 정답 레이블 targets[t]의 확률값을 가져옴
    2.확률값에 log 적용
    3. 음수를 붙여 NLL 계산
    4. 시퀀스 전체에 대해 손실 계산
    """
    return sum(-np.log(ps[t][targets[t],0]) for t in range(self.seq_length))

In [24]:
#Backward Pass

"""
xs: 입력 시퀀스, xs[t]는 t 시점의 입력 벡터
hs: 은닉 상태, hs[t]는 t 시점의 은닉 상태
ps: 출력 확률 분포, ps[t]는 t 시점의 소프트맥스 출력
targets: 정답 레이블 시퀀스
"""
def backward(self, xs, hs, ps, targets):
    #역전파: 역방향으로 기울기를 계산함
    #dU, dW, dV: 가중치의 기울기 변화량을 저장하기 위해 사용, 동일한 크기, 0으로 초기화
    dU, dW, dV=np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self,V)
    #편향의 기울기 변화 저장
    db, dc=np.zeros_like(self.b), np.zeros_like(self.c)
    #다음 타임스텝에서 사용할 은닉 상태의 기울기, 0으로 초기화
    dhnext=np.zeros_like(hs[0])

    #시퀀스를 역순으로 순회(가장 마지막 시점부터 첫 번째 시점으로 진행)
    for t in reversed(range(self.seq_length)):
        #t 시점의 소프트맥스의 출력 저장
        dy=np.copy(ps[t])
        #targets[t]: 정답 클래스의 인덱스, 정답 클래스-1을 함으로써 소프트맥스의 기울기 구함
        dy[targets[t]]-=1
        #가중치 V의 기울기 업데이트
        #dy: 출력층에서의 오차
        dV+=np.dot(dy, hs[t].T)
        dc+=dc

        #출력층에서 전파된 기울기+다음 시점에서 전파된 기울기(이전 루프에서 계산된 값)
        #dh: 현재 시점에서 다음 시점으로 전달할 기울기
        dh=np.dot(self.V.T, dy)+dhnext # backprop into h
        #활성화 함수로 탄젠트 하이퍼볼릭 함수 사용했으므로 tanh의 미분을 적용
        #dhrec: 활성화 함수 통과한 후 현재 은닉 상태의 기울기
        dhrec=(1-hs[t]*hs[t])*dh
        db+=dhrec

        #입력층 가중치 U의 기울기 계산, xs[t].T: 입력 시퀀스 xs[t]의 전치 행렬
        #입력 데이터와 은닉 상태 기울기를 곱하여 가중치 변화량 구함함
        dU+=np.dot(dhrec, xs[t].T)
        dW+=np.dot(self.W.T, dhrec)

    #기울기 클리핑, exploding gradient 방지, -5~5 사이 값으로 제한
    for dparam in [dU, dW, dV, db, dc]:
        np.clip(dparam, -5, 5, out=dparam)
    return dU, dW, dV, db, dc



In [25]:
#Update Weights

def update_model(self, dU, dW, dV, db, dc):
    """
    param: 가중치 행렬 및 편향 벡터 - self.U, self.W, self.V, self.b, self.c
    dparam: 해당 가중치의 기울기 - dU, dW, dV, db, dc
    mem: Adagrad에서 사용하는 누적 기울기 제곱 - self.mU, self.mW, self.mV, self.mb, self.mc
    param에 대해 기울기를 누적하고 Adagrad 업데이트 수행행
    """
    for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c],
                                  [dU, dW, dV, db, dc],
                                  [self.mU, self.mW, self.mV, self.mb, self.mc]):
        #기울기 제곱 누적, 학습률 조정 역할
        mem+=dparam*dparam
        #adagrad update
        param+=-self.learning_rate*dparam/np.sqrt(mem+1e-9)

In [26]:
#Predict Words

"""
data_reader: 문자열을 인덱스로 변환하는 객체, 학습된 데이터의 문자-인덱스 매핑 정보 담김
start: 텍스트 생성을 시작할 초기 문자열
n: 예측할 문자의 개수수
"""
def predict(self, data_reader, start, n):

    #0으로 입력 벡터 초기화
    """
    x: 원-핫 인코딩된 입력 벡터
    self.vocab_size: 모델이 다룰 수 있는 전체 문자 개수
    """
    x=zero_init(self.vocab_size, 1)
    
    #start: 문자열을 문자 단위로 리스트로 변환
    #ixes는 예측된 문자들의 인덱스를 저장할 리스트
    chars=[ch for ch in start]
    ixes=[]
    for i in range(len(chars)):
        ix=data_reader.char_to_ix[chars[i]] #문자를 인덱스로 변환
        x[ix]=1 #해당 위치를 1로 설정(원-핫 인코딩)
        ixes.append(ix) # 변환된 문자 인덱스를 ixes 리스트에 저장장

    #은닉 상태 초기화
    h=np.zeros((self.hidden_size, 1))
    #predict next n chars
    for t in range(n):
        h=np.tanh(np.dot(self.U, x)+np.dot(self.W,h)+self.b) #RNN의 은닉 상태 업데이트 공식
        y=np.dot(self.V, h)+self.c #RNN의 출력 계산 공식식
        p=np.exp(y)/np.sum(np.exp(y)) #확률 분포(소프트맥스 적용), y의 지수 값을 정규화하여 확률 계산산
        
        #확률 p를 기반으로 랜덤하게 문자 인덱스 샘플링
        #p.ravel(): 벡터를 1차원 배열로 변환해 확률 분포로 활용
        ix=np.random.choice(range(self.vocab_size),p=p.ravel())

        #선택한 문자를 다음 입력으로 설정
        x=zero_init(self.vocab_size, 1)
        x[ix]=1

        #ixes.append(ix): 새로 예측한 문자 인덱스를 저장
        ixes.append(ix)
        ixes.append(ix)
    txt=''.join(data_reader.ix_to_char[i] for i in ixes)
    return txt

In [27]:
import numpy as np


# To read the training data and make a vocabulary and dictiornary to index the chars
class DataReader:
    def __init__(self, path, seq_length):
        #uncomment below , if you dont want to use any file for text reading and comment next 2 lines
        #self.data = "some really long text to test this. maybe not perfect but should get you going."
        self.fp = open(path, "r")
        self.data = self.fp.read()
        #find unique chars
        chars = list(set(self.data))
        #create dictionary mapping for each char
        self.char_to_ix = {ch:i for (i,ch) in enumerate(chars)}
        self.ix_to_char = {i:ch for (i,ch) in enumerate(chars)}
        #total data
        self.data_size = len(self.data)
        #num of unique chars
        self.vocab_size = len(chars)
        self.pointer = 0
        self.seq_length = seq_length

    def next_batch(self):
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        targets = [self.char_to_ix[ch] for ch in self.data[input_start+1:input_end+1]]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= self.data_size:
            # reset pointer
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        return self.pointer == 0

    def close(self):
        self.fp.close()

In [28]:
seq_length = 25
#read text from the "input.txt" file
data_reader = DataReader("/kaggle/input/input-txt/input.txt", seq_length)
rnn = RNN(hidden_size=100, vocab_size=data_reader.vocab_size,seq_length=seq_length,learning_rate=1e-1)
# rnn.train(data_reader)

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/input-txt/input.txt'

In [None]:
rnn.predict(data_reader, 'Alice', 50)

#전체 코드

In [None]:
def zero_init(rows, cols):
    return np.zeros((rows, cols))

In [None]:
class RNN:
    def __init__(self, hidden_size, vocab_size, seq_length, learning_rate):
        # hyper parameters
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.seq_length = seq_length
        self.learning_rate = learning_rate
        # model parameters
        self.U = np.random.uniform(-np.sqrt(1./vocab_size), np.sqrt(1./vocab_size), (hidden_size, vocab_size))
        self.V = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (vocab_size, hidden_size))
        self.W = np.random.uniform(-np.sqrt(1./hidden_size), np.sqrt(1./hidden_size), (hidden_size, hidden_size))
        self.b = np.zeros((hidden_size, 1)) # bias for hidden layer
        self.c = np.zeros((vocab_size, 1)) # bias for output
        
        # memory vars for adagrad, 
        #ignore if you implement another approach
        self.mU = np.zeros_like(self.U)
        self.mW = np.zeros_like(self.W)
        self.mV = np.zeros_like(self.V)
        self.mb = np.zeros_like(self.b)
        self.mc = np.zeros_like(self.c)
    
    def softmax(self, x):
        p = np.exp(x- np.max(x))
        return p / np.sum(p)
        
    def forward(self, inputs, hprev):
            xs, hs, os, ycap = {}, {}, {}, {}
            hs[-1] = np.copy(hprev)
            for t in range(len(inputs)):
                xs[t] = zero_init(self.vocab_size,1)
                xs[t][inputs[t]] = 1 # one hot encoding , 1-of-k
                hs[t] = np.tanh(np.dot(self.U,xs[t]) + np.dot(self.W,hs[t-1]) + self.b) # hidden state
                os[t] = np.dot(self.V,hs[t]) + self.c # unnormalised log probs for next char
                ycap[t] = self.softmax(os[t]) # probs for next char
            return xs, hs, ycap
        
    def backward(self, xs, hs, ps, targets):
            # backward pass: compute gradients going backwards
            dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
            db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
            dhnext = np.zeros_like(hs[0])
            for t in reversed(range(self.seq_length)):
                dy = np.copy(ps[t])
                #through softmax
                dy[targets[t]] -= 1 # backprop into y
                #calculate dV, dc
                dV += np.dot(dy, hs[t].T)
                dc += dc
                #dh includes gradient from two sides, next cell and current output
                dh = np.dot(self.V.T, dy) + dhnext # backprop into h
                # backprop through tanh non-linearity 
                dhrec = (1 - hs[t] * hs[t]) * dh  #dhrec is the term used in many equations
                db += dhrec
                #calculate dU and dW
                dU += np.dot(dhrec, xs[t].T)
                dW += np.dot(dhrec, hs[t-1].T)
                #pass the gradient from next cell to the next iteration.
                dhnext = np.dot(self.W.T, dhrec)
            # clip to mitigate exploding gradients
            for dparam in [dU, dW, dV, db, dc]:
                np.clip(dparam, -5, 5, out=dparam) 
            return dU, dW, dV, db, dc
    
    def loss(self, ps, targets):
            """loss for a sequence"""
            # calculate cross-entrpy loss
            return sum(-np.log(ps[t][targets[t],0]) for t in range(self.seq_length))
        
    
    def update_model(self, dU, dW, dV, db, dc):
        # parameter update with adagrad
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c],
                                  [dU, dW, dV, db, dc],
                                  [self.mU, self.mW, self.mV, self.mb, self.mc]):
            mem += dparam*dparam
            param += -self.learning_rate*dparam/np.sqrt(mem+1e-8) # adagrad update
                
                
    def sample(self, h, seed_ix, n):
            """
            sample a sequence of integers from the model
            h is memory state, seed_ix is seed letter from the first time step
            """
            x = zero_init(self.vocab_size, 1)
            x[seed_ix] = 1
            ixes = []
            for t in range(n):
                h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
                y = np.dot(self.V, h) + self.c
                p = np.exp(y)/np.sum(np.exp(y))
                ix = np.random.choice(range(self.vocab_size), p = p.ravel())
                x = zero_init(self.vocab_size,1)
                x[ix] = 1
                ixes.append(ix)
            return ixes

    def train(self, data_reader):
            iter_num = 0
            threshold = 0.01
            smooth_loss = -np.log(1.0/data_reader.vocab_size)*self.seq_length
            while (smooth_loss > threshold):
                if data_reader.just_started():
                    hprev = np.zeros((self.hidden_size,1))
                inputs, targets = data_reader.next_batch()
                xs, hs, ps = self.forward(inputs, hprev)
                dU, dW, dV, db, dc = self.backward(xs, hs, ps, targets)
                loss = self.loss(ps, targets)
                self.update_model(dU, dW, dV, db, dc)
                smooth_loss = smooth_loss*0.999 + loss*0.001
                hprev = hs[self.seq_length-1]
                if not iter_num%500:
                    sample_ix = self.sample(hprev, inputs[0], 200)
                    print( ''.join(data_reader.ix_to_char[ix] for ix in sample_ix))
                    print( "\n\niter :%d, loss:%f"%(iter_num, smooth_loss))
                iter_num += 1

    def predict(self, data_reader, start, n):

        #initialize input vector
        x = zero_init(self.vocab_size, 1)
        chars = [ch for ch in start]
        ixes = []
        for i in range(len(chars)):
            ix = data_reader.char_to_ix[chars[i]]
            x[ix] = 1
            ixes.append(ix)

        h = np.zeros((self.hidden_size,1))
        # predict next n chars
        for t in range(n):
            h = np.tanh(np.dot(self.U, x) + np.dot(self.W, h) + self.b)
            y = np.dot(self.V, h) + self.c
            p = np.exp(y)/np.sum(np.exp(y))
            ix = np.random.choice(range(self.vocab_size), p = p.ravel())
            x = zero_init(self.vocab_size,1)
            x[ix] = 1
            ixes.append(ix)
        txt = ''.join(data_reader.ix_to_char[i] for i in ixes)
        return txt

In [None]:
import numpy as np


# To read the training data and make a vocabulary and dictiornary to index the chars
class DataReader:
    def __init__(self, path, seq_length):
        #uncomment below , if you dont want to use any file for text reading and comment next 2 lines
        #self.data = "some really long text to test this. maybe not perfect but should get you going."
        self.fp = open(path, "r")
        self.data = self.fp.read()
        #find unique chars
        chars = list(set(self.data))
        #create dictionary mapping for each char
        self.char_to_ix = {ch:i for (i,ch) in enumerate(chars)}
        self.ix_to_char = {i:ch for (i,ch) in enumerate(chars)}
        #total data
        self.data_size = len(self.data)
        #num of unique chars
        self.vocab_size = len(chars)
        self.pointer = 0
        self.seq_length = seq_length

    def next_batch(self):
        input_start = self.pointer
        input_end = self.pointer + self.seq_length
        inputs = [self.char_to_ix[ch] for ch in self.data[input_start:input_end]]
        targets = [self.char_to_ix[ch] for ch in self.data[input_start+1:input_end+1]]
        self.pointer += self.seq_length
        if self.pointer + self.seq_length + 1 >= self.data_size:
            # reset pointer
            self.pointer = 0
        return inputs, targets

    def just_started(self):
        return self.pointer == 0

    def close(self):
        self.fp.close()

In [None]:
seq_length = 25
#read text from the "input.txt" file
data_reader = DataReader("/kaggle/input/input-txt/input.txt", seq_length)
rnn = RNN(hidden_size=100, vocab_size=data_reader.vocab_size,seq_length=seq_length,learning_rate=1e-1)
# rnn.train(data_reader)

In [None]:
rnn.predict(data_reader, 'Alice', 50)