In [53]:
# Most of this was adapted from https://github.com/lsst-ts/ts_salobj/blob/develop/tests/test_csc.py
import asyncio
import json
from lsst.ts import salobj
import logging

STD_TIMEOUT = 15  # timeout for command ack
LONG_TIMEOUT = 30  # timeout for CSCs to start
EVENT_DELAY = 0.1  # time for events to be output as a result of a command
NODATA_TIMEOUT = 0.1  # timeout for when we expect no new data
SHOW_LOG_MESSAGES = False


class FailedCallbackCsc(salobj.TestCsc):
    """A CSC whose do_wait command raises a RuntimeError and whose do_fault command includes a report"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exc_msg = "do_wait raised an exception on purpose on Test-{}".format(kwargs["index"])
        self.index = kwargs["index"]

    async def do_wait(self, data):
        raise RuntimeError(self.exc_msg)

    async def do_fault(self, data):
        """Execute the fault command with a report, code and traceback.
        Change the summary state to State.FAULT
        """
        self.log.warning("Executing fault on Test-{}".format(self.index))
        code = 52
        report = "Report for error code for Test-{}".format(self.index,)
        traceback = "Traceback for error code for Test-{}".format(self.index,)
        self.fault(code=code, report=report, traceback=traceback)


class LogMessagesMock():
    """Triggers logMessages and errorCode events in a TestCSC """

    def __init__(self, salindex, *args, **kwargs):
        self.csc = FailedCallbackCsc(index=salindex, *args, **kwargs)
        d = salobj.Domain()
        self.r = salobj.Remote(d, 'Test', salindex)
        self.salindex = salindex

    async def set_log_level(self):
        await self.r.cmd_setLogLevel.set_start(level=logging.DEBUG, timeout=STD_TIMEOUT)

    def log_info_message(self):
        info_message = "test info message for Test-{}".format(self.salindex,)
        self.csc.log.info(info_message)

    def log_warn_message(self):
        warn_message = "test warn message for Test-{}".format(self.salindex,)
        self.csc.log.warning(warn_message)

    async def log_error_message(self):
        with salobj.assertRaisesAckError():
            await self.r.cmd_wait.set_start(duration=5, timeout=STD_TIMEOUT)

    async def printmessage(self):
        msg = await self.r.evt_logMessage.next(flush=True)
        print('\n TestCSC', self.salindex, ' | msg:', msg.message, '\nlvl:', msg.level, '\ntrace:', msg.traceback)


async def launch(salindex, debug=False):
    mock = LogMessagesMock(salindex, initial_state=salobj.State.ENABLED)
    asyncio.ensure_future(mock.csc.done_task)
    await mock.set_log_level()
    logmessages = [
        mock.log_info_message,
        mock.log_warn_message,
        mock.log_error_message
    ]

    counter = 1

    while True:
        for index, message in enumerate(logmessages):
            if index == 2:
                await message()
            else:
                message()
            if debug:
                while True:
                    try:
                        await mock.printmessage()
                    except asyncio.TimeoutError:
                        break
            counter += 1
            await asyncio.sleep(5)

def create_cscs(number):
    awaitables = []
    loop = asyncio.get_event_loop()
    for salindex in range(1, number+1):
        awaitables.append(loop.create_task(launch(salindex, True)))
    return awaitables

In [54]:
import aiohttp
import json
import asyncio
import requests
from astropy.time import Time

class clientInstance:
    websocket_url = ''
    received_messages = 0
    
    def request_token(self):
        url = 'http://love-manager-mount:8000/manager/api/get-token/'
        data = {
            'username': 'test',
            'password': 'test',
        }
        resp = requests.post(url, data = data)
        token = resp.json()['token']
        print(token)
        self.websocket_url = f'ws://love-manager-mount:8000/manager/ws/subscription?token={token}'
    
    async def handle_message_reception(self):
        """Handles the reception of messages"""
        if self.websocket:
            async for message in self.websocket:
                if message.type == aiohttp.WSMsgType.TEXT:
                    msg = json.loads(message.data)
                    if 'category' not in msg or msg['category'] != 'event' or "option" in msg and msg["option"] == "subscribe":
                        continue
                    print(msg)
                    cur_time = Time.now().tai.datetime.timestamp()
                    snd_time = msg['data'][0]['data']['summaryState'][0]['private_sndStamp']['value']
                    print(cur_time, snd_time, cur_time - snd_time)
                    self.received_messages = self.received_messages + 1
    
    async def subscribe_to_salindex(self, salindex):
        subscribe_msg = {
            'option': 'subscribe',
            'category': 'event',
            'csc': 'Test',
            'salindex': f'{salindex}',
            'stream': 'summaryState'
        }
        await self.websocket.send_str(json.dumps(subscribe_msg))
    
    async def start_ws_client(self, number):
        async with aiohttp.ClientSession() as session:
            self.websocket = await session.ws_connect(self.websocket_url)
            # async with websockets.connect(websocket_url) as websocket:
            print('started client')
            for i in range(1, N_EMITTERS+1):
                await self.subscribe_to_salindex(i)
                
            await self.handle_message_reception()


In [55]:
from lsst.ts import salobj
import asyncio

class Emitter:
    
    def __init__(self, *args, **kwargs):
        self.salindex = kwargs["salindex"]
        self.emitted_commands = 0
        
    async def create_emitter_task(self):
        d = salobj.Domain()
        r = salobj.Remote(d, 'Test', self.salindex)

        cmds = [
            r.cmd_enable,
            r.cmd_fault,
            r.cmd_standby,
            r.cmd_start,
        ]
        #while True:
        for i in range(100):
            for command in cmds:
                print('test', command, flush=True)
                try:
                    await command.start()
                    self.emitted_commands = self.emitted_commands + 1
                except Exception as e:
                    print('Test CSC error:', e)
                await asyncio.sleep(1)
            
def create_emitters(number):
    emitters = []
    for salindex in range(1, number+1):
        emitters.append(Emitter(salindex=salindex))
    return emitters

In [56]:
# Create emitters and CSCs
N_EMITTERS = 1
loop = asyncio.get_event_loop()
csc_tasks = create_cscs(N_EMITTERS)
emitters = create_emitters(N_EMITTERS)
emitter_tasks = [loop.create_task(e.create_emitter_task()) for e in emitters]

backend_tasks = [*csc_tasks, *emitter_tasks]
# [t.cancel() for t in tasks]
# await asyncio.gather(*tasks)



In [57]:
# Create clients and listen to ws messages
N_CLIENTS = 1
loop = asyncio.get_event_loop()
clients = [clientInstance() for i in range(N_CLIENTS)]
[c.request_token() for c in clients]
client_tasks = [loop.create_task(c.start_ws_client(N_EMITTERS)) for c in clients]
# await asyncio.gather(*tasks)

65f7fda930f261365c28dcde4e55cd797ca58b33


In [58]:
# print(clients[0].received_messages)
# print(emitters[0].emitted_commands)
print([c.received_messages for c in clients])
print(sum([e.emitted_commands for e in emitters]))

[0]
0
test RemoteCommand(Test, 1, enable)
started client
{'category': 'event', 'data': [{'csc': 'Test', 'salindex': 1, 'data': {'summaryState': [{'TestID': {'value': 1, 'dataType': 'Int'}, 'private_revCode': {'value': '90255bf1', 'dataType': 'String'}, 'private_sndStamp': {'value': 1591228052.144164, 'dataType': 'Float'}, 'private_rcvStamp': {'value': 1591228052.1459935, 'dataType': 'Float'}, 'private_seqNum': {'value': 1, 'dataType': 'Int'}, 'private_origin': {'value': 1437, 'dataType': 'Int'}, 'private_host': {'value': 2087767441, 'dataType': 'Int'}, 'summaryState': {'value': 2, 'dataType': 'Int'}, 'priority': {'value': 0, 'dataType': 'Int'}}]}}], 'subscription': 'event-Test-1-summaryState'}
1591228053.117181 1591228052.144164 0.9730169773101807
Test CSC error: msg='Command failed', ackcmd=(ackcmd private_seqNum=456806965, ack=<SalRetCode.CMD_FAILED: -302>, error=1, result='Failed: enable not allowed in state <State.ENABLED: 2>')

 TestCSC 1  | msg: test info message for Test-1 
lvl:

In [59]:
[t.cancel() for t in [*backend_tasks, *client_tasks]]
#[t.cancel() for t in [*client_tasks]]
for task in asyncio.Task.all_tasks():
    task.cancel()

1591228016.058736

test RemoteCommand(Test, 1, enable)
{'category': 'event', 'data': [{'csc': 'Test', 'salindex': 1, 'data': {'summaryState': [{'TestID': {'value': 1, 'dataType': 'Int', 'units': 'None'}, 'private_revCode': {'value': '90255bf1', 'dataType': 'String', 'units': 'unitless'}, 'private_sndStamp': {'value': 1591228016.4472847, 'dataType': 'Float', 'units': 'secs'}, 'private_rcvStamp': {'value': 1591228016.4499032, 'dataType': 'Float', 'units': 'secs'}, 'private_seqNum': {'value': 161, 'dataType': 'Int', 'units': 'unitless'}, 'private_origin': {'value': 1437, 'dataType': 'Int', 'units': 'unitless'}, 'private_host': {'value': 344063876, 'dataType': 'Int', 'units': 'unitless'}, 'summaryState': {'value': 2, 'dataType': 'Int', 'units': 'unitless'}, 'priority': {'value': 0, 'dataType': 'Int', 'units': 'unitless'}}]}}], 'subscription': 'event-Test-1-summaryState'}
1591227979.4594197 1591228016.4472847 -36.98786497116089
test RemoteCommand(Test, 1, fault)

 TestCSC 1  | msg: Executing fault on Test-1 
