In [None]:
# default_exp camera

# Camera

> summary

This

In [None]:
#hide

# documentation extraction for class methods
from nbdev.showdoc import *

# unit tests using test_eq(...)
from fastcore.test import *

# monkey patching class methods using @patch
from fastcore.foundation import *
from fastcore.foundation import patch

# imitation of Julia's multiple dispatch using @typedispatch
from fastcore.dispatch import typedispatch

# bring forth **kwargs from an inherited class for documentation
from fastcore.meta import delegates

In [None]:
#export

from fastcore.foundation import patch
from fastcore.meta import delegates
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.interpolate import interp1d
from PIL import Image
from scipy.signal import decimate

from typing import Iterable, Union, Callable, List, TypeVar, Generic, Tuple, Optional
import datetime
import json
import pickle

In [None]:
#export

from openhsi.data import *


In [None]:

class OpenHSI():
    """Base Class for the OpenHSI Camera.
    Your cameras should subclass this class.
    """
    def __init__(self,cam_cal_file:str = None, fwhm_nm:float = None, along_track_len:int = 8):
        """Initialise DataCube with customisable processing level"""
        
        self.fwhm_nm = fwhm_nm
        self.along_track_len = along_track_len
        
        # if optional fwhm_nm is provided, then use a binned DataCube
        if fwhm_nm is not None and fwhm_nm > 0.2:
            print("Not implemented error.")
            #self.dc = DataCube(self.resolution,self.row_slice,n_lines=n,axis=1,dtype=np.uint32,shifts=self.smile_shifts,fwhm_nm=4.,wavelength_range=self.wavelength_range)
        
        # else:
        self.buff_sz = (self.resolution[0],along_track_len,self.resolution[1])
        self.dc = CircArrayBuffer(self.buff_sz,axis=1,dtype=np.uint32)
        
    def __enter__(self):
        return self
    
    def __close__(self):
        self.stop_cam()

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_cam()
        
    def settings_dump(fname:str, settings:dict):
        with open(fname, 'w') as outfile:
            json.dump(settings, outfile,indent=4,)

    def settings_load(fname:str) -> dict:
        with open(fname) as json_file:
            openhsi_settings = json.load(json_file)
        return openhsi_settings
    
    def get_datacube(self) -> None:
        """Take `n` images with option to `show` first image taken."""
        self.start_cam()
        for i in tqdm(range(self.along_track_len)):
            self.dc.push(self.get_img())
        self.stop_cam()
        
    def to_netcdf(self,path:str):
    # NEED REWRITING
        self.nc_data = xr.Dataset(data_vars=dict(datacube=(["x","time","wavelength"],self.dc.data)),
                                coords=dict(
                                    x=(["x"],np.arange(self.dc.size[0])),
                                    wavelength=(["wavelength"],self.dc.wavelengths),
                                    time=(["time"],self.dc.timestamps),
                                ),
                                attrs=self.settings["metadata"])

        """provide metadata to NetCDF coordinates"""
        self.nc_data.x.attrs["long_name"]   = "cross-track"
        self.nc_data.x.attrs["units"]       = "pixels"
        self.nc_data.x.attrs["description"] = "cross-track spatial coordinates"

        self.nc_data.time.attrs["long_name"]   = "along-track"
        self.nc_data.time.attrs["description"] = "along-track spatial coordinates"

        self.nc_data.wavelength.attrs["long_name"]   = "wavelength_nm"
        self.nc_data.wavelength.attrs["units"]       = "nanometers"
        self.nc_data.wavelength.attrs["description"] = "wavelength in nanometers."

        self.nc_data.datacube.attrs["long_name"]   = "hyperspectral datacube"
        self.nc_data.datacube.attrs["units"]       = "digital number"
        self.nc_data.datacube.attrs["description"] = "hyperspectral datacube"

        self.nc_data.to_netcdf(path)
        
    def show_robust(self):
        
        # if optional fwhm_nm is provided, then use a binned DataCube
        if self.fwhm_nm is not None and self.fwhm_nm > 0.2:
            print("Not implemented error.")

            rgb_shape = (*self.nc_data.datacube.shape[:2],3)
            rgb = np.zeros(rgb_shape,dtype = np.float64)
            rgb[:,:,0] = np.array(self.nc_data.datacube.sel(wavelength=640,method="nearest"))
            rgb[:,:,1] = np.array(self.nc_data.datacube.sel(wavelength=550,method="nearest"))
            rgb[:,:,2] = np.array(self.nc_data.datacube.sel(wavelength=470,method="nearest"))

            RGB = xr.Dataset(data_vars=dict(rgb=(["x","y","wavelength"],rgb)),coords=dict(y=(["y"],np.arange(rgb_shape[1])),x=(["x"],np.arange(rgb_shape[0])),wavelength=(["wavelength"],np.arange(rgb_shape[2]))),)
            #print(A.rgb)
            RGB.rgb.plot.imshow(rgb="wavelength",robust=True,figsize=(18,3))
            plt.gca().invert_yaxis()
            plt.xlabel("along-track"); plt.ylabel("cross-track")
        else:
            plt.imshow(np.sum(self.dc.data,axis=2),cmap="gray")
            plt.ylabel('Cross-track pixels')
            plt.xlabel('Along-track pixels')
            plt.show()


