Skip to content

Commit

Permalink
Add integration tests for SCPI PSU Agent (#311)
Browse files Browse the repository at this point in the history
* Sleep in prologix write to prevent messages blending together

* Write tests for all agent tasks/processes

* Add test_mode parameter to monitor_output
  • Loading branch information
erfz committed Jun 24, 2022
1 parent 07b7f4a commit 5b791a4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 4 deletions.
5 changes: 5 additions & 0 deletions agents/scpi_psu/scpi_psu_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ def monitor_output(self, session, params=None):

wait_time = params.get('wait', 1)
channels = params.get('channels', [1, 2, 3])
test_mode = params.get('test_mode', False)

session.set_status('running')
self.monitor = True

while self.monitor:
Expand Down Expand Up @@ -93,6 +95,9 @@ def monitor_output(self, session, params=None):

time.sleep(wait_time)

if test_mode:
break

return True, "Finished monitoring current"

def stop_monitoring(self, session, params=None):
Expand Down
10 changes: 6 additions & 4 deletions socs/agent/prologix_interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import socket


Expand All @@ -16,16 +17,17 @@ def conn_socket(self):
self.sock.settimeout(5)

def configure(self):
self.write('++mode 1\n')
self.write('++auto 1\n')
self.write('++mode 1')
self.write('++auto 1')
self.write('++addr ' + str(self.gpibAddr))

def write(self, msg):
message = msg + '\n'
self.sock.send(message.encode())
self.sock.sendall(message.encode())
time.sleep(0.1) # Don't send messages too quickly

def read(self):
return self.sock.recv(128).decode().rstrip('\n').rstrip('\r')
return self.sock.recv(128).decode().strip()

def version(self):
self.write('++ver')
Expand Down
6 changes: 6 additions & 0 deletions tests/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,11 @@ hosts:
['--port', 5502],
]
},
{'agent-class': 'ScpiPsuAgent',
'instance-id': 'psuK',
'arguments': [['--ip-address', '127.0.0.1'],
['--gpib-slot', '1']
]
},
]
}
86 changes: 86 additions & 0 deletions tests/integration/test_scpi_psu_agent_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import pytest

import ocs
from ocs.base import OpCode

from ocs.testing import (
create_agent_runner_fixture,
create_client_fixture,
)

from integration.util import create_crossbar_fixture

from socs.testing.device_emulator import create_device_emulator

pytest_plugins = "docker_compose"

wait_for_crossbar = create_crossbar_fixture()
run_agent = create_agent_runner_fixture(
"../agents/scpi_psu/scpi_psu_agent.py",
"scpi_psu_agent",
args=["--log-dir", "./logs/"],
)
client = create_client_fixture("psuK")
gpib_emu = create_device_emulator(
{
# manufacturer, model, serial, firmware
"*idn?": "Keithley instruments, 2230G-30-1, 9203269, 1.16-1.04",
},
relay_type="tcp",
port=1234, # hard-coded in prologix_interface.py (line 16)
)


def check_resp_success(resp):
print(resp)
assert resp.status == ocs.OK
print(resp.session)
assert resp.session["op_code"] == OpCode.SUCCEEDED.value


@pytest.mark.integtest
def test_scpi_psu_init_psu(wait_for_crossbar, gpib_emu, run_agent, client):
resp = client.init()
check_resp_success(resp)


@pytest.mark.integtest
def test_scpi_psu_set_output(wait_for_crossbar, gpib_emu, run_agent, client):
client.init()
resp = client.set_output(channel=2, state=True)
check_resp_success(resp)

resp = client.set_output(channel=2, state=False)
check_resp_success(resp)


@pytest.mark.integtest
def test_scpi_psu_set_current(wait_for_crossbar, gpib_emu, run_agent, client):
client.init()
resp = client.set_current(channel=1, current=2.5)
check_resp_success(resp)


@pytest.mark.integtest
def test_scpi_psu_set_voltage(wait_for_crossbar, gpib_emu, run_agent, client):
client.init()
resp = client.set_voltage(channel=3, volts=19.7)
check_resp_success(resp)


@pytest.mark.integtest
def test_scpi_psu_monitor_output(wait_for_crossbar, gpib_emu, run_agent, client):
responses = {
"MEAS:VOLT? CH1": "3.14",
"MEAS:CURR? CH1": "6.28",
"MEAS:VOLT? CH2": "2.72",
"MEAS:CURR? CH2": "5.44",
"MEAS:VOLT? CH3": "1.23",
"MEAS:CURR? CH3": "2.46",
}
gpib_emu.define_responses(responses)

client.init()
resp = client.monitor_output.start(test_mode=True, wait=0)
resp = client.monitor_output.wait()
check_resp_success(resp)

0 comments on commit 5b791a4

Please sign in to comment.