Skip to content

Commit

Permalink
work on new MCMC method
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcusMNoack committed Nov 11, 2023
1 parent 92beba5 commit 35a268a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 69 deletions.
1 change: 1 addition & 0 deletions fvgp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
from .gp import GP
from .fvgp import fvGP
from .gpMCMC import gpMCMC

__all__ = ['GP', 'fvGP']
__version__ = _version.get_versions()['version']
Expand Down
176 changes: 107 additions & 69 deletions fvgp/gpMCMC.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
# K .............................................. adapt every K iterations
# #

#TODO:
# TODO:
# *the proposal distribution has to receive and return parameters without knowing what they are


Expand Down Expand Up @@ -66,40 +66,30 @@ def __init__(self,
dim,
prior_function,
proposal_distributions,
args = None
args=None
): # pragma: no cover
self.log_likelihood_function = log_likelihood_function
self.dim = dim
self.prior_function = prior_function
self.proposal_distributions=proposal_distributions
self.args=args


self.proposal_distributions = proposal_distributions
self.args = args

def run_mcmc(self,n_updates=10000,
def run_mcmc(self, n_updates=10000,
x0=None,
info=False,
adapt_cov=True,
break_condition = None): # pragma: no cover
break_condition=None): # pragma: no cover
start_time = time.time()
n_updates = max(n_updates, 2)

if x0 is None: x0 = np.ones((self.dim))
if isinstance(x0) != np.ndarray: raise Exception("x0 is not a numpy array")
if not isinstance(x0, np.ndarray): raise Exception("x0 is not a numpy array")
if np.ndim(x0) != 1: raise Exception("x0 is not a vector in MCMC")
if break_condition is None: break_condition = lambda a: False

eps = .001
k = 3 # the iteration offset
trace = {"f(x)": [], "x":[], "time stamp":[]}
# If the supplied proposal covariance matrix is either not given or invalid,
# just use the identity.

# Initialize sigma_m to the rule of thumb
sigma_m = 2.4 ** 2 / self.dim
r_hat = 0
self.trace = {"f(x)": [], "x": [], "time stamp": []}
# Set up and initialize trace objects
jump_trace = np.zeros(n_updates)
trace["x"].append(x0)

self.trace["x"].append(x0)

