forked from dhague/vpower
-
Notifications
You must be signed in to change notification settings - Fork 9
/
vpower.py
122 lines (107 loc) · 3.69 KB
/
vpower.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python
import sys
import time
import platform
from ant.core import driver
from ant.core import node
from usb.core import find
from PowerMeterTx import PowerMeterTx
from SpeedCadenceSensorRx import SpeedCadenceSensorRx
from config import DEBUG, LOG, NETKEY, POWER_CALCULATOR, POWER_SENSOR_ID, SENSOR_TYPE, SPEED_SENSOR_ID
antnode = None
speed_sensor = None
power_meter = None
last_speed = 0
last_time = 0
stopped = True
def stop_ant():
if speed_sensor:
print("Closing speed sensor")
speed_sensor.close()
speed_sensor.unassign()
if power_meter:
print("Closing power meter")
power_meter.close()
power_meter.unassign()
if antnode:
print("Stopping ANT node")
antnode.stop()
pywin32 = False
if platform.system() == 'Windows':
def on_exit(sig, func=None):
stop_ant()
try:
import win32api
win32api.SetConsoleCtrlHandler(on_exit, True)
pywin32 = True
except ImportError:
print("Warning: pywin32 is not installed, use Ctrl+C to stop")
try:
print("Using " + POWER_CALCULATOR.__class__.__name__)
devs = find(find_all=True, idVendor=0x0fcf)
for dev in devs:
if dev.idProduct in [0x1008, 0x1009]:
stick = driver.USB2Driver(log=LOG, debug=DEBUG, idProduct=dev.idProduct, bus=dev.bus, address=dev.address)
try:
stick.open()
except:
continue
stick.close()
break
else:
print("No ANT devices available")
if getattr(sys, 'frozen', False):
input()
sys.exit()
antnode = node.Node(stick)
print("Starting ANT node")
antnode.start()
key = node.Network(NETKEY, 'N:ANT+')
antnode.setNetworkKey(0, key)
print("Starting speed sensor")
try:
# Create the speed sensor object and open it
speed_sensor = SpeedCadenceSensorRx(antnode, SENSOR_TYPE, SPEED_SENSOR_ID & 0xffff)
speed_sensor.open()
# Notify the power calculator every time we get a speed event
speed_sensor.notify_change(POWER_CALCULATOR)
except Exception as e:
print("speed_sensor error: " + repr(e))
speed_sensor = None
print("Starting power meter with ANT+ ID " + repr(POWER_SENSOR_ID))
try:
# Create the power meter object and open it
power_meter = PowerMeterTx(antnode, POWER_SENSOR_ID)
power_meter.open()
except Exception as e:
print("power_meter error: " + repr(e))
power_meter = None
# Notify the power meter every time we get a calculated power value
POWER_CALCULATOR.notify_change(power_meter)
print("Main wait loop")
while True:
try:
# Workaround for RGT Cycling and GTBikeV
if not stopped:
t = int(time.time())
if t >= last_time + 3:
if speed_sensor.currentData.speedEventTime == last_speed:
# Set power to zero if speed sensor doesn't update for 3 seconds
power_meter.powerData.instantaneousPower = 0
stopped = True
last_speed = speed_sensor.currentData.speedEventTime
last_time = t
# Force an update every second to avoid power drops
power_meter.update(power_meter.powerData.instantaneousPower)
elif power_meter.powerData.instantaneousPower:
stopped = False
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
break
except Exception as e:
print("Exception: " + repr(e))
if getattr(sys, 'frozen', False):
input()
finally:
if not pywin32:
stop_ant()