In [2]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.ao.nn.quantized import functional as qF
import numpy as np
import cv2 as cv
import pytorch_lightning as pl
import torchvision
import torchvision.transforms.v2 as transforms
from torcheval.metrics import MulticlassAccuracy
from torcheval.metrics.functional import multiclass_accuracy
from torch.ao.quantization import QuantStub, DeQuantStub
from pytorch_lightning.callbacks import ModelCheckpoint, RichProgressBar
import json

from models import DatasetWrapper, MyMobileNet, SeparableConv2d, get_dataloaders

In [3]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')

trainloader, validloader, testloader = get_dataloaders(batch_size=256, num_workers=24)



Files already downloaded and verified
Files already downloaded and verified


# Training MobileNet with ZeLU activation layer

In [27]:
class TanhSeparableConv2d(nn.Module):
    '''Separable convolution'''
    def __init__(self, in_channels, out_channels, stride=1):
        super(TanhSeparableConv2d, self).__init__()
        self.dw_conv = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False),
            nn.BatchNorm2d(in_channels),
            # nn.Tanh()
        )
        self.pw_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channels),
            # nn.Tanh()
        )

    def forward(self, x):
        x = F.tanh(self.dw_conv(x))
        x = F.tanh(self.pw_conv(x))
        return x

class TanhMobileNet(MyMobileNet):
    def __init__(self, steps_per_epoch, num_classes: int=10, alpha: float=1, max_epochs: int=50):
        super(TanhMobileNet, self).__init__(steps_per_epoch, num_classes, alpha, max_epochs)
        # self.relu = nn.Tanh()
        # self.relu = F.tanh

    def make_feature_extractor(self, alpha):
        layer_values = [(int(inp*alpha), int(out*alpha), chan) for inp, out, chan in self.cfg]
        layers = nn.Sequential(*[TanhSeparableConv2d(*tup) for tup in layer_values])
        return layers

    def forward(self, x):
        x = F.tanh(self.bn(self.conv(x)))
        x = self.features(x)
        x = F.avg_pool2d(x, 4)
        x = x.view(x.size()[0], -1)
        x = self.linear(x)
        return x


In [7]:
MAX_EPOCHS = 60

model = TanhMobileNet(steps_per_epoch=len(trainloader), num_classes=10, alpha=0.25)

checkpoint_callback = ModelCheckpoint(
    dirpath='checkpoints/',
    filename='TANH-{epoch:02d}-{val_accuracy:.2f}'
)
progress_callback = RichProgressBar(leave=True)

trainer = pl.Trainer(max_epochs=MAX_EPOCHS, callbacks=[checkpoint_callback, progress_callback])

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [29]:
trainer.fit(model, train_dataloaders=trainloader, val_dataloaders=validloader)

/home/semar/.local/lib/python3.11/site-packages/pytorch_lightning/callbacks/model_checkpoint.py:630: Checkpoint directory checkpoints/ exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

Output()

`Trainer.fit` stopped: `max_epochs=60` reached.


In [19]:
trainer.test(model, testloader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Output()

[{'test_loss': 0.6737096905708313, 'test_accuracy': 0.779699981212616}]

In [32]:
MODEL_WEIGHTS_PATH

'./checkpoints/tanh_60epochs.pth'

In [5]:
MODEL_WEIGHTS_PATH = './checkpoints/tanh_60epochs.pth'

In [20]:
print('==> saving model')
state = {
    'net': model.state_dict(),
    'acc': '0.747299',
    'epoch': 50,
}

torch.save(state, MODEL_WEIGHTS_PATH)

==> saving model


In [21]:
# Sanity test

saved = torch.load(MODEL_WEIGHTS_PATH)
# model.load_state_dict(saved['state_dict'])
model.load_state_dict(saved['net'])
trainer.test(model, testloader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Output()

[{'test_loss': 0.6737096905708313, 'test_accuracy': 0.779699981212616}]

# Gathering statistics

In [37]:
# import pandas as pd
# # df = pd.DataFrame
# df = pd.DataFrame([], columns=['Values']) 
# df.Values.push

AttributeError: 'Series' object has no attribute 'push'

In [46]:
values = []
class Zanh(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        values.extend(x.cpu().detach().numpy())
        # return 0.006769816 + 0.554670504 * x - 0.009411195 * x**2 - 0.014187547 * x**3
        # return -0.004010731 + 0.259730154 * x + 0.002355637 * x.square() - (0.00085806 * x.pow(3))
        return 0.004802802 + (0.307091252 * x) + -0.003282771 * x.square() -(0.001553874 * x.pow(3))
        # return F.tanh(x)

class TanhSeparableConv2d(nn.Module):
    '''Separable convolution'''
    def __init__(self, in_channels, out_channels, stride=1):
        super(TanhSeparableConv2d, self).__init__()
        self.dw_conv = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False),
            nn.BatchNorm2d(in_channels),
            Zanh()
        )
        self.pw_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channels),
            Zanh()
        )

    def forward(self, x):
        # x = F.tanh(self.dw_conv(x))
        # x = F.tanh(self.pw_conv(x))
        x = self.dw_conv(x)
        x = self.pw_conv(x)
        return x