In [None]:

@delegates()
class SimulatedCamera(OpenHSI):
    
    def __init__(self, img_path:str = None, **kwargs):
        """Initialise Simulated Camera"""
        
        self.resolution = (452,1600)
        super().__init__(**kwargs)

        def sim_gen(img_path):
            if img_path is not None:
                img = Image.open(img_path)
                img.resize((self.resolution[0],self.along_track_len))
                
            else:
                for i in range(self.along_track_len):
                    yield np.random.randint(0,255,self.resolution)
        self.sim = sim_gen()
    
    def start_cam(self):
        pass
    
    def stop_cam(self):
        pass
    
    def get_img(self) -> np.ndarray:
        return next(self.sim)
    

with SimulatedCamera(img_path="assets/rocky_beach.png", along_track_len=64) as cam:
    cam.get_datacube()
    cam.show_robust()
    
    


In [None]:

# https://www.ximea.com/support/wiki/apis/Python

class XimeaCamera(OpenHSI):
    
    def __init__(self,**kwargs):
        """Initialise Camera"""
        
        self.resolution = (452,1600)
        
        super().__init__(**kwargs)
        
        self.xicam = xiapi.Camera()
        self.xicam.open_device_by_SN(serial_number) if serial_number else self.xicam.open_device()

        print(f'Connected to device {self.xicam.get_device_sn()}')

        self.xbinwidth  = xbinwidth
        self.xbinoffset = xbinoffset
        self.exposure   = exposure_ms
        self.gain       = 0

        self.xicam.set_width(self.xbinwidth)
        self.xicam.set_offsetX(self.xbinoffset)
        self.xicam.set_exposure_direct(1000*self.exposure)
        self.xicam.set_gain_direct(self.gain)

        self.xicam.set_imgdataformat("XI_RAW16")
        self.xicam.set_output_bit_depth("XI_BPP_12")
        self.xicam.enable_output_bit_packing()
        self.xicam.disable_aeag()
        
        self.xicam.set_binning_vertical(2)
        self.xicam.set_binning_vertical_mode("XI_BIN_MODE_SUM")

        self.rows, self.cols = self.xicam.get_height(), self.xicam.get_width()
        self.img = xiapi.Image()
        
        self.load_cam_settings()
    
    def start_cam(self):
        self.xicam.start_acquisition()
    
    def stop_cam(self):
        self.xicam.stop_acquisition()
        self.xicam.close_device()
    
    def get_img(self):
        self.xicam.get_image(self.img)
        return self.img.get_image_data_numpy()