# Lab 19B

This lab uses voice recognition to control the Tello Drone. It will not run in Windows. 

Open this file in VSCode or Kaggle.com in the Raspberry Pi 400. Then follow the instructions as written below:

Step 1: Install the software pre-requisites. This Jupyter file will work in Windows and Raspberry Pi.

In [5]:
#Run to install software pre-resuisites for this lab. This needs to be done only once. All software is compatible with both windows and Pi 3 and above.

import platform
import os

# Detect the operating system
current_platform = platform.system()

if current_platform == "Windows":
    # Windows-specific commands
    os.system('pip install pipwin openai whisper torch')
    os.system('pipwin install pyaudio')
    os.system('python -m pip install PyAudio')
    os.system('pip install requests djitellopy google-generativeai SpeechRecognition pydub audioread soundfile python3-pyaudio')
    
    print("Installation completed for Windows. Note: tesseract-ocr may require manual installation on Windows.")
elif current_platform == "Linux":  # Pi 400
    # Pi-specific commands

    os.system('sudo pip3 install requests opencv-python djitellopy google-generativeai SpeechRecognition openai whisper torch --break-system-packages')
    os.system('sudo apt-get install python3-pyaudio flac portaudio19-dev ffmpeg tesseract-ocr -y')         
    
    print("Installation completed for Raspberry Pi.")
else:
    print("Unsupported platform. Please ensure you are running this on Windows or Raspberry Pi.")
print("Installation completed.")


Installation completed for Windows. Note: tesseract-ocr may require manual installation on Windows.
Installation completed.


Step 2: Run this code ONLY if memory address of the drone is being used and drone refuses to run.

In [None]:
#This program will kill the Kernel causing it to reset. Use when necessary (such as between code executions - when the drone is not responding)
import os
import platform
import subprocess
import sys




def reset_kernel():
    
      #Detect the operating system and reset the Jupyter Notebook kernel accordingly.
    
    os_name = platform.system()
    print(f"Detected OS: {os_name}")
    if os_name == "Linux":  # Assuming Raspberry Pi 400 runs a Linux-based OS
        try:
            # Run the kill command for Python processes in Linux
            print("Resetting kernel for Pi 400 (Linux)...")
            result = subprocess.run("kill -9 $(ps -A | grep python | awk '{print $1}')", shell=True, check=True)
            print("Kernel reset successfully!")
        except subprocess.CalledProcessError as e:
            print(f"Failed to reset kernel on Pi 400: {e}")
    elif os_name == "Windows":
        try:
            # Use os._exit to exit the Python process on Windows
            print("Resetting kernel for Windows...")
            os._exit(0)
        except Exception as e:
            print(f"Failed to reset kernel on Windows: {e}")
    else:
        print("Unsupported operating system.")

# Run the kernel reset function
    reset_kernel()

Step 3: This code will connect the tello drone and the microphone. In the event that the Tello drone is not detected, it will take speech inputs from the microphone and will output the drone comamnds that match the closest command. In the event the drone and mic is connected, it will do what is being said over the microphone. For example, if you say "tak-off" the drone will take-off. Run this code and write your observation:

In [None]:
#Controls the drone via voice. Works even when drone not connected via fallback via microphone.



from tkinter import Tk, Button, Label
from djitellopy import Tello
import speech_recognition as sr
from difflib import get_close_matches
import requests

# Function to check for Tello drone connection
def is_tello_connected():
    try:
        tello = Tello()
        tello.connect()
        tello.streamon()
        return True
    except Exception as e:
        print(f"Tello connection failed: {e}")
        return False

# Function to check for USB microphone connection
def is_microphone_connected():
    recognizer = sr.Recognizer()
    microphones = sr.Microphone.list_microphone_names()
    for mic in microphones:
        print(f"Found microphone: {mic}")
        if "usb" in mic.lower() or "microphone" in mic.lower():
            return True
    return False

