Skip to content

Commit

Permalink
some gp2Scale changes
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcusMNoack committed Sep 22, 2023
1 parent 13912cf commit 5d754b8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 30 deletions.
2 changes: 1 addition & 1 deletion fvgp/gp.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ def _compute_GPpriorV(self, x_data, y_data, hyperparameters, calc_inv = False):
#get K
if self.gp2Scale:
st = time.time()
self.gp2Scale_obj.compute_covariance(x_data,x_data,hyperparameters, self.gp2Scale_dask_client)
self.gp2Scale_obj.compute_covariance(hyperparameters, self.gp2Scale_dask_client)
if self.info: print("Computing the covariance matrix done after ",time.time()-st," seconds.", flush = True)
K = self.gp2Scale_obj.SparsePriorCovariance.get_result().result()
if self.info: print("Transferring the covariance matrix to host done after ",time.time()-st," seconds. sparsity = ", float(K.count_nonzero())/float(len(x_data)**2) , flush = True)
Expand Down
51 changes: 22 additions & 29 deletions fvgp/gp2Scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,13 @@ class gp2Scale():
"""
This class allows the user to scale GPs up to millions of datapoints. There is full high-performance-computing
support through DASK.
Parameters
----------
input_space_dim : int
Dimensionality of the input space.
x_data : np.ndarray
The point positions. Shape (V x D), where D is the `input_space_dim`.
batch_size : int
The covariance is divided into batches of the defined size for distributed computing.
variances : np.ndarray, optional
An numpy array defining the uncertainties in the data `y_data`. Shape (V x 1) or (V). Note: if no
variances are provided they will be set to `abs(np.mean(y_data) / 100.0`.
limit_workers : int, optional
If given as integer only the workers up to the limit will be used.
batch_size : int, optional
The covariance is divided into batches of the defined size for distributed computing. Default = 10000
LUtimeout : int, optional (future release)
Controls the timeout for the LU decomposition.
gp_kernel_function : Callable, optional
Expand Down Expand Up @@ -59,7 +52,6 @@ def __init__(
The constructor for the gp class.
type help(GP) for more information about attributes, methods and their parameters
"""
#self.input_space_dim = input_space_dim
self.batch_size = batch_size
self.point_number = len(x_data)
self.num_batches = self.point_number // self.batch_size
Expand All @@ -75,8 +67,8 @@ def __init__(
self.SparsePriorCovariance = covariance_dask_client.submit(gp2ScaleSparseMatrix,self.point_number, actor=True, workers=self.actor_worker).result()
#self.covariance_dask_client = covariance_dask_client

scatter_data = {"x_data":self.x_data} ##data that can be scattered
self.scatter_future = covariance_dask_client.scatter(scatter_data,workers = self.compute_worker_set) ##scatter the data
scatter_data = {"x_data":self.x_data.copy()} ##data that can be scattered
self.scatter_future = covariance_dask_client.scatter(scatter_data,workers = self.compute_worker_set) ##scatter the data to compute workers, not the actor

##################################################################################
##################################################################################
Expand All @@ -91,24 +83,25 @@ def _total_number_of_batches(self):
Db = float(self.num_batches)
return 0.5 * Db * (Db + 1.)

def compute_covariance(self, x1,x2,hyperparameters,client):
def compute_covariance(self, hyperparameters,client):
"""computes the covariance matrix from the kernel on HPC in sparse format"""
###initialize futures
futures = [] ### a list of futures
actor_futures = []
finished_futures = set()
futures = [] ### a list of futures
actor_futures = [] ###futures of the actor
finished_futures = set() ### a set of finished futures
###get workers
compute_workers = set(self.compute_worker_set)
idle_workers = set(compute_workers)
#idle_workers = set(compute_workers)
idle_workers = compute_workers.copy()
###future_worker_assignments
self.future_worker_assignments = {}
###scatter data
start_time = time.time()
count = 0
num_batches_i = len(x1) // self.batch_size
num_batches_j = len(x2) // self.batch_size
last_batch_size_i = len(x1) % self.batch_size
last_batch_size_j = len(x2) % self.batch_size
num_batches_i = len(self.x_data) // self.batch_size
num_batches_j = len(self.x_data) // self.batch_size
last_batch_size_i = len(self.x_data) % self.batch_size
last_batch_size_j = len(self.x_data) % self.batch_size
if last_batch_size_i != 0: num_batches_i += 1
if last_batch_size_j != 0: num_batches_j += 1

Expand All @@ -127,7 +120,8 @@ def compute_covariance(self, x1,x2,hyperparameters,client):

####collect finished workers but only if actor is not busy, otherwise do it later
if len(finished_futures) >= 2000:
actor_futures.append(self.SparsePriorCovariance.get_future_results(set(finished_futures), info = self.info))
#actor_futures.append(self.SparsePriorCovariance.get_future_results(set(finished_futures), info = self.info))
actor_futures.append(self.SparsePriorCovariance.get_future_results(finished_futures.copy(), info = self.info))
finished_futures = set()

#get idle worker and submit work
Expand All @@ -147,8 +141,7 @@ def compute_covariance(self, x1,x2,hyperparameters,client):

actor_futures.append(self.SparsePriorCovariance.get_future_results(finished_futures.union(futures), info = self.info))
client.gather(actor_futures)
#actor_futures.append(self.SparsePriorCovariance.add_to_diag(variances)) ##add to diag on actor
actor_futures[-1].result()
#actor_futures[-1].result()

#########
if self.info:
Expand Down Expand Up @@ -290,7 +283,7 @@ def total_number_of_batches(self):
Db = float(self.num_batches)
return 0.5 * Db * (Db + 1.)

def compute_covariance(self, x1,x2,hyperparameters, variances,client):
def compute_covariance(self, hyperparameters, variances,client):
"""computes the covariance matrix from the kernel on HPC in sparse format"""
###initialize futures
futures = [] ### a list of futures
Expand All @@ -305,9 +298,9 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):
start_time = time.time()
count = 0
num_batches_i = len(x1) // self.batch_size
num_batches_j = len(x2) // self.batch_size
last_batch_size_i = len(x1) % self.batch_size
last_batch_size_j = len(x2) % self.batch_size
num_batches_j = len(self.x_data) // self.batch_size
last_batch_size_i = len(self.x_data) % self.batch_size
last_batch_size_j = len(self.x_data) % self.batch_size
if last_batch_size_i != 0: num_batches_i += 1
if last_batch_size_j != 0: num_batches_j += 1

Expand Down

0 comments on commit 5d754b8

Please sign in to comment.