In [None]:
'''
测试用服务端
'''


import cv2
import time
import base64
import asyncio
import websockets
import numpy as np

from djitellopy import Tello
from pynq_dpu import DpuOverlay


class TelloDpuServer:
    """WebSocket server that relays Tello video through a DPU model (test build).

    For every connected client the server repeatedly:

    1. polls the socket briefly for a text command and forwards it to the drone,
    2. grabs the latest Tello frame, resizes it and runs it through the DPU,
    3. sends the pre-processed input frame and the model output frame as two
       base64-encoded JPEGs joined by ``'||'``.

    Args:
        dpu_bitfile: Bitstream passed to ``DpuOverlay``.
        model_file: Model file passed to ``DpuOverlay.load_model``.
        websocket_port: TCP port the WebSocket server listens on.
    """

    # Step sizes handed to the drone for movement / rotation commands.
    # NOTE(review): the Tello SDK documents a 20-500 cm range for move
    # commands; 10 is kept from the original code - confirm it is accepted by
    # the drone/driver build in use.
    MOVE_DISTANCE_CM = 10
    ROTATE_DEGREES = 10

    # (width, height) every frame is resized to before inference.
    FRAME_SIZE = (256, 256)

    # Seconds to wait for a client command before processing the next frame.
    COMMAND_POLL_TIMEOUT = 0.01

    # Client command string -> (Tello method name, positional arguments).
    _COMMANDS = {
        "takeoff": ("takeoff", ()),
        "land": ("land", ()),
        "moveLeft": ("move_left", (MOVE_DISTANCE_CM,)),
        "moveRight": ("move_right", (MOVE_DISTANCE_CM,)),
        "moveForward": ("move_forward", (MOVE_DISTANCE_CM,)),
        "moveBackward": ("move_back", (MOVE_DISTANCE_CM,)),
        "ascend": ("move_up", (MOVE_DISTANCE_CM,)),
        "descend": ("move_down", (MOVE_DISTANCE_CM,)),
        "rotateClockwise": ("rotate_clockwise", (ROTATE_DEGREES,)),
        "rotateCounterclockwise": ("rotate_counter_clockwise", (ROTATE_DEGREES,)),
    }

    def __init__(self, dpu_bitfile, model_file, websocket_port=5678):
        # Hardware initialisation: load the overlay and the model, grab the runner.
        self.overlay = DpuOverlay(dpu_bitfile)
        self.overlay.load_model(model_file)
        self.dpu = self.overlay.runner

        # Input / output tensor descriptors reported by the runner.
        self.inputTensors = self.dpu.get_input_tensors()
        self.outputTensors = self.dpu.get_output_tensors()

        # Shapes of the first input and first output tensor.
        self.shapeIn = tuple(self.inputTensors[0].dims)
        self.shapeOut = tuple(self.outputTensors[0].dims)

        # Buffers handed to the runner on every inference.
        self.input_data = [np.empty(self.shapeIn, dtype=np.float32, order="C")]
        self.output_data = [np.empty(self.shapeOut, dtype=np.float32, order="C")]

        # Connect to the drone and start its video stream.
        # NOTE(review): upstream djitellopy's Tello() does not take a `scale`
        # argument - confirm this targets a patched driver build.
        self.tello = Tello(scale=6)
        self.tello.connect()
        self.tello.streamon()

        # WebSocket configuration.
        self.websocket_port = websocket_port

    async def handle_commands(self, message):
        """Forward one client command to the drone.

        Messages that are not a known command are ignored. A command the
        drone rejects is reported on stdout instead of tearing down the
        video stream.

        Args:
            message: Payload received from the WebSocket client.
        """
        entry = self._COMMANDS.get(message)
        if entry is None:
            return

        method_name, args = entry
        # Resolved outside the try block so a wrong method name still surfaces
        # as an AttributeError instead of being reported as a drone failure.
        action = getattr(self.tello, method_name)
        try:
            # Tello calls are synchronous; run them in a worker thread so the
            # event loop stays responsive while the drone executes them.
            await asyncio.to_thread(action, *args)
        except Exception as exc:  # boundary between remote input and hardware
            print(f"command {message!r} failed: {exc}")
            return

        if message == "takeoff":
            print('take')

    def _preprocess(self, frame):
        """Resize *frame* to FRAME_SIZE, convert to float32 and swap R/B channels."""
        resized = cv2.resize(frame, self.FRAME_SIZE)
        return cv2.cvtColor(np.array(resized, dtype='float32'), cv2.COLOR_RGB2BGR)

    def _infer(self, image):
        """Run *image* through the DPU and return the output clipped to [0, 255]."""
        self.input_data[0] = image
        job_id = self.dpu.execute_async(self.input_data, self.output_data)
        self.dpu.wait(job_id)
        return np.array(self.output_data).squeeze().clip(0, 255)

    @staticmethod
    def _encode_jpeg_b64(image):
        """Return *image* JPEG-encoded and then base64-encoded as text."""
        _, encoded = cv2.imencode('.jpg', image)
        return base64.b64encode(encoded).decode('utf-8')

    async def video_stream(self, websocket, path=None):
        """Serve one client: poll for commands and stream processed frames.

        Args:
            websocket: Connection object supplied by ``websockets``.
            path: Request path; only passed by older ``websockets`` releases,
                hence optional. It is not used.
        """
        frame_reader = self.tello.get_frame_read()
        try:
            while True:
                # Give the client a short window to send a command; a timeout
                # simply means "no command this round".
                try:
                    message = await asyncio.wait_for(
                        websocket.recv(), timeout=self.COMMAND_POLL_TIMEOUT
                    )
                except asyncio.TimeoutError:
                    pass
                else:
                    await self.handle_commands(message)

                frame = frame_reader.frame
                if frame is None:
                    # No frame decoded yet; try again on the next round.
                    continue

                in_img = self._preprocess(frame)
                out_img = self._infer(in_img)

                # Input frame first, model output second, joined by '||'.
                await websocket.send(
                    self._encode_jpeg_b64(in_img)
                    + '||'
                    + self._encode_jpeg_b64(out_img)
                )
        except websockets.ConnectionClosed:
            # The client went away; end this handler quietly.
            return

    async def start_server(self):
        """Start the WebSocket server and return the server object.

        NOTE(review): the server binds to "localhost" as in the original, so
        only clients on the same machine can connect - confirm this is intended.
        """
        return await websockets.serve(
            self.video_stream, "localhost", self.websocket_port
        )

    def run(self):
        """Run the server until interrupted, then release the drone."""

        async def _serve_forever():
            server = await self.start_server()
            try:
                # Never completes; keeps the loop alive until cancelled.
                await asyncio.Future()
            finally:
                server.close()
                await server.wait_closed()

        try:
            asyncio.run(_serve_forever())
        finally:
            # Shut the Tello object down even on Ctrl-C.
            self.tello.end()

