# OR list machine-readable comments

In [1]:
import sys
import pprint
from ruamel.yaml import YAML
from parse_cm.or_list import read_or_list_full


yaml = YAML(typ="safe")  # default, if not specified, is 'rt' (round-trip)

In [2]:
import ruamel.yaml
import textwrap


def dedent_text(text):
    """Return ``text`` without leading newlines and without common indentation.

    Leading newline characters are stripped first, then the whitespace prefix
    shared by all remaining lines is removed with ``textwrap.dedent``.
    """
    return textwrap.dedent(text.lstrip("\n"))


In [3]:
# Read the example OR list. ``obsreqs`` is used below as a mapping from obsid to
# an observation request that carries a "comment" entry.
obsreqs, comments = read_or_list_full("example-xml.or")

In [4]:
# Number of comments returned by read_or_list_full for the example OR list.
len(comments)

9

In [5]:
import xml.etree.ElementTree as ET

# Collect one example value for every tag that appears in any comment.
tags_all = {}
for obsid, obsreq in obsreqs.items():
    try:
        # A comment is a flat run of elements, so wrap it in a single root
        # element to make it parseable as one XML document.
        xml_str = "<root>" + obsreq["comment"] + "</root>"
        root = ET.fromstring(xml_str)
    except Exception as e:
        # Report comments that cannot be parsed and move on to the next one.
        print(obsid, e)
        continue

    tag_names = [elem.tag for elem in root]
    for tag_name in tag_names:
        # Keep only the first value encountered for each tag name.
        if tag_name not in tags_all:
            value = root.find(tag_name).text
            tags_all[tag_name] = value

# Print each tag name with the first 100 characters of its example value.
for tag_name in sorted(tags_all):
    value = tags_all[tag_name]
    print("#" * 8 + f" {tag_name} " + "#" * 8)
    print(value.strip()[:100])
    print()


######## absolute_mon_window ########
23871, 2022:169:06:28:06.976, 2024:009:06:28:06.976

######## acis_fp_limit ########
-109.0

######## coordination_window ########
NO COORDINATION WINDOW FOUND!!

######## cycle_number ########
23

######## freeform_constraint ########
Remarks:
Three observations are requested: conjunction with Be star in front
(phi=0.25), conjunction

######## obs_group ########
26488, 26489

######## obs_group_duration ########
2.7

######## phase_window ########
#Start,                 End
2023:185:15:55:59.750, 2023:190:17:33:48.730

######## pointing ########
#roll_start, roll_stop, y_target_offset, z_target_offset, RA, Dec, z_sim_offset
0.0, 20.0, 0.008333,

######## relative_mon_window ########
27012, 000:00:00:00.000, 014:00:00:00.000

######## roll ########
# min, max
102, 222
282,  42

######## sequence_number ########
201454

######## si_modes ########
TE_00D50, NULL, NULL, NULL, NULL, NULL

######## slosh ########
180

######## split_duration ########
1

In [6]:
# Show the raw comment text stored for obsid 27012.
print(obsreqs[27012]["comment"])

<cycle_number>24</cycle_number>
<sequence_number>590678</sequence_number>
<si_modes>TE_00CD8, TE_00CE6, NULL, NULL, NULL, NULL</si_modes>
<acis_fp_limit>-109.0</acis_fp_limit>
<freeform_constraint>
Window Constraint requirements exist for observation.
WINDOW=(2023:182:00:00:00,2023:212:00:00:00)
</freeform_constraint>
<split_duration>14</split_duration>


In [7]:
import xml.etree.ElementTree as ET

# Hard-coded example comment used to check the XML parsing approach.
xml_str = """
<cycle_number>24</cycle_number>
<sequence_number>590678</sequence_number>
<si_modes>TE_00CD8, TE_00CE6, NULL, NULL, NULL, NULL</si_modes>
<acis_fp_limit>-109.0</acis_fp_limit>
<freeform_constraint>
Window Constraint requirements exist for observation.
  WINDOW=(2023:182:00:00:00,2023:212:00:00:00)
</freeform_constraint>
<split_duration>14</split_duration>
"""

# Wrap the flat list of elements in a root element so it parses as one document.
root = ET.fromstring("<root>" + xml_str + "</root>")

# Tag names of the direct children of the root, in document order.
tag_names = [elem.tag for elem in root]

