Skip to content
This repository has been archived by the owner on Oct 14, 2022. It is now read-only.

Commit

Permalink
controller, unittest, documentation, packaging
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed May 28, 2018
1 parent d31eda1 commit b126d86
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 21 deletions.
14 changes: 4 additions & 10 deletions conda/meta.yaml
@@ -1,24 +1,17 @@
package:
name: hut2
version: {{ environ.get("GIT_DESCRIBE_TAG", "") }}
version: {{ environ.get("GIT_DESCRIBE_TAG", "")[1:] }}

source:
git_url: ..

{% set data = load_setup_py_data() %}

build:
noarch: python
number: {{ environ.get("GIT_DESCRIBE_NUMBER", 0) }}
string: py_{{ environ.get("GIT_DESCRIBE_NUMBER", 0) }}+git{{ environ.get("GIT_FULL_HASH", "")[:8] }}
entry_points:
# NOTE: conda-build cannot distinguish between console and gui scripts
{% for entry_point_type, entry_points in data.get("entry_points", dict()).items() -%}
{% for entry_point in entry_points -%}
- {{ entry_point }}
{% endfor %}
{% endfor %}
script: $PYTHON setup.py install --single-version-externally-managed --record=record.txt
entry_points:
- aqctl_hut2 = hut2.aqctl_hut2:main

requirements:
build:
Expand All @@ -31,6 +24,7 @@ requirements:
test:
imports:
- hut2
- hut2.aqctl_hut2

about:
home: https://github.com/quartiq/hut2
Expand Down
57 changes: 57 additions & 0 deletions hut2/aqctl_hut2.py
@@ -0,0 +1,57 @@
#!/usr/bin/env python3

import argparse
import logging
import sys
import asyncio

from . import HUT2

from artiq.protocols.pc_rpc import Server
from artiq.tools import (verbosity_args, simple_network_args, init_logger,
bind_address_from_args)


logger = logging.getLogger(__name__)


def get_argparser():
parser = argparse.ArgumentParser(
description="""HUT2 controller.
Use this controller for Anel HUT2 power distribution devices.""")
parser.add_argument(
"-d", "--device", default=None,
help="Device host name or IP address.")
simple_network_args(parser, 3271)
verbosity_args(parser)
return parser


def main():
args = get_argparser().parse_args()
init_logger(args)

if args.device is None:
print("You need to supply a -d/--device "
"argument. Use --help for more information.")
sys.exit(1)

loop = asyncio.get_event_loop()

async def run():
with await HUT2.connect(args.device, loop=loop) as dev:
server = Server({"hut2": dev}, None, True)
await server.start(bind_address_from_args(args), args.port)
try:
await server.wait_terminate()
finally:
await server.stop()

try:
loop.run_until_complete(run())
finally:
loop.close()


if __name__ == "__main__":
main()
21 changes: 21 additions & 0 deletions hut2/protocol.py
Expand Up @@ -112,6 +112,27 @@ def datagram_received(self, data, addr):
for r in read:
r.set_result(self.status)

async def get_port_states(self):
"""Return current relay states as a list.
:return: List of eight integers
"""
return (await self.get_status()).port_states

async def get_io_states(self):
"""Return current IO states as a list.
:return: List of eight integers
"""
return (await self.get_status()).io_states

async def get_temp(self):
"""Return current temperature.
:return: Temperature in C
"""
return (await self.get_status()).temp

def __enter__(self):
return self

Expand Down
59 changes: 59 additions & 0 deletions hut2/test.py
@@ -0,0 +1,59 @@
import asyncio
import unittest
import os

from .protocol import HUT2


target = os.getenv("HUT2")
@unittest.skipIf(not target, "HUT2 environment variable not defined")
class HUT2Test(unittest.TestCase):
def with_device(self, f):
loop = asyncio.get_event_loop()
async def run():
with await HUT2.connect(target, loop=loop) as dev:
return await f(dev)
return loop.run_until_complete(run())

def test_connect(self):
async def run(dev):
pass
self.with_device(run)

def test_status(self):
async def run(dev):
st = await dev.get_status()
self.assertGreater(st.temp, 0.)
self.assertLess(st.temp, 100.)
self.with_device(run)

def test_sw(self):
async def run(dev):
for s in 0, 1:
dev.sw(0x7f | (s << 7))
await asyncio.sleep(.2)
self.assertEqual((await dev.get_status()).port_states[7], s)
self.with_device(run)

def test_sw_onoff(self):
async def run(dev):
dev.sw_off(8)
await asyncio.sleep(.2)
self.assertEqual((await dev.get_status()).port_states[7], 0)
dev.sw_on(8)
await asyncio.sleep(.2)
self.assertEqual((await dev.get_status()).port_states[7], 1)
self.with_device(run)

def test_st(self):
async def run(dev):
dev.sw_on(8)
await asyncio.sleep(.2)
dev.st_off(8, 1)
await asyncio.sleep(.2)
self.assertEqual((await dev.get_status()).port_states[7], 1)
await asyncio.sleep(1)
self.assertEqual((await dev.get_status()).port_states[7], 0)
dev.sw_on(8)
await asyncio.sleep(.2)
self.with_device(run)
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -18,7 +18,7 @@
install_requires=[],
entry_points={
"console_scripts": [
# "aqctl_hut2 = hut2.aqctl_hut2:main",
"aqctl_hut2 = hut2.aqctl_hut2:main",
],
},
test_suite="hut2.test",
Expand Down
21 changes: 11 additions & 10 deletions test.py
@@ -1,4 +1,3 @@
import time
import logging
import asyncio

Expand All @@ -10,24 +9,26 @@ def main():
loop = asyncio.get_event_loop()
loop.set_debug(False)
async def run():
with await HUT2.connect("opticlock") as dev:
with await HUT2.connect("opticlock", loop=loop) as dev:
print(await dev.get_status())
dev.sw(0xfe)
await asyncio.sleep(.2)
dev.sw(0x7f)
await asyncio.sleep(.2)
dev.sw(0xff)
await asyncio.sleep(.2)
dev.sw_off(1)
dev.sw_off(8)
await asyncio.sleep(.2)
assert (await dev.get_status()).port_states[8 - 1] == 0
await asyncio.sleep(.2)
assert (await dev.get_status()).port_states[1 - 1] == 0
dev.sw_on(8)
await asyncio.sleep(.2)
dev.sw_on(1)
assert (await dev.get_status()).port_states[8 - 1] == 1
await asyncio.sleep(.2)
assert (await dev.get_status()).port_states[1 - 1] == 1
dev.st_off(1, 1)
dev.st_off(8, 1)
await asyncio.sleep(.2)
assert (await dev.get_status()).port_states[1 - 1] == 1
assert (await dev.get_status()).port_states[8 - 1] == 1
await asyncio.sleep(1)
assert (await dev.get_status()).port_states[1 - 1] == 0
assert (await dev.get_status()).port_states[8 - 1] == 0
loop.run_until_complete(run())


Expand Down

0 comments on commit b126d86

Please sign in to comment.