# Abstract

Learn about the data.

# Environment

In [1]:
from pathlib import Path
import sys

In [2]:
sys.path.insert(0, str(Path('..')))

In [3]:
from astropy.io import fits
from bokeh.io import output_notebook, push_notebook, show
from bokeh.models import LinearAxis, Range1d
from bokeh.plotting import figure
from ipywidgets import FloatSlider, IntSlider, interact
import numpy as np
from scipy import constants, ndimage

In [4]:
DATA_PATH = Path('.') / '..' / '..' / 'data'

# Library

In [5]:
class CalcJump:
    """Calculate jumps

    Parameters
    ----------
    data: np.array
        The data.

    jump_locations: list or np.array or None
        The jump locations.
        
    data_window: int or None
        Size of the averaging window
        
    pad: int or None
        Distance from the jump location to place the average
        windows on each side of the jump.
    """
    def __init__(self,
                 data,
                 jump_constraint=0.8,
                 data_window = 13,
                 convolve_window = 5,
                 pad=1
                ):
        self._data = None
        
        self.jump_constraint = jump_constraint
        self.data_window = data_window
        self.convolve_window = convolve_window
        self.pad = pad
        
        self.data = data
            
    def __call__(self):
        return self.jump_means
            
    @property
    def convolve(self):
        if self._convolve is None:
            self._convolve = ndimage.sobel(self.data_mva)
        return self._convolve
    
    @property
    def convolve_mva(self):
        if self._convolve_mva is None:
            self._convolve_mva = moving_average(self.convolve, n=self.convolve_window)
        return self._convolve_mva
    
    @property
    def data(self):
        return self._data
    
    @data.setter
    def data(self, data):
        self._clear_state()
        self._data = data
    
    @property
    def data_mva(self):
        if self._data_mva is None:
            self._data_mva = moving_average(self.data, n=self.data_window)
        return self._data_mva
    
    @property
    def jump_means(self):
        """Determine the means on each side of the jumps
        
        Returns
        -------
        [((left_idx,  left_mean), (right_idx, right_mean))[,...]]
            Two pairs of index and mean, left and right of the jump.
        """
        if self._jump_means is None:
            self._jump_means = []
            half_window = self.data_window // 2
            for jump_loc in self.jump_locations:
                left_idx = jump_loc - half_window - self.pad
                left_mean = np.mean(self.data[jump_loc - self.pad - self.data_window:jump_loc - self.pad])
                right_idx = jump_loc + half_window + self.pad
                right_mean = np.mean(self.data[jump_loc + self.pad:jump_loc + self.pad + self.data_window])
                self._jump_means.append(((left_idx, left_mean), (right_idx, right_mean)))
        return self._jump_means
    
    @property
    def jump_locations(self):
        """Jump locations"""
        if self._jump_locations is None:
            self._jump_locations = central_idxs_constraint(
                self.convolve_mva, self.jump_constraint, 
                adj=self.data_window / 2 + self.convolve_window / 2 + 1
            )
        return self._jump_locations
    
    def _clear_state(self):
        self._convolve = None
        self._convolve_mva = None
        self._data_mva = None
        self._jump_means = None
        self._jump_locations = None

In [6]:
def central_idxs_constraint(a, c=0.8, adj=0):
    """Return central indexes of regions meeting constraint
    
    Parameters
    ----------
    a: nd.array
        Convolution array
    c: float
        Constraint above which counts as a signal
    adj: int
        Adjustment to apply to the indicies, due to averaging
        
    Returns
    -------
    central_idxs: [int[,...]]
        List of central indexes of continous regions
        where the contraint has been met
    """
    central_idxs = []
    meets = np.abs(a) > c
    has_met = False
    for idx, met in enumerate(meets):
        if met:
            adj_idx = idx + adj
            if has_met:
                region.append(adj_idx)
            else:
                region = [adj_idx]
                has_met = True
        else:
            if has_met:
                central_idxs.append(int(np.mean(region)))
                has_met = False
    return central_idxs

In [7]:
def calc_jump_means(data, jump_locations, data_window=3, pad=1):
    """Determine the means on each side of the jumps
    
    Parameters
    ----------
    data: np.array
        The data.
        
    jump_locations: list or np.array
        The jump locations.
        
    data_window: int
        Size of the averaging window
        
    pad: int
        Distance from the jump location to place the average
        windows on each side of the jump.
        
    Returns
    -------
    [((left_idx,  left_mean), (right_idx, right_mean))[,...]]
        Two pairs of index and mean, left and right of the jump.
    """
    jump_means = []
    half_window = data_window // 2
    for jump_loc in jump_locations:
        left_idx = jump_loc - half_window - pad
        left_mean = np.mean(data[jump_loc - pad - data_window:jump_loc - pad])
        right_idx = jump_loc + half_window + pad
        right_mean = np.mean(data[jump_loc + pad:jump_loc + pad + data_window])
        jump_means.append(((left_idx, left_mean), (right_idx, right_mean)))
    return jump_means


