In [None]:
import numpy as np
import mindspore
import mindspore.nn as nn
import mindspore.numpy as mnp
import mindspore.ops as ops
import matplotlib.pyplot as plt

In [None]:
# S: Symbol that marks the start of the decoding input
# E: Symbol that marks the end of the decoding output
# P: Symbol that pads a sequence when it is shorter than the number of time steps

def make_batch():
    """Build the single training example as MindSpore tensors.

    Reads the notebook-level globals ``sentences``, ``word_dict`` and ``n_class``.

    Returns:
        A 3-tuple ``(input_batch, output_batch, target_batch)``:
        one-hot encoded ``sentences[0]`` (encoder input), one-hot encoded
        ``sentences[1]`` (decoder input) and the token ids of ``sentences[2]``
        (decoder target). Each is wrapped in a leading batch dimension of 1.
    """
    def to_ids(sentence):
        # Whitespace-tokenize and map every token to its vocabulary id.
        return [word_dict[token] for token in sentence.split()]

    one_hot_rows = np.eye(n_class)  # row k is the one-hot vector of token id k
    encoder_ids = to_ids(sentences[0])
    decoder_ids = to_ids(sentences[1])
    target_ids = to_ids(sentences[2])

    input_batch = [one_hot_rows[encoder_ids]]
    output_batch = [one_hot_rows[decoder_ids]]
    target_batch = [target_ids]

    # make tensor
    return (
        mindspore.Tensor(input_batch),
        mindspore.Tensor(output_batch),
        mindspore.Tensor(target_batch),
    )

