    In this demo, want to use tranformer to do the VQA
    As the original Transformer Decoder just need text input, but for the VQA, we still have the picture feature data. So for this, I add image feature input in my decoder, which name DecoderWithImage, which use the DecoderWithImageLayer. This new decoder layer add attention for the image feature, which after first multi-head attention layer in the normal decode layer. Then concate the image attention feature with decode input embedding, and pass those data through GRU layer, Dense layer, then concate with normal seconde multi-head layer, the feed forword network, which is same as normal DecoderLayer.
    Do the training with VQA data, it is much better than last VQA demo.

In [1]:
import numpy as np
import tensorflow as tf
#transformer 
#1. position_embedding
#2.self attention layer
#3.feed forward network

#1. position_embedding
#1.1 count angle degree
def get_position_angle(position, i, dmodel):
    angle = 1/np.power(10000, (2 *(i//2))/np.float32(dmodel))
    return position * angle

def get_position_embedding(pos, dmodel):
    angles = get_position_angle(np.arange(pos)[:,np.newaxis], np.arange(dmodel)[np.newaxis,:], dmodel)
    
    angles[:,0::2] = np.sin(angles[:,0::2])
    angles[:,1::2] = np.cos(angles[:,1::2])
    
    position_embedding = angles[np.newaxis, ...]
    return tf.cast(position_embedding, dtype = tf.float32)
    

In [63]:
#2. attention layer
#2.1 attention layer weight
#when weight computing formula is weight = softmax(Q.K/sqrt(depth)) , and value is weight * V
def scaled_dot_product_attention(q, k, v):
    matmul_qk = tf.linalg.matmul(q, k, transpose_b = True)
    
    b_soft = matmul_qk/tf.math.sqrt(tf.cast(tf.shape(q)[-1], dtype=tf.float32))
    
    weights = tf.keras.backend.softmax(b_soft, axis = -1)
    output = tf.linalg.matmul(weights, v)

    return output, weights
    

In [88]:
#3. feed forward network
def feed_forward_network(dmodel, dff):
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Dense(dff, activation='relu'))
    model.add(tf.keras.layers.Dense(dmodel))
    return model
    #return tf.keras.Sequential([tf.keras.layers.Dense(dff, activation='relu'), tf.keras.layers.Dense(dmodel)])

In [113]:
#test for feed forward network
s_ffn = feed_forward_network(512, 2048)
print(s_ffn(tf.random.uniform((64, 50, 512))).shape)

(64, 50, 512)


In [89]:
#tranformer is consist of Encoder and Decoder
#for Encoder, it is consist of many encoder layers, for Decoder, it is consist of many decoder layers.
#each encoder layer have self attention layer, which is multi-head self attention layer, and feed forward network. And between them, it still has 
#normalization layer
#and for each decoder layer, after self attention layer , it has a attention layer, which will deal with the result from encoder output and weights
#after it, is a feed forward network. It still has normalization layer.
#Following, will create multi-head attention layer, then encoder layer, decoder layer, then Encoder, Decoder, Transformer.

#1. Multi-head Layer

class MultiHeadAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, num_head, dmodel):
        super(MultiHeadAttentionLayer, self).__init__()
        self.num_head = num_head
        self.dmodel = dmodel
        assert dmodel % num_head == 0
        
        self.depth = dmodel // num_head
        
        self.wq = tf.keras.layers.Dense(dmodel)
        self.wk = tf.keras.layers.Dense(dmodel)
        self.wv = tf.keras.layers.Dense(dmodel)
        
        self.dense = tf.keras.layers.Dense(dmodel)
        
    def split_head(self, x, batch_size):
        """split last dimention into (num_head, depth)
            then tranpose the shape into (batch_size, num_head, seq_len, depth)
        """
        x = tf.reshape(x, (batch_size, -1, self.num_head, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])
    
    def call(self, q, k, v):
        batch_size = tf.shape(q)[0]
        
        q = self.wq(q)
        k = self.wk(k)
        v = self.wk(v)
        
        q = self.split_head(q, batch_size)
        k = self.split_head(k, batch_size)
        v = self.split_head(v, batch_size)
        
        attention, weights = scaled_dot_product_attention(q, k, v)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3]) #make the dimention into (batch_size, seq_len, num_head, depth)
        #now concate 
        concate_output = tf.reshape(attention, (batch_size, -1, self.dmodel))
        # again do the linear change
        output = self.dense(concate_output)
        
        return output, weights       

