Skip to content

Commit

Permalink
not even data should be scattered it seems like, options now are scat…
Browse files Browse the repository at this point in the history
…tering ONCE in the init or never
  • Loading branch information
MarcusMNoack committed Dec 16, 2022
1 parent e6f933a commit 63bd380
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions fvgp/gp2Scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,15 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):
futures = [] ### a list of futures
actor_futures = []
finished_futures = set()
scatter_future = []
#scatter_future = []
###get workers
compute_workers = set(self.compute_worker_set)
idle_workers = set(compute_workers)
###future_worker_assignments
self.future_worker_assignments = {}
###scatter data
scatter_data = {"x1_data":x1, "x2_data":x2} ##data that can be scattered
scatter_future = client.scatter(scatter_data,workers = compute_workers) ##scatter the data
#scatter_data = {"x1_data":x1, "x2_data":x2} ##data that can be scattered
#scatter_future = client.scatter(scatter_data,workers = compute_workers) ##scatter the data
#print("scatter future: ", scatter_future["hps"].result(), flush = True)
###############
start_time = time.time()
Expand Down Expand Up @@ -257,7 +257,8 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):

#get idle worker and submit work
current_worker = self.get_idle_worker(idle_workers)
data = {"scattered_data": scatter_future,"hps": hyperparameters, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
#data = {"scattered_data": scatter_future,"hps": hyperparameters,"x1_data":x1, "x2_data":x2, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
data = {"hps": hyperparameters,"x1_data":x1, "x2_data":x2, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
#data = {"scattered_data": scatter_future, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0}
futures.append(client.submit(kernel_function, data, workers = current_worker))
self.assign_future_2_worker(futures[-1].key,current_worker)
Expand All @@ -278,10 +279,10 @@ def compute_covariance(self, x1,x2,hyperparameters, variances,client):
actor_futures.append(self.SparsePriorCovariance.add_to_diag(variances)) ##add to diag on actor
actor_futures[-1].result()
#clean up
del futures
del actor_futures
del finished_futures
del scatter_future
#del futures
#del actor_futures
#del finished_futures
#del scatter_future

#########
if self.info:
Expand Down Expand Up @@ -499,8 +500,10 @@ def kernel_function(data):
kernel = data["kernel"]
worker = distributed.get_worker()
if mode == "prior":
x1 = data["scattered_data"]["x1_data"][data["range_i"][0]:data["range_i"][1]]
x2 = data["scattered_data"]["x2_data"][data["range_j"][0]:data["range_j"][1]]
#x1 = data["scattered_data"]["x1_data"][data["range_i"][0]:data["range_i"][1]]
x1 = data["x1_data"][data["range_i"][0]:data["range_i"][1]]
#x2 = data["scattered_data"]["x2_data"][data["range_j"][0]:data["range_j"][1]]
x2 = data["x2_data"][data["range_j"][0]:data["range_j"][1]]
range1 = data["range_i"]
range2 = data["range_j"]
k = kernel(x1,x2,hps)
Expand Down

0 comments on commit 63bd380

Please sign in to comment.