# Import modules

In [None]:
from abc import ABC, abstractmethod
from tabulate import tabulate
from tqdm import tqdm, trange
from sklearn.model_selection import KFold, train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix
import numpy as np
import json

# Import datasets

In [None]:
!pip install gdown
import gdown
d = {
    'panda-or-bear-image-classification.zip':'https://drive.google.com/file/d/1DD9rZnpGBFsH6G2bz2CZIfHa5S1Hk1Qn/view?usp=sharing',
    'Train_stock_market.csv': 'https://drive.google.com/file/d/1PA1YQhlDirnBpMuwo9YZ_DKxpAcWVT-P/view?usp=sharing',
    'Test_stock_market.csv': 'https://drive.google.com/file/d/1Ezi3W4xiRo0c3mPKV6ixtjqSLVx6jLTq/view?usp=drive_link',
    'model_spek_cnn.json': 'https://drive.google.com/file/d/1Jbah9KFf5iKE5w0u1B_GAY0ZTCAsXuZu/view?usp=drive_link',
    'model_spek_rnn.json': 'https://drive.google.com/file/d/1jDRZoXj6jvD_-7v32ieBnW-nfh-L6SOz/view?usp=drive_link',
}
for k,v in d.items():
  gdown.download(v, k, quiet=False, fuzzy=True)
!unzip panda-or-bear-image-classification



Downloading...
From: https://drive.google.com/uc?id=1DD9rZnpGBFsH6G2bz2CZIfHa5S1Hk1Qn
To: /content/panda-or-bear-image-classification.zip
100%|██████████| 12.3M/12.3M [00:00<00:00, 74.7MB/s]
Downloading...
From: https://drive.google.com/uc?id=1PA1YQhlDirnBpMuwo9YZ_DKxpAcWVT-P
To: /content/Train_stock_market.csv
100%|██████████| 762k/762k [00:00<00:00, 30.1MB/s]
Downloading...
From: https://drive.google.com/uc?id=1Ezi3W4xiRo0c3mPKV6ixtjqSLVx6jLTq
To: /content/Test_stock_market.csv
100%|██████████| 2.46k/2.46k [00:00<00:00, 7.68MB/s]
Downloading...
From: https://drive.google.com/uc?id=1Jbah9KFf5iKE5w0u1B_GAY0ZTCAsXuZu
To: /content/model_spek_cnn.json
100%|██████████| 159k/159k [00:00<00:00, 23.7MB/s]
Downloading...
From: https://drive.google.com/uc?id=1jDRZoXj6jvD_-7v32ieBnW-nfh-L6SOz
To: /content/model_spek_rnn.json
100%|██████████| 774k/774k [00:00<00:00, 35.5MB/s]


Archive:  panda-or-bear-image-classification.zip
replace PandasBears/Test/Bears/251.jpeg? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
  inflating: PandasBears/Test/Bears/251.jpeg  
  inflating: PandasBears/Test/Bears/252.jpeg  
  inflating: PandasBears/Test/Bears/253.jpeg  
  inflating: PandasBears/Test/Bears/254.jpeg  
  inflating: PandasBears/Test/Bears/255.jpeg  
  inflating: PandasBears/Test/Bears/256.jpeg  
  inflating: PandasBears/Test/Bears/257.jpeg  
  inflating: PandasBears/Test/Bears/258.jpeg  
  inflating: PandasBears/Test/Bears/259.jpeg  
  inflating: PandasBears/Test/Bears/260.jpeg  
  inflating: PandasBears/Test/Bears/261.jpeg  
  inflating: PandasBears/Test/Bears/262.jpeg  
  inflating: PandasBears/Test/Bears/263.jpeg  
  inflating: PandasBears/Test/Bears/264.jpeg  
  inflating: PandasBears/Test/Bears/265.jpeg  
  inflating: PandasBears/Test/Bears/266.jpeg  
  inflating: PandasBears/Test/Bears/267.jpeg  
  inflating: PandasBears/Test/Bears/268.jpeg  
  inflating: PandasBears

# Core program

In [None]:
# Base class for layer-able models
class Layer(ABC):
    """Abstract building block that can be stacked into a model.

    Concrete layers must provide `forward`, `_serialize` and the classmethod
    `_deserialize`; every other hook has a do-nothing / identity default.
    """
    name = ''

    def __call__(self, x):
        """Calling a layer is shorthand for `forward(x)`."""
        return self.forward(x)

    def compile(self):
        """Hook for allocating parameters; the default does nothing."""

    @abstractmethod
    def forward(self, x):
        """Compute and return the layer output for input `x`."""

    def backward(self, dLdy, lr):
        """Hook for back-propagation; the default does nothing and returns None."""

    def get_out_size(self, in_size):
        """Return the output shape for `in_size`; the default is the identity."""
        return in_size

    def _update_batch(self, batch_size):
        """Write `batch_size` into the first slot of a 4-element `in_size`.

        Layers without an `in_size` attribute, or whose `in_size` does not
        have exactly four entries, are left untouched. Otherwise `out_size`
        is recomputed from the new `in_size`.
        """
        if hasattr(self, 'in_size') and len(self.in_size) == 4:
            _, *trailing = self.in_size
            self.in_size = (batch_size, *trailing)
            self.out_size = self.get_out_size(self.in_size)

    @abstractmethod
    def _serialize(self) -> list[dict]:
        """Return the layer description as a list of dictionaries."""

    @classmethod
    @abstractmethod
    def _deserialize(cls, data):
        """Rebuild a layer from a dictionary produced by `_serialize`."""

## A. Activations

In [None]:
class Activation(Layer):
    """Common base for element-wise activation layers.

    Subclasses implement `calc`; `forward` stores its input in `self.x` so a
    subclass `backward` can reuse it.
    """
    name = ''

    def __init__(self):
        # Input of the most recent forward() call (None until then).
        self.x = None

    @abstractmethod
    def calc(self, x):
        """Apply the activation function to `x` and return the result."""

    def forward(self, x):
        """Remember `x`, then return `calc(x)`."""
        self.x = x
        return self.calc(x)

    def _serialize(self):
        """Return a one-element list holding this activation's type name."""
        return [dict(type=self.name)]

    @classmethod
    def _deserialize(cls, data):
        """Build a fresh instance; `data` is not consulted."""
        return cls()


### 1. ReLU

In [None]:
class ReLU(Activation):
    """Rectified linear unit: max(0, x) element-wise."""
    name = 'relu'

    def calc(self, x):
        """Return `x` with every negative entry replaced by zero."""
        return np.maximum(0, x)

    def backward(self, dLdy, lr):
        """Pass the incoming gradient through only where the cached input was positive.

        The derivative of ReLU is 1 for x > 0 and 0 otherwise, so
        dL/dx = dL/dy masked by (x > 0). `lr` is unused (no parameters).
        """
        return dLdy * (self.x > 0)

### 2. Sigmoid

In [None]:
class Sigmoid(Activation):
    """Logistic sigmoid: 1 / (1 + exp(-x)) element-wise."""
    name = 'sigmoid'

    def calc(self, x):
        """Return the logistic function evaluated at `x`."""
        denominator = 1 + np.exp(-x)
        return 1 / denominator

    def backward(self, dLdy, lr):
        """Chain rule through the sigmoid: dL/dx = dL/dy * y * (1 - y).

        `y` is recomputed from the input cached by `forward`. `lr` is unused
        (no parameters).
        """
        y = self.calc(self.x)
        scaled = dLdy * y
        return scaled * (1 - y)

### 3. Tanh

In [None]:
class TanH(Activation):
    """Hyperbolic tangent applied element-wise."""
    name = 'tanh'

    def calc(self, x):
        """Return tanh(x)."""
        return np.tanh(x)

    def backward(self, dLdy, lr):
        """Chain rule through tanh: dL/dx = dL/dy * (1 - y^2).

        `y` is recomputed from the input cached by `forward`. `lr` is unused
        (no parameters).
        """
        y = self.calc(self.x)
        local_grad = 1 - y**2
        return dLdy * local_grad

### Deserialization Helper

In [None]:
def get_activation(activation, allow_none=True):
    """Resolve an activation given by name, by instance, or as None.

    Parameters
    ----------
    activation : str | Activation | None
        A registered name ('relu', 'sigmoid', 'tanh'), the empty string
        (meaning "no activation"), an `Activation` instance, or None.
    allow_none : bool
        When False, resolving to "no activation" is an error.

    Returns
    -------
    Activation | None
        A new instance for a name, the given instance unchanged, or None.

    Raises
    ------
    AssertionError
        If `activation` is a string that is not a registered name.
    TypeError
        If the result is None while `allow_none` is False, or if
        `activation` is neither a string, None, nor an `Activation`.
    """
    activations = [ReLU, Sigmoid, TanH]
    if isinstance(activation, str):
        acts = {
            '': None,
            **{act.name: act for act in activations}
        }
        assert activation in acts.keys(), f"activation must be either {','.join(acts.keys())}"
        act_cls = acts[activation]
        activation = None if act_cls is None else act_cls()
    if activation is None:
        if not allow_none:
            # The original message said "allow_none = true", which is the
            # opposite of the condition that triggers this error.
            raise TypeError("Activation must not be None if allow_none = False")
        return None
    if not isinstance(activation, Activation):
        raise TypeError("Activation must be inherited from Activation base class")
    return activation

## B. Layers

