# Sample code for performing obstacle avoidance #

Import necessary packages.

In [None]:
# Code adapted from: https://github.com/bitcraze/crazyflie-lib-python/blob/master/examples/autonomousSequence.py

import time
import numpy as np
import cv2
import matplotlib.pyplot as plt

# CrazyFlie imports:

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
from cflib.positioning.position_hl_commander import PositionHlCommander

Set your group number and camera number.

In [None]:
group_number = 13

# Possibly try 0, 1, 2 ...
camera = 1

## Tune the red filtering ##

You can use the following cell to test and visualize the red filtering. This cell *not* make the drone fly. It will connect to the CrazyFlie camera and perform red filtering on the live video feed. You should use this cell to tune the HSV intervals, and then copy/paste your tuned intervals into the __check_contours__ function below. When tuning the intervals, keep in mind that the lighting in the environment can matter.

In [None]:
# get_bounding_boxes()
OBSTACLE_CONTOUR_THRESHOLD = 750

def get_bounding_boxes(frame):
    # These define the upper and lower HSV for the red obstacles.
    # Note that the red color wraps around 180, so there are two intervals.
    # Tuning of these values will vary depending on the camera.
    lb1 = (145, 35, 75)
    ub1 = (180, 255, 255)
    lb2 = (0, 75, 75)
    ub2 = (20, 255, 255)

    # Perform contour detection on the input frame.
    hsv1 = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    hsv2 = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Compute mask of red obstacles in either color range.
    mask1 = cv2.inRange(hsv1, lb1, ub1)
    mask2 = cv2.inRange(hsv2, lb2, ub2)
    # Combine the masks.
    mask = cv2.bitwise_or(mask1, mask2)
    
    # Use the OpenCV findContours function.
    # Note that there are three outputs, but we discard the first one.
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
    
    boundRect = []
    for i, c in enumerate(contours):
        if cv2.contourArea(contours[i]) > OBSTACLE_CONTOUR_THRESHOLD:
            boundRect.append(cv2.boundingRect(c))
    print(f"Bounding Rectangles Found: {len(boundRect)}")
    return boundRect

def get_book_box(frame):
    lb1 = (100, 170, 220)#(5, 100, 100)
    ub1 = (120, 210, 235)#(30, 200, 200)

    # Perform contour detection on the input frame.
    hsv1 = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    
    mask = cv2.inRange(hsv1, lb1, ub1)
    
    # Use the OpenCV findContours function.
    # Note that there are three outputs, but we discard the first one.
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
    
    # Find the largest contour (should be the book)
    largest_area = 0
    largest_ind = -1
    
    for i, c in enumerate(contours):
        area = cv2.contourArea(c)
        if area > largest_area:
            largest_area = area
            largest_ind = i
    
    return cv2.boundingRect(c) if largest_ind != -1 else None

In [None]:
import random as rng
import numpy as np
import argparse
cap = cv2.VideoCapture(camera)

while(True):
    # Capture frame-by-frame
    ret, frame = cap.read()
    boundRect = get_bounding_boxes(frame)
    
    for i in range(len(boundRect)):
        color = (0, 255, 0)
        cv2.rectangle(frame, (int(boundRect[i][0]), int(boundRect[i][1])), \
          (int(boundRect[i][0]+boundRect[i][2]), int(boundRect[i][1]+boundRect[i][3])), color, 2)
    
    boundRect = get_book_box(frame)
    if boundRect is not None:
        color = (255,0,0)
        cv2.rectangle(frame, (int(boundRect[0]), int(boundRect[1])), \
          (int(boundRect[0]+boundRect[2]), int(boundRect[1]+boundRect[3])), color, 2)
    
    # shows all the bounding boxes for obstacles and the book
    cv2.imshow("Contours", frame)
    
    # Compute
    #cv2.imshow('mask', mask)    

    # Hit q to quit.
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release the capture
cap.release()
cv2.destroyAllWindows()

In [None]:
# This block allows us to take screenshots and save the image
cap = cv2.VideoCapture(camera)

while(True):
    # Capture frame-by-frame
    ret, frame = cap.read()
    
    
    cv2.imshow("Contours", frame)
    key_pressed = cv2.waitKey(1)
    if key_pressed & 0xFF == ord('p'):
        cv2.imwrite("book.png", frame)
    
    # Hit q to quit.
    if key_pressed & 0xFF == ord('q'):
        break

# Release the capture
cap.release()
cv2.destroyAllWindows()

In [None]:
# if code using the camera errors, then we call this block
cap.release()

