-
Notifications
You must be signed in to change notification settings - Fork 6
/
Asynclidar_drive.py
123 lines (110 loc) · 4.06 KB
/
Asynclidar_drive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
'''Records measurments to a given file. Usage example:
$ ./record_measurments.py out.txt'''
import sys
import time
import asyncio
from rplidar import RPLidar
import PyCmdMessenger
import math
angle_offset = 45
gain = 2
speed = 70
left = 0
right = 0
start = time.time()
stop = False
PORT_NAME = '/dev/ttyUSB0' # this is for the Lidar
# Initialize an ArduinoBoard instance. This is where you specify baud rate and
# serial timeout. If you are using a non ATmega328 board, you might also need
# to set the data sizes (bytes for integers, longs, floats, and doubles).
myarduino = PyCmdMessenger.ArduinoBoard("/dev/ttyACM0",baud_rate=9600)
# List of commands and their associated argument formats. These must be in the
# same order as in the sketch.
commands = [["motors","iii"], # motor power, left, right
["get_sonar",""], # commands a sonar sample
["sonar","i"], # result of sonar sample
["sonar_angle","i"], # commands a sonar servo angle
["line_tracker","iiii"], # output of line-follower senors
["ir_in","f"], # commands from IR remote control
["error","s"]]
# Initialize the messenger
c = PyCmdMessenger.CmdMessenger(myarduino,commands)
def tic():
return 'at %1.1f seconds' % (time.time() - start)
async def scan(lidar):
print('10Hz loop started work: {}'.format(tic()))
global average_angle, stop
time1 = time.time()
while True:
counter = 0
print('Recording measurements... Press Crl+C to stop.')
data = 0
lasttime = time.time()
for measurment in lidar.iter_measurments():
await asyncio.sleep(0) # this allows other threads to run
if stop == True:
lidar.stop()
lidar.stop_motor()
c.send("motors",0,0,0) # turn off wheel motors
lidar.disconnect()
ioloop.stop
task.cancel(scan(lidar))
break
if (measurment[2] > 0 and measurment[2] < 90): # in angular range
if (measurment[3] < 1000 and measurment[3] > 100): # in distance range
data = data + measurment[2] # angle
counter = counter + 1 # increment counter
if time.time() > (lasttime + 0.1):
print("this should happen ten times a second")
if counter > 0: # this means we see something
average_angle = data/counter
print ("Average Angle: ", average_angle-angle_offset)
counter = 0
lasttime = time.time() # reset 10Hz timer
async def drive():
global speed, gain, angle_offset, average_angle
if stop == True:
task.cancel(drive())
ioloop.stop
if time.time() > time2 + 0.1: # run this ten times a second
# Send motor commands
steer = int(100*math.atan(math.radians(average_angle-angle_offset)))
print ("steer: ", steer * gain)
if steer <= 0:
right = -1 * steer * gain
left = 0
else:
left = steer * gain
right = 0
print (speed, left, right)
c.send("motors",speed,left,right)
print('Drive command sent: {}'.format(tic()))
time2 = time.time()
await asyncio.sleep(0) # go back to running other threads
def run():
'''Main function'''
global stop
if stop == False:
lidar = RPLidar(PORT_NAME)
lidar.start_motor()
time.sleep(1)
info = lidar.get_info()
print(info)
ioloop = asyncio.get_event_loop()
tasks = [
ioloop.create_task(scan(lidar)),
ioloop.create_task(drive())
]
try:
ioloop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt:
print('Stopping.')
stop = True
lidar.stop()
lidar.stop_motor()
c.send("motors",0,0,0) # turn off wheel motors
lidar.disconnect()
ioloop.close()
if __name__ == '__main__':
run()