In [100]:
import chainer
import chainer.functions as F
import chainer.links as L
import chainerrl
import numpy as np

In [111]:
import chainer.computational_graph as c

In [101]:
class ForwardPredictor(chainer.Chain):
    """Fully connected network with three batch-normalized hidden layers.

    The input width is ``obs_size + n_actions`` and the output width is
    ``obs_size``.
    """

    def __init__(self, obs_size, n_actions, n_hidden_channels=128):
        """
        Args:
            obs_size (int): Width of the output layer; also contributes to
                the width of the input layer.
            n_actions (int): Added to ``obs_size`` to give the input width.
            n_hidden_channels (int): Width of every hidden layer.
        """
        super().__init__()
        with self.init_scope():
            self.l0 = L.Linear(obs_size+n_actions, n_hidden_channels)
            self.l1 = L.Linear(n_hidden_channels, n_hidden_channels)
            self.l2 = L.Linear(n_hidden_channels, n_hidden_channels)
            self.l3 = L.Linear(n_hidden_channels, obs_size)
            self.bn1 = L.BatchNormalization(n_hidden_channels)
            self.bn2 = L.BatchNormalization(n_hidden_channels)
            self.bn3 = L.BatchNormalization(n_hidden_channels)

    def _forward(self, x):
        """Apply linear -> batch-norm -> ReLU three times, then the output layer."""
        h = F.relu(self.bn1(self.l0(x)))
        h = F.relu(self.bn2(self.l1(h)))
        h = F.relu(self.bn3(self.l2(h)))
        return self.l3(h)

    def __call__(self, x, test=False):
        """
        Args:
            x (ndarray or chainer.Variable): Batch of input vectors of width
                ``obs_size + n_actions``.
            test (bool): When True, the forward pass runs with
                ``chainer.config.train`` set to False so the batch
                normalization layers use their running statistics. When
                False, the ambient Chainer configuration is left untouched.

        Returns:
            chainer.Variable: Output of the final linear layer, of width
            ``obs_size``.
        """
        if test:
            # Only override the config when test mode is requested, so that a
            # caller who already set chainer.config.train is still respected.
            with chainer.using_config('train', False):
                return self._forward(x)
        return self._forward(x)

<chainer.optimizers.adam.Adam at 0x7ff6f0b34320>

In [106]:
# Sweep Adam's eps and report the loss after 20 updates for each value.
# The batch is constant (all ones), so both arrays are built once up front.
eps_values = [1e-1,1e-2,1e-3,1e-4,1e-5,1e-6,1e-7,1e-8,1e-10,1e-20]
x_ones = np.ones((7, 6), dtype=np.float32)  # 7 rows, width matches ForwardPredictor(4,2) input
t = np.ones((7, 4), dtype=np.float32)       # 7 rows, width matches ForwardPredictor(4,2) output
for j in eps_values:
    # A fresh model and optimizer per eps value, so the runs do not share weights.
    f_pred = ForwardPredictor(4,2)
    optimizer_f = chainer.optimizers.Adam(eps=j)
    optimizer_f.setup(f_pred)
    for i in range(20):
        f_pred.cleargrads()
        y = f_pred(x_ones)
        loss = F.mean_squared_error(y, t)
        loss.backward()
        optimizer_f.update()
    print(j,'loss',loss)

0.1 loss variable(0.9812899)
0.01 loss variable(0.93867964)
0.001 loss variable(0.9113274)
0.0001 loss variable(0.8753067)
1e-05 loss variable(0.8842639)
1e-06 loss variable(0.88033617)
1e-07 loss variable(0.8689108)
1e-08 loss variable(0.8760949)
1e-10 loss variable(0.8590482)
1e-20 loss variable(0.8830109)