In [8]:
def moving_average(a, n=3):
    """Compute the moving average"""
    result = np.cumsum(a)
    result[n:] = result[n:] - result[:-n]
    return result[n-1:] / n

In [9]:
def npfile_to_fits(npfile):
    """Convert a Numpy array file to FITS"""
    npfile = Path(npfile)
    nparray = np.load(npfile)
    hdu = fits.PrimaryHDU(nparray)
    hdul = fits.HDUList([hdu])
    hdul.writeto(npfile.parent / (npfile.stem + '.fits'))

# Main

In [10]:
output_notebook()

## Mask making

In [15]:
image = np.load(DATA_PATH / 'noref.npy')

In [16]:
p_image = figure(tooltips=[('x', '$x'), ('y', '$y'), ('value', '@image')])

In [17]:
p_image.image(image=[image], x=0, y=0, dw=image.shape[0], dh=image.shape[1], palette='Spectral11')

In [18]:
show(p_image)

In [20]:
circle_mask_hdul = fits.open(DATA_PATH / 'circle_mask.fits')

In [21]:
circle_mask = circle_mask_hdul[0].data

In [27]:
circle_mask_bool = circle_mask == 0

In [33]:
p_circle = figure(tooltips=[('x', '$x'), ('y', '$y'), ('value', '@image')])
p_circle.image(image=[circle_mask_bool], x=0, y=0, dw=circle_mask.shape[0], dh=circle_mask.shape[1], palette='Spectral11')
show(p_circle)

In [30]:
rect_mask_hdul = fits.open(DATA_PATH / 'rect_mask.fits')

In [31]:
rect_mask = rect_mask_hdul[0].data

In [34]:
rect_mask_bool = rect_mask != 0

In [35]:
p_rect = figure(tooltips=[('x', '$x'), ('y', '$y'), ('value', '@image')])
p_rect.image(image=[rect_mask_bool], x=0, y=0, dw=rect_mask_bool.shape[0], dh=rect_mask_bool.shape[1], palette='Spectral11')
show(p_rect)

In [36]:
full_mask = circle_mask_bool | rect_mask_bool

In [37]:
p_full_mask = figure(tooltips=[('x', '$x'), ('y', '$y'), ('value', '@image')])
p_full_mask.image(image=[full_mask], x=0, y=0, dw=full_mask.shape[0], dh=full_mask.shape[1], palette='Spectral11')
show(p_full_mask)

In [38]:
image_ma = np.ma.array(image, mask=full_mask)

In [39]:
p_image_ma = figure(tooltips=[('x', '$x'), ('y', '$y'), ('value', '@image')])
p_image_ma.image(image=[image_ma], x=0, y=0, dw=image_ma.shape[0], dh=image_ma.shape[1], palette='Spectral11')
show(p_image_ma)

In [41]:
np.save(DATA_PATH / 'full_mask', full_mask)

## Jump finding

### Read data

In [None]:
data = {}
for fname in DATA_PATH.glob('*.npy'):
    data[fname.stem] = np.load(fname)

In [None]:
jump = data['ubot']
idx_none = 0
idx_jumps = 1
idx_many_missed = 80
idx_steep_but_positive = 86

In [None]:
jump = data['utop']

# idx = 6 F'ed up data
# idx = 11 hmmm, questionable
# idx = 17 twang
# idx = 25 bad data

In [None]:
# jump = data['uref']


In [None]:
# jump = data['umid']

# idx = 4 just steep?

### Explore results

In [None]:
jump_constraint = 0.8
data_window = 13
convolve_window = 5
pad = 1
jump_kernel = [-1., 1.]

In [None]:
jumps = CalcJump(
    jump[0], jump_constraint=jump_constraint, 
    data_window=data_window, convolve_window=convolve_window, pad=pad
)

In [None]:
p_edge = figure(plot_width=950, tooltips=[("x", "$x"), ("value", "@y")])
p_edge.extra_y_ranges = {'convolve': Range1d(start=-2.0, end=2.0)}
p_edge.add_layout(LinearAxis(y_range_name='convolve'), 'right')
render_locs = p_edge.vbar(jumps.jump_locations, width=2, top=np.amax(jump[0]), bottom=np.amin(jump[0]), 
                         line_color='red', fill_color='red')
render_convolve = p_edge.circle(range(len(jumps.convolve_mva)), jumps.convolve_mva, size=2, y_range_name='convolve',
                        fill_color='green', line_color=None)
render_edge = p_edge.circle(range(len(jump[0])), jump[0], size=3, fill_color=None)