In [None]:
class Attention(nn.Cell):
    """RNN encoder-decoder that attends over the encoder outputs at each decoder step.

    Layer sizes come from the notebook-level globals ``n_class`` and ``n_hidden``.
    """

    def __init__(self):
        super().__init__()
        # Encoder and decoder use the same recurrent configuration.
        # NOTE(review): dropout on a single-layer RNN may be a no-op — confirm
        # against the MindSpore nn.RNN documentation.
        rnn_kwargs = dict(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
        self.enc_cell = nn.RNN(**rnn_kwargs)
        self.dec_cell = nn.RNN(**rnn_kwargs)

        # Linear for attention
        self.attn = nn.Dense(n_hidden, n_hidden)
        # Maps the concatenated [decoder output, context] vector to vocabulary logits.
        self.out = nn.Dense(n_hidden * 2, n_class)

    def construct(self, enc_inputs, hidden, dec_inputs):
        """Run the encoder once, then decode step by step with attention.

        Args:
            enc_inputs: encoder input, [batch_size, n_step, n_class].
            hidden: initial encoder state,
                [num_layers(=1) * num_directions(=1), batch_size, n_hidden].
            dec_inputs: decoder input, [batch_size, n_step, n_class].

        Returns:
            A pair ``(logits, attn_history)``: ``logits`` has shape
            [n_step, n_class]; ``attn_history`` is a list holding the squeezed
            attention weights of every decoder step.
        """
        # Move both inputs to time-major layout: [n_step, batch_size, n_class]
        enc_inputs = enc_inputs.swapaxes(0, 1)
        dec_inputs = dec_inputs.swapaxes(0, 1)

        # enc_outputs : [n_step, batch_size, num_directions(=1) * n_hidden], matrix F
        # enc_hidden : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
        enc_outputs, enc_hidden = self.enc_cell(enc_inputs, hidden)

        dec_steps = len(dec_inputs)
        # Pre-allocated output buffer; row t is overwritten at decoder step t.
        logits = mnp.ones([dec_steps, 1, n_class])
        attn_history = []
        state = enc_hidden  # the decoder starts from the final encoder state

        for t in range(dec_steps):  # each time step
            # dec_output : [n_step(=1), batch_size(=1), num_directions(=1) * n_hidden]
            # state : [num_layers(=1) * num_directions(=1), batch_size(=1), n_hidden]
            step_input = dec_inputs[t].expand_dims(0)
            dec_output, state = self.dec_cell(step_input, state)

            attn_weights = self.get_att_weight(dec_output, enc_outputs)  # [1, 1, n_step]
            attn_history.append(attn_weights.squeeze())

            # [1, 1, n_step] x [1, n_step, n_hidden] = [1, 1, n_hidden], then drop axis 1
            context = mnp.matmul(attn_weights, enc_outputs.swapaxes(0, 1)).squeeze(1)
            # dec_output.squeeze(0) : [batch_size(=1), num_directions(=1) * n_hidden]
            features = mnp.concatenate((dec_output.squeeze(0), context), 1)
            logits[t] = self.out(features)

        # make output shape [n_step, n_class]
        return logits.swapaxes(0, 1).squeeze(0), attn_history

    def get_att_weight(self, dec_output, enc_outputs):
        """Return softmax-normalized attention weights of one ``dec_output`` over ``enc_outputs``.

        The result is reshaped to [1, 1, n_step].
        """
        enc_steps = len(enc_outputs)
        scores = mnp.zeros(enc_steps)  # scores : [n_step]

        for t in range(enc_steps):
            scores[t] = self.get_att_score(dec_output, enc_outputs[t])

        # Normalize scores to weights in range 0 to 1
        weights = ops.Softmax()(scores)
        return weights.view(1, 1, -1)

    def get_att_score(self, dec_output, enc_output):
        """Score one encoder output against the decoder output.

        ``enc_output`` is [batch_size, num_directions(=1) * n_hidden]; it is passed
        through the attention layer and the flattened result is dotted with the
        flattened ``dec_output``, giving a scalar.
        """
        projected = self.attn(enc_output)  # projected : [batch_size, n_hidden]
        return mnp.dot(dec_output.view(-1), projected.view(-1))

In [None]:
class WithLossCell(nn.Cell):
    """Wrap a backbone network and a loss function into one cell that returns the loss."""

    def __init__(self, backbone, loss_fn):
        super().__init__(auto_prefix=False)
        self._backbone = backbone
        self._loss_fn = loss_fn

    def construct(self, *args):
        """Compute the loss for one batch.

        The last positional argument is the label tensor; all preceding ones are
        forwarded to the backbone. The backbone's second return value is ignored.
        """
        backbone_inputs = args[:-1]
        labels = args[-1]

        logits, _ = self._backbone(*backbone_inputs)

        # Flatten to (num_rows, last_dim) logits and a 1-D label vector for the loss.
        flat_logits = logits.view(-1, logits.shape[-1])
        flat_labels = labels.view(-1)
        return self._loss_fn(flat_logits, flat_labels)

In [None]:
n_step = 5  # number of cells (= number of time steps)
n_hidden = 128  # number of hidden units in one cell

# Order: encoder input, decoder input, decoder target.
sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']

# Sort the de-duplicated vocabulary so token ids are the same on every kernel
# restart. Iterating a plain set of strings follows per-process hash
# randomization, which would reshuffle the ids (and every one-hot column
# derived from them) between runs.
word_list = sorted(set(" ".join(sentences).split()))
word_dict = {w: i for i, w in enumerate(word_list)}  # token -> id
number_dict = {i: w for i, w in enumerate(word_list)}  # id -> token
n_class = len(word_dict)  # vocabulary size

In [None]:
# Seed MindSpore's global RNG before any parameters are created so that weight
# initialisation (and therefore the training curve) is reproducible across
# kernel restarts and re-runs of this cell.
SEED = 42
mindspore.set_seed(SEED)

model = Attention()
# sparse=True: labels are class indices rather than one-hot vectors.
criterion = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
network = WithLossCell(model, criterion)
optimizer = nn.Adam(model.trainable_params(), learning_rate=0.001)
# One call performs the forward pass, backward pass and optimizer update.
train_network = nn.TrainOneStepCell(network, optimizer)

In [None]:
# Initial encoder state, all zeros.
# Layout: [num_layers(=1) * num_directions(=1), batch_size(=1), n_hidden]
initial_state_shape = (1, 1, n_hidden)
hidden = mnp.zeros(initial_state_shape)

# Encoder input, decoder input and decoder target for the single training pair.
input_batch, output_batch, target_batch = make_batch()

In [None]:
# Train
NUM_EPOCHS = 2000
LOG_EVERY = 400  # report the loss once per this many epochs

# The target keeps a leading batch axis of 1; drop it once, outside the loop.
train_targets = target_batch.squeeze(0)

for epoch in range(NUM_EPOCHS):
    loss = train_network(input_batch, hidden, output_batch, train_targets)

    completed = epoch + 1
    if completed % LOG_EVERY != 0:
        continue
    print('Epoch:', '%04d' % completed, 'cost =', '{:.6f}'.format(loss.asnumpy()))

In [None]:
# Test
# Decoder input at inference time: the start symbol followed by padding up to
# n_step tokens. The tokens are listed explicitly rather than taken from the
# characters of a string, so the lookup still works if a special symbol is
# ever longer than one character.
test_tokens = ['S'] + ['P'] * (n_step - 1)
test_one_hot = np.eye(n_class)[[word_dict[token] for token in test_tokens]]
test_batch = mindspore.Tensor([test_one_hot])  # leading batch axis of 1

predict, trained_attn = model(input_batch, hidden, test_batch)
predict = predict.argmax(1)  # most likely token id at every decoder step
print(sentences[0], '->', [number_dict[int(n.asnumpy())] for n in predict])

In [None]:
# Show Attention
source_tokens = sentences[0].split()  # columns: encoder steps
target_tokens = sentences[2].split()  # rows: decoder steps

fig = plt.figure(figsize=(5, 5))
ax = fig.add_subplot(1, 1, 1)
ax.matshow([attn.asnumpy() for attn in trained_attn], cmap='viridis')

# Fix the tick positions before labelling them. Labelling auto-generated ticks
# (with a leading '' to skip the first one) depends on where the locator puts
# its ticks and makes matplotlib warn about a FixedFormatter without a
# FixedLocator.
ax.set_xticks(range(len(source_tokens)))
ax.set_yticks(range(len(target_tokens)))
ax.set_xticklabels(source_tokens, fontdict={'fontsize': 14})
ax.set_yticklabels(target_tokens, fontdict={'fontsize': 14})
plt.show()