In [None]:
# if some opencv windows linger, we can call this to destroy them
cv2.destroyAllWindows()

In [None]:
# test code for book bounding box
frame = cv2.imread('book.png')
boundRect, mask = get_book_box(frame)
print(boundRect)
if boundRect is not None:
    color = (255,0,0)
    cv2.rectangle(frame, (int(boundRect[0]), int(boundRect[1])), \
      (int(boundRect[0]+boundRect[2]), int(boundRect[1]+boundRect[3])), color, 2)
    
while True:
    cv2.imshow('mask', mask)
    cv2.imshow('frame', frame)

    # Hit q to quit.
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cv2.destroyAllWindows()

## Helper functions ##

The following cell contains some sample functions which will be useful.

In particular, __check_contours__ and __findGreatesContour__ will perform red filtering on the live camera feed and identify the obstacles. The red filtering is controlled by setting HSV intervals in the __check_contours__ function. Note that the intervals will require tuning and may vary on different drones/cameras.

The __adjust_position__ function can also be modified for performing obstacle avoidance.

In [None]:
# Get the current crazyflie position:
def position_estimate(scf):
    log_config = LogConfig(name='Kalman Variance', period_in_ms=500)
    log_config.add_variable('kalman.varPX', 'float')
    log_config.add_variable('kalman.varPY', 'float')
    log_config.add_variable('kalman.varPZ', 'float')

    with SyncLogger(scf, log_config) as logger:
        for log_entry in logger:
            data = log_entry[1]
            x = data['kalman.varPX']
            y = data['kalman.varPY']
            z = data['kalman.varPZ']
            
    print(x, y, z)
    return x, y, z


# Set the built-in PID controller:
def set_PID_controller(cf):
    # Set the PID Controller:
    print('Initializing PID Controller')
    cf.param.set_value('stabilizer.controller', '1')
    cf.param.set_value('kalman.resetEstimation', '1')
    time.sleep(0.1)
    cf.param.set_value('kalman.resetEstimation', '0')
    time.sleep(2)
    return


# Ascend and hover at 1m:
def ascend_and_hover(cf):
    # Ascend:
    # NOTE: was changed from starter code - made ascent a little smoother
    for y in range(10):
        cf.commander.send_hover_setpoint(0, 0, 0, y / 20) # vx (m/s), vy (m/s), yaw_rate (deg/s), z_height (m)
        time.sleep(0.1)
    # Hover at 0.5 meters:
    for _ in range(20):
        cf.commander.send_hover_setpoint(0, 0, 0, 0.5) # vx (m/s), vy (m/s), yaw_rate (deg/s), z_height (m)
        time.sleep(0.1)
    return


# Sort through contours in the image
def findGreatesContour(contours):
    largest_area = 0
    largest_contour_index = -1
    i = 0
    total_contours = len(contours)

    while i < total_contours:
        area = cv2.contourArea(contours[i])
        if area > largest_area:
            largest_area = area
            largest_contour_index = i
        i += 1

    #print(largest_area)

    return largest_area, largest_contour_index


# Find contours in the image
def check_contours(frame):

    print('Checking image:')

    # These define the upper and lower HSV for the red obstacles.
    # Note that the red color wraps around 180, so there are two intervals.
    # Tuning of these values will vary depending on the camera.
    lb1 = (145, 35, 75)
    ub1 = (180, 255, 255)
    lb2 = (0, 75, 75)
    ub2 = (20, 255, 255)

    # Perform contour detection on the input frame.
    hsv1 = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    hsv2 = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Compute mask of red obstacles in either color range.
    mask1 = cv2.inRange(hsv1, lb1, ub1)
    mask2 = cv2.inRange(hsv2, lb2, ub2)
    # Combine the masks.
    mask = cv2.bitwise_or(mask1, mask2)

    # Use the OpenCV findContours function.
    # Note that there are three outputs, but we discard the first one.
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
    largest_area, largest_contour_index = findGreatesContour(contours)

    print(largest_area)

    if largest_area > 100:
        return True
    else:
        return False


# Follow the setpoint sequence trajectory:
def adjust_position(cf, current_y):

    print('Adjusting position')

    steps_per_meter = int(10)
    # Set the number here (the iterations of the for-loop) to the number of side steps.
    # You may choose to tune the number and size of the steps.
    for i in range(3): 
        current_y = current_y - 1.0/float(steps_per_meter)
        position = [0, current_y, 0.5, 0.0]

        print('Setting position {}'.format(position))
        for i in range(10):
            cf.commander.send_position_setpoint(position[0],
                                                position[1],
                                                position[2],
                                                position[3]) # x_pos (m), y_pos (m), z_pos (m), yaw (deg)
            time.sleep(0.1)

    cf.commander.send_stop_setpoint() # From documentation "Send STOP setpoing, stopping the motors and (potentially) falling."
    # Make sure that the last packet leaves before the link is closed.
    # The message queue is not flushed before closing.
    time.sleep(0.1)
    return current_y


