In [1]:
%%javascript
utils.load_extension('collapsible_headings/main')
utils.load_extension('hide_input/main')
utils.load_extension('autosavetime/main')
utils.load_extension('execute_time/ExecuteTime')
utils.load_extension('table_beautifier/main')
utils.load_extension('code_prettify/code_prettify')
utils.load_extension('scroll_down/main')
utils.load_extension('jupyter-js-widgets/extension')

<IPython.core.display.Javascript object>

In [8]:
#export

import torch.nn as nn
import torch,math,sys
import torch.utils.model_zoo as model_zoo
from functools import partial
from fastai.torch_core import Module
import math

__all__ = ['XResNet_1d', 'xresnet18_1d', 'xresnet34_1d', 'xresnet50_1d', 'xresnet101_1d', 'xresnet152_1d']

# or: ELU+init (a=0.54; gain=1.55)
act_fn = nn.ReLU(inplace=True)

class Flatten(Module):
    def forward(self, x): return x.view(x.size(0), -1)

def init_cnn(m):
    if getattr(m, 'bias', None) is not None: nn.init.constant_(m.bias, 0)
    if isinstance(m, (nn.Conv1d,nn.Linear)): nn.init.kaiming_normal_(m.weight)
    for l in m.children(): init_cnn(l)
        
def same_padding1d(seq_len, ks, stride=1, dilation=1):
    pad = (stride * (seq_len - 1) - seq_len + ks + (ks - 1) * (dilation - 1)) / 2
    padding = (math.ceil(pad), int(pad))
    return padding

def conv(ni, nf, seq_len, ks=3, stride=1, padding='same', bias=False):
    if padding == 'same': padding = same_padding1d(seq_len, ks, stride=stride)
    else: padding = ks//2
    return nn.Conv1d(ni, nf, kernel_size=ks, stride=stride, padding=padding, bias=bias)

def noop(x): return x

def conv_layer(ni, nf, seq_len, ks=3, stride=1, padding='same', zero_bn=False, act=True):
    bn = nn.BatchNorm1d(nf)
    nn.init.constant_(bn.weight, 0. if zero_bn else 1.)
    layers = [conv(ni, nf, seq_len, ks, stride=stride, padding=padding), bn]
    if act: 
        layers.append(act_fn)
    return nn.Sequential(*layers)

class ResBlock(Module):
    def __init__(self, expansion, ni, nh, seq_len, padding='same', stride=1):
        nf,ni = nh*expansion,ni*expansion
        if expansion == 1:
            self.conv_layer1 = conv_layer(ni, nh, seq_len, 3, stride=stride, padding=padding)
            self.conv_layer2 = conv_layer(nh, nf, seq_len, 3, zero_bn=True, act=False, padding=padding)
            self.conv_layer3 = noop
        else:
            self.conv_layer1 = conv_layer(ni, nh, seq_len, 1, padding=padding)
            self.conv_layer2 = conv_layer(nh, nh, seq_len, 3, stride=stride, padding=padding)
            self.conv_layer3 = conv_layer(nh, nf, seq_len, 1, zero_bn=True, act=False, padding=padding)
        # TODO: check whether act=True works better
        self.idconv = noop if ni==nf else conv_layer(ni, nf, seq_len, 1, act=False, padding=padding)
        self.pool = noop if stride==1 else nn.AvgPool1d(2, ceil_mode=True)

    def forward(self, x): 
        print('x:', x.shape)
        out = self.conv_layer1(x)
        print('out1:', out.shape)
        out = self.conv_layer2(x)
        print('out2:', out.shape)
        out = self.conv_layer3(x)
        print('out3:', out.shape)
                   
        convid = self.idconv(self.pool(x))
        print('convid:', convid.shape)
        output = act_fn(out + convid)
        print('output:', output.shape)
        return output

