# Kirisu 2

#### Simple editor for stopped-flow and other spectra

by *Sergio Boneta Martínez*  
based on the original **kirisu** by *Jose Ramon Peregrina* and *Jorge Estrada*

GPLv3 (C) 2022 @ *Universidad de Zaragoza*

In [None]:
#@title Prepare the environment
#@markdown Execute this cell before running the following cells. It only needs to be run once; the remaining cells can be run multiple times.

#@markdown Plots are nice and useful, but they can be a source of errors and can be time-consuming when working in batch. Uncheck the *plot* option to disable them.
# Global switch read by plot_tabs(): when False no plot is drawn.
plot = True  # @param {type:"boolean"}


import os
from copy import deepcopy

import numpy as np
import scipy.ndimage
import bokeh.io, bokeh.plotting, bokeh.palettes

from google.colab import files, widgets


class TDSpectrum:
    '''
        Time-Dependent Spectrum

        Container for a set of absorbance spectra recorded at several times,
        with readers/writers for GLB and CSV files and basic editing
        operations (zeroing, trimming and smoothing).

        Parameters
        ----------
        filename : str
            path to spectrum file to read

        Attributes
        ----------
        plot_styles : tuple
            list of supported plot styles
        filename : str
            name of the spectrum file read (def: 'spectrum.glb')
        type : str
            type of the spectrum file read (def: 'DATA')
        comment : str
            comment included in the header of the spectrum file read
        times : ndarray(n)
            array of times
        lambdas : ndarray(m)
            array of wavelengths
        absorb : ndarray(n,m)
            matrix of absorbances, one row per time and one column per wavelength

        Properties
        ----------
        n_spec : int
            number of spectra
        n_times : int
            number of times
        n_lambdas : int
            number of wavelengths
        filename_trim : str
            filename with '_t' appended to the basename
        lim_times : tuple
            limits of times (min, max)
        lim_lambdas : tuple
            limits of wavelengths (min, max)
        lim_absorb : tuple
            limits of absorbances (min, max)
    '''

    plot_styles = ('2d-times', '2d-lambdas')

    def __init__(self, filename=None):
        self.times = np.array([])
        self.lambdas = np.array([])
        self.absorb = np.array([])
        self.filename = "spectrum.glb"
        self.type = "DATA"
        self.comment = ""
        if filename is not None:
            self.read(filename)

    def __str__(self) -> str:
        return f"File: {self.filename}\n" + \
               f"Times: {self.n_times} :: {self._span(self.times)}\n" + \
               f"Lambdas: {self.n_lambdas} :: {self._span(self.lambdas)}\n" + \
               f"Absorbances: {self._span(self.absorb)}\n"

    @property
    def n_spec(self) -> int:
        return self.n_times

    @property
    def n_times(self) -> int:
        return self.times.shape[0]

    @property
    def n_lambdas(self) -> int:
        return self.lambdas.shape[0]

    @property
    def filename_trim(self) -> str:
        return "{}_t{}".format(*os.path.splitext(self.filename))

    @property
    def lim_times(self) -> tuple:
        return (self.times.min(), self.times.max())

    @property
    def lim_lambdas(self) -> tuple:
        return (self.lambdas.min(), self.lambdas.max())

    @property
    def lim_absorb(self) -> tuple:
        return (self.absorb.min(), self.absorb.max())

    @staticmethod
    def _span(values) -> str:
        '''Format the 'min - max' range of an array, or 'n/a' if it is empty'''
        # min()/max() raise on empty arrays, which would make printing an empty spectrum fail
        if values.size == 0:
            return "n/a"
        return f"{values.min()} - {values.max()}"

    @staticmethod
    def _find_ndx(l, v):
        '''
            Find index of first element of l that starts with v, ignoring case

            v is expected in lowercase. Raises IndexError if no element matches.
        '''
        for i, x in enumerate(l):
            if x.lower().startswith(v):
                return i
        raise IndexError(f"No entry starting with '{v}' found")

    @staticmethod
    def _pick_format(filename, format, known, action) -> str:
        '''
            Resolve the file format from the explicit argument, the file extension or the default

            'known' is the mapping of supported formats, its first key being the default.
            'action' is only used to build the error message ('read'/'write').
        '''
        if format is not None:
            if format.lower() not in known:
                raise ValueError(f'Unknown format to {action}: {format}')
            return format.lower()
        extension = os.path.splitext(filename)[1][1:].lower()
        if extension in known:
            return extension
        return next(iter(known))

    def read(self, filename, filestr="", format=None) -> None:
        '''
            Wrapper to read files based on extension/format

            Any data previously held by the object is discarded.

            Supported formats: GLB, standard CSV, ProDataCSV

            Parameters
            ----------
            filename : str
                path to file to read
            filestr : str, optional
                file itself as a contiguous string
                if given, filename is not read from disk
            format : {'glb', 'csv'}, optional
                format of file to read
                if not specified, the extension of the filename is used
                if the extension is not recognized, assumed to be 'glb'

            Raises
            ------
            FileNotFoundError
                if filestr is not given and filename does not exist
            ValueError
                if format is given but not supported
        '''
        # reset to a pristine state so nothing from a previous file survives
        self.__init__()
        readers = {
            'glb' : self._read_glb,
            'csv' : self._read_csv
            }
        # check filename/filestr and read file
        self.filename = filename
        if not filestr:
            if not os.path.exists(filename):
                raise FileNotFoundError(f'File not found: {filename}')
            with open(filename, 'r') as f:
                filestr = f.read()
        # assign format based on input argument/file extension/default
        format = self._pick_format(filename, format, readers, 'read')
        # read file based on format
        readers[format](filestr)

    def _read_glb(self, filestr) -> None:
        '''
            Read a spectrum from the text of a GLB file

            Raises ValueError if the data section does not hold exactly
            n_times * n_lambdas values, and IndexError if a section is missing.
        '''
        # blank lines carry no information in this format
        data = [line.strip() for line in filestr.splitlines() if line.strip()]
        # parse header (delimited by '/')
        slash_ndx = self._find_ndx(data, '/')
        header = data[:slash_ndx]
        data = data[slash_ndx+1:]
        self.type = header[self._find_ndx(header, 'type>')].split('>')[1].strip()
        # the comment is everything between the first and the last '%' line
        comment_init_ndx = self._find_ndx(header, '%')
        comment_end_ndx = self._find_ndx(header[::-1], '%')
        self.comment = "\n".join(header[comment_init_ndx+1:len(header)-comment_end_ndx-1])
        # get dimensions
        n_spec = int(data[self._find_ndx(data, 'n_spec')].split()[1])
        n_lamb = int(data[self._find_ndx(data, 'n_lam')].split()[1])
        # times
        time_ndx = self._find_ndx(data, 'times:') + 1
        self.times = np.array(data[time_ndx:time_ndx+n_spec], dtype=float)
        # wavelengths
        lamb_ndx = self._find_ndx(data, 'lambda:') + 1
        self.lambdas = np.array(data[lamb_ndx:lamb_ndx+n_lamb], dtype=float)
        # absorbances: consecutive runs of n_lambdas values, one run per time
        abs_ndx = self._find_ndx(data, 'data:') + 1
        n_values = self.n_times * self.n_lambdas
        values = np.array(data[abs_ndx:abs_ndx+n_values], dtype=float)
        # a short data section must not be silently broadcast into a full row
        if values.size != n_values:
            raise ValueError(f'Expected {n_values} absorbance values but found {values.size}')
        self.absorb = values.reshape(self.n_times, self.n_lambdas)

    def _read_csv(self, filestr) -> None:
        '''
            Read a spectrum from the text of a CSV file

            Two layouts are recognized: ProDataCSV (first line starts with
            'ProDataCSV', wavelengths in rows and times in columns) and the
            standard CSV (times in rows and wavelengths in columns).
        '''
        data = filestr.splitlines()
        if data[0].lower().startswith('prodatacsv'):
            # ProDataCSV
            body = data[self._find_ndx(data, 'data:')+1:]
            body = body[self._find_ndx(body, ','):]
            self.times = np.array([i.strip() for i in body[0].split(',') if i.strip()], dtype=float)
            lambdas, rows = [], []
            for row in body[1:]:
                # a blank line closes the data table
                if not row.strip():
                    break
                cells = row.split(',')
                lambdas.append(float(cells[0]))
                rows.append([float(i) for i in cells[1:]])
            self.lambdas = np.array(lambdas, dtype=float)
            # rows are wavelengths in the file: transpose to (times, wavelengths)
            self.absorb = np.array(rows, dtype=float).reshape(len(rows), self.n_times).T
        else:
            # standard CSV
            # blank lines (e.g. trailing newlines) hold no data and are ignored
            body = [row for row in data if row.strip()]
            # optional title line before the wavelengths header
            if not body[0].lower().startswith(','):
                del body[0]
            self.lambdas = np.array([i.strip() for i in body[0].split(',') if i.strip()], dtype=float)
            times, rows = [], []
            for row in body[1:]:
                cells = row.split(',')
                times.append(float(cells[0]))
                rows.append([float(i) for i in cells[1:]])
            self.times = np.array(times, dtype=float)
            self.absorb = np.array(rows, dtype=float).reshape(len(rows), self.n_lambdas)

    def write(self, filename=None, format=None) -> None:
        '''
            Wrapper to write files based on extension/format

            Supported formats: GLB, standard CSV

            Parameters
            ----------
            filename : str
                path to file to write
                if not specified, is set to the original name
                with '_t' appended to the basename
            format : {'glb', 'csv'}, optional
                format of file to write
                if not specified, the extension of the filename is used
                if the extension is not recognized, assumed to be 'glb'

            Raises
            ------
            ValueError
                if format is given but not supported
        '''
        writers = {
            'glb' : self._write_glb,
            'csv' : self._write_csv
            }
        # assign format based on input argument/file extension/default
        filename = filename or self.filename_trim
        format = self._pick_format(filename, format, writers, 'write')
        # write file based on format
        writers[format](filename)

    def _write_glb(self, filename) -> None:
        '''Write a spectrum to a GLB file'''
        # header
        data = ["APL-ASCII-SPECTRAKINETIC\n",
                f"TYPE>{self.type}\n%\n{self.comment}\n%\n/\n",
                f"N_spec: {self.n_spec:d}\nN_lam: {self.n_lambdas:d}"]
        # times
        data.append("\nTimes:\n")
        data.append('\n'.join([f'{i:.6f}' for i in self.times]))
        # wavelengths
        data.append("\nLambda:\n")
        data.append('\n'.join([f'{i:.3f}' for i in self.lambdas]))
        # absorbances: one value per line, spectra separated by a blank line
        data.append("\nData:\n")
        for i in range(self.n_times):
            data.append('\n'.join([f'{j:.6f}' for j in self.absorb[i, :]]))
            data.append('\n\n')
        # write file
        with open(filename, 'w') as f:
            f.write(''.join(data))

    def _write_csv(self, filename) -> None:
        '''Write a spectrum to a standard CSV file'''
        # header
        data = ["SPECTRA\n"]
        # wavelengths header
        data.append(',' + ','.join([f'{i:.3f}' for i in self.lambdas]) + '\n')
        # times and absorbances matrix
        for i in range(self.n_times):
            data.append(f"{self.times[i]:.6f}," + ','.join([f'{j:.6f}' for j in self.absorb[i, :]]) + '\n')
        # write file
        with open(filename, 'w') as f:
            f.write(''.join(data))

    def plot(self, style='2d-times') -> None:
        '''
            Display a plot of the spectra

            Parameters
            ----------
            style : str, optional
                type of plot to draw (def: '2d-times')
                '2d-times' : multiple superposed spectra (times)
                             wavelength (x) vs. absorbance (y)
                '2d-lambdas' : multiple superposed spectra (wavelengths)
                               time (x) vs. absorbance (y)
                '3d' : 3D plot of spectra (not implemented)
                       time (x) vs. wavelength (y) vs. absorbance (z)

            Raises
            ------
            NotImplementedError
                if style is '3d'
            ValueError
                if style is not recognized
        '''
        color_palette = bokeh.palettes.Viridis256
        if style == '2d-times':
            x_label, x_range = 'Wavelength (λ)', self.lim_lambdas
            curves = [(self.lambdas, self.absorb[i, :]) for i in range(self.n_times)]
        elif style == '2d-lambdas':
            x_label, x_range = 'Time', self.lim_times
            curves = [(self.times, self.absorb[:, i]) for i in range(self.n_lambdas)]
        elif style == '3d':
            raise NotImplementedError()
        else:
            raise ValueError(f'Unknown style to plot: {style}')
        fig = bokeh.plotting.figure(title=self.filename,
                                    x_axis_label=x_label,
                                    y_axis_label='Absorbance',
                                    width=800,
                                    height=400,
                                    x_range=x_range,
                                    y_range=self.lim_absorb
                                    )
        # spread the curves evenly over the 256 colors of the palette
        n_curves = len(curves)
        for i, (x, y) in enumerate(curves):
            fig.line(x, y, color=color_palette[i*256//n_curves])
        bokeh.plotting.show(fig)

    def zero(self, lamb) -> None:
        '''
            Modify absorbances to make zero at a specific wavelength

            The wavelength available closest to 'lamb' is used and its
            absorbance is subtracted from the whole spectrum, for every time.
        '''
        lamb_ndx = np.argmin(abs(self.lambdas-lamb))
        # list index keeps the column 2D (and copied) so it broadcasts over all wavelengths
        self.absorb -= self.absorb[:, [lamb_ndx]]

    def trim(self, time=None, lamb=None) -> None:
        '''
            Trim spectra to a specific time/wavelength range

            Limits are matched to the closest values available, so a limit
            outside the data range keeps that end untouched. The sequences
            passed are not modified.

            Parameters
            ----------
            time : list, optional
                min and max time to keep
            lamb : list, optional
                min and max wavelength to keep

            Raises
            ------
            ValueError
                if a range is given without exactly two values
        '''
        # trim times
        if time is not None and len(time) > 0:
            if len(time) != 2:
                raise ValueError("'time' must be a list of length 2")
            time_min, time_max = sorted(time)
            time_ndx_min = np.argmin(abs(self.times-time_min))
            time_ndx_max = np.argmin(abs(self.times-time_max))
            self.times = self.times[time_ndx_min:time_ndx_max+1]
            self.absorb = self.absorb[time_ndx_min:time_ndx_max+1]
        # trim wavelengths
        if lamb is not None and len(lamb) > 0:
            if len(lamb) != 2:
                raise ValueError("'lamb' must be a list of length 2")
            lamb_min, lamb_max = sorted(lamb)
            lamb_ndx_min = np.argmin(abs(self.lambdas-lamb_min))
            lamb_ndx_max = np.argmin(abs(self.lambdas-lamb_max))
            self.lambdas = self.lambdas[lamb_ndx_min:lamb_ndx_max+1]
            self.absorb = self.absorb[:, lamb_ndx_min:lamb_ndx_max+1]

    def smooth(self, method='sma', scale=1, **kwargs) -> None:
        '''
            Smooth spectra wavelengths for all times

            Parameters
            ----------
            method : str, optional
                type of smoothing to apply (def: 'sma')
                'gaussian' : Gaussian filter
                    'sigma' : standard deviation (def: 1)
                'sma' : simple moving average, trims edges
                    'window' : window size to take average (def: 5)
                'median' : median filter
                    'size' : size to apply filter (def: 2)
            scale : float, optional
                multiplier to scale the smoothing parameter (def: 1)

            Raises
            ------
            ValueError
                if the method is unknown, or if the scaled 'window' of 'sma'
                is below 1 or not smaller than half the number of wavelengths
        '''
        # default kwargs options
        kwargs_def = {'sigma': 1,
                      'window': 5,
                      'size': 2}
        kwargs = {**kwargs_def, **kwargs}
        kind = method.lower()
        # apply smoothing
        if kind == 'gaussian':
            sigma = kwargs['sigma']*scale
            for i in range(self.n_times):
                self.absorb[i, :] = scipy.ndimage.gaussian_filter1d(self.absorb[i, :], sigma)
        elif kind == 'sma':
            window = int(kwargs['window']*scale)
            # an empty averaging kernel is meaningless and would wipe all wavelengths
            if window < 1:
                raise ValueError("'window' must be at least 1 after scaling")
            if window >= self.n_lambdas // 2:
                raise ValueError("'window' must be smaller than half the number of wavelengths")
            kernel = np.ones(window)/window
            for i in range(self.n_times):
                self.absorb[i, :] = np.convolve(self.absorb[i, :], kernel, mode='same')
            # drop 'window' points at each end, where the average runs past the data
            self.lambdas = self.lambdas[window:-window]
            self.absorb = self.absorb[:, window:-window]
        elif kind == 'median':
            size = max(int(kwargs['size']*scale), 1)
            for i in range(self.n_times):
                self.absorb[i, :] = scipy.ndimage.median_filter(self.absorb[i, :], size)
        else:
            raise ValueError(f'Unknown smoothing method: {method}')


def plot_tabs(spectrum_class):
    '''
        Draw a spectrum in every supported plot style, one notebook tab per style

        Does nothing when the global 'plot' switch is off.

        Parameters
        ----------
        spectrum_class : TDSpectrum
            spectrum object whose plot() method is called for each style
    '''
    if plot:
        tab_bar = widgets.TabBar(TDSpectrum.plot_styles)
        for tab_name in TDSpectrum.plot_styles:
            # keep the first tab selected while the remaining ones are filled
            with tab_bar.output_to(tab_name, select=False):
                spectrum_class.plot(tab_name)

# Configure Bokeh to render its figures inline in the notebook output.
bokeh.io.output_notebook()


In [None]:
#@title Upload spectra
#@markdown Upload one or several files from your local computer containing multiple spectra. Select the format to be read.

format = 'glb'  # @param ["glb", "csv"]

# Maps each uploaded file name to its raw bytes.
uploaded = files.upload()

spectrum = TDSpectrum()
spectra = []
for filename, filestr in uploaded.items():
    try:
        # read() resets the object first, so the same instance can be reused for every file
        spectrum.read(filename, filestr.decode('UTF-8'), format=format)
        spectra.append(deepcopy(spectrum))
        plot_tabs(spectrum)
        print(spectrum)
    except Exception as err:
        # Keep going with the remaining files, but report why this one failed.
        # 'Exception' leaves KeyboardInterrupt/SystemExit alone so the cell can still be interrupted.
        print(f"ERROR: '{filename}' could not be read/plot.")
        print(f"       {type(err).__name__}: {err}")


In [None]:
#@title Process spectra
#@markdown Modify the spectra according to the input values provided. This cell can be run as many times as needed until a satisfactory plot is reached. If multiple spectra have been uploaded, the same modifications are applied to all of them.

#@markdown Smooth the absorbance with respect to the wavelength for each time.
smooth = 'No'  # @param ["No", "gaussian - Gaussian Filter", "sma - Simple Moving Average", "median - Median Filter"]
smooth_level = 1  #@param {type:"slider", min:0.2, max:3, step:0.1}

#@markdown Modify the spectra to make all times zero at a specific wavelength. Not performed if unchecked.
zero = True  # @param {type:"boolean"}
lambda_zero = 1000  # @param {type:"number"}

#@markdown Trim the spectra to only include values between a minimum and a maximum time and wavelength. A limit outside the range of the data leaves that end untrimmed.
time_min = 0  # @param {type:"number"}
time_max = 1000  # @param {type:"number"}
lambda_min = 0  # @param {type:"number"}
lambda_max = 1000  # @param {type:"number"}


# Only the first word of the selected option names the smoothing method.
smooth = smooth.split(maxsplit=1)[0]
# Work on copies so the uploaded spectra stay untouched and the cell can be re-run.
spectra_t = list(map(deepcopy, spectra))
for spectrum in spectra_t:
    if zero:
        spectrum.zero(lambda_zero)
    if not smooth == 'No':
        spectrum.smooth(smooth, smooth_level)
    time_range = [time_min, time_max]
    lambda_range = [lambda_min, lambda_max]
    spectrum.trim(time_range, lambda_range)
    plot_tabs(spectrum)
    print(spectrum)


In [None]:
#@title Download processed spectra
#@markdown Write the modified spectra to a new file and download it to your computer. If the name of the file is left blank, the original name with a '_t' will be used. If a name is given and several spectra were processed, a running number is appended to the basename so the files do not overwrite each other. The format will be inferred from the extension typed, the extension of the input file or '.glb' by default.

file_name = ""  # @param {type:"string"}

for ndx, spectrum in enumerate(spectra_t, start=1):
    if not file_name:
        file_name_t = spectrum.filename_trim
    elif len(spectra_t) > 1:
        # a single explicit name would make every spectrum overwrite the previous one
        file_name_t = "{}_{}{}".format(os.path.splitext(file_name)[0], ndx, os.path.splitext(file_name)[1])
    else:
        file_name_t = file_name
    spectrum.write(file_name_t)
    files.download(file_name_t)