# Function to transcribe speech to text
def transcribe_speech():
    recognizer = sr.Recognizer()
    try:
        with sr.Microphone() as source:
            status_label.config(text="Listening... Please speak.")
            audio = recognizer.listen(source)
            recognized_text = recognizer.recognize_google(audio, language="en-US")
            status_label.config(text=f"You said: {recognized_text}")
            match_drone_command(recognized_text)
    except sr.UnknownValueError:
        status_label.config(text="Sorry, could not understand the speech.")
    except sr.RequestError as e:
        status_label.config(text=f"Speech recognition service error: {e}")

# Function to match the recognized text with the closest drone command
def match_drone_command(spoken_text):
    try:
        response = requests.get("https://raw.githubusercontent.com/clarence1979/TelloDroneEduTech2024/main/tello_basic.txt")
        if response.status_code == 200:
            drone_commands = response.text.splitlines()
            closest_match = get_close_matches(spoken_text, drone_commands, n=1, cutoff=0.2)
            if closest_match:
                drone_command_label.config(text=f"Closest Drone Command: {closest_match[0]}")
            else:
                drone_command_label.config(text="No matching drone command found.")
        else:
            drone_command_label.config(text="Failed to fetch drone commands.")
    except Exception as e:
        drone_command_label.config(text=f"Error fetching drone commands: {e}")

# Initialize GUI
window = Tk()
window.title("Tello Drone and Microphone")
window.geometry("600x400")

# Check connections
tello_connected = is_tello_connected()
microphone_connected = is_microphone_connected()

if tello_connected:
    status_label = Label(window, text="Tello drone is connected. Ready to operate!", font=("Helvetica", 12), fg="green")
    status_label.pack(pady=10)

    # Additional Tello functionality can be added here
else:
    if microphone_connected:
        status_label = Label(window, text="No Tello drone found. USB microphone is connected.", font=("Helvetica", 12), fg="orange")
        status_label.pack(pady=10)

        # Button to transcribe speech
        speak_button = Button(window, text="Speak and Match Command", command=transcribe_speech)
        speak_button.pack(pady=20)

        drone_command_label = Label(window, text="", font=("Helvetica", 12), fg="blue", wraplength=500)
        drone_command_label.pack(pady=10)
    else:
        status_label = Label(window, text="Neither Tello drone nor USB microphone is connected.", font=("Helvetica", 12), fg="red")
        status_label.pack(pady=10)

window.mainloop()



Step 4: Does the same thing as above, but does not have a fallback. Drone needs to be connected.

In [None]:
#Does the same thing as above, but does not have a fallback. Drone needs to be connected.

import re
import os
import json
from djitellopy import Tello
import speech_recognition as sr
import openai
import subprocess
import socket

def free_port(port):
    try:
        # Find the process using the port
        command = f"netstat -ano | findstr :{port}"
        result = subprocess.check_output(command, shell=True).decode().strip()
        if result:
            print(f"Port {port} is in use. Attempting to free it...")
            # Extract PID from the result
            lines = result.split("\n")
            for line in lines:
                if f":{port}" in line:
                    pid = line.split()[-1]
                    # Kill the process using the PID
                    os.system(f"taskkill /PID {pid} /F")
                    print(f"Process with PID {pid} terminated.")
        else:
            print(f"Port {port} is not in use.")
    except Exception as e:
        print(f"Error: {e}")
    
free_port(8889)

# Configure the OpenAI API key
openai.api_key = os.getenv("OPENAI_API_KEY")  # Ensure the key is set in the environment variable

print("Initializing OpenAI GPT-based Drone Assistant...")

chat_history = [
    {
        "role": "system",
        "content": """You are an assistant helping me control the Tello drone using the djitellopy library. When I ask you to do something, you are supposed to give me Python code 
                      that is needed to achieve that task using the djitellopy library and then an explanation of what that code does. You are only allowed to use the functions provided by the djitellopy library.
                      You are not to use any other hypothetical functions that you think might exist.""",
    }
]

