In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
from collections import OrderedDict
import math

# torch.manual_seed(0)
# random.seed(0)
# torch.use_deterministic_algorithms(True)

In [2]:
#Recursive function to generate Hadamard matrices

H = {}
H[1] = torch.tensor([[1.0]])
def Had(n):
    if(n in H):
        return H[n]
    else:
        Hnby2 = Had(n//2)
        Hn = torch.ones(n, n)
        for i in range(n//2):
            for j in range(n//2):
                Hn[i][j] = Hnby2[i][j]
                Hn[i+n//2][j] = Hnby2[i][j]
                Hn[i][j+n//2] = Hnby2[i][j]
                Hn[i+n//2][j+n//2] = -Hnby2[i][j]
        H[n] = Hn
        return Hn

In [43]:
#Generating boolean functions and from them, their walsh spectra to feed as inputs to the neural network

#k = number of variables in the boolean functions, n = number of inputs to train the model on
def generate_input_data(k, n, scheme='regular'):
    two_pow_k, data = pow(2, k), []
    if(scheme=='regular'):
        if(n<two_pow_k):
            print("Requested number of inputs less than 2^k. Please request a higher number")
            return data
        while(True):
            if(two_pow_k<=16):
                perm = torch.randperm(pow(2, two_pow_k))
                data = [[1.0]*(two_pow_k-len(bin(num)[2:]))+[1.0 if int(i)==0 else -1.0 for i in bin(num)[2:]] for num in perm[:n]]
            else:
                data = [[1.0 if random.random()>0.5 else -1.0 for i in range(two_pow_k)] for i in range(n)]
            rank = np.linalg.matrix_rank(data)
            if(rank<two_pow_k):
                print("Rank (", rank, ") not large enough, generating data again")
            else:
                print("Data generated.")
                break
    elif(scheme=='one-hot'):
        data = [[1.0]*i + [-1.0] + [1.0]*(two_pow_k-i-1) for i in range(two_pow_k)]
        print("Data generated")
    return torch.tensor(data)

k = 3
two_pow_k = pow(2, k)
if(k<=4):
    n = pow(2, two_pow_k)
else: 
    n = k*pow(2, pow(2, 4))
boolean_functions = generate_input_data(k, n)
walsh_spectra = torch.matmul(boolean_functions, Had(two_pow_k))

Data generated.


In [44]:
#Calculating the correlation immunity of functions given their walsh spectra

def correlation_immunity(walsh_spectra):
    ci = []
    n, two_pow_k = walsh_spectra.size()[0], walsh_spectra.size()[1]
    k = int(math.log2(two_pow_k))
    no_ones = []
    for i in range(two_pow_k):
        no_ones.append(sum([int(dig) for dig in bin(i)[2:]]))
    for spectrum in walsh_spectra:
        m_ci = [1]*(k+1)
        for i in range(two_pow_k):
            if(int(spectrum[i])!=0):
                m_ci[no_ones[i]] = 0
        m = 1
        while(m<k+1 and m_ci[m]==1):
            m+=1
        m -= 1
#         # Let ci item = [x0, x1, ..., xm]. x0 is 0 if it is balanced, xi is 1 if it is i-correlation-immune #for unprocessed boolean functions
#         ci.append(([1.0] if spectrum[0]==two_pow_k//2 else [0.0])+(m)*[1.0]+(k-m)*[0.0])
        # Let ci item = [x0, x1, ..., xm]. x0 is 1 if it is balanced, xi is 1 if it is i-correlation-immune
        ci.append(([1.0] if int(spectrum[0])==0 else [0.0])+(m)*[1.0]+(k-m)*[0.0])

    return ci

ci = torch.tensor(correlation_immunity(walsh_spectra))
print("Correlation Immunities of functions calculated")

Correlation Immunities of functions calculated


In [45]:
print(boolean_functions[:4])
print(walsh_spectra[:4])
print(ci[:4])

tensor([[ 1.,  1., -1., -1., -1., -1.,  1., -1.],
        [ 1., -1., -1., -1., -1.,  1., -1., -1.],
        [ 1.,  1., -1.,  1., -1.,  1., -1.,  1.],
        [ 1., -1.,  1., -1.,  1.,  1.,  1., -1.]])
tensor([[-2.,  2.,  2., -2.,  2., -2.,  6.,  2.],
        [-4.,  0.,  4.,  0.,  0.,  4.,  0.,  4.],
        [ 2., -6.,  2.,  2.,  2.,  2.,  2.,  2.],
        [ 2.,  6.,  2., -2., -2.,  2., -2.,  2.]])
tensor([[0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.]])


In [46]:
ci_count = [0]*k
res_count = [0]*k

for el in ci: #[:8*32]:
    m = 1
    while(m<k+1 and el[m]==1):
        ci_count[m-1]+=1
        if(el[0]==1):
            res_count[m-1]+=1
        m+=1

print(ci.size())
print(ci_count)
print(res_count)

torch.Size([256, 4])
[18, 4, 2]
[8, 2, 0]


In [47]:
print(ci.size())
print(ci[10*5120:(10+1)*5120, 1])

327680/5120

torch.Size([256, 4])
tensor([])


64.0

In [48]:
def print_model_weights(model_in):
    weight_dict = OrderedDict(model_in.named_parameters())
    print(weight_dict['lin.weight'])
    print(weight_dict['lin.bias'])

    print("Weights: ", torch.round(weight_dict['lin.weight']))
    print("Bias: ", torch.round(weight_dict['lin.bias']))

In [49]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        two_pow_k = pow(2,k)
#         self.lin = nn.Linear(two_pow_k, k+1) #m1
        self.lin = nn.Linear(two_pow_k, 1) #m2
#         self.lin.weight.data = torch.tensor([[1.0, 0, 0, 0, 0, 0, 0, 0],
#                                         [0, 1, 1, 0, 1, 0, 0, 0],
#                                         [0, 1, 1, 1, 1, 1, 1, 0],
#                                         [0, 1, 1, 1, 1, 1, 1, 1]])
#         self.lin.bias.data = torch.tensor([0.0, -2, -5, -6])
    
    
    def forward(self, x):
        x = self.lin(x)
        immunity = nn.Hardtanh(0,1)(x)
#         immunity = nn.Tanh()(2*x)
        return immunity

In [50]:
class TrainModel():

    def __init__(self, model, device, learningRate, inputDim, epochs, batchSize):

        self.device = device
        self.net = model.to(self.device)
        self.optimizer = optim.Adam(self.net.parameters(), lr=learningRate)
        self.inputDim = inputDim
        self.epochs = epochs
        self.batchSize = batchSize

    def train(self,):

        pr_loss = -1234
        flag = 1
        for epoch in range(self.epochs):

            self.optimizer.zero_grad()
#             input = torch.tensor([[1.0 if spec[0]==two_pow_k//2 else 0.0] + [1.0 if val==0 else 0.0 for val in spec[1:]] for spec in walsh_spectra[:pow(2, two_pow_k)//2]]).to(self.device) #unprocessed boolean function
            input = torch.tensor([[1.0 if int(val)==0 else 0.0 for val in spec] for spec in walsh_spectra[:pow(2, two_pow_k)//2]]).to(self.device)
            if(flag==1):
                flag = 0
                print(input.size())
                print("input: ", input)
            output = self.net(input)
#             loss = F.mse_loss(ci, output) #m1
#             loss = F.mse_loss(ci[:pow(2, two_pow_k)//8], output) #m1 for fraction of inputs as training data
#             n = torch.Tensor.size(ci)[0]
            loss = F.mse_loss(ci[:pow(2, two_pow_k)//2, 1].view(pow(2, two_pow_k)//2, 1), output) #m2 for fraction of input as training data
#             loss = F.mse_loss(ci[:, 1].view(n, 1), output) #m2
            loss.backward()
            self.optimizer.step()    
            if(epoch%200==199):
                print("ep ", epoch, ", loss = ", loss)
                if(abs(loss.item()-pr_loss)<0.00005):
                    break
                pr_loss = loss.item()

    def train_batch(self,):

        pr_loss = -1234
        batchSize = self.batchSize
        for epoch in range(self.epochs):
            
            for step in range(((pow(2, two_pow_k) if self.inputDim<=16 else k*pow(2, 16))//2)//batchSize):
#             for step in range((pow(2, (two_pow_k if self.inputDim<=4 else pow(2, 4)))//2)//batchSize):

                self.optimizer.zero_grad()
                input = torch.tensor([[1.0 if int(val)==0 else 0.0 for val in spec] for spec in walsh_spectra[step*batchSize:(step+1)*batchSize]]).to(self.device)
                true_labels = ci[step*batchSize:(step+1)*batchSize, 1].view(batchSize, 1)
                #up-sampling
                ups_rate = 250
                pos_samples = sum(ci[step*batchSize:(step+1)*batchSize, 1]).item()
                true_labels = torch.cat((ci[step*batchSize:(step+1)*batchSize, 1].view(batchSize, 1), torch.ones(int(pos_samples)*ups_rate, 1)), 0)
#                 print(pos_samples, true_labels.size(), true_labels[-5:])
                up_sampled_input = torch.zeros(batchSize+int(pos_samples)*ups_rate, self.inputDim)
                up_sampled_input[:batchSize] = input
                app_ips = 0
                for i in range(batchSize):
                    if(int(ci[step*batchSize+i, 1].item())==1):
                        for j in range(ups_rate):
                            up_sampled_input[batchSize+app_ips+j] = input[i]
                        app_ips += ups_rate
                input = up_sampled_input
                #upsampling-end
                output = self.net(input)
                loss = F.mse_loss(true_labels, output) #m2 for fraction of input as training data batch
                loss.backward()
                self.optimizer.step()
                
            if(epoch%5==4 or epoch==0):
                print("ep ", epoch, ", loss = ", loss)            
                if(abs(loss.item()-pr_loss)<0.00005):
                    break
                pr_loss = loss.item()

                
    def test(self,n):
        
        test_samples = torch.tensor([[1.0 if int(val)==0 else 0.0 for val in spec] for spec in walsh_spectra[-n:]]).to(self.device)
        preds = torch.round(self.net(test_samples))
#         loss = F.mse_loss(ci[-n:], preds) #m1
        loss = F.mse_loss(ci[-n:, 1].view(n, 1), preds) #m2
        print("loss = ", loss)
        for i in range(n):
            if(not torch.equal(ci[-n+i][1], preds[i])):
                print(ci[-n+i][1])
                print(preds[i])


In [53]:
learningRate = 0.001
epochs = 1200
two_pow_k = pow(2, k)

# model =  NeuralNetwork()
print_model_weights(model)
device = ("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using {device} device")
trainer = TrainModel(model, device, learningRate, two_pow_k, epochs, two_pow_k)
trainer.train()
# trainer.train_batch()
print_model_weights(model)
# trainer.test(5)


Parameter containing:
tensor([[-0.0220,  0.9406,  0.9419, -0.0244,  0.9345, -0.0180, -0.0179,  0.0616]],
       requires_grad=True)
Parameter containing:
tensor([-1.8295], requires_grad=True)
Weights:  tensor([[-0., 1., 1., -0., 1., -0., -0., 0.]], grad_fn=<RoundBackward0>)
Bias:  tensor([-2.], grad_fn=<RoundBackward0>)
Using cpu device
torch.Size([128, 8])
input:  tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 1., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 1., 1.,  ..., 0., 0., 1.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])
ep  199 , loss =  tensor(5.9660e-06, grad_fn=<MseLossBackward0>)
ep  399 , loss =  tensor(4.3340e-08, grad_fn=<MseLossBackward0>)
Parameter containing:
tensor([[-4.3895e-04,  9.9873e-01,  9.9876e-01, -4.8695e-04,  9.9860e-01,
         -3.5704e-04, -3.5279e-04,  1.2926e-03]], requires_grad=True)
Parameter containing:
tensor([-1.9964], requires_grad=True)
Weights:  tensor([[-0.

In [458]:
trainer.test(16)

loss =  tensor(0., grad_fn=<MseLossBackward0>)
tensor(1.)
tensor([1.], grad_fn=<SelectBackward0>)
tensor(1.)
tensor([1.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(1.)
tensor([1.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(1.)
tensor([1.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)
tensor(0.)
tensor([0.], grad_fn=<SelectBackward0>)


In [261]:
k = 5
two_pow_k = pow(2, k)

count1, count2 = 0, 0
for i in range(2500):
    bf = [[1.0 if random.random()>0.5 else -1.0 for i in range(two_pow_k)] for i in range(400)]
    walsh_spectra = torch.matmul(torch.tensor(bf), Had(two_pow_k))
    ci = correlation_immunity(walsh_spectra)
    count1 += sum(torch.tensor(ci)[:, 1])
    count2 += sum(torch.tensor(ci)[:, 2])
    
print(count1.item(), count2.item())
    

744.0 1.0


In [16]:
# m = nn.Hardtanh(0, 1)
# input = torch.tensor([-0.1, 0, 0.5, 1, 2, 3])
# output = m(input)
# print(output)

# no_ones = []
# for i in range(two_pow_k):
#     no_ones.append(sum([int(dig) for dig in bin(i)[2:]]))
# print(no_ones)
# trainer.test(80)
n = 256
test_samples = torch.tensor([[1.0 if spec[0]==two_pow_k//2 else 0.0] + [1.0 if val==0 else 0.0 for val in spec[1:]] for spec in walsh_spectra[-n:]]).to(device)
preds = torch.round(model(test_samples))
# loss = F.mse_loss(ci[-n:], preds) #m1
loss = F.mse_loss(ci[-n:, 1].view(n, 1), preds)
print("loss = ", loss)
for i in range(n):
    if(not torch.equal(ci[-n+i][1], preds[i][0])):
        print(ci[-n+i][1])
        print(preds[i])

loss =  tensor(0., grad_fn=<MseLossBackward0>)
