# SKA SDP AA0.5LOW Vis Receive Workflow with CBF-Emulator

Instructions for testing the SDP receive workflow using emulated AA0.5LOW data, through the Tango and SDP interfaces, against either the persistent SDP or a custom SDP deployment. This notebook can be executed remotely on BinderHub via the following link:

https://sdhp.stfc.skao.int/binderhub/v2/gl/ska-telescope%2Fsdp%2Fska-sdp-notebooks/HEAD

## Tango Device Proxy Interface Intro

The Tango device proxy interface lets you interact with a subarray and its associated execution block as a state machine. When the device is On, the interface exposes a single observable state attribute:

#### ObsState

* EMPTY
* IDLE
* READY
* SCANNING
* FAULT

Communication with the Tango device is performed through commands and attribute accessors; all data conforms to the SDP schemas available here:

https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp.html


### Interface Command Schemas

| Command          | Current State | Next State | Input  | Output |
| ---------------- | ------------- | ---------- | ------ | ------ |
| AssignResources  |     EMPTY     |   IDLE     | https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html | None |
| Configure        |     IDLE      |   READY    | https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-configure.html | None |
| Scan             |     READY     |  SCANNING  | https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-scan.html | None |
| EndScan          |    SCANNING   |   READY    | None | None |
| End              |     READY     |   IDLE     | None | None |
| ReleaseResources |     IDLE      |   EMPTY    | https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-releaseres.html | None |

Note: A state transition is not instantaneous. Wait until the next state has been reached before executing another command.
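
The command table can be read as a small state machine. The following is a minimal sketch of that reading in plain Python, with no Tango connection; `TRANSITIONS`, `next_obs_state` and `walk` are hypothetical names used only for illustration and are not part of the SDP or Tango APIs.

```python
# Transition table copied from the command table above:
# command -> (required current obsState, resulting obsState)
TRANSITIONS = {
    "AssignResources": ("EMPTY", "IDLE"),
    "Configure": ("IDLE", "READY"),
    "Scan": ("READY", "SCANNING"),
    "EndScan": ("SCANNING", "READY"),
    "End": ("READY", "IDLE"),
    "ReleaseResources": ("IDLE", "EMPTY"),
}


def next_obs_state(current: str, command: str) -> str:
    """Return the obsState a command leads to, or raise if it is not allowed."""
    required, target = TRANSITIONS[command]
    if current != required:
        raise ValueError(
            f"{command} requires obsState {required}, but the device is {current}"
        )
    return target


def walk(commands, start="EMPTY"):
    """Apply a sequence of commands and return every state visited."""
    states = [start]
    for command in commands:
        states.append(next_obs_state(states[-1], command))
    return states


sequence = ["AssignResources", "Configure", "Scan", "EndScan", "End", "ReleaseResources"]
print(walk(sequence))
```

Running the full sequence visits `EMPTY, IDLE, READY, SCANNING, READY, IDLE, EMPTY`, while a call such as `next_obs_state("EMPTY", "Scan")` raises `ValueError`. On a real device each step would additionally need a wait for the target state.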

### Interface Attribute Schemas

| Attribute        | Current State | Next State | Input  | Output |
| ---------------- | ------------- | ---------- | ------ | ------ |
| RecvAddrs        |    SCANNING   |  SCANNING  | None | https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-recvaddrs.html |

### Execution Block

The Tango device monitors exactly one execution block, which is defined by the AssignResources command.

### Processing Block

Processing blocks are the individual processes that run in an execution block while the Tango device has resources assigned, that is, while it is not in the EMPTY state.
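
Execution and processing blocks are identified by string IDs. Later in this notebook they are built as `eb-notebook-<YYYYMMDD>-<5 digits>` and `pb-notebook-<YYYYMMDD>-<5 digits>`. The sketch below reproduces that format; the helper `make_block_id` and the regular expression are assumptions inferred from those f-strings, not something taken from the SDP schema documentation.

```python
import re
from datetime import date

# Assumed pattern, inferred from the f-strings used later in this notebook;
# it is not taken from the SDP schema documentation.
BLOCK_ID_PATTERN = re.compile(r"^(eb|pb)-[a-z0-9]+-\d{8}-\d{5}$")


def make_block_id(kind: str, generator: str, day: date, number: int) -> str:
    """Build an ID such as 'eb-notebook-20221003-18277' (hypothetical helper)."""
    if kind not in ("eb", "pb"):
        raise ValueError(f"kind must be 'eb' or 'pb', got {kind!r}")
    if not 0 <= number <= 99999:
        raise ValueError("number must fit in five digits")
    return f"{kind}-{generator}-{day:%Y%m%d}-{number:05d}"


eb_id = make_block_id("eb", "notebook", date(2022, 10, 3), 18277)
pb_id = make_block_id("pb", "notebook", date(2022, 10, 3), 18278)
print(eb_id, pb_id)
```

Rejecting numbers above 99999 keeps every ID at a fixed width, which matters when several processing block IDs are derived from one random base number.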

# Connect to SDP

The SDP is a Helm chart in https://gitlab.com/ska-telescope/sdp/ska-sdp-integration deployed to a namespace on the Kubernetes cluster. `dp-shared` is the name of the persistent SDP namespace. For testing, the following script can alternatively target a custom personal or team namespace.

**Note:** When deploying the SDP, the installation may take a few minutes.
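
The connection cell below derives two host names and the processing namespace from the chosen namespace. As a rough sketch, the same derivation can be written as a function. `sdp_endpoints` is a hypothetical helper, and the assumption that `dp-shared` uses the same service names as the namespace in this notebook is not confirmed here.

```python
def sdp_endpoints(namespace: str,
                  databaseds: str = "databaseds-tango-base",
                  tango_port: int = 10000) -> dict:
    """Derive connection settings from a namespace (hypothetical helper).

    The service names and port mirror the values used in the connection
    cell of this notebook; other deployments may differ.
    """
    return {
        "SDP_CONFIG_HOST": f"ska-sdp-etcd-client.{namespace}",
        "TANGO_HOST": f"{databaseds}.{namespace}.svc.cluster.local:{tango_port}",
        "KUBE_PROC_NAMESPACE": f"{namespace}-p",
    }


env = sdp_endpoints("dp-shared")
for key, value in env.items():
    print(f"{key}={value}")
```

The first two values would be exported with `os.environ[...]` before creating the `DeviceProxy` and the `ska_sdp_config.Config` client, as the connection cell does.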

In [None]:
# Restart Kernel to refresh connections and environment variables
import os
os._exit(00)

In [1]:
from tango import DeviceProxy, EventType
import ska_sdp_config
import os
import json
import random
from datetime import date
import logging
import ska_ser_logging

# Make sure you connect to the correct sdp namespace
# for running the sdp see https://developer.skao.int/projects/ska-sdp-integration/en/latest/
KUBE_NAMESPACE = "dp-yanda-callan"
KUBE_PROC_NAMESPACE = f"{KUBE_NAMESPACE}-p"

# set the name of the databaseds service
DATABASEDS_NAME = "databaseds-tango-base"

# finally set the predetermined host name
os.environ["SDP_CONFIG_HOST"] = f"ska-sdp-etcd-client.{KUBE_NAMESPACE}"
os.environ["TANGO_HOST"] = f"{DATABASEDS_NAME}.{KUBE_NAMESPACE}.svc.cluster.local:10000"

# tango device
d = DeviceProxy('test-sdp/subarray/01')
d.set_logging_level(5)

# sdp config
config = ska_sdp_config.Config()

# ska logging
logger = logging.getLogger(__name__)
ska_ser_logging.configure_logging(level=logging.INFO)

In [28]:
# Utilities
from tango import DevState
import pytest
import time
import datetime
from typing import Optional

TIMEOUT = 60.0  # seconds
INTERVAL = 0.5  # seconds

def wait_for_predicate(
    predicate, description, timeout=TIMEOUT, interval=INTERVAL
):
    """
    Wait for predicate to be true.

    :param predicate: callable to test
    :param description: description to use if test fails
    :param timeout: timeout in seconds
    :param interval: interval between tests of the predicate in seconds

    """
    start = time.time()
    while True:
        if predicate():
            break
        if time.time() >= start + timeout:
            raise TimeoutError(f"{description} not achieved after {timeout} seconds")
        time.sleep(interval)


def wait_for_state(device, state, timeout=TIMEOUT):
    """
    Wait for device state to have the expected value.

    :param device: device client
    :param state: the expected state
    :param timeout: timeout in seconds

    """

    def predicate():
        return device.state() == state

    description = f"Device state {state.name}"
    logger.info(f"Waiting for device state {state.name}...")
    wait_for_predicate(predicate, description, timeout=timeout)


def wait_for_obs_state(device, obs_state, timeout=TIMEOUT):
    """
    Wait for obsState to have the expected value.

    :param device: device proxy
    :param obs_state: the expected value
    :param timeout: timeout in seconds
    """

    def predicate():
        return device.obsState == obs_state

    description = f"obsState {obs_state.name}"
    logger.info(f"Waiting for device obs_state {obs_state.name}...")
    wait_for_predicate(predicate, description, timeout=timeout)

def tango_safe_release(device):
    """
    Safely releases tango device to EMPTY obsState
    """
    if device.obsState == device.obsState.SCANNING:
        logger.info(">> End Scan")
        device.EndScan()
        wait_for_obs_state(device, device.obsState.READY)

    if device.obsState == device.obsState.READY:
        logger.info(">> End")
        device.End()
        wait_for_obs_state(device, device.obsState.IDLE)

    try:
        if device.obsState == device.obsState.IDLE:
            logger.info(">> Releasing All Resources")
            device.ReleaseAllResources()        
            wait_for_obs_state(device, device.obsState.EMPTY)    
    except Exception:
        # Execution block in progress may occur if never configured
        if device.obsState == device.obsState.IDLE:
            logger.info(">> Configure")
            scan_types = list(filter(lambda v: v != "interface", json.loads(device.receiveAddresses).keys()))
            device.Configure(json.dumps({"interface": "https://schema.skao.int/ska-sdp-configure/0.4", "scan_type": scan_types[0]}))
            wait_for_obs_state(device, device.obsState.READY)
            device.End()
            wait_for_obs_state(device, device.obsState.IDLE)

    
    if device.obsState == device.obsState.IDLE:
        logger.info(">> Releasing All Resources")
        device.ReleaseAllResources()
        wait_for_obs_state(device, device.obsState.EMPTY)

    #try:
    #    if device.obsState == device.obsState.IDLE:
    #        logger.info(">> Releasing All Resources")
    #        device.ReleaseAllResources()        
    #        wait_for_obs_state(device, device.obsState.EMPTY)
    #except:
    #    if device.obsState == device.obsState.IDLE:
    #        logger.info(">> Abort")
    #        device.Abort()        
    #        wait_for_obs_state(device, device.obsState.ABORTED)

    if device.obsState == device.obsState.ABORTED:
        logger.info(">> Restart")
        device.Restart()        
        wait_for_obs_state(device, device.obsState.EMPTY)
        
    if device.obsState == device.obsState.FAULT:
        device.Restart()
        wait_for_obs_state(device, device.obsState.EMPTY)
        
    assert device.obsState == device.obsState.EMPTY
    logger.info("Tango Device is EMPTY")
    
def tango_safe_off(device):
    """
    Safely turns tango device to OFF state
    """
    tango_safe_release(device)
    if device.state() == DevState.ON:
        logger.info(">> Device OFF")
        device.Off()
        wait_for_state(device, DevState.OFF)

    assert device.state() == DevState.OFF
    logger.info("Tango Device is OFF")

In [27]:
tango_safe_off(d)

1|2022-10-03T08:59:43.600Z|INFO|MainThread|tango_safe_release|1795832312.py#82||>> Releasing All Resources
1|2022-10-03T08:59:43.639Z|INFO|MainThread|tango_safe_release|1795832312.py#87||>> Configure
1|2022-10-03T08:59:43.797Z|INFO|MainThread|wait_for_obs_state|1795832312.py#63||Waiting for device obs_state READY...
1|2022-10-03T08:59:43.884Z|INFO|MainThread|wait_for_obs_state|1795832312.py#63||Waiting for device obs_state IDLE...
1|2022-10-03T08:59:43.885Z|INFO|MainThread|tango_safe_release|1795832312.py#96||>> Releasing All Resources
1|2022-10-03T08:59:43.915Z|INFO|MainThread|wait_for_obs_state|1795832312.py#63||Waiting for device obs_state EMPTY...
1|2022-10-03T08:59:43.918Z|INFO|MainThread|tango_safe_release|1795832312.py#121||Tango Device is EMPTY
1|2022-10-03T08:59:43.919Z|INFO|MainThread|tango_safe_off|1795832312.py#129||>> Device OFF
1|2022-10-03T08:59:43.942Z|INFO|MainThread|wait_for_state|1795832312.py#46||Waiting for device state OFF...
1|2022-10-03T08:59:43.950Z|INFO|MainTh

# Configure Execution Block

To create an execution block, parameters need to be passed to the workflow script and Helm charts in ska-sdp-scripts so that reception is configured correctly.
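
The configuration cell further down derives two values from its globals: the channels carried by each stream and the number of payloads expected. The sketch below reproduces that arithmetic with the notebook's numbers. `derive_stream_settings` is a hypothetical helper, and the divisibility check is an added assumption (the notebook itself uses plain floor division).

```python
def derive_stream_settings(total_channels: int, total_streams: int,
                           total_timesteps: int, num_repeats: int) -> dict:
    """Reproduce the derived reception values (hypothetical helper)."""
    # Assumption: channels should divide evenly across streams; the notebook
    # uses floor division and would silently drop any remainder.
    if total_channels % total_streams != 0:
        raise ValueError("total_channels is not a multiple of total_streams")
    return {
        "channels_per_stream": total_channels // total_streams,
        "max_payloads": total_timesteps * total_streams * num_repeats,
    }


settings = derive_stream_settings(
    total_channels=13824, total_streams=4, total_timesteps=6, num_repeats=1
)
print(settings)  # {'channels_per_stream': 3456, 'max_payloads': 24}
```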

In [4]:
# list of available workflows
!ska-sdp list script

Keys with prefix /script: 
/script/batch:test-batch:0.3.0
/script/batch:test-daliuge:0.3.0
/script/batch:test-dask:0.3.0
/script/realtime:pss-receive:0.3.0
/script/realtime:test-realtime:0.3.0
/script/realtime:test-realtime:0.4.0
/script/realtime:test-receive-addresses:0.4.0
/script/realtime:test-receive-addresses:0.5.0
/script/realtime:vis-receive:0.5.0
/script/realtime:vis-receive:0.5.1
/script/realtime:vis-receive:0.6.0


In [44]:
# Config Globals
total_channels = 13824
total_streams = 4
rate = 10416667  # bits per second
channels_per_stream = total_channels // total_streams

total_timesteps = 6
num_repeats = 1


def create_receive_parameters(eb_id: str, pb_id: str) -> dict:
    max_payloads = total_timesteps * total_streams * num_repeats
    max_payload_misses = 30  # payload timeout in seconds
    max_ms = 1  # -1 to continuously run

    return {
        "image": "artefact.skao.int/ska-sdp-realtime-receive-modules",
        "version": "3.3.0",
        "pvc": {
            "name": "shared"
        },
        "reception": {
            "num_channels": total_channels,
            "channels_per_stream": channels_per_stream,
            # alternatives are schedblock filename or datamodel filename
            "execution_block_id": eb_id,
            "layout": "http://127.0.0.1:80/model/default/ska1_low/layout",
            "sdp_config_backend": "etcd3",
            "sdp_config_host": os.environ["SDP_CONFIG_HOST"],
            "sdp_config_port": 2379,
            "transport_protocol": "tcp",
            "continuous_mode": True
        },
        "plasmaEnabled": True,
        "plasma_parameters": {
            "extraContainers": [
                {
                    "name": "tmlite-server",
                    "image": "artefact.skao.int/ska-sdp-tmlite-server:0.3.0"
                },
                {
                    "name":"plasma-processor",
                    "image":"artefact.skao.int/ska-sdp-realtime-receive-modules:3.3.0",
                    "command": [
                        "plasma-mswriter",
                        "-s", "/plasma/socket",
                        "--max_payloads", "12",
                        "--use_plasmastman", "False",
                        f"/mnt/data/{pb_id}/output.ms"
                    ],
                    "volumeMounts":[
                        {
                           "name":"plasma-storage-volume",
                           "mountPath":"/plasma"
                        },
                        {
                            "name":"shared",
                            "mountPath":"/mnt/data"
                        }
                    ]
                }
            ]
        }
    }


def create_resources_config():
    generator = "notebook"
    today = date.today().strftime("%Y%m%d")
    number = random.randint(0, 99997)  # leaves room for number+2 within five digits

    EXECUTION_BLOCK_ID = f"eb-{generator}-{today}-{number:05d}"
    PROCESSING_BLOCK_ID_REALTIME_SENDER = f"pb-{generator}-{today}-{number:05d}"
    PROCESSING_BLOCK_ID_REALTIME_RECEIVER = f"pb-{generator}-{today}-{number+1:05d}"
    PROCESSING_BLOCK_ID_BATCH = f"pb-{generator}-{today}-{number+2:05d}"

    return {
        "interface": "https://schema.skao.int/ska-sdp-assignres/0.4",
        "resources": {  # also required in 0.3
            "csp_links": [1, 1],
            "receive_nodes": 1,
            "receptors": ["C1", "C2", "C3", "C4", "C5"]
        },
        "execution_block": {
            "eb_id": f"{EXECUTION_BLOCK_ID}",
            "max_length": 21600.0,
            "context": {},
            "scan_types": [
                {
                    "scan_type_id": "science_a",
                    "beams": {
                        "vis0": { "field_id": "field_a", "channels_id": "vis_channels", "polarisations_id": "all"}
                    }
                },
                {
                    "scan_type_id": "science_b",
                    "beams": {
                        "vis1": { "field_id": "field_b", "channels_id": "vis_channels", "polarisations_id": "all"}
                    }
                }
            ],
            "beams": [
                { "beam_id": "vis0", "function": "visibilities" },
                { "beam_id": "vis1", "function": "visibilities" },
            ],
            "channels": [
                {
                    "channels_id": "vis_channels",
                    "spectral_windows": [
                        {
                            "spectral_window_id": "all_channels",
                            "count": 5, "start": 0, "stride": 2,
                            "freq_min": 0.35e9, "freq_max": 0.368e9,
                            "link_map": [[0, 0], [200, 1], [744, 2], [944, 3]]
                        }
                    ]
                }
            ],
            "polarisations": [
                {
                    "polarisations_id": "all",
                    "corr_type": ["XX", "XY", "YX", "YY"]
                }
            ],
            "fields": [
                {
                    "field_id": "field_a",
                    "phase_dir": {
                        "ra": [123.0],
                        "dec": [-60.0],
                        #"ra_str": ["02:42:40.771"], # "ra_str"?
                        #"dec_str": ["-00:00:47.84"], # "dec_str"?
                        "reference_time": "...",
                        "reference_frame": "ICRF3"
                    }
                },
                {
                    "field_id": "field_b",
                    "phase_dir": {
                        "ra": [123.0],
                        "dec": [-60.0],
                        #"ra_str": ["12:29:06.699"], # "ra_str"?
                        #"dec_str": ["02:03:08.598"], # "dec_str"?
                        "reference_time": "...",
                        "reference_frame": "ICRF3" # ICRS
                    }
                },
            ]
        },
        "processing_blocks": [
            {
                "pb_id": f"{PROCESSING_BLOCK_ID_REALTIME_RECEIVER}",
                "script": {"kind": "realtime", "name": "vis-receive", "version": "0.6.0"},
                "parameters": create_receive_parameters(EXECUTION_BLOCK_ID, PROCESSING_BLOCK_ID_REALTIME_RECEIVER)
            }
        ]
    }


# Install and run CBF-Emulator with data from Jupyter

The simplest approach for synchronizing scans between sender and receiver is to run the cbf-emulator packetizer from the runtime controlling the tango device. 

In [47]:
import os
import cbf_sdp.packetiser
from realtime.receive.core.config import create_config_parser

# Download Data
if not os.path.isdir("AA05LOW.ms"):
    !curl https://gitlab.com/ska-telescope/sdp/ska-sdp-realtime-receive-core/-/raw/main/data/AA05LOW.ms.tar.gz --output AA05LOW.ms.tar.gz
    # extract only after a fresh download, so reruns do not fail on a missing archive
    !tar -xzf AA05LOW.ms.tar.gz

async def cbf_scan(target_host: str, target_port: str, scan_id: int):
    sender_args = create_config_parser()
    sender_args['reader'] = {
        'scan_ids': [scan_id],
        'num_repeats': num_repeats
    }
    sender_args['transmission'] = {
        'method': 'spead2_transmitters',
        'channels_per_stream': channels_per_stream,
        'transport_protocol': 'tcp',
        'rate': rate,
        'target_host': target_host,
        'target_port': target_port
    }
    await cbf_sdp.packetiser.packetise(sender_args, "AA05LOW.ms")    


# Send-Receive Emulated Scans

Using the cbf-emulator library and the Tango device, emulated scan reception can be performed by bringing the Tango device into the SCANNING state and calling the `cbf_scan` function to send packets directly from the notebook to the receiver pod. For Tango device usage see: https://developer.skao.int/projects/ska-sdp-lmc/en/latest/index.html
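
The cell below reads the target host and start port for each scan type from the device's `receiveAddresses` attribute. The sketch that follows shows the same lookup on a hand-written sample; the sample values (host names, port, interface placeholder) are hypothetical, and the nesting is assumed from the indexing used in the cell below rather than from the schema documentation.

```python
import json

# Hypothetical sample shaped like the JSON the notebook indexes into:
# host entries are [start_channel, host], port entries start with
# [start_channel, start_port, ...].
sample_json = json.dumps({
    "interface": "(schema URI placeholder)",
    "science_a": {"vis0": {"host": [[0, "receiver-a.example"]], "port": [[0, 9000, 1]]}},
    "science_b": {"vis0": {"host": [[0, "receiver-b.example"]], "port": [[0, 9000, 1]]}},
})


def endpoints_by_scan_type(receive_addresses: dict, beam: str = "vis0") -> dict:
    """Map each scan type to its first (host, start_port) pair."""
    endpoints = {}
    for scan_type, beams in receive_addresses.items():
        if scan_type == "interface":
            continue  # metadata key, not a scan type
        host = beams[beam]["host"][0][1]
        port = beams[beam]["port"][0][1]
        endpoints[scan_type] = (host, port)
    return endpoints


endpoints = endpoints_by_scan_type(json.loads(sample_json))
print(endpoints)
```

Keeping the lookup in one function makes it easier to see which beam is being read; a scan type that does not define the requested beam raises `KeyError` instead of silently using the wrong address.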

In [None]:
import sys
import socket

tango_safe_release(d)
try:
    if d.state() == DevState.OFF:
        print(">> Device ON")
        d.On()
        wait_for_state(d, DevState.ON)
        wait_for_obs_state(d, d.obsState.EMPTY)

    print(">> Assigning Resources")
    config_eb = create_resources_config()
    # https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-assignres.html
    d.AssignResources(json.dumps(config_eb))
    wait_for_obs_state(d, d.obsState.IDLE, timeout=60)

    print(">> Get Receive Address")
    receiveAddresses = json.loads(d.receiveAddresses)
    
    # use scan types from tango response
    scan_types = list(filter(lambda v: v != "interface", receiveAddresses.keys()))
    hosts = [receiveAddresses[scan_type]["vis0"]['host'][0][1] for scan_type in scan_types]
    print(hosts)
    print("overriding hosts")
    hosts = [f"proc-{config_eb['processing_blocks'][0]['pb_id']}-receive-0.receive.{KUBE_PROC_NAMESPACE}.svc.cluster.local"]
    
    # check all hosts are receiving
    for host in set(hosts):
        found = False
        for tries in range(12):
            try:
                print(host)
                socket.gethostbyname(host.encode('ascii'))
                found = True
                break
            except socket.gaierror as e:
                logger.info(f"waiting for host {host}")
                time.sleep(5)
        if not found:
            raise TimeoutError(host)
    print("Hosts found")
    
    # perform one scan for each configured scan type (see range() below)
    scan_id = 1
    for scan_type in scan_types: 
        # the host advertised in receiveAddresses is overridden by the pod DNS name resolved above
        host = hosts[0]
        start_port = receiveAddresses[scan_type]["vis0"]["port"][0][1]
        
        # Note: must always perform configure
        print(">> Configure", scan_type)
        # https://developer.skao.int/projects/ska-telmodel/en/latest/schemas/ska-sdp-configure.html
        d.Configure(json.dumps({"interface": "https://schema.skao.int/ska-sdp-configure/0.4", "scan_type": scan_type}))
        wait_for_obs_state(d, d.obsState.READY)

        for scan in range(1):
            print(">> Scan", scan_id)
            d.Scan(json.dumps({"interface": "https://schema.skao.int/ska-sdp-scan/0.4", "scan_id": scan_id}))
            wait_for_obs_state(d, d.obsState.SCANNING)

            # the host sometimes doesn't resolve straight away, so wait before sending
            time.sleep(20)
            await cbf_scan(host, start_port, scan_id)
            time.sleep(10)

            print(">> End Scan")

            d.EndScan()
            wait_for_obs_state(d, d.obsState.READY)
            scan_id += 1

    wait = True
    if not wait:
        print(">> End")
        d.End()
        wait_for_obs_state(d, d.obsState.IDLE)

        print(">> Releasing Resources")
        print("resources", config_eb["resources"])
        d.ReleaseResources(json.dumps({"interface": "https://schema.skao.int/ska-sdp-releaseres/0.4", "resources": config_eb["resources"]}))
        wait_for_obs_state(d, d.obsState.EMPTY)

        print(">> Device OFF")
        d.Off()
        wait_for_state(d, DevState.OFF)

except Exception:
    # In case of failure, uncomment the next line to safely release the device back to EMPTY
    #tango_safe_release(d)
    raise

1|2022-10-03T09:50:42.220Z|INFO|MainThread|tango_safe_release|2433661818.py#76||>> End
1|2022-10-03T09:50:42.266Z|INFO|MainThread|wait_for_obs_state|2433661818.py#63||Waiting for device obs_state IDLE...
1|2022-10-03T09:50:42.268Z|INFO|MainThread|tango_safe_release|2433661818.py#82||>> Releasing All Resources
1|2022-10-03T09:50:42.298Z|INFO|MainThread|wait_for_obs_state|2433661818.py#63||Waiting for device obs_state EMPTY...
1|2022-10-03T09:50:42.301Z|INFO|MainThread|tango_safe_release|2433661818.py#122||Tango Device is EMPTY
>> Assigning Resources
1|2022-10-03T09:50:42.371Z|INFO|MainThread|wait_for_obs_state|2433661818.py#63||Waiting for device obs_state IDLE...
>> Get Receive Address
['proc-pb-notebook-20221003-18277-receiveproc-pb-notebook-20221003-18277-receive-0.receive.dp-yanda-callan-p.svc.cluster.localreceive.dp-yanda-callan-p.svc.cluster.local', 'proc-pb-notebook-20221003-18277-receiveproc-pb-notebook-20221003-18277-receive-0.receive.dp-yanda-callan-p.svc.cluster.localreceive.

In [42]:
tango_safe_release(d)

1|2022-10-03T09:42:57.271Z|INFO|MainThread|tango_safe_release|2433661818.py#76||>> End
1|2022-10-03T09:42:57.317Z|INFO|MainThread|wait_for_obs_state|2433661818.py#63||Waiting for device obs_state IDLE...
1|2022-10-03T09:42:57.318Z|INFO|MainThread|tango_safe_release|2433661818.py#82||>> Releasing All Resources
1|2022-10-03T09:42:57.350Z|INFO|MainThread|wait_for_obs_state|2433661818.py#63||Waiting for device obs_state EMPTY...
1|2022-10-03T09:42:57.353Z|INFO|MainThread|tango_safe_release|2433661818.py#122||Tango Device is EMPTY
