In [18]:
import numpy as np
import math
import matplotlib.pyplot as plt
from scipy.stats import poisson
import numpy as np
import torch
import torch.nn as nn
import torch.distributions as dist
from scipy.stats import ncx2
from time import time

In [19]:
class Utils:
    # Convert string parameter into torch activation function
    @staticmethod
    def get_activation_function(activation_function_str, negative_slope):
        if activation_function_str == "Lrelu":
            activation = torch.nn.LeakyReLU(negative_slope=negative_slope)
        elif activation_function_str == "relu":
            activation = torch.nn.ReLU()
        elif activation_function_str == "sigmoid":
            activation = torch.nn.Sigmoid()
        else:
            raise ValueError("Activation function not recognized : {}".format(activation_function_str))
        return activation
    
    # Convert string parameter into torch final activation function
    @staticmethod
    def get_final_activation_function(final_activation):
        if final_activation == "Sigmoid":
            final_activation = lambda x: torch.sigmoid(x)
        elif final_activation == "Clamping":
            final_activation = lambda x: torch.clamp(x, min=0., max=1.)
        elif final_activation == "Linear":
            final_activation = lambda x: x
        else:
            raise ValueError("final_activation must be Sigmoid, Clamping or Linear")
        return final_activation
    

In [20]:
class Conditional_Chi_Generator(nn.Module):
    def __init__(self,
                 noise_size: int=1,
                 min_d: float=1.,
                 max_d: float=1.,
                 min_lda: float=1.,
                 max_lda: float=1.,
                 hidden_dim: int=100,
                 activation_function: str="Lrelu",
                 final_activation: str="Sigmoid",
                 label_size: int=10,
                 negative_slope: float=0.1,
                 nb_hidden_layers_G: int=1,
                 eps: float=0.,
                 **kwargs):
        super(Conditional_Chi_Generator, self).__init__()
        self.min_d = min_d
        self.max_d = max_d
        self.min_lda = min_lda
        self.max_lda = max_lda
        self.input_dim = noise_size
        self.nb_hidden_layers = nb_hidden_layers_G
        self.hidden_dim = hidden_dim
        self.activation = Utils.get_activation_function(activation_function, negative_slope)
        self.input_noise = nn.Linear(self.input_dim, self.hidden_dim)
        self.label_size = label_size
        self.input_label = nn.Linear(2, self.label_size)
        self.fcs = nn.ModuleList()
        self.eps = eps
        for i in range(self.nb_hidden_layers):
            if i == 0:
                self.fcs.append(nn.Linear(self.hidden_dim + self.label_size, self.hidden_dim))
            else:
                self.fcs.append(nn.Linear(self.hidden_dim, self.hidden_dim))
        self.output = nn.Linear(self.hidden_dim, 1)
        self.final_activation = Utils.get_final_activation_function(final_activation)
    
    def forward(self, x, d, lda):
        # Normalize d with min_d and max_d into [0, 1]
        if self.max_d == self.min_d:
            d = 1
        else:
            d = (d - self.min_d) / (self.max_d - self.min_d)
        # Normalize lambda with min_lda and max_lda into [0, 1]
        if self.max_lda == self.min_lda:
            lda = 1
        else:
            lda = (lda - self.min_lda) / (self.max_lda - self.min_lda)        
        return self.feed_forward(x, d, lda)
    
    def feed_forward(self, x, d, lda):
        x = self.activation(self.input_noise(x))
        params = self.activation(self.input_label(torch.cat((d, lda), 1)))
        x = torch.cat((x, params), 1)
        for i in range(self.nb_hidden_layers):
            x = self.activation(self.fcs[i](x))
        x = self.final_activation(self.output(x))
        return x + self.eps

In [21]:
params = {}

# tested_d are d values on which we will perform tests (KS divergence)
params["tested_d"] = [0.3, 0.5, 1, 10, 50, 100]
params["tested_lda"] = [0, 50, 100, 500, 1000]
# displayed_d are d values on which we will display histograms. displayed_d has to be subset of tested_d to be displayed
params["min_d"] = 0.3
params["max_d"] = 10.
params["min_lda"] = 0.
params["max_lda"] = 100
# alpha: chi.ppf(alpha) is network's max return value.
# Because network return values are bounded in [0,1], 1 has to have a draw equivalent which is chi.ppf(alpha)
params["alpha"] = 0.999

# Networks parameters :
# Number of uniform draws for G
params["noise_size"] = 5
# Hidden layer size for d inputs when parametrized (leaving it at 10 is satisfactory)
params["label_size"] = 50
# "Lrelu", "relu" or "sigmoid" (default Lrelu with negative slope 0.1)
params["activation_function"] = "Lrelu"
params["negative_slope"] = 0.1
# Final activation for Generator: Sigmoid, Linear or Clamping
params["final_activation"] = "Sigmoid"
# hidden layer size
params["hidden_dim"] = 100
# number of hidden layers for both D and G after input layer (default 1) : total number of hidden layers is nb_hidden_layers + 1
params["nb_hidden_layers_G"] = 2
params["nb_hidden_layers_D"] = 2
params["device"] = torch.device('cuda:0')
params["M"] = ncx2.ppf(params["alpha"], params["max_d"], params["max_lda"])
params["m"] = ncx2.ppf(1-params["alpha"], params["min_d"], params["min_lda"])

