In [91]:
import copy
import os
import random

import numpy as np
import pandas as pd
from astropy.time import Time
from rubin_scheduler.scheduler import features
from rubin_scheduler.scheduler.model_observatory import ModelObservatory, KinemModel

In [67]:
# Setting the start of the *survey* (and keeping it fixed) is important for the Model Observatory and Scheduler:
# it sets the overall per-night dither pattern and helps track things that would otherwise change from night to night.
# For SV surveys this might not be strictly necessary, but it is good practice.
# (You can change the *day* / mjd that you are simulating, of course, but mjd_start should remain the same.)

dayobs = '2024-09-09'

survey_start = Time(f'{dayobs}T12:00:00', format='isot', scale='utc')

In [76]:
# Don't have to do this, but can grab almanac information

from rubin_scheduler.site_models import Almanac
from rubin_scheduler.utils import Site

almanac = Almanac(mjd_start = survey_start.mjd)

def show_almanac_info(dayobs):
    """Print the almanac's event times and the moon phase for one night.

    Parameters
    ----------
    dayobs : `str`
        Evening date of the night of interest; passed to
        ``almanac.get_sunset_info`` as ``evening_date``.

    Notes
    -----
    Relies on the notebook-level ``almanac`` object. Nothing is returned;
    the information is only printed.
    """
    night_info = almanac.get_sunset_info(evening_date=dayobs, longitude=Site('LSST').longitude_rad)

    # Every field except the 'night' entry is echoed both as the raw value
    # and converted from MJD to an ISO (UTC) timestamp.
    for val, col in zip(night_info, night_info.dtype.names):
        if col != 'night':
            print(col, val, Time(val, format='mjd', scale='utc').iso)

    # And can check on the lunar phase -- this goes from 0 (new) to 100 (full)
    moon_phase = almanac.get_sun_moon_positions(night_info['moonrise'])['moon_phase']
    print(f'moonphase(%) {moon_phase.round(2)}')

show_almanac_info(dayobs)

sunset 60562.936555093154 2024-09-09 22:28:38.360
sun_n12_setting 60562.975207073614 2024-09-09 23:24:17.891
sun_n18_setting 60562.99454106344 2024-09-09 23:52:08.348
sun_n18_rising 60563.39397472935 2024-09-10 09:27:19.417
sun_n12_rising 60563.4132887749 2024-09-10 09:55:08.150
sunrise 60563.45188789442 2024-09-10 10:50:43.114
moonrise 60563.62073250674 2024-09-10 14:53:51.289
moonset 60563.19181374041 2024-09-10 04:36:12.707
moonphase(%) 46.25


In [80]:
def tma_movement(percent=70):
    """Return TMA motion settings scaled to a percentage of full performance.

    Parameters
    ----------
    percent : `float`
        Percentage of the full-performance values to use.

    Returns
    -------
    tma : `dict`
        Azimuth/altitude max speed, acceleration and jerk plus a fixed
        settle time, keyed so the dict can be splatted into
        ``KinemModel.setup_telescope``.
    """
    # See https://confluence.lsstcorp.org/display/LSSTCOM/TMA+Motion+Settings
    scale = percent / 100.0
    return {
        # Azimuth speed is capped at 7.0 regardless of the requested scale.
        'azimuth_maxspeed': np.min([10.0 * scale, 7.0]),
        'azimuth_accel': 10.0 * scale,
        # Jerk values never drop below 1.0.
        'azimuth_jerk': np.max([1.0, 40.0 * scale]),
        'altitude_maxspeed': 5.0 * scale,
        'altitude_accel': 5.0 * scale,
        'altitude_jerk': np.max([1.0, 20.0 * scale]),
        # Settle time does not scale with performance.
        'settle_time': 3.0,
    }

In [89]:
def rotator_movement(percent=70):
    """Return rotator motion settings scaled to a percentage of full performance.

    Parameters
    ----------
    percent : `float`
        Percentage of the full-performance values to use.

    Returns
    -------
    rot : `dict`
        ``maxspeed``, ``accel`` and ``jerk`` values, each scaled by
        ``percent / 100``.
    """
    # These need confirmation from ROO
    # Also it's not clear if they need to be set to the same level as the TMA
    full_performance = {'maxspeed': 5.0, 'accel': 1.4, 'jerk': 5.0}
    return {key: value * percent / 100.0 for key, value in full_performance.items()}

