In [None]:
# Importing stuff
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

import itertools
from random import randint
import numpy
from numpy import linalg as LA

In [None]:
# Utility function for dealing with custom data
def stringtoVector(X):
    return [float(x) for x in X]

# Classification task generation
def rule(x):
    calc = ((int(x[0]) & int(x[1])) | (int(x[2]) & int(x[3])))
    if randint(0,100) <= 6:
        if calc == 0:
            return 1
        return 0
    return calc

In [None]:
# Data generation
distinct = ["".join(seq) for seq in itertools.product("01", repeat=4)]
X_train = []
Y_train = []

X_test = []
Y_test = []

for i in range(0,4096):
    X_train.append(stringtoVector(distinct[randint(0, 15)]))
    Y_train.append(rule(X_train[i]))
    
for i in range(0,4096):
    X_test.append(stringtoVector(distinct[randint(0, 15)]))
    Y_test.append(rule(X_test[i]))

In [None]:
# DNN class
class dataset(Dataset):
    def __init__(self, X, Y, transform=None):
        self.X = X
        self.Y = Y
        self.transform = transform

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        sample, target = X[idx], Y[idx]

        if self.transform:
            sample = self.transform(sample)
            target = self.transform(target)
        return sample, target


X = torch.from_numpy(numpy.array(X))
Y = torch.from_numpy(numpy.array(Y))

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(4 ,4)
        self.fc2 = nn.Linear(4 ,4)
        self.fc3 = nn.Linear(4 ,4)
        self.fc4 = nn.Linear(4 ,4)
        self.fc5 = nn.Linear(4, 2)

    def forward(self, x):
        x = x.float()
        x = F.tanh(self.fc1(x))
        x = F.tanh(self.fc2(x))
        x = F.tanh(self.fc3(x))
        x = F.tanh(self.fc4(x))
        x = F.tanh(self.fc5(x))
        return x
    def penultimateLayer(self, x):
        x = x.float()
        x = F.tanh(self.fc1(x))
        x = F.tanh(self.fc2(x))
        x = F.tanh(self.fc3(x))
        x = F.tanh(self.fc4(x))
        return x

In [None]:
# Actual training stuff
net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.0003)
data = dataset(X,Y)
trainloader = DataLoader(data, 16)

In [None]:
# Training
for epoch in range(10000):
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs
        inputs, labels = data
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        
        if epoch % 100 == 0:
            print('[%d, %5d] loss: %.6f' %(epoch + 1, i + 1, running_loss))
        running_loss = 0.0

In [None]:
# Evaulating NN
train_correct = 0

train_outputs = net(torch.FloatTensor(X_train))
_,predicted_train = torch.max(outputs.data, 1)
train_correct += (predicted_train == torch.LongTensor(Y_train)).sum().item()

print(train_correct)

test_correct = 0

test_outputs = net(torch.FloatTensor(X_test))
_,predicted_test = torch.max(outputs.data, 1)
test_correct += (predicted_test == torch.LongTensor(Y_test)).sum().item()

print(test_correct)

In [None]:
### EPSILON PARITION
Z = net.penultimateLayer(X).detach().numpy()
epsilon = 0.0001

In [None]:
def ePartition(epsilon, Z):
    partitions = []
    visited = [0]*len(Z)
    for i in range(len(Z)):
        part = []
        if visited[i] == 0:
            part.append(i)
            for j in range(i+1,len(Z)):
                if visited[j] == 0:
                    if ((Z[i]-Z[j])**2).sum() < epsilon:
                        part.append(j)
                        visited[j] = 1
            partitions.append(part)
    return partitions

In [None]:
epsPart = ePartition(epsilon, Z)
print len(epsPart)
for part in epsPart:
    print len(part)

In [None]:
# Mutual information calculation I(X;T_E)
XX = X.detach().numpy()
mm = [  [0.0, 0.0, 0.0, 0.0],
        [0.0, 0.0, 0.0, 1.0],
        [0.0, 0.0, 1.0, 0.0],
        [0.0, 0.0, 1.0, 1.0],
        [0.0, 1.0, 0.0, 0.0],
        [0.0, 1.0, 0.0, 1.0],
        [0.0, 1.0, 1.0, 0.0],
        [0.0, 1.0, 1.0, 1.0],
        [1.0, 0.0, 0.0, 0.0],
        [1.0, 0.0, 0.0, 1.0],
        [1.0, 0.0, 1.0, 0.0],
        [1.0, 0.0, 1.0, 1.0],
        [1.0, 1.0, 0.0, 0.0],
        [1.0, 1.0, 0.0, 1.0],
        [1.0, 1.0, 1.0, 0.0],
        [1.0, 1.0, 1.0, 1.0]]

def mapping(index):
    for i in range(16):
        if (XX[index] == mm[i]).all():
            return i

In [None]:
entropies = []
import math
for part in partitions:
    counts = [0]*16
    y_count = [0]*2
    for x in part:
        counts[mapping(x)] += 1
        y_count[fun2(XX[x])] += 1
    entropy = 0
    print counts
    print y_count
    for i in range(16):
        if counts[i] != 0:
            pi = float(counts[i])/len(part)
            entropy += (pi*math.log(pi,2))
    print entropy
    entropies.append(entropy) 