# Hover, descend, and stop all motion:
def hover_and_descend(cf):
    print('Descending:')
    # Hover at 0.5 meters:
    for _ in range(30):
        cf.commander.send_hover_setpoint(0, 0, 0, 0.5) # vx (m/s), vy (m/s), yaw_rate (deg/s), z_height (m)
        time.sleep(0.1)
    # Descend:
    for y in range(10):
        cf.commander.send_hover_setpoint(0, 0, 0, (10 - y) / 25) # vx (m/s), vy (m/s), yaw_rate (deg/s), z_height (m)
        time.sleep(0.1)
    # Stop all motion:
    for i in range(10):
        cf.commander.send_stop_setpoint() # From documentation "Send STOP setpoing, stopping the motors and (potentially) falling."
        time.sleep(0.1)
    return

## Test obstacle avoidance on the CrazyFlie ##

The following cell *will* fly the drone. Place the CrazyFlie in front of an obstacle in the netted area for testing. This cell will perform object detection and avoidance using the red filtering defined in the helper functions above.

In [None]:
import time

def hover(cf, iterations=1):
    for _ in range(iterations):
        cf.commander.send_hover_setpoint(0, 0, 0, 0.5)
        time.sleep(0.1)
    print("Hover")
    
DEBUG = False
def tprint(s):
    if DEBUG:
        print(s)
# TODO come up with better way to end a trial early (currently depending on using opencv window)
# TODO figure out camera failures

# strategy list:
# 0: just hover (debug)
# 1: move_between_boxes
CURR_STRATEGY = 1

# Set the URI the Crazyflie will connect to
uri = f'radio://0/{group_number}/2M'

# Initialize all the CrazyFlie drivers:
cflib.crtp.init_drivers(enable_debug_driver=False)

# Scan for Crazyflies in range of the antenna:
print('Scanning interfaces for Crazyflies...')
available = cflib.crtp.scan_interfaces()

# List local CrazyFlie devices:
print('Crazyflies found:')
for i in available:
    print(i[0])

fps = 20
image_size = (640, 480)
file_name = 'video_crazyflie.avi'

video = cv2.VideoWriter(file_name,cv2.VideoWriter_fourcc('M','J','P','G'), fps, image_size)

r = FrameRejection(3)

# Check that CrazyFlie devices are available:
if len(available) == 0:
    print('No Crazyflies found, cannot run example')
