<a href="https://colab.research.google.com/github/lwx-3000/Deep_Neuron_Network/blob/main/Repro_Sampling_Method.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader, random_split
import scipy.stats as stats
import numpy as np

In [None]:
#Section 1, Data generation, define NN model
class SimpleNN(nn.Module):
    """Fully connected ReLU network whose output can be perturbed by noise.

    Args:
        input_size: Number of input features.
        hidden_layers: Iterable with the width of each hidden layer.
        output_size: Number of output units.
        std_deviation: Scale of the Gaussian output noise; only its absolute
            value is kept.
    """

    def __init__(self, input_size, hidden_layers, output_size, std_deviation):
        super().__init__()
        # A negative deviation is treated the same as its positive counterpart.
        self.std_dev = abs(std_deviation)
        # Consecutive entries of `widths` are the (in, out) sizes of each layer.
        widths = [input_size, *hidden_layers]
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(fan_in, fan_out)
             for fan_in, fan_out in zip(widths[:-1], widths[1:])]
        )
        self.output_layer = nn.Linear(widths[-1], output_size)

    def forward(self, x, add_noise=True):
        """Return the network output, plus N(0, std_dev^2) noise by default."""
        hidden = x
        for dense in self.hidden_layers:
            hidden = torch.relu(dense(hidden))
        output = self.output_layer(hidden)
        if not add_noise:
            return output
        # Independent noise for every output element (same shape as output).
        return output + torch.randn_like(output) * self.std_dev



# Architecture of the data-generating network: 3 inputs, two hidden layers of
# 3 units each (L = 2, K = 3) and one output.
# TODO: make L and K input parameters.
input_size = 3
hidden_layers = [3, 3]
output_size = 1
std_dev = 1.3  # true standard deviation of the noise

model = SimpleNN(input_size, hidden_layers, output_size, std_dev)

# Replace the random initialisation with fixed constants so the generating
# function is known: (layer, weight value, bias value) for hidden layer 1,
# hidden layer 2 and the output layer.
for layer, weight_value, bias_value in (
    (model.hidden_layers[0], 0.12, 0.2),
    (model.hidden_layers[1], 0.11, 0.12),
    (model.output_layer, 0.13, 0.001),
):
    layer.weight.data.fill_(weight_value)
    layer.bias.data.fill_(bias_value)

# Generate the inputs 'X' from a standard normal distribution.
n_samples = 10000  # temporary value
X = torch.randn(n_samples, input_size)

# Pass 'X' through the model to generate the outputs. The call uses the
# model's default add_noise=True, so 'y_true' already includes the noise.
with torch.no_grad():
    y_true = model(X)

# Sanity check 1: inspect the generated data and its dimensions.
print(X, y_true)
print(f"X dimsension: {X.shape}, y_true.shape: {y_true.shape}")

# Split the data into Da and Db; random_split assigns samples in random order.
n_b = n_samples // 2
n_a = n_samples - n_b

dataset = TensorDataset(X, y_true)
Da, Db = random_split(dataset, [n_a, n_b])

# Db is used for training (reshuffled on every pass), Da for validation.
train_loader = DataLoader(Db, batch_size=100, shuffle=True)
val_loader = DataLoader(Da, batch_size=100, shuffle=False)

tensor([[-0.2692,  0.9548,  1.7300],
        [ 0.4241, -0.1345, -2.5557],
        [ 0.2105,  0.3175,  0.2257],
        ...,
        [ 0.7230, -1.2820, -0.6629],
        [-0.2402, -0.3636,  0.9041],
        [ 1.8342, -1.0857,  1.0614]]) tensor([[-0.9711],
        [-0.1334],
        [ 0.5546],
        ...,
        [-2.2126],
        [-1.1680],
        [-0.0612]])
X dimsension: torch.Size([10000, 3]), y_true.shape: torch.Size([10000, 1])


In [None]:
class SimpleNN(nn.Module):
    """Plain feed-forward network: ReLU hidden layers and a linear output.

    Args:
        input_size: Number of input features.
        hidden_layers: Iterable with the width of each hidden layer.
        output_size: Number of output units.
    """

    def __init__(self, input_size, hidden_layers, output_size):
        super().__init__()
        # Consecutive entries of `widths` are the (in, out) sizes of each layer.
        widths = [input_size, *hidden_layers]
        self.hidden_layers = nn.ModuleList(
            [nn.Linear(fan_in, fan_out)
             for fan_in, fan_out in zip(widths[:-1], widths[1:])]
        )
        self.output_layer = nn.Linear(widths[-1], output_size)

    def forward(self, x):
        """Apply every hidden layer with ReLU, then the linear output layer."""
        activation = x
        for dense in self.hidden_layers:
            # TODO: consider a different activation function.
            activation = torch.relu(dense(activation))
        return self.output_layer(activation)

