improvements to gp2Scale: you can't scatter the kernel; repeated scatter will not work

MarcusMNoack committed Dec 16, 2022
1 parent cb13a5a commit e6f933a
Showing 1 changed file with 65 additions and 57 deletions.
fvgp/gp2Scale.py
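
The gist of the change, per the commit message: the training inputs are still scattered to the Dask workers once per covariance computation, but the kernel callable and the hyperparameters now travel inside each task's payload instead of being scattered alongside the data. A minimal sketch of the resulting pattern, assuming a local dask.distributed cluster; the toy kernel, names, and shapes are illustrative, not from the repository:

    import numpy as np
    from dask import distributed

    def toy_kernel(x1, x2, hps):
        # stand-in for sparse_stat_kernel: an isotropic squared exponential
        d2 = ((x1[:, None, :] - x2[None, :, :]) ** 2).sum(-1)
        return hps[0] * np.exp(-d2 / hps[1])

    def kernel_task(payload):
        # large arrays arrive via the scattered futures; the small, frequently
        # changing objects (hps, kernel) ride along in the payload itself
        x1 = payload["scattered_data"]["x1_data"]
        x2 = payload["scattered_data"]["x2_data"]
        return payload["kernel"](x1, x2, payload["hps"])

    if __name__ == "__main__":
        client = distributed.Client(processes=False)
        x = np.random.rand(500, 3)
        scattered = client.scatter({"x1_data": x, "x2_data": x})  # once per call
        data = {"scattered_data": scattered,
                "hps": np.array([1.0, 0.5]),
                "kernel": toy_kernel}
        K = client.submit(kernel_task, data).result()
        print(K.shape)  # (500, 500)
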
@@ -1,4 +1,5 @@
 import time
+from dask.distributed import wait
 import scipy.sparse as sparse
 import scipy.sparse.linalg as solve
 import numpy as np
@@ -18,6 +19,39 @@
 #from .sparse_matrix import UpdateSM
 import gc

+
+def sparse_stat_kernel(x1,x2, hps):
+    d = 0
+    for i in range(len(x1[0])): d += abs(np.subtract.outer(x1[:,i],x2[:,i]))**2
+    d = np.sqrt(d)
+    d[d == 0.0] = 1e-16
+    d[d > hps[1]] = hps[1]
+    kernel = (np.sqrt(2.0)/(3.0*np.sqrt(np.pi)))*\
+             ((3.0*(d/hps[1])**2*np.log((d/hps[1])/(1+np.sqrt(1.0 - (d/hps[1])**2))))+\
+             ((2.0*(d/hps[1])**2 + 1.0) * np.sqrt(1.0-(d/hps[1])**2)))
+    return hps[0] * kernel
+
+def sparse_stat_kernel_robust(x1,x2, hps):
+    d = 0
+    for i in range(len(x1[0])): d += abs(np.subtract.outer(x1[:,i],x2[:,i]))**2
+    d = np.sqrt(d)
+    d[d == 0.0] = 1e-16
+    d[d > 1./hps[1]**2] = 1./hps[1]**2
+    kernel = (np.sqrt(2.0)/(3.0*np.sqrt(np.pi)))*\
+             ((3.0*(d*hps[1]**2)**2*np.log((d*hps[1]**2)/(1+np.sqrt(1.0 - (d*hps[1]**2)**2))))+\
+             ((2.0*(d*hps[1]**2)**2 + 1.0) * np.sqrt(1.0-(d*hps[1]**2)**2)))
+    return (hps[0]**2) * kernel
+
+
+############################################################
+############################################################
+############################################################
+############################################################
+############################################################
+############################################################
+############################################################
+
+
 class gp2Scale():
     """
     gp2Scale class: Provides tools for a Large-Scale single-task GP.
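
Both kernels added above are compactly supported: the distance is clamped at the support radius, where the bracketed term vanishes, so every entry beyond the radius is exactly zero and the resulting covariance matrix is sparse. In sparse_stat_kernel, hps[0] is the signal variance and hps[1] the support radius; the robust variant re-parametrizes these as hps[0]**2 and 1./hps[1]**2 so that both quantities stay positive for any real hyperparameter values. A usage sketch with made-up inputs:

    x1 = np.random.rand(100, 3)
    x2 = np.random.rand(80, 3)
    K = sparse_stat_kernel(x1, x2, np.array([1.0, 0.1]))              # mostly zeros
    K_robust = sparse_stat_kernel_robust(x1, x2, np.array([1.0, 3.0]))
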
@@ -61,11 +95,10 @@ def __init__(
         values,
         init_hyperparameters,
         batch_size,
-        target_worker_count,
         variances = None,
-        workerFrac2Start = 0.5,
+        limit_workers = None,
         LUtimeout = 100,
-        gp_kernel_function = None,
+        gp_kernel_function = sparse_stat_kernel,
         gp_mean_function = None,
         covariance_dask_client = None,
         info = False
@@ -84,11 +117,9 @@ def __init__(
         self.y_data = values
         self.batch_size = batch_size
         self.num_batches = self.point_number // self.batch_size
-
+        self.limit_workers = limit_workers
         self.info = info
         self.LUtimeout = LUtimeout
-        self.target_worker_count = target_worker_count
-        self.workerFrac2Start = workerFrac2Start
         ##########################################
         #######prepare variances##################
         ##########################################
@@ -100,27 +131,21 @@ def __init__(
         elif np.ndim(variances) == 2:
             self.variances = variances[:,0]
         elif np.ndim(variances) == 1:
-            self.variances = np.array(variances)#, requires_grad = True)
+            self.variances = np.array(variances)
         else:
             raise Exception("Variances are not given in an allowed format. Give variances as 1d numpy array")
         if len(self.variances[self.variances < 0]) > 0: raise Exception("Negative measurement variances communicated to fvgp.")
         ##########################################
         #######define kernel and mean function####
         ##########################################
-        if callable(gp_kernel_function): self.kernel = gp_kernel_function
-        #else: raise Exception("A kernel callable has to be provided!")
-        #self.d_kernel_dx = self.d_gp_kernel_dx
+        if gp_kernel_function == "robust": self.kernel = sparse_stat_kernel_robust
+        elif callable(gp_kernel_function): self.kernel = gp_kernel_function
+        else: raise Exception("A kernel callable has to be provided!")

         self.gp_mean_function = gp_mean_function
         if callable(gp_mean_function): self.mean_function = gp_mean_function
         else: self.mean_function = self.default_mean_function

-        #if callable(gp_kernel_function_grad): self.dk_dh = gp_kernel_function_grad
-        #else:
-        #    if self.ram_economy is True: self.dk_dh = self.gp_kernel_derivative
-        #    else: self.dk_dh = self.gp_kernel_gradient
-
-        #if callable(gp_mean_function_grad): self.dm_dh = gp_mean_function_grad
         ##########################################
         #######prepare hyper parameters###########
         ##########################################
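
With this hunk the constructor defaults to sparse_stat_kernel, accepts the string "robust" to select sparse_stat_kernel_robust, and otherwise insists on a callable. A hypothetical call; the leading positional arguments are cut off above this hunk, so they are elided here:

    gp = gp2Scale(...,                     # leading arguments not shown in this diff
                  batch_size=10000,
                  gp_kernel_function="robust")
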
@@ -192,14 +217,16 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):
         futures = [] ### a list of futures
         actor_futures = []
         finished_futures = set()
+        scatter_future = []
         ###get workers
         compute_workers = set(self.compute_worker_set)
         idle_workers = set(compute_workers)
         ###future_worker_assignments
         self.future_worker_assignments = {}
         ###scatter data
-        scatter_data = {"x1_data":x1, "x2_data":x2,"hps": hyperparameters, "kernel" : self.kernel} ##data that can be scattered
+        scatter_data = {"x1_data":x1, "x2_data":x2} ##data that can be scattered
         scatter_future = client.scatter(scatter_data,workers = compute_workers) ##scatter the data
+        #print("scatter future: ", scatter_future["hps"].result(), flush = True)
         ###############
         start_time = time.time()
         count = 0
@@ -221,7 +248,7 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):
                 ##make workers available that are not actively computing
                 while not idle_workers:
                     idle_workers, futures, finished_futures = self.free_workers(futures, finished_futures)
-                    time.sleep(0.01)
+                    time.sleep(0.1)

                 ####collect finished workers but only if actor is not busy, otherwise do it later
                 if len(finished_futures) >= 1000:
@@ -230,13 +257,14 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):

                 #get idle worker and submit work
                 current_worker = self.get_idle_worker(idle_workers)
-                data = {"scattered_data": scatter_future, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
+                data = {"scattered_data": scatter_future,"hps": hyperparameters, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
+                #data = {"scattered_data": scatter_future, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
                 futures.append(client.submit(kernel_function, data, workers = current_worker))
                 self.assign_future_2_worker(futures[-1].key,current_worker)
-                if self.info:
-                    print(" submitted batch. i:", beg_i,end_i," j:",beg_j,end_j, "to worker ",current_worker, " Future: ", futures[-1].key)
-                    print(" current time stamp: ", time.time() - start_time," percent finished: ",float(count)/self.total_number_of_batches())
-                    print("")
+                #if self.info:
+                #    print(" submitted batch. i:", beg_i,end_i," j:",beg_j,end_j, "to worker ",current_worker, " Future: ", futures[-1].key)
+                #    print(" current time stamp: ", time.time() - start_time," percent finished: ",float(count)/self.total_number_of_batches())
+                #    print("")
                 count += 1

         if self.info:
@@ -246,13 +274,14 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):
print("also have to gather ",len(finished_futures)," results",flush = True)

actor_futures.append(self.SparsePriorCovariance.get_future_results(finished_futures.union(futures)))
actor_futures[-1].result()
actor_futures.append(self.SparsePriorCovariance.add_to_diag(variances)) ##add to diag on actor
#clean up
actor_futures[-1].result()
self.covariance_dask_client.cancel(actor_futures)
self.covariance_dask_client.cancel(futures)
#clean up
del futures
del actor_futures
del finished_futures
del scatter_future

#########
if self.info:
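
SparsePriorCovariance is a Dask actor, and actor method calls return ActorFutures immediately; the .result() calls added above block until the actor has actually finished the queued work before anything is cancelled or deleted. The general pattern, as a toy sketch unrelated to the repository:

    class Accumulator:
        def __init__(self): self.total = 0
        def add(self, x): self.total += x

    acc = client.submit(Accumulator, actor=True).result()  # actor lives on one worker
    fut = acc.add(5)     # returns an ActorFuture immediately
    fut.result()         # block until the actor has processed the call
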
@@ -264,7 +293,8 @@ def free_workers(self, futures, finished_futures):
         free_workers = set()
         remaining_futures = []
         for future in futures:
-            if future.status == "finished":
+            if future.status == "cancelled": print("WARNING: cancelled futures encountered!")
+            if future.status == "finished":
                 finished_futures.add(future)
                 free_workers.add(self.future_worker_assignments[future.key])
                 del self.future_worker_assignments[future.key]
@@ -276,7 +306,6 @@ def assign_future_2_worker(self, future_key, worker_address):

     def get_idle_worker(self,idle_workers):
         return idle_workers.pop()
-
     ##################################################################################
     ##################################################################################
     ##################################################################################
@@ -287,13 +316,10 @@ def get_idle_worker(self,idle_workers):
     ##################################################################################
     ##################################################################################
     def _init_dask_client(self,dask_client):
-
         if dask_client is None: dask_client = distributed.Client()
-        try: dask_client.wait_for_workers(n_workers=int(self.target_worker_count * self.workerFrac2Start), timeout = 600)
-        except: time.sleep(100)
-
         worker_info = list(dask_client.scheduler_info()["workers"].keys())
         if not worker_info: raise Exception("No workers available")
+        if self.limit_workers: worker_info = worker_info[0:self.limit_workers]
         actor_worker = worker_info[0]
         compute_worker_set = set(worker_info[1:])
         print("We have ", len(compute_worker_set)," compute workers ready to go.")
@@ -432,27 +458,7 @@ def get_distance_matrix_robust(self,x1,x2,hps):
         d = np.zeros((len(x1),len(x2)))
         for i in range(x1.shape[1]):
             d += ((x1[:,i].reshape(-1, 1) - x2[:,i])*hps[i+1])**2
-        return np.sqrt(d + 1e-16)
-
-    def default_kernel(self,x1,x2,hyperparameters,obj):
-        ################################################################
-        ###standard anisotropic kernel in an input space with l2########
-        ################################################################
-        """
-        x1: 2d numpy array of points
-        x2: 2d numpy array of points
-        obj: object containing kernel definition
-        Return:
-        -------
-        Kernel Matrix
-        """
-        distance_matrix = self.get_distance_matrix_robust(x1,x2,hyperparameters)
-        #return hyperparameters[0]**2 * obj.matern_kernel_diff1(distance_matrix,1)
-        return hyperparameters[0]**2 * torch.exp(-distance_matrix)
-
-
-
+        return np.sqrt(d)
     ##################################################################################
     ##################################################################################
     ##################################################################################
@@ -488,19 +494,21 @@ def posterior_mean(self, x_iset):

 def kernel_function(data):
     st = time.time()
-    hps= data["scattered_data"]["hps"]
+    hps= data["hps"]
     mode = data["mode"]
-    kernel = data["scattered_data"]["kernel"]
+    kernel = data["kernel"]
     worker = distributed.get_worker()
     if mode == "prior":
         x1 = data["scattered_data"]["x1_data"][data["range_i"][0]:data["range_i"][1]]
         x2 = data["scattered_data"]["x2_data"][data["range_j"][0]:data["range_j"][1]]
         range1 = data["range_i"]
         range2 = data["range_j"]
-        k = kernel(x1,x2,hps, None)
+        k = kernel(x1,x2,hps)
     else:
         x1 = data["x1_data"]
         x2 = data["x2_data"]
-        k = kernel(x1,x2,hps, None)
+        k = kernel(x1,x2,hps)
     k_sparse = sparse.coo_matrix(k)
     return k_sparse, (data["range_i"][0],data["range_j"][0]), time.time() - st, worker.address
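
kernel_function hands back each batch as a COO block together with the block's global row/column offset; presumably the covariance actor shifts the local coordinates by that offset when folding the block into the full matrix, along the lines of this sketch (the actor's code is not part of this diff; N and the offsets are made up):

    import numpy as np
    import scipy.sparse as sparse

    rows, cols, vals = [], [], []
    block = sparse.coo_matrix(np.eye(3))     # stand-in for one kernel batch
    oi, oj = 10, 20                          # (range_i[0], range_j[0])
    rows.append(block.row + oi)              # shift local -> global indices
    cols.append(block.col + oj)
    vals.append(block.data)
    N = 100
    K = sparse.coo_matrix((np.concatenate(vals),
                           (np.concatenate(rows), np.concatenate(cols))),
                          shape=(N, N))
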