In [129]:
class EncoderLayer(tf.keras.layers.Layer):
    """consist of one multi head attention layer, normalization layer, feed forward network, normalization layer
    """
    def __init__(self, num_head, dmodel, dff, dropout_rate=0.1):
        super(EncoderLayer, self).__init__()
    
        self.multi_head = MultiHeadAttentionLayer(num_head, dmodel)
        self.forward_network = feed_forward_network(dmodel, dff)
        
        self.normalization1 = tf.keras.layers.LayerNormalization(epsilon= 1e-6)
        self.normalization2 = tf.keras.layers.LayerNormalization(epsilon= 1e-6)
        
        #alseo we can add 2 dropout layer
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, is_training):
        attend_out, attention_weights = self.multi_head(x, x, x)
        attend_out = self.dropout1(attend_out, training = is_training)
        out1 = self.normalization1(x+attend_out)
        out_ffn = self.forward_network(out1)
        out_ffn = self.dropout2(out_ffn, training = is_training)
        out2 = self.normalization2(out1 + out_ffn)
        
        return out2
        
        

In [121]:
#test Encode layer
test_enc_layer = EncoderLayer(8, 512, 2048)

test_enc_layer_output = test_enc_layer(
    tf.random.uniform((64, 43, 512)), False)

print(test_enc_layer_output.shape)

encoder out_ffn shape: (64, 43, 512)
encoder out_ffn shape after norm: (64, 43, 512)
(64, 43, 512)


In [122]:
class DecoderLayer(tf.keras.layers.Layer):
    """1. multi head attention layer, dropout, normalization layer
        2. multi head attention layer, dropout, normalization layer, in this layer, used the output of the encoder layer
        3. feed forward network layer, dropout, normalization layer
    """
    def __init__(self, num_head, dmodel, dff, dropout_rate=0.1):
        super(DecoderLayer, self).__init__()
        
        self.multi_head1= MultiHeadAttentionLayer(num_head, dmodel)
        self.multi_head2= MultiHeadAttentionLayer(num_head, dmodel)
        
        self.forward_network = feed_forward_network(dmodel, dff)
        
        self.normalization1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.normalization2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.normalization3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, encode_output,  is_training):
        input_attention, input_attention_weights = self.multi_head1(x, x, x)
        input_attention = self.dropout1(input_attention, training = is_training)
        input_out1 = self.normalization1(x+input_attention)
        
        attention_2, attention_weights_2 = self.multi_head2(input_out1, encode_output, encode_output)
        attention_2 = self.dropout2(attention_2, training = is_training)
        out2 = self.normalization2(input_out1 + attention_2)
        
        out_ffn = self.forward_network(out2)
        out_ffn = self.dropout3(out_ffn, training = is_training)
        out3 = self.normalization3(out2 + out_ffn)
        
        return out3, input_attention_weights, attention_weights_2
        

In [125]:
#test for decoder layer
test_dec_layer = DecoderLayer(8, 512, 2048)
test_dec_out,_,_ = test_dec_layer(tf.random.uniform((64, 4, 512)), test_enc_layer_output, False)
print(test_dec_out.shape)

(64, 4, 512)


In [102]:
class Encoder(tf.keras.layers.Layer):
    """1.embedding input 
        2. add position encoding
        3. multi encoder layer
    """
    def __init__(self, num_layers, num_head, dmodel, dff, input_vocab_size, max_position_encoding, dropout_rate=0.1):
        super(Encoder, self).__init__()
        self.num_layers = num_layers
        self.dmodel = dmodel
        
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, dmodel)
        self.pos_enc = get_position_embedding(max_position_encoding, dmodel)
        
        self.enc_layers = [EncoderLayer(num_head, dmodel, dff, dropout_rate) for _ in range(num_layers)]
        
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, training):
        seq_len = tf.shape(x)[1]
        #embedding, pos_encode
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.dmodel, dtype = tf.float32))
        x += self.pos_enc[:,:seq_len,:]
        
        x = self.dropout(x, training= training)
        #now encoder
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training)
        return x

In [128]:
#test for encoder
test_encoder = Encoder(num_layers=2,num_head=8,dmodel=512, 
                         dff=2048, input_vocab_size=8500,
                         max_position_encoding=10000)

test_encoder_output = test_encoder(tf.random.uniform((64, 62)), 
                                       training=False)

print (test_encoder_output.shape)

encoder out_ffn shape: (64, 62, 512)
encoder out_ffn shape after norm: (64, 62, 512)
encoder out_ffn shape: (64, 62, 512)
encoder out_ffn shape after norm: (64, 62, 512)
(64, 62, 512)