In [None]:
# Training, Grid Search with residual and standard deviation packed
class SearchNN(SimpleNN):
    """SimpleNN bundled with its loss, optimizer and validation statistics.

    Attributes:
        loss_function: Mean-squared-error loss (temporary choice).
        optimizer: Adam optimizer over this network's parameters.
        y_pred: List of per-batch prediction tensors from the most recent
            ``validate`` call.
        residuals: Tensor of ``y - prediction`` over the last validated data
            (set by ``validate``).
        standard_deviation: Root-mean-square of ``residuals`` (not
            mean-centred), as a 0-dim tensor (set by ``validate``).
    """

    def __init__(self, input_size, hidden_layers, output_size, lr=0.001):
        super().__init__(input_size, hidden_layers, output_size)
        self.loss_function = nn.MSELoss()  # temporary: MSE
        self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        self.y_pred = []

    def train_single_epoch(self, data_loader):
        """Run one optimisation pass over ``data_loader``.

        Returns:
            The loss of the last batch as a Python float.

        Raises:
            ValueError: If ``data_loader`` yields no batches.
        """
        self.train()
        loss = None
        for x_batch, y_batch in data_loader:
            self.optimizer.zero_grad()
            loss = self.loss_function(self(x_batch), y_batch)
            loss.backward()
            self.optimizer.step()
        if loss is None:
            raise ValueError("data_loader produced no batches to train on")
        return loss.item()

    def validate(self, data_loader):
        """Evaluate the network and store residual statistics on ``self``.

        Sets ``self.y_pred``, ``self.residuals`` and
        ``self.standard_deviation`` from this call only.

        Returns:
            The mean validation loss over all samples as a Python float.

        Raises:
            ValueError: If ``data_loader`` yields no batches.
        """
        self.eval()  # Set the model to evaluation mode
        # Start from a clean list so repeated calls do not mix predictions
        # from earlier validations into the current ones.
        self.y_pred = []
        batch_residuals = []
        weighted_loss = 0.0
        n_seen = 0

        with torch.no_grad():  # No gradient needed for validation
            for x_val, y_val in data_loader:
                y_val_pred = self(x_val)
                batch_size = y_val.shape[0]
                # Weight each batch loss by its size so that a smaller final
                # batch does not bias the overall mean.
                weighted_loss += self.loss_function(y_val_pred, y_val).item() * batch_size
                n_seen += batch_size

                batch_residuals.append(y_val - y_val_pred)
                self.y_pred.append(y_val_pred)

        if not batch_residuals:
            raise ValueError("data_loader produced no batches to validate on")

        # Save the residuals and their RMS as attributes for later use.
        self.residuals = torch.cat(batch_residuals, dim=0)
        self.standard_deviation = torch.sqrt(torch.mean(self.residuals ** 2))

        # TODO: add a feature to save the trained parameters.

        return weighted_loss / n_seen