if __name__ == '__main__':
    # Constructing the server connects to the DPU and the drone; run() then
    # serves until interrupted. Both statements belong inside the guard so
    # that importing this module does not touch an undefined `server`.
    server = TelloDpuServer(dpu_bitfile="dpu.bit", model_file="sesr_model_best.xmodel")
    server.run()

In [None]:
'''
实际运行用服务端
'''

import cv2
import time
import base64
import asyncio
import websockets
import numpy as np

from djitellopy import Tello
from pynq_dpu import DpuOverlay


class TelloDpuServer:
    """WebSocket server that relays Tello video through a DPU model (production build).

    For every connected client the server repeatedly:

    1. polls the socket briefly for a text command and forwards it to the drone,
    2. grabs the latest Tello frame, resizes it and runs it through the DPU,
    3. sends the model output frame as a base64-encoded JPEG.

    Args:
        dpu_bitfile: Bitstream passed to ``DpuOverlay``.
        model_file: Model file passed to ``DpuOverlay.load_model``.
        websocket_port: TCP port the WebSocket server listens on.
    """

    # Step sizes handed to the drone for movement / rotation commands.
    # NOTE(review): the Tello SDK documents a 20-500 cm range for move
    # commands; 10 is kept from the original code - confirm it is accepted by
    # the drone/driver build in use.
    MOVE_DISTANCE_CM = 10
    ROTATE_DEGREES = 10

    # (width, height) every frame is resized to before inference.
    FRAME_SIZE = (256, 256)

    # Seconds to wait for a client command before processing the next frame.
    COMMAND_POLL_TIMEOUT = 0.01

    # Client command string -> (Tello method name, positional arguments).
    _COMMANDS = {
        "takeoff": ("takeoff", ()),
        "land": ("land", ()),
        "moveLeft": ("move_left", (MOVE_DISTANCE_CM,)),
        "moveRight": ("move_right", (MOVE_DISTANCE_CM,)),
        "moveForward": ("move_forward", (MOVE_DISTANCE_CM,)),
        "moveBackward": ("move_back", (MOVE_DISTANCE_CM,)),
        "ascend": ("move_up", (MOVE_DISTANCE_CM,)),
        "descend": ("move_down", (MOVE_DISTANCE_CM,)),
        "rotateClockwise": ("rotate_clockwise", (ROTATE_DEGREES,)),
        "rotateCounterclockwise": ("rotate_counter_clockwise", (ROTATE_DEGREES,)),
    }

    def __init__(self, dpu_bitfile, model_file, websocket_port=5678):
        # Hardware initialisation: load the overlay and the model, grab the runner.
        self.overlay = DpuOverlay(dpu_bitfile)
        self.overlay.load_model(model_file)
        self.dpu = self.overlay.runner

        # Input / output tensor descriptors reported by the runner.
        self.inputTensors = self.dpu.get_input_tensors()
        self.outputTensors = self.dpu.get_output_tensors()

        # Shapes of the first input and first output tensor.
        self.shapeIn = tuple(self.inputTensors[0].dims)
        self.shapeOut = tuple(self.outputTensors[0].dims)

        # Buffers handed to the runner on every inference.
        self.input_data = [np.empty(self.shapeIn, dtype=np.float32, order="C")]
        self.output_data = [np.empty(self.shapeOut, dtype=np.float32, order="C")]

        # Connect to the drone and start its video stream.
        # NOTE(review): upstream djitellopy's Tello() does not take a `scale`
        # argument - confirm this targets a patched driver build.
        self.tello = Tello(scale=6)
        self.tello.connect()
        self.tello.streamon()

        # WebSocket configuration.
        self.websocket_port = websocket_port

    async def handle_commands(self, message):
        """Forward one client command to the drone.

        Messages that are not a known command are ignored. A command the
        drone rejects is reported on stdout instead of tearing down the
        video stream.

        Args:
            message: Payload received from the WebSocket client.
        """
        entry = self._COMMANDS.get(message)
        if entry is None:
            return

        method_name, args = entry
        # Resolved outside the try block so a wrong method name still surfaces
        # as an AttributeError instead of being reported as a drone failure.
        action = getattr(self.tello, method_name)
        try:
            # Tello calls are synchronous; run them in a worker thread so the
            # event loop stays responsive while the drone executes them.
            await asyncio.to_thread(action, *args)
        except Exception as exc:  # boundary between remote input and hardware
            print(f"command {message!r} failed: {exc}")
            return

        if message == "takeoff":
            print('take')

    def _preprocess(self, frame):
        """Resize *frame* to FRAME_SIZE, convert to float32 and swap R/B channels."""
        resized = cv2.resize(frame, self.FRAME_SIZE)
        return cv2.cvtColor(np.array(resized, dtype='float32'), cv2.COLOR_RGB2BGR)

    def _infer(self, image):
        """Run *image* through the DPU and return the output clipped to [0, 255]."""
        self.input_data[0] = image
        job_id = self.dpu.execute_async(self.input_data, self.output_data)
        self.dpu.wait(job_id)
        return np.array(self.output_data).squeeze().clip(0, 255)

    @staticmethod
    def _encode_jpeg_b64(image):
        """Return *image* JPEG-encoded and then base64-encoded as text."""
        _, encoded = cv2.imencode('.jpg', image)
        return base64.b64encode(encoded).decode('utf-8')

    async def video_stream(self, websocket, path=None):
        """Serve one client: poll for commands and stream model-output frames.

        Args:
            websocket: Connection object supplied by ``websockets``.
            path: Request path; only passed by older ``websockets`` releases,
                hence optional. It is not used.
        """
        frame_reader = self.tello.get_frame_read()
        try:
            while True:
                # Give the client a short window to send a command; a timeout
                # simply means "no command this round".
                try:
                    message = await asyncio.wait_for(
                        websocket.recv(), timeout=self.COMMAND_POLL_TIMEOUT
                    )
                except asyncio.TimeoutError:
                    pass
                else:
                    await self.handle_commands(message)

                frame = frame_reader.frame
                if frame is None:
                    # No frame decoded yet; try again on the next round.
                    continue

                out_img = self._infer(self._preprocess(frame))

                # Only the model output frame is sent to the client.
                await websocket.send(self._encode_jpeg_b64(out_img))
        except websockets.ConnectionClosed:
            # The client went away; end this handler quietly.
            return

    async def start_server(self):
        """Start the WebSocket server and return the server object.

        NOTE(review): the server binds to "localhost" as in the original, so
        only clients on the same machine can connect - confirm this is intended.
        """
        return await websockets.serve(
            self.video_stream, "localhost", self.websocket_port
        )

    def run(self):
        """Run the server until interrupted, then release the drone."""

        async def _serve_forever():
            server = await self.start_server()
            try:
                # Never completes; keeps the loop alive until cancelled.
                await asyncio.Future()
            finally:
                server.close()
                await server.wait_closed()

        try:
            asyncio.run(_serve_forever())
        finally:
            # Shut the Tello object down even on Ctrl-C.
            self.tello.end()


if __name__ == '__main__':
    # Constructing the server connects to the DPU and the drone; run() then
    # serves until interrupted. Both statements belong inside the guard so
    # that importing this module does not touch an undefined `server`.
    server = TelloDpuServer(dpu_bitfile="dpu.bit", model_file="sesr_model_best.xmodel")
    server.run()