In [128]:
# Some weather telemetry that might be useful
from collections import namedtuple

class CommissioningSeeingData:
    """A source of constant seeing values: every call returns `fwhm_500`."""

    # Constant value returned by every call.
    fwhm_500: float = 2.5

    def __call__(self, time : Time):
        """A constant FWHM_500 value

        Parameters
        ----------
        time : `astropy.time.Time`
            In principle the time for which the seeing is returned,
            in practice this argument is ignored, and included for
            compatibility.

        Returns
        -------
        fwhm_500 : `float`
            The FWHM at 500nm, in arcseconds.
        """
        return self.fwhm_500

WindConditions = namedtuple("WindConditions", ["speed", "direction"])

class CommissioningWindData:
    """A source of constant wind values.

    Parameters
    ----------
    wind_speed : `float`
        Wind speed (m/s).
    wind_direction : `float`
        Direction from which the wind originates, in radians east of
        north (0 means the wind originates from the north, pi/2 from
        the east). The default is 340 degrees expressed in radians.
    """

    def __init__(self, wind_speed: float = 5.0, wind_direction: float = np.radians(340.0)):
        # see also https://sitcomtn-126.lsst.io
        # wind direction is typically 330 to 350 degrees; the value is stored
        # (and returned) in radians, so the default is converted above rather
        # than passing the bare number 340 through as if it were radians.
        # wind speed will vary -- telescope should try to point out of wind
        self.wind_speed = wind_speed
        self.wind_direction = wind_direction

    def __call__(self, time: "Time"):
        """Return the constant wind conditions.

        Parameters
        ----------
        time : `astropy.time.Time`
            In principle the time for which the wind is returned,
            in practice this argument is ignored, and included for
            compatibility.

        Returns
        -------
        wind_conditions : `tuple` (`float`, `float`)
            A named tuple with the wind speed (m/s) and originating
            direction (radians east of north)
        """
        return WindConditions(self.wind_speed, self.wind_direction)

In [131]:
# MJD for start of simulation

# `night_info` is only a local variable inside show_almanac_info, so look up
# the night's events here; otherwise this cell raises NameError on a fresh kernel.
night_info = almanac.get_sunset_info(evening_date=dayobs, longitude=Site('LSST').longitude_rad)
mjd_now = night_info['sunset']

# Set up model observatory with modified telescope movement and seeing

filterlist = ['g', 'r', 'i', 'z', 'y']
kinematic_model = KinemModel(mjd0=mjd_now)
kinematic_model.setup_camera(readtime=2.4, **rotator_movement(40.))
kinematic_model.setup_telescope(**tma_movement(10.0))
kinematic_model.mount_filters(filterlist)  # optional, but might be useful with comcam - must be last

seeing_data = CommissioningSeeingData()

wind_data = CommissioningWindData()

observatory = ModelObservatory(mjd=mjd_now,
                               mjd_start=survey_start.mjd,
                               kinem_model=kinematic_model, # Modified kinematics
                               cloud_data='ideal',          # No clouds
                               seeing_data=seeing_data,     # Modified seeing
                               wind_data=wind_data,         # Add some wind
                               downtimes='ideal',           # No downtime
                               lax_dome=True,               # dome crawl?
                               init_load_length=2,          # size of skybrightness files to load first
                              )

In [139]:
# You can inspect the state of the model observatory at any time.
# Note that setting `observatory.mjd` advances the time for the observatory, which will advance it for the Scheduler
observatory.mjd = night_info['sun_n18_setting']
conditions = observatory.return_conditions()
#conditions

In [157]:
# Set up your scheduler

# Please do make sure to add wind avoidance basis functions, etc. 

