# Module

In [1]:
import argparse
import tqdm
from tqdm import tqdm_notebook as tq
import os, time, math, copy
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from collections import namedtuple
import matplotlib.pyplot as plt

torch.set_printoptions(precision=8, linewidth=50000)
import warnings
warnings.filterwarnings(action='ignore')

# Print Colors

In [2]:
BLACK	= '\033[30m'
RED		= '\033[31m'
GREEN	= '\033[32m'
YELLOW	= '\033[33m'
BLUE	= '\033[34m'
MAGENTA	= '\033[35m'
CYAN	= '\033[36m'
RESET	= '\033[0m'
SEL		= '\033[7m'

In [3]:
class	fxp:
	def	__init__(self, bIn, iBWF):
		self.iFullBW	= len(bIn)
		self.iIntgBW	= self.iFullBW - iBWF
		self.bSign		= bIn[0]
		self.bIntg		= bIn[:self.iIntgBW]
		self.bFrac		= bIn[self.iIntgBW:]
		self.fFull		= 0
		try:
			for idx, bit in enumerate(bIn):
				if	idx == 0:
					self.fFull = self.fFull + int(bit,2) * -pow(2, self.iIntgBW - 1)
				else:
					self.fFull = self.fFull + int(bit,2) * pow(2, self.iIntgBW - 1 - idx)
		except:
			print(bIn)
		self.dispFull	= RED + self.bIntg + BLUE + self.bFrac + RESET
		return

In [4]:
class	flp2fix:
	def	__init__(self, fIn, iBW, iBWF):
		self.fMin		= - 2 ** (iBW - iBWF - 1)
		self.fMax		= (2 ** (iBW-1) - 1) * (2 ** -iBWF)
		self.fResol		= 2 ** -iBWF
		if fIn < self.fMin or fIn > self.fMax:
			print(f'({fIn}): Out of input range ({self.fMax}/{self.fMin}) during flp -> fix converting ')
		self.iBW		= iBW
		self.iBWI		= iBW - iBWF
		self.iBWF		= iBWF

		self.iFLP2INT	= abs(int(fIn * 2 ** iBWF))
		if fIn < 0:
			self.iFLP2INT = 2 ** (iBW-1) - self.iFLP2INT

		if fIn >= 0:
			self.bFull = bin(self.iFLP2INT)[2:].rjust(iBW, '0')
		else:
			self.bFull = '1'+bin(self.iFLP2INT)[2:].rjust(iBW-1, '0')
			if len(self.bFull) > iBW:
				self.bFull = '0' * iBW

		self.cssFxp		= fxp(self.bFull, self.iBWF)
		self.bSign		= self.cssFxp.bSign
		self.bIntg		= self.cssFxp.bIntg
		self.bFrac		= self.cssFxp.bFrac
		self.fFull		= self.cssFxp.fFull
		return

In [5]:
# def	flp2fixTensor(fIn, iBW, iBWF):
# 	fMin = - 2 ** (iBW - iBWF - 1)
# 	fMax = (2 ** (iBW-1) - 1) * (2 ** -iBWF)
# 	fTensor = fIn * (2 ** iBWF)
# 	fTensor = fTensor.round() * (2 ** -iBWF)
# 	if (fTensor.min() < fMin or fMax < fTensor.max()):
# 		print(f'fMin: {fMin}, fMax: {fMax}, fTensor.Min:{fTensor.min()}, fTensor.Max:{fTensor.max()}')
# 	return fTensor

In [6]:
def	flp2fixTensor(fIn, iBW, iBWF):
	fMin = - 2 ** (iBW - iBWF - 1)
	fMax = (2 ** (iBW-1) - 1) * (2 ** -iBWF)
	fList = []
	for aTensor in fIn.view(-1):
		fList.append(flp2fix(aTensor, iBW, iBWF).fFull)
	return torch.tensor(fList).view(fIn.size())

# User Define Variable

In [7]:
data_path = '~/dataset'

# Parser

