In [None]:
#| default_exp solver

# solver

> Field solver module - thin layer over other field solver services.

In [None]:
#| hide
from nbdev.showdoc import *
from nbdev import *

In [None]:
#| export
import configparser
import logging
from fastcore.basics import patch
from os.path import expanduser
import os, tempfile, shutil
from io import StringIO, BytesIO
from astroquery.exceptions import TimeoutError as ASTTimeoutError
from astropy.time import Time
from astropy.io import fits
from astropy.coordinates import SkyCoord, Longitude, Latitude
from astropy.wcs import WCS
from ouscope.core import Telescope
import ouscope.util as ousutil

In [None]:
#| export
class Solver:
    '''
    Wrapper of AstrometryNet solver from astropy tuned for the use in osob use.
    '''
    
    _cmd = 'solve-field'
    _args = '-p -l %d -O -L %d -H %d -u app -3 %f -4 %f -5 2 %s'
    _telescopes={
        'galaxy':   (1,2),
        'cluster':  (14,16),
        'coast': (1, 2),
        'pirate': (1, 2),
        '10micron': (1, 2),
        'cdk17': (1, 2),
        'unknown': (1,16),
        'undefined': (1,16),
        "'undefined'": (1,16),
    }

    
    def __init__(self, api_key=None, cache='.cache/wcs', cmd=None, args=None):
        if cmd is None:
            self._cmd = Solver._cmd
        else:
            self._cmd = cmd
        if args is None:
            self._args = Solver._args
        else:
            self._args = args
        self._cmd = ' '.join((self._cmd, self._args))
        self.ast = None
        self.api_key = api_key
        if api_key:
            from astroquery.astrometry_net import AstrometryNet
            self.ast = AstrometryNet()    
            self.ast.api_key = api_key
        self._cache = cache
        self._tout = 15

In [None]:
#| export
@patch
def solve(self: Solver, hdu, crop=(slice(0,-32), slice(0,-32)), force_solve=False, tout=None):
    '''
    Solve plate in fits format using local (if present) or 
    remote (not fully implemented yet) AstrometryNet solver
    '''
    loger = logging.getLogger(__name__)
    if hdu.verify_datasum()!=1:
        hdu.add_datasum()
    fn = f'{int(hdu.header["DATASUM"]):08X}.wcs'
    fp = os.path.join(self._cache,fn[0],fn[1],fn)
    if force_solve or not os.path.isfile(fp) :
        loger.info(f'Solving for {fn[:-4]}')
        print(f'Solving for {fn[:-4]}')
        s = self._solveField_local(hdu, tout=tout)
        if s:
            wcs_header = fits.Header(s.header)
            #wcs_header['NAXIS'] = 2
            #wcs_header['NAXIS1'] = wcs_header['IMAGEW']
            #wcs_header['NAXIS2'] = wcs_header['IMAGEH']
            os.makedirs(os.path.dirname(fp), exist_ok=True)
            with open(fp, 'w') as fh:
                wcs_header.totextfile(fp)
        else:
            wcs_header = None
    else :
        loger.info(f'Getting {fn[:-4]} from cache')
        print(f'Getting {fn[:-4]} from cache')
        with open(fp, 'r') as fh:
            wcs_header = fits.Header.fromtextfile(fh)        
    return wcs_header

In [None]:
#| export
@patch
def _getFrameRaDec(self: Solver, hdu):
    if 'OBJCTRA' in hdu.header:
        ra=hdu.header['OBJCTRA']
        dec=hdu.header['OBJCTDEC']
    elif 'MNTRA' in hdu.header :
        ra=hdu.header['MNTRA']
        dec=hdu.header['MNTDEC']
    elif 'RA-TEL' in hdu.header :
        ra=hdu.header['RA-TEL']
        dec=hdu.header['DEC-TEL']
    else :
        raise KeyError

    try :
        eq=Time(hdu.header['EQUINOX'], format='decimalyear')
    except KeyError :
        eq=Time(2000, format='decimalyear')

    o=SkyCoord(Longitude(ra, unit='hour'),
               Latitude(dec, unit='deg'),
               frame='icrs', obstime=hdu.header['DATE-OBS'],
               equinox=eq)
    return o