else:
    ## Ascent to hover; run the sequence; then descend from hover:
    # Use the CrazyFlie corresponding to team number:
    with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
        # Get the Crazyflie class instance:
        cf = scf.cf
        current_y = 0.0

        # Initialize and ascend:
        t = time.time()
        prev_time = t
        elapsed = time.time() - t
        ascended_bool = 0

        cap = cv2.VideoCapture(camera)
        # CHANGED FROM STARTER CODE:
        # Currently, the drone hovers until we press q on the opencv window
        while(cap.isOpened()):
            
            bruh = time.time()
            ret, frame = cap.read()
            tprint(f"Camera read: {time.time() - bruh}")
            bruh = time.time()
            
            elapsed = time.time() - t
            if(elapsed > 5.0):
                if ret:
                    cv2.imshow('frame',frame)

                    if(ascended_bool==0):
                        set_PID_controller(cf)
                        ascend_and_hover(cf)
                        ascended_bool = 1
                        curr_x, curr_y = 0, 0 # ADDED
                    else:
                        frame = r.should_reject_frame(frame)
                        if frame is not None:
                            boundRect = get_bounding_boxes(frame)
                            tprint(f"Obstacle detection: {time.time() - bruh}")
                            bruh = time.time()

                            for i in range(len(boundRect)):
                                color = (0, 255, 0)
                                cv2.rectangle(frame, (int(boundRect[i][0]), int(boundRect[i][1])), \
                                  (int(boundRect[i][0]+boundRect[i][2]), int(boundRect[i][1]+boundRect[i][3])), color, 2)

                            bookRect = get_book_box(frame)
                            if bookRect is not None:
                                color = (0,0,255)
                                cv2.rectangle(frame, (int(bookRect[0]), int(bookRect[1])), \
                                  (int(bookRect[0]+bookRect[2]), int(bookRect[1]+bookRect[3])), color, 2)

                            video.write(frame)
                            tprint(f"Video write/box drawing: {time.time() - bruh}")
                            bruh = time.time()

                            # TEST - if no obstacles, either land or center on book
                            if not boundRect:
                                # if nothing is found, hover and pray the camera finds something later
                                if not bookRect:
                                    hover(cf)
                                    tprint(f"Hover when nothing found: {time.time() - bruh}")
                                    bruh = time.time()
                                else:
                                    curr_x, curr_y, goal_reached = move_towards_book(cf, curr_x, curr_y, bookRect)
                                    tprint(f"Moving towards book: {time.time() - bruh}")
                                    bruh = time.time()
                                    if goal_reached:
                                        print("goal reached")
                                        break

                            if CURR_STRATEGY == 0:
                                hover(cf)
                                tprint(f"Hover debug: {time.time() - bruh}")
                                bruh = time.time()
                            elif CURR_STRATEGY == 1:
                                if boundRect:
                                    curr_x, curr_y = move_between_boxes(cf, curr_x, curr_y, boundRect, bookRect)
                                    print(curr_x, curr_y)
                                else:
                                    hover(cf)

                            cv2.imshow("Contours", frame)
                            tprint(f"Show frame: {time.time() - bruh}")
                        else:
                            hover(cf)
                        bruh = time.time()
                        # NOTE: DO NOT SPAM q - inputs are stored to a buffer and will cause subsequent trials to end early
                        if (cv2.waitKey(10) & 0xFF) == ord('q'):
                            print("hello")
                            break
                    
                    tprint(time.time() - prev_time)
                    prev_time = time.time()
        
        cap.release()
        video.release()
        cv2.destroyAllWindows()

        # Descend and stop all motion:
        hover_and_descend(cf)

print('Done!')

In [None]:
from collections import deque

# TUNABLE PARAMETERS
# is_forward_safe()
LB = 210 # lower bound for x coordinate of obstacle avoidance
UB = 430 # upper bound for x coordinate of obstace avoidance
WIDTH_THRESHOLD = 70 # width of obstacle in camera view

# move_towards_book()
BOOK_SIZE = 10000 # area of book before we stop moving the drone
BOOK_OFFSET = 0.1 # threshold of how far the center of the book should be from the center of the FOV before stopping
BOOK_STEP_SIZE = 0.5 # lateral movement scaling when centering on book

# move_between_boxes()
CENTRAL_GAP_WEIGHT = 1.4
FORWARD_STEP = 0.15 # distance drone moves forward
SIDE_STEP = 0.1 # distance drone moves sideways

# FrameRejection
BLUR_WINDOW = (11,11)
HISTOGRAM_BIN_SIZES = [8,8,8]
SIMILARITY_THRESHOLD = 0.75
FRAME_STRAT = 1
# 0 - Rolling window of histograms
# 1 - average frame

# transforms pixel x coordinates to normalized coordinates ([0, image width] -> [-1, 1])
def get_normalized_x(x):
    image_half_width = 320 # TODO verify actual image width
    return (x - image_half_width) / (image_half_width << 1)

# determines if the drone can move forward safely based on whether or not an obstacle is in some band in front of it
def is_forward_safe(obs):
    for o in obs:
        x, _, w, _ = o
        if LB < x < UB: # TODO see if we need to check if any part of obstacle is in band
            if min(x + w, UB) - max(x, LB)  > WIDTH_THRESHOLD: # TEST - determine if this threshold idea works
                return False
    return True

# returns updated x and y, also whether or not drone is centered on book
def move_towards_book(cf, curr_x, curr_y, book):
    dest = get_normalized_x(book[0] + book[2] / 2)
    
    # TEST - choosing distance threshold
    if book[2] * book[3] > BOOK_SIZE and abs(dest) < BOOK_OFFSET:
        return curr_x, curr_y, True
    
    new_y = curr_y - dest * BOOK_STEP_SIZE
    cf.commander.send_position_setpoint(curr_x, new_y, 0.5, 0.0)
    return curr_x, new_y, False

