-
Notifications
You must be signed in to change notification settings - Fork 5
/
otii.py
130 lines (104 loc) · 4.54 KB
/
otii.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python
from otii_tcp_client import otii_connection, otii_exception, project, arc
class Otii:
    """ Class to define an Otii object.

    Each public method builds a request dictionary, sends it through
    ``connection.send_and_receive`` and raises
    ``otii_exception.Otii_Exception`` when the response has type ``"error"``.

    Attributes:
        connection (:obj:OtiiConnection): Object to handle connection to the Otii server.

    """

    def __init__(self, connection):
        """
        Args:
            connection (:obj:OtiiConnection): Object to handle connection to the Otii server.

        """
        self.connection = connection

    @staticmethod
    def _checked(response):
        """ Return the response unchanged, or raise if it is an error response.

        Args:
            response (dict): Response as returned by ``send_and_receive``.

        Returns:
            dict: The same response object.

        Raises:
            otii_exception.Otii_Exception: If ``response["type"]`` is ``"error"``.

        """
        if response["type"] == "error":
            raise otii_exception.Otii_Exception(response)
        return response

    def create_project(self):
        """ Create a new project.

        Returns:
            :obj:Project: Project object built from the ``project_id`` in the response.

        Raises:
            otii_exception.Otii_Exception: If the server answers with an error.

        """
        request = {"type": "request", "cmd": "otii_create_project"}
        response = self._checked(self.connection.send_and_receive(request))
        return project.Project(response["data"]["project_id"], self.connection)

    def get_active_project(self):
        """ Returns the active project if there is one.

        Returns:
            :obj:Project: Project object, or None when the response carries
                a ``project_id`` of -1.

        Raises:
            otii_exception.Otii_Exception: If the server answers with an error.

        """
        request = {"type": "request", "cmd": "otii_get_active_project"}
        response = self._checked(self.connection.send_and_receive(request))
        project_id = response["data"]["project_id"]
        # A project_id of -1 is treated as "no active project".
        if project_id == -1:
            return None
        return project.Project(project_id, self.connection)

    def get_device_id(self, device_name):
        """ Get device id from device name.

        Args:
            device_name (str): Name of device to get ID of.

        Returns:
            str: Device ID of requested device.

        Raises:
            otii_exception.Otii_Exception: If the server answers with an error.

        """
        request = {
            "type": "request",
            "cmd": "otii_get_device_id",
            "data": {"device_name": device_name},
        }
        response = self._checked(self.connection.send_and_receive(request))
        return response["data"]["device_id"]

    def get_devices(self):
        """ Get a list of connected devices.

        Only entries whose ``type`` is ``"Arc"`` are turned into objects;
        any other entries are skipped.

        Returns:
            list: List of Arc device objects; empty when the response data is empty.

        Raises:
            otii_exception.Otii_Exception: If the server answers with an error.

        """
        request = {"type": "request", "cmd": "otii_get_devices"}
        response = self._checked(self.connection.send_and_receive(request))
        # Empty data means there is no "devices" entry to iterate over.
        if not response["data"]:
            return []
        return [
            arc.Arc(device, self.connection)
            for device in response["data"]["devices"]
            if device["type"] == "Arc"
        ]

    def open_project(self, filename, force=False, progress=False):
        """ Open an existing project.

        Args:
            filename (str): Name of project file.
            force (bool, optional): True to open even if unsaved data exists, False not to.
            progress (bool, optional): True to receive notifications about progress of opening file, False not to.

        Returns:
            :obj:Project: Project object for the opened project, with its
                ``filename`` attribute set from the response.

        Raises:
            otii_exception.Otii_Exception: If the server answers with an error.

        """
        request = {
            "type": "request",
            "cmd": "otii_open_project",
            "data": {"filename": filename, "force": force, "progress": progress},
        }
        # Set timeout to None (blocking) as command can operate over large quantities of data to avoid timeout
        response = self._checked(self.connection.send_and_receive(request, None))
        proj = project.Project(response["data"]["project_id"], self.connection)
        proj.filename = response["data"]["filename"]
        return proj

    def set_all_main(self, enable):
        """ Turn on or off the main power on all connected devices.

        Args:
            enable (bool): True to turn on main power, False to turn off.

        Raises:
            otii_exception.Otii_Exception: If the server answers with an error.

        """
        request = {
            "type": "request",
            "cmd": "otii_set_all_main",
            "data": {"enable": enable},
        }
        self._checked(self.connection.send_and_receive(request))

    def shutdown(self):
        """ Shutdown Otii

        A ``DisconnectedException`` raised while sending the request is
        ignored (presumably the server may drop the connection as it shuts
        down - confirm against the connection implementation).

        Raises:
            otii_exception.Otii_Exception: If the server answers with an error.

        """
        request = {"type": "request", "cmd": "otii_shutdown"}
        try:
            response = self.connection.send_and_receive(request)
        except otii_connection.DisconnectedException:
            # Deliberate best-effort: losing the connection here is not an error.
            pass
        else:
            self._checked(response)