class TanhMobileNet(MyMobileNet):
    def __init__(self, steps_per_epoch, num_classes: int=10, alpha: float=1, max_epochs: int=50):
        super(TanhMobileNet, self).__init__(steps_per_epoch, num_classes, alpha, max_epochs)
        self.relu = Zanh()
        # self.relu = F.tanh

    def make_feature_extractor(self, alpha):
        layer_values = [(int(inp*alpha), int(out*alpha), chan) for inp, out, chan in self.cfg]
        layers = nn.Sequential(*[TanhSeparableConv2d(*tup) for tup in layer_values])
        return layers

    def forward(self, x):
        # x = F.tanh(self.bn(self.conv(x)))
        x = self.relu(self.bn(self.conv(x)))
        x = self.features(x)
        x = F.avg_pool2d(x, 4)
        x = x.view(x.size()[0], -1)
        x = self.linear(x)
        return x


model = TanhMobileNet(steps_per_epoch=len(trainloader), num_classes=10, alpha=0.25)
saved = torch.load(MODEL_WEIGHTS_PATH)
# model.load_state_dict(saved['state_dict'])
model.load_state_dict(saved['net'])

# model = TanhMobileNet(steps_per_epoch=len(trainloader), num_classes=10, alpha=0.25)

checkpoint_callback = ModelCheckpoint(
    dirpath='checkpoints/',
    filename='TANH-{epoch:02d}-{val_accuracy:.2f}'
)
progress_callback = RichProgressBar(leave=True)