In [8]:
parser = argparse.ArgumentParser(description='PyTorch for MNIST dataset')
parser.add_argument('--device', type=str, default='cpu', help='Device')
parser.add_argument('--shuffle', action='store_true', default=False, help='enables data shuffle')
parser.add_argument('--dataset', type=str, default='mnist', help='training dataset')
parser.add_argument('--data_path', type=str, default=data_path, help='path to MNIST')
parser.add_argument('--batch_size', type=int, default=64, help='batch size')
parser.add_argument('--epochs', type=int, default=10, help='number of epochs to train')
parser.add_argument('--lr', type=float, default=0.001, help='learning rate')
parser.add_argument('--optimizer', type=str, default='adam', help='optimizer')
parser.add_argument('--loss_func', type=str, default='cel', help='optimizer')
parser.add_argument('--quant_opt', type=str, default='asym', help='Type of Quantization')
parser.add_argument('--full_bits', type=int, default=16, help='Number of Quantization Bits')
parser.add_argument('--frac_bits', type=int, default=8, help='Number of Quantization Bits')
parser.add_argument('--pretrained', type=bool, default=True, help='Pretrained Model')
parser.add_argument('--act_quant', type=bool, default=False, help='Activation Quantization')
parser.add_argument('--disp', type=bool, default=False, help='Display Model Information')

args = parser.parse_args(args=[])

# Preparing Data

In [9]:
kwargs = {'num_workers': 1, 'pin_memory': True} if args.device == 'cuda' else {}
data_transform = transforms.Compose([transforms.Resize((32,32)),transforms.ToTensor()])
if args.dataset == 'mnist':
	train_loader = torch.utils.data.DataLoader(
		dataset=datasets.MNIST(
			root=args.data_path,
			train=True,
			download=True,
			transform=data_transform
		),
		batch_size=args.batch_size,
		shuffle=args.shuffle,
		**kwargs
	)

	test_loader = torch.utils.data.DataLoader(
		dataset=datasets.MNIST(
			root=args.data_path,
			train=False,
			download=True,
			transform=data_transform
		),
		batch_size=args.batch_size,
		shuffle=args.shuffle,
		**kwargs
	)

# Build Model

In [10]:
#class MLP(nn.Module):
#	def __init__(self):
#		super(MLP, self).__init__()
#		self.flatten = nn.Flatten()
#		self.fc1 = nn.Linear(28*28, 16)
#		self.relu1 = nn.ReLU()
#		self.fc2 = nn.Linear(16, 16)
#		self.relu2 = nn.ReLU()
#		self.fc3 = nn.Linear(16, 10)
		
#	def forward(self, x):
#		x = self.flatten(x)
#		x = self.fc1(x)
#		x = self.relu1(x)
#		x = self.fc2(x)
#		x = self.relu2(x)
#		logits = self.fc3(x)
#		return logits

In [11]:
class LeNet_5(nn.Module):
    def __init__(self):
        super(LeNet_5,self).__init__()
        self.flatten = nn.Flatten()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5, stride=1)
        self.conv3 = nn.Conv2d(16, 120, kernel_size=5, stride=1)  # Flatten layer
        self.fc1 = nn.Linear(120, 84)
        self.fc2 = nn.Linear(84, 10)
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)
        self.act = nn.Tanh()

    def forward(self, x):
        x = F.tanh(self.conv1(x))
        x = F.avg_pool2d(x, 2, 2)
        x = F.tanh(self.conv2(x))
        x = F.avg_pool2d(x, 2, 2)
        x = F.tanh(self.conv3(x))
        x = self.flatten(x)
#        x = x.view(-1, 120) # Flatten layer
        x = F.tanh(self.fc1(x))
        x = self.fc2(x)
        return x

model = LeNet_5()
print(model)

LeNet_5(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (conv3): Conv2d(16, 120, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=120, out_features=84, bias=True)
  (fc2): Linear(in_features=84, out_features=10, bias=True)
  (pool): AvgPool2d(kernel_size=2, stride=2, padding=0)
  (act): Tanh()
)


In [12]:
def genOptimizer(model, args):
	if args.optimizer == 'sgd':
		optimizer = torch.optim.SGD(model.parameters(), lr=args.lr)
	if args.optimizer == 'adam':
		optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)
	return optimizer

def genLossFunc(args):
	if args.loss_func == 'cel':
		loss_func = nn.CrossEntropyLoss()
	return loss_func

In [13]:
def train(train_loader, model, epoch, args):
	model.train()
	loss_func = genLossFunc(args)
	optimizer = genOptimizer(model, args)
	max_batch_index = int(np.floor(len(train_loader.dataset)/args.batch_size))
	running_loss = 0
	for batch_index, (image, label) in enumerate(tq(train_loader, desc='Train', leave=False)):
		image, label = image.to(args.device), label.to(args.device)
		pred = model(image)
		loss = loss_func(pred, label)
		running_loss += loss.item()#*image.size(0)

		optimizer.zero_grad()
		loss.backward()
		optimizer.step()
				
	print(f'Epoch {epoch+1:<3d}: Avg. Loss: {running_loss/len(train_loader.dataset):.4f}', end = '\t')