def ask(prompt):
    try:
        # Send the prompt to OpenAI API
        messages = chat_history + [{"role": "user", "content": prompt}]
        response = openai.ChatCompletion.create(
            model="gpt-4",  # Replace with "gpt-3.5-turbo" if using GPT-3.5
            messages=messages,
            max_tokens=500,
        )
        
        assistant_message = response['choices'][0]['message']['content']
        
        # Append interaction to chat history
        chat_history.append({"role": "user", "content": prompt})
        chat_history.append({"role": "assistant", "content": assistant_message})
        
        return assistant_message
    except Exception as e:
        print("Error during model interaction:", e)
        return "I'm sorry, I couldn't generate a response."

print("Done.")

code_block_regex = re.compile(r"```(.*?)```", re.DOTALL)

def extract_python_code(content):
    code_blocks = code_block_regex.findall(content)
    if code_blocks:
        full_code = "\n".join(code_blocks)
        if full_code.startswith("python"):
            full_code = full_code[7:]
        return full_code
    else:
        return None

class colors:  
    RED = "\033[31m"
    ENDC = "\033[m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"

print("Initializing drone...")

drone = Tello()
drone.connect()
print(f"Battery level: {drone.get_battery()}%")

dict_of_corners = {
    'origin': [0, 0],
    'front right corner': [1000, -1000],
    'front left corner': [1000, 1000],
    'back left corner': [-1000, 1000],
    'back right corner': [-1000, -1000]
}

print("Done.")

# Load prompt from file
prompt_file = "tello_basic.txt"
with open(prompt_file, "r") as f:
    prompt = f.read()

ask(prompt)

print("Welcome to the Tello drone chatbot! I am ready to help you with your Tello questions and commands.")

# Main loop for user input via voice
recognizer = sr.Recognizer()
microphone = sr.Microphone()

while True:
    print(colors.YELLOW + "Tello Drone> " + colors.ENDC + "Listening for your command...")

    with microphone as source:
        recognizer.adjust_for_ambient_noise(source)
        audio = recognizer.listen(source)

    try:
        question = recognizer.recognize_google(audio)
        print(colors.YELLOW + "You said: " + colors.ENDC + question)

        if question.lower() in ["quit", "exit"]:
            break

        if question.lower() == "clear":
            os.system("cls" if os.name == 'nt' else 'clear')
            continue

        response = ask(question)
        print(f"\n{response}\n")

        code = extract_python_code(response)
        if code is not None:
            print("Extracted code:", code)
            print("Please wait while I run the code with the Tello drone...")
            exec(code)
            print("Done!\n")

    except sr.UnknownValueError:
        print(colors.RED + "Sorry, I did not understand that." + colors.ENDC)
    except sr.RequestError as e:
        print(colors.RED + f"Could not request results; {e}" + colors.ENDC)

drone.end()


Step 5: After copying all files into the same folder, connect your Microphone (or webcam with Microphone to your Pi 400) and run the code in the Pi 400 after connecting ti to your drone. The program (from stap 3) will obey your verbal command to control the drone. The program (in step 4) will "nod" or "shake its head" when you ask it a question.

Take a video of both programs working and upload to SEQTA as evidence of compeltion. You can work in a group of 2 but each person must upload the video to SEQTA. There must not be more than 2 people per group. 

In [1]:
#Version 2

import os
import time
from djitellopy import Tello
import speech_recognition as sr
import openai

import subprocess
import socket

def free_port(port):
    try:
        # Find the process using the port
        command = f"netstat -ano | findstr :{port}"
        result = subprocess.check_output(command, shell=True).decode().strip()
        if result:
            print(f"Port {port} is in use. Attempting to free it...")
            # Extract PID from the result
            lines = result.split("\n")
            for line in lines:
                if f":{port}" in line:
                    pid = line.split()[-1]
                    # Kill the process using the PID
                    os.system(f"taskkill /PID {pid} /F")
                    print(f"Process with PID {pid} terminated.")
        else:
            print(f"Port {port} is not in use.")
    except Exception as e:
        print(f"Error: {e}")
    