# Repro with fixed parameter
class Repro():
    """Repro-sampling driver: grid search, synthetic data and intervals.

    ``train_and_evaluate_nn`` and ``monte_carlo`` read the module-level names
    ``input_size``, ``output_size``, ``train_loader`` and ``val_loader``;
    they must be defined before those methods are called.
    """

    def __init__(self):
        # Trained networks keyed by (L, K) so that gradient_fix=True can
        # evaluate the already-trained model instead of a fresh one.
        self._trained_models = {}

    def train_and_evaluate_nn(self, grid_search_param, train_loader, val_loader, num_epochs=10, gradient_fix=False):
        """Train (or reuse) an (L, K) network and evaluate it on ``val_loader``.

        Args:
            grid_search_param: ``(L, K)`` pair; the network has ``L`` hidden
                layers of ``K`` units each.
            train_loader: Batches used for training; ignored when
                ``gradient_fix`` is true.
            val_loader: Batches the network is evaluated on.
            num_epochs: Number of training passes when training.
            gradient_fix: When true, no training happens and the network
                previously trained for the same ``(L, K)`` is reused. If none
                exists yet, a freshly initialised network is evaluated.

        Returns:
            Dict with keys ``'L'``, ``'K'``, ``'val_loss'``, ``'residual'``,
            ``'standard_dev'``, ``'log_likelihood'`` (0-dim tensor) and, when
            training was requested, ``'train_loss'`` (``None`` if
            ``num_epochs`` is 0).

        Side effects:
            Sets ``self.y_pred`` and ``self.est_std_dev``.
        """
        L, K = grid_search_param
        key = (L, K)
        train_loss = None

        if gradient_fix and key in self._trained_models:
            # Keep the trained parameters; only validation is performed.
            model = self._trained_models[key]
        else:
            # Model with L layers and K units
            model = SearchNN(input_size, [K] * L, output_size)
            if not gradient_fix:
                for _ in range(num_epochs):
                    train_loss = model.train_single_epoch(train_loader)
                self._trained_models[key] = model

        # validate() appends to model.y_pred, so clear it when a model is
        # reused to avoid mixing predictions from earlier evaluations.
        model.y_pred = []
        val_loss = model.validate(val_loader)
        residual = model.residuals
        standard_dev = model.standard_deviation
        size = residual.numel()
        self.y_pred = model.y_pred  # predicted y, one tensor per batch
        # NOTE(review): this is the spread of the predictions, not of the
        # residuals -- confirm it is the intended noise scale for repro_data.
        self.est_std_dev = torch.cat(self.y_pred, dim=0).std().item()

        # Nuclear mapping function given (L, K): the Gaussian log-likelihood
        # l(L,K) = -n * log(sqrt(2*pi)*sigma) - sum(residual**2) / (2*sigma**2)
        # with sigma the estimated residual standard deviation.
        log_norm = torch.log(float(np.sqrt(2 * np.pi)) * standard_dev)
        likelihood = -size * log_norm - torch.sum(residual ** 2) / (2 * standard_dev ** 2)

        result = {
            'L': L,
            'K': K,
            'val_loss': val_loss,
            'residual': residual,
            'standard_dev': standard_dev,
            'log_likelihood': likelihood,
        }
        if not gradient_fix:
            result['train_loss'] = train_loss
        return result

    def repro_data(self, data, repro_quantity, est_std_dev, seed=39):
        """Create ``repro_quantity`` noisy copies of ``data``.

        Gaussian noise with scale ``est_std_dev`` is added to every tensor of
        the dataset. The global torch RNG is re-seeded with ``seed`` first, so
        repeated calls with the same arguments give the same copies.

        Args:
            data: A ``TensorDataset`` or a ``Subset`` of one.
            repro_quantity: Number of synthetic datasets to generate.
            est_std_dev: Standard deviation of the added noise.
            seed: Seed for reproducibility.

        Returns:
            Dict mapping the replicate index to a ``TensorDataset``; also
            stored as ``self.newdata_cache`` (noise kept in ``self.noise``).

        Raises:
            TypeError: If ``data`` is neither of the supported dataset types.
        """
        # Set seed for reproducibility
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

        self.noise_std = est_std_dev
        self.original_data = data
        self.noise = {}
        self.newdata_cache = {}

        if isinstance(data, torch.utils.data.Subset):
            # Select the subset's rows from the underlying dataset's tensors.
            indices = data.indices
            original_data_tensors = [tensor[indices] for tensor in data.dataset.tensors]
        elif isinstance(data, torch.utils.data.TensorDataset):
            original_data_tensors = data.tensors
        else:
            raise TypeError("Unsupported dataset type")

        for repro_index in range(repro_quantity):
            # One noise tensor per dataset tensor, kept for later inspection.
            noise_tensors = tuple(
                self.noise_std * torch.randn_like(tensor)
                for tensor in original_data_tensors
            )
            self.noise[repro_index] = noise_tensors
            self.newdata_cache[repro_index] = TensorDataset(
                *(tensor + noise for tensor, noise in zip(original_data_tensors, noise_tensors))
            )

        return self.newdata_cache

    def monte_carlo(self, data, n_repro, l_low=1, l_up=4, k_low=3, k_up=6, epoch=10, verbose=True):
        """Train each (L, K) once and evaluate it on synthetic replicates.

        For every ``(L, K)`` with ``l_low <= L < l_up`` and
        ``k_low <= K < k_up`` the network is trained for ``epoch`` passes on
        the module-level ``train_loader``. ``n_repro`` synthetic datasets are
        then generated from ``data``, and the trained network is evaluated on
        each one without further training.

        Returns:
            Dict mapping ``((L, K), repro_index)`` to
            ``(repro_index, result)``, where ``result`` is the dict returned
            by ``train_and_evaluate_nn``. Also stored as
            ``self.repro_results``.
        """
        self.grid_search_params = [(l, k) for l in range(l_low, l_up) for k in range(k_low, k_up)]
        self.n_epoch = epoch
        self.n_repro = n_repro
        self.repro_results = {}

        for param in self.grid_search_params:
            # Obtain the trained model, y_pred and est_std_dev for this (L, K).
            self.train_and_evaluate_nn(param, train_loader, val_loader, self.n_epoch, gradient_fix=False)
            data_cache = self.repro_data(data, n_repro, self.est_std_dev)
            for repro_index, synthetic_data in data_cache.items():
                # Evaluation only, so the order of the batches is irrelevant.
                repro_loader = DataLoader(synthetic_data, batch_size=100, shuffle=False)
                # Do not train: validate the trained model on the replicate.
                outcome = self.train_and_evaluate_nn(param, repro_loader, repro_loader, self.n_epoch, gradient_fix=True)
                self.repro_results[(param, repro_index)] = (repro_index, outcome)

        if verbose:
            for (architecture, _), (repro_index, result) in self.repro_results.items():
                val_loss = result['val_loss']
                standard_dev = result['standard_dev']
                likelihood = result['log_likelihood']
                print(f"Architecture:{architecture}, Repro Index:{repro_index},  Val Loss: {val_loss}, Std Dev: {standard_dev}, Log Likelihood: {likelihood}")
        return self.repro_results

    def borel_confidence_interval(self, data, alpha=0.95):
        """Return a normal-approximation confidence interval for the mean.

        Args:
            data: Sequence of numeric samples.
            alpha: Confidence level (0.95 uses the 97.5th normal percentile).

        Returns:
            ``(lower, upper)``, i.e. mean -/+ z * s / sqrt(n), where ``s`` is
            the sample standard deviation (``ddof=1``) of ``data``.
        """
        self.data = np.array(data)
        self.n = len(data)
        self.mean = np.mean(self.data)
        self.std_dev = np.std(self.data, ddof=1)
        # For more than about 30 samples the distribution is approx. normal.
        self.z_score = stats.norm.ppf(1 - (1 - alpha) / 2)
        # Use the spread of the samples themselves, not a module-level value.
        self.margin_of_error = self.z_score * (self.std_dev / np.sqrt(self.n))
        self.confidence_interval = (self.mean - self.margin_of_error, self.mean + self.margin_of_error)
        return self.confidence_interval

