In [None]:
import numpy as np
import scipy as scp
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.model_selection import train_test_split

#Keras Imports
from tensorflow import keras
from tensorflow.keras import backend as K

from tensorflow.keras.datasets import cifar10, mnist
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Layer, Activation, Flatten, BatchNormalization
from tensorflow.keras.models import Sequential, Model,load_model
from tensorflow.keras import activations

import os
import sys


#Utility
# from utils import binarize
sys.path.insert(0, '..')
from binarization_utils import *
# from model_architectures import get_model

#Probability
import tensorflow_probability as tfp


from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input

In [None]:
# Show the TensorFlow and Keras versions in use, one per line.
print(tf.__version__, keras.__version__, sep='\n')

In [None]:
# Run-wide settings read by the cells below.
training = True
dataset = 'CIFAR-10'  # the loading cell accepts "MNIST", "CIFAR-10" or "SVHN"

# Batch-normalisation epsilon and momentum used when layers are created.
batch_norm_eps, batch_norm_alpha = 1e-4, 0.1

def load_svhn(path_to_dataset):
    """Load the SVHN dataset from MATLAB ``.mat`` files.

    Reads ``train.mat``, ``test.mat`` and ``extra.mat`` from
    ``path_to_dataset``. For each file the ``'X'`` array has its last axis
    moved to the front and the ``'y'`` array is shifted down by one. The
    "extra" split is then appended to the training split.

    Args:
        path_to_dataset: directory (string) that contains the three files.

    Returns:
        ``(X_train, y_train), (X_test, y_test)``.
    """
    import scipy.io as sio

    # Read every file first so a missing file is reported before any array
    # is touched.
    raw = {}
    for split in ('train', 'test', 'extra'):
        raw[split] = sio.loadmat(path_to_dataset + '/' + split + '.mat')

    def _unpack(split):
        # Move the last axis of 'X' to the front; shift labels down by one.
        contents = raw[split]
        return np.transpose(contents['X'], [3, 0, 1, 2]), contents['y'] - 1

    images_train, labels_train = _unpack('train')
    images_test, labels_test = _unpack('test')
    images_extra, labels_extra = _unpack('extra')

    # The "extra" samples are treated as additional training data.
    images_train = np.concatenate((images_train, images_extra), axis=0)
    labels_train = np.concatenate((labels_train, labels_extra), axis=0)

    return (images_train, labels_train), (images_test, labels_test)

In [None]:
# Load the dataset selected by `dataset`.
if dataset=="MNIST":
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    # Flatten each image into a 784-element vector for the dense model.
    X_train = X_train.reshape(-1,784)
    X_test = X_test.reshape(-1,784)
    use_generator=False
elif dataset=="CIFAR-10":
    use_generator=True
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()
elif dataset=="SVHN":
    use_generator=True
    (X_train, y_train), (X_test, y_test) = load_svhn('svhn_data')
else:
    # `raise "..."` would itself fail with TypeError (a string is not an
    # exception), hiding the real message; raise a proper exception instead.
    raise ValueError("dataset should be one of the following: [MNIST, CIFAR-10, SVHN].")


#Restructure Data
# Convert to float32 and rescale: divide by 255, then map x -> 2*x - 1
# (i.e. [0, 1] -> [-1, 1], assuming 8-bit pixel values).
X_train=X_train.astype(np.float32)
X_test=X_test.astype(np.float32)
# Y_train = to_categorical(y_train, 10)
# Y_test = to_categorical(y_test, 10)
X_train /= 255
X_test /= 255
X_train=2*X_train-1
X_test=2*X_test-1


#Train Val Split
# Hold out 10% of the training data for validation. No random_state is given,
# so the split differs between runs.
X_train,X_val,y_train, y_val = train_test_split(X_train,y_train,test_size = 0.10)
print('X_train shape:', X_train.shape)
print(X_train.shape[0], 'train samples')
print(y_train.shape[0], 'train samples')

print(X_val.shape[0], 'val samples')
print(y_val.shape[0], 'val samples')

print(X_test.shape[0], 'test samples')
print(y_test.shape[0], 'test samples')