# Graph the means
left, right = zip(*jumps.jump_means)
idxs, means = zip(*left, *right)
render_means = p_edge.dash(idxs, means, size=20, line_color='orange')

In [None]:
def update_p_edge(idx, cutoff=1.5, data_window=data_window, convolve_window=convolve_window):
    # Calculate
    jumps = CalcJump(
        jump[idx], jump_constraint=cutoff, data_window=data_window, convolve_window=convolve_window, pad=pad
    )
    
    # Plot data and convolution
    render_edge.data_source.data['y'] = jump[idx]
    render_convolve.data_source.data['x'] = range(len(jumps.convolve_mva)) 
    render_convolve.data_source.data['y'] = jumps.convolve_mva
    
    # Plot out jump locations
    render_locs.data_source.data['x'] = jumps.jump_locations
    render_locs.glyph.top = np.amax(jump[idx])
    render_locs.glyph.bottom = np.amin(jump[idx])
    
    # Plot out jump means
    left, right = zip(*jumps.jump_means)
    idxs, means = zip(*left, *right)
    render_means.data_source.data['x'] = idxs
    render_means.data_source.data['y'] = means
    
    push_notebook()

In [None]:
show(p_edge, notebook_handle=True)

In [None]:
interact(update_p_edge, idx=IntSlider(min=0, max=data['ubot'].shape[0] - 1, value=0), 
         cutoff=FloatSlider(min=0., max=5., value=jump_constraint), 
         data_window=IntSlider(min=1, max=100, value=data_window), 
         convolve_window=IntSlider(min=1, max=100, value=convolve_window))

In [None]:
mean_diffs = np.array([
    abs(left[1] - right[1])
    for left, right in jumps.jump_means
    ])

In [None]:
p_mean_diffs = figure()
p_mean_diffs.circle(range(len(mean_diffs)), mean_diffs)
show(p_mean_diffs)

In [None]:
mean_diffs_mod_2pi = np.mod(mean_diffs, constants.pi * 2)

In [None]:
p_mean_diffs_2pi = figure()
p_mean_diffs_2pi.circle(range(len(mean_diffs_mod_2pi)), mean_diffs_mod_2pi)
show(p_mean_diffs_2pi)

# Test & Utilities

## Convert npy to fits

In [None]:
npfile_to_fits(DATA_PATH / 'noref.npy')

## Plot a line

In [None]:
p_nav = figure(plot_width=950, tooltips=[("x", "$x"), ("value", "@y")])

In [None]:
render = p_nav.circle(range(40000), data['ubot'][0], size=4)

In [None]:
def update_p_nav(idx):
    render.data_source.data['y'] = data['ubot'][idx]
    push_notebook()

In [None]:
show(p_nav, notebook_handle=True)

In [None]:
interact(update_p_nav, idx=(0, data['ubot'].shape[0] - 1))

## Moving Average

In [None]:
mva_none = moving_average(jump[idx_none], n=10)

In [None]:
p_mva_none = figure(plot_width=950)
p_mva_none.circle(range(len(mva_none)), mva_none, size=2)
show(p_mva_none)

In [None]:
mva_jumps = moving_average(jump[idx_jumps], n=10)

In [None]:
p_mva_jumps = figure(plot_width=950)
p_mva_jumps.circle(range(len(mva_jumps)), mva_jumps, size=2)
show(p_mva_jumps)

## Jump detection with basic convolution

In [None]:
kernel = [-1., 1.]
# kernel = [1., -4., 1.]
# kernel = [4., -20., 4.]
# kernel = [-1., -2., 16, -2., -1.]

In [None]:
c_none = np.convolve(jump[idx_none], kernel, mode='valid')

In [None]:
p_convolve = figure(plot_width=950)

In [None]:
p_convolve.circle(range(len(c_none)), c_none, size=2)

In [None]:
show(p_convolve)

In [None]:
c_jumps = np.convolve(mva_jumps, kernel, mode='valid')

In [None]:
p_jumps = figure(plot_width=950)

In [None]:
p_jumps.circle(range(len(c_jumps)), c_jumps, size=2)

In [None]:
show(p_jumps)

In [None]:
central_idxs = central_idxs_constraint(c_jumps, 0.7, adj=5+1)

In [None]:
central_idxs

## Try the sobel

In [None]:
s = ndimage.sobel(mva_jumps)

In [None]:
p_sobel = figure()

In [None]:
p_sobel.circle(range(len(s)), s, size=2)

In [None]:
show(p_sobel)

In [None]:
s_avg = moving_average(s, n=3)

In [None]:
p_sobel_avg = figure()
p_sobel_avg.circle(range(len(s_avg)), s_avg, size=2)
show(p_sobel_avg)