<a href="https://colab.research.google.com/github/tohkunhao/DL-Library/blob/main/modular.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#import GPUtil
import numbers

import cupy as cp
import numpy as np

In [None]:
#def CheckGPU():
#  try:
#    GPUtil.getAvailable()
#    status="available"
#  except:
#    status="not available"
#  
#  return status


In [None]:
class nnModule():
  '''
  Base class holding the defaults shared by every NN module.

  State created by the constructor:
    params    -- dict, starts empty
    grads     -- dict, starts empty
    trainflg  -- True after construction and after train(), False after eval().
                 Branch on self.trainflg when training and testing behave differently.
    updateflg -- starts as True
    xp        -- starts as None
  '''
  def __init__(self):
    self.xp = None
    self.updateflg = True
    self.trainflg = True
    self.grads = {}
    self.params = {}

  def train(self): #to be present in every NN module
    # switch to training behaviour
    self.trainflg = True

  def eval(self): #to be present in every NN module
    # switch to test-time behaviour
    self.trainflg = False

In [None]:
class Linear(nnModule):
  '''
  Fully connected layer: out = x . w (+ b).

  takes in the arguments (in_channels, out_channels, init_type, bias, updateflg)
  in_channels is the number of input features
  out_channels is the number of perceptrons
  init_type is the type of weight initialisation. Default is He Kaiming's for use
    with ReLU (normal with standard deviation sqrt(2/in_channels)); 'Xavier' for
    tanh (normal with standard deviation sqrt(1/in_channels)).
  bias determines if bias is used. Default is set to true.
  updateflg will determine if weights will be updated by the optimizer. Default is set to true.

  The parameters are created lazily on the first forward() call so that they are
  allocated by the same array module (numpy or cupy) as the input.
  '''
  def __init__(self, in_channels, out_channels, init_type='He',bias=True,updateflg=True):
    super(Linear,self).__init__()
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.init_type = init_type
    self.bias = bias
    self.updateflg = updateflg

  def _init_weights(self):
    '''Return a freshly drawn (in_channels, out_channels) weight matrix.

    Raises ValueError if init_type is not 'He' or 'Xavier'.
    '''
    if self.init_type == 'He':
      sd = (2/self.in_channels) ** 0.5
    elif self.init_type == 'Xavier':
      sd = (1/self.in_channels) ** 0.5
    else:
      raise ValueError("Unknown init_type %r; expected 'He' or 'Xavier'" % (self.init_type,))
    # randn (zero-mean normal), not rand: sd is a standard deviation, and
    # uniform [0, 1) samples would make every weight positive.
    return self.xp.random.randn(self.in_channels,self.out_channels)*sd

  def forward(self,x):
    '''Compute x . w (+ b); x has in_channels as its last axis.'''
    self.xp = cp.get_array_module(x)
    self.x = x #store x for use in backprop

    #initialise weights and biases only if they do not exist yet
    if 'w' not in self.params:
      self.params['w'] = self._init_weights()
    if self.bias and 'b' not in self.params:
      self.params['b'] = self.xp.zeros(self.out_channels)#initialize at 0

    out = self.xp.dot(x,self.params['w'])
    if self.bias:
      out = out + self.params['b']

    return out

  def backward(self,dout):
    '''Store grads['w'] (and grads['b']) and return the gradient w.r.t. the input.'''
    # Flatten to 2-D (samples, features) so a single un-batched sample works too.
    x2d = self.x.reshape((-1,self.in_channels))
    dout2d = dout.reshape((-1,dout.shape[-1]))

    if self.bias:
      self.grads['b'] = self.xp.sum(dout2d,axis=0)

    # dL/dw = x^T . dout. This must be a transpose: reshaping x straight to
    # (in_channels, -1) reorders the elements whenever there is more than one sample.
    self.grads['w'] = self.xp.dot(x2d.T,dout2d)

    return self.xp.dot(dout,self.xp.transpose(self.params['w']))


In [None]:
class ReLU(nnModule):
  '''
  Rectified linear unit activation.
  f(x) = x where x > 0, and 0 elsewhere.
  '''
  def __init__(self):
    super(ReLU,self).__init__()
    self.updateflg = False #no trainable parameters
    self.mask = None

  def forward(self,x):
    # remember which entries were positive; backward() lets gradient through only there
    positive = x > 0
    xp = cp.get_array_module(x)
    self.mask = positive
    self.xp = xp
    return xp.maximum(0,x)

  def backward(self,dout):
    passed = self.mask
    grad = self.xp.zeros_like(dout)
    grad[passed] = dout[passed] #copy gradient only where the input was positive
    return grad