In [132]:
class Decoder(tf.keras.layers.Layer):
    """1. embedding + pos_encode
        2.multi decode layers
    """
    def __init__(self, num_layers, num_head, dmodel, dff, output_vocab_size, max_pos_encode, dropout_rate=0.1):
        super(Decoder, self).__init__()
        self.num_layers = num_layers
        self.dmodel = dmodel
        
        self.embedding = tf.keras.layers.Embedding(output_vocab_size, dmodel)
        self.pos_encode = get_position_embedding(max_pos_encode, dmodel)
        
        self.decode_layers = [DecoderLayer(num_head, dmodel, dff, dropout_rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, encode_output, training):
        seq_len = tf.shape(x)[1]
        attention_weights = {}
        
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.dmodel, tf.float32))
        x += self.pos_encode[:,:seq_len, :]
        #dropout x
        x = self.dropout(x, training)
        
        for i in range(self.num_layers):
            x, weights1, weights2 = self.decode_layers[i](x, encode_output, training)
            attention_weights['layer{}_weights1'.format(i+1)] = weights1
            attention_weights['layer{}_weights2'.format(i+1)] = weights2
        
        return x, attention_weights


In [134]:
#test for decoder
test_decoder = Decoder(num_layers=2,num_head=8,dmodel=512, 
                         dff=2048, output_vocab_size=8000,
                         max_pos_encode=6000)

test_decoder_output, _ = test_decoder(tf.random.uniform((64, 4)), test_encoder_output,
                                       training=False)

print (test_decoder_output.shape)

(64, 4, 512)


