# Cathode Viewer
Advanced Cathode Camera Image Viewer Application

## Author
Jason Koglin,
Los Alamos National Laboratory

koglin@lanl.gov

## License
This analysis application is part of the catemis package, 
which is distributed as open-source software under a BSD 3-Clause License

© 2022. Triad National Security, LLC. All rights reserved.  
LANL Copyright No. C21111.

This program was produced under U.S. Government contract 89233218CNA000001 for Los Alamos
National Laboratory (LANL), which is operated by Triad National Security, LLC for the U.S.
Department of Energy/National Nuclear Security Administration. All rights in the program are
reserved by Triad National Security, LLC, and the U.S. Department of Energy/National Nuclear
Security Administration. The Government is granted for itself and others acting on its behalf a
nonexclusive, paid-up, irrevocable worldwide license in this material to reproduce, prepare
derivative works, distribute copies to the public, perform publicly and display publicly, and to permit
others to do so.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
   list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.

3. Neither the name of the copyright holder nor the names of its contributors
   may be used to endorse or promote products derived from this software
   without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

In [None]:
import matplotlib.pyplot as plt
SMALL_SIZE = 20
MEDIUM_SIZE = 25
BIGGER_SIZE = 30

plt.rc('font', size=SMALL_SIZE)          # controls default text sizes
plt.rc('axes', titlesize=SMALL_SIZE)     # fontsize of the axes title
plt.rc('axes', labelsize=MEDIUM_SIZE)    # fontsize of the x and y labels
plt.rc('xtick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('ytick', labelsize=SMALL_SIZE)    # fontsize of the tick labels
plt.rc('legend', fontsize=SMALL_SIZE)    # legend fontsize
plt.rc('figure', titlesize=BIGGER_SIZE)  # fontsize of the figure title

fontsize = {
    'title': 20,
    'xlabel': 18,
    'yticks': 12,
    'ylabel': 18,
    'xticks': 12,
    'zlabel': 18,
    'cticks': 12,
}

from matplotlib.figure import Figure
def null_figure():
    return Figure(figsize=(8,6))

import numpy as np
import pandas as pd
import xarray as xr

import holoviews as hv
import hvplot.pandas
import hvplot.xarray

import panel as pn
import panel.widgets as pnw
import param

pn.extension()

In [None]:
import platform
from pathlib import Path
import catemis

try:
    version = catemis.__version__

except:
    version = 'dev'
    
package = 'catemis'
package_git_url = 'https://git.lanl.gov/koglin/'+package

# needs update for documentation URL
docs_url = package_git_url

app_site = '{:} v{:}'.format(package, version)

package_path = Path(catemis.__file__).parent
lanl_logo_path = package_path/'template'/'LANL Logo White.png'
jdiv_logo_path = package_path/'template'/'j-div-logo-blue1.png'
if lanl_logo_path.is_file():
    lanl_logo_pn = pn.pane.PNG(str(lanl_logo_path), width=300)
else:
    lanl_logo_pn = None

if jdiv_logo_path.is_file():
    jdiv_logo_pn = pn.Row(
        pn.Spacer(width=50),
        pn.pane.PNG(str(jdiv_logo_path), width=200),
    )

else:
    jdiv_logo_pn = None

app_meta_pn = pn.Column(
    pn.pane.Markdown('###[App Documentation]({:})'.format(docs_url), 
                     align='center', width=320),
    pn.pane.Markdown('###Created by koglin@lanl.gov'),
    pn.pane.Markdown('###[{:}]({:}) v{:}'.format(package, package_git_url, version), 
                     align='center', width=320),
    align='center',
    width=320,
)

In [None]:
def get_local_datetime(datetime_utc,
                       zone='us/mountain'):
    """
    Get the local datetime from datetime UTC
    (originally from darhtio.utils)

    Parameters
    ----------
    datetime : np.datetime64
        UTC datetime.
    zone : str, optional
        Time Zone. The default is 'us/mountain'.

    Returns
    -------
    np.datetime64
        Local datetime
    """
    import pandas as pd
    import pytz

    tz = pytz.timezone(zone)
    tstamp = pd.Timestamp(datetime_utc)
    datetime = (pd.Timedelta(tz.utcoffset(tstamp), units='ns')
                +datetime_utc).to_datetime64()

    return datetime

In [None]:
status_pane = pn.pane.Markdown(object='', width=800)

In [None]:
path = str(Path(catemis.__file__).parent/'data')
data_folder = '20200701'
null_folder = '20190314'
file_base = 'CAT_CAM.'

In [None]:
class ImageViewer(param.Parameterized):
    """
    Interactive Cathode Image Viewer 
    """    
    action = param.Action(lambda x: x.param.trigger('action'), label='Update Plot')
    update_shots = param.Action(lambda x: x.param.trigger('update_shots'), label='Update Shots')
    height = param.Integer(default=700, step=100, label='Plot Height') 
    width = param.Integer(default=1000, step=100, label='Plot Size')
    psize = param.Number(default=0.00375, step=0.00025, label='Pixel Size [mm]')
    cmin = param.Number(default=400, label='Temp/ADU min', step=5)
    cmax = param.Number(default=1200, label='Temp/ADU max', step=5)
    auto_lim = param.Boolean(default=True, label='Auto Limits')
    auto_thresh = param.Boolean(default=True, label='Auto Threshold')
    x0 = param.Number(default=550, step=1, label='Xo (pixel)')
    y0 = param.Number(default=630, step=1, label='Yo (pixel)')
    rx = param.Number(default=-11, step=0.5, label='Azimuth [deg]')
    ry = param.Number(default=53, step=0.5, label='Elevation [deg]')
    orientation = param.Number(default=0, step=1, label='Roll [deg]')
    dcat = param.Number(default=490, step=1, label='Diameter [pixels]')
    rcat = param.Number(default=6.5/2*25.4, label='Radius [mm]')
    focal = param.Number(default=100, step=1, label='Camera Focus [mm]')
    Zcam = param.Number(default=(30.7+10)*25.4, label='Cathode Distance [mm]')
    img_data = param.Selector(default='Temperature', objects=['ADU', 'Temperature'], label='Plot Units')
    calfactor = param.Number(2.85e-15, step=1e-16, label='Temperature Calibration')
    catcam_integration = param.Number(0.080, step=0.005, label='Integration Time [sec]')

    path = param.String(default=str(path), label='Path')
    file_base = param.String(default=file_base, label='File Base')
    file_ext = param.String(default='.tif', label='Extension')
    use_shot_nums = param.Boolean(default=True, label='Use Shot Numbers')
    data_folder = param.String(default=data_folder, label='Data Folder')
    shots = param.Selector(label='Shots')
    files = param.Selector(label='Files')
    null_subtract = param.Boolean(default=True, label='Subtract Null Shot')
    null_folder = param.String(default=null_folder, label='Null Folder')
    null_shots = param.Selector(label='Null Shots')
    null_files = param.Selector(label='Null Files')

    def get_params(self):
        params = {}
        for attr in ['x0','y0',
                     'rx', 
                     'ry', 
                     'orientation', 
                     'dcat', 'focal', 'rcat', 'Zcam', 
                     'psize']:
            params[attr] = getattr(self,attr)

        return params
    
    def pn_select(self):
        if self.use_shot_nums:
            return pn.Row(
                self.param.shots, 
                self.param.null_shots,
                width=320,
            )

        else:
            return pn.Row(
                self.param.files, 
                self.param.null_files, 
                width=320,
            )
    
    @param.depends('update_shots', watch=True)
    def set_shots_from_path(self):
        """
        Identify and set shots from path if use_shot_nums set, otherwise set files from path.
        """
        import traceback
        # Data Files
        data_path = Path(self.path)/self.data_folder
        files_path = list(data_path.glob('{:}*{:}'.format(self.file_base, self.file_ext)))
        files_list = [file.parts[-1] for file in files_path]
        self.param.files.objects = files_list
        
        shot_list = []
        sstr = 'Setting shot list' 
        print(sstr)
        try:
            for file in files_list:
                shot = file.split('.')[1]
                if shot.isnumeric():
                    shot_list.append(int(shot))

        except:
            traceback.print_exc()
            print('... ERROR getting shot numbers from files')
            
        shot_list = sorted(shot_list)
        print('shots: ', shot_list)
        if not shot_list or shot_list != self.param.shots.objects:
            self.param.shots.default = None
            self.param.files.default = None
            
        self.param.shots.objects = shot_list
            
        null_data_path = Path(self.path)/self.null_folder
        null_files_path = list(null_data_path.glob('{:}*{:}'.format(self.file_base, self.file_ext)))
        null_files_list = [file.parts[-1] for file in null_files_path]
        self.param.null_files.objects = null_files_list

        # Null Files
        null_shot_list = []
        sstr = 'Setting null shot list' 
        print(sstr)
        try:
            for file in null_files_list:
                shot = file.split('.')[1]
                if shot.isnumeric():
                    null_shot_list.append(int(shot))

        except:
            traceback.print_exc()
            print('... ERROR getting null shot numbers from files')
            
        null_shot_list = sorted(null_shot_list)
        print('null shots: ', null_shot_list)
        if not null_shot_list or null_shot_list != self.param.null_shots.objects:
            self.param.null_shots.default = None
            self.param.null_files.default = None
            
        self.param.null_shots.objects = null_shot_list

        if self.use_shot_nums:
            if shot_list:
                self.shots = shot_list[0]
                self.null_shots = null_shot_list[0]
                self.param.trigger('action')
                
            else:
                status_pane.object = 'Click Update Shots or set new Path and/or Data Folder'

        else:
            if files_list:
                self.files = files_list[0]
                self.null_files = null_files_list[0]
                self.param.trigger('action')
                
            else:
                status_pane.object = 'Click Update Shots or set new Path and/or Data Folder'
                
    def load_catcam(self):
        """
        Load catcam image passing parameters to cathode.load_catcam method
        """
        import os
        import imageio
        import xarray as xr
        from pathlib import Path
        import traceback
        
        from catemis import cathode
        
        shot = self.shots
        data_path = Path(self.path)/self.data_folder
        file_base = self.file_base
        file_ext = self.file_ext
        file_sel = self.files
        if not file_sel and self.param.files.objects:
            file_sel = self.param.files.objects[0]
        
        null_shot = self.null_shots
        null_path = Path(self.path)/self.null_folder
        null_file_sel = self.null_files
        if not null_file_sel and self.param.null_files.objects:
            null_file_sel = self.param.null_files.objects[0]

        name = 'img'

        if self.use_shot_nums:
            # Select by shot number
            if not shot:
                print('ERROR: invalid shot')
                return

            fname = '{:}{:}{:}'.format(file_base, shot, file_ext)
            fname_null = '{:}{:}{:}'.format(file_base, null_shot, file_ext)

        else:
            if not file_sel:
                print('ERROR: invalid file')
                return

            fname = file_sel
            fname_null = null_file_sel
    
        params = self.get_params()
        try:
            da = cathode.load_catcam(
                    fname=fname, 
                    daystr=self.data_folder, 
                    path=data_path,
                    **params,
            )

        except:
            traceback.print_exc()
            sstr = 'ERROR loading {:} from {:}'.format(fname, data_path)
            status_pane.object = sstr

        try:
            if self.null_subtract:
                da['CatNull'] = cathode.load_catcam(
                        fname=fname_null, 
                        daystr=self.null_folder, 
                        path=null_path,
                        **params,
                )
            else:
                da['CatNull'] = xr.zeros_like(da)
                
            da['CatNull'].attrs['long_name'] = 'Cathode Pedestal'
            da['CatNull'].attrs['units'] = 'ADU'
            da['CatNull'].attrs['shot_null'] = null_shot
            da['CatNull'].attrs['daystr_null'] = self.null_folder

        except:
            traceback.print_exc()
            sstr = 'ERROR loading null shot {:} from {:}'.format(fname_null, null_path)
            status_pane.object = sstr

        xdim = da.dims[1]
        ydim = da.dims[0]
        da.coords['CatCut'] = ((
                               (((da/da.mean(dim=[xdim,ydim])-1)*100)
                                .rolling({ydim:3}).mean()
                                .rolling({xdim:3}).mean()
                               ) >0 )
                               .rolling({ydim:3}).mean()
                               .rolling({xdim:3}).mean()
                               ) == 1
        da.coords['CatCut'].attrs['long_name'] = 'Cathode Image Cut'
        
        da.coords['CAT_R'] = (da[xdim]**2+da[ydim]**2).pipe(np.sqrt)
        da.coords['CAT_R'].attrs['long_name'] = 'R'
        da.coords['CAT_R'].attrs['units'] = da[xdim].attrs.get('units')

        self.da = da
        return self.da
    
    @param.depends('action')
    def plot_image(self):
        """
        Plot image based on shot/file selection
        """
        import traceback
        shot = self.shots

        if self.use_shot_nums:
            # Select by shot number
            if not shot:
                status_pane.object = 'Click Update Shots or set new Path and/or Data Folder'
                return

            title = 'Shot {:}'.format(shot)
            status_pane.object = '... Loading Shot {:}'.format(shot)
            
        else:
            if not self.files:
                status_pane.object = 'Click Update Shots or set new Path and/or Data Folder'
                return 

            title = self.files
            status_pane.object = '... Loading File {:}'.format(title)

        clabel = 'ADU'
        da = self.load_catcam().copy()
        
        if da is None:
            return null_figure()

        if 'CatNull' in da.coords:
            try:
                da -= da.CatNull.values
                
            except:
                traceback.print_exc()
                status_pane.object = 'Failed Subtracting Null'
        
        if self.img_data == 'Temperature':
            from catemis import cathode
            ds = cathode.calculate_catcam_temperature(
                    da.rename('CatCam').to_dataset().expand_dims('shot'),
                    calfactor=self.calfactor,
                    catcam_integration=self.catcam_integration,
                    cathode_radius_cm=self.rcat/10.,
                ).squeeze()
            da = ds.CatTemp
            clabel = 'T [C]'
            title += ' Temperature: <T> = {:5.1f} C'.format(float(ds.CatTemp_mean))

        else:
            title += 'Intensity'
                
        if self.auto_thresh:
            da = da.where(da.CatCut)
        
        xdim = da.dims[1]
        ydim = da.dims[0]
        
        pn_plot = None
        try:
            long_name = da.attrs.get('long_name', '')
            units = da.attrs.get('units', 'ADU')

            height = self.height
            width = self.width

            if self.auto_lim:
                try:
                    dfp = pd.DataFrame(da.values.flatten()).describe(percentiles=[0.01,0.99])
                    cmin = int(dfp.T['1%'])
                    cmax = int(dfp.T['99%'])
                    if False:
                        cmin -= 5
                        cmax += 5

                    self.cmin = cmin
                    self.cmax = cmax

                except:
                    print('Failed clim')
                    print(dfp)

            clim = (self.cmin, self.cmax)

            status_pane.object += '... Updating CatCam plot'

            plt_kwargs = {
                'fontsize': fontsize,
                'frame_width': int(width*0.8),
                'clim': clim,
                'clabel': clabel,
                'title': str(title),
                'aspect': 1,
            }
        
            pn_plot = da.hvplot.image(x=xdim, y=ydim, **plt_kwargs).opts(toolbar="above")
            status_pane.object = 'Cathode Image for {:}'.format(title)

        except:
            traceback.print_exc()
            status_pane.object = 'Failed Making Image Plot'
            return null_figure()

        try:
            cat_rbins = np.arange(0, self.rcat*1.1,2.)
            dag = da.groupby_bins('CAT_R', cat_rbins)
            dagm = dag.mean()
            dags = dag.std()
            rmid = np.array([(a.left+a.right)/2 for a in dagm.CAT_R_bins.to_pandas()])
            dagm.coords['rCAT'] = (('CAT_R_bins'), rmid)
            da = dagm.swap_dims({'CAT_R_bins': 'rCAT'})

            plt_kwargs = {
                'fontsize': fontsize,
                'height': int(height),
                'width': int(width),
                'ylim': clim,
                'xlabel': 'Cathode Radius [mm]',
                'ylabel': clabel,
                'title': str(title),
            }

            pn_plot = (
                pn_plot
                + da.hvplot('rCAT', **plt_kwargs)
            ).cols(1)
                        
        except:
            traceback.print_exc()
            status_pane.object = 'Failed Making Projection Plot'

        return pn_plot

    def cam_params(self):
        pn_params = pn.Column(
            pn.Row(
                self.param.update_shots,
                pn.Column(
                    self.param.use_shot_nums,
                    self.param.null_subtract,
                ),
                width=320,
            ),
            pn.Row(
                self.param.file_base,
                self.param.file_ext,
                width=320,
            ),
            self.param.path,
            pn.Row(
                self.param.data_folder,
                self.param.null_folder,
                width=320,
            ),
            pn.Row(
                self.pn_select, 
                width=320,
            ),
            pn.layout.Divider(),
            pn.Row(
                self.param.action,
                width=320,
            ),
            pn.Row(
                self.param.x0,
                self.param.y0,
                width=320,
            ),
            pn.Row(
                self.param.rx,
                self.param.ry,
                width=320,
            ),
            pn.Row(
                self.param.orientation,
                self.param.rcat,
                width=320,
            ),
            pn.Row(
                self.param.focal,
                self.param.Zcam,
                width=320,
            ),
            pn.Row(
                self.param.dcat,
                self.param.psize,
                width=320,
            ),
            pn.Row(
                self.param.calfactor,
                self.param.catcam_integration,
                width=320,
            ),
            pn.Row(
                self.param.img_data,
                pn.Column(
                    self.param.auto_lim, 
                    self.param.auto_thresh,
                ),
                width=320,
            ),
            pn.Row(
                self.param.cmin,
                self.param.cmax,
                width=320,
            ),
            pn.Row(
                self.param.width,
                self.param.height,
                width=320,
            ),
        )

        return pn_params
    
    def app(self):
        pn_app = pn.template.MaterialTemplate(
            title='Cathode Viewer', 
            logo=str(lanl_logo_path),
            header_background='#000F7E',
            theme=pn.template.DefaultTheme,
        )

        pn_app.sidebar.append(jdiv_logo_pn)
        pn_app.sidebar.append(self.cam_params)
        pn_app.sidebar.append(pn.layout.Divider())
        pn_app.sidebar.append(app_meta_pn)

        pn_app.main.append(
            pn.Column(
                pn.Card(
                    pn.Column(
                        status_pane,
                        self.plot_image,
                    ),
                    title='Cathode Plots',
                ),
            )
        )

        return pn_app    
    
viewer = ImageViewer()

In [None]:
# To launch straight from Jupyter Notebook uncomment and execute this line
#viewer.app().show()

In [None]:
# For use with command line as in documentation
# panel serve --show cathode_viewer.ipynb --port=5051
viewer.app().servable()