# Candidate target fields as (name, RA, Dec) tuples.
fields = (
    ("Rubin_SV_095_-25", 95., -25.),  # High stellar density, low extinction
    ("Rubin_SV_125_-15", 125., -15.),  # High stellar density, low extinction
    ("DESI_SV3_R1", 179.60, 0.000),  # DESI, GAMA, HSC DR2, KiDS-N
    ("Rubin_SV_225_-40", 225., -40.),  # High stellar density, low extinction
    ("DEEP_A0", 216, -12.5),  # DEEP Solar System
    ("Rubin_SV_250_2", 250., 2.),  # High stellar density, low extinction
    ("Rubin_SV_300_-41", 300., -41.),  # High stellar density, low extinction
    ("Rubin_SV_280_-48", 280., -48.),  # High stellar density, low extinction
    ("DEEP_B0", 310, -19),  # DEEP Solar System
    ("ELAIS_S1", 9.45, -44.0),  # ELAIS-S1 LSST DDF
    ("XMM_LSS", 35.708333, -4.75),  # LSST DDF
    ("ECDFS", 53.125, -28.1),  # ECDFS
    ("COSMOS", 150.1, 2.1819444444444445),  # COSMOS
    ("EDFS_A", 58.9, -49.315),  # EDFS_a
    ("EDFS_B", 63.6, -47.6),  # EDFS_b
)

# Lookup from field name to its (RA, Dec) pair, in the same order as `fields`.
fields_dict = {name: (ra, dec) for name, ra, dec in fields}

In [159]:
test = [True, False, True]
any(test)

True

In [165]:
## I need to update the *FieldSurvey* in rubin_scheduler, but I think it would be something like this: 

from functools import cached_property
from rubin_scheduler.utils import ra_dec2_hpid
from rubin_scheduler.scheduler.utils import empty_observation
from rubin_scheduler.scheduler.surveys import BaseSurvey