class XResNet_1d(nn.Sequential):
    def __init__(self, expansion, layers, c_in, seq_len, c_out=1000, padding='same'):
        stem = []
        sizes = [c_in,32,32,64]
        for i in range(3):
            stem.append(conv_layer(sizes[i], sizes[i+1], seq_len, padding=padding, stride=2 if i==0 else 1))

        block_szs = [64//expansion,64,128,256,512]
        blocks = [self._make_layer(expansion, block_szs[i], block_szs[i+1], l, 1 if i==0 else 2, padding=padding)
                  for i,l in enumerate(layers)]
        super().__init__(
            *stem,
            nn.MaxPool1d(kernel_size=3, stride=2, padding=1),
            *blocks,
            nn.AdaptiveAvgPool1d(1), Flatten(),
            nn.Linear(block_szs[-1]*expansion, c_out),
        )
        init_cnn(self)

    def _make_layer(self, expansion, ni, nf, blocks, stride, padding='same'):
        return nn.Sequential(
            *[ResBlock(expansion, ni if i==0 else nf, nf, stride if i==0 else 1, padding=padding)
              for i in range(blocks)])

def xresnet(expansion, n_layers, name, pretrained=False, **kwargs):
    model = XResNet_1d(expansion, n_layers, **kwargs)
    if pretrained: model.load_state_dict(model_zoo.load_url(model_urls[name]))
    return model

me = sys.modules[__name__]
for n, e, l in [
    [18, 1, [2, 2, 2, 2]],
    [34, 1, [3, 4, 6, 3]],
    [50, 4, [3, 4, 6, 3]],
    [101, 4, [3, 4, 23, 3]],
    [152, 4, [3, 8, 36, 3]],
]:
    name = f'xresnet{n}_1d'
    setattr(me, name, partial(xresnet, expansion=e, n_layers=l, name=name))

In [None]:
#export 

#export

import torch.nn as nn
import torch,math,sys
import torch.utils.model_zoo as model_zoo
from functools import partial
from fastai.torch_core import Module

__all__ = ['XResNet', 'xresnet18', 'xresnet34', 'xresnet50', 'xresnet101', 'xresnet152']

# or: ELU+init (a=0.54; gain=1.55)
act_fn = nn.ReLU(inplace=True)

class Flatten(Module):
    def forward(self, x): return x.view(x.size(0), -1)

def init_cnn(m):
    if getattr(m, 'bias', None) is not None: nn.init.constant_(m.bias, 0)
    if isinstance(m, (nn.Conv2d,nn.Linear)): nn.init.kaiming_normal_(m.weight)
    for l in m.children(): init_cnn(l)

def conv(ni, nf, ks=3, stride=1, bias=False):
    return nn.Conv2d(ni, nf, kernel_size=ks, stride=stride, padding=ks//2, bias=bias)

def noop(x): return x

def conv_layer(ni, nf, ks=3, stride=1, zero_bn=False, act=True):
    bn = nn.BatchNorm2d(nf)
    nn.init.constant_(bn.weight, 0. if zero_bn else 1.)
    layers = [conv(ni, nf, ks, stride=stride), bn]
    if act: 
        layers.append(act_fn)
    return nn.Sequential(*layers)

class ResBlock(Module):
    def __init__(self, expansion, ni, nh, stride=1):
        nf,ni = nh*expansion,ni*expansion
        layers  = [conv_layer(ni, nh, 3, stride=stride),
                   conv_layer(nh, nf, 3, zero_bn=True, act=False)
        ] if expansion == 1 else [
                   conv_layer(ni, nh, 1),
                   conv_layer(nh, nh, 3, stride=stride),
                   conv_layer(nh, nf, 1, zero_bn=True, act=False)
        ]
        self.convs = nn.Sequential(*layers)
        # TODO: check whether act=True works better
        self.idconv = noop if ni==nf else conv_layer(ni, nf, 1, act=False)
        self.pool = noop if stride==1 else nn.AvgPool2d(2, ceil_mode=True)
        #self.bn = nn.BatchNorm2d(nf)

    def forward(self, x): return act_fn(self.convs(x) + self.idconv(self.pool(x)))
    #def forward(self, x): return self.bn(act_fn(self.convs(x) + self.idconv(self.pool(x))))

def filt_sz(recep): return min(64, 2**math.floor(math.log2(recep*0.75)))

class XResNet(nn.Sequential):
    def __init__(self, expansion, layers, c_in=3, c_out=1000):
        stem = []
        sizes = [c_in,32,32,64]
        for i in range(3):
            stem.append(conv_layer(sizes[i], sizes[i+1], stride=2 if i==0 else 1))
            #nf = filt_sz(c_in*9)
            #stem.append(conv_layer(c_in, nf, stride=2 if i==1 else 1))
            #c_in = nf

        block_szs = [64//expansion,64,128,256,512]
        blocks = [self._make_layer(expansion, block_szs[i], block_szs[i+1], l, 1 if i==0 else 2)
                  for i,l in enumerate(layers)]
        super().__init__(
            *stem,
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            *blocks,
            nn.AdaptiveAvgPool2d(1), 
            Flatten(),
            nn.Linear(block_szs[-1]*expansion, c_out),
        )
        init_cnn(self)

    def _make_layer(self, expansion, ni, nf, blocks, stride):
        return nn.Sequential(
            *[ResBlock(expansion, ni if i==0 else nf, nf, stride if i==0 else 1)
              for i in range(blocks)])

def xresnet(expansion, n_layers, name, pretrained=False, **kwargs):
    model = XResNet(expansion, n_layers, **kwargs)
    if pretrained: model.load_state_dict(model_zoo.load_url(model_urls[name]))
    return model

me = sys.modules[__name__]
for n, e, l in [
    [18, 1, [2, 2, 2, 2]],
    [34, 1, [3, 4, 6, 3]],
    [50, 4, [3, 4, 6, 3]],
    [101, 4, [3, 4, 23, 3]],
    [152, 4, [3, 8, 36, 3]],
]:
    name = f'xresnet{n}'
    setattr(me, name, partial(xresnet, expansion=e, n_layers=l, name=name))

In [9]:
try: from exp.nb_utils import *
except ImportError: from .nb_utils import *
nb_auto_export()

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [10]:
import torch
c_in = 10
seq_len = 128
c_out = 2
xb = torch.rand(32, c_in, seq_len)
xresnet50_1d(c_in=c_in, seq_len=seq_len, c_out=2)(xb)

RuntimeError: expected padding to be a single integer value or a list of 1 values to match the convolution dimensions, but got padding=[65, 64]

In [None]:
xresnet50_1d(c_in=c_in, c_out=c_out)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

__all__ = ['ResNetPlus', 'resnetplus']

def init_cnn(m):
    if getattr(m, 'bias', None) is not None: nn.init.constant_(m.bias, 0)
    if isinstance(m, (nn.Conv1d, nn.Conv2d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight)
    for l in m.children():
        print(l.name)
        init_cnn(l)


class SELayer(nn.Module):
    def __init__(self, channel, reduction=16, se=True):
        super(SELayer, self).__init__()
        self.se = se
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False), nn.Sigmoid())

    def forward(self, x):
        if not self.se:
            return x
        b, c, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1)
        out = x * y.expand_as(x)
        return out


class AddCoords1d(nn.Module):
    def __init__(self, coordconv=False):
        super().__init__()
        self.coordconv = coordconv
        self.device = torch.device("cuda" if torch.cuda.
                                   is_available() else "cpu")

    def forward(self, input_tensor):
        if not self.coordconv:
            return input_tensor
        batch_size, feat, sl = input_tensor.size()
        if self.coordconv:
            xx_channel = torch.arange(
                sl, device=self.device, dtype=torch.float) / (sl - 1)
            xx_channel = xx_channel * 2 - 1
            xx_channel = xx_channel.repeat(batch_size, 1, 1)
            input_tensor = torch.cat([input_tensor, xx_channel], dim=1)
        return input_tensor


def same_padding1d(input_dim, kernel_size, stride=1, dilation=1):
    # input dim: seq_len
    import math
    pad = (stride * (input_dim - 1) - input_dim + kernel_size +
           (kernel_size - 1) * (dilation - 1)) / 2
    return (math.ceil(pad), int(pad))


def conv1d(in_features, n_feature_maps, kernel_size, stride=1, padding=0, bias=False):
    return nn.Conv1d(
        in_channels=in_features,
        out_channels=n_feature_maps,
        kernel_size=kernel_size,
        padding=padding,
        stride=stride,
        bias=bias)


class ResBlock(nn.Module):
    def __init__(self,
                 in_features,
                 n_feature_maps,
                 expand=True,
                 se=True,
                 reduction=16,
                 coordconv=False,
                 bias=False,
                 device='cuda'):
        super().__init__()
        self.expand = expand

        self.addcoords1d = AddCoords1d(coordconv=coordconv)
        self.conv1 = conv1d(
            in_features + coordconv, n_feature_maps, kernel_size=7, bias=bias)
        self.bn1 = nn.BatchNorm1d(n_feature_maps)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = conv1d(n_feature_maps, n_feature_maps, kernel_size=5, bias=bias)
        self.bn2 = nn.BatchNorm1d(n_feature_maps)

        self.conv3 = conv1d(n_feature_maps, n_feature_maps, kernel_size=3, bias=bias)
        self.bn3 = nn.BatchNorm1d(n_feature_maps)
        self.se = SELayer(n_feature_maps, reduction, se)

        # expand channels for the sum
        self.shortcut_conv = conv1d(in_features, n_feature_maps, kernel_size=1, bias=bias)
        self.shortcut_bn = nn.BatchNorm1d(n_feature_maps)

    def forward(self, x):
        residual = x
        x = self.addcoords1d(x)
        m = nn.ConstantPad1d(same_padding1d(x.shape[2], 7), 0)
        x = m(x)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        m = nn.ConstantPad1d(same_padding1d(out.shape[2], 5), 0)
        out = m(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        m = nn.ConstantPad1d(same_padding1d(out.shape[2], 3), 0)
        out = m(out)
        out = self.conv3(out)
        out = self.bn3(out)
        out = self.se(out)

        if self.expand:
            shortcut = self.shortcut_conv(residual)
            shortcut = self.shortcut_bn(shortcut)
        else:
            shortcut = self.shortcut_bn(residual)

        out += shortcut
        out = self.relu(out)

        return out


class ResNetPlus(nn.Module):
    def __init__(self,
                 in_features,
                 n_classes,
                 input_dropout=0.0,
                 fc_dropout=0.0,
                 se=False,
                 reduction=16,
                 coordconv=False,
                 y_range=None,
                 bias=False,
                 init=False):
        super().__init__()
        n_feature_maps = 64

        self.arch = 'ResNet'
        self.se = se
        self.reduction = reduction
        self.coordconv = coordconv
        if self.se:
            self.arch = 'SE' + self.arch
        if self.coordconv:
            self.arch = 'cc' + self.arch
        if input_dropout != 0.0:
            self.arch = self.arch + '_ido'
        if fc_dropout != 0.0:
            self.arch = self.arch + '_do'
        self.__name__ = self.arch

        self.in_features = in_features
        self.n_classes = n_classes

        self.block1 = ResBlock(
            in_features,
            n_feature_maps,
            se=se,
            reduction=reduction,
            coordconv=coordconv, 
            bias=bias)
        self.block2 = ResBlock(
            n_feature_maps,
            n_feature_maps * 2,
            se=se,
            reduction=reduction, 
            bias=bias)
        self.block3 = ResBlock(
            n_feature_maps * 2,
            n_feature_maps * 2,
            expand=False,
            se=se,
            reduction=reduction, 
            bias=bias)

        self.gap = nn.AdaptiveAvgPool1d(1)
        self.fc_dropout = nn.Dropout(fc_dropout)
        self.input_dropout = nn.Dropout(input_dropout)
        self.fc1 = nn.Linear(n_feature_maps * 2, n_classes)

        self.y_range = y_range

        # Initialization
        self.init = init
        if self.init: init_cnn(self)

    def forward(self, x):
        print('x', x.shape)
        x = self.input_dropout(x)
        x = self.block1(x)
        print('x', x.shape)
        x = self.block2(x)
        print('x', x.shape)
        x = self.block3(x)
        print('x', x.shape)
        b, c, _ = x.size()
        x = self.gap(x).view(b, c)
        print('x', x.shape)
        x = self.fc_dropout(x)
        print('x', x.shape)
        x = self.fc1(x)
        print('x', x.shape)
        if self.y_range:
            x = F.sigmoid(x)
            x = x * (self.y_range[1] - self.y_range[0])
            x = x + self.y_range[0]
        print('x', x.shape)
        return x

from functools import partial
resnetplus = partial(
        ResNetPlus,
        input_dropout=0.0,
        fc_dropout=0.,
        se=False,
        reduction=16,
        coordconv=False,
        y_range=None,
        init=False)
setattr(resnetplus, '__name__', 'resnetplus')
setattr(resnetplus, 'fn', ResNetPlus)


resnetplus(c_in, c_out)(xb)