free_port(8889)

# Configure OpenAI API Key
openai.api_key = os.getenv("OPENAI_API_KEY")  # Ensure the API key is set in the environment variables

def get_openai_response(question):
    """
    Use OpenAI's GPT model to get a 'yes' or 'no' response.
    """
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",  # Use GPT-4 or GPT-3.5-turbo
            messages=[
                {"role": "system", "content": "You are an assistant that answers only 'yes' or 'no' to questions."},
                {"role": "user", "content": question}
            ],
            max_tokens=5
        )
        model_output = response['choices'][0]['message']['content'].strip().lower()

        # Process the model's output to ensure a proper format
        if "yes" in model_output:
            return "yes"
        elif "no" in model_output:
            return "no"
        else:
            return "i don't understand"
    except Exception as e:
        print(f"Error with OpenAI API: {e}")
        return "i don't understand"

# Tello drone control
def control_tello(response):
    """
    Control the Tello drone based on the response ('yes' or 'no').
    """
    drone = Tello()
    drone.connect()
    drone.takeoff()

    if response.lower() == "yes":
        # Nod motion
        for _ in range(2):
            print("Nodding head...")
            drone.send_rc_control(0, -80, 0, 0)  # Pitch forward
            time.sleep(0.5)
            drone.send_rc_control(0, 80, 0, 0)   # Pitch backward
            time.sleep(0.5)
        drone.send_rc_control(0, 0, 0, 0)
        
    elif response.lower() == "no":
        # Shake head motion
        for _ in range(2):
            print("Shaking head...")
            drone.send_rc_control(0, 0, 0, -80)  # Yaw left
            time.sleep(0.5)
            drone.send_rc_control(0, 0, 0, 80)   # Yaw right
            time.sleep(0.5)
        drone.send_rc_control(0, 0, 0, 0)
    
    drone.land()

# Function to capture verbal questions
def get_verbal_question():
    """
    Capture a verbal question using the microphone.
    """
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("Listening...")
        audio = recognizer.listen(source)

    try:
        question = recognizer.recognize_google(audio)
        print(f"You said: {question}")
        return question
    except sr.UnknownValueError:
        print("Sorry, could not understand audio")
        return ""
    except sr.RequestError as e:
        print(f"Could not request results from Google Speech Recognition service; {e}")
        return ""

# Main program loop
def main():
    """
    Main function to capture verbal input, process it using OpenAI API, and control the Tello drone.
    """
    question = get_verbal_question()
    if question.lower() in ['exit', 'quit', 'bye']:
        print("Exiting...")
        return

    response = get_openai_response(question)
    print(f"OpenAI response: {response}")
    control_tello(response)

# Entry point
if __name__ == "__main__":
    main()


Error: Command 'netstat -ano | findstr :8889' returned non-zero exit status 1.
Listening...
You said: is Donald Trump a girl


[INFO] tello.py - 129 - Tello instance was initialized. Host: '192.168.10.1'. Port: '8889'.
[INFO] tello.py - 438 - Send command: 'command'
[INFO] tello.py - 462 - Response command: 'ok'
[INFO] tello.py - 438 - Send command: 'takeoff'


OpenAI response: no


[INFO] tello.py - 462 - Response takeoff: 'ok'
[INFO] tello.py - 471 - Send command (no response expected): 'rc 0 0 0 -80'


Shaking head...


[INFO] tello.py - 471 - Send command (no response expected): 'rc 0 0 0 80'
[INFO] tello.py - 471 - Send command (no response expected): 'rc 0 0 0 -80'


Shaking head...


[INFO] tello.py - 471 - Send command (no response expected): 'rc 0 0 0 80'
[INFO] tello.py - 471 - Send command (no response expected): 'rc 0 0 0 0'
[INFO] tello.py - 438 - Send command: 'land'
[INFO] tello.py - 462 - Response land: 'ok'