print(tag_names)


['cycle_number', 'sequence_number', 'si_modes', 'acis_fp_limit', 'freeform_constraint', 'split_duration']


In [8]:
import ast

In [9]:
def convert_value(value: str):
    """Convert a string into an int, float or str where it is a Python literal.

    The text is evaluated with ``ast.literal_eval``. If evaluation fails, or if
    it produces anything other than an ``int``, ``float`` or ``str`` (for
    instance a list or tuple), the original string is returned unchanged. Note
    that ``bool`` is a subclass of ``int``, so ``"True"`` becomes ``True``.

    :param value: text to convert
    :returns: the evaluated literal, or ``value`` itself
    :raises TypeError: if ``value`` is not a ``str``
    """
    if not isinstance(value, str):
        raise TypeError("input must be str")

    try:
        parsed = ast.literal_eval(value)
    except Exception:
        # Not a Python literal (e.g. "NULL" or a date string): keep the text.
        return value

    return parsed if isinstance(parsed, (int, float, str)) else value

In [10]:
from astropy.table import Table
import io


def table_constructor(loader, node):
    """Construct a ``Table`` from the scalar text of a YAML node.

    The node's text is handed to ``Table.read`` with ``format="ascii"``.
    ``loader`` is part of the constructor call signature and is not used.
    """
    return Table.read(node.value, format="ascii")


def table_representer(dumper, data):
    """Represent ``data`` as a ``!table`` scalar in literal block style.

    ``data`` is written with ``format="ascii"`` into an in-memory buffer and
    the resulting text becomes the scalar value.
    """
    buffer = io.StringIO()
    data.write(buffer, format="ascii")
    table_text = buffer.getvalue()
    return dumper.represent_scalar("!table", table_text, style="|")


# Minimal YAML document whose value carries the custom !table tag.
txt = """\
test: !table |
  col1 col2
  1 2
  3 4
"""
yaml = ruamel.yaml.YAML()
# Register the !table handlers on the constructor/representer classes used by
# this YAML instance.
yaml.Constructor.add_constructor("!table", table_constructor)
yaml.Representer.add_representer(Table, table_representer)

# Load the document, then dump it again to check the tag survives both ways.
dat = yaml.load(txt)
print(dat)
out = io.StringIO()
yaml.dump(dat, out)
print(out.getvalue())


ordereddict([('test', <Table length=2>
 col1  col2
int64 int64
----- -----
    1     2
    3     4)])
test: !table |
  col1 col2
  1 2
  3 4



In [11]:
from io import StringIO


class ORListRepresenter(ruamel.yaml.representer.RoundTripRepresenter):
    """Round-trip representer with the output conventions used for OR lists.

    - ``default_flow_style`` is always passed to the base class as ``False``.
    - Strings containing a newline are dedented and written as literal (``|``)
      block scalars; other strings are written as plain scalars.
    - Sequences are always written in flow style.
    """

    # See https://stackoverflow.com/questions/76689402
    def __init__(self, default_style=None, default_flow_style=None, dumper=None):
        # ``default_flow_style`` is accepted to keep the base-class signature,
        # but its value is ignored in favour of ``False``.
        super().__init__(
            default_style=default_style, default_flow_style=False, dumper=dumper
        )

    def represent_str(self, s):
        """Represent ``s``, using a literal block for multi-line text."""
        str_tag = "tag:yaml.org,2002:str"
        if "\n" not in s:
            return self.represent_scalar(str_tag, s)
        return self.represent_scalar(str_tag, dedent_text(s), style="|")

    def represent_sequence(self, tag, sequence, flow_style=None):
        """Represent ``sequence`` in flow style regardless of ``flow_style``."""
        return super().represent_sequence(tag, sequence, flow_style=True)


# Route every ``str`` through the multi-line aware representer defined above.
ORListRepresenter.add_representer(str, ORListRepresenter.represent_str)

In [12]:
# Sample with a plain string, a multi-line string and a long list to exercise
# each of the ORListRepresenter customizations.
data = {"a": "hello", "b": "hello\nthere\nworld", "c": [1] * 100}

yaml = ruamel.yaml.YAML()
yaml.Representer = ORListRepresenter
yaml.dump(data, sys.stdout)


a: hello
b: |-
  hello
  there
  world
c: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