# Load_dir_model is the folder containing the model to load
params["load_dir_model"] = "checkpoints_CHI2"


In [22]:
Chi2_checkpoint = torch.load(params["load_dir_model"], map_location='cpu')
print(Chi2_checkpoint['epoch'])
Chi2_generator = Conditional_Chi_Generator(**params).to(params['device'])
Chi2_generator.load_state_dict(Chi2_checkpoint['generator_state_dict'])


989


AssertionError: Torch not compiled with CUDA enabled

In [None]:
def CHI_generator(G, noise_size, d, _lda, size,  max_d, max_lda, M, m, alpha, device, **kwargs) :
    if d > max_d or lda > max_lda:
        _d = max_d
        _lda = max_lda
    else:
        _d = d
        _lda = lda
    partial_uniforms = torch.rand(size, noise_size, device = device)
    d_input = torch.ones(size, 1, device = device)*_d
    lda_input = torch.ones(size, 1, device = device)*_lda
    simulated_chi_draws = G(partial_uniforms, d_input, lda_input)*(M - m) + m
    if d > max_d or lda > max_lda:
        mean_1 = _d + _lda
        var_1 = 2 * (_d + 2 * _lda)
        mean_2 = d + lda
        var_2 = 2 * (d + 2 * lda)
        simulated_chi_draws = (simulated_chi_draws - mean_1) / np.sqrt(var_1) * np.sqrt(var_2) + mean_2 

    return simulated_chi_draws.reshape(1,-1)[0]
    
def Torch_CHI_generator(d, _lda, size, device, **kwargs) :
    if d <= 1 :
        samples_chi2 = torch.tensor(ncx2.rvs(d, _lda, size = size), device = device)
    else :
        chi2_dist_GPU = dist.Chi2(torch.tensor(d-1, device = device))    
        samples_chi2 = chi2_dist_GPU.sample(torch.tensor([size], device = device))
        Gaussian = (torch.randn(size, device= device)+torch.sqrt(torch.tensor(_lda, device = device)))**2
        samples_chi2 += Gaussian
    return  samples_chi2


Plot histograms on tested_d x tested_lda

In [23]:
size = 1000000

for d in params['tested_d'] :
    for lda in params['tested_lda'] :
        _min = math.floor(ncx2.ppf(1 - 0.999, d, lda))
        _max = math.ceil(ncx2.ppf(0.999, d, lda))
        bins = np.linspace(_min, _max, 200)
        t_ = time()
        true_Chi_CPU = ncx2.rvs(d, lda, size = size)
        print("stats time =", time()-t_)
        t_ = time()
        true_Chi_torch = Torch_CHI_generator(d = d, _lda = lda, size = size, device = torch.device('cuda:0'))
        print("torch time =", time()- t_)
        t_ = time()
        GAN_Chi_torch = CHI_generator(G = Chi2_generator, d = d, _lda = lda, size = size, **params)
        print("GAN time =", time()-t_)
        true_Chi = true_Chi_torch.cpu().numpy()
        GAN_Chi = GAN_Chi_torch.detach().cpu().numpy()
        plt.hist(true_Chi, bins=200, alpha=0.75, color = 'y', density=True, label="torch Chi distribution")
        plt.hist(GAN_Chi, bins=200, alpha=0.75, color = 'red', density=True, label="GAN_GPU distribution")
        plt.hist(true_Chi_CPU, bins=200, alpha=0.75, density=True, color = 'blue', label="stats chi distribution")
        plt.title("d = {}, lambda = {}".format(str(d), str(lda)))
        plt.show()

stats time = 0.2420799732208252


AssertionError: Torch not compiled with CUDA enabled

In [25]:
size = 1000000

for d in params['tested_d'] :
    for lda in params['tested_lda'] :
        print("d, lda =", d, lda)
        t_ = time()
        true_Chi_torch = Torch_CHI_generator(d, lda, size, device = torch.device('cpu'))
        print("torch cpu time =", time()- t_)
        t_ = time()
        true_Chi_torch = Torch_CHI_generator(d, lda, size, device = torch.device('cuda:0'))
        print("torch gpu time =", time()- t_)
        t_ = time()
        GAN_Chi_torch = CHI_generator(Chi2_generator, d, lda, size, **params)
        print("GAN time =", time()-t_, "\n")
    

d, lda = 0.3 0
torch cpu time = 0.17422986030578613


AssertionError: Torch not compiled with CUDA enabled

In [24]:
chi2_dist_GPU = dist.Chi2(torch.tensor(df-1, device = torch.device('cuda:0')))
print(chi2_dist_GPU)

t_ = time()
samples_chi2 = chi2_dist_GPU.sample(torch.tensor([1000000], device = torch.device('cuda:0')))
print(time()-t_)
t_ = time()
samples_chi2 = chi2_dist.sample([1000000])
print(time()-t_)

NameError: name 'df' is not defined