In [None]:
#| export
@patch
def _solveField_local(self: Solver, hdu, tout=None, cleanup=True):
    '''
    Run local solver for hdu.
    '''
    loger = logging.getLogger(__name__)
    o=self._getFrameRaDec(hdu)
    ra=o.ra.deg
    dec=o.dec.deg

    try :
        tel = hdu.header['TELESCOP'].lower()
    except KeyError:
        tel = 'unknown'
        hdu.header.remove('TELESCOP')
        hdu.header['TELESCOP'] = tel

    if hdu.header['TELESCOP']=="'undefined'":
        tel = 'unknown'
        hdu.header.remove('TELESCOP')
        hdu.header['TELESCOP'] = tel        
        
    if 'brt' in tel:
        tel=tel.split()[1]
    else :
        tel=tel.split()[0]

    loapp, hiapp=Solver._telescopes[tel]
    td=tempfile.mkdtemp(prefix='field-solver')
    try :
        fn=tempfile.mkstemp(dir=td, suffix='.fits')
        loger.debug(td, fn)
        #print(fn[1], hdu.header['TELESCOP'])
        hdu.writeto(fn[1])
        cmd = self._cmd % (self._tout if tout is None else tout,
                           loapp, hiapp, ra, dec, fn[1])
        loger.debug(cmd)
        print(cmd)
        solver=os.popen(cmd)
        for ln in solver:
            loger.debug(ln.strip())
        shdu=fits.open(BytesIO(open(fn[1][:-5]+'.new','rb').read()))
        return shdu[0]
    except IOError :
        return None
    finally :
        if cleanup :
            shutil.rmtree(td)


In [None]:
#| login
config = configparser.ConfigParser()
config.read(expanduser('~/.config/telescope.ini'))
solver = Solver(config['astrometry.net']['apikey'])

scope=Telescope(config='~/.config/telescope.ini')

In [None]:
#| login
reqlst=scope.get_user_requests(sort='completion')
for rq in sorted(reqlst, key=lambda r: int(r['requesttime']), reverse=True):
    if Telescope.REQUESTSTATUS_TEXTS[int(rq['status'])]=='Complete':
        break
ousutil.print_dict(rq)
print()
ousutil.print_dict(scope.get_request(int(rq['id'])))
last_complete = int(scope.get_request(int(rq['id']))['jid'])

id: 759658
seen: 1
usercomments: Mira
objecttype: RADEC
objectid: 19:19:02.60 +41:06:34.24
objectname: EQ Lyr
requesttime: 1701984976
status: 8
row: 26

rid: 759658
jid: 412679
type: RADEC
oid: 19:19:02.60 +41:06:34.24
name: EQ Lyr
exp: 180000 ms
filter: BVR
dark: Instant
tele_type: Galaxy
tele: COAST
requested: ['7', 'December', '2023', '21:36:16', 'UTC']
completion: ['9', 'December', '2023', '19:36:32', 'UTC']
status: Complete


In [None]:
#| login
ffn = scope.get_obs(scope.get_job(last_complete)).name
print(last_complete, '-->', ffn)
hdu = fits.open(ffn, cube=True, verbose=True)[0]

s_hdu = solver.solve(hdu)

assert s_hdu

w = WCS(s_hdu, naxis=2)
w.printwcs()

412679 --> .cache/jobs/4/1/412679.fits
Getting A1E36A92 from cache
WCS Keywords

Number of WCS axes: 2
CTYPE : 'RA---TAN-SIP' 'DEC--TAN-SIP' 
CRVAL : 290.135756976 41.0252130636 
CRPIX : 153.755741119 558.506164551 
CD1_1 CD1_2  : -0.000466761461958 -3.93688498768e-06 
CD2_1 CD2_2  : -3.88592965771e-06 0.000466462440999 
NAXIS : 1536  1536  3


a floating-point value was expected. [astropy.wcs.wcs]
the RADECSYS keyword is deprecated, use RADESYSa. [astropy.wcs.wcs]


In [None]:
#| login
#| slow
hd = solver.ast.solve_from_image(ffn, force_image_upload=True)
WCS(hd, naxis=2).printwcs()

Solving..........WCS Keywords

Number of WCS axes: 2
CTYPE : 'RA---TAN-SIP' 'DEC--TAN-SIP' 
CRVAL : 289.79342685 41.2102280398 
CRPIX : 702.608161926 961.371337891 
CD1_1 CD1_2  : -0.000466545685512 -5.13583974664e-06 
CD2_1 CD2_2  : -5.88360704449e-06 0.000465741808602 
NAXIS : 0  0


