# SAOImage ds9 python lab

Jupyter notebook sandbox for developing new ds9 interaction tools using `samp` to replace the obsolete
`psyds9` module.

Started: 2025 Feb 4

Updated: 2025 Feb 6 - refining the modsView with live testing, frozen after initial release of modsView v3.0.2


In [None]:
import os
import sys
import numpy as np

# better way to handle paths

from pathlib import Path

# astropy coordinates for ra/dec and units handling

from astropy.coordinates import SkyCoord, Angle
import astropy.units as u

# better way to handle runtime config files

import yaml

# other bits we need?


### DS9 class

Start of sketching out a "myds9" module DS9 class to encapsulate all the nasty bits of taking to ds9
using samp.  They set/get syntax is very involved even with the convenience functions provided by
the `SAMPIntegratedClient()` functions, and we need to pass info around so a class with internal properties
to hold setup info makes sense.

In [None]:
# astropy has a samp client/hub interaction module

from astropy.samp import SAMPIntegratedClient, SAMPHubError
import time
import subprocess
import shlex

class DS9():
    '''
    DS9 interaction class to replace using pyds9 with SAMP interaction
    
    '''
    
    def __init__(self,ds9ID):
        '''
        Initialize a DS9 class instance

        Parameters
        ----------
        ds9ID : string
            name of the ds9 instance we will create.

        Raises
        ------
        ValueError
            raised if bad inputs are provided.
        RuntimeError
            raised if unrecoverable errors occur..

        Returns
        -------
        None.

        '''
        
        if len(ds9ID) == 0:
            raise ValueError("required argument ds9ID missing")
        
        self.ds9ID = ds9ID
        
        # instantiate a SAMPIntegratedClient instance
        
        self.ds9 = SAMPIntegratedClient(name=f"myDS9_{ds9ID}",description="myDS9 instance")
        self.connected = self.ds9.is_connected
        
        self.clientID = None
        self.haveDS9 = False
        
        self.ds9cmd = f"ds9 -samp -xpa no -fifo none -port none -unix none -title {self.ds9ID}"
        
        try:
            self.ds9.connect()
            self.connected = self.ds9.is_connected
            self.clientID = self.getID()
            if self.clientID:
                self.haveDS9 = True
            else:
                subprocess.Popen(shlex.split(self.ds9cmd))
                time.sleep(5)
                self.clientID = self.getID()
                if not self.clientID:
                    raise RuntimeError(f"Failed to start ds9 for {self.ds9ID}, aborting")
                else:
                    self.connected = self.ds9.is_connected
                    
        except SAMPHubError:
            subprocess.Popen(shlex.split(self.ds9cmd))
            time.sleep(5)
            try:
                self.ds9.connect()
                self.clientID = self.getID()
                if not self.clientID:
                    raise RuntimeError(f"Failed to start ds9 for {self.ds9ID}, aborting")
                else:
                    self.haveDS9 = True
                    self.connected = self.ds9.is_connected
            except Exception as exp:
                raise RuntimeError(f"Cannot connect to a samp hub or ds9: {exp}")
            
        except Exception as exp:
            raise RuntimeError(f"DS9 startup failed with exception: {exp}")


    def getID(self):
        '''
        Get the samp hub client ID corresponding to the ds9 instance we need

        Returns
        -------
        clientID : string
            SAMP client ID of the ds9 instance of interest

        Description
        -----------
        Uses the astropy.samp get_registered_client() and get_methdata()
        methods to find the named ds9 window of interest.  The name
        attached to a ds9 process with the -title command-line argument
        is in the samp.name metadata parameter.  We require an exact
        case match.
        
        '''
        if not self.ds9.is_connected:
            return None
        for clientID in self.ds9.get_registered_clients():
            cliMD = self.ds9.get_metadata(clientID)
            if "samp.name" in cliMD:
                if cliMD["samp.name"] == self.ds9ID:
                    self.clientID = clientID
                    return self.clientID
        return None
    
    
    def set(self,*args):
        '''
        Send a commmand to the ds9 instance

        Parameters
        ----------
        args : string
            SAOImage ds9 set command(s) to send.  Supports multiple commands separated by commas

        Raises
        ------
        ValueError
            if bad information is provided
        RuntimeError
            if there are unrecoverable runtime errors

        Returns
        -------
        None.

        '''
        
        if len(args) == 0:
            return        
        
        if not self.haveDS9:
            raise RuntimeError(f"named ds9 instances {self.ds9ID} not connected")

        for cmdStr in args:
            try:
                sampRet = self.ds9.ecall_and_wait(self.clientID,"ds9.set","10",cmd=cmdStr)
            except Exception as exp:
                raise ValueError(f"set() error: {exp}")
            
            if sampRet['samp.status'] != 'samp.ok':
                raise RuntimeError(f"ds9 set command returned error: {sampRet}")
                
    def get(self,cmdStr):
        '''
        Execute a ds9 get directive to the ds9 instance

        Parameters
        ----------
        cmdStr : string
            SAOImage ds9 get command to send.

        Raises
        ------
        RuntimeError
            if there are unrecoverable errors.

        Returns
        -------
        dictionary
            SAMP dictionary with the return from the ds9.get command.

        See Also
        --------
        getCursKey() for a function that uses this for cursor interaction
        
        '''
        
        if len(cmdStr) == 0:
            return None
        
        if self.haveDS9:
            try:
                sampRet = self.ds9.ecall_and_wait(self.clientID,"ds9.get","0",cmd=cmdStr)
            except Exception as exp:
                raise RuntimeError(f"get() error: {exp}")
            
            if sampRet['samp.status'] == 'samp.ok':
                if "samp.result" in sampRet:
                    if "value" in sampRet["samp.result"]:
                        return sampRet["samp.result"]["value"]
                    else:
                        return sampRet["samp.result"]
                else:
                    return sampRet
            else:
                raise RuntimeError(f"ds9 get command returned error: {sampRet}")
        else:
            raise RuntimeError(f"named ds9 instances {self.ds9ID} not connected")

            
    def getCursKey(self,prompt=None,coords="image"):
        '''
        Put an interactive cursor on the image and wait for a key press,
        returning coordinates and the key pressed.

        Parameters
        ----------
        prompt : string, optional
            Prompt to show on stdout at the start. The default is None.
        coords : string, optional
            coordinates to return, must be one of "image", "fk5", "wcs fk5", or 
            "wcs galactic", The default is "image".

        Raises
        ------
        RuntimeError
            if unrecoverable errors occur in execution.

        Returns
        -------
        key : string
            The key hit (also space if spacebar, etc.)
        cursX : float
            the X coordinate of the cursor when key was hit
        cursY : float
            the Y coordinate of the cursor when key was hit

        Description
        -----------
        A wrapper for "iexam key coordinate image", or substitute
        one of (fk5,wcs fk5,wsc galactic) if alternatives given.
         * "image" returns pixel coordinates (default)
         * "fk5" returns ra/dec in decimal hours/degrees
        Returns the key hit, does not respond to mouse click.
        
        '''
        
        if not prompt:
            print(f"Put cursor on the {self.ds9ID} display image and hit any key")
        else:
            print(f"{prompt}")
        
        try:
            cursData = self.get(f"iexam key coordinate {coords}")
        except Exception as exp:
            raise RuntimeError(f"iexam returned error: {exp}")
        
        cursBits = cursData.split(' ')
        if len(cursBits) == 3:
            return cursBits[0],float(cursBits[1]),float(cursBits[2])
        else:
            return cursData