In [None]:
# Calculation without loops and without numpy sliding window tricks
def vector_calc(x, out_size, kernel_size, stride):
    """Gather every sliding window of `x` into columns (im2col-style).

    Parameters
    ----------
    x : ndarray, shape (batch, channels, height, width)
    out_size : (out_height, out_width)
        Number of window positions along each axis.
    kernel_size : (kernel_height, kernel_width)
        Window size.
    stride : int
        Step between consecutive window positions.

    Returns
    -------
    ndarray, shape (batch, channels, kernel_height * kernel_width,
                    out_height * out_width)
        Entry [b, c, ki * kernel_width + kj, oi * out_width + oj] equals
        x[b, c, oi * stride + ki, oj * stride + kj].

    Notes
    -----
    The previous version mixed up heights and widths in the repeat/tile
    counts (and used `k_h` for the column range), so it only worked when the
    input, kernel and output were all square. For square arguments the result
    is unchanged.
    """
    _, _, x_h, x_w = x.shape
    y_h, y_w = out_size
    k_h, k_w = kernel_size
    # Row/column offsets inside one window, flattened row-major:
    # each row offset is repeated once per kernel column, and the column
    # offsets cycle once per kernel row.
    win_rows = np.repeat(np.arange(k_h), k_w)
    win_cols = np.tile(np.arange(k_w), k_h)
    # Top-left corner of every window, flattened row-major over the output:
    # each starting row is repeated once per output column, and the starting
    # columns cycle once per output row.
    start_rows = np.repeat(np.arange(0, x_h - k_h + 1, stride), y_w)
    start_cols = np.tile(np.arange(0, x_w - k_w + 1, stride), y_h)
    # Full index matrices: window offset + window origin.
    rows = win_rows.reshape(-1, 1) + start_rows.reshape(1, -1)
    cols = win_cols.reshape(-1, 1) + start_cols.reshape(1, -1)
    # Slicing batch and channel lets NumPy broadcast the same (rows, cols)
    # gather over every image and channel without materialising index tiles.
    return x[:, :, rows, cols]

### 1. Convolution