In [13]:
def yaml_dumps(data):
    """Return ``data`` serialized to a YAML string via ``ORListRepresenter``."""
    dumper = ruamel.yaml.YAML()
    dumper.Representer = ORListRepresenter

    buffer = StringIO()
    dumper.dump(data, buffer)
    return buffer.getvalue()

In [14]:
import re


def get_comment_blocks(lines, include_49999=False):
    """Find the comment blocks in ``lines`` and return them as a list of dict.

    A block starts on a line matching ``BEGIN_COMMENT, ID=<obsid>`` and ends at
    the next line that is exactly ``END_COMMENT``. Each dict has the keys
    ``obsid``, ``idx0`` (index of the begin line), ``idx1`` (index of the end
    line) and ``comment`` (the lines in between, joined by newlines).

    :param lines: list of str, the lines of the OR list
    :param include_49999: if False, blocks for obsid 49999 are left out
    :returns: list of dict, in order of appearance
    :raises ValueError: if a block has no ``END_COMMENT`` line after it
    """
    begin_pattern = re.compile(
        r"BEGIN_COMMENT \s* , \s* ID \s* = \s* (\d+)", re.VERBOSE
    )
    blocks = []
    n_lines = len(lines)

    pos = 0
    while pos < n_lines:
        found = begin_pattern.match(lines[pos])
        if found is None:
            pos += 1
            continue

        obsid = int(found.group(1))
        if obsid == 49999 and not include_49999:
            pos += 1
            continue

        end = lines.index("END_COMMENT", pos)
        blocks.append(
            {
                "obsid": obsid,
                "idx0": pos,
                "idx1": end,
                "comment": "\n".join(lines[pos + 1 : end]),
            }
        )
        # Resume scanning at the END_COMMENT line, skipping the block body.
        pos = end

    return blocks

In [15]:
from pathlib import Path

# Read the example OR list as plain text and split it into lines for
# get_comment_blocks.
orlist_text = Path("example-xml.or").read_text()
orlist_lines = orlist_text.splitlines()

In [16]:
# Locate the comment blocks (obsid 49999 is excluded by default).
comment_blocks = get_comment_blocks(orlist_lines)


In [17]:
# Inspect the first comment block found.
pprint.pprint(comment_blocks[0])


{'comment': '<cycle_number>23</cycle_number>\n'
            '<sequence_number>201454</sequence_number>\n'
            '<si_modes>TE_00D50, NULL, NULL, NULL, NULL, NULL</si_modes>\n'
            '<acis_fp_limit>-109.0</acis_fp_limit>\n'
            '<phase_window>\n'
            '   #Start,                 End\n'
            '    2023:185:15:55:59.750, 2023:190:17:33:48.730\n'
            '</phase_window>\n'
            '<freeform_constraint>\n'
            'Remarks:\n'
            'Three observations are requested: conjunction with Be star in '
            'front\n'
            '(phi=0.25), conjunction with companion in front (phi=0.75) and '
            'quadrature\n'
            '(either phi=0 or phi=0.5)\n'
            '</freeform_constraint>',
 'idx0': 350,
 'idx1': 365,
 'obsid': 25109}


In [18]:
import re

## Define migrations

These are functions that incrementally change the OR list comments format from XML
to YAML.

In [19]:
def convert_comma_separated_line_to_list(data):
    """Replace single-line comma-separated strings in ``data`` with lists.

    Only values that are ``str``, contain a comma and contain no newline are
    converted. The text is split on commas (with surrounding whitespace) and
    each piece is passed through ``convert_value``. ``data`` is modified in
    place.
    """
    for key in list(data):
        text = data[key]
        is_single_line_csv = (
            isinstance(text, str) and "\n" not in text and "," in text
        )
        if is_single_line_csv:
            data[key] = [convert_value(item) for item in re.split(r"\s*,\s*", text)]


In [20]:
# Names given to the list elements of each field, in list order.
COMMENT_FIELDS = {
    "relative_mon_window": ["obsid", "start", "stop"],
    "absolute_mon_window": ["obsid", "start", "stop"],
}