In [14]:
def test(test_loader, model, args):
	model.eval()
	with torch.no_grad():
		loss_func = genLossFunc(args)
		loss, correct = 0, 0
# 		for batch_index, (image, label) in enumerate(tq(test_loader, desc='Test', leave=False)):
		for batch_index, (image, label) in enumerate(test_loader):
			image, label = image.to(args.device), label.to(args.device)
			pred = model(image)
			loss += loss_func(pred, label).item()#*image.size(0)
			correct += (pred.argmax(1) == label).type(torch.int).sum().item()
	loss /= len(test_loader.dataset)
	correct_rate = 100 * correct / len(test_loader.dataset)
	print(f'Accuracy: {correct}/{len(test_loader.dataset)} ({correct_rate:>.1f}%)')

In [15]:
def main(model):
	for epoch in range(args.epochs):
		train(train_loader, model, epoch, args)
		test(test_loader, model, args)
	print("Done!")
	return model

model = main(LeNet_5().to(args.device))

Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 1  : Avg. Loss: 0.0047	Accuracy: 9598/10000 (96.0%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 2  : Avg. Loss: 0.0016	Accuracy: 9723/10000 (97.2%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 3  : Avg. Loss: 0.0011	Accuracy: 9768/10000 (97.7%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 4  : Avg. Loss: 0.0008	Accuracy: 9786/10000 (97.9%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 5  : Avg. Loss: 0.0006	Accuracy: 9767/10000 (97.7%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 6  : Avg. Loss: 0.0005	Accuracy: 9806/10000 (98.1%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 7  : Avg. Loss: 0.0004	Accuracy: 9787/10000 (97.9%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 8  : Avg. Loss: 0.0003	Accuracy: 9813/10000 (98.1%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 9  : Avg. Loss: 0.0003	Accuracy: 9781/10000 (97.8%)


Train:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 10 : Avg. Loss: 0.0002	Accuracy: 9784/10000 (97.8%)
Done!


In [16]:
if args.pretrained:
	model = LeNet_5().to(args.device)
	model.load_state_dict(torch.load('PTQ_LeNet.pth'))
	test(test_loader, model, args)
else:
	model = main(LeNet_5().to(args.device))
	torch.save(model.state_dict(), 'PTQ_LeNet.pth')

Accuracy: 9592/10000 (95.9%)


In [17]:
def model2fix(model, args):
	for name, _ in model.named_parameters():
		exec(f'model.{name}.data = flp2fixTensor(model.{name}.data, {args.full_bits}, {args.frac_bits})')
	return model

In [18]:
# def in2fix(images, full_width=args.full_bits, frac_width=args.frac_bits):
# 	dim_images = images.size()
# 	images = images.view(-1)
# 	for idx_image, image in enumerate(images):
# 		temp_css = flp2fix(image, full_width, frac_width)
# 		images[idx_image] = torch.tensor(temp_css.fFull)
# 		del temp_css
# 	return images.view(dim_images)

In [19]:
def quantFixForward(model, x, args):
    cmodel = copy.deepcopy(model).to(args.device)
    with torch.no_grad():

        act0 = cmodel.pool(cmodel.act(cmodel.conv1(x)))
        act0 = flp2fixTensor(act0, args.full_bits, args.frac_bits)
        
        act1 = cmodel.pool(cmodel.act(cmodel.conv2(act0)))
        act1 = flp2fixTensor(act1, args.full_bits, args.frac_bits)
        
        act2 = cmodel.act(cmodel.conv3(act1))
        act2 = flp2fixTensor(act2, args.full_bits, args.frac_bits)

        act3 = cmodel.flatten(act2)
        act3 = flp2fixTensor(act3, args.full_bits, args.frac_bits)
		
        act4 = cmodel.act(cmodel.fc1(act3))
        act4 = flp2fixTensor(act4, args.full_bits, args.frac_bits)
        
        act5 = cmodel.fc2(act4)
        act5 = flp2fixTensor(act5, args.full_bits, args.frac_bits)
        
    return cmodel, act0, act1, act2, act3, act4, act5

In [156]:
act0.size()

torch.Size([16, 6, 14, 14])

In [157]:
act1.size()

torch.Size([6, 6, 1, 1])

In [158]:
act2.size()

torch.Size([16, 120, 1, 1])

In [159]:
act3.size()

torch.Size([16, 120])

In [160]:
act4.size()

torch.Size([16, 84])

In [162]:
act5.size()

torch.Size([16, 10])

In [None]:
def testQuant(model, test_loader, args):
	qmodel = copy.deepcopy(model).to(args.device)
	qmodel = model2fix(qmodel, args)
	qmodel.eval()

	with torch.no_grad():
		loss_func = genLossFunc(args)
		loss, correct = 0, 0
		for batch_index, (image, label) in enumerate(test_loader):
			image, label = image.to(args.device), label.to(args.device)
			qmodel, act0, act1, act2, act3, act4, act5  = quantFixForward(qmodel, image, args)
			y = act5
			loss += loss_func(y, label).item()#*image.size(0)
			correct += (y.argmax(1) == label).type(torch.int).sum().item()
	correct_rate = 100 * correct / len(test_loader.dataset)
	print(f'Accuracy: {correct}/{len(test_loader.dataset)} ({correct_rate:>.1f}%) Loss: {loss/len(test_loader.dataset):.2f}')
	return qmodel, act0, act1, act2, act3, act4, act5

In [None]:
qmodel, act0, act1, act2, act3, act4, act5 = testQuant(model, test_loader, args)

Accuracy: 9585/10000 (95.8%) Loss: 0.00


In [None]:
def extractParams(model, args):
    for key in model.state_dict().keys():
        layer_name = key.split('.')[0]
        param_type = 'w' if 'weight' in key else 'b'
        for idx, params in enumerate(eval(f'qmodel.{key}.data')):
        #    print(idx)
        #    print(params.dim())
            with open(f'./mif/{layer_name}_{param_type}_{idx}.mif', 'w') as fh:
                if param_type == 'w':
                    if params.dim() == 3 :
                        for idx, dim1 in enumerate(params):
                            for idx, dim2 in enumerate(dim1):
                                for idx, param in enumerate(dim2):
                                    bin_param = flp2fix(param, args.full_bits, args.frac_bits).bFull
                                    fh.write(bin_param + ('\n','')[idx == len(params)-1])
                    elif params.dim() == 1: # fc2 layer
                        for idx, param in enumerate(params):
                            bin_param = flp2fix(param, args.full_bits, args.frac_bits).bFull
                            fh.write(bin_param + ('\n', '')[idx == len(params)-1])
                    
                #    for idx, param in enumerate(params):
                #        bin_param = flp2fix(param, args.full_bits, args.frac_bits).bFull
                #        fh.write(bin_param + ('\n','')[idx == len(params)-1])
                else:
                    bin_param = flp2fix(params, args.full_bits, args.frac_bits).bFull
                    fh.write(bin_param)

In [None]:
def genInputVector(test_loader, args):
	out_path = './vec'
	os.system(f'rm -rf {out_path};mkdir -p {out_path}')
	with open(f'{out_path}/labels.vec', 'w') as fh_labels:
		with open(f'{out_path}/images.vec', 'w') as fh_images:
			for batch_index, (images, labels) in enumerate(test_loader):
				for (image, label) in zip(images, labels):
					bin_label = flp2fix(label, args.full_bits, 0).bFull
					fh_labels.write(bin_label+'\n')
					for pixel in image.view(-1):
						bin_pixel = flp2fix(pixel, args.full_bits, args.frac_bits).bFull
						fh_images.write(bin_pixel+'\n')
                        

In [24]:
#if not args.pretrained:
extractParams(model, args)
genInputVector(test_loader, args)

In [25]:
test_data = datasets.MNIST(	root=args.data_path,
							train=False,
							download=True,
							transform=data_transform)

In [129]:
test_data

Dataset MNIST
    Number of datapoints: 10000
    Root location: /home/jaeeun/dataset
    Split: Test
    StandardTransform
Transform: Compose(
               Resize(size=(32, 32), interpolation=PIL.Image.BILINEAR)
               ToTensor()
           )

In [128]:
test_data[0][0][0].size()

torch.Size([32, 32])

In [93]:
test_data.data.size()

torch.Size([10000, 28, 28])

In [26]:
test_iter = iter(test_loader)

In [27]:
imgs, labs = next(test_iter)

In [28]:
labs[0:32]

tensor([7, 2, 1, 0, 4, 1, 4, 9, 5, 9, 0, 6, 9, 0, 1, 5, 9, 7, 3, 4, 9, 6, 6, 5, 4, 0, 7, 4, 0, 1, 3, 1])

In [84]:
labs.size()

torch.Size([64])

In [29]:
qmodel.fc1.weight[0][0:8]

tensor([-0.06250000, -0.01562500,  0.05859375, -0.11328125,  0.01171875,  0.05859375, -0.01171875, -0.06640625], grad_fn=<SliceBackward0>)

In [30]:
model.conv1.weight

Parameter containing:
tensor([[[[ 0.18045330,  0.19414586,  0.20404883,  0.24982135,  0.33966962],
          [ 0.31025395,  0.31378302,  0.30601561,  0.33577812,  0.41172349],
          [ 0.33776858,  0.35133591,  0.34539074,  0.35325623,  0.37955925],
          [ 0.30402496,  0.32960871,  0.33463514,  0.32065710,  0.29119423],
          [ 0.27029982,  0.30737719,  0.30560264,  0.24157465,  0.13625953]]],


        [[[-0.21190982,  0.07295746, -0.23887797, -0.04612822, -0.34444273],
          [-0.24775240, -0.20183615, -0.15598902, -0.19940718, -0.27916923],
          [ 0.14290810, -0.01709873, -0.01640831, -0.19280452, -0.11677750],
          [ 0.03226719,  0.02781692,  0.05474370,  0.12571493, -0.17665192],
          [ 0.22604902,  0.31695715,  0.31795967, -0.04907313,  0.13301644]]],


        [[[ 0.37560281,  0.34097508,  0.23746356,  0.12254444,  0.01867356],
          [ 0.01831674, -0.03290712,  0.07245599, -0.05952620,  0.01810361],
          [-0.18861026, -0.11805234,  0.045135

In [31]:
qmodel.conv1.weight[0][0]

tensor([[0.17968750, 0.19140625, 0.20312500, 0.24609375, 0.33593750],
        [0.30859375, 0.31250000, 0.30468750, 0.33203125, 0.41015625],
        [0.33593750, 0.34765625, 0.34375000, 0.35156250, 0.37890625],
        [0.30078125, 0.32812500, 0.33203125, 0.32031250, 0.28906250],
        [0.26953125, 0.30468750, 0.30468750, 0.23828125, 0.13281250]], grad_fn=<SelectBackward0>)

In [70]:
test_data[0][1]

7

In [34]:
test_data[0][0].size() # 1024개의 elements(32x32) 1차원으로 핀 것

torch.Size([1, 32, 32])

In [52]:
test_data[0][0][0][1]

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [37]:
model.conv1.bias

Parameter containing:
tensor([-0.80548680,  0.57787883,  0.04906479,  0.55835277, -0.27133316, -0.16827926], requires_grad=True)

In [36]:
qmodel.conv1.weight[0][0][0:5]

tensor([[0.17968750, 0.19140625, 0.20312500, 0.24609375, 0.33593750],
        [0.30859375, 0.31250000, 0.30468750, 0.33203125, 0.41015625],
        [0.33593750, 0.34765625, 0.34375000, 0.35156250, 0.37890625],
        [0.30078125, 0.32812500, 0.33203125, 0.32031250, 0.28906250],
        [0.26953125, 0.30468750, 0.30468750, 0.23828125, 0.13281250]], grad_fn=<SliceBackward0>)

In [49]:
qmodel.conv1.weight[0][0][0]

tensor([0.17968750, 0.19140625, 0.20312500, 0.24609375, 0.33593750], grad_fn=<SelectBackward0>)

In [50]:
qmodel.conv1.weight[0][0][1]

tensor([0.30859375, 0.31250000, 0.30468750, 0.33203125, 0.41015625], grad_fn=<SelectBackward0>)

In [39]:
qmodel.conv1.bias

Parameter containing:
tensor([-0.80468750,  0.57421875,  0.04687500,  0.55468750, -0.26953125, -0.16796875], requires_grad=True)

In [40]:
def evaluation(train_loader):
    total, correct = 0, 0
    for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to(args.device), labels.to(args.device)
        output = model(inputs)
        max_pred, pred = torch.max(output.data, dim=1)
        total += labels.size(0)
        correct += (pred == labels).sum().item()
    return 100 * correct / total

evaluation(train_loader)

95.585

In [41]:
def evaluation(test_loader):
    total, correct = 0, 0
    for data in test_loader:
        inputs, labels = data
        inputs, labels = inputs.to(args.device), labels.to(args.device)
        output = model(inputs)
        max_pred, pred = torch.max(output.data, dim=1)
        total += labels.size(0)
        correct += (pred == labels).sum().item()
    return 100 * correct / total

evaluation(test_loader)

95.92