# Note: val_loader must be initialized before monte_carlo is called.


In [None]:
n_repro = 5
myrepro = Repro()  # monte_carlo below also performs the training
# Generate the Monte Carlo data and collect the per-replicate results. The
# n_repro variable is passed on (rather than a second literal) so the
# replicate count stays in sync with any later code that reads n_repro.
repro_result = myrepro.monte_carlo(Db, n_repro=n_repro, l_low=1, l_up=4, k_low=3, k_up=6, verbose=False)
print(myrepro.est_std_dev)

Architecture:(1, 3), Repro Index:0,  Val Loss: 1.9530778336524963, Std Dev: 1.397525668144226, Log Likelihood: -14519.962890625
Architecture:(1, 3), Repro Index:1,  Val Loss: 1.779942398071289, Std Dev: 1.334144949913025, Log Likelihood: -13556.40234375
Architecture:(1, 3), Repro Index:2,  Val Loss: 1.723296914100647, Std Dev: 1.3127440214157104, Log Likelihood: -13236.220703125
Architecture:(1, 3), Repro Index:3,  Val Loss: 1.9893915915489198, Std Dev: 1.4104578495025635, Log Likelihood: -14719.4443359375
Architecture:(1, 3), Repro Index:4,  Val Loss: 1.997310013771057, Std Dev: 1.4132622480392456, Log Likelihood: -14762.8310546875
Architecture:(1, 4), Repro Index:0,  Val Loss: 1.804487748146057, Std Dev: 1.3433122634887695, Log Likelihood: -13694.345703125
Architecture:(1, 4), Repro Index:1,  Val Loss: 1.905134449005127, Std Dev: 1.3802660703659058, Log Likelihood: -14255.267578125
Architecture:(1, 4), Repro Index:2,  Val Loss: 1.7336917471885682, Std Dev: 1.316697359085083, Log Like

In [None]:
# Empirical distribution of the log-likelihood: one tuple of values per
# architecture. repro_result is keyed by (architecture, repro_index), so the
# values are grouped by the architecture part of the key directly; this does
# not depend on the replicate count or on the iteration order.
likelihood_distribution = {}
for (architecture, _), (_, result) in repro_result.items():
    likelihood_distribution.setdefault(architecture, []).append(
        result['log_likelihood'].numpy()
    )
likelihood_distribution = {
    architecture: tuple(values)
    for architecture, values in likelihood_distribution.items()
}

In [None]:
# Borel set (intervals): one confidence interval per architecture, computed
# from that architecture's empirical log-likelihood values.
borel_intervals = {
    architecture: myrepro.borel_confidence_interval(list(samples))
    for architecture, samples in likelihood_distribution.items()
}

print(borel_intervals)

{(1, 3): (-14160.11115899025, -14157.83220038475), (1, 4): (-13620.04084649025, -13617.76188788475), (1, 5): (-13625.95393242775, -13623.67497382225), (2, 3): (-13724.34455742775, -13722.06559882225), (2, 4): (-14300.61408867775, -14298.33513007225), (2, 5): (-13797.22346367775, -13794.94450507225), (3, 3): (-13650.46955742775, -13648.19059882225), (3, 4): (-13503.26740899025, -13500.98845038475), (3, 5): (-13475.44416680275, -13473.16520819725)}
