<a id='toc_nb'></a> 
  
[Model Definition](#model_def)  
[Hybrid Loss](#hybrid)  
[Training](#train)  
[Testing](#test)  
[Visualization](#vis)  



In [None]:
import torch, math, json, jsonpickle
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Five-slot float buffer initialised to zeros. No other cell shown in this notebook reads it.
# NOTE(review): presumably a rolling history like the shifting demo further down — confirm or remove.
tracking = np.zeros(shape=5)

<a id='save_load'></a> 

###### [Back to TOC](#toc_nb)  [Next ](#data_load) 

### Model Saving and loading 

In [None]:
# Needed for converting state_dict to JSON format
def saveModel(model, path=".json"):
    """Serialise ``model.state_dict()`` with jsonpickle and write it to ``path``.

    The jsonpickle output (a string) is passed through ``json.dump``, so the file
    contains a single JSON string literal; ``loadModel`` undoes both steps.

    Args:
        model: Object exposing ``state_dict()`` (e.g. an ``nn.Module``).
        path: Destination file. Defaults to ``".json"``, the name this notebook
            has always used.
    """
    json_str = jsonpickle.encode(model.state_dict())
    # Save best model for later use. The context manager guarantees the handle is
    # closed even when the write raises.
    with open(path, "w") as out_file:
        json.dump(json_str, out_file, indent=6)

# Load saved model
def loadModel(path=".json"):
    """Read back the object that ``saveModel`` stored in ``path``.

    Args:
        path: File to read. Defaults to ``".json"``, matching ``saveModel``.

    Returns:
        Whatever ``jsonpickle.decode`` reconstructs from the stored string
        (the saved state dict when the file was produced by ``saveModel``).
    """
    # The context manager closes the file even if parsing fails.
    with open(path, "r") as in_file:
        encoded = json.load(in_file)
    # NOTE(security): jsonpickle.decode can instantiate arbitrary Python objects.
    # Only load files that were written by this notebook.
    return jsonpickle.decode(encoded)

In [10]:
#a = [0 for i in range(5)]
import numpy as np

# Start from an all-zero, five-slot buffer.
a = np.zeros(shape=5)
print(a)

# Put a marker in the newest (last) slot.
a[4] = 1
print(a)

# Twice: move every entry one slot towards the front, clear the last slot,
# and show the buffer. The marker therefore walks from index 4 to index 2.
for step in range(2):
    a[:-1] = a[1:].copy()
    a[-1] = 0
    print(a)

[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 1.]
[0. 0. 0. 1. 0.]
[0. 0. 1. 0. 0.]


In [11]:
# Standard deviation and variance of the buffer `a` from the previous cell
# (NumPy defaults, i.e. the population statistics with ddof=0).
print(f"SD: {a.std()}, Var: {a.var()}")

SD: 0.4000000000000001, Var: 0.16000000000000006


<a id='data_load'></a> 

###### [Back to TOC](#toc_nb) [Previous ](#save_load) [Next ](#model_def) 
Data Loading

In [None]:
def load_data(path, label_dtype=torch.float32):
    """Load a CSV whose first column is the label and whose remaining columns are features.

    Args:
        path: Location of the CSV file, passed straight to ``pd.read_csv``.
        label_dtype: dtype of the returned label tensor. Defaults to
            ``torch.float32`` (the historical behaviour); pass ``torch.long`` to
            get integer class indices.

    Returns:
        ``(features, labels)``: a float32 tensor built from columns ``1:`` (one row
        per CSV row) and a 1-D tensor built from column ``0``.
    """
    ds = pd.read_csv(path)
    features = torch.tensor(ds.iloc[:, 1:].to_numpy(), dtype=torch.float32)
    labels = torch.tensor(ds.iloc[:, 0].to_numpy(), dtype=label_dtype)
    # Quick sanity print of what was loaded.
    print(type(features), features.shape,labels.shape)

    return features, labels
       
# Load each split and move both tensors of the returned pair onto the GPU.
# Both calls currently read the same placeholder path.
train_feat, train_lbl = (t.to('cuda') for t in load_data("~/path/to/datasets"))
test_feat, test_lbl = (t.to('cuda') for t in load_data("~/path/to/datasets"))

batch_size = 32

# Confirm where the tensors ended up and that the cell finished.
print(test_lbl.device)
print(f"Cell 1 Done")

<class 'torch.Tensor'> torch.Size([1000, 784]) torch.Size([1000])
<class 'torch.Tensor'> torch.Size([100, 784]) torch.Size([100])
cuda:0
Cell 1 Done


<a id='model_def'></a>   

###### [Back to TOC](#toc_nb)  [Previous ](#data_load) [Next ](#hybrid)  
Model Definition

In [None]:
# Where is the dropout? A separate forward func for dis and gen, 
# later, one func for both last layer problem and hybrid loss 
class MyNN(nn.Module):
    """Small fully connected network with two parallel output heads.

    ``forward`` computes ``fc1 -> ReLU -> fc2`` and feeds the result to two linear
    heads, ``Dis`` and ``Gen``. ``altforward`` instead runs the caller-supplied
    ``layers`` / ``activations`` stack.

    Args:
        input_size: Width of the input vectors.
        hidden_size: Output width of ``fc1``.
        output_size: Output width of ``fc2`` and of both heads.
        layers: Iterable of modules; only ``nn.Linear`` entries are kept.
        activations: Callables applied after the corresponding kept layer.
    """

    def __init__(self, input_size, hidden_size, output_size, layers, activations):
        super(MyNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)  # input layer
        self.fc2 = nn.Linear(hidden_size, output_size)  # hidden layer
        # Both heads consume the output of fc2, so their input width has to be
        # output_size. Declaring them with hidden_size inputs made forward() fail
        # whenever hidden_size != output_size; for hidden_size == output_size the
        # shapes are unchanged.
        self.Dis = nn.Linear(output_size, output_size)
        self.Gen = nn.Linear(output_size, output_size)
        self.relu = nn.ReLU()

        # Variable-size stack used by altforward(). nn.ModuleList registers the
        # layers, so parameters(), state_dict() and .to(device) all see them; a
        # plain Python list would hide them from the optimizer and from saving.
        # Device placement is left to the caller's model.to(...), not hard-coded.
        self.layers = nn.ModuleList(
            [layer for layer in layers if isinstance(layer, nn.Linear)]
        )
        self.activations = activations
        # Materialised as a list so the pairs can be iterated more than once.
        # NOTE(review): non-Linear entries are dropped from `layers` but not from
        # `activations`, so the pairing shifts if such entries are ever passed.
        self.layeractivationpairs = list(zip(self.layers, self.activations))

    def forward(self, x):
        """Return ``(dX, gX)``: the ``Dis`` and ``Gen`` head outputs for ``x``."""
        x = self.relu(self.fc1(x))  # input layer + non-linearity
        x = self.fc2(x)
        return self.Dis(x), self.Gen(x)

    def altforward(self, x):
        """Run ``x`` through each ``(layer, activation)`` pair in order."""
        # zip pairs layer k with activation k; iterating over the tuple
        # `(self.layers, self.activations)` would unpack the lists themselves.
        for layer, activation in zip(self.layers, self.activations):
            x = activation(layer(x))
        return x

    def copy(self, model):
        """Overwrite this model's parameters with those of ``model``."""
        self.load_state_dict(model.state_dict())

    def toJSON(self):
        """Dump the module's attribute dictionaries as a JSON string.

        NOTE(review): objects json cannot encode are replaced by their ``__dict__``;
        whether every nested object has one has not been verified here.
        """
        return json.dumps(self, default=lambda o: o.__dict__,
            sort_keys=True, indent=4)

# Create an instance of the neural network
input_size = 784
hidden_size = 20
output_size = 10
layers=[nn.Linear(input_size, hidden_size),nn.Linear(hidden_size, output_size)]
activations=[nn.ReLU(),nn.ReLU()]

# A single model: every later cell uses dg_model as one module (dg_model(...),
# dg_model.fc1, dg_model.state_dict(), saveModel(dg_model)). Building a list over
# the name CENSUS, which no cell shown here defines, raised NameError on a fresh kernel.
dg_model = MyNN(input_size, hidden_size, output_size, layers, activations).to('cuda')


In [None]:
def loc(tensor):
    """Return two indices derived from the row-wise and column-wise argmax of ``tensor``.

    For a 2-D input, the first value is the index of the row whose maximum sits
    in the right-most column (first such row on ties); the second is the index of
    the column whose maximum sits in the lowest row.

    NOTE(review): this is not, in general, the position of the global maximum —
    confirm the intended meaning with the author.

    Returns:
        ``(x, y)`` as Python ints.
    """
    # Column index of the maximum in every row.
    _, col_of_row_max = torch.max(tensor, 1, keepdim=True)
    # Row index of the maximum in every column.
    _, row_of_col_max = torch.max(tensor, 0, keepdim=True)
    return int(torch.argmax(col_of_row_max)), int(torch.argmax(row_of_col_max))



<a id='hybrid'></a>   

###### [Back to TOC](#toc_nb)  [Previous ](#model_def) [Next ](#train) 
### Hybrid Loss  
This section describes the hybrid loss function: the theory behind it, its benefits and limitations, and its implementation details.  

#### Regularization  
Because the generative and discriminative objectives differ in nature, the weights may need regularization to keep them within a reasonable range.  
#### Generative  
Models the underlying data distribution, that is, the probability of the features given the label, $p(x \mid y)$. 
#### Discriminative  
Models the probability that a sample belongs to a given class, given its features, $p(y \mid x)$.  

In [None]:

# Alternative approach
class CombinedLoss(nn.Module):
    """Weighted sum of a KL-divergence term and a cross-entropy term.

    Args:
        alpha: Weight of the KL term.
        beta: Weight of the cross-entropy term.
    """

    def __init__(self, alpha=0.5, beta=0.5):
        super(CombinedLoss, self).__init__()
        # 'batchmean' gives the mathematically correct KL value; the default
        # 'mean' averages over every element instead of over the batch.
        self.kl_loss = nn.KLDivLoss(reduction="batchmean")
        self.ce_loss = nn.CrossEntropyLoss()
        self.alpha = alpha  # Weight for KL
        self.beta = beta    # Weight for Cross-Entropy

    def forward(self, outputs, targets, aux_targets):
        """Return ``alpha * KL(targets || softmax(outputs)) + beta * CE(outputs, aux_targets)``.

        Args:
            outputs: Raw, unnormalised scores of shape ``(N, C)``.
            targets: Target probabilities for the KL term, same shape as ``outputs``.
            aux_targets: Targets for the cross-entropy term, in any form
                ``nn.CrossEntropyLoss`` accepts.
        """
        # KLDivLoss expects log-probabilities as its input, whereas
        # CrossEntropyLoss applies log-softmax itself and takes the raw scores.
        kl = self.kl_loss(F.log_softmax(outputs, dim=1), targets)
        ce = self.ce_loss(outputs, aux_targets)
        # Combine with weights
        return self.alpha * kl + self.beta * ce


In [None]:
# Yet another alternative
class GMMLogisticLoss(nn.Module):
    """Pair of losses: cross-entropy on predictions and a GMM negative log-likelihood on features.

    The Gaussian mixture has its own learnable means, full covariance matrices and
    mixture weights, stored as parameters of this module.

    Args:
        n_components: Number of mixture components.
        n_features: Dimensionality of the feature vectors.
    """

    def __init__(self, n_components=2, n_features=784):
        super(GMMLogisticLoss, self).__init__()
        self.n_components = n_components
        self.n_features = n_features

        # GMM parameters: random means, identity covariances, uniform weights.
        self.means = nn.Parameter(torch.randn(n_components, n_features))
        self.covs = nn.Parameter(torch.eye(n_features).repeat(n_components, 1, 1))
        self.weights = nn.Parameter(torch.ones(n_components) / n_components)

        # Logistic loss
        self.log_loss = nn.CrossEntropyLoss()

    def gmm_log_likelihood(self, x):
        """Return the mixture log-likelihood of every row of ``x``.

        Args:
            x: Tensor of shape ``(batch, n_features)``.

        Returns:
            Tensor of shape ``(batch,)``.
        """
        log_two_pi = math.log(2 * math.pi)
        log_probs = []
        for k in range(self.n_components):
            cov = self.covs[k]
            diff = x - self.means[k]
            # Quadratic form diff^T cov^{-1} diff per sample, via a linear solve
            # rather than an explicit inverse.
            solved = torch.linalg.solve(cov, diff.T)      # (n_features, batch)
            mahalanobis = torch.sum(diff.T * solved, dim=0)
            # logdet works in log space; log(det(cov)) under/overflows to
            # -inf/+inf for high-dimensional covariances (e.g. 0.1 * I).
            log_prob = -0.5 * (
                torch.logdet(cov) +
                mahalanobis +
                self.n_features * log_two_pi
            )
            log_probs.append(log_prob + torch.log(self.weights[k]))

        return torch.logsumexp(torch.stack(log_probs), dim=0)

    def forward(self, pred_dis, pred_gen, targets, features):
        """Return ``(lr_loss, gmm_loss)``.

        Args:
            pred_dis: Scores passed to ``nn.CrossEntropyLoss``.
            pred_gen: Accepted for interface compatibility; not used.
            targets: Targets for the cross-entropy term.
            features: ``(batch, n_features)`` tensor scored under the mixture.
        """
        # Discriminative (Logistic) Loss
        lr_loss = self.log_loss(pred_dis, targets)

        # Generative (GMM) Loss: mean negative log-likelihood of the features.
        gmm_loss = -torch.mean(self.gmm_log_likelihood(features))

        return lr_loss, gmm_loss

class HybridLoss(nn.Module):
    """Convex combination of the two losses produced by ``GMMLogisticLoss``.

    Args:
        alpha: Weight of the logistic term; the GMM term gets ``1 - alpha``.
        n_components: Forwarded to ``GMMLogisticLoss``.
        n_features: Forwarded to ``GMMLogisticLoss``.
    """

    def __init__(self, alpha=0.5, n_components=2, n_features=784):
        super().__init__()
        # Balance between the discriminative (LR) and generative (GMM) terms.
        self.alpha = alpha
        self.gmm_lr_loss = GMMLogisticLoss(n_components=n_components, n_features=n_features)

    def forward(self, pred_dis, pred_gen, targets, features):
        """Return ``alpha * lr_loss + (1 - alpha) * gmm_loss``."""
        discriminative, generative = self.gmm_lr_loss(pred_dis, pred_gen, targets, features)
        return self.alpha * discriminative + (1 - self.alpha) * generative

In [None]:


def Train(model,ins, outs, iter=None):
    """Run one pass of per-sample SGD over ``ins`` / ``outs`` using ``HybridLoss``.

    Args:
        model: Module whose forward returns ``(dis_out, gen_out)``.
        ins: Feature tensor with one sample per row; each sample is flattened
            to ``(1, n_features)`` before it is fed to the model.
        outs: Class label per sample; cast to integer class indices for the
            cross-entropy term.
        iter: Unused. Kept for compatibility and given a default so that the
            existing three-argument calls work.

    Returns:
        Mean loss over the pass (``0.0`` when ``ins`` is empty).
    """
    n_samples = ins.shape[0]
    if n_samples == 0:
        return 0.0

    flat = ins.reshape(n_samples, -1)
    # The loss owns parameters (the GMM), so it has to live on the data's device.
    criterion = HybridLoss(alpha=0.5, n_components=2,
                           n_features=flat.shape[1]).to(flat.device)
    # NOTE(review): only the model's parameters are optimised. The GMM inside
    # the criterion is re-created on every call and never updated, so the
    # generative term contributes no gradient to the model — confirm intent.
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    total_loss = 0.0
    for i in range(n_samples):
        sample = flat[i].reshape(1, -1)
        # CrossEntropyLoss needs a (1,) integer class index, not a 0-d float.
        target = outs[i].reshape(1).long()
        optimizer.zero_grad()
        dis_out, gen_out = model(sample)
        loss = criterion(dis_out, gen_out, target, sample)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / n_samples
        
# Test the trained model
#test_input = torch.randn(1, input_size).to('cuda')
#with torch.no_grad():
#    test_output = model(test_input)
#print("Test output:", test_output)

def model_test(model,m_ins, m_outs):
    """Print and return the classification accuracy (in percent) of ``model``.

    Args:
        model: Callable returning either class scores of shape ``(N, C)`` or a
            tuple/list whose first element is such a tensor (the discriminative
            head of a two-headed model).
        m_ins: Feature tensor with one sample per row.
        m_outs: Class label per sample.

    Returns:
        Accuracy as a float in ``[0, 100]``; ``0.0`` for an empty input.
    """
    print(m_ins.device)
    n_samples = m_ins.shape[0]

    if n_samples == 0:
        score = 0.0
    else:
        # Evaluation only: no gradients needed, and one batched forward pass
        # replaces the per-sample loop.
        with torch.no_grad():
            out = model(m_ins.reshape(n_samples, -1))
        scores = out[0] if isinstance(out, (tuple, list)) else out
        predicted = torch.argmax(scores, dim=1)
        labels = m_outs.reshape(-1).long().to(predicted.device)
        score = 100 * int((predicted == labels).sum()) / n_samples

    print(f"Best result: {score}%")
    return score


<a id='train'></a>   

###### [Back to TOC](#toc_nb)  [Previous ](#hybrid) [Next ](#vis) 
Training  
Maybe handle switching between losses?

In [None]:
# Accuracy on the test split, evaluated before the training cell below runs.
top = model_test(dg_model,test_feat,test_lbl)

In [None]:
# Accuracy before training, for comparison with the final score.
_ = model_test(dg_model,test_feat,test_lbl)

EPOCHS = 8
for i in range(EPOCHS):
    # Train is declared as (model, ins, outs, iter); pass the epoch index as iter.
    Train(dg_model,train_feat,train_lbl, i)
    # Report after every epoch; an `i%20==0` guard fires only once when EPOCHS is 8.
    print(f"Training {100*(i+1)/(EPOCHS):.1f}% completed")

top = model_test(dg_model,test_feat,test_lbl)


In [None]:
# Persist the trained weights; saveModel writes them to the file named ".json".
saveModel(dg_model)

<a id='vis'></a>   

###### [Back to TOC](#toc_nb)  [Previous ](#train) 
Visualization

In [None]:
# NOTE(review): tmp_mdl is not defined in any cell shown in this notebook; bind it
# to the model you want to inspect before running this cell.
w1 = tmp_mdl.fc1.weight.cpu().detach()
w2 = tmp_mdl.fc2.weight.cpu().detach()
b1 = tmp_mdl.fc1.bias.cpu().detach()
b2 = tmp_mdl.fc2.bias.cpu().detach()
a = tmp_mdl.fc1.weight.cpu().detach()
b = tmp_mdl.fc2.weight.cpu().detach()

figure, WBplots = plt.subplots(2, 2)

# Top row: weight matrices of the two linear layers.
WBplots[0, 0].imshow(w1, cmap='hot', interpolation='nearest')
WBplots[0, 0].set_title("1st layer weight")

WBplots[0, 1].imshow(w2, cmap='hot', interpolation='nearest')
WBplots[0, 1].set_title("2nd layer weight")

# Bottom row: bias vectors. imshow needs a 2-D array, so each 1-D bias is drawn
# as a single-row image; aspect='auto' keeps that row from collapsing to a line.
WBplots[1, 0].imshow(b1.unsqueeze(0), cmap='hot', interpolation='nearest', aspect='auto')
WBplots[1, 0].set_title("1st layer bias")

WBplots[1, 1].imshow(b2.unsqueeze(0), cmap='hot', interpolation='nearest', aspect='auto')
WBplots[1, 1].set_title("2nd layer bias")

plt.show()

In [None]:
# Heat-map of the first layer's weight matrix. `b` (second layer) is fetched
# alongside it but not plotted in this cell.
a, b = (layer.weight.cpu().detach() for layer in (tmp_mdl.fc1, tmp_mdl.fc2))
plt.imshow(a, cmap='hot', interpolation='nearest')
plt.show()

In [None]:
# 4x4 uniform samples from [0, 1), centred on zero and scaled down by 5,
# i.e. values within [-0.1, 0.1); print the extremes actually drawn.
c = torch.rand(4, 4).sub(0.5).div(5)
print(c.max(), c.min())

In [None]:
import json

# toJSON() already returns a JSON-formatted string; passing it through
# json.dumps again would only print an escaped, quoted copy of it.
print(dg_model.toJSON())
print(dg_model)
# Name and shape of each fc1 parameter (printing `.parameters` without calling
# it shows only the bound method).
print([(name, tuple(p.shape)) for name, p in dg_model.fc1.named_parameters()])

In [None]:
# Show the model's textual representation as the cell output.
repr(dg_model)

In [None]:
# Show dg_model's state_dict() as the cell output.
dg_model.state_dict()

In [None]:
# Print the model output for sample 1 next to its expected value.
# NOTE(review): in the cells shown here `ins` and `outs` exist only as parameter names of
# Train; nothing defines them at notebook level, so this presumably relies on leftover
# kernel state — confirm which tensors are meant.
print(dg_model(ins[1]),"\n",outs[1])

In [None]:
# uniform_ is an in-place tensor method and needs a tensor to fill; calling it
# on the class with two floats raises TypeError.
# NOTE(review): shape (1, 784) is assumed from the model's input size and from
# the test_input[0] indexing below — confirm.
test_input = torch.empty(1, 784).uniform_(0., 1.)
tmax=test_input[0][torch.argmax(test_input[0])]
print(test_input, test_input.shape,tmax)
# Rescale so that the largest entry becomes exactly 1.
test_input = test_input/tmax
print(test_input)
# Move to the GPU when one is available; stay on the CPU otherwise.
test_input = test_input.to('cuda' if torch.cuda.is_available() else 'cpu')