# Example of asynchronous requests (v > 1.1)
- The scope of this example is to show how to request several products together so that internal resource usage is maximized
- We extract the spectrum of the Crab in groups of 'nscw' science windows for each year from 'start_year' to 'end_year' included
- We use a token provided by the web interface to receive dedicated emails
- We optionally show how to fit the spectra with a broken power law using xspec

In [1]:
#A few input parameters
osa_version="OSA10.2"
source_name="Crab"
nscw=10
start_year=2004
end_year=2006
systematic_fraction = 0.01
token=''

In [2]:
#You can provide a valid token as explained in the 'Authentication' example or skip this cell
import getpass
token = getpass.getpass('Insert the token')

Insert the token········


In [3]:
#We hardcode a catalog for the Crab
api_cat={
    "cat_frame": "fk5", 
    "cat_coord_units": "deg", 
    "cat_column_list": [
        [0, 7], 
        ["1A 0535+262", "Crab"], 
        [125.4826889038086, 1358.7255859375], 
        [84.72280883789062, 83.63166809082031], 
        [26.312734603881836, 22.016284942626953], 
        [-32768, -32768], 
        [2, 2], 
        [0, 0], 
        [0.0002800000074785203, 0.0002800000074785203]], 
    "cat_column_names": [
        "meta_ID", 
        "src_names", 
        "significance", 
        "ra", 
        "dec", 
        "NEW_SOURCE", 
        "ISGRI_FLAG", 
        "FLAG", 
        "ERR_RAD"
    ], 
    "cat_column_descr": 
        [
            ["meta_ID", "<i8"], 
            ["src_names", "<U11"], 
            ["significance", "<f8"], 
            ["ra", "<f8"], 
            ["dec", "<f8"], 
            ["NEW_SOURCE", "<i8"], 
            ["ISGRI_FLAG", "<i8"], 
            ["FLAG", "<i8"], 
            ["ERR_RAD", "<f8"]
        ], 
    "cat_lat_name": "dec", 
    "cat_lon_name": "ra"
}



## Let's get some logging

This is to help visualizing the progress.

* WARNING is the default level
* INFO writes some more information
* DEBUG is mainly for developers and issue tracking

In [4]:
import logging
#default
#logging.getLogger().setLevel(logging.WARNING)
#slightly more verbose
logging.getLogger().setLevel(logging.INFO)
#all messages
#logging.getLogger().setLevel(logging.DEBUG)

logging.getLogger('oda_api').addHandler(logging.StreamHandler()) 

In [5]:
#Different instances of the platform; 'staging' is available only internally
import numpy as np
import json
import oda_api.api

import oda_api

from pkg_resources import parse_version

# This example targets oda_api releases newer than 1.1.0; fail early with an
# explicit message instead of a bare AssertionError.
assert parse_version(oda_api.__version__) > parse_version("1.1.0"), (
    f"this example requires oda_api > 1.1.0, found {oda_api.__version__}"
)


def dispatcher(_oda_platform='production'):
    """Return a DispatcherAPI connected to the selected platform instance.

    `_oda_platform` must be 'staging' or 'production'; any other value
    raises KeyError. The description of the 'isgri' instrument is requested
    before the dispatcher is returned.
    """
    platform_urls = {
        'staging': 'http://dispatcher.staging.internal.odahub.io',
        'production': 'https://www.astro.unige.ch/mmoda/dispatch-data',
    }
    selected_url = platform_urls[_oda_platform]

    api = oda_api.api.DispatcherAPI(url=selected_url)
    api.get_instrument_description("isgri")
    return api


# Dispatcher whose URL is reused by the per-year dispatchers created later
disp = dispatcher('production')


--------------
query_name: src_query
 name: src_name,  value: test,  units: str, 
 name: RA,  value: 0.0,  units: deg, 
 name: DEC,  value: 0.0,  units: deg, 
 name: T1,  value: 2001-12-11T00:00:00.000,  units: None, 
 name: T2,  value: 2001-12-11T00:00:00.000,  units: None, 
 name: token,  value: None,  units: str, 