In [94]:
#Transformer
class Transformer(tf.keras.Model):
    """1. encoder
        2. decoder
        3. linear output
    """
    def __init__(self, num_layers, num_head, dmodel, dff, input_vocab_size, output_vocab_size, input_max_pos, output_max_pos, drate=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(num_layers, num_head, dmodel, dff, input_vocab_size, input_max_pos, drate)
        self.decoder = Decoder(num_layers, num_head, dmodel, dff, output_vocab_size, output_max_pos, drate)
        self.linear_layer = tf.keras.layers.Dense(output_vocab_size)
        
    def call(self, enc_x, dec_x, training):
        encoder_output = self.encoder(enc_x, training)
        dec_output, attention_weights = self.decoder(dec_x, encoder_output, training)
        output = self.linear_layer(dec_output)
        
        return output, attention_weights

In [135]:
#test_transformer
test_trans = Transformer(2, 8, 512, 2048, input_vocab_size=8500, output_vocab_size=8000, input_max_pos=10000, output_max_pos=6000)
out, atte = test_trans(tf.random.uniform((64, 62)), tf.random.uniform((64, 4)), False)

print(out.shape)

(64, 4, 8000)


In [230]:
#the decoder only deal with the words, we need decoder to deal with the picture feature and also add attention to picture when it generate words
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.units = units
        self.enc = tf.keras.layers.Dense(units)
        self.hw = tf.keras.layers.Dense(units)
        self.score = tf.keras.layers.Dense(1)
        
    def call(self, feature, hidden):
        #hidden shape is (batchsize, units)
        hidden_with_time_axis = tf.expand_dims(hidden, 1) # (batchsize, 1, units)
        print('hidden_with_time_axis:', hidden_with_time_axis.shape)
        score = tf.math.tanh(self.enc(feature) + self.hw(hidden_with_time_axis))
        print('score:', score.shape)
        attention_weights = tf.keras.backend.softmax(self.score(score), axis = 1)
        print('attention_weights:', attention_weights.shape)
        context_tensor = attention_weights * feature
        print('context_tensor bef:', context_tensor.shape)
        #context_tensor = tf.math.reduce_sum(context_tensor, axis = 1, keepdims=True)
        print('context_tensor aft:', context_tensor.shape)
        return context_tensor, attention_weights

class DecoderWithImageLayer(DecoderLayer):
    def __init__(self, num_head, dmodel, dff, units, drate=0.1):
        super(DecoderWithImageLayer, self).__init__(num_head, dmodel, dff)
        self.units = units
        self.img_dens = tf.keras.layers.Dense(dmodel)
        self.img_attention = BahdanauAttention(units)
        self.gru = tf.keras.layers.GRU(units, return_sequences = True, return_state = True, recurrent_initializer='glorot_uniform')
        self.img_dens2 = tf.keras.layers.Dense(units)
        self.droput_img = tf.keras.layers.Dropout(drate)
        
        
    def call(self, x, enc_output, feature, hidden, training):
        #input embedding and pos_encode
        x_attention, weights = self.multi_head1(x,x,x)
        x_attention = self.dropout1(x_attention, training = training)
        x_out = self.normalization1(x + x_attention)
        #img 
        print('feature shape before:', feature.shape)
        features = self.img_dens(feature)
        features = tf.nn.relu(features)
        print('feature shape:', features.shape)
        #decode input and encode output attention 
        x_enc_attention, x_enc_weights = self.multi_head2(x_out,enc_output,enc_output)
        x_enc_attention = self.dropout2(x_enc_attention, training = training)
        x_enc_out = self.normalization2(x_out + x_enc_attention)
        #image feature attention and decode input
        img_context, img_attention = self.img_attention(features, hidden)
        print('img_context shape before:', img_context.shape)
        print('x shape', x.shape)
        x_img_concate = tf.concat([img_context, x], axis = -1)
        print('x_img_concate_shape:', x_img_concate.shape)
        x_img_out, state = self.gru(x_img_concate)
        print('x_img_gru_shape:', x_img_out.shape)
        x_img_out = self.img_dens2(x_img_out)
        print('x_img_out_shape:', x_img_out.shape)
        x_img_out = self.droput_img(x_img_out, training)
        #x_img_out = self.img_nor(x_img_concate + x_img_out)
        #concate x_enc_out and x_img_out
        concate_all = tf.concat([x_enc_out, x_img_out], axis = -1)
        
        out_ffn = self.forward_network(concate_all)
        out_ffn = self.dropout3(out_ffn, training = training)
        out3 = self.normalization3(x_enc_out + out_ffn)
        
        return out3, weights, x_enc_weights, state, img_attention
        
        

In [138]:
#test for attention
test_at = BahdanauAttention(512)
test_con, test_atten = test_at(tf.random.uniform((1,4096)), tf.zeros((1, 512)))
print(test_con.shape)
print(test_atten.shape)

(1, 4096)
(1, 1, 1)


In [231]:
#test for decode image
test_dec_img = DecoderWithImageLayer(8, 512, 2048, 512)
test_d_out,_,_,_,_ = test_dec_img(tf.random.uniform((64, 4, 512)), test_enc_layer_output, tf.random.uniform((64,4,1024)), tf.zeros((64, 512)), False)
print(test_d_out.shape)

feature shape before: (64, 4, 1024)
feature shape: (64, 4, 512)
hidden_with_time_axis: (64, 1, 512)
score: (64, 4, 512)
attention_weights: (64, 4, 1)
context_tensor bef: (64, 4, 512)
context_tensor aft: (64, 4, 512)
img_context shape before: (64, 4, 512)
x shape (64, 4, 512)
x_img_concate_shape: (64, 4, 1024)
x_img_gru_shape: (64, 4, 512)
x_img_out_shape: (64, 4, 512)
(64, 4, 512)


In [233]:
class DecoderWithImage(tf.keras.layers.Layer):
    """1. embedding + pos_encode
        2.multi decode layers
    """
    def __init__(self, num_layers, num_head, dmodel, dff, output_vocab_size, max_pos_encode, units, dropout_rate=0.1):
        super(DecoderWithImage, self).__init__()
        self.num_layers = num_layers
        self.dmodel = dmodel

        self.embedding = tf.keras.layers.Embedding(output_vocab_size, dmodel)
        self.pos_encode = get_position_embedding(max_pos_encode, dmodel)
        
        self.decode_layers = [DecoderWithImageLayer(num_head, dmodel, dff, units, dropout_rate) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        
    def call(self, x, encode_output, feature, hidden, training):
        seq_len = tf.shape(x)[1]
        attention_weights = {}
        
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.dmodel, tf.float32))
        x += self.pos_encode[:,:seq_len, :]
        #dropout x
        x = self.dropout(x, training)
        
        for i in range(self.num_layers):
            print('in decoder : x shape', x.shape, ' layer:', i)
            x, weights1, weights2, hidden, img_weights = self.decode_layers[i](x, encode_output, feature, hidden, training)
            print('in decoder: after layer, x shape:', x.shape, 'decoder layer :', i)
            attention_weights['layer{}_weights1'.format(i+1)] = weights1
            attention_weights['layer{}_weights2'.format(i+1)] = weights2
            attention_weights['layer{}_img_weights'.format(i+1)] = img_weights
        
        return x, attention_weights

