forked from hundeboll/riddler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
node_power.py
132 lines (108 loc) · 3.65 KB
/
node_power.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python2
import serial
import threading
import os
import time
class power(threading.Thread):
    """Background thread that collects current samples from a serial device.

    The thread reads whitespace-separated lines from the device named by
    ``args.power_dev`` and stores the first field of every line as a current
    sample. ``process_data()`` averages the samples gathered since its last
    call; ``read_power()``, ``read_amp()`` and ``read_volt()`` return those
    averages.
    """

    def __init__(self, args):
        """Open the serial device and start the sampling thread.

        args -- configuration object; only ``args.power_dev`` (path of the
                serial device) is read by this class.
        """
        super(power, self).__init__(None)
        self.args = args

        # State control
        self.end = threading.Event()
        self.data_lock = threading.Lock()

        # Temporary data storages
        self.measure_amp = []
        self.measure_volt = []
        self.avg_amp = None
        self.avg_volt = None
        self.avg_pwr = None

        self.open_serial()

        # Configure and start thread
        self.name = 'power_meas'
        self.start()

    def stop(self):
        """Tell the sampling thread to end, wait for it, then close the port."""
        self.end.set()

        # Let the reader leave readline() before the port is closed under it.
        # Joining from inside the thread itself would raise RuntimeError.
        if self.is_alive() and threading.current_thread() is not self:
            self.join()

        if self.ser:
            print(" Closing serial device")
            self.ser.close()

    # Main function of thread
    def run(self):
        """Read and store samples until ``stop()`` sets the end event."""
        while not self.end.is_set():
            if not self.ser:
                # No device was opened: idle, but wake up as soon as stop()
                # sets the event instead of sleeping the full second.
                self.end.wait(1)
                continue

            # Read from arduino
            try:
                data = self.ser.readline().strip()
            except (OSError, IOError) as e:
                # IOError is listed explicitly because it is a separate class
                # from OSError on Python 2.
                print(e)
                # Back off so a persistent read error does not busy-loop.
                self.end.wait(1)
                continue
            if not data:
                # Read timed out or returned a blank line
                continue

            # Parse outside the lock so a malformed line never holds it.
            # The first field is divided by 1000 (presumably mA -> A;
            # confirm against the device firmware).
            try:
                amp = float(data.split()[0]) / 1000
            except ValueError as e:
                print(e)
                continue

            # Save measurement for later processing
            with self.data_lock:
                self.measure_amp.append(amp)
                # The voltage is not taken from the device line; a constant
                # 5 is recorded for every sample.
                self.measure_volt.append(5)

    # Prepare configured serial device for measuring
    def open_serial(self):
        """Open ``args.power_dev`` at 9600 baud and wait for its first line.

        Returns True once the device has produced a non-empty line. Returns
        False when running on POSIX and the device path does not exist; in
        that case ``self.ser`` stays None and ``self.error`` holds the
        message that was printed.
        """
        self.ser = None
        self.error = None
        device = self.args.power_dev

        # Check if device exists
        if os.name == "posix" and not os.path.exists(device):
            self.error = " Power device '{0}' does not exist".format(device)
            print(self.error)
            return False

        # Open device and flush old data
        print(" Opening serial device: {}".format(device))
        self.ser = serial.Serial(device, 9600, timeout=1)
        self.ser.flushInput()

        # Wait for device to settle: block until the first non-empty line.
        # NOTE(review): this never returns if the device stays silent.
        while not self.ser.readline().strip():
            continue

        print(" Serial device ready")
        return True

    # Process the data measured since last read
    def process_data(self):
        """Average the samples collected since the previous call.

        Updates ``avg_amp``, ``avg_volt`` and ``avg_pwr``. When no samples
        were collected the previous averages are left unchanged.
        """
        # Swap the buffers out under the lock so sampling is only blocked
        # for the duration of the swap, not the arithmetic.
        with self.data_lock:
            f_amp, self.measure_amp = self.measure_amp, []
            f_volt, self.measure_volt = self.measure_volt, []

        # Make sure we got some data
        if not f_amp or not f_volt:
            return

        # Calculate average values; float() avoids integer division on
        # Python 2 when the samples are ints.
        self.avg_amp = sum(f_amp) / float(len(f_amp))
        self.avg_volt = sum(f_volt) / float(len(f_volt))
        self.avg_pwr = self.avg_amp * self.avg_volt

    def read_power(self):
        """Return the average power from the last ``process_data()`` call."""
        return self.avg_pwr

    def read_amp(self):
        """Return the average current from the last ``process_data()`` call."""
        return self.avg_amp

    def read_volt(self):
        """Return the average voltage from the last ``process_data()`` call."""
        return self.avg_volt
if __name__ == "__main__":
    # Manual smoke test: sample for 20 seconds, then print the averages.
    import node_defaults as args

    print("Start power")
    p = power(args)
    try:
        print("Wait 20 secs")
        time.sleep(20)
        p.process_data()

        print("Read Power")
        print(p.read_power())
        print("Read Volt")
        print(p.read_volt())
        print("Read Amp")
        print(p.read_amp())
    finally:
        # stop is a method that sets the end event itself; calling
        # `.set()` on it raises AttributeError and leaves the thread running.
        p.stop()