地平线机器人 Sunrise™ 2 | 96Boards DragonBoard™ 410c
AP Python SDK
Horizon Hackathon 2019 三万块团队荣誉出品
本工程提供了在96Boards板上,方便快捷的在Python代码中获取地平线Sunrise™ 2芯片数据并展示的SDK。(人生苦短,我用python)
如下框图,本SDK复用了上文标准配置中的CP端库与AP端驱动,并在上层封装了Python SDK,提供API供调用获取结构化数据。
本SDK使用的系统配置与环境为2019 Hackathon标准的配置环境。可参考地平线提供的安装部署手册。
(其中ap_base.tar.gz
文件请选择【bif内核模块版本2.9.0,hbipc接口库版本2.4.0】对应文件 )
然后在AP端git clone
本项目,并将lib/lib_hbipc_ap.so
复制到系统/lib
目录下。
此后,按部署教程在CP侧执行./xpp_start.sh
启动cp侧程序,即可在ap侧执行sample
文件夹下的示例代码。
注:本项目中的全部sample均需要在AP上使用sudo命令启动,如:
sudo python3 smart.py
如需使用本SDK,只需要复制sample
文件夹中的hobotx2.so
库到任意位置,在Python代码中import:
import hobotx2 # hobotx2.so
即可使用。
因Hackathon Demo中提供的CP侧程序尚无重连机制,故每次程序重新启动均需要断电重启设备,所以强烈建议在CP/AP侧部署自启动脚本。部署步骤如下:
- 复制本SDK
init
文件夹中initx2.sh
脚本到CP侧的/etc/init.d
文件夹中。 - 在CP侧的
/etc/init.d
文件夹中执行chmod +x initx2.sh
为脚本添加可执行权限。 - 在
/etc/init.d
文件夹中执行update-rc.d initx2.sh defaults 99
添加开机启动项。
- 复制本SDK
init
文件夹中initap.sh
脚本到AP侧的/etc/init.d
文件夹中。 - 在CP侧的
/etc/init.d
文件夹中执行chmod +x initap.sh
为脚本添加可执行权限。 - 在
/etc/init.d
文件夹中执行update-rc.d initap.sh defaults 99
添加开机启动项。
我们在sample/
目录下提供了编译好的库文件hobotx2.so
,以及三个示例代码文件,分别展示在三种不同场景下的使用方式。
在本示例中,我们使用一个无限循环,接收CP发出的智能数据。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import hobotx2 # hobotx2.so
import base64
import time
import struct
hobotx2.init_smart()
while True:
err, frame = hobotx2.read_smart_frame()
if err is not 0:
print("read smart frame error:", err)
else:
print("smart frame:",base64.b64encode(frame))
print("frame len:",len(frame))
hobotx2.deinit_smart()
其中,在程序最开始,我们调用hobotx2.init_smart()
对SDK进行初始化。
在循环体中,通过hobotx2.read_smart_frame()
获得智能帧数据(byte形式的protobuf数据)。
该函数为阻塞调用,频率和视频帧率保持一致。
用户可以在循环体中,解析得到的数据并进行相关处理。
在某些场景中,我们需要边获取智能数据,边进行其他非阻塞/耗时操作,例如提供HTTP服务、处理某些硬件通信消息等。
为了实现这一目的,可以通过我们提供的X2WrapperThread
类,实现在单独的现成中进行智能数据获取与处理功能。
import hobotx2 # hobotx2.so
import threading
import time
class X2WrapperThread(threading.Thread):
def __init__(self):
hobotx2.init_smart()
threading.Thread.__init__(self)
self.cl = threading.Lock()
self.example_data = 0
self.result_data = None
def run(self):
while True:
err, frame = hobotx2.read_smart_frame()
if err is not 0:
print("read smart frame error:", err)
else:
self.cl.acquire()
# Do your stuff
print(self.example_data)
# Process the frame data
# self.result_data = DoSomethingWith(frame)
self.cl.release()
def set_data(self, data):
self.cl.acquire()
# do something else from the main thread
self.example_data = data
self.cl.release()
def get_result(self):
self.cl.acquire()
result = self.result_data # remember to deep copy when necessary
self.cl.release()
return result
# a sample test case
if __name__ == '__main__':
tr = X2WrapperThread()
tr.start()
idx = 0
while True:
time.sleep(1)
tr.set_data(idx)
idx += 1
在本示例中,我们将X2WrapperThread
继承自threading.Thread
类,在线程run()
函数中运行读取智能帧数据的循环体。
同时,外界可以通过调用set_data()
或get_result()
接口,向内部处理函数传输数据,或在外界获得智能帧处理的结果。
注意此时get_result()
的频次完全由于外部调用放来控制,不能保证和智能帧获取频率一致。如需精细控制,可自行加工处理。
很多情况下我们需要和Web前端进行通信,用于展示或者控制。
我们推荐使用网络通信库tornado
(安装:pip3 install tornado
),使用WebSocket协议进行通信。
import hobotx2 # hobotx2.so
import json
import threading
import tornado.web
import tornado.websocket
import tornado.httpserver
import tornado.ioloop
from thread_wrapper import X2WrapperThread
tr = X2WrapperThread() # use a global variable for communication between threads (you may find a better way to do this)
class WSHandler(tornado.websocket.WebSocketHandler):
def open(self):
pass
def check_origin(self, origin):
"""
Keep this return true to overcome cross-site problems
"""
return True
def on_message(self, msg):
"""
Communicate with websocket client
"""
result_data = tr.get_result() # get from processing thread
tr.set_data(0) # set to processing thread
self.write_message(json.dumps(result_data)) # send data to the client
def on_close(self):
pass
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r'/', WSHandler)
]
settings = {}
tornado.web.Application.__init__(self, handlers, **settings)
def serve_ws():
ws_app = Application()
server = tornado.httpserver.HTTPServer(ws_app)
server.listen(8080)
tornado.ioloop.IOLoop.instance().start()
if __name__ == '__main__':
tr.start()
print('serving ws')
serve_ws()
此处我们复用了前面的X2WrapperThread
类对获取智能数据的循环进行封装,使用一个单独的线程运行。
在本例中,我们使用tornado
实现了一个简单的WebSocket服务,在WS Handler中,通过访问X2WrapperThread的实例与智能数据进行交互。
hobot_x2.init_smart()
Parameters:
- void Returns:
0
if smart frame trans initialized successfully.-101
if smart frame trans has already been initialized.-21
connection to HBIPC_AP init failed.-22
provider app start failed.-23
AP <-> CP connection establish failed.
hobot_x2.deinit_smart()
Parameters:
- void Returns:
0
if smart frame trans deinitialized successfully.-31
smart frame trans was not inited.
hobot_x2.read_smart_frame()
Parameters:
- void Returns: [error code, data]
error code :
0
get smart frame success.-41
smart frame trans was not inited.- Other negative value please check Horizon Robotics SDK API.
data :
smart frame in encoded by protobuf. if error code is nagative value, data will be ''
.