In [None]:
class Convolution(Layer):
    """2-D convolution layer computed by gathering receptive fields into columns.

    Tensors are laid out as (batch, channels, height, width). After
    `compile()` the layer owns
      * `kernels`: (in_channels, n_kernels, kernel_height, kernel_width)
      * `bias`:    (n_kernels, out_height, out_width)
    `forward` caches the zero-padded input in `self.x` for use by `backward`.
    """
    name = 'conv2d'
    DEFAULT_SIZE = (1,3,16,16)

    def __init__(
        self,
        in_size=DEFAULT_SIZE,
        padding=0,
        n_kernels=3,
        kernel_size=(3,3),
        activation: Activation|str|None='relu',
        stride=1
    ):
        """Store the hyper-parameters; parameters are allocated in `compile()`.

        in_size      -- (batch_size, n_channels, height, width)
        padding      -- zero padding added on each side of height and width
        n_kernels    -- number of output channels
        kernel_size  -- (height, width) of every kernel
        activation   -- name, instance or None; resolved by `get_activation`
        stride       -- step of the sliding window
        """
        self.in_size = in_size
        self.padding = padding
        self.n_kernels = n_kernels
        self.kernel_size = kernel_size
        self.stride = stride
        self.activation = get_activation(activation)

    def get_out_size(self, in_size):
        """Return (batch, n_kernels, out_height, out_width) for `in_size`."""
        return (
            in_size[0],
            self.n_kernels,
            ((in_size[2] - self.kernel_size[0] + 2 * self.padding) // self.stride) + 1,
            ((in_size[3] - self.kernel_size[1] + 2 * self.padding) // self.stride) + 1,
        )

    def compile(self):
        """Compute `out_size` and allocate randomly initialised kernels and a zero bias."""
        # (batch_size, n_kernels, out_height, out_width) so out_channels = n_kernels
        self.out_size = self.get_out_size(self.in_size)
        # (in_channels, n_kernels, kernel_height, kernel_width), scaled down
        # by the number of elements in one kernel
        self.kernels = np.random.randn(
            self.in_size[1],
            self.n_kernels,
            *self.kernel_size
        )
        self.kernels /= self.kernel_size[0] * self.kernel_size[1]
        # (n_kernels, out_height, out_width)
        self.bias = np.zeros((self.n_kernels, *self.out_size[-2:]))

    def vectorize_regions(self, x):
        """Gather the forward receptive fields of (already padded) `x` into columns."""
        _, _, y_h, y_w = self.out_size
        return vector_calc(x, (y_h, y_w), self.kernel_size, self.stride)

    def vectorize_backregions(self, x):
        """Gather the windows of the dilated, padded output gradient `x`.

        `x` is dL/dy with shape (batch, n_kernels, out_height, out_width).
        The result has one column per *input* pixel so that a matrix product
        with the kernels yields dL/dx.
        """
        bs, c, x_h, x_w = x.shape
        _, _, in_h, in_w = self.in_size
        # to accommodate stride, we need to pad x with zeros after, before, and between each element
        s = self.stride
        shape = (bs, c, (x_h+1)*(s-1) + x_h, (x_w+1)*(s-1) + x_w)
        padded = np.zeros(shape)
        padded[:, :, s-1::s, s-1::s] = x
        # calculate padding size
        y_pad_h = in_h - padded.shape[2]
        y_pad_h = y_pad_h + ((self.kernel_size[0] // 2) * 2)
        y_pad_h = y_pad_h // 2
        y_pad_w = in_w - padded.shape[3]
        y_pad_w = y_pad_w + ((self.kernel_size[1] // 2) * 2)
        y_pad_w = y_pad_w // 2
        # pad dLdy
        padded = np.pad(padded, ((0,0), (0,0), (y_pad_h, y_pad_h), (y_pad_w, y_pad_w)), 'constant')
        # vectorize receptive fields
        regions = vector_calc(padded, (in_h, in_w), self.kernel_size, 1)
        return regions

    def forward(self, x):
        """Convolve `x` with the kernels, add the bias and apply the activation.

        x: (batch, in_channels, height, width)
        returns: (batch, n_kernels, out_height, out_width)
        """
        self._update_batch(x.shape[0])
        # pad x with zeros
        x = np.pad(x, ((0,0), (0,0), (self.padding, self.padding), (self.padding, self.padding)), 'constant')
        # cache (padded) x for backpropagation
        self.x = x
        # (batch_size, in_channels, kernel_height * kernel_width, out_height * out_width)
        regions = self.vectorize_regions(x)
        # (in_channels, n_kernels, kernel_height * kernel_width)
        kernels = self.kernels.reshape(self.in_size[1], self.n_kernels, -1)
        # (batch_size, in_channels, n_kernels, out_height * out_width)
        out = kernels @ regions
        # add together among channels -> (batch_size, n_kernels, out_height * out_width)
        out = out.sum(axis=1)
        # add bias (broadcast over the batch)
        out += self.bias.reshape(self.n_kernels, -1)
        # (batch_size, n_kernels, out_height, out_width)
        out = out.reshape(self.out_size)
        if self.activation is not None:
            out = self.activation.forward(out)
        return out

    def backward(self, dLdy, lr):
        """Back-propagate `dLdy`, update kernels/bias with step `lr`, return dL/dx.

        dLdy: (batch, n_kernels, out_height, out_width)
        returns: array shaped like `self.in_size`

        Three defects of the previous version are fixed here:
          * the kernel gradient was transposed before being reshaped, which
            scrambled it whenever n_kernels > 1;
          * dL/dx was computed with the kernels *after* the gradient step,
            instead of the kernels that produced the forward output;
          * dL/dx correlated the padded gradient with the unflipped kernel;
            the transposed convolution needs the kernel rotated by 180 degrees.
        """
        if self.activation is not None:
            dLdy = self.activation.backward(dLdy, lr)
        s = self.stride
        # --- gradient w.r.t. kernels and bias -------------------------------
        # dLdk is the correlation of the cached input with dLdy; for stride > 1
        # dLdy is first dilated by inserting (stride - 1) zeros between entries.
        dilate_h = dLdy.shape[2] * s - (s - 1)
        dilate_w = dLdy.shape[3] * s - (s - 1)
        dLdy_dilate = np.zeros((dLdy.shape[0], dLdy.shape[1], dilate_h, dilate_w))
        dLdy_dilate[:, :, ::s, ::s] = dLdy
        # (batch_size, 1, n_kernels, dilate_h * dilate_w)
        conv_dLdy = dLdy_dilate.reshape(dLdy_dilate.shape[0], 1, dLdy_dilate.shape[1], -1)
        # windows of the input the size of the dilated gradient, one per kernel cell:
        # (batch_size, in_channels, dilate_h * dilate_w, kernel_height * kernel_width)
        x_regions = vector_calc(self.x, self.kernel_size, (dilate_h, dilate_w), 1)
        # (batch_size, in_channels, n_kernels, kernel_height * kernel_width)
        dLdk = conv_dLdy @ x_regions
        # sum over the batch -> (in_channels, n_kernels, kernel_height * kernel_width)
        dLdk = dLdk.sum(axis=0)
        # (n_kernels, out_height, out_width)
        dLdb = dLdy.sum(axis=0)
        # --- gradient w.r.t. the input (uses the pre-update kernels) --------
        # (batch_size, n_kernels, kernel_height * kernel_width, in_height * in_width)
        regions = self.vectorize_backregions(dLdy)
        # (in_channels, n_kernels, kernel_height * kernel_width)
        kernels = self.kernels.reshape(self.in_size[1], self.n_kernels, -1)
        # reversing the flattened kernel axis rotates each kernel by 180 degrees;
        # then reorder to (n_kernels, in_channels, kernel_height * kernel_width)
        kernels = kernels[:, :, ::-1].transpose(1, 0, 2)
        # (batch_size, n_kernels, in_channels, in_height * in_width)
        dLdx = kernels @ regions
        # add together among kernels
        dLdx = dLdx.sum(axis=1)
        # reshape to input shape
        dLdx = dLdx.reshape(self.in_size)
        # --- parameter update ----------------------------------------------
        self.kernels -= lr * dLdk.reshape(self.kernels.shape)
        self.bias -= lr * dLdb
        return dLdx

    @classmethod
    def _deserialize(cls, data):
        """Rebuild a layer from the dictionary format written by `_serialize`.

        The stored kernel is (in_channels, kernel_h, kernel_w, n_kernels), so
        channel count, kernel size and kernel count are inferred from it. A
        1-D bias (one value per kernel) is expanded over the output map.
        """
        p = data['params']
        k = np.array(p['kernel'])
        no_channel, ks0, ks1, n_kernels = k.shape
        kernel_size = (ks0, ks1)
        if 'img_size' in p:
            def_size = tuple(p['img_size'])
        else:
            def_size = (cls.DEFAULT_SIZE[-2], cls.DEFAULT_SIZE[-1])
        c = cls(
            in_size=(None, no_channel, *def_size),
            kernel_size=kernel_size,
            n_kernels=n_kernels,
            **data['meta']
        )
        # allocate arrays of the right shape, then overwrite them
        c.compile()
        k = k.transpose(0, 3, 1, 2) # (no_channel, n_kernels, kernel_size[0], kernel_size[1])
        assert k.shape == c.kernels.shape, f"kernel shape mismatch: {k.shape} != {c.kernels.shape}"
        c.kernels = k
        # set bias
        b = np.array(p['bias'])
        if len(b.shape) == 1:  # parameter sharing, duplicate it
            b = b.repeat(c.out_size[-2] * c.out_size[-1]).reshape(c.bias.shape)
        assert b.shape == c.bias.shape, f"bias shape mismatch: {b.shape} != {c.bias.shape}"
        c.bias = b

        return c


    def _serialize(self):
        """Return the layer as a one-element list of plain-Python dictionaries."""
        return [{
            'type': 'conv2d',
            'params': {
                # restore to (no_channel, kernel_size[0], kernel_size[1], n_kernels)
                # for compatibility
                'kernel': self.kernels.transpose(0, 2, 3, 1).tolist(),
                'bias': self.bias.tolist(),
                'img_size': self.in_size[-2:],
            },
            'meta': {
                'padding': self.padding,
                'stride': self.stride,
                'activation': None if self.activation is None else self.activation.name,
            }
        }]

In [None]:
# Smoke test: one forward and one backward pass through a strided convolution.
x = np.random.randn(1,1,32,32)
print(f"x {x.shape}\n{x}")
conv = Convolution(in_size=x.shape, padding=0, n_kernels=1, kernel_size=(4,4), stride=4)
conv.compile()
out = conv(x)
print(f"out {out.shape}\n{out}")
# random integer upstream gradient with the same shape as the output
dLdy = np.random.randint(1, 9, out.shape)
print(f"dLdy {dLdy.shape}\n{dLdy}")
dLdx = conv.backward(dLdy, 0.1)
print(f"dLdx {dLdx.shape}\n{dLdx}")

x (1, 1, 32, 32)
[[[[ 0.95172323 -1.48659433 -0.01935405 ... -0.38011853  1.85622064
     1.03372016]
   [ 0.16750962 -0.53118492  0.14296131 ...  1.75401119 -0.69414117
     0.37768782]
   [-0.490602    0.11356264  0.00663914 ...  0.28692275  1.42143583
     1.94405463]
   ...
   [-0.03343196  0.56678553  0.67181207 ... -1.09701458 -0.829446
    -0.85642891]
   [-1.02649227 -0.56310046  0.2073629  ...  0.36762804  0.65842512
     1.56222553]
   [ 0.07713261 -0.3688733  -1.39069608 ...  2.29601218 -1.170719
     0.54954297]]]]
out (1, 1, 8, 8)
[[[[0.19997201 0.24508563 0.14150687 0.41986734 0.         0.06733596
    0.         0.04619712]
   [0.29393066 0.20844106 0.09891817 0.         0.12955816 0.0998903
    0.         0.        ]
   [0.12011249 0.         0.30744752 0.         0.         0.
    0.16278822 0.        ]
   [0.         0.         0.         0.         0.31833272 0.28292788
    0.         0.        ]
   [0.45731917 0.13722932 0.34579298 0.10483424 0.         0.
    0.100

### 2. Pooling

In [None]:
class Pooling(Layer):
    """2-D max or average pooling over (batch, channels, height, width) tensors.

    `forward` drops trailing rows/columns that do not fill a whole pool and
    caches the cropped input in `self.x`; `backward` returns a gradient with
    the shape of the *uncropped* input.
    """
    name = '_pooling2d'
    def __init__(self, pool_size=(2,2), stride=2, mode='max'):
        """
        pool_size -- (height, width) of the pooling window
        stride    -- step of the pooling window
        mode      -- 'max' or 'avg'
        """
        assert mode in ['max', 'avg'], "mode must be either 'max' or 'avg'"
        self.pool_size = pool_size
        self.stride = stride
        self.mode = mode
        self.x = None
        self.mask = None
        self.max_ids = None
        # (rows, cols) removed from the bottom/right edge by the last forward()
        self._cropped = (0, 0)

    def vectorize_pools(self, x):
        """Gather every pooling window of `x` into columns."""
        _, _, x_h, x_w = x.shape
        y_h = (x_h - self.pool_size[0]) // self.stride + 1
        y_w = (x_w - self.pool_size[1]) // self.stride + 1
        return vector_calc(x, (y_h, y_w), self.pool_size, self.stride)

    def vectorize_backpools(self, x):
        """Gather the windows of the dilated, padded output gradient `x` (one column per input pixel)."""
        bs, c, x_h, x_w = x.shape
        _, _, in_h, in_w = self.x.shape
        # to accommodate stride, we need to pad x with zeros after, before, and between each element
        s = self.stride
        shape = (bs, c, (x_h+1)*(s-1) + x_h, (x_w+1)*(s-1) + x_w)
        padded = np.zeros(shape)
        padded[:, :, s-1::s, s-1::s] = x
        # calculate padding size
        y_pad_h = in_h - padded.shape[2]
        y_pad_h = y_pad_h + ((self.pool_size[0] // 2) * 2)
        y_pad_h = y_pad_h // 2
        y_pad_w = in_w - padded.shape[3]
        y_pad_w = y_pad_w + ((self.pool_size[1] // 2) * 2)
        y_pad_w = y_pad_w // 2
        # pad dLdy
        padded = np.pad(padded, ((0,0), (0,0), (y_pad_h, y_pad_h), (y_pad_w, y_pad_w)), 'constant')
        # vectorize receptive fields
        regions = vector_calc(padded, (in_h, in_w), self.pool_size, 1)
        return regions

    def get_out_size(self, in_size):
        """Return (batch, channels, out_height, out_width) after cropping `in_size` to whole pools."""
        # adjust input size to be divisible by pool size
        in_size = list(in_size)
        in_size[2] = in_size[2] - (in_size[2] % self.pool_size[0])
        in_size[3] = in_size[3] - (in_size[3] % self.pool_size[1])
        return (
            in_size[0],
            in_size[1],
            (in_size[2] - self.pool_size[0]) // self.stride + 1,
            (in_size[3] - self.pool_size[1]) // self.stride + 1,
        )

    def forward(self, x):
        """Pool `x` and return an array of shape `get_out_size(x.shape)`."""
        # crop last rows and columns to make it divisible by pool size,
        # remembering exactly how much was removed so backward() can restore it
        rem_h = x.shape[2] % self.pool_size[0]
        rem_w = x.shape[3] % self.pool_size[1]
        self._cropped = (rem_h, rem_w)
        self.is_odd = rem_h != 0 or rem_w != 0
        x = x[:, :, :x.shape[2] - rem_h, :x.shape[3] - rem_w]
        self.x = x
        self._update_batch(x.shape[0])
        out_size = self.get_out_size(x.shape)
        # vectorize pools so that the regions are columns
        pools = self.vectorize_pools(x)
        # get max or average
        if self.mode == 'max':
            # flat position of the maximum inside each window, needed by backward()
            self.max_ids = np.argmax(pools, axis=2)
            out = np.max(pools, axis=2)
        elif self.mode == 'avg':
            out = np.mean(pools, axis=2)
        # reshape to output shape
        out = out.reshape(out_size)
        return out

    def backward(self, dLdy, lr):
        """Return dL/dx for the input of the last `forward` call.

        dLdy: (batch, channels, out_height, out_width). `lr` is unused since
        pooling has no parameters.

        Fixes over the previous version:
          * the gradient is padded back by the number of rows/columns that
            `forward` actually cropped (previously always exactly one row and
            one column, even when only one axis was cropped or more than one
            line was dropped);
          * in max mode, gradients of overlapping windows (stride smaller
            than the pool) are accumulated instead of overwriting each other;
          * `self.max_ids` is no longer reshaped in place.
        """
        if self.mode == 'avg':
            # average pooling is a convolution with a constant 1/(h*w) kernel,
            # so its input gradient is obtained the same way as for Convolution
            pools = self.vectorize_backpools(dLdy) # (batch_size, n_channels, pool_height * pool_width, in_height * in_width)
            k = np.ones(self.pool_size) / (self.pool_size[0] * self.pool_size[1])
            # reshape kernel to (1, 1, kernel_height * kernel_width)
            k = k.reshape(1, 1, k.shape[0] * k.shape[1])
            # convolve
            dLdx = k @ pools
            dLdx = dLdx.reshape(self.x.shape)
        elif self.mode == 'max':
            # only the position that held the maximum of each window receives gradient
            dLdx = np.zeros_like(self.x)
            batch_idx, channel_idx, out_i, out_j = np.indices(dLdy.shape)
            # (batch, channels, out_height, out_width) view of the arg-max positions
            max_ids = self.max_ids.reshape(dLdy.shape)
            # flat window position -> (row, column) inside the window
            max_pos_i, max_pos_j = divmod(max_ids, self.pool_size[1])
            # window origin + offset = coordinates in the (cropped) input
            input_i = out_i * self.stride + max_pos_i
            input_j = out_j * self.stride + max_pos_j
            target = (batch_idx, channel_idx, input_i, input_j)
            if self.stride >= self.pool_size[0] and self.stride >= self.pool_size[1]:
                # windows do not overlap: every target cell is hit at most once
                dLdx[target] = dLdy
            else:
                # overlapping windows may share a maximum: accumulate (unbuffered)
                np.add.at(dLdx, target, dLdy.astype(dLdx.dtype, copy=False))
        # add zeros back for the rows/columns that forward() cropped away
        crop_h, crop_w = self._cropped
        if crop_h or crop_w:
            dLdx = np.pad(dLdx, ((0,0), (0,0), (0,crop_h), (0,crop_w)), 'constant')
        return dLdx

    @classmethod
    def _deserialize(cls, data):
        """Rebuild a layer; the mode is the part of `data['type']` before the first underscore."""
        mode = data['type'].split('_')[0]
        return cls(
            mode=mode,
            **data['meta'],
        )

    def _serialize(self):
        """Return the layer as a one-element list; the type is '<mode>_pooling2d'."""
        return [{
            'type': f'{self.mode}{self.name}',
            'meta': {
                'pool_size': self.pool_size,
                'stride': self.stride,
            }
        }]


In [None]:
# Smoke test: one forward and one backward pass through average pooling.
x = np.random.randn(1,1,12,12)
print(x.shape, x, sep='\n')
pool = Pooling(pool_size=(2,2), stride=2, mode='avg')
out = pool(x)
print(out.shape, out, sep='\n')
# random integer upstream gradient with the same shape as the output
dLdy = np.random.randint(1, 9, out.shape)
print(dLdy.shape, dLdy, sep='\n')
dLdx = pool.backward(dLdy, 0.1)
print(dLdx.shape, dLdx, sep='\n')

(1, 1, 12, 12)
[[[[-0.37613749  1.7419758   0.51781062  0.49499937 -0.48927446
    -0.16975034  0.77617644 -2.06831327  0.37353925  0.85677623
    -0.17441137 -0.69373026]
   [ 0.13131136  0.59365921  0.66604824 -0.69068809 -1.21705825
     0.83892934 -0.97689771 -0.77746656  2.75014632  1.26737113
     0.0460793   0.93298   ]
   [-0.04665596 -1.41358926 -1.92750485 -0.01181503 -0.54554272
    -1.13387094  0.75944302 -0.60654733 -0.57594301 -0.34294087
     0.27676555 -0.2841054 ]
   [ 1.15016268  0.37874626  0.21857591 -0.10558039 -0.39612805
    -0.76272858 -0.55492714 -1.75383943  0.06215821  0.09305066
    -0.21747143 -0.61696121]
   [ 0.80142675 -1.14099451  0.12197229  0.7667177   1.32087495
     0.55070667  0.77427229  0.50330044 -0.39579064 -1.0141134
    -0.58847999  0.43566313]
   [ 0.54821643 -1.74123915 -1.08298406  0.49194784  0.77706953
     0.33717693  1.28960878  1.72386019  0.51835427 -0.75969429
    -0.26438119  0.42247968]
   [-0.68997485 -1.04850389  0.71048726  0.9

### 3. Flatten

In [None]:
class Flatten(Layer):
    """Collapse every axis after the batch axis into a single feature axis."""
    name = 'flatten'

    def __init__(self):
        # input of the last forward() call; its shape is restored in backward()
        self.x = None
        self.in_size = 0

    def get_out_size(self, in_size):
        """Return (None, product of all entries of `in_size` after the first)."""
        flat = 1
        for dim in in_size[1:]:
            flat *= dim
        return (None, flat)

    def forward(self, x):
        """Cache `x` and return it reshaped to (batch, -1)."""
        self.x = x
        batch = x.shape[0]
        return x.reshape(batch, -1)

    def backward(self, dLdy, lr):
        """Reshape the gradient back to the shape of the cached input; `lr` is unused."""
        # TODO: test this
        original_shape = self.x.shape
        return dLdy.reshape(original_shape)

    def _serialize(self):
        """Return the layer as a one-element list (no parameters to store)."""
        return [dict(type='flatten')]

    @classmethod
    def _deserialize(cls, _):
        """Build a fresh instance; the argument is ignored."""
        return cls()

### 4. Linear

In [None]:
class Linear(Layer):
    """Fully connected affine layer: y = x @ weights + bias.

    `weights` has shape (in_features, out_features) and `bias` has shape
    (out_features,); both are created by `compile()`.
    """
    name = 'linear'
    def __init__(self, in_size=10, out_size=10):
        """Record the feature counts; shapes are stored as (None, n) tuples."""
        self.in_size = (None, in_size)
        self.out_size = (None, out_size)
        self.x = None

    def get_out_size(self, in_size):
        """Return the fixed output shape; `in_size` is not consulted."""
        return self.out_size

    def compile(self):
        """Allocate randomly initialised weights (scaled by 1/in_features) and a zero bias."""
        self.weights = np.random.randn(
            self.in_size[-1],
            self.out_size[-1],
        ) # (in_features, out_features)
        self.weights /= self.in_size[-1]
        self.bias = np.zeros(self.out_size[-1]) # (out_features)

    def forward(self, x):
        """Cache `x` and return `x @ weights + bias`.

        Raises AssertionError when the last axis of `x` does not match the
        number of input features.
        """
        self.x = x
        assert x.shape[-1] == self.weights.shape[0], f"input shape mismatch: {x.shape[-1]} != {self.weights.shape[0]}"
        out = x @ self.weights # (batch_size, out_features)
        out += self.bias # (batch_size, out_features)
        return out

    def backward(self, dLdy, lr):
        """Back-propagate `dLdy`, apply one gradient step of size `lr`, return dL/dx.

        dL/dx is computed from the weights that produced the forward output,
        i.e. *before* they are updated; the previous version updated first
        and therefore propagated a gradient through the wrong weights.
        """
        dLdw = self.x.T @ dLdy # (in_features, out_features)
        dLdb = dLdy.sum(axis=0) # (out_features)
        # gradient w.r.t. the input, using the pre-update weights
        dLdx = dLdy @ self.weights.T # (batch_size, in_features)
        # update weights and bias
        self.weights -= lr * dLdw
        self.bias -= lr * dLdb
        return dLdx

    def _load_weights(self, k, b):
        """Replace weights and bias with `k` and `b` after checking their shapes."""
        self.compile()
        # set weight
        assert k.shape == self.weights.shape, f"kernel shape mismatch: {k.shape} != {self.weights.shape}"
        self.weights = k
        # set bias
        assert b.shape == self.bias.shape, f"bias shape mismatch: {b.shape} != {self.bias.shape}"
        self.bias = b


    @classmethod
    def _deserialize(cls, data):
        """Rebuild a layer from the dictionary format written by `_serialize`.

        The feature counts are inferred from the stored kernel shape. The
        'meta' entry is optional: `_serialize` does not write one, so
        requiring it made a serialize/deserialize round trip fail.
        """
        p = data['params']
        k = np.array(p['kernel'])
        in_size, out_size = k.shape
        c = cls(
            in_size=in_size,
            out_size=out_size,
            **data.get('meta', {}),
        )
        # set weight
        c._load_weights(k, np.array(p['bias']))
        return c

    def _serialize(self):
        """Return the layer as a one-element list of plain-Python dictionaries."""
        return [{
            'type': 'linear',
            'params': {
                'kernel': self.weights.tolist(),
                'bias': self.bias.tolist(),
            }
        }]

### 5. Dense

In [None]:
class Dense(Linear):
    """A `Linear` layer followed by a mandatory activation."""
    name = 'dense'

    def __init__(self, inputs=0, units=10, activation:Activation|str='sigmoid'):
        """
        inputs     -- number of input features
        units      -- number of output features
        activation -- name or instance; None is rejected by `get_activation`
        """
        super().__init__(inputs, units)
        self.activation = get_activation(activation, allow_none=False)

    def forward(self, x):
        """Return activation(x @ weights + bias)."""
        affine = super().forward(x)
        return self.activation.forward(affine)

    def backward(self, dLdy, lr):
        """Back-propagate through the activation first, then through the affine part."""
        grad_affine = self.activation.backward(dLdy, lr)
        return super().backward(grad_affine, lr)

    def _serialize(self):
        """Return the layer as a one-element list of plain-Python dictionaries."""
        params = {
            'kernel': self.weights.tolist(),
            'bias': self.bias.tolist(),
        }
        meta = {'activation': self.activation.name}
        return [{'type': 'dense', 'params': params, 'meta': meta}]

    @classmethod
    def _deserialize(cls, data):
        """Rebuild a layer; feature counts are inferred from the stored kernel shape."""
        stored = data['params']
        kernel = np.array(stored['kernel'])
        n_inputs, n_units = kernel.shape
        layer = cls(inputs=n_inputs, units=n_units, **data['meta'])
        layer._load_weights(kernel, np.array(stored['bias']))
        return layer

### 6. LSTM

In [None]:
class LSTM(Layer):
    """Forward-only LSTM over inputs shaped (batch, time, features).

    Each gate g in {f, i, c, o} owns an input matrix `W<g>`, a recurrent
    matrix `U<g>` and a bias `b<g>`, all created by `compile()`.
    """
    name = 'lstm'

    def __init__(self, in_size=5, units=10, return_sequences=False):
        """
        in_size          -- number of input features per time step
        units            -- size of the hidden and cell state
        return_sequences -- return every hidden state instead of only the last
        """
        self.return_sequences = return_sequences
        self.in_size = (None, None, in_size)
        self.out_size = (None, None if return_sequences else 1, units)
        self.x = None
        self.sigmoid = Sigmoid()
        self.tanh = TanH()

    def get_out_size(self, in_size):
        """Return the fixed output shape; `in_size` is not consulted."""
        return self.out_size

    def compile(self):
        """Allocate random W/U matrices and zero biases for the four gates."""
        n_units = self.out_size[-1]
        self.out_size = (None, None if self.return_sequences else 1, n_units)
        n_features = self.in_size[2]
        # forget gate, input gate, cell candidate, output gate -- in that order
        for gate in ('f', 'i', 'c', 'o'):
            setattr(self, 'W' + gate, np.random.randn(n_features, n_units))
            setattr(self, 'U' + gate, np.random.randn(n_units, n_units))
            setattr(self, 'b' + gate, np.zeros(n_units))

    def _forward_step(self, x, t):
        """Advance the recurrence by one step and return the hidden state at time `t`.

        Writes the cell state into `self.c[:, t]` and leaves the gate
        activations of this step in `self.f`, `self.i` and `self.o`.
        """
        x_t = x[:, t]
        if t > 0:
            h_prev = self.h[:, t-1]
            c_prev = self.c[:, t-1]
        else:
            # no history yet: start from all-zero hidden and cell state
            initial_shape = (self.in_size[0], self.out_size[2])
            h_prev = np.zeros(initial_shape)
            c_prev = np.zeros(initial_shape)
        # forget gate
        self.f = self.sigmoid(x_t @ self.Wf + h_prev @ self.Uf + self.bf)
        # input gate
        self.i = self.sigmoid(x_t @ self.Wi + h_prev @ self.Ui + self.bi)
        # candidate cell state, then the new cell state
        candidate = self.tanh(x_t @ self.Wc + h_prev @ self.Uc + self.bc)
        self.c[:, t] = self.f * c_prev + self.i * candidate
        # output gate
        self.o = self.sigmoid(x_t @ self.Wo + h_prev @ self.Uo + self.bo)
        # hidden state
        return self.o * self.tanh(self.c[:, t])

    def forward(self, x):
        """Run the recurrence over every time step of `x`.

        Returns all hidden states (batch, time, units) when
        `return_sequences` is set, otherwise only the last one as
        (batch, 1, units).
        """
        n_batch, n_steps, n_features = x.shape[0], x.shape[1], x.shape[2]
        self.in_size = (n_batch, n_steps, n_features)
        self.x = x
        state_shape = (n_batch, n_steps, self.out_size[2])
        self.h = np.zeros(state_shape)
        self.c = np.zeros(state_shape)
        for t in range(n_steps):
            self.h[:, t] = self._forward_step(self.x, t)
        if self.return_sequences:
            return self.h
        return self.h[:, -1:]

    @classmethod
    def _deserialize(cls, data):
        """Rebuild a layer; sizes are inferred from the shape of the stored 'W_i' matrix."""
        stored = data['params']
        n_features, n_units = np.array(stored['W_i']).shape[:2]
        layer = cls(
            in_size=n_features,
            units=n_units,
            **data['meta'],
        )
        # allocate arrays of the right shape, then overwrite each one
        layer.compile()
        for kind in ['W', 'U', 'b']:
            for gate in ['i', 'f', 'c', 'o']:
                wg = kind + gate
                k = np.array(stored[f'{kind}_{gate}'])
                cur_k = getattr(layer, wg)
                assert k.shape == cur_k.shape, f"{wg} shape mismatch: {k.shape} != {cur_k.shape}"
                setattr(layer, wg, k)
        return layer

    def _serialize(self):
        """Return the layer as a one-element list of plain-Python dictionaries."""
        params = {}
        for gate in ['i', 'f', 'c', 'o']:
            for kind in ['W', 'U', 'b']:
                params[f'{kind}_{gate}'] = getattr(self, kind + gate).tolist()
        return [{
            'type': 'lstm',
            'params': params,
            'meta': {
                'return_sequences': self.return_sequences,
            }
        }]

In [None]:
# Smoke test: one forward pass of a single 10-step sequence through the LSTM.
x = np.random.randn(1, 10, 5)
print(x.shape, x, sep='\n')
lstm = LSTM(in_size=x.shape[2])
lstm.compile()
out = lstm(x)
print(out.shape, out, sep='\n')

(1, 10, 5)
[[[ 0.28654986 -0.27679391 -0.74665186 -0.00331392  0.36234761]
  [ 0.89689543 -0.0383736   2.60044925 -0.38847812  0.00801824]
  [ 0.13825808 -1.23614514  1.07531923  0.14212616 -0.65449765]
  [ 0.09285611 -0.15220465  0.75606619  0.7148543   0.88647074]
  [-0.50929909 -0.46183475 -0.65519141 -1.58960559 -0.41647173]
  [ 0.31144274 -0.95566199  0.71868884 -0.34756257  0.07684889]
  [-0.15792326 -1.91252661 -0.33115826  0.16559161  0.60208357]
  [-0.10701206 -0.72534962  0.60437855  0.95402968  0.01387122]
  [ 1.23201603 -0.18782568 -1.45886105 -0.69232699  0.83736018]
  [ 0.4750557   1.75159701 -0.01327656 -0.7495701   0.15185625]]]
(1, 1, 10)
[[[-0.64211747 -0.00295774 -0.00935804 -0.22378255 -0.41902588
    0.1151812  -0.23497289  0.00831314  0.0035991   0.18202333]]]


## C. Modelling

### 1. Sequential modelling

In [None]:
from collections import defaultdict


class Sequential():
    """Ordered stack of layers with forward/backward passes, fitting and JSON persistence.

    Layers are used through `compile()`, `get_out_size(in_size)`, `forward(x)`,
    `backward(dLdy, lr)` and `_serialize()`. A layer that has an `in_size`
    attribute gets it overwritten with the previous layer's output size when
    the model is compiled or loaded.
    """

    def __init__(self, models=None):
        """
        Args:
            models: optional iterable of layers in execution order.
        """
        self.metrics = None
        # Copy the input so that add() never mutates the caller's list.
        self.layers: list[Layer] = [] if models is None else list(models)

    def add(self, model: "Layer"):
        """Append `model` as the new last layer."""
        self.layers.append(model)

    @staticmethod
    def _chain_shape(layer, last_out, compile_layer=False):
        """Wire `layer` to the previous output size and return its own output size.

        `last_out` is None for the first layer, which must then provide its
        own `in_size`. When `compile_layer` is true the layer is compiled
        after its `in_size` has been set and before its output size is queried.
        """
        if hasattr(layer, 'in_size') and last_out is not None:
            # Auto feed input size from the previous layer if available
            layer.in_size = last_out
        if compile_layer:
            layer.compile()
        return layer.get_out_size(layer.in_size if last_out is None else last_out)

    def _calc_in(self):
        """Propagate sizes through all layers and return the final output size (None if empty)."""
        last_out = None
        for m in self.layers:
            last_out = self._chain_shape(m, last_out)
        return last_out

    def compile(self, metrics=None):
        """Propagate sizes through the stack and compile every layer.

        Args:
            metrics: optional callable taking a `(predictions, targets)` tuple;
                stored only when truthy, so a previous value is kept otherwise.
        """
        if metrics:
            self.metrics = metrics
        last_out = None
        for m in self.layers:
            last_out = self._chain_shape(m, last_out, compile_layer=True)

    def summary(self):
        """Print a table of layer class names with their input and output shapes.

        4-element shapes are displayed with their first entry replaced by None.

        Raises:
            ValueError: if the first layer has no `in_size` attribute, since
                there is no previous layer to take a shape from.
        """
        if len(self.layers) == 0:
            print("No layers added")
            return
        sizes = []
        for m in self.layers:
            if hasattr(m, 'in_size'):
                in_size = m.in_size
            elif sizes:
                # Layers without their own in_size reuse the previous row's output shape.
                in_size = sizes[-1][2]
            else:
                raise ValueError(
                    f"{m.__class__.__name__} is the first layer and has no in_size"
                )
            sz = [in_size, m.get_out_size(in_size)]
            for j in range(2):
                if len(sz[j]) == 4:
                    sz[j] = (None, *sz[j][1:])
            sizes.append((m.__class__.__name__, *sz))

        # describe model like keras
        print(
            tabulate(
                sizes,
                headers=['Layer Type', 'Input Shape', 'Output Shape'],
                tablefmt='grid',
            )
        )

    def forward(self, x):
        """Feed `x` through every layer in order and return the last output."""
        out = x
        for m in self.layers:
            out = m.forward(out)
        return out

    def backward(self, x, y, lr=0.1):
        """Run one update step on a single batch.

        Args:
            x: batch of inputs.
            y: batch of targets; reshaped to the shape of the model output.
            lr: learning rate handed to every layer's `backward`.
        """
        yo = self.forward(x)  # 1. forward it first
        # 2. output error (prediction - target), used as the gradient fed to the last layer
        dLdy = np.subtract(yo, y.reshape(yo.shape))
        # 3. backward them from last layer till first layer
        for m in reversed(self.layers):
            dLdy = m.backward(dLdy, lr)

    def fit(self, x, y, epochs=2, batch_size=10, lr=0.1):
        """Train for `epochs` passes over the data.

        Batches are built once before the first epoch and visited in the same
        order every epoch. When metrics were set, they are evaluated on the
        full `(x, y)` after each epoch and printed.
        """
        x0, y0 = self._batch(x, y=y, batch_size=batch_size)
        for _ in trange(epochs, desc='Fit Epoch',position=1):
            for i in trange(len(x0), desc='Fit Batch',position=0):
                self.backward(x0[i], y0[i], lr)
            if self.metrics:
                print(f": {self.evaluate(x, y)}")
            print()

    def _batch(self, x, batch_size, y=None):
        """Split `x` (and `y`, when given) into consecutive batches along axis 0.

        Returns:
            `(x_batches, y_batches)` as lists; `y_batches` is empty when `y`
            is None. A final shorter batch holds the remainder, if any.

        Raises:
            ValueError: if `batch_size` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        n_batch = x.shape[0] // batch_size
        bef_len = n_batch * batch_size
        x0, y0 = [], []
        if bef_len > 0:
            x0 = np.split(x[:bef_len], n_batch)
            if y is not None:
                y0 = np.split(y[:bef_len], n_batch)
        if x.shape[0] % batch_size != 0:
            x0.append(x[bef_len:])
            if y is not None:
                y0.append(y[bef_len:])
        return x0, y0

    def evaluate(self, x, y):
        """Predict on `x` and return `self.metrics((predictions, y))`.

        Raises:
            TypeError: if no metrics callable has been set via `compile`.
        """
        if self.metrics is None:
            # Same exception type as calling None would raise, with a usable message.
            raise TypeError("no metrics set; call compile(metrics=...) before evaluate()")
        out = self.predict(x)
        return self.metrics((out, y))

    def predict(self, x):
        """Run the forward pass over `x` in chunks and concatenate the outputs.

        An empty `x` yields no chunks, in which case `np.concatenate` raises
        ValueError.
        """
        # batch predict max to 50 to prevent memory error
        batch_size = 50
        x0, _ = self._batch(x, batch_size)
        out = []
        for i in trange(len(x0), desc='Predict Batch'):
            out.append(self.forward(x0[i]))
        return np.concatenate(out)

    @classmethod
    def load(cls, filename):
        """Build a model from a JSON file holding a list of layer entries.

        Each entry's 'type' is matched against the candidate classes with a
        substring test (`layer.name in entry['type']`); the first matching
        class in the list deserializes the entry. Entries that match no class
        are skipped without any message.
        """
        layers = [Linear, Convolution, Pooling, Flatten, Dense, LSTM, ReLU, Sigmoid, TanH]
        with open(filename, 'r') as f:
            data = json.load(f)
        m = []
        last_out = None
        for d in data:
            for l in layers:
                if l.name in d['type']:
                    # defaultdict lets deserializers read optional keys (e.g. 'meta') as {}.
                    mdl = l._deserialize(defaultdict(dict, d))
                    last_out = cls._chain_shape(mdl, last_out)
                    m.append(mdl)
                    break
        return cls(m)

    def save(self, filename):
        """Write the concatenated `_serialize()` output of every layer to `filename` as JSON."""
        data = []
        for m in self.layers:
            data.extend(m._serialize())
        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)

In [None]:
def create_sequences(data, seq_length):
    """Cut `data` into sliding windows paired with the element that follows each window.

    Args:
        data: indexable, sliceable sequence with a length.
        seq_length: number of consecutive elements per window.

    Returns:
        `(sequences, targets)` as numpy arrays: window `i` is
        `data[i:i + seq_length]` and its target is `data[i + seq_length]`.
        Both are empty when `len(data) <= seq_length`.
    """
    n_windows = len(data) - seq_length
    windows = [data[start:start + seq_length] for start in range(n_windows)]
    followers = [data[start + seq_length] for start in range(n_windows)]
    return np.array(windows), np.array(followers)


### 2. Pre-made model class (CNN)

In [None]:
class CNN(Sequential):
    """Pre-assembled Sequential: two Convolution+Pooling stages, Flatten, then two Dense layers.

    Both convolutions share `padding`, `n_kernels`, `kernel_size` and `stride`;
    both pooling layers share `pool_size`, `pool_stride` and `pool_mode`. Only
    the first convolution receives `in_size`. The last Dense layer has one
    unit with a sigmoid activation.
    """

    def __init__(self, in_size=(1,3,16,16), padding=0, n_kernels=8, kernel_size=(3,3), stride=1, units=8, pool_size=(2,2), pool_stride=2, pool_mode='avg'):
        conv_kwargs = dict(padding=padding, n_kernels=n_kernels, kernel_size=kernel_size, stride=stride)
        pool_kwargs = dict(pool_size=pool_size, stride=pool_stride, mode=pool_mode)
        stack = [
            Convolution(in_size=in_size, **conv_kwargs),
            Pooling(**pool_kwargs),
            Convolution(**conv_kwargs),
            Pooling(**pool_kwargs),
            Flatten(),
            Dense(units=units, activation='relu'),
            Dense(units=1, activation='sigmoid'),
        ]
        super().__init__(stack)

# Testing CNN
Using the panda-or-bear image classification dataset.

## Preparation

In [None]:
def compute_metrics(p):
    """Return accuracy and F1 for a `(predictions, labels)` pair.

    Predictions are binarised with `> 0.5` and labels with `== 1` before
    being passed to scikit-learn's `accuracy_score` and `f1_score`.
    """
    preds, label = p
    is_positive = label == 1
    predicted_positive = preds > 0.5
    return dict(
        accuracy=accuracy_score(is_positive, predicted_positive),
        f1=f1_score(is_positive, predicted_positive),
    )

In [55]:
# No augmentation or rescaling options are passed to the generator.
data_gen_args = dict()
# NOTE(review): validation_split is set here, but prepare_data calls
# flow_from_directory without a `subset=` argument, so presumably no split is
# actually applied (the recorded output reports all 500 training images) — confirm.
datagen_train = ImageDataGenerator(validation_split=0.2, **data_gen_args)
data_root_path = 'PandasBears'
# Index 0 is the Train directory, index 1 the Test directory.
data_paths = [f'{data_root_path}/Train', f'{data_root_path}/Test']
def prepare_data(data_path, datagen):
    """Load the images under `data_path` as a single channels-first batch.

    Args:
        data_path: directory handed to `datagen.flow_from_directory`.
        datagen: object providing `flow_from_directory` (an
            `ImageDataGenerator` in this notebook).

    Returns:
        `(images, labels)`: the first batch of the iterator, with the image
        axes reordered by `transpose(0, 3, 1, 2)`, and its labels untouched.
    """
    # The batch size is far larger than the dataset, so the first batch is
    # expected to contain every image found in the directory.
    batches = datagen.flow_from_directory(data_path, batch_size=999999999, class_mode='sparse')
    # Index the iterator once: every `batches[0]` lookup builds the batch again,
    # so indexing it separately for images and labels loads the data twice.
    first_batch = batches[0]
    return (first_batch[0].transpose(0, 3, 1, 2), first_batch[1])

In [56]:
# data_paths[0] is the Train directory and data_paths[1] the Test directory,
# so X_val / y_val hold the Test images.
X_train, y_train = prepare_data(data_paths[0], datagen_train)
X_val, y_val = prepare_data(data_paths[1], datagen_train)
# Divide by 255 — presumably maps 8-bit pixel values into [0, 1]; the generator
# itself is created without a rescale option.
X_train = X_train / 255
X_val = X_val / 255

Found 500 images belonging to 2 classes.
Found 100 images belonging to 2 classes.


## Training Experiment 1

### Train

In [57]:
# train
# Experiment 1: the pre-made CNN with 3x3 kernels, padding 1 and 16 hidden units.
# summary() displays the first entry of every 4-element shape as None, so the
# leading 20 in in_size does not appear in the table.
cnn = CNN(in_size=(20,3,256,256), padding=1, n_kernels=8, kernel_size=(3,3), stride=1, units=16)
cnn.compile(metrics=compute_metrics)
cnn.summary()

+--------------+---------------------+---------------------+
| Layer Type   | Input Shape         | Output Shape        |
| Convolution  | (None, 3, 256, 256) | (None, 8, 256, 256) |
+--------------+---------------------+---------------------+
| Pooling      | (None, 8, 256, 256) | (None, 8, 128, 128) |
+--------------+---------------------+---------------------+
| Convolution  | (None, 8, 128, 128) | (None, 8, 128, 128) |
+--------------+---------------------+---------------------+
| Pooling      | (None, 8, 128, 128) | (None, 8, 64, 64)   |
+--------------+---------------------+---------------------+
| Flatten      | (None, 8, 64, 64)   | (None, 32768)       |
+--------------+---------------------+---------------------+
| Dense        | (None, 32768)       | (None, 16)          |
+--------------+---------------------+---------------------+
| Dense        | (None, 16)          | (None, 1)           |
+--------------+---------------------+---------------------+


In [58]:
# Two epochs in mini-batches of 10; fit() prints compute_metrics on the full
# training set after each epoch. The recorded run took about 6 minutes per epoch.
cnn.fit(X_train, y_train, batch_size=10, epochs=2, lr=0.001)


Fit Batch: 100%|██████████| 50/50 [04:39<00:00,  5.58s/it]
Predict Batch: 100%|██████████| 10/10 [01:06<00:00,  6.64s/it]

Fit Epoch:  50%|█████     | 1/2 [05:45<05:45, 345.63s/it][A

: {'accuracy': 0.974, 'f1': 0.9735234215885946}



Fit Batch: 100%|██████████| 50/50 [04:58<00:00,  5.98s/it]
Predict Batch: 100%|██████████| 10/10 [01:19<00:00,  7.91s/it]

Fit Epoch: 100%|██████████| 2/2 [12:03<00:00, 361.79s/it]

: {'accuracy': 0.98, 'f1': 0.9797570850202428}






In [59]:
# Serialize every layer to JSON so the model can be rebuilt with Sequential.load.
cnn.save('test_cnn.json')

### Predict

In [60]:
# Score the trained model on X_val, which was loaded from the Test directory.
compute_metrics((cnn.predict(X_val), y_val))

Predict Batch: 100%|██████████| 2/2 [00:16<00:00,  8.04s/it]


{'accuracy': 0.96, 'f1': 0.9591836734693877}

### Load and Predict Saved Model

In [61]:
# Round-trip check: rebuild the model from the saved JSON and show its layer shapes.
m = Sequential.load('test_cnn.json')
m.summary()

+--------------+---------------------+---------------------+
| Layer Type   | Input Shape         | Output Shape        |
| Convolution  | (None, 3, 256, 256) | (None, 8, 256, 256) |
+--------------+---------------------+---------------------+
| Pooling      | (None, 8, 256, 256) | (None, 8, 128, 128) |
+--------------+---------------------+---------------------+
| Convolution  | (None, 8, 128, 128) | (None, 8, 128, 128) |
+--------------+---------------------+---------------------+
| Pooling      | (None, 8, 128, 128) | (None, 8, 64, 64)   |
+--------------+---------------------+---------------------+
| Flatten      | (None, 8, 64, 64)   | (None, 32768)       |
+--------------+---------------------+---------------------+
| Dense        | (None, 32768)       | (None, 16)          |
+--------------+---------------------+---------------------+
| Dense        | (None, 16)          | (None, 1)           |
+--------------+---------------------+---------------------+


In [62]:
# The reloaded model is expected to reproduce the metrics obtained from `cnn`
# on the same data before it was saved.
compute_metrics((m.predict(X_val), y_val))

Predict Batch: 100%|██████████| 2/2 [00:17<00:00,  8.62s/it]


{'accuracy': 0.96, 'f1': 0.9591836734693877}

## Training Experiment 2

### Train

In [None]:
# train a different version
# Experiment 2: 5x5 first convolution with padding 2, max pooling after the
# first stage and average pooling after the second, and 6 kernels in the
# second convolution. Only the first layer is given an in_size; compile()
# feeds each later layer the previous layer's output size.
cnn2 = Sequential([
    Convolution(in_size=(20,3,256,256), padding=2, n_kernels=8, kernel_size=(5,5), stride=1),
    Pooling(mode='max'),
    Convolution(padding=1, n_kernels=6, kernel_size=(3,3), stride=1),
    Pooling(mode='avg'),
    Flatten(),
    Dense(units=16, activation='relu'),
    Dense(units=1, activation='sigmoid'),
])
cnn2.compile(metrics=compute_metrics)
cnn2.summary()

+--------------+---------------------+---------------------+
| Layer Type   | Input Shape         | Output Shape        |
| Convolution  | (None, 3, 256, 256) | (None, 8, 256, 256) |
+--------------+---------------------+---------------------+
| Pooling      | (None, 8, 256, 256) | (None, 8, 128, 128) |
+--------------+---------------------+---------------------+
| Convolution  | (None, 8, 128, 128) | (None, 6, 128, 128) |
+--------------+---------------------+---------------------+
| Pooling      | (None, 6, 128, 128) | (None, 6, 64, 64)   |
+--------------+---------------------+---------------------+
| Flatten      | (None, 6, 64, 64)   | (None, 24576)       |
+--------------+---------------------+---------------------+
| Dense        | (None, 24576)       | (None, 16)          |
+--------------+---------------------+---------------------+
| Dense        | (None, 16)          | (None, 1)           |
+--------------+---------------------+---------------------+


In [None]:
# Same training settings as Experiment 1 (2 epochs, batch size 10, lr 0.001).
# The recorded run took about 11 minutes per epoch.
cnn2.fit(X_train, y_train, batch_size=10, epochs=2, lr=0.001)


Fit Batch: 100%|██████████| 50/50 [08:49<00:00, 10.59s/it]
Predict Batch: 100%|██████████| 10/10 [02:14<00:00, 13.49s/it]

Fit Epoch:  50%|█████     | 1/2 [11:04<11:04, 664.43s/it][A

: {'accuracy': 0.958, 'f1': 0.9565217391304348}



Fit Batch: 100%|██████████| 50/50 [08:42<00:00, 10.46s/it]
Predict Batch: 100%|██████████| 10/10 [02:13<00:00, 13.36s/it]

Fit Epoch: 100%|██████████| 2/2 [22:00<00:00, 660.50s/it]

: {'accuracy': 0.862, 'f1': 0.8770053475935828}






In [None]:
# NOTE(review): this scores the training set (X_train / y_train), whereas
# Experiment 1 was scored on X_val / y_val — confirm whether X_val was intended,
# since the two experiments are not comparable as written.
compute_metrics((cnn2.predict(X_train), y_train))

Predict Batch: 100%|██████████| 10/10 [02:12<00:00, 13.22s/it]


{'accuracy': 0.862, 'f1': 0.8770053475935828}

## Loading the Specification Model

In [None]:
# Load the CNN specification file downloaded in the first cell and show its layer shapes.
m = Sequential.load('model_spek_cnn.json')
m.summary()

+--------------+-------------------+-------------------+
| Layer Type   | Input Shape       | Output Shape      |
| Convolution  | (None, 3, 16, 16) | (None, 8, 14, 14) |
+--------------+-------------------+-------------------+
| Pooling      | (None, 8, 14, 14) | (None, 8, 7, 7)   |
+--------------+-------------------+-------------------+
| Flatten      | (None, 8, 7, 7)   | (None, 392)       |
+--------------+-------------------+-------------------+
| Dense        | (None, 392)       | (None, 8)         |
+--------------+-------------------+-------------------+
| Dense        | (None, 8)         | (None, 1)         |
+--------------+-------------------+-------------------+


In [None]:
# Random batch of 5 inputs shaped (3, 16, 16), matching the input shape that
# summary() reports for the loaded model. No seed is set in this cell.
p = np.random.randn(
    5, 3, 16, 16
)

In [None]:
# Forward pass only: checks that the loaded model runs end to end. The inputs
# are random noise, so the predicted values carry no meaning.
m.predict(p)

Predict Batch: 100%|██████████| 1/1 [00:00<00:00, 87.38it/s]


array([[0.54118097],
       [0.5517079 ],
       [0.5282708 ],
       [0.49202831],
       [0.52122151]])

# RNN Testing

## RNN Forward Pass Testing + Save/Load

In [None]:
# Minimal LSTM -> Dense stack used to exercise the forward pass and save/load.
rnn = Sequential([
    LSTM(in_size=5, units=10),
    Dense(inputs=10, units=1, activation='sigmoid')
])
rnn.compile()
rnn.summary()

+--------------+-----------------+----------------+
| Layer Type   | Input Shape     | Output Shape   |
| LSTM         | (None, None, 5) | (None, 1, 10)  |
+--------------+-----------------+----------------+
| Dense        | (None, 1, 10)   | (None, 1)      |
+--------------+-----------------+----------------+


In [None]:
# prepare data
# One random input of shape (1, 10, 5); the last axis matches the LSTM's in_size of 5.
# No seed is set in this cell, so the values change between runs.
x = np.random.randn(1, 10, 5)
print(x.shape)
print(x)

(1, 10, 5)
[[[ 1.67815934  0.95695283  0.27787654 -0.39265758  0.66191067]
  [-1.25531444 -0.42498035  0.21000236 -1.13629448  2.3582527 ]
  [-0.52725563 -0.61988274  0.12180147 -1.21837795 -1.36166998]
  [ 0.49656554 -1.23067168 -0.80301389 -0.9898318  -0.58856035]
  [-0.53006427  0.76842053 -0.37384234  0.125754    1.18966862]
  [-0.23673754  1.36260388 -0.34547994  0.41988483 -2.4991309 ]
  [ 0.50318535  1.75929159  0.12941912  1.91642448  0.0920533 ]
  [ 0.91613984 -1.7687264  -0.02425373  0.43317379  0.28299487]
  [-0.11017796  0.14351769  0.35156993  1.47157511  0.48471243]
  [ 0.48728174 -0.59976945 -0.57598336  0.48365966  0.47992505]]]


In [None]:
# test forward
# Calls forward() directly (not predict()), so no batching or progress bar is involved.
out = rnn.forward(x)
print(out.shape)
print(out)

(1, 1, 1)
[[[0.46909752]]]


In [None]:
# save model
# Writes the serialized layers of `rnn` to JSON for the reload check that follows.
rnn.save('test_rnn.json')

In [None]:
# test forward saved model
# Round-trip check: the reloaded model is fed the same x and is expected to
# print the same output as `rnn` did before saving.
r = Sequential.load('test_rnn.json')
out = r.forward(x)
print(out.shape)
print(out)

(1, 1, 1)
[[[0.46909752]]]


## RNN BMRA Data

In [None]:
# test on BMRA data
import pandas as pd
# `bmra` is rebound three times in this cell: full DataFrame, column subset,
# then a numpy array.
bmra = pd.read_csv('Test_stock_market.csv')
# use only high, low, volume, open and close as features
bmra = bmra[['High', 'Low', 'Volume', 'Open', 'Close']]
bmra = bmra.to_numpy()

def create_ds(seq_len=10):
  """Build sliding-window inputs and next-row targets from the global `bmra` array.

  Returns `(windows, targets)` where targets are reshaped from
  (n, features) to (n, 1, features).
  """
  windows, targets = create_sequences(bmra, seq_len)
  n_samples = targets.shape[0]
  n_features = targets.shape[1]
  return windows, targets.reshape(n_samples, 1, n_features)

In [None]:
# LSTM -> Dense(relu) -> Linear stack with 5 inputs and 5 outputs, one per
# selected BMRA column. This rebinds `rnn` from the earlier forward-pass test.
rnn = Sequential([
    LSTM(5, 32),
    Dense(32, 5, 'relu'),
    Linear(5, 5),
])
rnn.compile()
rnn.summary()

+--------------+-----------------+----------------+
| Layer Type   | Input Shape     | Output Shape   |
| LSTM         | (None, None, 5) | (None, 1, 32)  |
+--------------+-----------------+----------------+
| Dense        | (None, 1, 32)   | (None, 5)      |
+--------------+-----------------+----------------+
| Linear       | (None, 5)       | (None, 5)      |
+--------------+-----------------+----------------+


In [None]:
# test forward and MSE loss
# `rnn` is compiled but not fitted in the cells shown here, so this loop only
# exercises the forward pass for several window lengths.
# NOTE(review): the features are fed unscaled (Volume is ~1e4 while prices are
# ~3 in the recorded output), so the MSE is presumably dominated by the Volume
# column and the sigmoid overflow warning likely stems from it — consider
# normalising the inputs.
for seq_len in [5, 10, 15]:
  print('==== Testing forward for sequence length', seq_len, '====')
  x_test, y_test = create_ds(seq_len)
  y_pred = rnn.predict(x_test)
  # Mean squared error over every sample and feature.
  loss = np.mean(np.square(y_pred - y_test))
  print("Input 1:", x_test[0])
  print("Output 1:", y_pred[0])
  print("Target 1:", y_test[0])
  print("Overall MSE Loss:", loss)


==== Testing forward for sequence length 5 ====


  return 1 / (1 + np.exp(-x))
Predict Batch: 100%|██████████| 1/1 [00:00<00:00, 45.13it/s]


Input 1: [[3.49e+00 3.38e+00 3.84e+04 3.48e+00 3.43e+00]
 [3.58e+00 3.43e+00 1.32e+04 3.57e+00 3.56e+00]
 [3.51e+00 3.40e+00 2.54e+04 3.51e+00 3.45e+00]
 [3.49e+00 3.32e+00 2.80e+04 3.43e+00 3.36e+00]
 [3.46e+00 3.33e+00 4.16e+04 3.33e+00 3.40e+00]]
Output 1: [[ 0.00717982  0.00927167 -0.00255867 -0.00687925 -0.00027295]]
Target 1: [[3.54e+00 3.43e+00 8.40e+03 3.46e+00 3.54e+00]]
Overall MSE Loss: 187001092.91319346
==== Testing forward for sequence length 10 ====


Predict Batch: 100%|██████████| 1/1 [00:00<00:00, 45.77it/s]


Input 1: [[3.49e+00 3.38e+00 3.84e+04 3.48e+00 3.43e+00]
 [3.58e+00 3.43e+00 1.32e+04 3.57e+00 3.56e+00]
 [3.51e+00 3.40e+00 2.54e+04 3.51e+00 3.45e+00]
 [3.49e+00 3.32e+00 2.80e+04 3.43e+00 3.36e+00]
 [3.46e+00 3.33e+00 4.16e+04 3.33e+00 3.40e+00]
 [3.54e+00 3.43e+00 8.40e+03 3.46e+00 3.54e+00]
 [3.54e+00 3.04e+00 7.98e+04 3.54e+00 3.05e+00]
 [3.27e+00 2.95e+00 4.57e+04 3.12e+00 2.95e+00]
 [3.20e+00 2.95e+00 4.28e+04 3.10e+00 3.00e+00]
 [3.08e+00 2.93e+00 3.30e+04 2.98e+00 2.96e+00]]
Output 1: [[ 0.00718038  0.00927147 -0.00255806 -0.00687917 -0.00027303]]
Target 1: [[3.04e+00 2.99e+00 2.49e+04 3.00e+00 2.99e+00]]
Overall MSE Loss: 140291410.46944797
==== Testing forward for sequence length 15 ====


Predict Batch: 100%|██████████| 1/1 [00:00<00:00, 24.97it/s]

Input 1: [[3.49e+00 3.38e+00 3.84e+04 3.48e+00 3.43e+00]
 [3.58e+00 3.43e+00 1.32e+04 3.57e+00 3.56e+00]
 [3.51e+00 3.40e+00 2.54e+04 3.51e+00 3.45e+00]
 [3.49e+00 3.32e+00 2.80e+04 3.43e+00 3.36e+00]
 [3.46e+00 3.33e+00 4.16e+04 3.33e+00 3.40e+00]
 [3.54e+00 3.43e+00 8.40e+03 3.46e+00 3.54e+00]
 [3.54e+00 3.04e+00 7.98e+04 3.54e+00 3.05e+00]
 [3.27e+00 2.95e+00 4.57e+04 3.12e+00 2.95e+00]
 [3.20e+00 2.95e+00 4.28e+04 3.10e+00 3.00e+00]
 [3.08e+00 2.93e+00 3.30e+04 2.98e+00 2.96e+00]
 [3.04e+00 2.99e+00 2.49e+04 3.00e+00 2.99e+00]
 [3.03e+00 2.97e+00 2.90e+03 3.01e+00 3.00e+00]
 [3.00e+00 2.90e+00 1.49e+04 3.00e+00 2.90e+00]
 [2.93e+00 2.84e+00 2.39e+04 2.93e+00 2.87e+00]
 [2.85e+00 2.80e+00 2.90e+04 2.82e+00 2.80e+00]]
Output 1: [[ 0.00718038  0.00927147 -0.00255806 -0.00687917 -0.00027303]]
Target 1: [[2.92e+00 2.70e+00 4.30e+04 2.70e+00 2.92e+00]]
Overall MSE Loss: 150663448.80840382





## Loading RNN

In [None]:
# Load the RNN specification file downloaded in the first cell and show its layer shapes.
s = Sequential.load('model_spek_rnn.json')
s.summary()

+--------------+-----------------+----------------+
| Layer Type   | Input Shape     | Output Shape   |
| LSTM         | (None, None, 5) | (None, 1, 64)  |
+--------------+-----------------+----------------+
| Dense        | (None, 1, 64)   | (None, 5)      |
+--------------+-----------------+----------------+


# Save and Load Model

In [None]:
# NOTE(review): these imports sit mid-notebook; train_test_split is already
# imported in the first cell.
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# This cell rebinds `d` (the download map in the first cell) and `X_train`
# (the image array used by the CNN experiments).
d = load_breast_cancer(return_X_y=True)
X_train, X_test, Y_train, Y_test = train_test_split(d[0], d[1], test_size=0.2, random_state=42)
# The scaler is fitted on the training split only and then applied to the test split.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_train.shape, X_test.shape, Y_train.shape, Y_test.shape

((455, 30), (114, 30), (455,), (114,))

In [None]:
# Two-layer dense classifier for the tabular data.
# The second Dense is given 0 as its first argument — presumably a placeholder
# input size: the summary() output reports its input as (None, 9), i.e. the
# previous layer's output size.
m = Sequential([
    Dense(X_train.shape[1], 9, 'relu'),
    Dense(0, 1, 'sigmoid'),
])
m.compile(metrics=compute_metrics)
m.summary()

+--------------+---------------+----------------+
| Layer Type   | Input Shape   | Output Shape   |
| Dense        | (None, 30)    | (None, 9)      |
+--------------+---------------+----------------+
| Dense        | (None, 9)     | (None, 1)      |
+--------------+---------------+----------------+


In [None]:
# Five epochs in batches of 100; fit() prints the training-set metrics after each epoch.
m.fit(X_train, Y_train, epochs=5, lr=0.1, batch_size=100)


Fit Batch: 100%|██████████| 5/5 [00:00<00:00, 987.87it/s]
Predict Batch: 100%|██████████| 10/10 [00:00<00:00, 12554.04it/s]


: {'accuracy': 0.9714285714285714, 'f1': 0.977469670710572}



Fit Batch: 100%|██████████| 5/5 [00:00<00:00, 3402.81it/s]
Predict Batch: 100%|██████████| 10/10 [00:00<00:00, 14098.50it/s]


: {'accuracy': 0.9802197802197802, 'f1': 0.9843478260869565}



Fit Batch: 100%|██████████| 5/5 [00:00<00:00, 2856.38it/s]
Predict Batch: 100%|██████████| 10/10 [00:00<00:00, 11841.63it/s]


: {'accuracy': 0.9802197802197802, 'f1': 0.9843478260869565}



Fit Batch: 100%|██████████| 5/5 [00:00<00:00, 3356.52it/s]
Predict Batch: 100%|██████████| 10/10 [00:00<00:00, 12854.13it/s]


: {'accuracy': 0.9824175824175824, 'f1': 0.986111111111111}



Fit Batch: 100%|██████████| 5/5 [00:00<00:00, 3468.09it/s]
Predict Batch: 100%|██████████| 10/10 [00:00<00:00, 14433.26it/s]
Fit Epoch: 100%|██████████| 5/5 [00:00<00:00, 55.43it/s]

: {'accuracy': 0.9802197802197802, 'f1': 0.9843478260869565}






In [None]:
# Score the trained model on the held-out 20% split.
compute_metrics((m.predict(X_test), Y_test))

Predict Batch: 100%|██████████| 3/3 [00:00<00:00, 7153.45it/s]


{'accuracy': 0.9912280701754386, 'f1': 0.993006993006993}

In [None]:
# Persist the trained dense model for the reload check that follows.
m.save("test1.json")

In [None]:
# Rebuild the model from JSON under a new name so `m` stays available for comparison.
n = Sequential.load("test1.json")
n.summary()

+--------------+---------------+----------------+
| Layer Type   | Input Shape   | Output Shape   |
| Dense        | (None, 30)    | (None, 9)      |
+--------------+---------------+----------------+
| Dense        | (None, 9)     | (None, 1)      |
+--------------+---------------+----------------+


In [None]:
# The reloaded model is expected to give the same test metrics as `m`.
compute_metrics((n.predict(X_test), Y_test))

Predict Batch: 100%|██████████| 3/3 [00:00<00:00, 5506.74it/s]


{'accuracy': 0.9912280701754386, 'f1': 0.993006993006993}