## Startup process


Instantiate a `SAMPIntegratedClient()` class and try to connect.

If a samp hub is running `connect()` will succeed and set the `is_connected` attribute True.  If we
are connected, then try to find the named ds9 instance `ds9ID` (e.g., `modsView`).  If we don't find
it, start one and then verify we see it in the samp client table.

If no samp hub is running it raises a `SAMPHubError` exception. The corrective is to launch a named ds9
window with the `ds9ID` with the `-samp` flag so that it also starts a samp hub.  This is a nice feature of ds9
that it takes care of launcing a hub for us if needed, or attaches to the running samp hub.

This makes the startup logic complicated as seen below.


In [None]:
ds9ID = "modsView"
ds9 = DS9(ds9ID)

if ds9.connected:
    if ds9.haveDS9:
        print(f"Connected named ds9 instance {ds9.ds9ID} as samp client {ds9.clientID}")
    else:
        print(f"Connected to a samp hub but no named ds9 instance {ds9.ds9ID} found")
else:
    print(f"could not connect or start a samp hub or ds9 instance")
    

### send ds9 set commands

We got this far we have a ds9 instance to talk to through a samp hub, now configure the ds9 window to our
liking.
 * dimensions: 800x800
 * clear all frames
 * remove redundant pixel output
 * remove colorbar


In [None]:
ds9.set("width 800","width 800","frame clear all","view image no","view colorbar no","raise")
#ds9.set("width 800")
#ds9.set("height 800")
#ds9.set("frame clear all")
#ds9.set("view image no")        # redundant pixel readout
#ds9.set("view colorbar no")     # don't need color bar
#ds9.set("raise")

# display an image

#dataDir = "/lbt/data/repository"
dataDir = str(Path.home() / "DEMONEXT/Data")
fitsFile = f"{dataDir}/n20190621.GuideCal.i.0006.fits"

ds9.set(f"file fits {fitsFile}")
ds9.set("zoom to fit")
ds9.set("scale mode zscale")

### send ds9 get commands

These test the ds9.get directives. The `get()` method returns just the value of the return (what the user
wants 99% of the time), whereas if there is no "value" item, it returns the raw "samp.return" dictionary.

In [None]:
foo = ds9.get("file")
print(foo)

foo = ds9.get("fits")
print(foo)

# this get() will fail

try:
    foo = ds9.get("finbob")
except Exception as exp:
    print(f"\nERROR: {exp}")
    
# this set() will fail

try:
    ds9.set("finbob")
except Exception as exp:
    print(f"\nERROR: {exp}")


### iexam test

DS9 remote cursor command is `iexam`.  Instead of doing
 > ret = ds9.get("iexam key coordinate image"

and then messing with the dictionary returned by the samp `ds9.get`, we pre-process the raw SAMP "samp.return"
dictionary info into the data we want from the cursor+key interaction to make it more readily usable.

In [None]:
cursKey, cursX, cursY = ds9.getCursKey()

print(f"Got key={cursKey}, X,Y={cursX:.2f},{cursY:.2f}")
    