In [4]:
# Testing Sequence to Sequence model on a toy dataset.

In [1]:
import tensorflow as tf
import numpy as np

# A sequence-to-sequence model has two parts: an encoder, which encodes a variable-length input (say, a sentence in a source language) into a fixed-size representation called the context, and a decoder, which decodes this representation into a variable-length output (say, a sentence in a target language).

In [8]:
# Encoder Part

class seq2seq_encoder:
    """GRU encoder that turns a fixed-length sequence into a context vector.

    Every time step is projected from `input_size` to `n_hidden` features,
    the projected sequence is run through one GRU cell, and the GRU output at
    the last time step is projected back to `input_size` features to form the
    context.
    """

    def __init__(self,n_hidden,n_layers,batch_size,time_steps,input_size):
        """Create the encoder variables and the GRU cell.

        Args:
            n_hidden: number of GRU units; also the width of the input
                projection.
            n_layers: stored for reference only; a single GRU cell is built
                regardless of this value.
            batch_size: stored for reference only; `encode` takes the batch
                size from the tensor it is given.
            time_steps: number of time steps in every input sequence.
            input_size: number of features per time step; also the width of
                the context vector.
        """
        # Input projection: input_size -> n_hidden.
        self.w= tf.Variable(tf.truncated_normal(shape= [input_size,
                                                        n_hidden],
                                               dtype= tf.float32,
                                               stddev= 0.01))
        self.b= tf.Variable(tf.constant(0.01, shape= [n_hidden]))
        self.cell= tf.contrib.rnn.GRUCell(n_hidden)
        self.input_size= input_size
        self.context_size= input_size
        # Context projection: n_hidden -> input_size.
        self.w_cntxt= tf.Variable(tf.truncated_normal(shape= [n_hidden,
                                                              input_size],
                                               dtype= tf.float32,
                                               stddev= 0.01))
        self.b_cntxt= tf.Variable(tf.constant(0.01, shape= [input_size]))
        self.n_hidden= n_hidden
        self.n_layers= n_layers
        self.batch_size= batch_size
        self.time_steps= time_steps

    def encode(self,encoder_input):
        """Add the encoder ops to the graph.

        Args:
            encoder_input: float tensor that can be reshaped to
                [batch, time_steps, input_size].

        Returns:
            Tuple `(context, state)`: `context` is [batch, input_size], the
            projected GRU output of the last time step; `state` is the final
            GRU state returned by `tf.nn.dynamic_rnn`.
        """
        # Apply the input projection to all time steps at once by flattening
        # the batch and time axes, then restore the sequence layout.
        flat_input= tf.reshape(encoder_input, (-1,self.input_size))
        projected= tf.matmul(flat_input, self.w)+self.b
        projected= tf.reshape(projected,
                              (-1,self.time_steps,self.n_hidden))
        # Passing `dtype` instead of an explicit zero state lets dynamic_rnn
        # build the zero initial state from the batch size of its input, so
        # the graph is not tied to the batch size given to the constructor.
        output,state= tf.nn.dynamic_rnn(self.cell,projected,
                                        dtype= tf.float32,
                                        scope= 'encoder')
        # Only the last time step feeds the context projection.
        last_output= output[:,-1,:]
        context= tf.matmul(last_output,self.w_cntxt)+self.b_cntxt

        return (context,state)

In [9]:
# Decoder Part

class seq2seq_decoder:
    """GRU decoder that reconstructs a sequence from a context and a state.

    The decoder input is the context followed by the target sequence with its
    last step dropped, so step t is predicted from the target at step t-1 and
    the first step is predicted from the context.
    """

    def __init__(self,n_hidden,n_layers,batch_size,time_steps,input_size):
        """Create the decoder variables and the GRU cell.

        Args:
            n_hidden: number of GRU units; also the width of the input
                projection. It must match the size of the state passed to
                `decode`.
            n_layers: stored on the instance; a single GRU cell is built
                regardless of this value.
            batch_size: accepted for signature symmetry with the encoder;
                not used.
            time_steps: number of time steps in every target sequence.
            input_size: number of features per time step, for both the
                decoder input and the decoded output.
        """
        # Input projection: input_size -> n_hidden.
        self.w= tf.Variable(tf.truncated_normal(shape= [input_size,n_hidden],
                                               dtype= tf.float32,
                                               stddev= 0.01))
        self.b= tf.Variable(tf.constant(0.01, shape= [n_hidden]))
        self.cell= tf.contrib.rnn.GRUCell(n_hidden)
        self.input_size= input_size
        # Output projection: n_hidden -> input_size.
        self.w_out= tf.Variable(tf.truncated_normal(shape= [n_hidden,input_size],
                                               dtype= tf.float32,
                                               stddev= 0.01))
        self.b_out= tf.Variable(tf.constant(0.01, shape= [input_size]))
        self.time_steps= time_steps
        self.n_layers= n_layers
        self.n_hidden= n_hidden

    def decode(self,labels,context,state):
        """Add the decoder ops to the graph.

        Args:
            labels: target tensor of shape [batch, time_steps, input_size].
            context: tensor that can be reshaped to [batch, 1, input_size];
                it becomes the first decoder input step.
            state: initial GRU state for the decoder cell.

        Returns:
            Tensor of shape [batch, time_steps, input_size] holding the
            decoded output at every time step.
        """
        # Prepend the context as step 0 and drop the last target step, which
        # shifts the targets right by one and keeps the length at time_steps.
        c= tf.reshape(context, (-1,1,self.input_size))
        decoder_input= tf.concat((c,labels), axis= 1)
        decoder_input= decoder_input[:,:-1]
        X_in= tf.matmul(tf.reshape(decoder_input,
                                   (-1,self.input_size)), self.w)+self.b
        # Use the instance's own dimensions here; reading module-level
        # `time_steps`/`n_hidden` would tie the class to notebook globals.
        X_in= tf.reshape(X_in, (-1,self.time_steps,self.n_hidden))
        output,state= tf.nn.dynamic_rnn(self.cell,X_in,
                                        initial_state= state,
                                       scope= 'decoder')
        # Project every GRU output back to input_size features.
        output= tf.matmul(tf.reshape(output,
                                     (-1,self.n_hidden)),
                          self.w_out)+self.b_out
        output= tf.reshape(output, (-1,self.time_steps,self.input_size))

        return output

