<b>Definitions</b>

Define the used TANGO devices and constants.
Change list of device proxies and "devices" and "tiles" to actually present and used tiles

Put all devices Online. Tiles must be in Maintenance mode in order to use the test signal generator.

In [1]:
import tango
import time
import json
import numpy as np

from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import (
    AdminMode,
    CommunicationStatus,
    HealthState,
    PowerState,
    SimulationMode,
    TestMode,
)
# for time conversion
from datetime import datetime,timezone
RFC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# define devices
station = tango.DeviceProxy('low-mccs/station/001')
subrack = tango.DeviceProxy('low-mccs/subrack/0001')
t1 = tango.DeviceProxy('low-mccs/tile/0001')
t2 = tango.DeviceProxy('low-mccs/tile/0002')
t3 = tango.DeviceProxy('low-mccs/tile/0003')
t4 = tango.DeviceProxy('low-mccs/tile/0004')
t5 = tango.DeviceProxy('low-mccs/tile/0005')
t6 = tango.DeviceProxy('low-mccs/tile/0006')
t7 = tango.DeviceProxy('low-mccs/tile/0007')
t8 = tango.DeviceProxy('low-mccs/tile/0008')

station.logginglevel=5
devices = [station, subrack, t1, t2, t3, t4, t5, t6, t7, t8]
tiles = [t1, t4]

# Put everything online
for d in devices:
    d.adminmode = AdminMode.ONLINE
time.sleep(0.2)
# 
# Tiles must be in Maintenance mode to allow test signal generator
for t in tiles:
    t.adminMode = AdminMode.MAINTENANCE

<b> Local parameters </b>
<ul><li> csp_ingest_ip: IP address of the CSP ingest port</li>
    <li> lmc_ip: IP address of the LMC DAQ system</li>
    <li> input ADC: ADC input channel (0-31, 0-1 for antenna 1, 30-31 for antenna 16)</li>
    <li> input_frequency: frequency of the input signal. Used to compute the beamformed channel</li></ul>
The beamformer beamforms 8 channels starting at the first even channel equal or lower to this one

In [49]:
csp_ingest_ip = "10.0.0.98"
lmc_ip = "10.0.0.98"
input_adc = 8
input_frequency = 230e6
nof_channels = 8
start_channel = int(round(input_frequency/800e6*1024))
if start_channel % 2 == 0:
    print(f"Signal is on beamformed channel 0, corresponding to TPM channel {start_channel}")
else: 
    print(f"Signal is on beamformed channel 1, corresponding to TPM channel {start_channel}")
    start_channel = start_channel -1

Signal is on beamformed channel 0, corresponding to TPM channel 294


<b>Turn TPM on</b>
<ul>
    <li>Turn TPM on if not already on.</li>
    <li>Wait for intialization if not already initialised</li> 
    <li>If initialisation succeeds <ul>
        <li>perform initial setup</li>
        <li>start the acquisition.</li></ul>
    <li>Set destination IP addresses</li>
    <li>At the end, print signal level on input ADCs</li></ul>

In [51]:
t1.on()
t = 0
while not t1.tileprogrammingstate in ['Initialised', 'Synchronised']:
    print(f"{t}: {t1.tileprogrammingstate}")
    time.sleep(2)
    t = t + 2
    if t > 60:
        break
if t > 60:
    print("Initialisation failed")
elif t1.tileprogrammingstate == 'Initialised':
    t1.ConfigureStationBeamformer(json.dumps({
        "start_channel": 192,
        "n_channels": 8,
        "is_first": True,
        "is_last": True,
    }))
    t1.statictimedelays=np.zeros([32],dtype=int)
    t1.channeliserRounding=[4]*512
    t1.cspRounding=[0]*384
    start_time = datetime.strftime(datetime.fromtimestamp(int(time.time())+3), RFC_FORMAT)
    t1.StartAcquisition(json.dumps({"start_time": start_time}))
    time.sleep(3)
t1.SetLmcDownload(json.dumps({"destination_ip": lmc_ip, "mode": "10g"}))
t1.SetLmcIntegratedDownload(json.dumps({"destination_ip": lmc_ip, "mode": "10g"}))
t1.Configure40GCore(json.dumps({"core_id": 0, "arp_table_entry": 0, "destination_ip": csp_ingest_ip}))
t1.Configure40GCore(json.dumps({"core_id": 1, "arp_table_entry": 0, "destination_ip": csp_ingest_ip}))
print(f"{t1.fpgaframetime}: Tile is in state {t1.tileprogrammingstate}")
print(f"Input levels: {t1.adcPower}")

0: Off
2: Off
4: Off
6: Off
8: Off
10: Off
12: Off
14: Off
16: Off
18: Off
20: Off
22: NotProgrammed
24: NotProgrammed
26: NotProgrammed
28: NotProgrammed
30: Programmed
32: Programmed
34: Programmed
36: Programmed
38: Programmed
40: Programmed
42: Programmed
44: Programmed
46: Programmed
48: Programmed
2023-03-27T20:12:58.353093Z: Tile is in state Synchronised
Input levels: [0.86762496 0.999998   0.91489422 0.999962   0.99534705 0.99889736
 0.98399345 0.999998   2.77278187 0.99175175 2.79982062 0.999998
 2.68401522 0.9987652  2.76492858 0.99984998 0.99545555 0.99952187
 0.5679657  0.97687395 0.93411371 0.99987799 0.99915762 0.99856092
 2.73375369 0.98354618 2.72180959 0.99986199 2.68047331 0.99989799
 2.78843212 0.999998  ]


Program te test generator to produce null samples except for selected input.
Input ADC can be identified from adcPower attribute above.
Then start the generator.

In [52]:
print(t1.fpgaframetime)
channels = list(range(32))
channels.remove(input_adc)
start_time = datetime.strftime(datetime.fromtimestamp(time.time()+2), RFC_FORMAT)
arguments = json.dumps({
    'tone_frequency': 100.06e6, 
    'tone_amplitude': 0.0, 
    'adc_channels': channels,
    'set_time': start_time})
t1.ConfigureTestGenerator(arguments)
t1.SetBeamformerRegions([start_channel,nof_channels,0,1,0,0,0,0])
t1.StartBeamformer(json.dumps({"start_time": start_time}))
time.sleep(2)
print(f"Beamformer running: {t1.isBeamformerRunning}")

2023-03-27T20:13:17.864840Z
Beamformer running: True


In [53]:
t1.senddatasamples(json.dumps({"data_type": "beam"}))

[array([0], dtype=int32), ['SendDataSamples command completed OK']]

In [48]:
t1.off()


[array([2], dtype=int32), ['1679947863.7298675_270892651940524_Off']]