Skip to content

Commit

Permalink
scattering now just in init and just the data set itself. No reeaded …
Browse files Browse the repository at this point in the history
…scatter in a function anymore. Also added a client.gather(actor_futures)
  • Loading branch information
MarcusMNoack committed Dec 16, 2022
1 parent 63bd380 commit e419428
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions fvgp/gp2Scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ def __init__(
###initiate actor that is a future contain the covariance and methods
self.SparsePriorCovariance = covariance_dask_client.submit(gp2ScaleSparseMatrix,self.point_number, actor=True, workers=self.actor_worker).result()# Create Actor
self.covariance_dask_client = covariance_dask_client
scatter_data = {"x1_data":self.x_data, "x2_data":self.x_data} ##data that can be scattered
self.scatter_future = covariance_dask_client.scatter(scatter_data,workers = self.compute_worker_set) ##scatter the data

self.st = time.time()
self.compute_prior_fvGP_pdf(covariance_dask_client)
if self.info:
Expand Down Expand Up @@ -224,8 +227,6 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):
###future_worker_assignments
self.future_worker_assignments = {}
###scatter data
#scatter_data = {"x1_data":x1, "x2_data":x2} ##data that can be scattered
#scatter_future = client.scatter(scatter_data,workers = compute_workers) ##scatter the data
#print("scatter future: ", scatter_future["hps"].result(), flush = True)
###############
start_time = time.time()
Expand Down Expand Up @@ -257,9 +258,7 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):

#get idle worker and submit work
current_worker = self.get_idle_worker(idle_workers)
#data = {"scattered_data": scatter_future,"hps": hyperparameters,"x1_data":x1, "x2_data":x2, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
data = {"hps": hyperparameters,"x1_data":x1, "x2_data":x2, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
#data = {"scattered_data": scatter_future, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
data = {"scattered_data": self.scatter_future,"hps": hyperparameters, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
futures.append(client.submit(kernel_function, data, workers = current_worker))
self.assign_future_2_worker(futures[-1].key,current_worker)
#if self.info:
Expand All @@ -270,12 +269,11 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):

if self.info:
print("All tasks submitted after ",time.time() - start_time,flush = True)
print("actual number of computed batches: ", count)
print("still have to gather ",len(futures)," results",flush = True)
print("also have to gather ",len(finished_futures)," results",flush = True)
print("number of computed batches: ", count)

actor_futures.append(self.SparsePriorCovariance.get_future_results(finished_futures.union(futures)))
actor_futures[-1].result()
#actor_futures[-1].result()
client.gather(actor_futures)
actor_futures.append(self.SparsePriorCovariance.add_to_diag(variances)) ##add to diag on actor
actor_futures[-1].result()
#clean up
Expand Down Expand Up @@ -500,10 +498,10 @@ def kernel_function(data):
kernel = data["kernel"]
worker = distributed.get_worker()
if mode == "prior":
#x1 = data["scattered_data"]["x1_data"][data["range_i"][0]:data["range_i"][1]]
x1 = data["x1_data"][data["range_i"][0]:data["range_i"][1]]
#x2 = data["scattered_data"]["x2_data"][data["range_j"][0]:data["range_j"][1]]
x2 = data["x2_data"][data["range_j"][0]:data["range_j"][1]]
x1 = data["scattered_data"]["x1_data"][data["range_i"][0]:data["range_i"][1]]
#x1 = data["x1_data"][data["range_i"][0]:data["range_i"][1]]
x2 = data["scattered_data"]["x2_data"][data["range_j"][0]:data["range_j"][1]]
#x2 = data["x2_data"][data["range_j"][0]:data["range_j"][1]]
range1 = data["range_i"]
range2 = data["range_j"]
k = kernel(x1,x2,hps)
Expand Down

0 comments on commit e419428

Please sign in to comment.