
Commit

Merge pull request #133 from sblunt/multiprocessing
Parallel processing for OFTI
semaphoreP committed Sep 5, 2019
2 parents 95033a5 + b52d2ce commit 82559ee
Showing 2 changed files with 157 additions and 22 deletions.
162 changes: 143 additions & 19 deletions orbitize/sampler.py
@@ -4,9 +4,13 @@
import astropy.constants as consts
import sys
import abc
import math
import time

import emcee
import ptemcee
import multiprocessing as mp


import orbitize.lnlike
import orbitize.priors
@@ -81,7 +85,8 @@ def _logl(self, params):
return lnlikes_sum


class OFTI(Sampler):
"""
OFTI Sampler
@@ -241,51 +246,170 @@ def reject(self, samples):
lnlikes = np.array([lnp[i] for i in saved_orbit_idx])

return saved_orbits, lnlikes


def _sampler_process(self, output, total_orbits, num_cores, num_samples=10000, Value=0, lock=None):
"""
Runs OFTI until it finds the number of total accepted orbits desired.
Meant to be called by run_sampler.
Args:
output (manager.Queue): manager.Queue object to store results
total_orbits (int): total number of accepted orbits desired by user
num_cores (int): the number of cores that _sampler_process is being
run in parallel on
num_samples (int): number of orbits to prepare for OFTI to run
rejection sampling on
Value (mp.Value(int)): global counter for the orbits generated
lock (mp.Lock): lock object to prevent issues caused by multiple
processes accessing shared memory
Returns:
output_orbits (np.array): array of accepted orbits,
size: total_orbits
output_lnlikes (np.array): array of log probabilities,
size: total_orbits
"""

n_orbits_saved = 0
output_orbits = np.empty((total_orbits, len(self.priors)))
output_lnlikes = np.empty(total_orbits)

# add orbits to `output_orbits` until `total_orbits` are saved
while n_orbits_saved < total_orbits:
samples = self.prepare_samples(num_samples)
accepted_orbits, lnlikes = self.reject(samples)

if len(accepted_orbits)==0:
pass
else:
n_accepted = len(accepted_orbits)
maxindex2save = np.min([n_accepted, total_orbits - n_orbits_saved])

output_orbits[n_orbits_saved : n_orbits_saved+n_accepted] = accepted_orbits[0:maxindex2save]
output_lnlikes[n_orbits_saved : n_orbits_saved+n_accepted] = lnlikes[0:maxindex2save]
n_orbits_saved += maxindex2save

# print progress statement
print(str(n_orbits_saved)+'/'+str(total_orbits)+' orbits found',end='\r')
# add to the value of the global variable
with lock:
Value.value+=maxindex2save

output.put((np.array(output_orbits),output_lnlikes))
return (np.array(output_orbits),output_lnlikes)
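
Aside (not part of the commit): _sampler_process leans on three standard multiprocessing pieces: a manager Queue to hand each worker's accepted orbits back to the parent, an mp.Value shared counter so the parent can report progress, and an mp.Lock guarding increments of that counter. Below is a minimal, self-contained sketch of that pattern; the worker function and its arguments are illustrative stand-ins, not orbitize code.

import multiprocessing as mp

def worker(output, counter, lock, n_to_make):
    # stand-in for _sampler_process: "accept" n_to_make fake orbits
    results = [0.5 * i for i in range(n_to_make)]
    with lock:
        # increment the shared counter so the parent can track progress
        counter.value += len(results)
    # hand the finished batch back to the parent through the queue
    output.put(results)

if __name__ == '__main__':
    manager = mp.Manager()
    output = manager.Queue()
    counter = mp.Value('i', 0)   # 'i' = shared C int, initialized to 0
    lock = mp.Lock()

    processes = [
        mp.Process(target=worker, args=(output, counter, lock, 5))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    batches = [output.get() for _ in processes]
    print(counter.value, len(batches))   # prints: 20 4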



def run_sampler(self, total_orbits, num_samples=10000, num_cores=None):
"""
Runs OFTI in parallel on multiple cores until the total number of
accepted orbits desired by the user is reached.
Args:
total_orbits (int): total number of accepted orbits desired by user
num_samples (int): number of orbits to prepare for OFTI to run
rejection sampling on. Defaults to 10000.
num_cores (int): the number of cores to run OFTI on. Defaults to
the number of cores available.
Returns:
output_orbits (np.array): array of accepted orbits. Size: total_orbits.
Written by: Vighnesh Nagpal (2019)
"""
if num_cores!=1:
if num_cores==None:
num_cores=mp.cpu_count()

results=[]
# orbits_saved is a global counter for the number of orbits generated
orbits_saved=mp.Value('i',0)

manager = mp.Manager()
output = manager.Queue()

# setup the processes
lock = mp.Lock()
nrun_per_core = int(np.ceil(float(total_orbits)/float(num_cores)))

processes=[
mp.Process(
target=self._sampler_process,
args=(output,nrun_per_core,num_cores,num_samples,
orbits_saved,lock)
) for x in range(num_cores)
]

# start the processes
for p in processes:
p.start()

# periodically print out the number of orbits generated so far
while orbits_saved.value<total_orbits:
print(str(orbits_saved.value)+'/'+str(total_orbits)+' orbits found',end='\r')
time.sleep(0.1)

print(str(total_orbits)+'/'+str(total_orbits)+' orbits found',end='\r')

# join the processes
for p in processes:
p.join()
# get the results of each process from the queue
for p in processes:
results.append(output.get())

# filling up the output_orbits array
output_orbits = np.zeros((total_orbits, len(self.priors)))
output_lnlikes = np.empty(total_orbits)
pos=0

for p in results:
num_to_fill=np.min([len(p[0]), total_orbits - pos])
output_orbits[pos:pos+num_to_fill]=p[0][0:num_to_fill]
output_lnlikes[pos:pos+num_to_fill]=p[1][0:num_to_fill]
pos+=num_to_fill

self.results.add_samples(
np.array(output_orbits),
output_lnlikes, labels=self.system.labels
)

return output_orbits

else:
# this block is executed if num_cores=1
n_orbits_saved = 0
output_orbits = np.empty((total_orbits, len(self.priors)))
output_lnlikes = np.empty(total_orbits)

# add orbits to `output_orbits` until `total_orbits` are saved
while n_orbits_saved < total_orbits:
samples = self.prepare_samples(num_samples)
accepted_orbits, lnlikes = self.reject(samples)

if len(accepted_orbits)==0:
pass
else:
n_accepted = len(accepted_orbits)
maxindex2save = np.min([n_accepted, total_orbits - n_orbits_saved])

output_orbits[n_orbits_saved : n_orbits_saved+n_accepted] = accepted_orbits[0:maxindex2save]
output_lnlikes[n_orbits_saved : n_orbits_saved+n_accepted] = lnlikes[0:maxindex2save]
n_orbits_saved += maxindex2save

# print progress statement
print(str(n_orbits_saved)+'/'+str(total_orbits)+' orbits found',end='\r')

self.results.add_samples(
np.array(output_orbits),
output_lnlikes, labels=self.system.labels
)

return output_orbits
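
Usage aside (not part of the commit): the parallel path is driven exactly like the old one, with the extra num_cores argument. The data file name and system parameters below are placeholders patterned on the test in tests/test_OFTI.py further down, not values prescribed by this commit.

import orbitize.driver

myDriver = orbitize.driver.Driver('GJ504.csv', 'OFTI',
                                  1, 1.22, 56.95, mass_err=0.08, plx_err=0.26)
s = myDriver.sampler
orbits = s.run_sampler(1000, num_cores=4)   # four worker processes
print(orbits.shape)                          # (1000, number of fit parameters)

Since each of the num_cores processes is asked for ceil(total_orbits / num_cores) accepted orbits, slightly more than total_orbits orbits may be generated overall; run_sampler keeps the first total_orbits of them when it assembles the output array.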

class MCMC(Sampler):
"""
17 changes: 14 additions & 3 deletions tests/test_OFTI.py
@@ -5,11 +5,13 @@
import os
import pytest
import matplotlib.pyplot as plt
import time

import orbitize.sampler as sampler
import orbitize.driver
import orbitize.priors as priors


testdir = os.path.dirname(os.path.abspath(__file__))
input_file = os.path.join(testdir, 'GJ504.csv')
input_file_1epoch = os.path.join(testdir, 'GJ504_1epoch.csv')
@@ -58,9 +60,8 @@ def test_scale_and_rotate():
assert sep_sar == pytest.approx(sar_epoch['quant1'], abs=sar_epoch['quant1_err'])
assert pa_sar == pytest.approx(sar_epoch['quant2'], abs=sar_epoch['quant2_err'])


def test_run_sampler():

# initialize sampler
myDriver = orbitize.driver.Driver(input_file, 'OFTI',
1, 1.22, 56.95,mass_err=0.08, plx_err=0.26)
@@ -74,7 +75,14 @@ def test_run_sampler():
s.run_sampler(0,num_samples=1)

# test to make sure outputs are reasonable
start = time.time()
orbits = s.run_sampler(1000, num_cores=4)

end=time.time()
print()
print("Runtime: "+str(end-start) +" s")
print()
print(orbits[0])

print()
idx = s.system.param_idx
@@ -92,6 +100,9 @@
assert ecc == pytest.approx(ecc_exp, abs=0.2*ecc_exp)
assert inc == pytest.approx(inc_exp, abs=0.2*inc_exp)

# test with only one core
orbits = s.run_sampler(100,num_cores=1)

# test with only one epoch
myDriver = orbitize.driver.Driver(input_file_1epoch, 'OFTI',
1, 1.22, 56.95,mass_err=0.08, plx_err=0.26)
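
Aside (not part of the test suite): to check the speedup from the new parallel path by hand, one could time the same call with different core counts, along the lines of the runtime print above. This assumes a driver and sampler s set up as in test_run_sampler.

import time

for cores in (1, 4):
    start = time.time()
    s.run_sampler(500, num_cores=cores)
    print('num_cores={}: {:.1f} s'.format(cores, time.time() - start))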