--------------
query_name: isgri_parameters
 name: user_catalog,  value: None,  units: str, 
 name: scw_list,  value: [],  units: names_list, 
 name: selected_catalog,  value: None,  units: str, 
 name: radius,  value: 5.0,  units: deg, 
 name: max_pointings,  value: 50,  units: None, 
 name: osa_version,  value: None,  units: str, 
 name: integral_data_rights,  value: public,  units: str, 
 name: E1_keV,  value: 15.0,  units: keV, 
 name: E2_keV,  value: 40.0,  units: keV, 

--------------
query_name: isgri_image_query
 product_name: isgri_image
 name: detection_threshold,  value: 0.0,  units: sigma, 
 name: image_scale_min,  value: None,  units: None, 
 name: image_sca

- Here, we collect the spectra for each year in a random sample of nscw=10 science windows
- We use the hard-coded catalog.

In [6]:


# Products collected during the latest pass over the years
# (rebuilt from scratch at every pass of the polling loop below)
spectrum_results = []

# One non-waiting dispatcher per year, and the products of the years
# that have already been retrieved
disp_by_ys = {}
data_by_ys = {}

# Request parameters common to all years; T1, T2 and osa_version are
# overwritten for each year inside the loop
par_dict = {
    'instrument': 'isgri',
    'product': 'isgri_spectrum',
    'osa_version': osa_version,
    'product_type': 'Real',
    'max_pointings': nscw,
    'selected_catalog': json.dumps(api_cat),
    "integral_data_rights": "all-private",
}

if token != '':
    par_dict.update({'token': token})

# Each pass submits the jobs not yet submitted, polls the pending ones and
# retrieves the completed ones. The loop ends when every job has either
# completed or failed.
while True:
    spectrum_results = []

    for year in range(start_year, end_year + 1):
        T1_utc = '%4d-01-01 00:00:00.0' % year
        T2_utc = '%4d-12-31 23:59:59.0' % year

        print(T1_utc, '-', T2_utc)

        if year >= 2016:
            osa_version = 'OSA11.1'
        else:
            osa_version = 'OSA10.2'

        # The OSA version chosen for this year must be part of the request:
        # without this update the value computed above would never be sent.
        par_dict.update({'T1': T1_utc,
                         'T2': T2_utc,
                         'osa_version': osa_version})

        # Just renaming for a general dictionary key
        ys = year

        # We start one dispatcher for each job,
        # they will run in parallel until products are ready
        if ys not in disp_by_ys:
            disp_by_ys[ys] = oda_api.api.DispatcherAPI(url=disp.url, wait=False)  # Note the flag wait=False

        _disp = disp_by_ys[ys]

        data = data_by_ys.get(ys, None)

        if data is None:
            # A failed job has no product: do not append None to the results
            if _disp.is_failed:
                print("Job failed for year", ys)
                continue

            # We submit or we poll
            if not _disp.is_submitted:
                data = _disp.get_product(**par_dict)
            else:
                _disp.poll()

            print("Is complete ", _disp.is_complete)
            if not _disp.is_complete:
                continue

            # We retrieve data
            data = _disp.get_product(**par_dict)
            data_by_ys[ys] = data

        spectrum_results.append(data)

    n_complete = sum(1 for _d in disp_by_ys.values() if _d.is_complete)
    # Failed jobs never become complete; they must count as finished,
    # otherwise this loop would never end.
    n_finished = sum(1 for _d in disp_by_ys.values()
                     if _d.is_complete or _d.is_failed)
    print(f"complete {n_complete} / {len(disp_by_ys)}")
    if n_finished == len(disp_by_ys):
        if n_finished > n_complete:
            print(f"{n_finished - n_complete} job(s) failed")
        print("done!")
        break
    print("not done")


2004-01-01 00:00:00.0 - 2004-12-31 23:59:59.0


- waiting for remote response (since 2021-08-25 09:15:08), please wait for https://www.astro.unige.ch/mmoda/dispatch-data/run_analysis
session: 5Q5DLBNXCSIYET6G job: 8a4bbaebfe404cbb