def convert_lists_to_dict(data):
    """Turn the values of the fields in COMMENT_FIELDS into dicts, in place.

    Each element of the value is paired with the name at the same position in
    ``COMMENT_FIELDS[key]``. An ``AssertionError`` is raised if the value does
    not have the same length as the list of names.
    """
    for key in [name for name in data if name in COMMENT_FIELDS]:
        field_names = COMMENT_FIELDS[key]
        values = data[key]
        assert len(field_names) == len(values)
        data[key] = dict(zip(field_names, values))


In [21]:
# Old field name -> new field name.
RENAME_FIELDS = {
    "si_modes": "drop_chip_si_modes",
    "freeform_constraint": "comment",
}


def rename_fields(data):
    """Rename the keys of ``data`` according to RENAME_FIELDS, in place.

    Keys not listed in RENAME_FIELDS are kept as they are, and the original
    key order is preserved.
    """
    renamed = {RENAME_FIELDS.get(key, key): value for key, value in data.items()}
    data.clear()
    data.update(renamed)


In [22]:
# Currently these boolean fields can only be True.
BOOLEAN_FIELDS = ["coordination_window", "uninterrupted"]


def use_boolean_type_for_boolean_fields(data):
    """Set every field of ``data`` that is listed in BOOLEAN_FIELDS to True.

    The previous value is discarded; fields that are absent stay absent.
    ``data`` is modified in place.
    """
    for field in BOOLEAN_FIELDS:
        if field in data:
            data[field] = True

In [23]:
TABLE_FIELDS = ["star_field_constraints", "phase_window", "pointing", "roll"]


def use_table_tag_for_tables(lines):
    """
    For any line in lines that starts with an item in TABLE_FIELDS, replace
    it with the same line but with a !table tag inserted after the key.

    For instance replace "phase_window: |-" with "phase_window: !table |-".

    The key must be the first thing on the line (leading whitespace is
    allowed) and must be followed directly by a colon. A field name that only
    appears later in a line, or as the tail of a longer key such as
    "min_roll:", is left alone. Lines that already carry the !table tag are
    not tagged again, so the function can safely be applied more than once.

    ``lines`` is modified in place; nothing is returned.
    """
    if not TABLE_FIELDS:
        return

    # Optional indentation, then one complete field name, then the colon.
    key_pattern = re.compile(
        r"\s*(?:" + "|".join(re.escape(field) for field in TABLE_FIELDS) + r"):"
    )

    for ii, line in enumerate(lines):
        found = key_pattern.match(line)
        if found is None:
            continue

        rest = line[found.end() :]
        if rest.lstrip().startswith("!table"):
            # Already tagged.
            continue

        lines[ii] = found.group(0) + " !table" + rest

In [24]:
def fixed_width_tables(data):
    """
    Rewrite table fields in fixed width format (columns line up vertically).

    For every key of ``data`` listed in TABLE_FIELDS, the value is read as a
    comma-delimited table with a commented header line and written back as
    text in the "ascii.fixed_width_two_line" format. ``data`` is modified in
    place.
    """
    table_keys = [key for key in data if key in TABLE_FIELDS]
    for key in table_keys:
        parsed = Table.read(
            data[key], format="ascii.commented_header", delimiter=",", guess=False
        )
        buffer = io.StringIO()
        parsed.write(buffer, format="ascii.fixed_width_two_line")
        data[key] = buffer.getvalue()

In [25]:
def change_drop_si_modes_from_list_to_dict(data):
    """Convert ``data["drop_chip_si_modes"]`` from a list to an index-keyed dict.

    The list ``[m0, m1, ...]`` becomes ``{0: m0, 1: m1, ...}`` with integer
    keys. Nothing happens if the key is missing or its value is not a list.
    ``data`` is modified in place.

    The conversion follows the actual length of the list, so a list with fewer
    than six entries does not raise ``IndexError`` and entries beyond the sixth
    are not dropped.
    """
    si_modes = data.get("drop_chip_si_modes")
    if not isinstance(si_modes, list):
        return

    data["drop_chip_si_modes"] = dict(enumerate(si_modes))

In [26]:
# Quick check of use_table_tag_for_tables on an indented table field.
txt = """\
   star_field_constraints: |2-
   #ROLL, nominal, creep
   81.00, -11.10, -10.10
   82.00, -10.80, -10.10
"""
lines = txt.splitlines()
# The list is modified in place.
use_table_tag_for_tables(lines)
print("\n".join(lines))

   star_field_constraints: !table |2-
   #ROLL, nominal, creep
   81.00, -11.10, -10.10
   82.00, -10.80, -10.10


