Improve how pycbc_live handles localization-only detectors (gwastro#4635)

* Improve how pycbc_live handles localization-only detectors

LOD = localization-only detectors

* Do not initialize single-detector FAR estimators for LOD
* Only use triggering detectors for PSD var statistic
* Correct the GraceDB annotation to explain which detectors
  contributed to the FAR and list the LOD
* Only list triggering detectors in background dump filename
* Code reformatting

* Fixes

* Fixes

* Use attributes of event manager instead

* Improve GraceDB annotation

* Fix

* Ignore loc-only detectors for rank > 0

* Further clarify which detectors are used only for localization

* Main loop should go through the loc-only detectors only on rank 0
titodalcanton authored and lpathak97 committed Mar 13, 2024
1 parent 3fbc2fe commit 33c6966
Showing 1 changed file with 74 additions and 45 deletions.
bin/pycbc_live
@@ -82,7 +82,13 @@ class LiveEventManager(object):
     def __init__(self, args, bank):
         self.low_frequency_cutoff = args.low_frequency_cutoff
         self.bank = bank
-        self.skymap_only_ifos = [] if args.skymap_only_ifos is None else list(set(args.skymap_only_ifos))
+        # all interferometers involved in the analysis, whether for generating
+        # candidates or doing localization only
+        self.ifos = set(args.channel_name.keys())
+        # interferometers used for localization only
+        self.skymap_only_ifos = set(args.skymap_only_ifos or [])
+        # subset of the interferometers allowed to produce candidates
+        self.trigg_ifos = self.ifos - self.skymap_only_ifos

         # Figure out what we are supposed to process within the pool of MPI processes
         self.comm = mpi.COMM_WORLD
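
To make the new partition concrete, here is a minimal sketch with hypothetical channel names and a hypothetical --skymap-only-ifos value (the real ones come from the command line):

# Sketch of the partitioning above, with hypothetical inputs.
channel_name = {'H1': 'H1:FAKE_STRAIN', 'L1': 'L1:FAKE_STRAIN',
                'V1': 'V1:FAKE_STRAIN', 'K1': 'K1:FAKE_STRAIN'}
skymap_only = ['K1']

ifos = set(channel_name.keys())            # {'H1', 'K1', 'L1', 'V1'}
skymap_only_ifos = set(skymap_only or [])  # `or []` also covers None
trigg_ifos = ifos - skymap_only_ifos       # {'H1', 'L1', 'V1'}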
@@ -117,7 +123,7 @@ class LiveEventManager(object):
         if self.run_snr_optimization:
             # preestimate the number of CPU cores that we can afford giving
             # to followup processes without slowing down the main search
-            bg_cores = len(tuple(itertools.combinations(ifos, 2)))
+            bg_cores = len(tuple(itertools.combinations(self.trigg_ifos, 2)))
             analysis_cores = 1 + bg_cores
             if platform.system() != 'Darwin':
                 available_cores = len(os.sched_getaffinity(0))
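
Restricting the pair count to triggering detectors matters because the coincident background is estimated per detector pair; a sketch with hypothetical detectors:

import itertools

trigg_ifos = {'H1', 'L1', 'V1'}  # hypothetical triggering detectors
bg_cores = len(tuple(itertools.combinations(trigg_ifos, 2)))  # 3 pairs
analysis_cores = 1 + bg_cores  # 1 for the search itself + 1 per pair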
@@ -502,7 +508,7 @@ class LiveEventManager(object):
         logging.info('computing followup data for coinc')
         coinc_ifos = coinc_results['foreground/type'].split('-')
         followup_ifos = set(ifos) - set(coinc_ifos)
-        followup_ifos = list(followup_ifos | set(self.skymap_only_ifos))
+        followup_ifos = list(followup_ifos | self.skymap_only_ifos)

         double_ifar = coinc_results['foreground/ifar']
         if double_ifar < args.ifar_double_followup_threshold:
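
A sketch of the followup-set arithmetic with hypothetical detectors (an H1-L1 coincidence, V1 also analyzed, K1 localization-only); since skymap_only_ifos is now a set, the set() wrapper around it is no longer needed:

ifos = {'H1', 'L1', 'V1', 'K1'}  # hypothetical detectors in the analysis
skymap_only_ifos = {'K1'}
coinc_ifos = 'H1-L1'.split('-')  # as parsed from foreground/type
followup_ifos = set(ifos) - set(coinc_ifos)             # {'V1', 'K1'}
followup_ifos = list(followup_ifos | skymap_only_ifos)  # keeps K1 included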
@@ -537,13 +543,20 @@ class LiveEventManager(object):
         logging.info('Coincident candidate! Saving as %s', fname)

         # Verbally explain some details not obvious from the other info
-        comment = ('Trigger produced as a {} coincidence. '
-                   'FAR is based on all listed detectors.<br />'
-                   'Two-detector ranking statistic: {}<br />'
-                   'Followup detectors: {}')
-        comment = comment.format(ppdets(coinc_ifos),
-                                 args.ranking_statistic,
-                                 ppdets(followup_ifos))
+        comment = (
+            'Trigger produced as a {} coincidence.<br />'
+            'Two-detector ranking statistic: {}<br />'
+            'Detectors used for FAR calculation: {}.<br />'
+            'Detectors used for localization: {}.<br />'
+            'Detectors used only for localization: {}.'
+        )
+        comment = comment.format(
+            ppdets(coinc_ifos),
+            args.ranking_statistic,
+            set(ifos) - self.skymap_only_ifos,
+            ppdets(followup_ifos),
+            ppdets(self.skymap_only_ifos)
+        )

         ifar = coinc_results['foreground/ifar']
         upload_checks = self.enable_gracedb_upload and self.ifar_upload_threshold < ifar
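
For reference, a minimal sketch of how the new annotation renders, using a simplified stand-in for the ppdets helper and hypothetical detector names and statistic (note the FAR field receives a raw Python set in the code above):

def ppdets(ifos, sep=', '):
    # simplified stand-in for PyCBC's ppdets helper
    return sep.join(sorted(ifos))

comment = (
    'Trigger produced as a {} coincidence.<br />'
    'Two-detector ranking statistic: {}<br />'
    'Detectors used for FAR calculation: {}.<br />'
    'Detectors used for localization: {}.<br />'
    'Detectors used only for localization: {}.'
).format(
    ppdets(['H1', 'L1']),   # coinc_ifos
    'example_statistic',    # args.ranking_statistic (hypothetical)
    {'H1', 'L1', 'V1'},     # set(ifos) - skymap_only_ifos, printed as a set
    ppdets(['K1', 'V1']),   # followup_ifos
    ppdets(['K1'])          # skymap_only_ifos
)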
@@ -604,7 +617,7 @@ class LiveEventManager(object):
             logging.info(f'Found {ifo} single with ifar {sifar}')

             followup_ifos = [i for i in active if i is not ifo]
-            followup_ifos = list(set(followup_ifos) | set(self.skymap_only_ifos))
+            followup_ifos = list(set(followup_ifos) | self.skymap_only_ifos)
             # Don't recompute ifar considering other ifos
             sld = self.compute_followup_data(
                 [ifo],
@@ -636,10 +649,15 @@ class LiveEventManager(object):
             logging.info('Single-detector candidate! Saving as %s', fname)

             # Verbally explain some details not obvious from the other info
-            comment = ('Trigger produced as a {0} single. '
-                       'FAR is based on {0} only.<br />'
-                       'Followup detectors: {1}')
-            comment = comment.format(ifo, ppdets(followup_ifos))
+            comment = (
+                'Trigger produced as a {0} single.<br />'
+                'Detectors used for FAR calculation: {0}.<br />'
+                'Detectors used for localization: {1}.<br />'
+                'Detectors used only for localization: {2}.'
+            )
+            comment = comment.format(
+                ifo, ppdets(followup_ifos), ppdets(self.skymap_only_ifos)
+            )

             # Has a coinc event at this time been uploaded recently?
             # If so, skip upload - Note that this means that we _always_
@@ -1017,8 +1035,6 @@ args = parser.parse_args()
 scheme.verify_processing_options(args, parser)
 fft.verify_fft_options(args, parser)
 Coincer.verify_args(args, parser)
-ifos = set(args.channel_name.keys())
-analyze_singles = LiveSingle.verify_args(args, parser, ifos)

 if args.output_background is not None and len(args.output_background) != 2:
     parser.error('--output-background takes two parameters: period and path')
@@ -1045,14 +1061,16 @@ if bank.min_f_lower < args.low_frequency_cutoff:
                  'minimum f_lower across all templates '
                  '({} Hz)'.format(args.low_frequency_cutoff, bank.min_f_lower))

-logging.info('Analyzing data from detectors %s', ppdets(ifos))
-
 evnt = LiveEventManager(args, bank)
-logging.info('Detectors that only aid in the sky localization %s', ppdets(evnt.skymap_only_ifos))
+
+logging.info('Analyzing data from detectors %s', ppdets(evnt.ifos))
+logging.info('Using %s for localization only', ppdets(evnt.skymap_only_ifos))
+
+analyze_singles = LiveSingle.verify_args(args, parser, evnt.trigg_ifos)

 # include MPI rank and functional description into proctitle
 task_name = 'root' if evnt.rank == 0 else 'filtering'
-setproctitle('PyCBC Live rank {:d} [{}]'.format(evnt.rank, task_name))
+setproctitle(f'PyCBC Live rank {evnt.rank:d} [{task_name}]')

 sg_chisq = SingleDetSGChisq.from_cli(args, bank, args.chisq_bins)

@@ -1111,23 +1129,27 @@ with ctx:
             args.round_start_time
     logging.info('Starting from: %s', args.start_time)

-    # initialize the data readers for all detectors
+    # Initialize the data readers for all detectors. For rank 0, we need data
+    # from all detectors, including the localization-only ones. For higher
+    # ranks, we only need the detectors that can generate candidates.
     if args.max_length is not None:
         maxlen = args.max_length
     maxlen = int(maxlen)
-    data_reader = {ifo: StrainBuffer.from_cli(ifo, args, maxlen)
-                   for ifo in ifos}
+    data_reader = {
+        ifo: StrainBuffer.from_cli(ifo, args, maxlen)
+        for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos)
+    }
     evnt.data_readers = data_reader

     # create single-detector background "estimators"
     if analyze_singles and evnt.rank == 0:
         sngl_estimator = {ifo: LiveSingle.from_cli(args, ifo)
-                          for ifo in ifos}
+                          for ifo in evnt.trigg_ifos}

-    # Create double coincident background estimator for every combo
+    # Create double coincident background estimator
+    # for every pair of triggering interferometers
     if args.enable_background_estimation and evnt.rank == 0:
-        trigg_ifos = [ifo for ifo in ifos if ifo not in evnt.skymap_only_ifos]
-        ifo_combos = itertools.combinations(trigg_ifos, 2)
+        ifo_combos = itertools.combinations(evnt.trigg_ifos, 2)
         estimators = []
         for combo in ifo_combos:
             logging.info('Will calculate %s background', ppdets(combo, "-"))
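
The rank test above (and in the PSD bookkeeping and main loop further down) follows one rule: only the root MPI process needs the localization-only detectors. A sketch, with a hypothetical helper name:

def detectors_for_rank(rank, all_ifos, trigg_ifos):
    # rank 0 aggregates candidates and produces sky maps, so it needs
    # every detector; filtering ranks only need the triggering ones
    return all_ifos if rank == 0 else trigg_ifos

assert detectors_for_rank(0, {'H1', 'L1', 'K1'}, {'H1', 'L1'}) == {'H1', 'L1', 'K1'}
assert detectors_for_rank(2, {'H1', 'L1', 'K1'}, {'H1', 'L1'}) == {'H1', 'L1'}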
@@ -1167,12 +1189,14 @@ with ctx:
     # main analysis loop
     data_end = lambda: data_reader[tuple(data_reader.keys())[0]].end_time
     last_bg_dump_time = int(data_end())
-    psd_count = {ifo:0 for ifo in ifos}
+    psd_count = {ifo:0 for ifo in evnt.ifos}

     # Create dicts to track whether the psd has been recalculated and to hold
     # psd variation filters
-    psd_recalculated = {ifo: True for ifo in ifos}
-    psd_var_filts = {ifo: None for ifo in ifos}
+    psd_recalculated = {
+        ifo: True for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos)
+    }
+    psd_var_filts = {ifo: None for ifo in evnt.trigg_ifos}

     while data_end() < args.end_time:
         t1 = pycbc.gps_now()
@@ -1181,7 +1205,7 @@ with ctx:
         results = {}
         evnt.live_detectors = set()

-        for ifo in ifos:
+        for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos):
             results[ifo] = False
             status = data_reader[ifo].advance(
                 valid_pad,
@@ -1259,23 +1283,27 @@ with ctx:

         # Calculate and add the psd variation for the results
         if args.psd_variation:
-
             for ifo in results:
-                logging.info(f"Calculating PSD Variation Statistic for {ifo}")
+                logging.info("Calculating PSD Variation Statistic for %s", ifo)

                 # A new filter is needed if the PSD has been recalculated
                 if psd_recalculated[ifo] is True:
-                    psd_var_filts[ifo] = variation.live_create_filter(data_reader[ifo].psd,
-                                                                      args.psd_segment_length,
-                                                                      int(args.sample_rate))
+                    psd_var_filts[ifo] = variation.live_create_filter(
+                        data_reader[ifo].psd,
+                        args.psd_segment_length,
+                        int(args.sample_rate)
+                    )
                     psd_recalculated[ifo] = False

-                psd_var_ts = variation.live_calc_psd_variation(data_reader[ifo].strain,
-                                                               psd_var_filts[ifo],
-                                                               args.increment)
+                psd_var_ts = variation.live_calc_psd_variation(
+                    data_reader[ifo].strain,
+                    psd_var_filts[ifo],
+                    args.increment
+                )

-                psd_var_vals = variation.live_find_var_value(results[ifo],
-                                                             psd_var_ts)
+                psd_var_vals = variation.live_find_var_value(
+                    results[ifo], psd_var_ts
+                )

                 results[ifo]['psd_var_val'] = psd_var_vals

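The block above caches one PSD-variation filter per triggering detector and rebuilds it only after that detector's PSD has been re-estimated; a self-contained sketch of the caching pattern, with a stand-in for the filter builder:

psd_var_filts = {'H1': None, 'L1': None}
psd_recalculated = {'H1': True, 'L1': True}

def make_filter(ifo):
    return 'filter-for-' + ifo  # stand-in for variation.live_create_filter

for ifo in psd_var_filts:
    if psd_recalculated[ifo]:
        psd_var_filts[ifo] = make_filter(ifo)  # expensive, done rarely
        psd_recalculated[ifo] = False          # reused until next PSD update
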
@@ -1295,7 +1323,7 @@ with ctx:
         gates = {ifo: data_reader[ifo].gate_params for ifo in data_reader}

         # map the results file to an hdf file
-        prefix = '{}-{}-{}-{}'.format(''.join(sorted(ifos)),
+        prefix = '{}-{}-{}-{}'.format(''.join(sorted(evnt.ifos)),
                                       args.file_prefix,
                                       data_end() - args.analysis_chunk,
                                       valid_pad)
@@ -1310,8 +1338,9 @@ with ctx:
                 data_end() - last_bg_dump_time > float(args.output_background[0]):
             last_bg_dump_time = int(data_end())
             bg_dists = coinc_pool.broadcast(output_background, None)
-            bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(''.join(sorted(ifos)),
-                                                       last_bg_dump_time)
+            bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(
+                ''.join(sorted(evnt.trigg_ifos)), last_bg_dump_time
+            )
             bg_fn = os.path.join(args.output_background[1], bg_fn)
             with h5py.File(bg_fn, 'w') as bgf:
                 for bg_ifos, bg_data, bg_time in bg_dists:
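
With this change, the background dump file is named after the triggering detectors only, since localization-only detectors contribute no background. A sketch with hypothetical values:

trigg_ifos = {'H1', 'L1', 'V1'}  # hypothetical triggering detectors
last_bg_dump_time = 1264316116   # hypothetical GPS time
bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(
    ''.join(sorted(trigg_ifos)), last_bg_dump_time
)
# -> 'H1L1V1-LIVE_BACKGROUND-1264316116.hdf'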