In [15]:
# Hyper-Parameters

# Sequence geometry: each sample has `time_steps` steps of `input_size` features.
time_steps, input_size = 20, 5
# The output width is defined to equal the input width.
output_size = input_size
# Number of sequences per mini-batch.
batch_size = 128
# GRU width and the layer count handed to the encoder/decoder constructors.
n_hidden, num_layers = 64, 1

# I will generate a dataset for the model. The encoder part will encode this dataset, and the decoder part of the model will then recover the original data from that encoding.

In [13]:
# Toy dataset: 100 batches of uniform random sequences in [0, 1).
# A dedicated, seeded generator keeps the data identical across kernel
# restarts without touching NumPy's global random state.
x= np.random.RandomState(0).rand(batch_size*100,time_steps,input_size)

# Modelling the architecture

In [14]:
# Graph inputs. The leading None leaves the batch dimension unspecified.
encoder_input = tf.placeholder(tf.float32,
                               shape=(None, time_steps, input_size))
decoder_input = tf.placeholder(tf.float32,
                               shape=(None, time_steps, input_size))
labels = tf.placeholder(tf.float32,
                        shape=(None, time_steps, output_size))

In [16]:
# Build the encoder object from the hyper-parameters defined in the notebook.
encoder = seq2seq_encoder(n_hidden=n_hidden,
                          n_layers=num_layers,
                          batch_size=batch_size,
                          time_steps=time_steps,
                          input_size=input_size)

In [17]:
# Wire the encoder into the graph; it yields the context and the final state.
(context, state) = encoder.encode(encoder_input=encoder_input)

In [18]:
# Build the decoder with the same hyper-parameters as the encoder.
decoder = seq2seq_decoder(n_hidden=n_hidden,
                          n_layers=num_layers,
                          batch_size=batch_size,
                          time_steps=time_steps,
                          input_size=input_size)
# Decoded sequence, produced from the targets plus the encoder's context/state.
y_ = decoder.decode(labels=labels, context=context, state=state)

In [19]:
# Objective: mean squared error between the targets and the decoder output.
loss = tf.losses.mean_squared_error(labels=labels, predictions=y_)
# Running `optimize` applies one Adam update (learning rate 0.001) to `loss`.
optimize = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)

In [20]:
# Open the session, then initialise every variable created so far.
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)

# Training....

In [None]:
# 600 passes over the first 90 batches of x; each batch is its own target.
n_epochs, n_train_batches = 600, 90

for epoch in range(n_epochs):
    total_loss = 0
    for step in range(n_train_batches):
        start = batch_size * step
        enc_batch = x[start:start + batch_size]
        labels_batch = enc_batch

        # One optimisation step; `cost` is the loss on this batch.
        feed = {encoder_input: enc_batch, labels: labels_batch}
        cost, _ = sess.run([loss, optimize], feed_dict=feed)

        total_loss += cost
        print (epoch, step, cost)
    # Mean batch loss for the epoch.
    print (epoch, total_loss / n_train_batches)

# Testing...

In [25]:
# Evaluate on batches 90-99 of x, which the training loop never visits.
total_loss= 0
for j in range(90,100):
    enc_batch= x[batch_size*j:batch_size*(j+1)]
    labels_batch= enc_batch

    # Fetch only `loss`. Fetching `optimize` as well would apply a gradient
    # step on every test batch, so the reported number would no longer be a
    # held-out loss. The decoder builds its own input from `labels` and the
    # encoder context inside the graph, so no separate context pass or
    # `decoder_input` feed is needed.
    cost= sess.run(loss, feed_dict= {encoder_input: enc_batch,
                                     labels: labels_batch})
    total_loss+= cost

# Mean loss over the 10 test batches.
print ('total loss for the test part is %.3f'%(total_loss/10))

total loss for the test part is 0.021