# TODO handle no valid gap case
# bounding boxes are (I think) in the form of (top left x, top left y, width, height)
# returns updated x and y coordinates
def move_between_boxes(cf, curr_x, curr_y, obstacles, book):
    # choose destination here
    if book is None:
        # TEST - choose the largest gap if book is not visible
        sorted_obs = sorted(obstacles, key=lambda x: x[0])
        
        if curr_y < 0.5:
            champ_gap = sorted_obs[0][0]
            dest = sorted_obs[0][0] >> 1
        else:
            next_x = 640 if len(sorted_obs) < 2 else sorted_obs[1][0]
            obs_x, _ , width, _ = sorted_obs[0]
            champ_gap = next_x - obs_x - width
            dest = (obs_x + width + next_x) >> 1
            
        for i in range(len(obstacles) - 1):
            obs_x, _ , width, _ = sorted_obs[i]
            next_x = sorted_obs[i + 1][0]
            gap = (next_x - obs_x - width) * CENTRAL_GAP_WEIGHT # TEST - weight the central gaps more
            if gap > champ_gap:
                champ_gap = gap
                dest = (obs_x + width + next_x) >> 1
        
        if curr_y > -0.5:
            # handles right most gap
            obs_x, _ , width, _ = sorted_obs[-1]
            next_x = 640 # TODO figure out image width
            gap = next_x - obs_x - width
            if gap > champ_gap:
                champ_gap = gap
                dest = (obs_x + width + next_x) >> 1
        
    else:
        # TEST - choose the book center as the destination
        dest = book[0] + (book[2] >> 1)
    
    normalized_dest = get_normalized_x(dest)
    
    new_x = curr_x + is_forward_safe(obstacles) * FORWARD_STEP
    
    new_y = curr_y - normalized_dest * SIDE_STEP # TEST - see how much drone can move laterally
    
    cf.commander.send_position_setpoint(new_x, new_y, 0.5, 0.0)
    return new_x, new_y

class FrameRejection:
    def __init__(self, buffer_size=0):
        self.first_hist = None
        self.good_frames = deque()
        self.buffer_size = buffer_size
        self.frame_count = 0
    
    def insert_frame(self, frame):
        if self.buffer_size > 0 and len(self.good_frames) == self.buffer_size:
            self.good_frames.popleft()
        self.good_frames.append(frame)
        
    def should_reject_frame(self, frame):
        frame = cv2.medianBlur(frame, 7)#cv2.GaussianBlur(frame, BLUR_WINDOW, 0) # Smooth the image from smaller noise
        
        if FRAME_STRAT == 0:
            # calculate the histogram for the current frame to create a (bin size)^3 size feature vector
            curr_hist = cv2.calcHist([frame], [0,1,2], None, HISTOGRAM_BIN_SIZES, [0,256,0,256,0,256])
            curr_hist = cv2.normalize(curr_hist, curr_hist).flatten()

            # TEST - current rejection strategy is to see if the frame is similar to a rolling window
            if self.first_hist is None:
                self.frame_count += 1
                if self.frame_count == 1:
                    return frame
                self.first_hist = curr_hist
                self.insert_frame(frame)
                return frame

            similarity = cv2.compareHist(curr_hist, self.first_hist, cv2.HISTCMP_CORREL)
            #print(f"Similarity: {similarity}")
            if similarity > SIMILARITY_THRESHOLD:
                self.insert_frame(frame)
                updated_hist = cv2.calcHist(self.good_frames, [0,1,2], None, HISTOGRAM_BIN_SIZES, [0,256,0,256,0,256])
                self.first_hist = cv2.normalize(updated_hist, updated_hist).flatten()
            
            return frame if similarity > SIMILARITY_THRESHOLD else None
        elif FRAME_STRAT == 1:
            self.insert_frame(frame)
            avg_frame = self.good_frames[0]
            for i in range(1, len(self.good_frames)):
                alpha = 1.0/(i + 1)
                beta = 1.0 - alpha
                avg_frame = cv2.addWeighted(self.good_frames[i], alpha, avg_frame, beta, 0.0)
            
            return avg_frame

In [None]:
import cv2
import time
import numpy as np
cap = cv2.VideoCapture('wtf_mae345.avi')
prev_frame = None
similarity = 1
rejections = 0
frames = 0

r = FrameRejection(10)
while cap.isOpened():
    ret, frame = cap.read()
    # if frame is read correctly ret is True
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break
    
    curr = time.time()
    frame = r.should_reject_frame(frame)
    print(f"Calc time: {time.time() - curr}")
    
    # rejects frame if similarity is not high enough
    if frame is None:
        frame = prev_frame
        rejections += 1
    
    prev_frame = frame
    
    # Code to increase contrast of the image
    # frame = cv2.convertScaleAbs(frame,alpha=2, beta=0)
    
    cv2.imshow('frame', frame)
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
    time.sleep(0.05)
    frames += 1
cap.release()
cv2.destroyAllWindows()
print(rejections)
print(frames)