In [None]:
class Dropout(nnModule):
  '''
  Performs dropout on the outputs of the linear layer according to the
  probability of p as defined by the user.

  Training: each element is zeroed when a uniform [0, 1) sample is <= p;
  surviving elements are passed through unscaled.
  Testing: the whole input is multiplied by (1 - p) instead.

  backward() mirrors whichever mode the most recent forward() ran in.
  '''
  def __init__(self,p=0.5):
    super(Dropout,self).__init__()
    self.p = p
    self.mask = None
    self.updateflg = False

  def forward(self,x):
    # Resolve the array module in both modes so backward() never sees xp = None.
    self.xp = cp.get_array_module(x)

    if self.trainflg:
      rng = self.xp.random.rand(*x.shape)
      out = self.xp.zeros_like(x)
      self.mask = rng > self.p
      out[self.mask] = x[self.mask]
    else:
      # Clear the mask so backward() cannot reuse one from an earlier training pass.
      self.mask = None
      out = x * (1 - self.p) #scale the output at test time

    return out

  def backward(self,dout):
    if self.mask is None:
      # last forward was the test-time scaling, whose gradient is the same scaling
      return dout * (1 - self.p)

    out = self.xp.zeros_like(dout)
    out[self.mask] = dout[self.mask]
    return out

In [None]:
class BatchNorm(nnModule):
  '''
  Performs batchnorm on NxD tensor from 2015 batchnorm paper by Ioffe & Szegedy.
  NOT TO BE USED FOR CONVOLUTION. See BatchNormConv for convolutions.
  N being mini batch size, D being number of features

  Training: normalises with the mini-batch mean/variance and appends both to
  self.mean / self.var.
  Testing: normalises with the average of the collected means and the
  m/(m-1)-corrected average of the collected variances.

  backward() uses the statistics stored by a training-mode forward().
  '''
  def __init__(self,eps=1e-05):
    super(BatchNorm,self).__init__()
    self.mean = []
    self.var = []
    self.m = None
    self.x_hat = None
    self.mbmean = None
    self.mbvar = None
    self.x = None
    self.eps = eps

  def forward(self,x):
    '''Normalise x (N, D) and apply the learned scale gamma and shift beta.

    Raises RuntimeError in test mode when no training statistics were collected.
    '''
    self.xp = cp.get_array_module(x)
    self.x = x #store for backprop

    #initialise the trainable parameters
    if 'gamma' not in self.params:
      self.params['gamma'] = self.xp.ones(x.shape[-1]) #as many as input features
    if 'beta' not in self.params:
      self.params['beta'] = self.xp.zeros(x.shape[-1]) #as many as input features

    # Track the size of the current training batch (the last batch of an epoch
    # may be smaller); fall back to the first batch seen if m was never set.
    if self.trainflg or not self.m:
      self.m = x.shape[0]

    if self.trainflg:
      self.mbmean = self.xp.mean(x, axis = 0)
      self.mbvar = self.xp.mean((x-self.mbmean)**2,axis = 0)
      self.mean.append(self.mbmean) #used to calculate the population mean
      self.var.append(self.mbvar) #used to calculate the population variance
      self.x_hat = (x - self.mbmean)/self.xp.sqrt(self.mbvar + self.eps)
    else:
      if len(self.mean) == 0 or len(self.var) == 0:
        raise RuntimeError("BatchNorm has no training statistics; run forward() in training mode before eval()")
      # stack first: reducing a Python list of arrays is not portable across numpy/cupy
      Ex = self.xp.stack(self.mean).mean(axis = 0)
      # unbiased-variance correction; skipped for m == 1 where m/(m-1) is undefined
      correction = self.m/(self.m-1) if self.m > 1 else 1.0
      Varx = correction * self.xp.stack(self.var).mean(axis = 0)
      self.x_hat = (x - Ex)/self.xp.sqrt(Varx + self.eps)

    return self.params['gamma'] * self.x_hat + self.params['beta']

  def backward(self,dout):
    '''Store grads['beta'] / grads['gamma'] and return the gradient w.r.t. x.'''
    #store trainable gradients
    self.grads['beta'] = self.xp.sum(dout, axis = 0) #(D,) dimension
    self.grads['gamma'] = self.xp.sum(dout*self.x_hat, axis = 0) #(D,) dimension

    #Not trainable gradients. No need to store
    m = self.x.shape[0] #size of the batch these statistics came from
    centred = self.x - self.mbmean #(N,D) dimension
    inv_std = 1/self.xp.sqrt(self.mbvar + self.eps) #(D,) dimension

    dx_hat = self.params['gamma']*dout #(N,D) dimension
    dmbvar = self.xp.sum(dx_hat*centred,axis = 0)*-0.5*inv_std**3 #(D,) dimension
    # second term is scaled by dmbvar (gradient w.r.t. the variance), not by the variance itself
    dmbmean = -inv_std*self.xp.sum(dx_hat,axis = 0) + dmbvar*self.xp.sum(-2*centred,axis = 0)/m #(D,) dimension
    return dx_hat*inv_std + dmbvar*2*centred/m + dmbmean/m #(N,D) dimension

