In [1]:
import chainer
from chainer.utils import type_check

In [2]:
from chainer import cuda

In [3]:
import numpy as np

In [4]:
import chainer
from chainer import function
import chainer.functions as F
import chainer.links as L
from chainer import reporter
from chainer.dataset import convert

In [5]:
from rot import rotation3d

In [6]:
import cv2

In [7]:
import numpy as np

class PreprocessedDataset(chainer.dataset.DatasetMixin):
    def __init__(self, nb_data):
        self.point3d = np.random.rand(nb_data, 3).astype(np.float32)
        self.point3d[:, 2] += 2.0
        self.matA = np.array([[600, 0, 300], [0, 600, 300], [0, 0, 1]], dtype=np.float32)
        self.rvec = np.random.rand(3).astype(np.float32) / 10
        self.tvec = np.random.rand(3).astype(np.float32)

        p1 = np.tensordot(self.matA, self.point3d, ((1,), (1,))).T
        self.p1 = p1[:,:2] / p1[:,2:]
        self.p1 += 5 * np.random.rand(nb_data, 2).astype(np.float32)

        rmat = self.rodrigues(self.rvec)
        rp = np.tensordot(rmat, self.point3d, ((1,), (1,))).T
        rpt = rp + np.broadcast_to(self.tvec, rp.shape)
        p2 = np.tensordot(self.matA, rpt, ((1,), (1,))).T
        self.p2 = p2[:,:2] / p2[:,2:]
        self.p2 += 5 * np.random.rand(nb_data, 2).astype(np.float32)
        
    def rodrigues(self, r):
            def S(n):
                Sn = np.array([[0,-n[2],n[1]],[n[2],0,-n[0]],[-n[1],n[0],0]])
                return Sn

            theta = np.linalg.norm(r)

            if theta > 1e-16:
                n = r / theta
                Sn = S(n)
                R = np.eye(3) + \
                    np.sin(theta) * Sn + \
                    (1 - np.cos(theta)) * np.dot(Sn, Sn)
            else:
                Sr = S(r)
                theta2 = theta**2
                R = np.eye(3) + \
                    (1- theta2/6.) * Sr + \
                    (.5 - theta2/24.) * np.dot(Sr, Sr)

            return R.astype(r.dtype)

    def __len__(self):
        return len(self.p1)

    def get_example(self, i):
        return self.p1[i], self.p2[i]

In [8]:
data = PreprocessedDataset(nb_data = 10)

In [9]:
p0, p1 = convert.concat_examples(data[:], -1)
E, mask = cv2.findEssentialMat(p0, p1, data.matA, method=cv2.RANSAC, prob=0.999, threshold=3.0)
_, rmat, tvec, _ = cv2.recoverPose(E, p0, p1)
rvec, _ = cv2.Rodrigues(rmat)

In [10]:
pl = np.dot(data.matA, np.eye(3, 4, dtype=np.float32))
pr = np.dot(data.matA, np.concatenate((rmat, tvec.reshape(3,1)), axis=1))
point4d = cv2.triangulatePoints(pl, pr, p0.T, p1.T).T
point3d = (point4d[:] / point4d[:,3:])[:,:3]

In [11]:
class Net(chainer.Chain):
    def __init__(self, matA, nb_data, rvec=None, tvec=None, points=None):
        super(Net, self).__init__()
        self.matA = matA
        
        embd = None
        if rvec is not None and tvec is not None:
            embd = np.stack((rvec, tvec), axis=0)
            
        pts = None
        if points is not None:
            pts = points.reshape(-1)
            
        with self.init_scope():
            self.embd = L.EmbedID(2, 3, initialW=embd)
            self.points = L.EmbedID(1, 3 * nb_data, pts)

    def proj(self, x):
        xy, z = F.split_axis(x, (2,), axis=1)
        r = xy / F.broadcast_to(z, xy.shape)
        return r

    def __call__(self):
        xp = self.xp            
        matA = xp.asarray(self.matA)
        
        pts = self.points(xp.array([0], dtype=np.int32))
        pts = F.reshape(pts, (-1, 3))
            
        p0 = F.matmul(pts, matA, transa=False, transb=True)
        p0 = self.proj(p0)
        
        r = self.embd(xp.array([0], dtype=np.int32))
        r = F.reshape(r, (3,))
        
        t = self.embd(xp.array([1], dtype=np.int32))
        t = F.broadcast_to(t, pts.shape)
        
        rxt = rotation3d(pts, r) + t
        p1 = F.matmul(rxt, matA, transa=False, transb=True)
        p1 = self.proj(p1)

        return p0, p1

In [12]:
net = Net(data.matA, nb_data = len(data), rvec=rvec[:,0], tvec=tvec[:,0], points=point3d)

In [13]:
class loss_function(chainer.link.Chain):
    def __init__(self, predictor):
        super(loss_function, self).__init__(predictor=predictor)

    def __call__(self, p0, p1):
        q0, q1 = self.predictor()
        self.loss = F.mean_squared_error(p0, q0) + F.mean_squared_error(p1, q1)
        reporter.report({'loss': self.loss}, self)
        return self.loss

In [14]:
model = loss_function(net)

In [15]:
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)

In [16]:
model(*convert.concat_examples(data[:], -1))

variable(5.782209396362305)

In [17]:
data_iter = chainer.iterators.SerialIterator(data, len(data), shuffle=False)
data_count = len(data)

sum_loss = 0

while data_iter.epoch < 1000:
    batch = data_iter.next()
    x_array, y_array = convert.concat_examples(batch, -1)
    x = chainer.Variable(x_array)
    y = chainer.Variable(y_array)
    optimizer.update(model, x, y)
    sum_loss += float(model.loss.data) * len(y.data)

    if data_iter.is_new_epoch:
        if data_iter.epoch % 100 == 0:
            print('epoch: {}, train mean loss: {}'.format(data_iter.epoch, sum_loss / data_count))
        sum_loss = 0

epoch: 100, train mean loss: 2.7821598052978516
epoch: 200, train mean loss: 2.682826042175293
epoch: 300, train mean loss: 2.603457450866699
epoch: 400, train mean loss: 2.535459518432617
epoch: 500, train mean loss: 2.4778831005096436
epoch: 600, train mean loss: 2.429232597351074
epoch: 700, train mean loss: 2.3879027366638184
epoch: 800, train mean loss: 2.3529210090637207
epoch: 900, train mean loss: 2.3226137161254883
epoch: 1000, train mean loss: 2.295989990234375