In [27]:
# Migrations that modify the parsed comment dict in place, keyed by migration
# number.
MIGRATIONS_DATA = {
    1: convert_comma_separated_line_to_list,
    2: convert_lists_to_dict,
    3: rename_fields,
    4: use_boolean_type_for_boolean_fields,
    5: fixed_width_tables,
    7: change_drop_si_modes_from_list_to_dict,
}

# Migrations that modify the list of dumped YAML lines in place, keyed by
# migration number.
MIGRATIONS_LINES = {
    6: use_table_tag_for_tables,
}


In [28]:
class XmlCommentParseError(ValueError):
    """Raised when an OR list comment cannot be parsed as simple XML."""


def parse_simple_xml(obsid, text):
    """Parse a flat XML comment into a dict of tag name -> converted value.

    ``text`` is a sequence of sibling elements with no enclosing element, so it
    is wrapped in ``<root>...</root>`` before parsing. The text of each direct
    child is passed through ``convert_value``.

    :param obsid: obsid of the comment, used only in the error message
    :param text: comment text containing the XML elements
    :returns: dict mapping each tag name to its converted value, in document
        order. If a tag is repeated the first occurrence is used.
    :raises XmlCommentParseError: if ``text`` cannot be parsed
    """
    try:
        root = ET.fromstring("<root>" + text + "</root>")
    except Exception as exc:
        raise XmlCommentParseError(
            f"could not parse text for {obsid}:\n{text}"
        ) from exc

    out = {}
    # XML tree depth is one layer, so the direct children are the fields.
    for elem in root:
        if elem.tag in out:
            # Repeated tag: keep the value of the first occurrence.
            continue
        # An empty element (<tag/> or <tag></tag>) has text None, which
        # convert_value rejects; treat it as an empty string instead.
        value_str = elem.text if elem.text is not None else ""
        out[elem.tag] = convert_value(value_str)

    return out


In [29]:
# Parse the comment for obsid 27012 and display the resulting dict.
data = parse_simple_xml(27012, obsreqs[27012]["comment"])
data

{'cycle_number': 24,
 'sequence_number': 590678,
 'si_modes': 'TE_00CD8, TE_00CE6, NULL, NULL, NULL, NULL',
 'acis_fp_limit': -109.0,
 'freeform_constraint': '\nWindow Constraint requirements exist for observation.\nWINDOW=(2023:182:00:00:00,2023:212:00:00:00)\n',
 'split_duration': 14}

In [30]:
def migrate_orlist_xml_to_yaml(orlist_file_xml, orlist_file_yaml, migrations=[]):
    """Rewrite the XML comment blocks of an OR list as YAML and save the result.

    Each comment block found by ``get_comment_blocks`` is parsed with
    ``parse_simple_xml``, run through the requested data migrations, dumped to
    YAML, run through the requested line migrations, and substituted for the
    original lines between the begin and end markers.

    :param orlist_file_xml: path of the OR list to read
    :param orlist_file_yaml: path whose stem is used to name the output file
    :param migrations: migration numbers to apply, in the order given. Numbers
        not present in MIGRATIONS_DATA / MIGRATIONS_LINES are ignored by the
        corresponding stage.
    :returns: tuple ``(machine_readable_comments, out_file)``. The first item
        is a list of dict (``obsid`` plus the migrated fields) ordered from the
        last comment block to the first. The second is the name of the file
        written: the stem of ``orlist_file_yaml``, then ``-<n>`` for each
        migration, then ``.or``. Only the stem is used, so the name is relative
        to the current directory, and the lines are joined without a trailing
        newline.
    """

    def _skip(_):
        # Stand-in for migration numbers that do not apply to a stage.
        return None

    orlist_lines = Path(orlist_file_xml).read_text().splitlines()
    machine_readable_comments = []

    # Go from the last block to the first so that replacing the lines of one
    # block does not shift the indices of the blocks still to be processed.
    for block in reversed(get_comment_blocks(orlist_lines)):
        obsid = block["obsid"]
        data = parse_simple_xml(obsid, block["comment"])
        for number in migrations:
            MIGRATIONS_DATA.get(number, _skip)(data)

        machine_readable_comments.append({"obsid": obsid, **data})

        yaml_lines = yaml_dumps(data).splitlines()
        for number in migrations:
            MIGRATIONS_LINES.get(number, _skip)(yaml_lines)

        # Swap everything between the begin and end marker lines for the YAML.
        orlist_lines[block["idx0"] + 1 : block["idx1"]] = yaml_lines

    if migrations:
        suffix = "-" + "-".join(str(number) for number in migrations)
    else:
        suffix = ""
    out_file = Path(orlist_file_yaml).stem + suffix + ".or"
    Path(out_file).write_text("\n".join(orlist_lines))

    return machine_readable_comments, out_file