In [None]:
class Sequence(nnModule):
  '''
  class for executing nnModules in a sequential manner (left to right).
  non nested lists (or tuples) of modules are accepted and flattened one level.

  eval() and train() are forwarded to every contained module, so switching the
  Sequence also switches layers whose behaviour depends on trainflg.
  '''
  def __init__(self,*layers):
    super(Sequence,self).__init__()
    self.modules = []
    for layer in layers:
      if isinstance(layer, (list, tuple)):
        self.modules.extend(layer)
      else:
        self.modules.append(layer)
    self.x = None

  def eval(self):
    # propagate, otherwise the contained modules would stay in training mode
    self.trainflg = False
    for module in self.modules:
      module.eval()

  def train(self):
    self.trainflg = True
    for module in self.modules:
      module.train()

  def forward(self,x):
    '''Feed x through every module in order and return the final output.'''
    self.x = x
    for module in self.modules:
      x = module.forward(x)
    return x

  def backward(self,dout):
    '''Propagate dout through the modules in reverse order and return the input gradient.'''
    for module in reversed(self.modules):
      dout = module.backward(dout)
    return dout


In [None]:
def im2col(image, kernel_size, stride, padding,mode='reg'):
  '''im2col function for convolutional neural networks.
  col_out = im2col(image, kernel_size, stride, padding)
  3 modes available, reg, mono, single
  image format for the 3 modes:
  reg: N,C,H,W
  mono: N,H,W
  single: C,H,W
  kernel_size, stride and padding can be an int or a (height, width) tuple

  Returns a 2-D array with one row per (sample, output row, output column), in
  that order, each row being the flattened (C, kh, kw) window. The shape is
  (N*out_h*out_w, C*kh*kw), where out_h = (H + 2*ph - kh)//sh + 1 and likewise
  for out_w.

  Raises ValueError for an unknown mode.
  '''

  xp = cp.get_array_module(image)

  if mode == 'reg':
    N,C,H,W = image.shape
  elif mode in ('mono', 'single'):
    lead,H,W = image.shape
    N,C = (lead,1) if mode == 'mono' else (1,lead)
    image = image.reshape(N,C,H,W) #make into 4d array
  else:
    raise ValueError("Unknown mode %r; expected 'reg', 'mono' or 'single'" % (mode,))

  kh, kw = tupleoption(kernel_size)
  ph, pw = tupleoption(padding)
  sh, sw = tupleoption(stride)

  #add padding
  image = xp.pad(image,[(0,0),(0,0),(ph,ph),(pw,pw)],'constant')

  # Window counts are taken over the PADDED image; looping over the unpadded
  # H and W would skip the windows that padding adds.
  out_h = (H+2*ph-kh)//sh + 1
  out_w = (W+2*pw-kw)//sw + 1

  cols = [
    image[sample,:,row*sh:row*sh+kh,col*sw:col*sw+kw].reshape(-1)
    for sample in range(N)
    for row in range(out_h)
    for col in range(out_w)
  ]

  if not cols:
    # kernel larger than the padded image (or empty batch): no windows
    return xp.empty((0,C*kh*kw),dtype=image.dtype)

  return xp.stack(cols)

In [None]:
def tupleoption(input_var):
  '''Expand a size option into a (first, second) pair.

  An integer (Python int or any type registered as numbers.Integral, such as
  numpy integer scalars) is duplicated; anything else is indexed and its
  first two items are returned.
  '''
  if isinstance(input_var, numbers.Integral):
    return input_var, input_var
  return input_var[0], input_var[1]

[0, 2]
