# Level Populations
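Calculate the level populations of an ion as a function of temperature and density, and compare the results with ChiantiPy and the CHIANTI IDL routines.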

In [1]:
import os
import warnings

import numpy as np
import dask
import dask.array
import distributed
import plasmapy.atomic
import astropy.units as u
import astropy.constants as const
import matplotlib.pyplot as plt
import matplotlib.colors
import seaborn
import scipy.interpolate
import scipy.linalg
import hissw
import h5py
import fortranformat
os.environ['XUVTOP'] = '/Users/willbarnes/ssw/packages/chianti/dbase/'
import ChiantiPy.core as ch
import ChiantiPy.tools as ch_tools

import fiasco

%matplotlib inline

 using cli
 using CLI for selections




In [2]:
# Two ions built on different temperature grids; the next cells try to combine them.
foo = fiasco.Ion('fe_11', [1e4,1e8]*u.K)
bar = fiasco.Ion('fe_12', [1e6,1e9]*u.K)

In [4]:
# Building a collection from ions with different temperature grids; the recorded output
# is an AssertionError.
fiasco.IonCollection(foo,bar)

AssertionError: Temperatures for all ions in collection must be the same.

In [5]:
# Adding two ions hits the same temperature check (see the recorded AssertionError).
foo+bar

AssertionError: Temperatures for all ions in collection must be the same.

## Modified Ion Object

In [None]:
@u.quantity_input
def proton_ratio(temperature: u.K):
    """
    Ratio of ionized hydrogen to the charge-weighted sum over all ions, evaluated at `temperature`.

    The denominator accumulates ``ionization_fraction * abundance * charge_state`` from the
    'chianti' ionization equilibrium of every ion of every element returned by `list_elements`.
    The numerator is ``abundance * ionization_fraction`` of the hydrogen ion at index 1.
    The ratio is tabulated on the ionization-equilibrium temperature grid and then linearly
    interpolated (extrapolating outside the grid) onto `temperature`.

    Parameters
    ----------
    temperature : `~astropy.units.Quantity`
        Temperatures, convertible to K, at which to evaluate the ratio.

    Returns
    -------
    `numpy.ndarray`
        Interpolated ratio at ``temperature.value``.
    """
    charge_sum = None
    for name in list_elements():
        for ion in fiasco.Element(name,temperature=temperature):
            contribution = ion._ioneq['chianti']['ionization_fraction']*ion.abundance*ion.charge_state
            if charge_sum is None:
                charge_sum = contribution
            else:
                charge_sum += contribution

    hydrogen = fiasco.Element('hydrogen',temperature=temperature)
    ionized_h = hydrogen[1].abundance*hydrogen[1]._ioneq['chianti']['ionization_fraction']
    # NOTE(review): the grid is interpolated against temperature.value, so this presumably
    # relies on the ioneq grid being in the same unit as `temperature` -- confirm.
    grid = hydrogen[1]._ioneq['chianti']['temperature']
    interpolator = scipy.interpolate.interp1d(grid,ionized_h/charge_sum,fill_value='extrapolate')

    return interpolator(temperature.value)

def list_elements():
    """
    Return the capitalized names of the top-level groups in the fiasco HDF5 database.

    The file at ``fiasco.defaults['hdf5_dbase_root']`` is opened explicitly read-only: older
    h5py versions default to a read/write mode when none is given, which is not wanted for a
    pure lookup.
    """
    with h5py.File(fiasco.defaults['hdf5_dbase_root'], 'r') as hf:
        elements = [k.capitalize() for k in hf.keys()]
    return elements

In [None]:
class TestIon(fiasco.Ion):
    """
    Experimental `fiasco.Ion` subclass that adds collision rates, level populations and
    emissivities so they can be compared against ChiantiPy and the CHIANTI IDL routines.
    """

    @fiasco.util.needs_dataset('scups')
    def effective_collision_strength(self):
        """
        Calculate the effective collision strength or the Maxwellian-averaged collision
        strength, typically denoted by upsilon.

        Returns
        -------
        upsilon : `~astropy.units.Quantity`
            Dimensionless array with temperature along the first axis and the `scups`
            transitions along the second. Non-positive descaled values are set to zero.

        Note
        ----
        Need a more efficient way of calculating upsilon for all transitions. The current method
        is slow for ions with many transitions, e.g. Fe IX and Fe XI
        """
        # kT / delta_E for every (temperature, transition) pair
        energy_ratio = np.outer(const.k_B.cgs*self.temperature,1.0/self._scups['delta_energy'].to(u.erg))
        # Descale one transition at a time; each call gets that transition's column of energy_ratio
        upsilon = np.array(list(map(self.burgess_tully_descale, self._scups['bt_t'], self._scups['bt_upsilon'],
                                    energy_ratio.T, self._scups['bt_c'], self._scups['bt_type'])))
        upsilon = u.Quantity(np.where(upsilon>0., upsilon,0.))
        return upsilon.T

    @fiasco.util.needs_dataset('elvlc','scups')
    def electron_collision_rate(self):
        """
        Calculates the collision rate for de-exciting and exciting collisions for electrons

        Returns
        -------
        dex_rate, ex_rate : `~astropy.units.Quantity`
            De-excitation and excitation rates, each with temperature along the first axis and
            the `scups` transitions along the second.
        """
        c = (const.h.cgs**2)/((2. * np.pi * const.m_e.cgs)**(1.5) * np.sqrt(const.k_B.cgs))
        upsilon = self.effective_collision_strength()
        # Statistical weights 2J+1 of the upper and lower level of each transition
        omega_upper = 2.*self._elvlc['J'][self._scups['upper_level'] - 1] + 1.
        omega_lower = 2.*self._elvlc['J'][self._scups['lower_level'] - 1] + 1.
        dex_rate = c*upsilon/(np.sqrt(self.temperature)[:,np.newaxis])/omega_upper
        # delta_E / kT for every (temperature, transition) pair
        energy_ratio = np.outer(1./const.k_B.cgs/self.temperature,self._scups['delta_energy'].to(u.erg))
        ex_rate = (omega_upper/omega_lower)*dex_rate*np.exp(-energy_ratio)

        return dex_rate, ex_rate

    @fiasco.util.needs_dataset('psplups',default=(None,None))
    def proton_collision_rate(self):
        """
        Calculates the collision rate for de-exciting and exciting collisions for protons

        Returns
        -------
        dex_rate, ex_rate : `~astropy.units.Quantity`
            De-excitation and excitation rates in cm^3/s, each with temperature along the first
            axis and the `psplups` transitions along the second.
        """
        # Create scaled temperature--these are not stored in the file
        bt_t = np.vectorize(np.linspace,excluded=[0,1],otypes='O')(0,1,[ups.shape[0]
                                                                        for ups in self._psplups['bt_rate']])
        # Get excitation rates directly from scaled data
        energy_ratio = np.outer(const.k_B.cgs*self.temperature,1.0/self._psplups['delta_energy'].to(u.erg))
        ex_rate = np.array(list(map(self.burgess_tully_descale, bt_t, self._psplups['bt_rate'], energy_ratio.T,
                                    self._psplups['bt_c'], self._psplups['bt_type'])))
        ex_rate = u.Quantity(np.where(ex_rate>0.,ex_rate,0.),u.cm**3/u.s).T
        # Calculate de-excitation rates from excitation rates
        omega_upper = 2.*self._elvlc['J'][self._psplups['upper_level'] - 1] + 1.
        omega_lower = 2.*self._elvlc['J'][self._psplups['lower_level'] - 1] + 1.
        # energy_ratio is kT/delta_E here, so 1/energy_ratio is delta_E/kT
        dex_rate = (omega_lower/omega_upper)*ex_rate*np.exp(1./energy_ratio)

        return dex_rate, ex_rate

    @fiasco.util.needs_dataset('wgfa','elvlc','scups')
    @u.quantity_input
    def level_populations(self, density: u.cm**(-3), include_protons=True):
        """
        Calculate populations of all energy levels as a function of temperature and density.

        Parameters
        ----------
        density : `~astropy.units.Quantity`
            Densities, convertible to cm^-3; the populations are solved once per entry.
        include_protons : `bool`, optional
            If True and proton rate data (`_psplups`) are available, add proton collisions
            weighted by `proton_ratio`.

        Returns
        -------
        `~astropy.units.Quantity`
            Dimensionless array of shape temperature + density + level. Negative solutions are
            clipped to zero and each set of populations is normalized to sum to one.
        """
        def collect(a,b,c,axis):
            # Sum the entries of c whose label in a equals b
            return c[np.where(a==b)].sum(axis=axis)
        collect_v = np.vectorize(collect,excluded=[0,2,3])
        level = self._elvlc['level']
        upper_level = self._scups['upper_level']
        lower_level = self._scups['lower_level']
        coeff_matrix = np.zeros(self.temperature.shape+level.shape+level.shape)/u.s

        # Radiative decays
        a_diagonal = collect_v(self._wgfa['upper_level'], level, self._wgfa['A'].value, None)*self._wgfa['A'].unit
        coeff_matrix[:,level - 1,level - 1] += -a_diagonal
        # NOTE(review): `+=` through fancy indices applies only one contribution when an
        # (i, j) pair occurs more than once; confirm the (lower, upper) pairs here and in the
        # collisional terms below are unique, otherwise accumulate with np.add.at.
        coeff_matrix[:,self._wgfa['lower_level']-1,self._wgfa['upper_level']-1] += self._wgfa['A']

        # Proton and electron collision rates
        dex_rate_e,ex_rate_e = self.electron_collision_rate()
        ex_diagonal = np.array([collect(lower_level, l, ex_rate_e.value.T,0) for l in level]).T*ex_rate_e.unit
        dex_diagonal = np.array([collect(upper_level, l, dex_rate_e.value.T,0) for l in level]).T*dex_rate_e.unit
        if include_protons and self._psplups is not None:
            p2e_ratio = proton_ratio(self.temperature)
            dex_rate_p,ex_rate_p = self.proton_collision_rate()
            upper_level_p = self._psplups['upper_level']
            lower_level_p = self._psplups['lower_level']
            ex_diagonal_p = np.array([collect(lower_level_p, l, ex_rate_p.value.T,0) for l in level]).T*ex_rate_p.unit
            dex_diagonal_p = np.array([collect(upper_level_p, l, dex_rate_p.value.T,0) for l in level]).T*dex_rate_p.unit

        # Allocate array of ntemp-by-ndensity-by-nlvl to store populations
        populations = np.zeros(self.temperature.shape+density.shape+level.shape)
        # Right-hand side: one column vector per temperature. The explicit trailing axis makes
        # np.linalg.solve treat b as a stack of vectors on every NumPy version; NumPy >= 2.0
        # interprets a bare (n_temperature, n_level) array as a single matrix instead.
        b = np.zeros(self.temperature.shape+level.shape+(1,))
        b[:,-1,0] = 1.0
        # Iterate over density; all temperatures are solved at once
        for i_d,d in enumerate(density):
            coeff_matrix_copy = coeff_matrix.copy()
            # excitation and de-excitation out of current state
            coeff_matrix_copy[:,level - 1, level - 1] -= d*(dex_diagonal + ex_diagonal)
            # De-excitation from upper states and excitation from lower states
            coeff_matrix_copy[:,lower_level - 1,upper_level - 1] += d*dex_rate_e
            coeff_matrix_copy[:,upper_level - 1,lower_level - 1] += d*ex_rate_e

            # protons
            if include_protons and self._psplups is not None:
                coeff_matrix_copy[:,level-1, level-1] -= d*p2e_ratio[:,np.newaxis]*(dex_diagonal_p + ex_diagonal_p)
                coeff_matrix_copy[:,lower_level_p - 1,upper_level_p - 1] += d*p2e_ratio[:,np.newaxis]*dex_rate_p
                coeff_matrix_copy[:,upper_level_p - 1,lower_level_p - 1] += d*p2e_ratio[:,np.newaxis]*ex_rate_p

            # Replace the last equation with the constraint sum(populations) = 1 (matches b)
            coeff_matrix_copy[:,-1,:] = 1.*coeff_matrix_copy.unit
            pop = np.linalg.solve(coeff_matrix_copy.value,b)[...,0]
            pop = np.where(pop < 0., 0., pop)
            pop /= pop.sum(axis=1)[:,np.newaxis]
            populations[:,i_d,:] = pop

        return u.Quantity(populations)

    def emissivity(self, density: u.cm**(-3), include_energy=False, **kwargs):
        """
        Calculate emissivity for all lines as a function of temperature and density

        Parameters
        ----------
        density : `~astropy.units.Quantity`
            Densities passed on to `level_populations`.
        include_energy : `bool`, optional
            If True, multiply by the photon energy h*c/wavelength; otherwise by one photon.
        include_protons : `bool`, optional
            Keyword passed on to `level_populations` (default True).

        Returns
        -------
        `~astropy.units.Quantity`
            Upper-level population times A-value (times energy) with shape
            temperature + density + transition, for `wgfa` entries with non-zero wavelength.
        """
        populations = self.level_populations(density,include_protons=kwargs.get('include_protons',True))
        wavelengths = np.fabs(self._wgfa['wavelength'])
        # Drop entries whose wavelength is zero before dividing by wavelength below
        upper_levels = self._wgfa['upper_level'][wavelengths != 0*u.angstrom]
        a_values = self._wgfa['A'][wavelengths != 0*u.angstrom]
        wavelengths = wavelengths[wavelengths != 0*u.angstrom]
        if include_energy:
            energy = const.h.cgs*const.c.cgs/wavelengths.to(u.cm)
        else:
            energy = 1.*u.photon
        emiss = populations[:,:,upper_levels - 1]*(a_values*energy)

        return emiss

In [None]:
def collect(a, b, c, axis):
    """
    Sum the entries of `c` whose label in `a` equals `b`.

    Parameters
    ----------
    a : array-like
        Labels, one per entry along the leading axis of `c`.
    b : scalar
        Label to select.
    c : `numpy.ndarray`
        Values to sum.
    axis : `int` or `None`
        Axis passed to ``sum`` on the selected entries.
    """
    matching = np.where(a == b)
    selected = c[matching]
    return selected.sum(axis=axis)


# Vectorized over `b` only; `a`, `c` and `axis` are passed through unchanged
collect_v = np.vectorize(collect, excluded=[0, 2, 3])

In [None]:
# Log-spaced temperature grid (10^5 to just under 10^8 K, 0.01 dex steps) and 20 log-spaced
# densities between 10^8 and 10^10 cm^-3, used to exercise TestIon on H I.
temperature = 10.**(np.arange(5,8,0.01))*u.K
density = np.logspace(8,10,20)*u.cm**(-3)
foo = TestIon('h_1',temperature)

In [None]:
# Solve the level populations on the full temperature/density grid
pop = foo.level_populations(density)

## Level Populations

In [None]:
# Fe V on a coarse temperature grid (10^5 to just under 10^7 K, 0.1 dex steps)
ti = TestIon('fe_5',10.**(np.arange(5,7,0.1))*u.K)

In [None]:
# ChiantiPy reference ion on the same temperature grid at a fixed density of 1e10.
# NOTE(review): the `None` assignment is immediately overwritten -- presumably it is there to
# drop a previous ion before building the new one; confirm or remove.
ch_ion = None
ch_ion = ch.ion('fe_5',temperature=ti.temperature.value,eDensity=1e10*np.ones(ti.temperature.shape))

In [None]:
# fiasco-side populations and energy-weighted emissivities at a single density of 1e10 cm^-3
pop = ti.level_populations([1e10]/(u.cm**3),include_protons=True)
emiss = ti.emissivity([1e10]/(u.cm**3),include_energy=True)

In [None]:
# ChiantiPy-side calculation; the plotting cells below read ch_ion.Population and ch_ion.Emiss,
# which are presumably filled by these two calls.
# Disabled: zeroing the proton density on the ChiantiPy ion.
#ch_ion.PDensity = np.zeros(ch_ion.Temperature.shape)
ch_ion.populate(popCorrect=0)
ch_ion.emiss()

In [None]:
# IDL template with {{...}} placeholders for an emiss_calc comparison.
# NOTE(review): temperature and density are assigned but not passed to emiss_calc, and
# `ssw_script` is reassigned further down before any visible run -- confirm whether this
# template is still needed.
ssw_script = """
temperature = {{temperature}}
density = {{density}}
emiss = emiss_calc({{atomic_number}},{{ionization_stage}})
"""

In [None]:
# Level populations versus temperature: fiasco as lines, ChiantiPy as circles, one color per level
num_levels = 15  # only the first 15 levels instead of ti._elvlc['level'].shape[0]
colors = seaborn.color_palette('Set2', n_colors=num_levels)
for i, color in enumerate(colors):
    plt.plot(ti.temperature, pop[:, 0, i], color=color)
    plt.plot(ch_ion.Temperature, ch_ion.Population['population'][:, i],
             'o', color=color, ls='', markevery=2)
plt.xscale('log')
plt.yscale('log')
#plt.ylim([1e-16,1e-9])
#plt.xlim([5e5,1e7])

In [None]:
# Emissivities versus temperature: fiasco as lines, 4*pi times the ChiantiPy value as circles
num_levels = 15  # only the first 15 entries instead of ti._elvlc['level'].shape[0]
colors = seaborn.color_palette('Set2', n_colors=num_levels)
for i, color in enumerate(colors):
    plt.plot(ti.temperature, emiss[:, 0, i], color=color)
    chianti_emiss = 4.*np.pi*ch_ion.Emiss['emiss'][i, :]
    plt.plot(ch_ion.Temperature, chianti_emiss, 'o', color=color, ls='', markevery=2)
plt.xscale('log')
plt.yscale('log')
plt.ylim([1e-12, 1e-7])

## Comparing Electron Collision Rates

In [None]:
# H I on 20 log-spaced temperatures between 10^5.5 and 10^9 K
ti = TestIon('h_1', np.logspace(5.5,9,20)*u.K)

In [None]:
# fiasco electron de-excitation / excitation rates
dex_e,ex_e = ti.electron_collision_rate()

In [None]:
# ChiantiPy reference ion on the same temperature grid
ch_ion = ch.ion('h_1',ti.temperature.value)

In [None]:
# Presumably fills ch_ion.Upsilon ('dexRate', 'exRate'), which the comparison plots below read
ch_ion.upsilonDescale()

In [None]:
# Electron collision rates, fiasco (lines) versus ChiantiPy (circles): de-excitation on the
# left, excitation on the right, and the fractional difference of both in the bottom panel.
fig = plt.figure(figsize=(20,15))
ax1 = plt.subplot2grid((3,2),(0,0),rowspan=2)
ax2 = plt.subplot2grid((3,2),(0,1),rowspan=2)
ax3 = plt.subplot2grid((3,2),(2,0),colspan=2)
# The number of levels is used as the number of transitions to draw
num_levels = ti._elvlc['level'].max()
colors = seaborn.color_palette('Set2',n_colors=num_levels)
for i in range(num_levels):
    # TestIon.electron_collision_rate returns (temperature, transition) arrays, so transition i
    # is a column; the ChiantiPy arrays are indexed (transition, temperature).
    dex_fiasco = dex_e[:,i]
    ex_fiasco = ex_e[:,i]
    dex_chianti = ch_ion.Upsilon['dexRate'][i,:]
    ex_chianti = ch_ion.Upsilon['exRate'][i,:]
    ax1.plot(ti.temperature,dex_fiasco,color=colors[i])
    ax2.plot(ti.temperature,ex_fiasco,color=colors[i])
    ax1.plot(ch_ion.Temperature,dex_chianti,'o',ls='',color=colors[i])
    ax2.plot(ch_ion.Temperature,ex_chianti,'o',ls='',color=colors[i])
    ax3.plot(ti.temperature,np.fabs(dex_fiasco.value - dex_chianti)/dex_chianti)
    ax3.plot(ti.temperature,np.fabs(ex_fiasco.value - ex_chianti)/ex_chianti,ls='--')
#axes[0].set_title('De-excitation')
#axes[1].set_title('Excitation')
ax1.set_xscale('log')
ax2.set_xscale('log')
ax1.set_yscale('log')
ax2.set_yscale('log')
ax3.set_xscale('log')
ax3.set_yscale('log')
ax3.set_ylim([1e-10,1])
#plt.subplots_adjust(wspace=0)

In [None]:
# Rate maps on a shared symmetric-log color scale: fiasco in the left column, ChiantiPy in the
# right column; de-excitation on the top row, excitation on the bottom row.
linthresh = 1e-30
color_limits = {'vmin': 1e-20, 'vmax': 1e-9}
plt.figure(figsize=(15, 12))
plt.subplot(221)
plt.pcolor(dex_e.value, norm=matplotlib.colors.SymLogNorm(linthresh, **color_limits))
plt.subplot(223)
plt.pcolor(ex_e.value, norm=matplotlib.colors.SymLogNorm(linthresh, **color_limits))
plt.subplot(222)
plt.pcolor(ch_ion.Upsilon['dexRate'], norm=matplotlib.colors.SymLogNorm(linthresh, **color_limits))
plt.subplot(224)
plt.pcolor(ch_ion.Upsilon['exRate'], norm=matplotlib.colors.SymLogNorm(linthresh, **color_limits))

In [None]:
# Fractional difference between ChiantiPy and fiasco electron rates.
# The fiasco arrays are (temperature, transition) while the ChiantiPy arrays are indexed
# (transition, temperature), so the fiasco values are transposed before subtracting.
plt.figure(figsize=(15,6))
plt.subplot(121)
plt.pcolor((ch_ion.Upsilon['dexRate'] - dex_e.value.T)/ch_ion.Upsilon['dexRate'],
           vmin=0,vmax=1
          )
plt.subplot(122)
plt.pcolor((ch_ion.Upsilon['exRate'] - ex_e.value.T)/ch_ion.Upsilon['exRate'],
           vmin=0,vmax=1
          )
plt.colorbar()

## Comparing Proton Collision Rates

In [None]:
# Fe XX on 20 log-spaced temperatures between 10^5.5 and 10^9 K
ti = TestIon('fe_20',np.logspace(5.5,9,20)*u.K)

In [None]:
# fiasco proton de-excitation / excitation rates
dex_p,ex_p = ti.proton_collision_rate()

In [None]:
# ChiantiPy reference ion on the same temperature grid
ch_ion = ch.ion('fe_20',ti.temperature.value)

In [None]:
# Presumably fills ch_ion.PUpsilon ('dexRate', 'exRate'), which the proton comparison plot reads
ch_ion.upsilonDescaleSplups(prot=1)

In [None]:
# IDL template ({{...}} placeholders are filled when the script is run through hissw below).
# For one psplups transition `index` it reads the .elvlc and .psplups files, descales the
# excitation rate with descale_all, and derives the de-excitation rate from the level
# multiplicities and exp(de*13.61/8.617/10.^(-5)/temperature).
ssw_script = """
fname = '{{ path_to_file }}'
read_elvlc,fname+'.elvlc',l1a,term,conf,ss,ll,jj,ecm,eryd,ecmth,erydth,eref
mult=2.*jj+1.
read_splups,fname+'.psplups',pstr,pref,/prot
temperature = {{temperature}}
i = {{index}}
;ex_rate = make_array(n_elements(pstr),n_elements(temperature))
;dex_rate = make_array(n_elements(pstr),n_elements(temperature))

;for i=0,n_elements(pstr)-1 do begin
descale_all,temperature,pstr,i,ex_rate
;ex_rate[i,*] = tmp
l1=pstr[i].lvl1-1
l2=pstr[i].lvl2-1
de=ABS(pstr[i].de)
dex_rate = ex_rate*mult[l1]/mult[l2]*exp(de*13.61/8.617/10.^(-5)/temperature)
;endfor
"""

In [None]:
# hissw runner configured with the 'chianti' package and path
ssw_runner = hissw.ScriptMaker(ssw_packages=['chianti'],ssw_paths=['chianti'])

In [None]:
# Run the IDL template once per psplups transition and collect the rates.
# The result arrays are (transition, temperature): each IDL run returns one rate per
# temperature and is stored in row i, which is also how the plot below reads them. Allocating
# with ex_p.shape would give (temperature, transition) and break the row assignment.
n_transitions = ti._psplups['lower_level'].shape[0]
ssw_ex_rate = np.zeros((n_transitions,)+ti.temperature.shape)
ssw_dex_rate = np.zeros((n_transitions,)+ti.temperature.shape)
for i in range(n_transitions):
    ssw_prates = ssw_runner.run([(ssw_script,{'temperature':ti.temperature.value.tolist(),
                                              'index':i,
                                              'path_to_file':os.path.join(os.environ['XUVTOP'],'fe/fe_20/fe_20')})],
                                save_vars=('ex_rate','dex_rate'),verbose=False)
    ssw_ex_rate[i,:] = ssw_prates['ex_rate']
    ssw_dex_rate[i,:] = ssw_prates['dex_rate']

In [None]:
# Proton collision rates for the first few transitions: fiasco (lines), ChiantiPy (circles) and
# IDL (squares); excitation on the left, de-excitation on the right.
fig,axes = plt.subplots(1,2,figsize=(20,10),sharex=True,sharey=True)
num_levels = 9#ti._psplups['lower_level'].shape[0]
colors = seaborn.color_palette('Set2',n_colors=num_levels)
for i in range(num_levels):
    # TestIon.proton_collision_rate returns (temperature, transition) arrays, so transition i
    # is a column; the ChiantiPy and IDL arrays are indexed (transition, temperature).
    axes[0].plot(ti.temperature,ex_p[:,i],
                 color=colors[i],
                 label='fiasco')
    axes[0].plot(ch_ion.Temperature,ch_ion.PUpsilon['exRate'][i,:],'o',
                 color=colors[i],
                 ls='',markevery=2,
                 label='ChiantiPy')
    axes[0].plot(ti.temperature,ssw_ex_rate[i,:],'s',
                 color=colors[i],
                 ls='',markevery=2,
                 label='IDL')
    axes[1].plot(ti.temperature,dex_p[:,i],color=colors[i])
    axes[1].plot(ch_ion.Temperature,ch_ion.PUpsilon['dexRate'][i,:],'o',ls='',markevery=2,color=colors[i])
    axes[1].plot(ti.temperature,ssw_dex_rate[i,:],'s',ls='',markevery=2,color=colors[i])
plt.xscale('log')
plt.yscale('log')
plt.ylim([1e-12,1e-9])
#plt.legend()
plt.subplots_adjust(wspace=0)

## Proton-electron Ratio

In [None]:
# Proton-to-electron ratio: ChiantiPy (circles; ProtonDensityRatio is presumably filled by
# p2eRatio) against the proton_ratio function defined in this notebook (line)
ch_ion.p2eRatio()
p2eratio = proton_ratio(ti.temperature)
plt.plot(ch_ion.Temperature,ch_ion.ProtonDensityRatio,'o',markevery=2)
plt.plot(ti.temperature,p2eratio)
plt.xscale('log')

## Sandbox

In [None]:
# Inspect what ChiantiPy reads from the Fe XI psplups file
ch_tools.io.splupsRead('fe_11',filetype='psplups')

In [None]:
# ChiantiPy Fe XVIII ion on 20 log-spaced temperatures between 10^5.5 and 10^9
foo = ch.ion('fe_18',temperature=np.logspace(5.5,9,20))

In [None]:
# Presumably fills foo.PUpsilon, which is plotted in the next cell
foo.upsilonDescaleSplups(prot=1)

In [None]:
# Proton de-excitation rate of the first transition versus temperature, log-log
plt.plot(foo.PUpsilon['temperature'],foo.PUpsilon['dexRate'][0,:])
plt.xscale('log')
plt.yscale('log')

In [None]:
# Raw data access for Fe IX (no temperature grid is passed)
foo = fiasco.IonBase('fe_9')

In [None]:
# Upper and lower level indices of every scups transition
l2 = foo.scups['upper_level']
l1 = foo.scups['lower_level']

In [None]:
# Level energies: use E_th wherever E_obs <= 0, then multiply by h*c (cgs values).
# NOTE(review): assumes E_obs / E_th are wavenumbers in cm^-1 so the product is in erg -- confirm.
energy = np.where(foo.elvlc['E_obs'] <= 0., foo.elvlc['E_th'], foo.elvlc['E_obs'])*const.h.cgs.value*const.c.cgs.value
# Transition energy from the level table (level indices are 1-based)
delta_energy = energy[l2-1] - energy[l1-1]

In [None]:
# Distribution of the difference between level-table and scups transition energies (erg)
plt.hist((delta_energy - foo.scups['delta_energy'].to(u.erg).value),bins=100,histtype='step',log=True);

In [None]:
# Distribution of the level-table transition energies on log axes
plt.hist(delta_energy,bins='fd',histtype='step',log=True);
plt.xscale('log')

In [None]:
# Display the scups transition energies converted to erg
foo.scups['delta_energy'].to(u.erg)

In [None]:
# Raw data access for Fe XVIII
foo = fiasco.IonBase('fe_18')

## PSplups Reader Sandbox

In [None]:
%%bash
# Show the first 100 lines of the raw Fe XX proton rate file
head -n 100 /Users/willbarnes/ssw/packages/chianti/dbase/fe/fe_20/fe_20.psplups

In [None]:
class TestPsplupsParser(fiasco.io.GenericParser):
    """
    Trial parser for CHIANTI ``.psplups`` files.

    Each row carries three integer columns, three float columns and the remaining spline
    values gathered into a single array in the last column.
    """
    filetype = 'psplups'
    dtypes = [int, int, int, float, float, float, 'object']
    units = [
        None,
        None,
        None,
        u.dimensionless_unscaled,
        u.Ry,
        u.dimensionless_unscaled,
        u.dimensionless_unscaled,
    ]
    headings = [
        'lower_level',
        'upper_level',
        'bt_type',
        'gf',
        'delta_energy',
        'bt_c',
        'bt_upsilon',
    ]
    descriptions = [
        'lower level index',
        'upper level index',
        'Burgess-Tully scaling type',
        'oscillator strength',
        'delta energy',
        'Burgess-Tully scaling parameter',
        'Burgess-Tully scaled effective collision strength',
    ]

    def preprocessor(self, table, line, index):
        """
        Read one fixed-width record and append it to `table`.

        The third whitespace-separated field is the scaling type: type 6 records are read
        with a 5-point fit, every other type with a 9-point fit.
        """
        scaling_type = int(line.strip().split()[2])
        if scaling_type == 6:
            n_spline = 5
        else:
            n_spline = 9
        reader = fortranformat.FortranRecordReader('(3I3,{}E10.3)'.format(3+n_spline))
        fields = reader.read(line)
        # First six fields stay scalar; the spline points become one array-valued column
        table.append(fields[:6] + [np.array(fields[6:])])

In [None]:
# Parse the Fe XX proton rate file with the trial parser defined above
fiasco.io.Parser('fe_20.psplups',custom_parser=TestPsplupsParser).parse()

## Decorator Sandbox
Consider using decorators to return a default value when a dataset that a method needs is not available.

In [None]:
def has_dataset(*names, default=None):
    """
    Decorator factory that guards a method against missing datasets.

    Parameters
    ----------
    *names : `str`
        Dataset names; for each name ``n`` the attribute ``_n`` is looked up on the first
        positional argument of the decorated callable (the instance, for a method).
    default : object, optional
        Value returned instead of calling the method when any of the datasets is `None`.

    Returns
    -------
    callable
        Decorator that wraps the method. If any dataset is `None` a warning naming the
        datasets and the method is issued and `default` is returned; otherwise the method is
        called with its original arguments.
    """
    import functools

    def decorator(func):
        # Keep the wrapped method's name and docstring
        @functools.wraps(func)
        def func_wrapper(*args, **kwargs):
            missing = [n for n in names if getattr(args[0], '_{}'.format(n)) is None]
            if missing:
                warnings.warn('At least one dataset out of {} is missing. Cannot calculate {}'
                              .format(','.join(names), func.__name__))
                return default
            return func(*args, **kwargs)
        return func_wrapper
    return decorator

In [None]:
class DecoratorCheck(fiasco.Ion):
    """
    Minimal `fiasco.Ion` subclass for trying out the `has_dataset` decorator.
    """

    # `has_dataset` is the decorator defined in this notebook; no `datacheck` is defined here
    @has_dataset('elvlc','scups')
    def foo(self):
        """Sum of the first level energy (E_obs times h*c) and the first scups delta energy."""
        return (self._elvlc['E_obs']*const.h.cgs*const.c.cgs)[0] + self._scups['delta_energy'][0]

    @has_dataset('wgfa','psplups')
    def bar(self,val):
        """Return the wgfa A-values when ``val < 1``, otherwise the psplups delta energies."""
        if val < 1:
            return self._wgfa['A']
        else:
            return self._psplups['delta_energy']

In [None]:
# Fe IX instance on 100 log-spaced temperatures between 10^4 and 10^5 K
f = DecoratorCheck('fe_9',np.logspace(4,5,100)*u.K)

In [None]:
# Inspect one of the datasets the decorated `bar` method checks
f._wgfa

In [None]:
# NOTE(review): `bar` is defined with a required `val` argument, so when all datasets are
# present this call raises TypeError -- confirm whether that is the intended check.
f.bar()

In [None]:
# val >= 1 selects the psplups branch of `bar`
f.bar(1.1)

In [None]:
# Raises TypeError. NOTE(review): presumably a reminder of what happens when a decorated
# method returns the default `None` and the result is used in arithmetic -- confirm or delete.
None+2