In [31]:
# Apply the migrations cumulatively: first none, then 1, then 1-2, and so on,
# collecting the name of the file written at each step. This relies on the
# migration numbers forming the contiguous range 1..n_migrations_all.
orlist_files = ["example-xml.or"]
n_migrations_all = len(MIGRATIONS_DATA) + len(MIGRATIONS_LINES)
for n_migrations in range(n_migrations_all + 1):
    print(f"Running with {n_migrations} migrations")
    machine_readable_comments, orlist_file = migrate_orlist_xml_to_yaml(
        "example-xml.or", "example-yaml.or", migrations=list(range(1, n_migrations + 1))
    )
    orlist_files.append(orlist_file)

Running with 0 migrations
Running with 1 migrations
Running with 2 migrations
Running with 3 migrations
Running with 4 migrations
Running with 5 migrations
Running with 6 migrations
Running with 7 migrations


In [32]:
import difflib
import itertools

# Write an HTML diff for each consecutive pair of OR list files so the effect
# of every individual migration can be inspected.
differ = difflib.HtmlDiff()
for ii, fn0, fn1 in zip(itertools.count(), orlist_files[:-1], orlist_files[1:]):
    print(f"Comparing {fn0} and {fn1}")
    old_text = Path(fn0).read_text()
    new_text = Path(fn1).read_text()
    diff = differ.make_file(old_text.splitlines(), new_text.splitlines())
    Path(f"migration-{ii}-diff.html").write_text(diff)

Comparing example-xml.or and example-yaml.or
Comparing example-yaml.or and example-yaml-1.or
Comparing example-yaml-1.or and example-yaml-1-2.or
Comparing example-yaml-1-2.or and example-yaml-1-2-3.or
Comparing example-yaml-1-2-3.or and example-yaml-1-2-3-4.or
Comparing example-yaml-1-2-3-4.or and example-yaml-1-2-3-4-5.or


Comparing example-yaml-1-2-3-4-5.or and example-yaml-1-2-3-4-5-6.or
Comparing example-yaml-1-2-3-4-5-6.or and example-yaml-1-2-3-4-5-6-7.or


In [33]:
import pprint


In [34]:
# Inspect one comment from the last run (the one with all migrations applied).
pprint.pprint(machine_readable_comments[1])


{'acis_fp_limit': -109.0,
 'comment': '\n'
            'Window Constraint requirements exist for observation.\n'
            '  WINDOW=(2023:152:00:00:00,2023:212:00:00:00)\n',
 'coordination_window': True,
 'cycle_number': 24,
 'drop_chip_si_modes': {0: 'TE_00920',
                        1: 'TE_0099C',
                        2: 'TE_00B92',
                        3: 'NULL',
                        4: 'NULL',
                        5: 'NULL'},
 'obsid': 27013,
 'sequence_number': 790381,
 'star_field_constraints': ' ROLL nominal creep\n'
                           '----- ------- -----\n'
                           '234.0    -6.2  -6.2\n'
                           '235.0    -6.2  -6.2\n'
                           '236.0    -6.1  -6.1\n'
                           '237.0    -6.0  -6.0\n'
                           '238.0    -6.0  -6.0\n'
                           '239.0    -6.4  -6.4\n'
                           '240.0    -6.4  -6.4\n'
                           '241.0    -6.2  -6

In [35]:
# Do the machine readable comments round trip through JSON?
# NOTE(review): the False result is presumably caused by the integer keys of
# drop_chip_si_modes, since JSON object keys are always strings and come back
# as "0", "1", ... after loading -- confirm.
import json

mrc_json = json.dumps(machine_readable_comments, indent=2)
machine_readable_comments_rt = json.loads(mrc_json)

machine_readable_comments == machine_readable_comments_rt


False