... query status [35mprepared[0m => [35msubmitted[0m
... assigned job id: [33m8a4bbaebfe404cbb[0m
 | the job is working remotely, please wait status=submitted job_id=8a4bbaeb in 0 messages since 36 seconds (36/36); in 0 SCW so far; nodes (0): 0 computed 0 restored
... [32m[0m[0m
non-waiting dispatcher: terminating

[33mquery not complete, please poll again later[0m


Is complete  False
2005-01-01 00:00:00.0 - 2005-12-31 23:59:59.0


- waiting for remote response (since 2021-08-25 09:15:46), please wait for https://www.astro.unige.ch/mmoda/dispatch-data/run_analysis
session: CH88SQJWEWWX834P job: 0977b57d0308e6a2

... query status [35mprepared[0m => [35msubmitted[0m
... assigned job id: [33m0977b57d0308e6a2[0m
 | the job is working remotely, please wait status=submitted job_id=0977b57d in 0 messages since 21 seconds (22/22); in 0 SCW so far; nodes (0): 0 computed 0 restored
... [32m[0m[0m
non-waiting dispatcher: terminating

[33mquery not complete, please poll again later[0m


Is complete  False
2006-01-01 00:00:00.0 - 2006-12-31 23:59:59.0


- waiting for remote response (since 2021-08-25 09:16:10), please wait for https://www.astro.unige.ch/mmoda/dispatch-data/run_analysis
session: FT61XR6JXXBJ3VI0 job: e994ebc4e2dca0e1

... query status [35mprepared[0m => [35msubmitted[0m
... assigned job id: [33me994ebc4e2dca0e1[0m
 | the job is working remotely, please wait status=submitted job_id=e994ebc4 in 0 messages since 15 seconds (15/15); in 0 SCW so far; nodes (0): 0 computed 0 restored
... [32m[0m[0m
non-waiting dispatcher: terminating

[33mquery not complete, please poll again later[0m
- waiting for remote response (since 2021-08-25 09:16:25), please wait for https://www.astro.unige.ch/mmoda/dispatch-data/run_analysis


Is complete  False
complete 0 / 3
not done
2004-01-01 00:00:00.0 - 2004-12-31 23:59:59.0


session: 5Q5DLBNXCSIYET6G job: 8a4bbaebfe404cbb
 / the job is working remotely, please wait status=submitted job_id=8a4bbaeb in 0 messages since 93 seconds (26/36); in 0 SCW so far; nodes (0): 0 computed 0 restored
... [32m[0m[0m
- waiting for remote response (since 2021-08-25 09:16:41), please wait for https://www.astro.unige.ch/mmoda/dispatch-data/run_analysis


Is complete  False
2005-01-01 00:00:00.0 - 2005-12-31 23:59:59.0


session: CH88SQJWEWWX834P job: 0977b57d0308e6a2
 / the job is working remotely, please wait status=submitted job_id=0977b57d in 0 messages since 65 seconds (16/22); in 0 SCW so far; nodes (0): 0 computed 0 restored
... [32m[0m[0m
- waiting for remote response (since 2021-08-25 09:16:52), please wait for https://www.astro.unige.ch/mmoda/dispatch-data/run_analysis


Is complete  False
2006-01-01 00:00:00.0 - 2006-12-31 23:59:59.0


session: FT61XR6JXXBJ3VI0 job: e994ebc4e2dca0e1
 / the job is working remotely, please wait status=submitted job_id=e994ebc4 in 0 messages since 55 seconds (14/15); in 0 SCW so far; nodes (0): 0 computed 0 restored
... [32m[0m[0m
- waiting for remote response (since 2021-08-25 09:17:05), please wait for https://www.astro.unige.ch/mmoda/dispatch-data/run_analysis


Is complete  False
complete 0 / 3
not done
2004-01-01 00:00:00.0 - 2004-12-31 23:59:59.0


problem in API call, 19 tries left:

unable to complete API call
in <function DispatcherAPI.poll at 0x7f6df07eaee0> called with:
... [ DispatcherAPI: https://www.astro.unige.ch/mmoda/dispatch-data ]
... 
possible causes:
- connection error
- error on the remote server
 exception message: 

HTTPSConnectionPool(host='www.astro.unige.ch', port=443): Read timed out. (read timeout=120)
Traceback (most recent call last):
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1344, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 307, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/

RemoteException: RemoteException (line 76): 
unable to complete API call
in <function DispatcherAPI.poll at 0x7f6df07eaee0> called with:
... [ DispatcherAPI: https://www.astro.unige.ch/mmoda/dispatch-data ]
... 
possible causes:
- connection error
- error on the remote server
 exception message: 

HTTPSConnectionPool(host='www.astro.unige.ch', port=443): Read timed out. (read timeout=120)
Traceback (most recent call last):
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1344, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 307, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 268, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
socket.timeout: The read operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/requests/adapters.py", line 439, in send
    resp = conn.urlopen(
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 755, in urlopen
    retries = retries.increment(
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/util/retry.py", line 532, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/packages/six.py", line 770, in reraise
    raise value
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 447, in _make_request
    self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/urllib3/connectionpool.py", line 336, in _raise_timeout
    raise ReadTimeoutError(
urllib3.exceptions.ReadTimeoutError: HTTPSConnectionPool(host='www.astro.unige.ch', port=443): Read timed out. (read timeout=120)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/oda_api/api.py", line 120, in func_wrapper
    return func(*args, **kwargs)
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/oda_api/api.py", line 537, in poll
    self.response_json = self.request_to_json()
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/oda_api/api.py", line 346, in request_to_json
    response = requests.post(
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/requests/api.py", line 117, in post
    return request('post', url, data=data, json=json, **kwargs)
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/requests/api.py", line 61, in request
    return session.request(method=method, url=url, **kwargs)
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/home/ferrigno/.venv/myVE/lib/python3.8/site-packages/requests/adapters.py", line 529, in send
    raise ReadTimeout(e, request=request)
requests.exceptions.ReadTimeout: HTTPSConnectionPool(host='www.astro.unige.ch', port=443): Read timed out. (read timeout=120)


- This part saves the spectra in fits files and updates some keywords

In [None]:
# This part saves the spectrum, ARF and RMF of `source_name` in FITS files,
# one set per year, and updates some keywords of the spectrum file.
# NOTE(review): `fits` is not imported in any cell visible here; presumably
# `from astropy.io import fits` must be run before this cell - confirm.
for yr, data in data_by_ys.items():
    # Reset the indices so that a product missing for this year cannot
    # silently reuse the index found for a previous year
    ID_spec = ID_arf = ID_rmf = None
    for ID, s in enumerate(data._p_list):
        if s.meta_data['src_name'] != source_name:
            continue
        product = s.meta_data['product']
        if product == 'isgri_spectrum':
            ID_spec = ID
        elif product == 'isgri_arf':
            ID_arf = ID
        elif product == 'isgri_rmf':
            ID_rmf = ID

    print(ID_spec, ID_arf, ID_rmf)

    if ID_spec is None or ID_arf is None or ID_rmf is None:
        print("Missing products for", source_name, "in", yr, "- skipping")
        continue

    # Kept as variables so that they stay available for inspection
    spec = data._p_list[ID_spec].data_unit[1].data
    arf = data._p_list[ID_arf].data_unit[1].data
    rmf = data._p_list[ID_rmf].data_unit[2].data
    # NOTE(review): the exposure is read from product 0, not from ID_spec;
    # presumably it is the same for all sources - confirm.
    expos = data._p_list[0].data_unit[1].header['EXPOSURE']

    # Use the year of the current data set: the leftover `year` variable of
    # the request loop would give the same file names for every year.
    name = source_name + '_' + str(yr)
    specname = name + '_spectrum.fits'
    arfname = name + '_arf.fits.gz'
    rmfname = name + '_rmf.fits.gz'
    data._p_list[ID_spec].write_fits_file(specname)
    data._p_list[ID_arf].write_fits_file(arfname)
    data._p_list[ID_rmf].write_fits_file(rmfname)

    # The context manager closes the file even if an update raises
    with fits.open(specname, mode='update') as hdul:
        hdul[1].header.set('EXPOSURE', expos)
        hdul[1].header['RESPFILE'] = rmfname
        hdul[1].header['ANCRFILE'] = arfname
        hdul[1].data['SYS_ERR'] = systematic_fraction

## If xspec is available, we make a fit of each spectrum

In [None]:
# Fit the spectrum file of each year with the model 'cflux*bknpow', store
# the fit results in `fit_by_lt` and display one plot per fit.
# The whole cell sits inside the try block: any ImportError (xspec, shutil
# or IPython) makes the cell print a message and do nothing else.
try:

    import xspec
    import shutil
    from IPython.display import Image
    from IPython.display import display 

    xspec.Fit.statMethod = "chi"

    #init dictionaries
    # fit results, keyed by '<year>_<lower energy>' (see lt_key below)
    fit_by_lt={}

    model='cflux*bknpow'

    xspec.AllModels.systematic=0.0
    # lower energy bounds to try; every channel below the bound is ignored
    low_energies=[20]
    # a value > 0 freezes the break energy before the second fit
    freeze_pow_ebreak=1

    for year in range(start_year,end_year+1):

        for c_emin in low_energies: #np.linspace(17,40,5):    
            # start from a clean state: drop any previously loaded spectrum
            xspec.AllData.clear()

            m1=xspec.Model(model)

            # same naming pattern as the spectrum files saved in the previous cell
            specname=source_name+'_'+str(year)+'_spectrum.fits'

            xspec.AllData(specname)

            s = xspec.AllData(1)

            isgri = xspec.AllModels(1)

            print(m1.nParameters)

            xspec.AllData.ignore('bad')
            xspec.AllData.ignore('500.0-**')

            # ignore everything below c_emin and above 500 for this spectrum
            ig="**-%.2f,500.-**"%c_emin
            print("ISGRI ignore: "+ ig)
            s.ignore(ig)

            #Key for output
            lt_key='%d_%.10lg'%(year, c_emin)

            # initial value and energy band of the cflux component
            isgri.cflux.lg10Flux=-8            

            isgri.cflux.Emin=20.
            isgri.cflux.Emax=80.

            # Parameter strings follow the XSPEC convention
            # "value,delta,min,bot,top,max"; presumably a negative delta
            # keeps the parameter frozen - see the PyXspec documentation.
            isgri.bknpower.norm = "1,-1"
            isgri.bknpower.PhoIndx1 = "2.0,.01,1.,1.,3.,3."
            isgri.bknpower.PhoIndx2 = "2.2,.01,1.,1.,3.,3."
            isgri.bknpower.BreakE = "100,-1,20,20,300,300"

            # first fit, then set the frozen state of the break energy and refit
            xspec.Fit.perform()
            isgri.bknpower.BreakE.frozen = freeze_pow_ebreak  > 0

            xspec.Fit.perform()

            # reduced chi2 rounded up, passed as 'max' to the error command
            max_chi=np.ceil(xspec.Fit.statistic / xspec.Fit.dof)

            xspec.Fit.error("1.0 max %.1f 1-%d"%(max_chi,m1.nParameters))


            # global fit statistics for this year and lower energy bound
            fit_by_lt[lt_key]=dict(
                    emin=c_emin,
                    year=year,
                    chi2_red=xspec.Fit.statistic/xspec.Fit.dof,                                
                    chi2=xspec.Fit.statistic,
                    ndof=xspec.Fit.dof,                                    
                )

            # store [value, error[0], error[1]] of every free, unlinked parameter
            for i in range(1,m1.nParameters+1): 
                if (not isgri(i).frozen) and (not bool(isgri(i).link)):
                    #use the name plus position because there could be parameters with same name from multiple 
                    #model components (e.g., several gaussians)
                    print(isgri(i).name, "%.2f"%(isgri(i).values[0]), isgri(i).frozen,bool(isgri(i).link) )
                    fit_by_lt[lt_key][isgri(i).name+"_%02d"%(i)]=[ isgri(i).values[0], isgri(i).error[0], isgri(i).error[1] ]



            # plot data and residuals ("ldata del") versus energy in keV to a PNG device
            xspec.Plot.device="/png"
            #xspec.Plot.addCommand("setplot en")
            xspec.Plot.xAxis="keV"
            xspec.Plot("ldata del")
            xspec.Plot.device="/png"

            fn="fit_%s.png"%lt_key
            fit_by_lt[lt_key]['plot_fname'] = fn

            # NOTE(review): assumes the plot was written to "pgplot.png_2" - confirm
            shutil.move("pgplot.png_2", fn)

            _=display(Image(filename=fn,format="png"))

except ImportError:
    # one of the imports above failed: the optional fit is skipped
    print("no problem!")