In [237]:
#test for decoder image 
test_dec_img = DecoderWithImage(2, 8, 512, 2048, 8000, 6000, 512)
test_dec_img_out,_ = test_dec_img(tf.random.uniform((64,4)), tf.random.uniform((64,1,512)), tf.random.uniform((64,4, 1024)), tf.zeros((64, 512)), False)
print(test_dec_img_out.shape)

in decoder : x shape (64, 4, 512)  layer: 0
feature shape before: (64, 4, 1024)
feature shape: (64, 4, 512)
hidden_with_time_axis: (64, 1, 512)
score: (64, 4, 512)
attention_weights: (64, 4, 1)
context_tensor bef: (64, 4, 512)
context_tensor aft: (64, 4, 512)
img_context shape before: (64, 4, 512)
x shape (64, 4, 512)
x_img_concate_shape: (64, 4, 1024)
x_img_gru_shape: (64, 4, 512)
x_img_out_shape: (64, 4, 512)
in decoder: after layer, x shape: (64, 4, 512) decoder layer : 0
in decoder : x shape (64, 4, 512)  layer: 1
feature shape before: (64, 4, 1024)
feature shape: (64, 4, 512)
hidden_with_time_axis: (64, 1, 512)
score: (64, 4, 512)
attention_weights: (64, 4, 1)
context_tensor bef: (64, 4, 512)
context_tensor aft: (64, 4, 512)
img_context shape before: (64, 4, 512)
x shape (64, 4, 512)
x_img_concate_shape: (64, 4, 1024)
x_img_gru_shape: (64, 4, 512)
x_img_out_shape: (64, 4, 512)
in decoder: after layer, x shape: (64, 4, 512) decoder layer : 1
(64, 4, 512)


In [235]:
#Transformer
class TransformerWithImage(tf.keras.Model):
    """1. encoder
        2. decoder
        3. linear output
    """
    def __init__(self, num_layers, num_head, dmodel, dff, input_vocab_size, output_vocab_size, input_max_pos, output_max_pos, units, drate=0.1):
        super(TransformerWithImage, self).__init__()
        self.units = units
        self.encoder = Encoder(num_layers, num_head, dmodel, dff, input_vocab_size, input_max_pos, drate)
        self.decoder = DecoderWithImage(num_layers, num_head, dmodel, dff, output_vocab_size, output_max_pos, units, drate)
        self.linear_layer = tf.keras.layers.Dense(output_vocab_size)
        
    def call(self, enc_x, dec_x, feature, hidden, training):
        encoder_output = self.encoder(enc_x, training)
        dec_output, attention_weights = self.decoder(dec_x, encoder_output, feature, hidden, training)
        output = self.linear_layer(dec_output)
        
        return output, attention_weights

In [236]:
#test for Transformer image
test_tran_img = TransformerWithImage(2, 8, 512, 2048, 8500, 8000, 10000, 6000, 512)
test_tran_out,_ = test_tran_img(tf.random.uniform((64,24)), tf.random.uniform((64,4)), tf.random.uniform((64,4,1024)), tf.zeros((64,512)), False)
print(test_tran_out.shape)

in decoder : x shape (64, 4, 512)  layer: 0
feature shape before: (64, 4, 1024)
feature shape: (64, 4, 512)
hidden_with_time_axis: (64, 1, 512)
score: (64, 4, 512)
attention_weights: (64, 4, 1)
context_tensor bef: (64, 4, 512)
context_tensor aft: (64, 4, 512)
img_context shape before: (64, 4, 512)
x shape (64, 4, 512)
x_img_concate_shape: (64, 4, 1024)
x_img_gru_shape: (64, 4, 512)
x_img_out_shape: (64, 4, 512)
in decoder: after layer, x shape: (64, 4, 512) decoder layer : 0
in decoder : x shape (64, 4, 512)  layer: 1
feature shape before: (64, 4, 1024)
feature shape: (64, 4, 512)
hidden_with_time_axis: (64, 1, 512)
score: (64, 4, 512)
attention_weights: (64, 4, 1)
context_tensor bef: (64, 4, 512)
context_tensor aft: (64, 4, 512)
img_context shape before: (64, 4, 512)
x shape (64, 4, 512)
x_img_concate_shape: (64, 4, 1024)
x_img_gru_shape: (64, 4, 512)
x_img_out_shape: (64, 4, 512)
in decoder: after layer, x shape: (64, 4, 512) decoder layer : 1
(64, 4, 8000)


In [252]:
#optimizer
#Need customize learning rate schedule, the formula is lrate = rsqrt(dmodel) * min(rsqrt(step), step * warmup_step**-1.5)

class CustomizeLearningRateSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, dmodel, warmup_step = 4000):
        self.dmodel = tf.cast(dmodel, dtype=tf.float32)
        self.warmup_step = warmup_step
        
    def __call__(self, step):
        a = tf.math.rsqrt(self.dmodel)
        b = tf.math.rsqrt(step)
        c = step * (self.warmup_step ** -1.5)
        return a * tf.math.minimum(b, c)

In [14]:
#读取图片的features
#使用标准coco的图片，从https://cs.stanford.edu/people/karpathy/deepimagesent/下可以下载到这些图片的features
#这样可以不用训练，直接拿到图片的feature
import scipy.io as io
#加载训练好的图片的feature到feature_struct中
#生成图片id与feature列标的对应数据 img_map
def generate_coco_image_feature(img_feats_file, img_ids_feats):
    feature_struct = io.loadmat(img_feats_file)
    img_vgg_features = feature_struct['feats']
    img_ids_feats_col = open(img_ids_feats).read().splitlines()
    img_map = {}
    for ids in img_ids_feats_col:
        ids_split = ids.split()
        img_map[ids_split[0]] = int(ids_split[1])
    return img_vgg_features, img_map

#根据图片id,取得相应的特征数据
def get_image_matrix(feature_struct, img_map, image_ids):
    rows = len(image_ids)
    img_matrix = np.zeros((rows, feature_struct.shape[0]))
    for i in range(rows):
        img_matrix[i,:] = feature_struct[:,img_map[image_ids[i]]]
    return img_matrix

In [262]:
import operator
#prepare data, I want to use the vqa train data 
questions = open("./questions_train2014.txt", "r").read().splitlines()
img_ids = open("./images_train2014.txt", "r").read().splitlines()
answers = open("./answers_train2014_modal.txt","r").read().splitlines()
#prepare picture feature
img_feature_struct, img_id_feature_map = generate_coco_image_feature("./vgg_feats.mat", "./coco_vgg_IDMap.txt")

index = (int)(len(questions) * 0.8)
train_questions=questions[0:index]
train_img_ids = img_ids[0:index]
train_answers = answers[0:index]

test_questions = questions[index:]
test_img_ids = img_ids[index:]
test_answers = answers[index:]

#prepare training data
from collections import defaultdict

#1. find out most frequence answers
max_ans = 1000
answers_all = defaultdict(int)
for ans in train_answers:
    answers_all[ans] += 1
answers_all = sorted(answers_all.items(), key=operator.itemgetter(1), reverse= True)[0:max_ans]
top_ans, top_freq = zip(*answers_all)
q_new , img_new, ans_new = [], [], []

#2.get training question, image, answers
for q, img, ans in zip(train_questions, train_img_ids, train_answers):
    if ans in top_ans:
        q_new.append(q)
        img_new.append(img)
        ans_new.append(ans)

train_questions_mlp  = q_new
train_img_ids_mlp = img_new
train_answers_mlp = ans_new

In [16]:
#get input_vocab_size
def tokenize(text):
    txt_tokenizer = tf.keras.preprocessing.text.Tokenizer()
    txt_tokenizer.fit_on_texts(text)
    tensor = txt_tokenizer.texts_to_sequences(text)
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')
    
    return tensor, txt_tokenizer
    

In [17]:
qu_tensor, qu_token = tokenize(train_questions_mlp)
an_tensor, an_token = tokenize(train_answers_mlp)

In [214]:
#create tf.data
BUFFER_SIZE = len(qu_tensor)
BATCH_SIZE = 64
steps_per_epoch = len(qu_tensor)//BATCH_SIZE

input_vocab_size = len(qu_token.word_index) + 1
output_vocab_size = len(an_token.word_index) + 1

def img_features(qu, img, an):
    img = str(img, encoding = "utf-8")
    img_matrix = np.array(img_feature_struct[:,img_id_feature_map[img]])
    img_matrix = img_matrix.reshape(an.shape[-1], -1)
    return qu, img_matrix, an

