# Pain

### general

In [92]:
INPUT_SHAPE = (512, 512, 1)
OUTPUT_SIZE = 16
from tensorflow.python.keras.layers.normalization import BatchNormalization
from tensorflow import keras
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

def landmark_cnn(input_shape=INPUT_SHAPE, output_size=OUTPUT_SIZE):

    img_input = Input(shape=input_shape)
    

    x = Conv2D(16, (3,3), strides=(1,1), name='Conv1')(img_input)
    x = BatchNormalization()(x)
    x = Activation('relu', name='Relu_conv1')(x)
    
    x = MaxPooling2D(pool_size=(2,2), strides=(2,2), name='Pool1')(x)


    x = Conv2D(32, (3,3), strides=(1,1), name='Conv2')(x)
    x = BatchNormalization()(x)
    x = Activation('relu', name='Relu_conv2')(x)
    
    x = Conv2D(64, (3,3), strides=(1,1), name='Conv3')(x)
    x = BatchNormalization()(x)
    x = Activation('relu', name='Relu_conv3')(x)
    
    x = MaxPooling2D(pool_size=(2,2), strides=(2,2), name='Pool2')(x)


    x = Conv2D(32, (3,3), strides=(1,1), name='Conv4')(x)
    x = BatchNormalization()(x)
    x = Activation('relu', name='Relu_conv4')(x)
    
    x = Conv2D(32, (3,3), strides=(1,1), name='Conv5')(x)
    x = BatchNormalization()(x)
    x = Activation('relu', name='Relu_conv5')(x)
    
    x = MaxPooling2D(pool_size=(2,2), strides=(2,2), name='Pool3')(x)
        

    x = Conv2D(64, (3,3), strides=(1,1), name='Conv6')(x)
    x = BatchNormalization()(x)
    x = Activation('relu', name='Relu_conv6')(x)
    
    x = Dropout(0.2)(x)
    
    x = Flatten(name='Flatten')(x)
    x = Dense(128, activation='relu', name='FC1')(x)
    x = Dense(output_size, activation=None, name='Predictions')(x)
    
    
    model = Model([img_input], x, name='Landmark_model')
    
    return model
model = landmark_cnn()
model.summary()

