# Safe Working code!

In [None]:
from djitellopy import Tello
import cv2
import pygame
import numpy as np
import time
from tflite_runtime.interpreter import Interpreter

##### TODO: UPDATE THE PARAMETERS IN THIS SECTION #####

# Label file and TFLite model file used for detection.
label_path, model_path = 'dict.txt', 'model.tflite'
# Detections must score above this value to be drawn and acted on.
confidence_threshold = 0.5

##### END OF SECTION #####

# Speed of the drone
S = 60
# Refresh rate of the pygame window, in frames per second.
# Input is processed once per frame, so a low value makes the controls lag.
FPS = 120
# Size of the pygame window, in pixels
FRAME_WIDTH, FRAME_HEIGHT = 960, 660

class FrontEnd(object):
    """
        Shows the Tello video stream in a pygame window, runs a TFLite object
        detector on every frame and reacts to recognised obstacles.

        Press escape key (or close the window) to quit.

        NOTE: no other key is handled by this class. There is no takeoff,
        landing, manual steering or snapshot key binding here.
    """

    def __init__(self):
        # Init pygame
        pygame.init()

        # Create pygame window
        pygame.display.set_caption("Tello video stream")
        # Set width and height
        self.screen = pygame.display.set_mode([FRAME_WIDTH, FRAME_HEIGHT])

        # Init Tello object that interacts with the Tello drone
        self.tello = Tello()

        # Drone velocities between -100~100
        self.for_back_velocity = 0
        self.left_right_velocity = 0
        self.up_down_velocity = 0
        self.yaw_velocity = 0
        self.speed = 10

        # Snapshot time shown in the overlay; nothing in this class updates it
        self.last_snapshot = "N/A"
        self.text_color = (0, 0, 255)

        # update() only forwards the velocities while this flag is True
        self.send_rc_control = False

        # Import labels: one class name per line, indexed by class id.
        # The with-statement closes the file, no explicit close() is needed.
        with open(label_path, "r") as f:
            self.labels = [line.strip() for line in f]

        # Initialize TFLite model and allocate tensors
        self.interpreter = Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Create update timer
        pygame.time.set_timer(pygame.USEREVENT + 1, 1000 // FPS)

    @staticmethod
    def _scale_box(box, width, height):
        """
            Convert a normalised [ymin, xmin, ymax, xmax] box into pixel
            corners ((x0, y0), (x1, y1)) for an image of the given size.
        """
        ymin, xmin, ymax, xmax = box
        top_left = (int(xmin * width), int(ymin * height))
        bottom_right = (int(xmax * width), int(ymax * height))
        return top_left, bottom_right

    def _annotate_detections(self, image, in_width, in_height):
        """
            Run the detector on `image` (already resized to the model input),
            draw every detection above the confidence threshold onto it and
            pass each one to handle_obstacle().
        """
        # The interpreter expects a leading batch dimension.
        self.interpreter.set_tensor(self.input_details[0]['index'], np.expand_dims(image, axis=0))
        self.interpreter.invoke()

        # Assumes outputs 0/1/2 are boxes/classes/scores -- TODO confirm
        # against the exported model. reshape (not squeeze) keeps the arrays
        # iterable when the model returns a single detection.
        boxes = np.reshape(self.interpreter.get_tensor(self.output_details[0]['index']), (-1, 4))
        classes = np.reshape(self.interpreter.get_tensor(self.output_details[1]['index']), -1)
        scores = np.reshape(self.interpreter.get_tensor(self.output_details[2]['index']), -1)

        for box, class_id, score in zip(boxes, classes, scores):
            if score <= confidence_threshold:
                continue
            class_id = int(class_id)
            # Unnormalize boundaries: x by the image width, y by its height
            start_point, end_point = self._scale_box(box, in_width, in_height)
            # Draw bounding box
            cv2.rectangle(image, start_point, end_point, color=(0, 255, 0), thickness=2)
            # Add label and score; fall back to the raw id for unknown classes
            if 0 <= class_id < len(self.labels):
                name = self.labels[class_id]
            else:
                name = str(class_id)
            img_text = f"{name}: {score:.3f}"
            cv2.putText(image, img_text, (start_point[0], start_point[1] + 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
            self.handle_obstacle(class_id, score)

    def run(self):
        """
            Connect to the drone and run the display/detection loop until the
            window is closed, escape is pressed or the video stream stops.
        """
        self.tello.connect()
        try:
            self.tello.set_speed(self.speed)

            # In case streaming is on. This happens when we quit this program without the escape key.
            self.tello.streamoff()
            self.tello.streamon()

            frame_read = self.tello.get_frame_read()

            # Assumes the model input is laid out (batch, height, width,
            # channels) -- TODO confirm for non-square models.
            input_shape = self.input_details[0]['shape']
            in_height, in_width = int(input_shape[1]), int(input_shape[2])

            should_stop = False
            while not should_stop:
                for event in pygame.event.get():
                    if event.type == pygame.USEREVENT + 1:
                        self.update()
                    elif event.type == pygame.QUIT:
                        should_stop = True
                    elif event.type == pygame.KEYDOWN:
                        if event.key == pygame.K_ESCAPE:
                            should_stop = True

                if frame_read.stopped:
                    break

                self.screen.fill([0, 0, 0])

                # Read the frame once so shape and pixels come from the same image
                source = frame_read.frame
                original_shape = np.shape(source)
                # cv2.resize takes the target size as (width, height)
                new_image = cv2.resize(source, (in_width, in_height))

                # Get prediction and draw it
                self._annotate_detections(new_image, in_width, in_height)

                new_image = cv2.resize(new_image, (original_shape[1], original_shape[0]))
                # Display battery
                text = "Battery: {}%".format(self.tello.get_battery())
                cv2.putText(new_image, text, (5, FRAME_HEIGHT - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

                # Display last snapshot timing
                text = "Last snapshot: {}".format(self.last_snapshot)
                cv2.putText(new_image, text, (5, FRAME_HEIGHT - 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, self.text_color, 2)

                # Keep the annotated frame available on the instance
                self.new_image = new_image

                # pygame surfaces are indexed (x, y): rotate and flip the
                # (row, column) image so it is displayed upright.
                new_image = np.rot90(new_image)
                new_image = np.flipud(new_image)

                surface = pygame.surfarray.make_surface(new_image)
                self.screen.blit(surface, (0, 0))
                pygame.display.update()

                time.sleep(1 / FPS)
        finally:
            # Call it always before finishing. To deallocate resources.
            # Running it in `finally` also covers an exception in the loop.
            self.tello.end()

    def update(self):
        """
            Update routine. Send velocities to Tello.
        """
        if self.send_rc_control:
            self.tello.send_rc_control(self.left_right_velocity, self.for_back_velocity, self.up_down_velocity, self.yaw_velocity)

    def handle_obstacle(self, label, score):
        """
            Handle the detected obstacle based on its label and score.

            `label` is an index into self.labels; indices outside the label
            list and scores at or below the confidence threshold are ignored.
            NOTE(review): the manoeuvres below block in time.sleep(), so the
            window does not refresh or process events while they run.
        """
        if score <= confidence_threshold:
            return
        if not 0 <= label < len(self.labels):
            return

        name = self.labels[label]
        if name in ('Gate 1', 'Gate 2'):
            # Recognition of the gate gap
            self.recognize_gap()
            # Fly through the gate gap
            self.fly_through_gap()
        elif name == 'Pillar 3':
            # Recognition of Pillar 3
            self.recognize_pillar()
            # Fly up and land on landing pad
            self.land_on_pad()

    def recognize_gap(self):
        """
            Hover and recognize the gate gap.
        """
        self.tello.send_rc_control(0, 0, 0, 0)
        time.sleep(2)

    def fly_through_gap(self):
        """
            Fly through the recognized gap.
        """
        self.tello.send_rc_control(0, 50, 0, 0)
        time.sleep(2)

    def recognize_pillar(self):
        """
            Hover and recognize the pillar.
        """
        self.tello.send_rc_control(0, 0, 0, 0)
        time.sleep(2)

    def land_on_pad(self):
        """
            Fly up and land on the mission pad.
        """
        # Climb for two seconds, hover for two seconds, then land.
        self.tello.send_rc_control(0, 0, 50, 0)
        time.sleep(2)
        self.tello.send_rc_control(0, 0, 0, 0)
        time.sleep(2)
        self.tello.land()

def main():
    """Build the front end and hand control to its display loop."""
    FrontEnd().run()


# Only start the application when executed as a script.
if __name__ == '__main__':
    main()


In [1]:
from djitellopy import Tello
import cv2
import pygame
import numpy as np
import time
from tflite_runtime.interpreter import Interpreter

# Constants
S = 60  # Speed of the drone
FPS = 120  # Refresh rate of the pygame window, in frames per second
FRAME_WIDTH = 960  # Width of the pygame window, in pixels
FRAME_HEIGHT = 660  # Height of the pygame window, in pixels

# ArUco dictionary
# getPredefinedDictionary() is used instead of Dictionary_get(), which was
# removed in OpenCV 4.7.
ARUCO_DICT = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
# DetectorParameters_create() was removed in OpenCV 4.7 as well; newer builds
# construct the parameters object directly.
if hasattr(cv2.aruco, "DetectorParameters_create"):
    ARUCO_PARAMS = cv2.aruco.DetectorParameters_create()
else:
    ARUCO_PARAMS = cv2.aruco.DetectorParameters()

# Object detection parameters
LABEL_PATH = 'dict.txt'  # Label file, one class name per line
MODEL_PATH = 'model.tflite'  # TFLite detection model
CONFIDENCE_THRESHOLD = 0.5  # Detections must score above this to be drawn

class FrontEnd(object):
    """
        Shows the Tello video stream in a pygame window with TFLite detections
        drawn on top, and can steer the drone towards ArUco markers.

        The keys handled are:
        - Escape (or closing the window): quit
        - T: Takeoff
        - L: Land
        - A: Toggle autonomous mode
    """

    def __init__(self):
        pygame.init()
        pygame.display.set_caption("Tello NYTC24 Qualifiers")
        self.screen = pygame.display.set_mode([FRAME_WIDTH, FRAME_HEIGHT])
        self.tello = Tello()

        # Velocities forwarded to the drone by update()
        self.for_back_velocity = 0
        self.left_right_velocity = 0
        self.up_down_velocity = 0
        self.yaw_velocity = 0
        self.speed = 10

        # update() only forwards the velocities while send_rc_control is True
        self.send_rc_control = False
        self.autonomous_mode = False
        # Index into self.obstacles of the obstacle currently being handled
        self.current_obstacle = 0
        self.obstacles = ['Gate 1', 'Gate 2', 'Pillar 3']

        # Object detection setup: one class name per line, indexed by class id
        with open(LABEL_PATH, "r") as f:
            self.labels = [line.strip() for line in f]
        self.interpreter = Interpreter(model_path=MODEL_PATH)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Timer event that triggers update() from the event loop
        pygame.time.set_timer(pygame.USEREVENT + 1, 1000 // FPS)

    @staticmethod
    def _steer(offset, tolerance, speed):
        """
            Return -speed when offset is below -tolerance, +speed when it is
            above +tolerance and 0 inside the dead band.
        """
        if offset < -tolerance:
            return -speed
        if offset > tolerance:
            return speed
        return 0

    def _stop_motion(self):
        """
            Zero every commanded velocity so the next update() sends a hover.
        """
        self.for_back_velocity = 0
        self.left_right_velocity = 0
        self.up_down_velocity = 0
        self.yaw_velocity = 0

    def run(self):
        """
            Connect to the drone and run the event/display loop until the
            window is closed, escape is pressed or the video stream stops.
        """
        self.tello.connect()
        try:
            self.tello.set_speed(self.speed)
            # Restart the stream in case a previous run left it on
            self.tello.streamoff()
            self.tello.streamon()

            frame_read = self.tello.get_frame_read()

            should_stop = False
            while not should_stop:
                for event in pygame.event.get():
                    if event.type == pygame.USEREVENT + 1:
                        self.update()
                    elif event.type == pygame.QUIT:
                        should_stop = True
                    elif event.type == pygame.KEYDOWN:
                        if event.key == pygame.K_ESCAPE:
                            should_stop = True
                        elif event.key == pygame.K_t:
                            self.tello.takeoff()
                            self.send_rc_control = True
                        elif event.key == pygame.K_l:
                            # Drop stale velocities so the next takeoff
                            # starts from a hover.
                            self._stop_motion()
                            self.tello.land()
                            self.send_rc_control = False
                        elif event.key == pygame.K_a:
                            self.autonomous_mode = not self.autonomous_mode
                            if not self.autonomous_mode:
                                # Without this the last autonomous command
                                # would keep being sent by update().
                                self._stop_motion()
                            print(f"Autonomous mode: {'ON' if self.autonomous_mode else 'OFF'}")

                if frame_read.stopped:
                    break

                self.screen.fill([0, 0, 0])

                frame = frame_read.frame
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                if self.autonomous_mode:
                    self.autonomous_navigation(frame)

                frame = self.detect_objects(frame)

                # pygame surfaces are indexed (x, y): rotate and flip the
                # (row, column) image so it is displayed upright.
                frame = np.rot90(frame)
                frame = np.flipud(frame)

                surface = pygame.surfarray.make_surface(frame)
                self.screen.blit(surface, (0, 0))
                pygame.display.update()

                time.sleep(1 / FPS)
        finally:
            # Always release the drone resources, even if the loop raised.
            self.tello.end()

    def autonomous_navigation(self, frame):
        """
            Detect ArUco markers in `frame`, draw them, and steer towards the
            first detected marker according to the current obstacle.
        """
        corners, ids, _ = cv2.aruco.detectMarkers(frame, ARUCO_DICT, parameters=ARUCO_PARAMS)

        # NOTE(review): when no marker is visible the previously commanded
        # velocities are left untouched, so the drone keeps its last motion.
        if ids is None:
            return

        cv2.aruco.drawDetectedMarkers(frame, corners, ids)

        if self.current_obstacle >= len(self.obstacles):
            return

        # Marker corners are in frame pixel coordinates, so steer relative to
        # the real frame size rather than the window size.
        frame_size = (frame.shape[1], frame.shape[0])
        obstacle = self.obstacles[self.current_obstacle]

        if obstacle in ('Gate 1', 'Gate 2'):
            self.navigate_through_gate(corners[0][0], frame_size)
        elif obstacle == 'Pillar 3':
            self.land_on_pillar(corners[0][0], frame_size)

    def navigate_through_gate(self, corner, frame_size=None):
        """
            Centre the drone on a gate marker and advance once centred.

            corner: array of the marker's (x, y) corner points.
            frame_size: optional (width, height) of the image the corners were
                detected in; defaults to (FRAME_WIDTH, FRAME_HEIGHT).
        """
        width, height = frame_size if frame_size is not None else (FRAME_WIDTH, FRAME_HEIGHT)
        offset_x = np.mean(corner[:, 0]) - width / 2
        offset_y = np.mean(corner[:, 1]) - height / 2

        # Simple centering logic. Image y grows downwards, so the vertical
        # command has the opposite sign of the offset.
        self.left_right_velocity = self._steer(offset_x, 50, S)
        self.up_down_velocity = -self._steer(offset_y, 50, S)

        # Move forward if centered
        if abs(offset_x) < 50 and abs(offset_y) < 50:
            self.for_back_velocity = S
        else:
            self.for_back_velocity = 0

        # Check if passed through gate: the marker sits entirely in the
        # outer 10% of the frame on either side.
        if np.max(corner[:, 0]) < width * 0.1 or np.min(corner[:, 0]) > width * 0.9:
            self.current_obstacle += 1
            print(f"Passed through {self.obstacles[self.current_obstacle - 1]}")

    def land_on_pillar(self, corner, frame_size=None):
        """
            Centre the drone on the pillar marker, descend once centred and
            land when the marker fills most of the frame.

            corner: array of the marker's (x, y) corner points.
            frame_size: optional (width, height) of the image the corners were
                detected in; defaults to (FRAME_WIDTH, FRAME_HEIGHT).
        """
        width, height = frame_size if frame_size is not None else (FRAME_WIDTH, FRAME_HEIGHT)
        offset_x = np.mean(corner[:, 0]) - width / 2
        offset_y = np.mean(corner[:, 1]) - height / 2

        # Center over the pillar
        self.left_right_velocity = self._steer(offset_x, 20, S)
        self.up_down_velocity = -self._steer(offset_y, 20, S)

        # If centered, start descending
        if abs(offset_x) < 20 and abs(offset_y) < 20:
            self.up_down_velocity = -S // 2

        # Check if landed (marker fills most of the frame)
        marker_width = np.max(corner[:, 0]) - np.min(corner[:, 0])
        marker_height = np.max(corner[:, 1]) - np.min(corner[:, 1])
        if marker_width > width * 0.8 and marker_height > height * 0.8:
            # Clear the commands so nothing stale is sent after a later takeoff
            self._stop_motion()
            self.tello.land()
            self.send_rc_control = False
            self.autonomous_mode = False
            print("Landed on Pillar 3. Mission complete!")

    def detect_objects(self, frame):
        """
            Run the TFLite detector on `frame`, draw a box and label for every
            detection above CONFIDENCE_THRESHOLD and return the frame.
        """
        frame_height, frame_width = np.shape(frame)[:2]
        # Assumes the model input is laid out (batch, height, width,
        # channels) -- TODO confirm for non-square models.
        input_shape = self.input_details[0]['shape']
        in_height, in_width = int(input_shape[1]), int(input_shape[2])
        # cv2.resize takes the target size as (width, height)
        resized_frame = cv2.resize(frame, (in_width, in_height))

        # The interpreter expects a leading batch dimension.
        self.interpreter.set_tensor(self.input_details[0]['index'], np.expand_dims(resized_frame, axis=0))
        self.interpreter.invoke()

        # Assumes outputs 0/1/2 are boxes/classes/scores -- TODO confirm
        # against the exported model. reshape (not squeeze) keeps the arrays
        # iterable when the model returns a single detection.
        boxes = np.reshape(self.interpreter.get_tensor(self.output_details[0]['index']), (-1, 4))
        classes = np.reshape(self.interpreter.get_tensor(self.output_details[1]['index']), -1)
        scores = np.reshape(self.interpreter.get_tensor(self.output_details[2]['index']), -1)

        for box, class_id, score in zip(boxes, classes, scores):
            if score <= CONFIDENCE_THRESHOLD:
                continue
            # Boxes are normalised; scale them to the displayed frame
            ymin, xmin, ymax, xmax = box
            xmin = int(xmin * frame_width)
            xmax = int(xmax * frame_width)
            ymin = int(ymin * frame_height)
            ymax = int(ymax * frame_height)

            class_id = int(class_id)
            # Fall back to the raw id when the model reports an unknown class
            if 0 <= class_id < len(self.labels):
                name = self.labels[class_id]
            else:
                name = str(class_id)

            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            label = f"{name}: {score:.2f}"
            cv2.putText(frame, label, (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return frame

    def update(self):
        """
            Send the current velocities to the drone while RC control is on.
        """
        if self.send_rc_control:
            self.tello.send_rc_control(self.left_right_velocity, self.for_back_velocity, self.up_down_velocity, self.yaw_velocity)

def main():
    """Create the front end and run its event loop until it exits."""
    app = FrontEnd()
    app.run()


# Only start the application when executed as a script.
if __name__ == '__main__':
    main()

pygame 2.6.0 (SDL 2.28.4, Python 3.11.5)
Hello from the pygame community. https://www.pygame.org/contribute.html


ImportError: dlopen(/Users/saumil/anaconda3/lib/python3.11/site-packages/tflite_runtime/_pywrap_tensorflow_interpreter_wrapper.so, 0x0002): tried: '/Users/saumil/anaconda3/lib/python3.11/site-packages/tflite_runtime/_pywrap_tensorflow_interpreter_wrapper.so' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64')), '/System/Volumes/Preboot/Cryptexes/OS/Users/saumil/anaconda3/lib/python3.11/site-packages/tflite_runtime/_pywrap_tensorflow_interpreter_wrapper.so' (no such file), '/Users/saumil/anaconda3/lib/python3.11/site-packages/tflite_runtime/_pywrap_tensorflow_interpreter_wrapper.so' (mach-o file, but is an incompatible architecture (have 'x86_64', need 'arm64e' or 'arm64'))