dataset = tf.data.Dataset.from_tensor_slices((qu_tensor, train_img_ids_mlp, an_tensor))
dataset = dataset.map(lambda item1, item2, item3: tf.numpy_function(
          img_features, [item1, item2, item3], [tf.int32, tf.float32, tf.int32]),
          num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [215]:
example_qu_batch, example_img_batch, example_target_batch = next(iter(dataset))

print(example_qu_batch)
print(example_img_batch)
print(example_target_batch)

tf.Tensor(
[[   2    1 7623 ...    0    0    0]
 [   9   10  524 ...    0    0    0]
 [   2    5   14 ...    0    0    0]
 ...
 [   3 1154    1 ...    0    0    0]
 [   2    5    7 ...    0    0    0]
 [   2    5    7 ...    0    0    0]], shape=(64, 22), dtype=int32)
tf.Tensor(
[[[-0.         -0.         -0.         ... -0.         -0.
    0.2905249 ]
  [-0.         -0.         -0.         ... -0.         -0.
   -0.        ]
  [-0.          2.72104    -0.         ... -0.         -0.
   -0.        ]
  [-0.         -0.          0.76282835 ... -0.          0.19101006
    4.366334  ]]

 [[-0.         -0.          0.43651015 ... -0.         -0.
    2.3054607 ]
  [-0.         -0.          0.10234973 ...  7.030856   -0.
   -0.        ]
  [-0.          3.4200547  -0.         ... -0.         -0.
   -0.        ]
  [-0.         -0.         -0.         ... -0.         -0.
    3.4806335 ]]

 [[ 0.8935639  -0.          3.895008   ... -0.          4.7719116
   -0.        ]
  [-0.          0.28672308

In [253]:
d_model = 512
transformer = TransformerWithImage(num_layers = 2, num_head = 8, dmodel = d_model, dff = 2048, \
                                   input_vocab_size= input_vocab_size, output_vocab_size = output_vocab_size,\
                                  input_max_pos = 10000, output_max_pos=6000, units=d_model)

learning_rate = CustomizeLearningRateSchedule(dmodel=d_model)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, 
                                     epsilon=1e-9)

In [20]:
def loss_fun(real, pred):
    loss_ob = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    loss = loss_ob(real, pred)
    return tf.reduce_mean(loss)

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
    name='train_accuracy')

In [21]:
checkpoint_path = "./checkpoints/train"

ckpt = tf.train.Checkpoint(transformer=transformer,
                           optimizer=optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# 如果检查点存在，则恢复最新的检查点。
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print ('Latest checkpoint restored!!')

In [256]:

@tf.function
def train_step(ques, img_tensor, target):
    # initializing the hidden state for each batch
    print('img tensor shape', img_tensor.shape)
    print('qu shape:{}, img_shape:{}, tar_shape:{}'.format(ques.shape, img_tensor.shape, target.shape))
    hidden = tf.zeros((BATCH_SIZE,d_model))
    dec_input = tf.zeros(target.shape)
    print('')
    with tf.GradientTape() as tape:
        # passing the questions, reply start, image features to the transformer
        predictions, _ = transformer(ques,dec_input, img_tensor, hidden, True)
        print('predictions:', predictions)
        loss = loss_fun(target, predictions)

    gradients = tape.gradient(loss, transformer.trainable_variables)    
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
  
    train_loss(loss)
    train_accuracy(target, predictions)

In [258]:
#train
import time
for epoch in range(2): #steps_per_epoch
    start = time.time()
  
    train_loss.reset_states()
    train_accuracy.reset_states()
  
 
    for (batch, (que, img, tar)) in enumerate(dataset):
        train_step(que, img, tar)
    
        if batch % 50 == 0:
            print ('Epoch {} Batch {} Loss {:.4f} Accuracy {:.4f}'.format(
            epoch + 1, batch, train_loss.result(), train_accuracy.result()))
      
    if (epoch + 1) % 5 == 0:
        ckpt_save_path = ckpt_manager.save()
        print ('Saving checkpoint for epoch {} at {}'.format(epoch+1,
                                                         ckpt_save_path))
    
    print ('Epoch {} Loss {:.4f} Accuracy {:.4f}'.format(epoch + 1, 
                                                train_loss.result(), 
                                                train_accuracy.result()))

    print ('Time taken for 1 epoch: {} secs\n'.format(time.time() - start))

Epoch 1 Batch 0 Loss 0.4938 Accuracy 0.8711
Epoch 1 Batch 50 Loss 0.4986 Accuracy 0.8638
Epoch 1 Batch 100 Loss 0.5043 Accuracy 0.8630
Epoch 1 Batch 150 Loss 0.5172 Accuracy 0.8588
Epoch 1 Batch 200 Loss 0.5181 Accuracy 0.8588
Epoch 1 Batch 250 Loss 0.5192 Accuracy 0.8587
Epoch 1 Batch 300 Loss 0.5214 Accuracy 0.8583
Epoch 1 Batch 350 Loss 0.5225 Accuracy 0.8585
Epoch 1 Batch 400 Loss 0.5241 Accuracy 0.8584
Epoch 1 Batch 450 Loss 0.5233 Accuracy 0.8582
Epoch 1 Batch 500 Loss 0.5224 Accuracy 0.8583
Epoch 1 Batch 550 Loss 0.5222 Accuracy 0.8583
Epoch 1 Batch 600 Loss 0.5231 Accuracy 0.8580
Epoch 1 Batch 650 Loss 0.5241 Accuracy 0.8577
Epoch 1 Batch 700 Loss 0.5258 Accuracy 0.8574
Epoch 1 Batch 750 Loss 0.5267 Accuracy 0.8572
Epoch 1 Batch 800 Loss 0.5275 Accuracy 0.8572
Epoch 1 Batch 850 Loss 0.5283 Accuracy 0.8570
Epoch 1 Batch 900 Loss 0.5295 Accuracy 0.8565
Epoch 1 Batch 950 Loss 0.5294 Accuracy 0.8568
Epoch 1 Batch 1000 Loss 0.5296 Accuracy 0.8568
Epoch 1 Batch 1050 Loss 0.5293 Accur

KeyboardInterrupt: 

In [259]:
ckpt_save_path = ckpt_manager.save()

In [303]:
def evaluate(ques, img, out_dim):
    ques_tensor = qu_token.texts_to_sequences(ques)
    ques_tensor = tf.keras.preprocessing.sequence.pad_sequences(ques_tensor, padding='post')
    ques_tensor.transpose((1, 0))
    print('ques_tensor shape', ques_tensor.shape)
    img_matrix = np.array(img_feature_struct[:,img_id_feature_map[img]])
    img_matrix = img_matrix.reshape(out_dim, -1)
    dec_input = tf.zeros((1, out_dim))

    print('img shape:', img_matrix.shape, ' an shape:', dec_input.shape)
    hidden = tf.zeros((1,d_model))
    predictions,_ = transformer(ques_tensor,dec_input, img_matrix, hidden, False)
    
    pred = tf.squeeze(predictions, axis = 0)
    pred_array = tf.random.categorical(pred, 1)
    print(pred_array)
    output =[]
    for index_arr in pred_array:
        index = index_arr.numpy()[0]
        if (index > 0):
            output.append(an_token.index_word[index])
    
    return output, predictions

In [264]:
#get test data
test_questions_new = []
test_img_ids_new = []
test_answers_new = []

for q, img, ans in zip(test_questions, test_img_ids, test_answers):
    if ans in top_ans:
        test_questions_new.append(q)
        test_img_ids_new.append(img)
        test_answers_new.append(ans)
        
print(len(test_questions_new))
print(len(test_img_ids_new))
print(len(test_answers_new))

42952
42952
42952


In [308]:
pred_word, pred = evaluate(test_questions_new[0], test_img_ids_new[0], an_tensor.shape[1])
print('questions for image {} is {}, '.format(test_img_ids_new[0], test_questions_new[0]))
print('pred answers is ', ' '.join(pred_word), ' real answer is ', test_answers_new[0])
print('prediction :', pred)

ques_tensor shape (40, 1)
img shape: (4, 1024)  an shape: (1, 4)
in decoder : x shape (1, 4, 512)  layer: 0
feature shape before: (4, 1024)
feature shape: (4, 512)
hidden_with_time_axis: (1, 1, 512)
score: (1, 4, 512)
attention_weights: (1, 4, 1)
context_tensor bef: (1, 4, 512)
context_tensor aft: (1, 4, 512)
img_context shape before: (1, 4, 512)
x shape (1, 4, 512)
x_img_concate_shape: (1, 4, 1024)
x_img_gru_shape: (1, 4, 512)
x_img_out_shape: (1, 4, 512)
in decoder: after layer, x shape: (1, 4, 512) decoder layer : 0
in decoder : x shape (1, 4, 512)  layer: 1
feature shape before: (4, 1024)
feature shape: (4, 512)
hidden_with_time_axis: (1, 1, 512)
score: (1, 4, 512)
attention_weights: (1, 4, 1)
context_tensor bef: (1, 4, 512)
context_tensor aft: (1, 4, 512)
img_context shape before: (1, 4, 512)
x shape (1, 4, 512)
x_img_concate_shape: (1, 4, 1024)
x_img_gru_shape: (1, 4, 512)
x_img_out_shape: (1, 4, 512)
in decoder: after layer, x shape: (1, 4, 512) decoder layer : 1
tf.Tensor(
[[2]