In [114]:
class ForwardPredictor(chainer.Chain):
    """MLP of four linear layers with batch normalization on the hidden ones.

    Takes vectors of width ``obs_size + n_actions`` and produces vectors of
    width ``obs_size``.
    """

    def __init__(self, obs_size, n_actions, n_hidden_channels=128):
        """
        Args:
            obs_size (int): Output width; together with ``n_actions`` it also
                determines the input width.
            n_actions (int): Extra input width on top of ``obs_size``.
            n_hidden_channels (int): Number of units in each hidden layer.
        """
        super().__init__()
        with self.init_scope():
            self.l0 = L.Linear(obs_size+n_actions, n_hidden_channels)
            self.l1 = L.Linear(n_hidden_channels, n_hidden_channels)
            self.l2 = L.Linear(n_hidden_channels, n_hidden_channels)
            self.l3 = L.Linear(n_hidden_channels, obs_size)
            self.bn1 = L.BatchNormalization(n_hidden_channels)
            self.bn2 = L.BatchNormalization(n_hidden_channels)
            self.bn3 = L.BatchNormalization(n_hidden_channels)

    def _forward(self, x):
        """Run the hidden stack (linear, batch-norm, ReLU) and the output layer."""
        h = F.relu(self.bn1(self.l0(x)))
        h = F.relu(self.bn2(self.l1(h)))
        h = F.relu(self.bn3(self.l2(h)))
        return self.l3(h)

    def __call__(self, x, test=False):
        """
        Args:
            x (ndarray or chainer.Variable): Batch of input vectors of width
                ``obs_size + n_actions``.
            test (bool): If True, evaluate with ``chainer.config.train``
                forced to False (batch normalization then uses its running
                statistics). If False, the current Chainer configuration is
                used as-is.

        Returns:
            chainer.Variable: Result of the last linear layer, of width
            ``obs_size``.
        """
        if not test:
            return self._forward(x)
        # Scope the override to this call so the global config is restored
        # as soon as the forward pass finishes.
        with chainer.using_config('train', False):
            return self._forward(x)

In [115]:
# Build the model/optimizer pair that is handed to Unko further down.
# eps is pinned to 1e-20, the final value taken by the eps sweep's loop
# variable `j`, so this cell does not depend on that loop having run first.
ADAM_EPS = 1e-20
f_pred = ForwardPredictor(4,2)
optimizer_f = chainer.optimizers.Adam(eps=ADAM_EPS)
optimizer_f.setup(f_pred)

<chainer.optimizers.adam.Adam at 0x7ff6f0b546a0>

In [124]:
class Unko():
    """Small training harness that fits ``model`` to an all-ones target.

    Holds an optimizer and a model and drives a fixed number of gradient
    updates on a constant batch, printing the loss at every step.
    """
    def __init__(self,opt='',model=lambda x:x):
        """
        Args:
            opt: Optimizer whose ``update()`` is called once per step.
                ``learn()`` never calls ``setup()``, so the optimizer must
                already be bound to ``model``. The ``''`` default is only a
                placeholder and cannot be used with ``learn()``.
            model: Callable applied to the input batch; ``learn()`` also calls
                ``model.cleargrads()``, so the identity-lambda default is
                likewise only a placeholder.
        """
        self.opt = opt
        self.model = model

    def learn(self, n_steps=20):
        """Run ``n_steps`` updates on a constant all-ones batch.

        Args:
            n_steps (int): Number of optimizer updates to perform.

        Prints the step index and the mean squared error before each update.
        """
        # The batch never changes, so build it once outside the loop.
        x = np.ones((7, 6), dtype=np.float32)
        t = np.ones((7, 4), dtype=np.float32)
        for i in range(n_steps):
            self.model.cleargrads()
            y = self.model(x)
            # chainer.functions has no `reduce_sim`; mean squared error is the
            # loss used by the eps sweep cell and matches the magnitude of the
            # losses recorded for this cell.
            loss = F.mean_squared_error(y, t)
            loss.backward()
            self.opt.update()
            print(i,loss)

In [125]:
# Pair the already set-up optimizer with the model it was set up on.
unk = Unko(opt=optimizer_f, model=f_pred)

In [126]:
# Run the training loop; it prints the step index and loss on every iteration.
unk.learn()

0 variable(0.84977883)
1 variable(0.83850485)
2 variable(0.8269025)
3 variable(0.8149721)
4 variable(0.80271894)
5 variable(0.79014534)
6 variable(0.77725446)
7 variable(0.7640539)
8 variable(0.7505493)
9 variable(0.73674446)
10 variable(0.7226519)
11 variable(0.70827466)
12 variable(0.6936298)
13 variable(0.67872274)
14 variable(0.66357)
15 variable(0.64818084)
16 variable(0.63256806)
17 variable(0.61675316)
18 variable(0.6007447)
19 variable(0.58456177)


In [112]:
# Build the computational graph that produced `loss`.
# NOTE(review): `loss` must be a module-level variable here; Unko.learn keeps
# its own `loss` local, so this presumably picks up the one left behind by the
# eps sweep cell -- confirm that cell has been run first.
g = c.build_computational_graph([loss])

In [113]:
# Save the text returned by g.dump() to graph.dot in the working directory.
with open('graph.dot', 'w') as dot_file:
    dot_file.write(g.dump())