-
Notifications
You must be signed in to change notification settings - Fork 31
/
agps3threaded.py
89 lines (73 loc) · 3.43 KB
/
agps3threaded.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3.5
# coding=utf-8
"""Threaded gps client"""
from __future__ import print_function
from threading import Thread
from time import sleep
try: # This kludge to get around imports with files and directories the same name.
import agps3 # Python 3
except ImportError:
from . import agps3 # Python 2
__author__ = 'Moe'
__copyright__ = 'Copyright 2016 Moe'
__license__ = 'MIT'
__version__ = '0.2.3'
HOST = '127.0.0.1' # gpsd
GPSD_PORT = 2947 # defaults
PROTOCOL = 'json' # "
class AGPS3mechanism(object):
"""Create threaded data stream as updated object attributes
"""
def __init__(self):
self.socket = agps3.GPSDSocket()
self.data_stream = agps3.DataStream()
def stream_data(self, host=HOST, port=GPSD_PORT, enable=True, gpsd_protocol=PROTOCOL, devicepath=None):
""" Connect and command, point and shoot, flail and bail
"""
self.socket.connect(host, port)
self.socket.watch(enable, gpsd_protocol, devicepath)
def unpack_data(self, usnap=.2): # 2/10th second sleep between empty requests
""" Iterates over socket response and unpacks values of object attributes.
Sleeping here has the greatest response to cpu cycles short of blocking sockets
"""
for new_data in self.socket:
if new_data:
self.data_stream.unpack(new_data)
else:
sleep(usnap) # Sleep in seconds after an empty look up.
def run_thread(self, usnap=.2, daemon=True):
"""run thread with data
"""
# self.stream_data() # Unless other changes are made this would limit to localhost only.
try:
gps3_data_thread = Thread(target=self.unpack_data, args={usnap: usnap}, daemon=daemon)
except TypeError:
# threading.Thread() only accepts daemon argument in Python 3.3
gps3_data_thread = Thread(target=self.unpack_data, args={usnap: usnap})
gps3_data_thread.setDaemon(daemon)
gps3_data_thread.start()
def stop(self):
""" Stop as much as possible, as gracefully as possible, if possible.
"""
self.stream_data(enable=False) # Stop data stream, thread is on its own so far.
print('Process stopped by user')
print('Good bye.') # You haven't gone anywhere, re-start it all with 'self.stream_data()'
if __name__ == '__main__':
from misc import add_args
args = add_args()
agps3_thread = AGPS3mechanism() # The thread triumvirate
agps3_thread.stream_data(host=args.host, port=args.port, gpsd_protocol=args.gpsd_protocol)
agps3_thread.run_thread(usnap=.2) # Throttle sleep between empty lookups in seconds defaults = 0.2 of a second.
seconds_nap = int(args.seconds_nap) # Threaded Demo loop 'seconds_nap' is not the same as 'usnap'
while True:
for nod in range(0, seconds_nap):
print('{:.0%} wait period of {} seconds'.format(nod / seconds_nap, seconds_nap), end='\r')
sleep(1)
print('\nGPS3 Thread still functioning at {}'.format(agps3_thread.data_stream.time))
print('Lat:{} Lon:{} Speed:{} Course:{}\n'.format(agps3_thread.data_stream.lat,
agps3_thread.data_stream.lon,
agps3_thread.data_stream.speed,
agps3_thread.data_stream.track))
#
######
# END