In [11]:
import numpy as np
import math,datetime,sys,torch,time,random,torch,datetime
import multiprocessing as mp
from torch.distributions import uniform, normal
import matplotlib.pyplot as plt
import scipy as sp
from scipy.stats import norm,multivariate_normal
from sklearn.utils import resample

In [2]:
weights = [1]
means = [[0.]]
covs = [[1.]]
theta_gt = np.array([[0]])
get_data = lambda n: torch.from_numpy(gaussian_mixture_model_sample(n, means, covs, weights, test=False))

In [3]:
def LogLikelihood(theta, x):

  distribution = torch.distributions.normal.Normal(theta[0, 0], 1)
  log_prob = distribution.log_prob(x)
  
  return log_prob

In [4]:
def gaussian_mixture_model_sample(n_samples = 10000,
                                  means = [[0.9, -0.8],[-0.7, 0.9]],
                                  covs = [[[2.0, 0.3], [0.3, 0.5]],
                                          [[0.3, 0.5], [0.3, 2.0]]],
                                  weights = [0.3,0.7],
                                  test = False):

  start = time.time()
  dim = len(means[0])
  samples = np.zeros((dim,n_samples))

  for i in range(n_samples):
    r = random.random()
    for j in range(len(weights)):
      if sum(weights[:j+1]) > r:
        samples[:,i] = multivariate_normal.rvs(mean = means[j], cov = covs[j])
        break

  end = time.time()
  print(f'gaussian_mixture_model_sample {end-start}')

  return samples

In [5]:
def gradient_ascent_torch(func, param , data, max_iterations, learningrate):

  start = time.time()
  for t in range(max_iterations):
    loglikelihoods = func(param, data)
    loglikelihood_value = torch.mean(loglikelihoods)
    param.retain_grad()
    loglikelihood_value.backward()
    with torch.no_grad():
      param.add_(learningrate * param.grad)
      param.grad.zero_()
  
  end = time.time()
  print(f'gradient_ascent_torch {end-start}')

  return param, loglikelihood_value

In [6]:
def get_derivatives_torch(func, param, data):

  start = time.time()
  func_forScore = lambda args: func(args, data)
  Scores = torch.autograd.functional.jacobian(func_forScore, param).squeeze()

  func_forHessian = lambda args: torch.mean(func(args, data))
  Hessian = torch.autograd.functional.hessian(func_forHessian, param).squeeze()
  end = time.time()
  print(f'get_derivatives_torch {end-start}')

  return Scores, Hessian

In [7]:
def normal_CI(alpha, Scores, Hessian, theta_n_M):

  start = time.time()
  n = Scores.shape[0]
  z = sp.stats.norm.ppf(1-alpha/2)
  H_n_inv = np.linalg.inv(Hessian.reshape(theta_n_M.shape[1],theta_n_M.shape[1]))
  S_n = 1/n * np.dot(Scores.T, Scores)

  Cov = np.dot(H_n_inv, np.dot(S_n, H_n_inv))
  Cov_diag = np.diag(Cov).reshape(1,-1)
  Cov_diag = np.sqrt(1/n*Cov_diag)

  CI_borders = np.zeros((2, theta_n_M.shape[1]))
  CI_borders[0, :] = theta_n_M - z * Cov_diag ## CI_borders[0, i] = theta_n_M[i] - z*Cov_ii
  CI_borders[1, :] = theta_n_M + z * Cov_diag

  length = CI_borders[1, :] - CI_borders[0, :]
  shape = (CI_borders[1, :] - theta_n_M) / (theta_n_M - CI_borders[0, :])

  end = time.time()
  print(f'normal_CI {end-start}')

  return CI_borders, length, shape

In [8]:
def GetCI(n,m, alpha, type_CI):
    
  start = time.time()  
  data = get_data(int(n)).cuda()
  theta = torch.tensor([[uniform.Uniform(-2, 2).sample()]], requires_grad=True).cuda()

  theta, _ = gradient_ascent_torch(func=LogLikelihood,
                                                param=theta,
                                                data=data,
                                                max_iterations=5000,
                                                learningrate=0.01)

  if type_CI == 'normal':
    theta_hat = theta.cpu().clone().data.detach().numpy()
    Scores, Hessian = get_derivatives_torch(func=LogLikelihood,
                                            param=theta,
                                            data=data)
    ci, length, shape = normal_CI(alpha, Scores.cpu(), Hessian.cpu(), theta_hat)
  
  end = time.time()
  print(f'GetCI {end-start}')

  return ci, length, shape

In [9]:
def CISamplingTest(ground_truth,n_power,m,test_num):

  start = time.time()
  result = 0
  length = 0
  shape = 0
  n = math.pow(10,n_power)

  for j in range(test_num):
    print(f'test num {j}')
    ci, lengthCI , shapeCI = GetCI(n,m, alpha=alpha, type_CI=type_CI)
    length += lengthCI.squeeze()
    shape += shapeCI.squeeze()
    if ground_truth >= ci[0] and ground_truth <= ci[1]:
      result += 1
  
  end = time.time()
  print(f'CISamplingTest {end-start}')

  return {n_power: (result/test_num, length/test_num, shape/test_num)}

In [12]:
alpha = 0.1
type_CI = 'normal'
n_Ci = 100

result = CISamplingTest(theta_gt[0,0],5,1,n_Ci)
print(f'result {result}')

test num 0
gaussian_mixture_model_sample 9.397291421890259
gradient_ascent_torch 4.471856117248535
get_derivatives_torch 44.76304602622986
normal_CI 0.003711700439453125
GetCI 69.17248821258545
test num 1
gaussian_mixture_model_sample 9.314554214477539
gradient_ascent_torch 4.4177000522613525
get_derivatives_torch 45.21180462837219
normal_CI 0.0008797645568847656
GetCI 58.947025299072266
test num 2
gaussian_mixture_model_sample 9.28926420211792
gradient_ascent_torch 4.360684633255005
get_derivatives_torch 45.410457611083984
normal_CI 0.0006086826324462891
GetCI 59.063525676727295
test num 3
gaussian_mixture_model_sample 9.393657684326172
gradient_ascent_torch 4.419691801071167
get_derivatives_torch 45.101698875427246
normal_CI 0.0005435943603515625
GetCI 58.91801738739014
test num 4
gaussian_mixture_model_sample 9.415627241134644
gradient_ascent_torch 4.3945395946502686
get_derivatives_torch 45.18634533882141
normal_CI 0.0005970001220703125
GetCI 58.999656438827515
test num 5
gaussian_