trainer = pl.Trainer(max_epochs=100, callbacks=[checkpoint_callback, progress_callback])
trainer.test(model, validloader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Output()

[{'test_loss': 4.611839294433594, 'test_accuracy': 0.10100000351667404}]

In [34]:
mins = []
maxes = []

for i in range(len(values)):
    min = values[i].min()
    max = values[i].max()
    mins.append(min)
    maxes.append(max)
    

In [35]:
np.array(mins).min(), np.array(mins).max()

(-11.799783, -0.25299513)

In [36]:
np.array(maxes).min(), np.array(maxes).max()

(0.80791754, 11.277118)

In [43]:
torch.Tensor(maxes[0:1]).pow(3)

tensor([2.8427])

In [37]:
from mpmath import mp
import numpy

# Taken from https://github.com/DKenefake/OptimalPoly/blob/main/remez_poly.py

def bisection_search(f, low:float, high:float):
    """
    A root finding method that does not rely on derivatives

    :param f: a function f: X -> R
    :param low: the lower bracket
    :param high: the upper limit bracket
    :return: the location of the root, e.g. f(mid) ~ 0
    """
    # flip high and low if out of order
    if f(high) < f(low):
        low, high = high, low

    # find mid point
    mid = .5 * (low + high)

    while True:

        # bracket up
        if f(mid) < 0:
            low = mid
        # braket down
        else:
            high = mid

        # update mid point
        mid = .5 * (high + low)

        # break if condition met
        if abs(high - low) < 10 ** (-(mp.dps / 2)):
            break

    return mid


def concave_max(f, low:float, high:float):
    """
    Forms a lambda for the approximate derivative and finds the root

    :param f: a function f: X -> R
    :param low: the lower bracket
    :param high: the upper limit bracket
    :return: the location of the root f'(mid) ~ 0
    """
    # create an approximate derivative expression
    scale = high - low

    h = mp.mpf('0.' + ''.join(['0' for i in range(int(mp.dps / 1.5))]) + '1') * scale
    df = lambda x: (f(x + h) - f(x - h)) / (2.0 * h)

    return bisection_search(df, low, high)

def chev_points(n:int, lower:float = -1, upper:float = 1):
    """
    Generates a set of chebychev points spaced in the range [lower, upper]
    :param n: number of points
    :param lower: lower limit
    :param upper: upper limit
    :return: a list of multipressison chebychev points that are in the range [lower, upper]
    """
    #generate chebeshev points on a range [-1, 1]
    index = numpy.arange(1, n+1)
    range_ = abs(upper - lower)
    return [(.5*(mp.cos((2*i-1)/(2*n)*mp.pi)+1))*range_ + lower for i in index]


def remez(func, n_degree:int, lower:float=-1, upper:float=1, max_iter:int = 10):
    """
    :param func: a function (or lambda) f: X -> R
    :param n_degree: the degree of the polynomial to approximate the function f
    :param lower: lower range of the approximation
    :param upper: upper range of the approximation
    :return: the polynomial coefficients, and an approximate maximum error associated with this approximation
    """
    # initialize the node points

    x_points = chev_points(n_degree + 2, lower, upper)

    A = mp.matrix(n_degree + 2)
    coeffs = numpy.zeros(n_degree + 2)

    # place in the E column
    mean_error = float('inf')

    for i in range(n_degree + 2):
        A[i, n_degree + 1] = (-1) ** (i + 1)

    for i in range(max_iter):

        # build the system
        vander = numpy.polynomial.chebyshev.chebvander(x_points, n_degree)

        for i in range(n_degree + 2):
            for j in range(n_degree + 1):
                A[i, j] = vander[i, j]

        b = mp.matrix([func(x) for x in x_points])
        l = mp.lu_solve(A, b)

        coeffs = l[:-1]

        # build the residual expression
        r_i = lambda x: (func(x) - numpy.polynomial.chebyshev.chebval(x, coeffs))

        interval_list = list(zip(x_points, x_points[1:]))
        #         interval_list = [[x_points[i], x_points[i+1]] for i in range(len(x_points)-1)]

        intervals = [upper]
        intervals.extend([bisection_search(r_i, *i) for i in interval_list])
        intervals.append(lower)

        extermum_interval = [[intervals[i], intervals[i + 1]] for i in range(len(intervals) - 1)]

        extremums = [concave_max(r_i, *i) for i in extermum_interval]

        extremums[0] = mp.mpf(upper)
        extremums[-1] = mp.mpf(lower)

        errors = [abs(r_i(i)) for i in extremums]
        mean_error = numpy.mean(errors)

        if numpy.max([abs(error - mean_error) for error in errors]) < 0.000001 * mean_error:
            break

        x_points = extremums

    return [float(i) for i in numpy.polynomial.chebyshev.cheb2poly(coeffs)], float(mean_error)

def c_code_gen(data_type, name, poly_coeffs, comments = None):
    method_string = f'{data_type} {name} ({data_type} x)' + '{\n'
    
    if comments is not None:
        method_string += '\t// ' + str(comments) + ' \n\n'
    
    data_type_converter = '' if data_type == 'double' else 'f'
    
    method_string += '\n'.join([f'\tconst {data_type} a_{i} = {str(val) + data_type_converter};' for i, val in enumerate(poly_coeffs)])
    
    horner = 'return a_0+'
    for i in range(len(poly_coeffs)-2):
        horner += f'x*(a_{i+1} +' 
    horner += f'x*a_{len(poly_coeffs)-1}' + ')'*(len(poly_coeffs)-2) + ';\n}'
    
    return method_string + '\n \t' + horner

In [45]:
function = lambda x: mp.tanh(x)
poly_coeffs, max_error = remez(function, 5, -12, 12)
[np.format_float_positional(c, precision=9) for c in poly_coeffs]

['0.004802802', '0.307091252', '-0.003282771', '-0.001553874']

In [40]:
0.006769816 + 0.554670504 * x - 0.009411195 * x**2 - 0.014187547 * x**3

SyntaxError: 'return' outside function (295807020.py, line 1)