/
interact.py
474 lines (403 loc) · 19.1 KB
/
interact.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
"""Provides tools for interactive visualizations.
Example use
-----------
The functions in this module are used to create Bokeh-based visualization
widgets. For example, the following code will create an interactive
visualization widget showing the pixel data and a lightcurve::
# SN 2018 oh Supernova example
from lightkurve import KeplerTargetPixelFile
tpf = KeplerTargetPixelFile.from_archive(228682548)
tpf.interact()
Note that this will only work inside a Jupyter notebook at this time.
"""
from __future__ import division, print_function
import logging
import warnings
import numpy as np
from astropy.stats import sigma_clip
from .utils import KeplerQualityFlags, LightkurveWarning
import os
log = logging.getLogger(__name__)
# Import the optional Bokeh dependency, or print a friendly error otherwise.
try:
import bokeh # Import bokeh first so we get an ImportError we can catch
from bokeh.io import show, output_notebook
from bokeh.plotting import figure, ColumnDataSource
from bokeh.models import LogColorMapper, Selection, Slider, RangeSlider, \
Span, ColorBar, LogTicker, Range1d
from bokeh.layouts import layout, Spacer
from bokeh.models.tools import HoverTool
from bokeh.models.widgets import Button, Div
from bokeh.models.formatters import PrintfTickFormatter
except ImportError:
# We will print a nice error message in the `show_interact_widget` function
pass
def prepare_lightcurve_datasource(lc):
"""Prepare a bokeh ColumnDataSource object for tool tips.
Parameters
----------
lc : LightCurve object
The light curve to be shown.
Returns
-------
lc_source : bokeh.plotting.ColumnDataSource
"""
# Convert time into human readable strings, breaks with NaN time
# See https://github.com/KeplerGO/lightkurve/issues/116
if (lc.time == lc.time).all():
human_time = lc.astropy_time.isot
else:
human_time = [' '] * len(lc.flux)
# Convert binary quality numbers into human readable strings
qual_strings = []
for bitmask in lc.quality:
flag_str_list = KeplerQualityFlags.decode(bitmask)
if len(flag_str_list) == 0:
qual_strings.append(' ')
if len(flag_str_list) == 1:
qual_strings.append(flag_str_list[0])
if len(flag_str_list) > 1:
qual_strings.append("; ".join(flag_str_list))
lc_source = ColumnDataSource(data=dict(
time=lc.time,
time_iso=human_time,
flux=lc.flux,
cadence=lc.cadenceno,
quality_code=lc.quality,
quality=np.array(qual_strings)))
return lc_source
def prepare_tpf_datasource(tpf, aperture_mask):
"""Prepare a bokeh DataSource object for selection glyphs
Parameters
----------
tpf : TargetPixelFile
TPF to be shown.
aperture_mask : boolean numpy array
The Aperture mask applied at the startup of interact
Returns
-------
tpf_source : bokeh.plotting.ColumnDataSource
Bokeh object to be shown.
"""
npix = tpf.flux[0, :, :].size
pixel_index_array = np.arange(0, npix, 1).reshape(tpf.flux[0].shape)
xx = tpf.column + np.arange(tpf.shape[2])
yy = tpf.row + np.arange(tpf.shape[1])
xa, ya = np.meshgrid(xx, yy)
preselected = Selection()
preselected.indices = pixel_index_array[aperture_mask].reshape(-1).tolist()
tpf_source = ColumnDataSource(data=dict(xx=xa+0.5, yy=ya+0.5),
selected=preselected)
return tpf_source
def get_lightcurve_y_limits(lc_source):
"""Compute sensible defaults for the Y axis limits of the lightcurve plot.
Parameters
----------
lc_source : bokeh.plotting.ColumnDataSource
The lightcurve being shown.
Returns
-------
ymin, ymax : float, float
Flux min and max limits.
"""
flux = sigma_clip(lc_source.data['flux'], sigma=5)
low, high = np.nanpercentile(flux, (1, 99))
margin = 0.10 * (high - low)
return low - margin, high + margin
def make_lightcurve_figure_elements(lc, lc_source):
"""Make the lightcurve figure elements.
Parameters
----------
lc : LightCurve
Lightcurve to be shown.
lc_source : bokeh.plotting.ColumnDataSource
Bokeh object that enables the visualization.
Returns
----------
fig : `bokeh.plotting.figure` instance
step_renderer : GlyphRenderer
vertical_line : Span
"""
if lc.mission == 'K2':
title = "Lightcurve for {} (K2 C{})".format(
lc.label, lc.campaign)
elif lc.mission == 'Kepler':
title = "Lightcurve for {} (Kepler Q{})".format(
lc.label, lc.quarter)
elif lc.mission == 'TESS':
title = "Lightcurve for {} (TESS Sec. {})".format(
lc.label, lc.sector)
else:
title = "Lightcurve for target {}".format(lc.label)
fig = figure(title=title, plot_height=340, plot_width=600,
tools="pan,wheel_zoom,box_zoom,tap,reset",
toolbar_location="below",
border_fill_color="whitesmoke")
fig.title.offset = -10
fig.yaxis.axis_label = 'Flux (e/s)'
fig.xaxis.axis_label = 'Time - 2454833 (days)'
ylims = get_lightcurve_y_limits(lc_source)
fig.y_range = Range1d(start=ylims[0], end=ylims[1])
# Add step lines, circles, and hover-over tooltips
fig.step('time', 'flux', line_width=1, color='gray',
source=lc_source, nonselection_line_color='gray',
nonselection_line_alpha=1.0)
circ = fig.circle('time', 'flux', source=lc_source, fill_alpha=0.3, size=8,
line_color=None, selection_color="firebrick",
nonselection_fill_alpha=0.0,
nonselection_fill_color="grey",
nonselection_line_color=None,
nonselection_line_alpha=0.0,
fill_color=None, hover_fill_color="firebrick",
hover_alpha=0.9, hover_line_color="white")
tooltips = [("Cadence", "@cadence"),
("Time ({})".format(lc.time_format.upper()),
"@time{0,0.000}"),
("Time (ISO)", "@time_iso"),
("Flux", "@flux"),
("Quality Code", "@quality_code"),
("Quality Flag", "@quality")]
fig.add_tools(HoverTool(tooltips=tooltips, renderers=[circ],
mode='mouse', point_policy="snap_to_data"))
# Vertical line to indicate the cadence
vertical_line = Span(location=lc.time[0], dimension='height',
line_color='firebrick', line_width=4, line_alpha=0.5)
fig.add_layout(vertical_line)
return fig, vertical_line
def make_tpf_figure_elements(tpf, tpf_source, pedestal=0):
"""Returns the lightcurve figure elements.
Parameters
----------
tpf : TargetPixelFile
TPF to show.
tpf_source : bokeh.plotting.ColumnDataSource
TPF data source.
Returns
-------
fig, stretch_slider : bokeh.plotting.figure.Figure, RangeSlider
"""
if tpf.mission in ['Kepler', 'K2']:
title = 'Pixel data (CCD {}.{})'.format(tpf.module, tpf.output)
elif tpf.mission == 'TESS':
title = 'Pixel data (Camera {}.{})'.format(tpf.camera, tpf.ccd)
else:
title = "Pixel data"
fig = figure(plot_width=370, plot_height=340,
x_range=(tpf.column, tpf.column+tpf.shape[2]),
y_range=(tpf.row, tpf.row+tpf.shape[1]),
title=title, tools='tap,box_select,wheel_zoom,reset',
toolbar_location="below",
border_fill_color="whitesmoke")
fig.yaxis.axis_label = 'Pixel Row Number'
fig.xaxis.axis_label = 'Pixel Column Number'
vlo, lo, hi, vhi = np.nanpercentile(tpf.flux - pedestal, [0.2, 1, 95, 99.8])
vstep = (np.log10(vhi) - np.log10(vlo)) / 300.0 # assumes counts >> 1.0!
color_mapper = LogColorMapper(palette="Viridis256", low=lo, high=hi)
fig.image([pedestal + tpf.flux[0, :, :]], x=tpf.column, y=tpf.row,
dw=tpf.shape[2], dh=tpf.shape[1], dilate=True,
color_mapper=color_mapper, name="tpfimg")
# The colorbar will update with the screen stretch slider
# The colorbar margin increases as the length of the tick labels grows.
# This colorbar share of the plot window grows, shrinking plot area.
# This effect is known, some workarounds might work to fix the plot area:
# https://github.com/bokeh/bokeh/issues/5186
color_bar = ColorBar(color_mapper=color_mapper,
ticker=LogTicker(desired_num_ticks=8),
label_standoff=-10, border_line_color=None,
location=(0, 0), background_fill_color='whitesmoke',
major_label_text_align='left',
major_label_text_baseline='middle',
title='e/s', margin=0)
fig.add_layout(color_bar, 'right')
color_bar.formatter = PrintfTickFormatter(format="%14u")
fig.rect('xx', 'yy', 1, 1, source=tpf_source, fill_color='gray',
fill_alpha=0.4, line_color='white')
# Configure the stretch slider and its callback function
stretch_slider = RangeSlider(start=np.log10(vlo),
end=np.log10(vhi),
step=vstep,
title='Screen Stretch (log)',
value=(np.log10(lo), np.log10(hi)),
orientation='horizontal',
width=200,
direction='ltr',
show_value=True,
sizing_mode='fixed',
name='tpfstretch')
def stretch_change_callback(attr, old, new):
"""TPF stretch slider callback."""
fig.select('tpfimg')[0].glyph.color_mapper.high = 10**new[1]
fig.select('tpfimg')[0].glyph.color_mapper.low = 10**new[0]
stretch_slider.on_change('value', stretch_change_callback)
return fig, stretch_slider
def make_default_export_name(tpf, suffix='custom-lc'):
"""makes the default name to save a custom intetract mask"""
fn = tpf.hdu.filename()
if fn is None:
outname = "{}_{}_{}.fits".format(tpf.mission, tpf.targetid, suffix)
else:
base = os.path.basename(fn)
outname = base.rsplit('.fits')[0] + '-{}.fits'.format(suffix)
return outname
def show_interact_widget(tpf, notebook_url='localhost:8888',
max_cadences=30000,
aperture_mask='pipeline',
exported_filename=None):
"""Display an interactive Jupyter Notebook widget to inspect the pixel data.
The widget will show both the lightcurve and pixel data. The pixel data
supports pixel selection via Bokeh tap and box select tools in an
interactive javascript user interface.
Note: at this time, this feature only works inside an active Jupyter
Notebook, and tends to be too slow when more than ~30,000 cadences
are contained in the TPF (e.g. short cadence data).
Parameters
----------
tpf : lightkurve.TargetPixelFile
Target Pixel File to interact with
notebook_url: str
Location of the Jupyter notebook page (default: "localhost:8888")
When showing Bokeh applications, the Bokeh server must be
explicitly configured to allow connections originating from
different URLs. This parameter defaults to the standard notebook
host and port. If you are running on a different location, you
will need to supply this value for the application to display
properly. If no protocol is supplied in the URL, e.g. if it is
of the form "localhost:8888", then "http" will be used.
max_cadences : int
Raise a RuntimeError if the number of cadences shown is larger than
this value. This limit helps keep browsers from becoming unresponsive.
"""
try:
import bokeh
if bokeh.__version__[0] == '0':
warnings.warn("interact() requires Bokeh version 1.0 or later", LightkurveWarning)
except ImportError:
log.error("The interact() tool requires the `bokeh` package; "
"you can install bokeh using e.g. `conda install bokeh`.")
return None
aperture_mask = tpf._parse_aperture_mask(aperture_mask)
if exported_filename is None:
exported_filename = make_default_export_name(tpf)
try:
exported_filename = str(exported_filename)
except:
log.error('Invalid input filename type for interact()')
raise
if ('.fits' not in exported_filename.lower()):
exported_filename += '.fits'
lc = tpf.to_lightcurve(aperture_mask=aperture_mask)
npix = tpf.flux[0, :, :].size
pixel_index_array = np.arange(0, npix, 1).reshape(tpf.flux[0].shape)
# Bokeh cannot handle many data points
# https://github.com/bokeh/bokeh/issues/7490
if len(lc.cadenceno) > max_cadences:
msg = 'Interact cannot display more than {} cadences.'
raise RuntimeError(msg.format(max_cadences))
def create_interact_ui(doc):
# The data source includes metadata for hover-over tooltips
lc_source = prepare_lightcurve_datasource(lc)
tpf_source = prepare_tpf_datasource(tpf, aperture_mask)
# Create the lightcurve figure and its vertical marker
fig_lc, vertical_line = make_lightcurve_figure_elements(lc, lc_source)
# Create the TPF figure and its stretch slider
pedestal = np.nanmin(tpf.flux)
fig_tpf, stretch_slider = make_tpf_figure_elements(tpf, tpf_source,
pedestal=pedestal)
# Helper lookup table which maps cadence number onto flux array index.
tpf_index_lookup = {cad: idx for idx, cad in enumerate(tpf.cadenceno)}
# Interactive slider widgets and buttons to select the cadence number
cadence_slider = Slider(start=np.min(tpf.cadenceno),
end=np.max(tpf.cadenceno),
value=np.min(tpf.cadenceno),
step=1,
title="Cadence Number",
width=490)
r_button = Button(label=">", button_type="default", width=30)
l_button = Button(label="<", button_type="default", width=30)
export_button = Button(label="Save Lightcurve",
button_type="success", width=120)
message_on_save = Div(text=' ',width=600, height=15)
# Callbacks
def update_upon_pixel_selection(attr, old, new):
"""Callback to take action when pixels are selected."""
# Check if a selection was "re-clicked", then de-select
if ((sorted(old) == sorted(new)) & (new != [])):
# Trigger recursion
tpf_source.selected.indices = new[1:]
if new != []:
selected_indices = np.array(new)
selected_mask = np.isin(pixel_index_array, selected_indices)
lc_new = tpf.to_lightcurve(aperture_mask=selected_mask)
lc_source.data['flux'] = lc_new.flux
ylims = get_lightcurve_y_limits(lc_source)
fig_lc.y_range.start = ylims[0]
fig_lc.y_range.end = ylims[1]
else:
lc_source.data['flux'] = lc.flux * 0.0
fig_lc.y_range.start = -1
fig_lc.y_range.end = 1
message_on_save.text = " "
export_button.button_type = "success"
def update_upon_cadence_change(attr, old, new):
"""Callback to take action when cadence slider changes"""
if new in tpf.cadenceno:
frameno = tpf_index_lookup[new]
fig_tpf.select('tpfimg')[0].data_source.data['image'] = \
[tpf.flux[frameno, :, :] - pedestal]
vertical_line.update(location=tpf.time[frameno])
else:
fig_tpf.select('tpfimg')[0].data_source.data['image'] = \
[tpf.flux[0, :, :] * np.NaN]
lc_source.selected.indices = []
def go_right_by_one():
"""Step forward in time by a single cadence"""
existing_value = cadence_slider.value
if existing_value < np.max(tpf.cadenceno):
cadence_slider.value = existing_value + 1
def go_left_by_one():
"""Step back in time by a single cadence"""
existing_value = cadence_slider.value
if existing_value > np.min(tpf.cadenceno):
cadence_slider.value = existing_value - 1
def save_lightcurve():
"""Save the lightcurve as a fits file with mask as HDU extension"""
if tpf_source.selected.indices != []:
selected_indices = np.array(tpf_source.selected.indices)
selected_mask = np.isin(pixel_index_array, selected_indices)
lc_new = tpf.to_lightcurve(aperture_mask=selected_mask)
lc_new.to_fits(exported_filename, overwrite=True,
aperture_mask=selected_mask.astype(np.int),
SOURCE='lightkurve interact',
NOTE='custom mask',
MASKNPIX=np.nansum(selected_mask))
if message_on_save.text == " ":
text = '<font color="black"><i>Saved file {} </i></font>'
message_on_save.text = text.format(exported_filename)
export_button.button_type = "success"
else:
text = '<font color="gray"><i>Saved file {} </i></font>'
message_on_save.text = text.format(exported_filename)
else:
text = '<font color="gray"><i>No pixels selected, no mask saved</i></font>'
export_button.button_type = "warning"
message_on_save.text = text
def jump_to_lightcurve_position(attr, old, new):
if new != []:
cadence_slider.value = lc.cadenceno[new[0]]
# Map changes to callbacks
r_button.on_click(go_right_by_one)
l_button.on_click(go_left_by_one)
tpf_source.selected.on_change('indices', update_upon_pixel_selection)
lc_source.selected.on_change('indices', jump_to_lightcurve_position)
export_button.on_click(save_lightcurve)
cadence_slider.on_change('value', update_upon_cadence_change)
# Layout all of the plots
sp1, sp2, sp3, sp4 = (Spacer(width=15), Spacer(width=30),
Spacer(width=80), Spacer(width=60) )
widgets_and_figures = layout([fig_lc, fig_tpf],
[l_button, sp1, r_button, sp2,
cadence_slider, sp3, stretch_slider],
[export_button, sp4, message_on_save])
doc.add_root(widgets_and_figures)
output_notebook(verbose=False, hide_banner=True)
return show(create_interact_ui, notebook_url=notebook_url)