-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathEmonHubTx3eInterfacer.py
101 lines (71 loc) · 2.58 KB
/
EmonHubTx3eInterfacer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import serial
import time
import Cargo
from pydispatch import dispatcher
import emonhub_coder as ehc
import emonhub_interfacer as ehi
"""class EmonHubTx3eInterfacer
Monitors the serial port for data
"""
class EmonHubTx3eInterfacer(ehi.EmonHubInterfacer):
def __init__(self, name, com_port='', com_baud=9600):
"""Initialize interfacer
com_port (string): path to COM port
"""
# Initialization
super(EmonHubTx3eInterfacer, self).__init__(name)
# Open serial port
self._ser = self._open_serial_port(com_port, com_baud)
# Initialize RX buffer
self._rx_buf = ''
def close(self):
"""Close serial port"""
# Close serial port
if self._ser is not None:
self._log.debug("Closing serial port")
self._ser.close()
def _open_serial_port(self, com_port, com_baud):
"""Open serial port
com_port (string): path to COM port
"""
#if not int(com_baud) in [75, 110, 300, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200]:
# self._log.debug("Invalid 'com_baud': " + str(com_baud) + " | Default of 9600 used")
# com_baud = 9600
try:
s = serial.Serial(com_port, com_baud, timeout=0)
self._log.debug("Opening serial port: " + str(com_port) + " @ "+ str(com_baud) + " bits/s")
except serial.SerialException as e:
self._log.error(e)
raise EmonHubInterfacerInitError('Could not open COM port %s' %
com_port)
else:
return s
def read(self):
"""Read data from serial port and process if complete line received.
Return data as a list: [val1, val2,...]
"""
# Read serial RX
self._rx_buf = self._rx_buf + self._ser.readline()
# If line incomplete, exit
if '\r\n' not in self._rx_buf:
return
# Remove CR,LF
f = self._rx_buf[:-2].strip()
# Create a Payload object
c = Cargo.new_cargo(rawdata=f)
# Reset buffer
self._rx_buf = ''
# Parse the ESP format string
payload=[]
if f.startswith('ct1:'):
for item in f.split(',')[:]:
payload.append(item.split(':')[1])
f=payload
if int(self._settings['nodeoffset']):
c.nodeid = int(self._settings['nodeoffset'])
c.realdata = f
else:
if len(f)>0:
c.nodeid = int(f[0])
c.realdata = f[1:]
return c