# Initialize Metropolis
x = x0.copy()
Expand All @@ -109,45 +99,45 @@ def run_mcmc(self,n_updates=10000,
# Begin main loop
for i in np.arange(1, n_updates):
for obj in self.proposal_distributions:
x, prior, likelihood, jump_trace[i] = self._jump(x,obj,prior,likelihood)
prop_Sigma, sigma_m = obj._adapt(i, trace["x"], k, jump_trace, sigma_m, prop_Sigma, adapt_cov)
x, prior, likelihood, jt = self._jump(x, obj, prior, likelihood)
obj.jump_trace.append(jt)
obj.adapt(i, self)

# Update the trace objects
trace["x"].append(x)
trace["f(x)"].append(likelihood
trace["time stamp"].append(time.time() - start_time)
self.trace = trace
self.trace["x"].append(x)
self.trace["f(x)"].append(likelihood)
self.trace["time stamp"].append(time.time() - start_time)

if info and (i % 100) == 0:
print("Finished " + str(i) + " out of " + str(n_updates), " iterations. f(x)=",likelihood)
print("Finished " + str(i) + " out of " + str(n_updates), " iterations. f(x)=", likelihood)
if break_condition(self): break
# End main loop

# Collect trace objects to return
arg_max = np.argmax(trace["f(x)"])
x = np.asarray(trace["x"].T)

arg_max = np.argmax(self.trace["f(x)"])
x = np.asarray(self.trace["x"].T)

return {"max f(x)": f[arg_max],
return {"max f(x)": self.trace["f(x)"][arg_max],
"max x": x[arg_max],
"trace": trace,
"trace": self.trace,
"stripped distribution": x[int(len(x) - (len(x) / 10)):],
"full distribution": x,
"distribution mean": np.mean(x[int(len(x) - (len(x) / 10)):], axis=0),
"distribution var": np.var(x[int(len(x) - (len(x) / 10)):], axis=0)}

###############################################################
def _jump(self,x,obj,prior,likelihood): # pragma: no cover
x_star = np.zeros((self.dim))
def _jump(self, x, obj, prior, likelihood): # pragma: no cover
x_star = x.copy()
if callable(obj.prop_dist):
x_star[obj.indices] = obj.prop_dist(x[obj.indices], self)
print("obj: ", obj.indices)
x_star[obj.indices] = obj.prop_dist(x[obj.indices], obj)
else:
raise Exception("A proposal distribution is not callable.")

prior_evaluation_x_star = self.prior_function(x_star, self.args)
jump_trace = 0.
if prior_evaluation_x_star != -np.inf:
likelihood_star = self.log_likelihood_function(hyperparameters=x_star)
likelihood_star = self.log_likelihood_function(x_star, self.args)
if np.isnan(likelihood_star): likelihood_star = -np.inf
metr_ratio = np.exp(prior_evaluation_x_star + likelihood_star -
prior - likelihood)
Expand All @@ -157,22 +147,33 @@ def _jump(self,x,obj,prior,likelihood): # pragma: no cover
prior = prior_evaluation_x_star
likelihood = likelihood_star
jump_trace = 1.
print("accepted")
else:
print("NOT accepted")
else:
print("prior probability 0")

print("x* :", x_star)
print("x :", x)
input()
return x, prior, likelihood, jump_trace

###############################################################


###############################################################
class ProposalDistribution(): # pragma: no cover
class ProposalDistribution: # pragma: no cover
def __init__(self, prop_dist,
indices,
should_be_adapted = False,
should_be_adapted=False,
adapt_callable=None,
r_opt=.234,
c_0=10,
c_1=.8,
K=10,
args = None): # pragma: no cover
init_prop_Sigma=None,
adapt_cov=False,
args=None): # pragma: no cover
"""
Function to define a set of proposal distributions.
Expand All @@ -183,61 +184,98 @@ def __init__(self, prop_dist,
indices : iterable of int
Which indices should be drawn from this proposal distribution.
should_be_adapted : bool
Whether ot not to update this proposal distribution
Whether ot not to update this proposal distribution.
If True either a callable for the update should be provided
or the proposal is assumed to be normal and the default adaption procedure will be performed.
adapt_callable : Callable, option
A callable to adapt the distribution.
A callable to adapt the distribution. The default is an adaption procedure for normal distributions.
The callable should not return anything but update the `args`.
args : Any, optional
Arguments that will be communicated to the user-provided `adapt_callable`.
Leave blank if the default adaption procedure is used.
"""

self.prop_dist = prop_dist
self.indices = indices
self.should_be_adapted = should_be_adapted
self.args = args
if callable(adapt_callable):
self.adapt = adapt_callable
else:
self.adapt = self._adapt

self.r_opt = r_opt
self.c_0 = c_0
self.c_1 = c_1
self.K = K
#########################################################
def _adapt(self,i,trace, k, jump_trace, sigma_m, prop_Sigma, adapt_cov): # pragma: no cover
if (i % self.K) == 0:
gamma2 = 1. / ((i / self.K) + k) ** self.c_1
gamma1 = self.c_0 * gamma2
r_hat = np.mean(jump_trace[(i - self.K + 1): i])
sigma_m = np.exp(np.log(sigma_m) + gamma1 * (r_hat - self.r_opt))
if adapt_cov: prop_Sigma = prop_Sigma + gamma2 * (np.cov(trace[:, (i - self.K + 1): i]) - prop_Sigma)
return prop_Sigma, sigma_m
self.adapt_cov = adapt_cov
dim = len(indices)
self.jump_trace = []
if not callable(adapt_callable) and args:
raise Exception("The args should only be provided for a user defined `adapt_callable`")
if not callable(adapt_callable) and init_prop_Sigma is None and should_be_adapted:
raise Exception("You are using the default adaption mechanism for normal distributions.\n \
Please provide an initial_prop_Sigma")

if init_prop_Sigma is None and not callable(adapt_callable):
init_prop_Sigma = np.identity(dim)
if callable(adapt_callable):
self.adapt = adapt_callable
else:
self.adapt = self._adapt

if args is None:
self.args = {"prop_Sigma": init_prop_Sigma, "sigma_m": 2.4 ** 2 / dim}
else:
self.args = args

#########################################################
def _adapt(
self,
end,
obj
): # pragma: no cover
k = 3
c_0 = self.c_0
c_1 = self.c_1
K = self.K
r_opt = self.r_opt
prop_Sigma = self.args["prop_Sigma"]
sigma_m = self.args["sigma_m"]
jump_trace = self.jump_trace
trace = obj.trace["x"]
if (end % K) == 0:
start = (end - K + 1)
gamma2 = 1. / ((end / K) + k) ** c_1
gamma1 = c_0 * gamma2
r_hat = np.mean(jump_trace[start: end])
sigma_m = np.exp(np.log(sigma_m) + gamma1 * (r_hat - r_opt))
if self.adapt_cov: prop_Sigma = prop_Sigma + gamma2 * (np.cov(trace[:, start: end]) - prop_Sigma)
self.args["prop_Sigma"] = prop_Sigma
self.args["sigma_m"] = sigma_m


###############################################################
###############################################################
###############################################################
###############################################################
def out_of_bounds(x,bounds): # pragma: no cover
def out_of_bounds(x, bounds): # pragma: no cover
for i in range(len(x)):
if x[i] < bounds[i,0] or x[i] > bounds[i,1]:
if x[i] < bounds[i, 0] or x[i] > bounds[i, 1]:
return True
return False


def project_onto_bounds(x,bounds): # pragma: no cover
def project_onto_bounds(x, bounds): # pragma: no cover
for i in range(len(x)):
if x[i] < bounds[i,0]: x[i] = bounds[i,0]
if x[i] > bounds[i,1]: x[i] = bounds[i,1]
if x[i] < bounds[i, 0]: x[i] = bounds[i, 0]
if x[i] > bounds[i, 1]: x[i] = bounds[i, 1]
return x

def in_bounds(v,bounds): # pragma: no cover

def in_bounds(v, bounds): # pragma: no cover
for i in range(len(v)):
if v[i] < bounds[i,0] or v[i] > bounds[i,1]: return False
if v[i] < bounds[i, 0] or v[i] > bounds[i, 1]: return False
return True


def prior_func(theta,bounds): # pragma: no cover
if in_bounds(theta, bounds): return 0.
else: return -np.inf


def prior_func(theta, bounds): # pragma: no cover
if in_bounds(theta, bounds):
return 0.
else:
return -np.inf

0 comments on commit 35a268a

Please sign in to comment.