Model: "Landmark_model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_12 (InputLayer)        [(None, 512, 512, 1)]     0         
_________________________________________________________________
Conv1 (Conv2D)               (None, 510, 510, 16)      160       
_________________________________________________________________
batch_normalization_5041 (Ba (None, 510, 510, 16)      64        
_________________________________________________________________
Relu_conv1 (Activation)      (None, 510, 510, 16)      0         
_________________________________________________________________
Pool1 (MaxPooling2D)         (None, 255, 255, 16)      0         
_________________________________________________________________
Conv2 (Conv2D)               (None, 253, 253, 32)      4640      
_________________________________________________________________
batch_normalization_5042 (Ba (None, 253, 253, 32)   

### Kaggle-Face-key-detection

In [93]:
model = keras.Sequential()

model.add(Convolution2D(32, (3,3), padding='same', use_bias=False, input_shape=(512, 512, 1)))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())

model.add(Convolution2D(32, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

model.add(Convolution2D(64, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())

model.add(Convolution2D(64, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

model.add(Convolution2D(96, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())

model.add(Convolution2D(96, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

model.add(Convolution2D(128, (3,3),padding='same', use_bias=False))
# model.add(BatchNormalization())
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())

model.add(Convolution2D(128, (3,3),padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

model.add(Convolution2D(256, (3,3),padding='same',use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())

model.add(Convolution2D(256, (3,3),padding='same',use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2)))

model.add(Convolution2D(512, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())

model.add(Convolution2D(512, (3,3), padding='same', use_bias=False))
model.add(LeakyReLU(alpha = 0.1))
model.add(BatchNormalization())


model.add(Flatten())
model.add(Dense(512,activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(16))

print(type(model))

<class 'tensorflow.python.keras.engine.sequential.Sequential'>


### Stacked-hourglass

In [94]:
'''
Architecture adapted from https://github.com/bearpaw/pytorch-pose
'''
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, BatchNormalization, ReLU

class ResidualBlock(tf.keras.Model):

    def __init__(self, planes, strides=1, downsample=None):
        super(ResidualBlock, self).__init__()

        self.bn1 = BatchNormalization(momentum=0.1, epsilon=1e-5)
        self.conv1 = Conv2D(planes, kernel_size=1,)
        self.bn2 = BatchNormalization(momentum=0.1, epsilon=1e-5)
        self.conv2 = Conv2D(planes, kernel_size=3, strides=strides,
                               padding="same",)
        self.bn3 = BatchNormalization(momentum=0.1, epsilon=1e-5)
        self.conv3 = Conv2D(planes * 2, kernel_size=1,)
        self.relu = ReLU()
        self.downsample = downsample

    def call(self, x):
        residual = x

        out = self.bn1(x)
        out = self.relu(out)
        out = self.conv1(out)

        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv2(out)

        out = self.bn3(out)
        out = self.relu(out)
        out = self.conv3(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        return out

In [95]:
import tensorflow as tf

def hg_loss(y_true, y_pred):
    """
    y_true: (batch_size, height, width, n_keypoints)
    y_pred: (batch_size, n_stacks, height, width, n_keypoints)
    """
    mse = tf.keras.losses.MeanSquaredError()
    loss = 0.
    for stack_pred in y_pred:
        loss += mse(y_true, stack_pred)
    
    return loss

In [96]:
'''
Architecture adapted from https://github.com/bearpaw/pytorch-pose
'''
import tensorflow as tf
from tensorflow.keras.layers import UpSampling2D, MaxPool2D

class Hourglass(tf.keras.Model):
    def __init__(self, n_blocks, filters, depth, name='hourglass_1'):
        super(Hourglass, self).__init__()
        self.depth = depth
        self.upsample = UpSampling2D()
        self.hg = self.hourglass(n_blocks, filters, depth, name=name+'_depth_{0}'.format(depth))

    def residual_blocks(self, n_blocks, filters, name):
        layers = []
        for i in range(0, n_blocks):
            layers.append(ResidualBlock(filters))
        return tf.keras.Sequential(*layers, name=name)

    def hourglass(self, n_blocks, filters, depth, name='hourglass_1_1'):
        hg = []
        for i in range(depth):
            res = []
            for j in range(3):
                res.append(self.residual_blocks(n_blocks, filters, name+'_res_{}'.format(j)))
            if i == 0:
                res.append(self.residual_blocks(n_blocks, filters, name+'_res_{}'.format(0)))
            hg.append(res)
        return hg

    def hourglass_call(self, n, x):
        up1 = self.hg[n-1][0](x)
        low1 = MaxPool2D(pool_size=(2,2), strides=(2,2))(x)
        low1 = self.hg[n-1][1](low1)

        if n > 1:
            low2 = self.hourglass_call(n-1, low1)
        else:
            low2 = self.hg[n-1][3](low1)
        low3 = self.hg[n-1][2](low2)
        up2 = self.upsample(low3)
        out = up1 + up2
        return out

    def call(self, x):
        return self.hourglass_call(self.depth, x)

In [99]:
'''
Architecture adapted from https://github.com/bearpaw/pytorch-pose
'''
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, BatchNormalization, ReLU
class StackedHourglass(tf.keras.Model):

    def __init__(self, n_stacks=2, n_blocks=1, n_keypoints=10):
        super(StackedHourglass, self).__init__()

        self.inplanes = 64
        self.expansion = 2
        self.num_feats = 128
        self.n_stacks = n_stacks
        self.conv1 = Conv2D(self.inplanes, kernel_size=3, padding='same')
        self.bn1 = BatchNormalization(momentum=0.1, epsilon=1e-5)
        self.relu = ReLU()
        self.residual1 = self.residual_blocks(128, 1)
        self.residual2 = self.residual_blocks(128, 1)
        self.residual3 = self.residual_blocks(self.num_feats, 1)

        # build hourglass modules
        expanded_feats = self.num_feats*self.expansion
        hg, res, fc, score, fc_, score_ = [], [], [], [], [], []
        for i in range(n_stacks):
            hg.append(Hourglass(n_blocks, self.num_feats, 4, 'hourglass_{}'.format(i)))
            res.append(self.residual_blocks(self.num_feats, n_blocks))
            fc.append(self.linear(expanded_feats))
            score.append(Conv2D(n_keypoints, kernel_size=1))
            if i < n_stacks-1:
                fc_.append(Conv2D(expanded_feats, kernel_size=1))
                score_.append(Conv2D(expanded_feats, kernel_size=1))
        self.hg = hg
        self.res = res
        self.fc = fc
        self.score = score
        self.fc_ = fc_
        self.score_ = score_

    def residual_blocks(self, planes, n_blocks, strides=1):
        downsample = None
        if strides != 1 or self.inplanes != planes * self.expansion:
            downsample = Conv2D(planes * self.expansion, kernel_size=1, strides=strides)
        layers = []
        layers.append(ResidualBlock(planes, strides, downsample))
        self.inplanes = planes * self.expansion
        for i in range(1, n_blocks):
            layers.append(ResidualBlock(planes))

        return tf.keras.Sequential(*layers)

    def linear(self, outplanes):
        bn = BatchNormalization(momentum=0.1, epsilon=1e-5)
        conv = Conv2D(outplanes, kernel_size=1)
        return tf.keras.Sequential([conv, bn, self.relu])

    def call(self, x):
        out = []
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x) 

        x = self.residual1(x)  
        x = self.residual2(x)  
        x = self.residual3(x)  

        for i in range(self.n_stacks):
            y = self.hg[i](x)
            y = self.res[i](y)
            y = self.fc[i](y)
            score = self.score[i](y)
            out.append(score)
            if i < self.n_stacks-1:
                fc_ = self.fc_[i](y)
                score_ = self.score_[i](score)
                x = x + fc_ + score_

        return out
    
    
    def buildgraph(self, raw_shape):

        x = tf.keras.layers.Input(shape=raw_shape)
        return Model(inputs=[x], outputs=self.call(x))

In [108]:
import numpy as np

hourglass = StackedHourglass(2, 1, 16)
hourglass.build(input_shape=(25, 64, 64, 3))
hourglass.buildgraph()

TypeError: buildgraph() missing 1 required positional argument: 'raw_shape'

In [2]:
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.losses import mean_squared_error
import tensorflow.keras.backend as K


def create_hourglass_network(num_classes, num_stacks, num_channels, inres, outres, bottleneck):
    input = Input(shape=(inres[0], inres[1], 3))

    front_features = create_front_module(input, num_channels, bottleneck)

    head_next_stage = front_features

    outputs = []
    for i in range(num_stacks):
        head_next_stage, head_to_loss = hourglass_module(head_next_stage, num_classes, num_channels, bottleneck, i)
        outputs.append(head_to_loss)

    model = Model(inputs=input, outputs=outputs)
    rms = RMSprop(lr=5e-4)
    model.compile(optimizer=rms, loss=mean_squared_error, metrics=["accuracy"])

    return model


def hourglass_module(bottom, num_classes, num_channels, bottleneck, hgid):
    # create left features , f1, f2, f4, and f8
    left_features = create_left_half_blocks(bottom, bottleneck, hgid, num_channels)

    # create right features, connect with left features
    rf1 = create_right_half_blocks(left_features, bottleneck, hgid, num_channels)

    # add 1x1 conv with two heads, head_next_stage is sent to next stage
    # head_parts is used for intermediate supervision
    head_next_stage, head_parts = create_heads(bottom, rf1, num_classes, hgid, num_channels)

    return head_next_stage, head_parts


def bottleneck_block(bottom, num_out_channels, block_name):
    # skip layer
    if K.int_shape(bottom)[-1] == num_out_channels:
        _skip = bottom
    else:
        _skip = Conv2D(num_out_channels, kernel_size=(1, 1), activation='relu', padding='same',
                       name=block_name + 'skip')(bottom)

    # residual: 3 conv blocks,  [num_out_channels/2  -> num_out_channels/2 -> num_out_channels]
    _x = Conv2D(num_out_channels / 2, kernel_size=(1, 1), activation='relu', padding='same',
                name=block_name + '_conv_1x1_x1')(bottom)
    _x = BatchNormalization()(_x)
    _x = Conv2D(num_out_channels / 2, kernel_size=(3, 3), activation='relu', padding='same',
                name=block_name + '_conv_3x3_x2')(_x)
    _x = BatchNormalization()(_x)
    _x = Conv2D(num_out_channels, kernel_size=(1, 1), activation='relu', padding='same',
                name=block_name + '_conv_1x1_x3')(_x)
    _x = BatchNormalization()(_x)
    _x = Add(name=block_name + '_residual')([_skip, _x])

    return _x


def bottleneck_mobile(bottom, num_out_channels, block_name):
    # skip layer
    if K.int_shape(bottom)[-1] == num_out_channels:
        _skip = bottom
    else:
        _skip = SeparableConv2D(num_out_channels, kernel_size=(1, 1), activation='relu', padding='same',
                                name=block_name + 'skip')(bottom)

    # residual: 3 conv blocks,  [num_out_channels/2  -> num_out_channels/2 -> num_out_channels]
    _x = SeparableConv2D(num_out_channels / 2, kernel_size=(1, 1), activation='relu', padding='same',
                         name=block_name + '_conv_1x1_x1')(bottom)
    _x = BatchNormalization()(_x)
    _x = SeparableConv2D(num_out_channels / 2, kernel_size=(3, 3), activation='relu', padding='same',
                         name=block_name + '_conv_3x3_x2')(_x)
    _x = BatchNormalization()(_x)
    _x = SeparableConv2D(num_out_channels, kernel_size=(1, 1), activation='relu', padding='same',
                         name=block_name + '_conv_1x1_x3')(_x)
    _x = BatchNormalization()(_x)
    _x = Add(name=block_name + '_residual')([_skip, _x])

    return _x


def create_front_module(input, num_channels, bottleneck):
    # front module, input to 1/4 resolution
    # 1 7x7 conv + maxpooling
    # 3 residual block

    _x = Conv2D(64, kernel_size=(7, 7), strides=(2, 2), padding='same', activation='relu', name='front_conv_1x1_x1')(
        input)
    _x = BatchNormalization()(_x)

    _x = bottleneck(_x, num_channels // 2, 'front_residual_x1')
    _x = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(_x)

    _x = bottleneck(_x, num_channels // 2, 'front_residual_x2')
    _x = bottleneck(_x, num_channels, 'front_residual_x3')

    return _x


def create_left_half_blocks(bottom, bottleneck, hglayer, num_channels):
    # create left half blocks for hourglass module
    # f1, f2, f4 , f8 : 1, 1/2, 1/4 1/8 resolution

    hgname = 'hg' + str(hglayer)

    f1 = bottleneck(bottom, num_channels, hgname + '_l1')
    _x = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(f1)

    f2 = bottleneck(_x, num_channels, hgname + '_l2')
    _x = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(f2)

    f4 = bottleneck(_x, num_channels, hgname + '_l4')
    _x = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(f4)

    f8 = bottleneck(_x, num_channels, hgname + '_l8')

    return (f1, f2, f4, f8)


def connect_left_to_right(left, right, bottleneck, name, num_channels):
    '''
    :param left: connect left feature to right feature
    :param name: layer name
    :return:
    '''
    # left -> 1 bottlenect
    # right -> upsampling
    # Add   -> left + right

    _xleft = bottleneck(left, num_channels, name + '_connect')
    _xright = UpSampling2D()(right)
    add = Add()([_xleft, _xright])
    out = bottleneck(add, num_channels, name + '_connect_conv')
    return out


def bottom_layer(lf8, bottleneck, hgid, num_channels):
    # blocks in lowest resolution
    # 3 bottlenect blocks + Add

    lf8_connect = bottleneck(lf8, num_channels, str(hgid) + "_lf8")

    _x = bottleneck(lf8, num_channels, str(hgid) + "_lf8_x1")
    _x = bottleneck(_x, num_channels, str(hgid) + "_lf8_x2")
    _x = bottleneck(_x, num_channels, str(hgid) + "_lf8_x3")

    rf8 = Add()([_x, lf8_connect])

    return rf8


def create_right_half_blocks(leftfeatures, bottleneck, hglayer, num_channels):
    lf1, lf2, lf4, lf8 = leftfeatures

    rf8 = bottom_layer(lf8, bottleneck, hglayer, num_channels)

    rf4 = connect_left_to_right(lf4, rf8, bottleneck, 'hg' + str(hglayer) + '_rf4', num_channels)

    rf2 = connect_left_to_right(lf2, rf4, bottleneck, 'hg' + str(hglayer) + '_rf2', num_channels)

    rf1 = connect_left_to_right(lf1, rf2, bottleneck, 'hg' + str(hglayer) + '_rf1', num_channels)

    return rf1


def create_heads(prelayerfeatures, rf1, num_classes, hgid, num_channels):
    # two head, one head to next stage, one head to intermediate features
    head = Conv2D(num_channels, kernel_size=(1, 1), activation='relu', padding='same', name=str(hgid) + '_conv_1x1_x1')(
        rf1)
    head = BatchNormalization()(head)

    # for head as intermediate supervision, use 'linear' as activation.
    head_parts = Conv2D(num_classes, kernel_size=(1, 1), activation='linear', padding='same',
                        name=str(hgid) + '_conv_1x1_parts')(head)

    # use linear activation
    head = Conv2D(num_channels, kernel_size=(1, 1), activation='linear', padding='same',
                  name=str(hgid) + '_conv_1x1_x2')(head)
    head_m = Conv2D(num_channels, kernel_size=(1, 1), activation='linear', padding='same',
                    name=str(hgid) + '_conv_1x1_x3')(head_parts)

    head_next_stage = Add()([head, head_m, prelayerfeatures])
    return head_next_stage, head_parts


def euclidean_loss(x, y):
    return K.sqrt(K.sum(K.square(x - y)))

In [3]:
import sys

sys.path.insert(0, "../data_gen/")
sys.path.insert(0, "../eval/")

import os
from tensorflow.keras.callbacks import CSVLogger, ModelCheckpoint
from tensorflow.keras.models import load_model, model_from_json
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.losses import mean_squared_error
import datetime
import scipy.misc
import numpy as np

class HourglassNet(object):

    def __init__(self, num_classes, num_stacks, num_channels, inres, outres):
        self.num_classes = num_classes
        self.num_stacks = num_stacks
        self.num_channels = num_channels
        self.inres = inres
        self.outres = outres

    def build_model(self, mobile=False, show=False):
        if mobile:
            self.model = create_hourglass_network(self.num_classes, self.num_stacks,
                                                  self.num_channels, self.inres, self.outres, bottleneck_mobile)
        else:
            self.model = create_hourglass_network(self.num_classes, self.num_stacks,
                                                  self.num_channels, self.inres, self.outres, bottleneck_block)
        # show model summary and layer name
        if show:
            self.model.summary()


    def inference_rgb(self, rgbdata, orgshape, mean=None):

        scale = (orgshape[0] * 1.0 / self.inres[0], orgshape[1] * 1.0 / self.inres[1])
        imgdata = scipy.misc.imresize(rgbdata, self.inres)

        if mean is None:
            mean = np.array([0.4404, 0.4440, 0.4327], dtype=np.float)

        imgdata = normalize(imgdata, mean)

        input = imgdata[np.newaxis, :, :, :]

        out = self.model.predict(input)
        return out[-1], scale

    def inference_file(self, imgfile, mean=None):
        imgdata = scipy.misc.imread(imgfile)
        ret = self.inference_rgb(imgdata, imgdata.shape, mean)
        return ret

In [6]:
xnet = HourglassNet(num_classes=10, num_stacks=8, num_channels=128, inres=(512, 512),
                            outres=(1,1))
print(xnet)

<__main__.HourglassNet object at 0x000002BE928EB8C8>


In [15]:
#xnet.build_model(mobile=True, show=True)

In [26]:
model = create_hourglass_network(num_classes=16, num_stacks=4, num_channels=128, inres=(512, 512),
                            outres=(1,1), bottleneck=bottleneck_block)

In [27]:
model.summary()

Model: "functional_17"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_9 (InputLayer)            [(None, 512, 512, 3) 0                                            
__________________________________________________________________________________________________
front_conv_1x1_x1 (Conv2D)      (None, 256, 256, 64) 9472        input_9[0][0]                    
__________________________________________________________________________________________________
batch_normalization_2359 (Batch (None, 256, 256, 64) 256         front_conv_1x1_x1[0][0]          
__________________________________________________________________________________________________
front_residual_x1_conv_1x1_x1 ( (None, 256, 256, 32) 2080        batch_normalization_2359[0][0]   
______________________________________________________________________________________