class FieldSurvey(BaseSurvey):
    """A survey class for running deep drilling fields.

    Parameters
    ----------
    basis_functions : list
        List of basis_function objects
    detailers : list of rubin_scheduler.scheduler.detailers objects
        The detailers to apply to the list of observations.
    RA : float
        The RA of the field (degrees)
    dec : float
        The dec of the field to observe (degrees)
    sequence : list of observation objects or str (rgizy)
        The sequence of observations to take. Can be a string of
        filter names or a list of obs objects.
    nvis : sequence of ints
        The number of visits in each filter. Should be same length
        as sequence.
    exptime : float
        The exposure time for visits in grizy.
    u_exptime : float
        The exposure time for visits in u band
    nexp : float
        The number of exposures per visit (except u band, which is always 1)
    ignore_obs : `list` [`str`] or None
        Ignore observations with this string in the `scheduler_note`.
        Any note that *contains* an ignore string is ignored, so
        ignoring 'mysurvey2' will also ignore 'mysurvey23'.
    accept_obs : `list` [`str`], `str` or None
        If accept_obs is set, then ONLY observations which match one of
        these strings in the `scheduler_note` will be counted for the survey.
        A complete match must occur; substrings will not match. The same
        rule is applied in `add_observation` and `add_observations_array`.
    survey_name : `str` or None.
        The name to give this survey, for debugging and visualization purposes.
        Also propagated to the 'target_name' in the observation.
        The default None will construct a name based on the RA/Dec of the field.
    scheduler_note : `str` or None
        The value to include in the scheduler note.
        The scheduler note is for internal, scheduler, use for the purposes of
        identifying observations to ignore or include for a survey or feature.
        The default None uses the survey name.
    readtime : float
        Readout time for computing approximate time of observing
        the sequence. (seconds)
    filter_change_time : float
        Filter change time, on average. Used for computing approximate
        time for the observing sequence. (seconds)
    nside : int or None
        Nside for computing survey basis functions and maps.
        The default of None will use rubin_scheduler.utils.set_default_nside().
    flush_pad : float
        How long to hold observations in the queue after they
        were expected to be completed (minutes).
    seed : int
        Seed passed to `random.seed`.
    reward_value : float or None
        Stored as `self.reward_value`. When None (the default), `NObsSurvey`
        features counting all observations and the observations carrying
        this survey's scheduler note are added to `extra_features`.
        NOTE(review): `calc_reward_function` does not use this value.
    """

    def __init__(
        self,
        basis_functions,
        RA,
        dec,
        sequence="rgizy",
        nvis=(20, 10, 20, 26, 20),
        exptime=30.0,
        u_exptime=38.0,
        nexp=2,
        ignore_obs=None,
        accept_obs=None,
        survey_name=None,
        scheduler_note=None,
        readtime=2.4,
        filter_change_time=120.0,
        nside=None,
        flush_pad=30.0,
        seed=42,
        detailers=None,
        reward_value=None,
    ):
        super().__init__(
            nside=nside,
            basis_functions=basis_functions,
            detailers=detailers,
            ignore_obs=ignore_obs,
        )
        # A bare string would otherwise be iterated character by character.
        if isinstance(accept_obs, str):
            accept_obs = [accept_obs]
        self.accept_obs = accept_obs

        random.seed(a=seed)
        # Set all basis function equal.
        self.basis_weights = np.ones(len(basis_functions)) / len(basis_functions)

        self.ra = np.radians(RA)
        self.ra_hours = RA / 360.0 * 24.0
        self.dec = np.radians(dec)
        self.ra_deg, self.dec_deg = RA, dec

        self.survey_name = survey_name
        if self.survey_name is None:
            self.survey_name = f"Field {self.ra_deg :.2f} {self.dec_deg :.2f}"
        self.scheduler_note = scheduler_note
        if self.scheduler_note is None:
            self.scheduler_note = self.survey_name

        self.reward_value = reward_value
        self.flush_pad = flush_pad / 60.0 / 24.0  # To days
        self.filter_sequence = []
        if isinstance(sequence, str):
            # Build `num` identical single-row observations per filter.
            self.observations = []
            for num, filtername in zip(nvis, sequence):
                is_u_band = filtername == "u"
                for _ in range(num):
                    obs = empty_observation()
                    obs["filter"] = filtername
                    # u band has its own exposure time and always one exposure.
                    obs["exptime"] = u_exptime if is_u_band else exptime
                    obs["nexp"] = 1 if is_u_band else nexp
                    obs["RA"] = self.ra
                    obs["dec"] = self.dec
                    obs["target"] = self.survey_name
                    obs["note"] = self.scheduler_note
                    self.observations.append(obs)
        else:
            self.observations = sequence

        # Let's just make this an array for ease of use
        self.observations = np.concatenate(self.observations)
        order = np.argsort(self.observations["filter"])
        self.observations = self.observations[order]

        n_filter_change = np.size(np.unique(self.observations["filter"]))

        # Make an estimate of how long a sequence will take.
        # Assumes no major rotational or spatial
        # dithering slowing things down.
        self.approx_time = (
            np.sum(self.observations["exptime"] + readtime * self.observations["nexp"]) / 3600.0 / 24.0
            + filter_change_time * n_filter_change / 3600.0 / 24.0
        )  # to days

        if self.reward_value is None:
            self.extra_features["Ntot"] = features.NObsSurvey()
            self.extra_features["N_survey"] = features.NObsSurvey(note=self.scheduler_note)

    @cached_property
    def roi_hpid(self):
        """HEALPix id returned by `ra_dec2_hpid` for the field's RA/Dec (cached)."""
        hpid = ra_dec2_hpid(self.nside, np.degrees(self.ra), np.degrees(self.dec))
        return hpid

    def check_continue(self, observation, conditions):
        # feasibility basis functions?
        """
        This method enables external calls to check if a given
        observations that belongs to this survey is
        feasible or not. This is called once a sequence has
        started to make sure it can continue.

        Currently always returns True.

        XXX--TODO:  Need to decide if we want to develop check_continue,
        or instead hold the sequence in the survey, and be able to check
        it that way.
        """
        result = True
        return result

    def add_observation(self, observation, **kwargs):
        """Add observation one at a time.

        The observation is passed on to the features, basis functions and
        detailers only if its note contains none of the `ignore_obs`
        strings and, when `accept_obs` is set, exactly equals one of the
        accept strings.
        """
        # Check each possible ignore string
        checks = [io not in str(observation["note"]) for io in self.ignore_obs]
        passed_ignore = all(checks)
        passed_accept = True
        if passed_ignore and self.accept_obs is not None:
            # Check if this observation matches any accept string.
            checks = [io == str(observation["note"]) for io in self.accept_obs]
            passed_accept = any(checks)
        # I think here I have to assume observation is an
        # array and not a dict.
        if passed_ignore and passed_accept:
            for feature in self.extra_features:
                self.extra_features[feature].add_observation(observation, **kwargs)
            for bf in self.extra_basis_functions:
                self.extra_basis_functions[bf].add_observation(observation, **kwargs)
            for bf in self.basis_functions:
                bf.add_observation(observation, **kwargs)
            for detailer in self.detailers:
                detailer.add_observation(observation, **kwargs)
            self.reward_checked = False

    def add_observations_array(self, observations_array_in, observations_hpid_in):
        """Add an array of observations rather than one at a time

        Parameters
        ----------
        observations_array_in : np.array
            An array of completed observations
            (with columns like
            rubin_scheduler.scheduler.utils.empty_observation).
        observations_hpid_in : np.array
            Same as observations_array_in, but larger and with an
            additional column for HEALpix id. Each observation is
            listed multiple times, once for every HEALpix it overlaps.
        """
        # Just to be sure things are sorted
        # (note: this sorts the caller's arrays in place)
        observations_array_in.sort(order="mjd")
        observations_hpid_in.sort(order="mjd")

        # Copy so we don't prune things for other survey objects
        observations_array = observations_array_in.copy()
        observations_hpid = observations_hpid_in.copy()

        # Drop every observation whose note contains an ignore string.
        for ig in self.ignore_obs:
            not_ignore = np.where(np.char.find(observations_array["note"], ig) == -1)[0]
            observations_array = observations_array[not_ignore]

            not_ignore = np.where(np.char.find(observations_hpid["note"], ig) == -1)[0]
            observations_hpid = observations_hpid[not_ignore]

        # Keep only observations whose note exactly equals ANY accept string,
        # the same rule add_observation applies. Skipped when accept_obs is None.
        if self.accept_obs is not None:
            accept = np.isin(observations_array["note"], self.accept_obs)
            observations_array = observations_array[accept]

            accept = np.isin(observations_hpid["note"], self.accept_obs)
            observations_hpid = observations_hpid[accept]

        for feature in self.extra_features:
            self.extra_features[feature].add_observations_array(observations_array, observations_hpid)
        for bf in self.extra_basis_functions:
            self.extra_basis_functions[bf].add_observations_array(observations_array, observations_hpid)
        for bf in self.basis_functions:
            bf.add_observations_array(observations_array, observations_hpid)
        for detailer in self.detailers:
            detailer.add_observations_array(observations_array, observations_hpid)
        self.reward_checked = False

    def calc_reward_function(self, conditions):
        """Compute the reward at the field's HEALPix index.

        Returns the weighted sum of the basis function values when the
        survey is feasible, and negative infinity otherwise.
        """
        self.reward_checked = True
        indx = ra_dec2_hpid(self.nside, self.ra_deg, self.dec_deg)
        if self._check_feasibility(conditions):
            self.reward = 0
            for bf, weight in zip(self.basis_functions, self.basis_weights):
                basis_value = bf(conditions, indx=indx)
                self.reward += basis_value * weight

            if not np.isscalar(self.reward):
                self.reward = np.sum(self.reward[indx])

                # NOTE(review): any infinite sum, including -inf, becomes
                # +inf here -- confirm that is intended.
                if np.any(np.isinf(self.reward)):
                    self.reward = np.inf
        else:
            # If not feasible, negative infinity reward
            self.reward = -np.inf

        return self.reward

    def generate_observations_rough(self, conditions):
        """Return the observing sequence as a list of single-row arrays.

        Returns an empty list when the survey is not feasible. Otherwise
        the stored observations are copied, given a `flush_by_mjd`,
        restricted to mounted filters, and reordered so visits in the
        currently loaded filter come first.
        """
        result = []
        if self._check_feasibility(conditions):
            result = copy.deepcopy(self.observations)

            # Set the flush_by
            result["flush_by_mjd"] = conditions.mjd + self.approx_time + self.flush_pad

            # remove filters that are not mounted
            mask = np.isin(result["filter"], conditions.mounted_filters)
            result = result[mask]
            # Put current loaded filter first
            ind1 = np.where(result["filter"] == conditions.current_filter)[0]
            ind2 = np.where(result["filter"] != conditions.current_filter)[0]
            result = result[ind1.tolist() + (ind2.tolist())]

            # convert to list of array.
            result = [row.reshape(1) for row in result]

        return result

    def __repr__(self):
        return (
            f"<{self.__class__.__name__} survey_name='{self.survey_name}'"
            f", RA={self.ra}, dec={self.dec} at {hex(id(self))}>"
        )

In [166]:
## field survey to be tested .. this should make it unnecessary to specify particular notes in features for basis functions.