In [None]:
class Sign_layer(Layer):
    """Residual multi-level sign activation with trainable per-level scales.

    For each of ``levels`` passes the current residual is passed through
    ``binarize`` (not defined in this notebook; presumably provided by the
    ``binarization_utils`` star import), multiplied by the absolute value of
    that level's scale variable, added to the output and subtracted from the
    residual.

    Args:
        levels: number of binarization levels (default 1).
        **kwargs: forwarded to ``Layer.__init__``.
    """

    def __init__(self, levels=1,**kwargs):
        self.levels=levels
        super(Sign_layer, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create one scale variable per level.

        The initial scales are ``levels, levels-1, ..., 1`` divided by their
        sum, so they add up to 1.
        """
        ars=np.arange(self.levels)+1.0
        ars=ars[::-1]
        means=ars/np.sum(ars)
        self.means=[K.variable(m) for m in means]
        self._trainable_weights = self.means

    def call(self, x, mask=None):
        """Return the sum over levels of ``binarize(residual) * |scale|``.

        ``mask`` is accepted for interface compatibility and is not used.
        """
        resid = x
        out_bin=0
        for l in range(self.levels):
            out=binarize(resid)*(K.abs(self.means[l]))
            out_bin=out_bin+out
            resid=resid-out
        return out_bin

    def get_output_shape_for(self,input_shape):
        """The activation is element-wise, so the shape is unchanged."""
        return input_shape

    def compute_output_shape(self,input_shape):
        """The activation is element-wise, so the shape is unchanged."""
        return input_shape

    def set_means(self,X):
        """Set the per-level scales from sample activations ``X``.

        ``X`` is clipped to [-1, 1]; at each level the scale is the mean
        absolute residual, and ``sign(residual) * scale`` is removed from the
        residual before the next level. The scales are normalised to sum to 1
        and written into the layer's variables.

        The layer must have been built first, because the variables are
        created in ``build``.

        Args:
            X: numpy array of sample inputs to this layer.
        """
        means=np.zeros((self.levels))
        resid=np.clip(X,-1,1)
        for l in range(self.levels):
            m=np.mean(np.absolute(resid))
            resid=resid-np.sign(resid)*m
            means[l]=m

        means=means/np.sum(means)
        # self.means is a Python list of variables (see build), so the values
        # have to be assigned one variable at a time; a list has no .assign().
        # K.set_value is used instead of a session so no K.get_session() call
        # is needed.
        for var, value in zip(self.means, means):
            K.set_value(var, value)

    def get_config(self):
        """Return the layer config including ``levels`` for serialization."""
        config = super().get_config().copy()
        config.update({
            'levels': self.levels
        })
        return config
    
class binary_dense_error_var(Layer):
    """Dense layer with binarized weights and an optional hardware-error model.

    No batch normalisation or activation is applied here; the layer returns
    the (possibly error-perturbed) pre-activation.

    Args:
        n_in: number of input features.
        n_out: number of output units.
        error_type: one of ``"NoError"``, ``"Flip"`` or ``"InnerDot"``.
        error_dict: parameters of the error model. ``"Flip"`` reads ``"p"``;
            ``"InnerDot"`` reads ``"uh"``, ``"ul"``, ``"sigl"`` and ``"sigh"``.
        **kwargs: forwarded to ``Layer.__init__``.
    """

    def __init__(self,n_in,n_out,error_type,error_dict,**kwargs):
        self.n_in=n_in
        self.n_out=n_out
        self.error_type=error_type
        self.error_dict= error_dict
        assert self.error_type in ["NoError", "Flip","InnerDot"], "error_type of {0} is invalid".format(error_type)
        super(binary_dense_error_var,self).__init__(**kwargs)

        if(error_type == "InnerDot"):
            max_pop = self.n_in
            self.uh = self.error_dict["uh"]
            self.ul = self.error_dict["ul"]
            self.sigl = self.error_dict["sigl"]
            self.sigh = self.error_dict["sigh"]

            #Not Used, Mainly used for information
            # One Normal distribution per possible match count m = 0..n_in.
            means = [m * self.ul + (max_pop - m) * self.uh for m in range(0, max_pop+1)]
            stdvs = [np.sqrt(m * (self.sigl**2) + (max_pop - m)*(self.sigh**2)) for m in range(0, max_pop+1)]
            self.distributions = [tfp.distributions.Normal(loc= m, scale = sig) for m,sig in zip(means,stdvs)]

        elif(error_type == "Flip"):
            self.p = self.error_dict["p"]

    def build(self, input_shape):
        """Create the latent weight matrix and the scalar output scale."""
        stdv=1/np.sqrt(self.n_in)
        w = np.random.normal(loc=0.0, scale=stdv,size=[self.n_in,self.n_out]).astype(np.float32)
        self.w=K.variable(w)
        self.gamma_w=K.variable(1.0)
        self._trainable_weights=[self.w,self.gamma_w]

    def _inner_dot_with_error(self, x):
        """Inner product simulated through the noisy "InnerDot" model.

        Inputs and weights are binarized and a count ``pop`` is derived from
        their dot product. A sample is drawn from a Normal distribution whose
        mean and variance depend on ``pop``, and the sample is binned back to
        an integer count with ``tfp.stats.find_bins``. The result is stored
        in ``self.out`` and returned.
        """
        self.clamped_w=binarize(self.w)
        self.clamped_x=binarize(x)
        # Largest absolute input value, used to rescale the binned result.
        gamma_x = tf.math.maximum(K.abs(tf.math.reduce_min(x)),K.abs(tf.math.reduce_max(x)))
        self.pop = (K.dot(self.clamped_x,self.clamped_w)+self.n_in)/2

        #Add Error
        self.dist = tfp.distributions.Normal(loc = self.pop * self.ul + (self.n_in - self.pop) * self.uh,
                                             scale= tf.math.sqrt(self.pop * (self.sigl**2) + (self.n_in - self.pop)*(self.sigh**2)))

        self.samps = self.dist.sample(1)
        self.samps = tf.squeeze(self.samps, axis = 0)

        #Use Histogram for binning
        self.step = self.ul - self.uh
        self.edges = np.arange(self.n_in*self.uh-self.step/2, self.n_in*self.ul+self.step, self.step,dtype=np.float32)
        self.bins= tfp.stats.find_bins(self.samps, self.edges, extend_lower_interval = True, extend_upper_interval=True)
        self.out = K.abs(self.gamma_w)*gamma_x*(2*self.bins-self.n_in)
        return self.out

    def call(self, x,training=None):
        """Compute the layer output for the configured error model.

        Args:
            x: input tensor; presumably of shape ``(batch, n_in)`` - confirm
                against callers.
            training: when truthy and ``error_type == "InnerDot"``, the
                forward value is the noisy result while gradients flow through
                the error-free product.
        """
        #Designed with Batch norm in mind, Binarization done after.
        if(self.error_type == "NoError"):
            self.clamped_w=binarize(self.w)
            self.prod=K.abs(self.gamma_w)*K.dot(x,self.clamped_w)
            self.out = self.prod
            return self.out
        elif(self.error_type == "Flip"):
            # Each weight is multiplied by -1 where a uniform sample is <= p.
            self.clamped_w=binarize(self.w)*((2*tf.cast(tf.random.uniform(self.w.shape) > self.p, tf.float32)) - 1)
            self.prod=K.abs(self.gamma_w)*K.dot(x,self.clamped_w)
            self.out = self.prod
            return self.out
        elif(self.error_type == "InnerDot"):
            if training:
                noisy = self._inner_dot_with_error(x)
                # Error-free product, used only as the gradient path.
                self.prod=K.abs(self.gamma_w)*K.dot(x,self.clamped_w)
                #Gradient not using binning
                return  self.prod + K.stop_gradient(noisy -  self.prod)
            else:
                return self._inner_dot_with_error(x)

    def get_output_shape_for(self,input_shape):
        """Output shape: the input's leading dimension followed by ``n_out``."""
        return (input_shape[0], self.n_out)

    def compute_output_shape(self,input_shape):
        """Output shape: the input's leading dimension followed by ``n_out``.

        The previous value ``(n_in, n_out)`` is the shape of the weight
        matrix, not of the layer output.
        """
        return (input_shape[0], self.n_out)

    def get_config(self):
        """Return the constructor arguments for serialization."""
        config = super().get_config().copy()
        config.update({
            'n_in': self.n_in,
            'n_out': self.n_out,
            'error_type': self.error_type,
            'error_dict': self.error_dict
        })
        return config

    

In [None]:
class conv_batchnorm_sign(Layer):
    """Binary-weight 2-D convolution followed by batch-norm and a sign layer.

    Args:
        nfilters: number of output channels.
        ch_in: number of input channels.
        k: side length of the square kernel.
        padding: padding mode passed to ``K.conv2d``.
        error_type: one of ``"NoError"``, ``"Flip"`` or ``"InnerDot"``.
        error_dict: error-model parameters. ``"Flip"`` reads ``"p"``;
            ``"InnerDot"`` reads ``"uh"``, ``"ul"``, ``"sigl"``, ``"sigh"``.
        strides: convolution strides.
        batch_norm_params: dict with ``"eps"`` and ``"alpha"`` (epsilon and
            momentum of the internal ``BatchNormalization``).
        reg_params: dict with ``"type"`` (``None``, ``"uniform"``, ``"exp"``
            or ``"inverse"``), ``"b"``, ``"lambda"`` and ``"alpha"``
            controlling the threshold regularization added during training.
        **kwargs: forwarded to ``Layer.__init__``.

    The dict defaults are only read, never modified.
    """

    def __init__(self,nfilters,ch_in,k,padding,error_type,error_dict,strides=(1,1),batch_norm_params={"eps" : 1e-4, "alpha" : 0.1},reg_params={"type" : None, "b" : 32, "lambda" : 0.0001, "alpha" : 1},**kwargs):
        self.nfilters=nfilters
        self.ch_in=ch_in
        self.k=k
        self.padding=padding
        self.strides=strides
        self.error_type=error_type
        self.error_dict= error_dict
        self.batch_norm_eps = batch_norm_params["eps"]
        self.batch_norm_alpha = batch_norm_params["alpha"]

        # regularization hyperparameters
        self.loss_type = reg_params["type"]
        # desired bit error tolerance
        self.b = reg_params["b"]
        self.reg_lambda = reg_params["lambda"]
        self.alpha = reg_params["alpha"]

        assert self.error_type in ["NoError", "Flip","InnerDot"], "error_type of {0} is invalid".format(error_type)

        super(conv_batchnorm_sign,self).__init__(**kwargs)

        if(error_type == "InnerDot"):
            self.uh = self.error_dict["uh"]
            self.ul = self.error_dict["ul"]
            self.sigl = self.error_dict["sigl"]
            self.sigh = self.error_dict["sigh"]
        elif(error_type == "Flip"):
            self.p = self.error_dict["p"]

    def build(self, input_shape):
        """Create the kernel, the scalar scale and the nested layers."""
        stdv=1/np.sqrt(self.k*self.k*self.ch_in)
        w = np.random.normal(loc=0.0, scale=stdv,size=[self.k,self.k,self.ch_in,self.nfilters]).astype(np.float32)
        # Use the already-imported backend alias; the bare name `tensorflow`
        # is not imported by the notebook's import cell.
        if K.backend()=="mxnet":
            w=w.transpose(3,2,0,1)
        self.w=K.variable(w)
        self.gamma_w=K.variable(1.0)

        # Number of weights that contribute to each output element.
        self.max_pop = self.k*self.k*self.ch_in

        # Inner-product parameters are read with .get() because a "Flip"
        # error_dict only carries "p"; indexing would raise KeyError. For
        # "InnerDot" the keys were already validated in __init__.
        self.uh = self.error_dict.get("uh")
        self.ul = self.error_dict.get("ul")
        self.sigl = self.error_dict.get("sigl")
        self.sigh = self.error_dict.get("sigh")

        # batchnorm and sign activation
        self.batchnorm_layer = BatchNormalization(axis=-1, momentum=self.batch_norm_alpha, epsilon=self.batch_norm_eps)
        self.sign_layer = Sign_layer(levels=1)

        # The nested layers are not built yet, so their weight lists are
        # still empty at this point.
        self._trainable_weights=[self.w, self.gamma_w] + self.batchnorm_layer._trainable_weights + self.sign_layer._trainable_weights

    def _add_threshold_loss(self):
        """Add the threshold regularization term selected by ``loss_type``.

        The term is computed from ``self.out`` (the pre-batch-norm output)
        and a threshold ``T`` derived from the batch-norm weights.
        """
        if self.loss_type not in ("uniform", "exp", "inverse"):
            return

        # The batch-norm weights exist only after the nested layer has been
        # called once. The indices below assume the order gamma, beta,
        # moving mean, moving variance - TODO confirm for the Keras version
        # in use.
        params = self.batchnorm_layer.weights
        gamma = params[0]
        beta = params[1]
        mean = params[2]
        var = params[3]
        T = mean - tf.math.divide(tf.math.sqrt(var),gamma)*beta

        if self.loss_type == "uniform":
            self.add_loss(self.reg_lambda*tf.reduce_mean(tf.nn.relu(self.b - tf.math.abs(self.out - T))))
        elif self.loss_type == "exp":
            self.add_loss(self.reg_lambda*tf.reduce_mean(-1*tf.math.exp(self.alpha*(2*T - self.max_pop))*tf.math.abs(self.out - T)))
        elif self.loss_type == "inverse":
            self.add_loss(self.reg_lambda*tf.reduce_mean(tf.math.reciprocal(2*T - self.max_pop)*(self.b - tf.math.abs(self.out - T))))

    def call(self, x,mask=None, training=None):
        """Convolve with binarized weights, then batch-norm and sign.

        Args:
            x: input tensor for ``K.conv2d``.
            mask: accepted for interface compatibility; not used.
            training: when truthy, the threshold regularization loss is added.
        """
        if(self.error_type == "NoError"):
            constraint_gamma=K.abs(self.gamma_w)#K.clip(self.gamma_w,0.01,10)
            self.clamped_w=constraint_gamma*binarize(self.w)
            self.out=K.conv2d(x, kernel=self.clamped_w, padding=self.padding,strides=self.strides )
        elif(self.error_type == "Flip"):
            constraint_gamma=K.abs(self.gamma_w)#K.clip(self.gamma,0.01,10)
            # Each weight is multiplied by -1 where a uniform sample is <= p.
            self.clamped_w=constraint_gamma*binarize(self.w)*((2*tf.cast(tf.random.uniform(self.w.shape) > self.p, tf.float32)) - 1)
            self.out=K.conv2d(x, kernel=self.clamped_w, padding=self.padding,strides=self.strides )
        elif(self.error_type == "InnerDot"):
            # Error-free product, used as the gradient path.
            self.clamped_w=binarize(self.w)
            self.prod=K.abs(self.gamma_w)*K.conv2d(x, kernel=self.clamped_w, padding=self.padding,strides=self.strides )

            # Make Model
            self.clamped_x=binarize(x)
            gamma_x = tf.math.maximum(K.abs(tf.math.reduce_min(x)),K.abs(tf.math.reduce_max(x)))
            # NOTE(review): the count is computed from `x`, not `clamped_x`,
            # unlike the dense layers in this notebook - confirm this is
            # intended.
            self.pop = (K.conv2d(x, kernel=self.clamped_w, padding=self.padding,strides=self.strides)+self.max_pop)/2

            #Add Error
            self.dist = tfp.distributions.Normal(loc = self.pop * self.ul + (self.max_pop - self.pop) * self.uh,
                                                 scale= tf.math.sqrt(self.pop * (self.sigl**2) + (self.max_pop - self.pop)*(self.sigh**2)))

            self.samps = self.dist.sample(1)
            self.samps = tf.squeeze(self.samps, axis = 0)

            #Use Histogram for binning
            self.step = self.ul - self.uh
            self.edges = np.arange(self.max_pop*self.uh-self.step/2, self.max_pop*self.ul+self.step, self.step,dtype=np.float32)
            self.bins= tfp.stats.find_bins(self.samps, self.edges, extend_lower_interval = True, extend_upper_interval=True)
            self.out = K.abs(self.gamma_w)*gamma_x*(2*self.bins-self.max_pop)

            # Forward value is the noisy result; gradient follows self.prod.
            self.out = self.prod + K.stop_gradient(self.out -  self.prod)

        # Record the convolution output shape for every error type so that
        # compute_output_shape also works for "InnerDot".
        self.output_dim=self.out.get_shape().as_list()

        # regularization (uses the pre-batch-norm output stored in self.out)
        if training:
            self._add_threshold_loss()

        self.out = self.batchnorm_layer(self.out)
        self.out = self.sign_layer(self.out)
        return self.out

    def  get_output_shape_for(self,input_shape):
        """Output shape recorded by the most recent ``call``."""
        return (input_shape[0], self.output_dim[1],self.output_dim[2],self.output_dim[3])

    def compute_output_shape(self,input_shape):
        """Output shape recorded by the most recent ``call``."""
        return (input_shape[0], self.output_dim[1],self.output_dim[2],self.output_dim[3])

    def get_config(self):
        """Return the constructor arguments for serialization."""
        config = super().get_config().copy()
        config.update({
            'nfilters': self.nfilters,
            'ch_in': self.ch_in,
            'k': self.k,
            'padding': self.padding,
            'error_type': self.error_type,
            'error_dict': self.error_dict,
            'strides': self.strides,
            'batch_norm_params': {"eps" : self.batch_norm_eps, "alpha" : self.batch_norm_alpha},
            'reg_params': {"type" : self.loss_type, "b" : self.b, "lambda" : self.reg_lambda, "alpha" : self.alpha},
        })
        return config
    
class dense_batchnorm_sign(Layer):
    """Binary-weight dense layer followed by batch-norm and a sign layer.

    Args:
        n_in: number of input features.
        n_out: number of output units.
        error_type: one of ``"NoError"``, ``"Flip"`` or ``"InnerDot"``.
        error_dict: error-model parameters. ``"Flip"`` reads ``"p"``;
            ``"InnerDot"`` reads ``"uh"``, ``"ul"``, ``"sigl"``, ``"sigh"``.
        batch_norm_params: dict with ``"eps"`` and ``"alpha"`` (epsilon and
            momentum of the internal ``BatchNormalization``).
        reg_params: dict with ``"type"`` (``None``, ``"uniform"``, ``"exp"``
            or ``"inverse"``), ``"b"``, ``"lambda"`` and ``"alpha"``
            controlling the threshold regularization added during training.
        **kwargs: forwarded to ``Layer.__init__``.

    The dict defaults are only read, never modified.
    """

    def __init__(self,n_in,n_out,error_type,error_dict,batch_norm_params={"eps" : 1e-4, "alpha" : 0.1},reg_params={"type" : None, "b" : 32, "lambda" : 0.0001, "alpha" : 1},**kwargs):
        self.n_in=n_in
        self.n_out=n_out
        self.error_type=error_type
        self.error_dict= error_dict
        self.batch_norm_eps = batch_norm_params["eps"]
        self.batch_norm_alpha = batch_norm_params["alpha"]

        # regularization hyperparameters
        self.loss_type = reg_params["type"]
        # desired bit error tolerance
        self.b = reg_params["b"]
        self.reg_lambda = reg_params["lambda"]
        self.alpha = reg_params["alpha"]

        assert self.error_type in ["NoError", "Flip","InnerDot"], "error_type of {0} is invalid".format(error_type)
        super(dense_batchnorm_sign,self).__init__(**kwargs)

        # Number of weights that contribute to each output element.
        self.max_pop = self.n_in
        if(error_type == "InnerDot"):
            self.uh = self.error_dict["uh"]
            self.ul = self.error_dict["ul"]
            self.sigl = self.error_dict["sigl"]
            self.sigh = self.error_dict["sigh"]
        elif(error_type == "Flip"):
            self.p = self.error_dict["p"]

    def build(self, input_shape):
        """Create the weight matrix, the scalar scale and the nested layers."""
        stdv=1/np.sqrt(self.n_in)
        w = np.random.normal(loc=0.0, scale=stdv,size=[self.n_in,self.n_out]).astype(np.float32)
        self.w=K.variable(w)
        self.gamma_w=K.variable(1.0)

        # batchnorm and sign activation
        self.batchnorm_layer = BatchNormalization(axis=-1, momentum=self.batch_norm_alpha, epsilon=self.batch_norm_eps)
        self.sign_layer = Sign_layer(levels=1)

        # The nested layers are not built yet, so their weight lists are
        # still empty at this point.
        self._trainable_weights=[self.w,self.gamma_w] + self.batchnorm_layer._trainable_weights + self.sign_layer._trainable_weights

        # Inner-product parameters are read with .get() because a "Flip"
        # error_dict only carries "p"; indexing would raise KeyError. For
        # "InnerDot" the keys were already validated in __init__.
        self.uh = self.error_dict.get("uh")
        self.ul = self.error_dict.get("ul")
        self.sigl = self.error_dict.get("sigl")
        self.sigh = self.error_dict.get("sigh")

    def _add_threshold_loss(self):
        """Add the threshold regularization term selected by ``loss_type``.

        The term is computed from ``self.out`` (the pre-batch-norm output)
        and a threshold ``T`` derived from the batch-norm weights.
        """
        if self.loss_type not in ("uniform", "exp", "inverse"):
            return

        # The batch-norm weights exist only after the nested layer has been
        # called once. The indices below assume the order gamma, beta,
        # moving mean, moving variance - TODO confirm for the Keras version
        # in use.
        params = self.batchnorm_layer.weights
        gamma = params[0]
        beta = params[1]
        mean = params[2]
        var = params[3]
        T = mean - tf.math.divide(tf.math.sqrt(var),gamma)*beta

        if self.loss_type == "uniform":
            self.add_loss(self.reg_lambda*tf.reduce_mean(tf.nn.relu(self.b - tf.math.abs(self.out - T))))
        elif self.loss_type == "exp":
            self.add_loss(self.reg_lambda*tf.reduce_mean(-1*tf.math.exp(self.alpha*(2*T - self.max_pop))*tf.math.abs(self.out - T)))
        elif self.loss_type == "inverse":
            self.add_loss(self.reg_lambda*tf.reduce_mean(tf.math.reciprocal(2*T - self.max_pop)*(self.b - tf.math.abs(self.out - T))))

    def call(self, x,training=None):
        """Multiply by binarized weights, then batch-norm and sign.

        Args:
            x: input tensor; presumably of shape ``(batch, n_in)`` - confirm
                against callers.
            training: when truthy, the threshold regularization loss is added.
        """
        #Designed with Batch norm in mind, Binarization done after.
        if(self.error_type == "NoError"):
            self.clamped_w=binarize(self.w)
            self.prod=K.abs(self.gamma_w)*K.dot(x,self.clamped_w)
            self.out = self.prod
        elif(self.error_type == "Flip"):
            # Each weight is multiplied by -1 where a uniform sample is <= p.
            self.clamped_w=binarize(self.w)*((2*tf.cast(tf.random.uniform(self.w.shape) > self.p, tf.float32)) - 1)
            self.prod=K.abs(self.gamma_w)*K.dot(x,self.clamped_w)
            self.out = self.prod
        elif(self.error_type == "InnerDot"):
            # Error-free product, used as the gradient path.
            self.clamped_w=binarize(self.w)
            self.prod=K.abs(self.gamma_w)*K.dot(x,self.clamped_w)

            # Make Model
            self.clamped_x=binarize(x)
            gamma_x = tf.math.maximum(K.abs(tf.math.reduce_min(x)),K.abs(tf.math.reduce_max(x)))
            self.pop = (K.dot(self.clamped_x,self.clamped_w)+self.n_in)/2

            #Add Error
            self.dist = tfp.distributions.Normal(loc = self.pop * self.ul + (self.n_in - self.pop) * self.uh,
                                                 scale= tf.math.sqrt(self.pop * (self.sigl**2) + (self.n_in - self.pop)*(self.sigh**2)))

            self.samps = self.dist.sample(1)
            self.samps = tf.squeeze(self.samps, axis = 0)

            #Use Histogram for binning
            self.step = self.ul - self.uh
            self.edges = np.arange(self.n_in*self.uh-self.step/2, self.n_in*self.ul+self.step, self.step,dtype=np.float32)
            self.bins= tfp.stats.find_bins(self.samps, self.edges, extend_lower_interval = True, extend_upper_interval=True)
            self.out = K.abs(self.gamma_w)*gamma_x*(2*self.bins-self.n_in)

            # Forward value is the noisy result; gradient follows self.prod.
            self.out = self.prod + K.stop_gradient(self.out -  self.prod)

        # regularization (uses the pre-batch-norm output stored in self.out)
        if training:
            self._add_threshold_loss()

        self.out = self.batchnorm_layer(self.out)
        self.out = self.sign_layer(self.out)
        return self.out

    def get_output_shape_for(self,input_shape):
        """Output shape: the input's leading dimension followed by ``n_out``."""
        return (input_shape[0], self.n_out)

    def compute_output_shape(self,input_shape):
        """Output shape: the input's leading dimension followed by ``n_out``.

        The previous value ``(n_in, n_out)`` is the shape of the weight
        matrix, not of the layer output.
        """
        return (input_shape[0], self.n_out)

    def get_config(self):
        """Return the constructor arguments for serialization."""
        config = super().get_config().copy()
        config.update({
            'n_in': self.n_in,
            'n_out': self.n_out,
            'error_type': self.error_type,
            'error_dict': self.error_dict,
            'batch_norm_params': {"eps" : self.batch_norm_eps, "alpha" : self.batch_norm_alpha},
            'reg_params': {"type" : self.loss_type, "b" : self.b, "lambda" : self.reg_lambda, "alpha" : self.alpha},
        })
        return config

In [None]:
def make_CIFAR10_SVHN_model(error_type, p = 0.1, ul = 1, sigl = 0.01, uh=0.01, sigh = 0.01, reg_params = {"type" : "exp", "b" : 64, "lambda" : 0.0001, "alpha" : 1}):
    """Build the convolutional network used for CIFAR-10 / SVHN.

    Args:
        error_type: 0 for no error, 1 for flip error, 2 for inner-product
            ("InnerDot") error.
        p: flip probability (read by the "Flip" error model).
        ul, sigl, uh, sigh: parameters of the inner-product error model.
        reg_params: regularization settings forwarded to the
            ``*_batchnorm_sign`` layers. The default dict is only read.

    Returns:
        An uncompiled ``Sequential`` model ending in a softmax activation.

    Raises:
        ValueError: if ``error_type`` is not 0, 1 or 2.
    """
    # MaxPooling2D is not among the names imported in the notebook's import
    # cell; import it here so the function does not depend on a star import.
    from tensorflow.keras.layers import MaxPooling2D

    inner_dot_params = {"ul" : ul , "sigl" : sigl, "uh" : uh, "sigh" : sigh}
    if(error_type == 0):
        #No Error (the inner-product parameters are still passed to the layers)
        error_type = "NoError"
        error_dict = dict(inner_dot_params)
    elif(error_type == 1):
        #Flip Error
        # The *_batchnorm_sign layers read the inner-product keys in build()
        # for every error type, so they are included alongside "p".
        error_type = "Flip"
        error_dict = {"p": p, **inner_dot_params}
    elif(error_type == 2):
        #InnerDot error
        error_type = "InnerDot"
        error_dict = dict(inner_dot_params)
    else:
        # Without this branch error_dict would be unbound further down.
        raise ValueError("error_type should be 0, 1 or 2, got {0!r}".format(error_type))

    # Keyword arguments shared by every custom layer in the stack.
    shared = {"error_type": error_type, "error_dict": error_dict}
    bn_sign = dict(shared, reg_params=reg_params)

    #Model Def
    model=Sequential()
    model.add(conv_batchnorm_sign(nfilters=64,ch_in=3,k=3,padding='valid',input_shape=[32,32,3],**bn_sign))
    model.add(conv_batchnorm_sign(nfilters=64,ch_in=64,k=3,padding='valid',**bn_sign))
    model.add(MaxPooling2D(pool_size=(2, 2),strides=(2,2)))

    model.add(conv_batchnorm_sign(nfilters=128,ch_in=64,k=3,padding='valid',**bn_sign))
    model.add(conv_batchnorm_sign(nfilters=128,ch_in=128,k=3,padding='valid',**bn_sign))
    model.add(MaxPooling2D(pool_size=(2, 2),strides=(2,2)))

    model.add(conv_batchnorm_sign(nfilters=256,ch_in=128,k=3,padding='valid',**bn_sign))
    model.add(conv_batchnorm_sign(nfilters=256,ch_in=256,k=3,padding='valid',**bn_sign))

    model.add(my_flat())

    # Each dense layer takes its input width from the current model output.
    model.add(dense_batchnorm_sign(n_in=int(model.output.get_shape()[1]),n_out=512,**bn_sign))
    model.add(dense_batchnorm_sign(n_in=int(model.output.get_shape()[1]),n_out=512,**bn_sign))
    model.add(binary_dense_error_var(n_in=int(model.output.get_shape()[1]),n_out=10,**shared))
    model.add(BatchNormalization(axis=-1, momentum=batch_norm_alpha, epsilon=batch_norm_eps))
    model.add(Activation('softmax'))

    # the following is a workaround so that the model weights can be saved
    # https://github.com/tensorflow/tensorflow/issues/46871
    for j, w in enumerate(model.weights):
        w._handle_name = 'model_' + str(j) + w.name

    return model

def make_MNIST_model(error_type, p = 0.1, ul = 1, sigl = 0.01, uh=0.01, sigh = 0.01, reg_params = {"type" : "exp", "b" : 64, "lambda" : 0.0001, "alpha" : 1}):
    """Build the fully connected network used for MNIST (784-element inputs).

    Args:
        error_type: 0 for no error, 1 for flip error, 2 for inner-product
            ("InnerDot") error.
        p: flip probability (read by the "Flip" error model).
        ul, sigl, uh, sigh: parameters of the inner-product error model.
        reg_params: regularization settings forwarded to the
            ``dense_batchnorm_sign`` layers. The default dict is only read.

    Returns:
        An uncompiled ``Sequential`` model ending in a softmax activation.

    Raises:
        ValueError: if ``error_type`` is not 0, 1 or 2.
    """
    inner_dot_params = {"ul" : ul , "sigl" : sigl, "uh" : uh, "sigh" : sigh}
    if(error_type == 0):
        #No Error (the inner-product parameters are still passed to the layers)
        error_type = "NoError"
        error_dict = dict(inner_dot_params)
    elif(error_type == 1):
        #Flip Error
        # dense_batchnorm_sign reads the inner-product keys in build() for
        # every error type, so they are included alongside "p".
        error_type = "Flip"
        error_dict = {"p": p, **inner_dot_params}
    elif(error_type == 2):
        #InnerDot error
        error_type = "InnerDot"
        error_dict = dict(inner_dot_params)
    else:
        # Without this branch error_dict would be unbound further down.
        raise ValueError("error_type should be 0, 1 or 2, got {0!r}".format(error_type))

    # Keyword arguments shared by every custom layer in the stack.
    shared = {"error_type": error_type, "error_dict": error_dict}
    bn_sign = dict(shared, reg_params=reg_params)

    model=Sequential()
    # input_shape is required on the first layer: the following layers read
    # model.output, which only exists once the model has a known input.
    model.add(dense_batchnorm_sign(n_in=784,n_out=256,input_shape=[784],**bn_sign))
    model.add(dense_batchnorm_sign(n_in=int(model.output.get_shape()[1]),n_out=256,**bn_sign))
    model.add(dense_batchnorm_sign(n_in=int(model.output.get_shape()[1]),n_out=256,**bn_sign))
    model.add(dense_batchnorm_sign(n_in=int(model.output.get_shape()[1]),n_out=256,**bn_sign))
    # binary_dense_error_var is the output layer class defined in this
    # notebook and used by the CIFAR-10/SVHN factory; `binary_dense_error`
    # is not defined here.
    model.add(binary_dense_error_var(n_in=int(model.output.get_shape()[1]),n_out=10,**shared))
    model.add(BatchNormalization(axis=-1, momentum=batch_norm_alpha, epsilon=batch_norm_eps))
    model.add(Activation('softmax'))

    # the following is a workaround so that the model weights can be saved
    # https://github.com/tensorflow/tensorflow/issues/46871
    for j, w in enumerate(model.weights):
        w._handle_name = 'model_' + str(j) + w.name

    # The function previously fell off the end and returned None.
    return model

In [None]:
#Model Construction
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input

# Batch-normalisation epsilon / momentum, also bundled as a dict with the
# "eps" and "alpha" keys that the custom layers' constructors read.
batch_norm_eps, batch_norm_alpha = 1e-4, 0.1
batch_norm_params = dict(eps=batch_norm_eps, alpha=batch_norm_alpha)



# Training Script

In [None]:
# Error model used when building the training network:
#   0 : no error (the remaining error parameters are ignored)
#   1 : flip error (only `p` is used)
#   2 : inner-product error
error_type = 0  # training with no error

# Flip probability
p = 0.1

# Regularization hyper-parameters passed to the layers
reg_params = {
    "type": "uniform",
    "b": 64,
    "lambda": 0.0001,
    "alpha": 1,
}

# Inner-product error parameters
ul, uh = 1, 0.1  # l-low means bits match
sigl, sigh = 0.01, 0.2

# Build the network and print its layer summary
model = make_CIFAR10_SVHN_model(
    error_type,
    p=p,
    ul=ul,
    sigl=sigl,
    uh=uh,
    sigh=sigh,
    reg_params=reg_params,
)

model.summary()

In [None]:
#Training Script
training=True

#Training parameters
batch_size=50
epochs=50
error_type=0

#Makes Model Name
if(error_type == 0):
    #No Error
    model_name = "error_{0}".format(error_type)
elif(error_type == 1):
    model_name = "error_{0}_p_{1}".format(error_type,p)
elif(error_type == 2):
    #InnerDot error
    model_name = "error_{0}_ul_{1}_uh_{2}_sigl_{3}_sigh_{4}".format(error_type,ul,uh,sigl,sigh)

# NOTE: this overrides the name derived from the error settings above.
model_name = 'uniform'

# Creates 'models' and 'models/<model_name>' if they do not exist yet.
os.makedirs('models/'+model_name, exist_ok=True)

#Train
lr=0.01
opt = keras.optimizers.Adam(learning_rate=lr,decay=1e-6)#SGD(lr=lr,momentum=0.9,decay=1e-5)
model.compile(loss='sparse_categorical_crossentropy',optimizer=opt,metrics=['accuracy'])


# Keep only the weights of the epoch with the best validation accuracy.
weights_path='models/'+model_name+'/'+model_name+'.h5'
cback=keras.callbacks.ModelCheckpoint(weights_path, monitor='val_accuracy', save_best_only=True,save_weights_only=True)


#Training
# Keras 1 names the epoch argument `nb_epoch`; every other version uses
# `epochs`. Branching this way guarantees `history` is always defined, which
# was not the case when the major version was neither '1' nor '2'.
fit_kwargs = dict(batch_size=batch_size,validation_data=(X_val, y_val), verbose=2,callbacks=[cback])
if keras.__version__[0]=='1':
    history=model.fit(X_train, y_train,nb_epoch=epochs,**fit_kwargs)
else:
    history=model.fit(X_train, y_train,epochs=epochs,**fit_kwargs)

#Save history
# The context manager closes the file even if pickling fails.
dic={'hard':history.history}
with open('models/'+model_name+'.pkl','wb') as foo:
    pickle.dump(dic,foo)

# Testing Script

In [None]:
# Testing script: rebuild the network under a chosen test-time error model,
# load the weights saved by the training cell and report test accuracy.
# Relies on X_test / y_test and make_CIFAR10_SVHN_model from earlier cells.
training=False

#Test Based on Error Type
#Model Parameters
# 0 : No error, params ignored
# 1 : Flip error, only p used
# 2 : InnerProd error
train_error_type = 0 #can be 0 or 1 or 2
test_error_type = 2 # can be 0 1 or 2
train_flip_probability = 0.1

#Training InnerProd params
train_ul = 1 #l-low means bits match
train_uh = 0.1
train_sigl = 0.01
train_sigh = 0.2


#Flip Probability
p=0.1

# Test Parameters
#InnerProd params
ul = 1
uh = 0.1
sigl = 0.01
sigh = 0.2

#Makes Model Name
if(train_error_type == 0):
    #No Error
    model_name = "error_{0}".format(train_error_type)
elif(train_error_type == 1):
    model_name = "error_{0}_p_{1}".format(train_error_type,train_flip_probability)
elif(train_error_type == 2):
    #InnerDot error
    model_name = "error_{0}_ul_{1}_uh_{2}_sigl_{3}_sigh_{4}".format(train_error_type,train_ul,train_uh,train_sigl,train_sigh)

# NOTE: the descriptive name computed above is overridden here; weights are
# always read from the fixed 'uniform' run directory.
model_name = 'uniform'
weights_path = 'models/'+model_name+'/'+model_name+'.h5' #To Specify

#weights_path = "" #To Specify


def _evaluate_under_error(error_type, lr=0.01, **error_params):
    """Build, compile, load and evaluate one model on the test split.

    Args:
        error_type: error-model selector (0, 1 or 2) passed to
            make_CIFAR10_SVHN_model.
        lr: learning rate for the Adam optimizer used to compile the model.
        **error_params: keyword arguments forwarded unchanged to
            make_CIFAR10_SVHN_model (p, ul, uh, sigl, sigh).

    Returns:
        (model, score) where score is the list returned by model.evaluate;
        with the compile settings below, score[0] is the loss and score[1]
        the accuracy.
    """
    eval_model = make_CIFAR10_SVHN_model(error_type, **error_params)
    # A loss and the accuracy metric are required for evaluate() to return
    # [loss, accuracy]; a bare compile() does not provide them.
    opt = keras.optimizers.Adam(learning_rate=lr,decay=1e-6)
    eval_model.compile(loss='sparse_categorical_crossentropy',optimizer=opt,metrics=['accuracy'])
    eval_model.load_weights(weights_path)
    return eval_model, eval_model.evaluate(X_test,y_test,verbose=0)


if(test_error_type == 0):
    #No Error
    model, score = _evaluate_under_error(test_error_type, p = p, ul = ul, sigl = sigl, uh=uh, sigh = sigh)
    print("No Error, test loss was %0.4f, test accuracy was %0.4f"%(score[0],score[1]))

elif(test_error_type == 1):
    #Flip Error: sweep the flip probability over 20 log-spaced values
    probs = np.logspace(-4, -1, 20)

    acc_1 = []
    # Loop variable is not named `p`, so the configured flip probability
    # above is not overwritten by the sweep.
    for flip_p in probs:
        # Uses the same architecture as the other branches (previously
        # make_MNIST_model), matching the model the weights were trained with.
        model, score = _evaluate_under_error(test_error_type, p = flip_p, ul = ul, sigl = sigl, uh=uh, sigh = sigh)
        #print("With p= %0.4f, test loss was %0.4f, test accuracy was %0.4f" %(flip_p,score[0],score[1]))
        print('%0.4f' %(score[1]))
        acc_1.append(score[1])

elif(test_error_type == 2):
    #InnerDot Error: sweep sigh as a multiple of sigl
    ratio_l_h = np.linspace(1,80,20)
    acc_2 = []
    # Print the full list of ratios first, then one accuracy per ratio.
    for ratio in ratio_l_h:
        print(ratio)
    for ratio in ratio_l_h:
        model, score = _evaluate_under_error(test_error_type, p = p, ul = ul, sigl = sigl, uh=uh, sigh = ratio*sigl)
        #print("With ratio %0.4f, test loss was %0.4f, test accuracy was %0.4f"%(ratio,score[0],score[1]))
        print('%0.4f' %(score[1]))
        acc_2.append(score[1])


In [None]:
# Shared x-axis for every curve in this cell: 20 evenly spaced values in [1, 80].
ratios = np.linspace(1, 80, 20)

# One recorded accuracy per ratio for each configuration.
c10_accuracies = [0.6668, 0.6753, 0.6895, 0.6986, 0.7043, 0.7023, 0.6918, 0.6748, 0.6527, 0.6182, 0.5813, 0.5423, 0.4911, 0.4535, 0.4120, 0.3623, 0.3227, 0.2917, 0.2578, 0.2321]
threshold_reg = [0.6984, 0.7008, 0.6980, 0.6859, 0.6761, 0.6576, 0.6455, 0.6143, 0.5939, 0.5615, 0.5322, 0.5007, 0.4622, 0.4322, 0.3969, 0.3761, 0.3416, 0.3148, 0.2881, 0.2646]
pop_reg = [0.6888, 0.6918, 0.6923, 0.6779, 0.6643, 0.6382, 0.6024, 0.5622, 0.5220, 0.4694, 0.4268, 0.3877, 0.3391, 0.3088, 0.2835, 0.2535, 0.2404, 0.2175, 0.2030, 0.1937]
# Earlier values for this configuration, kept for reference:
#inverse_threshold_weighted_reg = [0.6908, 0.6936, 0.6894, 0.6781, 0.6650, 0.6414, 0.6152, 0.5820, 0.5482, 0.5102, 0.4762, 0.4335, 0.3929, 0.3605, 0.3390, 0.3023, 0.2813, 0.2569, 0.2387, 0.2247]
inverse_threshold_weighted_reg = [0.6929, 0.7010, 0.7083, 0.7108, 0.7123, 0.7003, 0.6850, 0.6579, 0.6188, 0.5749, 0.5279, 0.4767, 0.4219, 0.3758, 0.3336, 0.2918, 0.2586, 0.2273, 0.2069, 0.1871]
# Earlier values for this configuration, kept for reference:
#exp_threshold_weighted_reg = [0.6868, 0.6864, 0.6814, 0.6657, 0.6530, 0.6230, 0.5959, 0.5540, 0.5046, 0.4592, 0.4086, 0.3646, 0.3231, 0.2927, 0.2644, 0.2364, 0.2177, 0.2048, 0.1938, 0.1832]
exp_threshold_weighted_reg = [0.6314, 0.6374, 0.6464, 0.6595, 0.6632, 0.6525, 0.6454, 0.6185, 0.5917, 0.5533, 0.5074, 0.4629, 0.4145, 0.3754, 0.3350, 0.3092, 0.2749, 0.2511, 0.2296, 0.2098]

# Draw the curves in a fixed order so colours follow the same cycle.
for series, legend_text in (
    (c10_accuracies, '2 train 2 test'),
    (threshold_reg, 'threshold_reg with threshold = maxpop/2'),
    (pop_reg, 'maxpop - pop regularization'),
    (inverse_threshold_weighted_reg, '1/T weight regularization'),
    (exp_threshold_weighted_reg, 'exp(-T) weight regularization'),
):
    plt.plot(ratios, series, label=legend_text)
plt.legend()

In [None]:
# Two recorded accuracy curves, one value per entry of `ratios`
# (`ratios` is defined in the preceding plotting cell).
accuracies1 = [0.6668, 0.6753, 0.6895, 0.6986, 0.7043, 0.7023, 0.6918, 0.6748, 0.6527, 0.6182, 0.5813, 0.5423, 0.4911, 0.4535, 0.4120, 0.3623, 0.3227, 0.2917, 0.2578, 0.2321]
accuracies2 = [0.6426, 0.6535, 0.6640, 0.6676, 0.6748, 0.6687, 0.6548, 0.6212, 0.5848, 0.5379, 0.4815, 0.4292, 0.3776, 0.3249, 0.2797, 0.2431, 0.2110, 0.1841, 0.1727, 0.1625]
# A single plot() call with two (x, y) pairs draws both lines on the same axes.
plt.plot(ratios, accuracies1, ratios, accuracies2)

In [None]:
# Recorded accuracies for the run labelled 'uniform', one value per entry of
# `ratios` (`ratios` is defined in an earlier plotting cell).
# NOTE(review): the label says b = 32 while the model-parameter cell sets
# reg_params["b"] = 64; confirm which value produced these numbers.
uniform=[0.4361, 0.4351, 0.4291, 0.4088, 0.3812, 0.3570, 0.3269, 0.2957, 0.2669, 0.2379, 0.2059, 0.1830, 0.1581, 0.1379, 0.1215, 0.1123, 0.1060, 0.1033, 0.1032, 0.1020]

# Raw string: '\l' is not a valid escape sequence in a normal string literal.
# The $...$ delimiters make matplotlib render the expression as math text.
plt.plot(ratios,uniform,label=r'uniform with $\lambda = 10^{-4}$, b = 32')
# Without legend() the label passed to plot() is never displayed.
plt.legend()
