 ## 실행 방법: 아래의 1, 2번 코드 셀을 실행하면 됩니다. 
 - 코드 수정 혹은 프로세스 종료 후 다시 로봇을 실행하고 싶다면, 3번 코드 셀을 실행 하고 2번 셀을 실행하면 됩니다.

### 1. 라이브러리와 영상처리 함수, PID 클래스 구현

In [17]:
# 라이브러리와 필요한 함수들, 파라미터
# IPython Libraries for display and widgets
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display

# Camera and Motor Interface for JetBot
from jetbot import Robot, Camera, bgr8_to_jpeg
from SCSCtrl import TTLServo

# for mqtt
import paho.mqtt.client as mqtt

# Python basic packages for image anno tation
import cv2
import imutils
import numpy as np
import time
import threading
from uuid import uuid1
import datetime
import os
import json
import glob
import ast

# Calibration 파라미터
intrinsic_parameter_path = "./parameters/intrinsic_parameter.npy"
dist_coeffs_path = "./parameters/distortion_coefficients.npy"
intrinsic_parameter = np.load(intrinsic_parameter_path)
dist_coeffs = np.load(dist_coeffs_path)


# 검은색의 RGB, HSV 범위 설정 (필요에 따라 조정)
lower_rgb = np.array([0, 0, 0])
upper_rgb = np.array([100, 100, 100])
black_lower_hsv = np.array([0, 0, 0])
black_upper_hsv = np.array([180, 66, 117])

# 검은색 제외 색상의 HSV에서 H
colors_hue = {
    "red": [0, 10],
    "green": [50, 80],
    "blue": [80, 110],
    "purple": [115, 130],
    "yellow": [25, 30],
    "orange": [10, 20],
    "black": [0, 359]  # 검정은 모든 H
}

# 영상처리용 변수 모음
min_area = 400  # 최소 면적. 이거보다 작으면 노이즈로 판단한다.
s_threshold = 50  # 채도 임계값 (최소)
v_threshold = 200  # 명도 임계값 (최대)
global_threshold = 50 # excute 함수에서 임계값
width, height = 300, 300

# 제어에서 쓰일 변수
x_moment, y_moment = 150, 150
flag = 0
mid = 112

# 새로운 사다리꼴 모양의 vertices 정의 (300x300 해상도 기준)
vertices = np.array([[(width * 0.1, height), (width * 0.3, height * 0.3), (width * 0.7, height * 0.3), (width * 0.9, height)]], dtype=np.int32)


# 왜곡 보정해주는 함수
def callibration(img):
    global intrinsic_parameter, dist_coeffs
    undistorted_image = cv2.undistort(img, intrinsic_parameter, dist_coeffs)
    return undistorted_image


# ROI 추출 함수
def region_of_interest(img, vertices):
    mask = np.zeros_like(img)
    if len(img.shape) > 2:  # Color 이미지인 경우
        channel_count = img.shape[2]
        ignore_mask_color = (255,) * channel_count
    else:
        ignore_mask_color = 255
    
    cv2.fillPoly(mask, vertices, ignore_mask_color)  # 다각형 내부를 흰색으로 채우기
    roi_image = cv2.bitwise_and(img, mask)           # 입력 이미지와 마스크 비트 연산
    return roi_image 


# HSV로 색상 마스킹 함수
def mask_hsv(color_name, img):
    global colors, black_lower_hsv, black_upper_hsv
    
    if color_name not in colors_hue:
        raise ValueError(f"Color '{color_name}' not found in the colors list")
    
    h_lower, h_upper = colors_hue[color_name][0], colors_hue[color_name][1]
    
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    h, s, v = cv2.split(hsv)
    
    if color_name == "black":
        mask = cv2.inRange(hsv, black_lower_hsv, black_upper_hsv)
    else:
        masks = []
        h_lower, h_upper = colors_hue[color_name][0], colors_hue[color_name][1]
        h_mask = cv2.inRange(h, h_lower, h_upper)
        masks.append(h_mask)
        
        mask = cv2.bitwise_or(masks[0], masks[1]) if len(masks) > 1 else masks[0]

        # 흰색, 검은색 노이즈 제외를 위한 채도와 명도 마스크 추가
        s_mask = cv2.inRange(s, s_threshold, 255)
        v_mask = cv2.inRange(v, 0, v_threshold)
        mask = cv2.bitwise_and(mask, s_mask)
        mask = cv2.bitwise_and(mask, v_mask)
    
    masked_img = cv2.bitwise_and(img, img, mask=mask)
    return masked_img


# RGB로 검은색만 마스킹하는 함수
def mask_rgb(img, lower_rgb, upper_rgb):
    global colors
    mask_black = cv2.inRange(img, lower_rgb, upper_rgb)
    masked_img = cv2.bitwise_and(img, img, mask=mask_black)
    return masked_img

# 음성에서 파싱하고 첫번째 인자만 반환하는 함수
def parse_and_get_first(input_str):
    try:
        # 문자열의 불필요한 공백 제거
        clean_str = input_str.strip()
        # 문자열의 시작과 끝에 있는 대괄호와 따옴표 제거
        if clean_str.startswith("[") and clean_str.endswith("]"):
            clean_str = clean_str[1:-1]
        
        # 쉼표로 구분하여 리스트로 변환
        parsed_list = [s.strip(" '\"") for s in clean_str.split(',')]
        
        # 첫 번째 요소 반환
        if parsed_list:
            first_element = parsed_list[0]
            # 첫 번째 요소의 불필요한 따옴표 제거
            first_element = first_element.strip(" '\"")
            first_element = first_element.replace('[', '').replace(']', '').replace("'", '')
            return first_element
        else:
            return None
    except Exception as e:
        print(f"Error parsing input string: {e}")
        return None
    
##########################################################################    
class PIDController:
    def __init__(self, Kp, Ki, Kd, target):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.target = target
        self.integral = 0
        self.previous_error = 0

    def update(self, x_moment, dt):
        error = self.target - x_moment
        self.integral += error * dt
        derivative = (error - self.previous_error) / dt
        self.previous_error = error
        return self.Kp * error + self.Ki * self.integral + self.Kd * derivative

def pid_control(x_moment, pid, dt=0.1):
    # 최대 및 최소 속도 설정
    max_speed = 0.5
    min_speed = 0.0

    # 속도 범위 설정 (0.0에서 0.5까지)
    speed_levels = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]

    # PID 제어 값 계산
    correction = pid.update(x_moment, dt)

    # 왼쪽, 오른쪽 바퀴 속도 계산
    left_speed = max(min_speed, min(max_speed, 0.5 - correction))
    right_speed = max(min_speed, min(max_speed, 0.5 + correction))

    # 속도를 가장 가까운 speed_levels 값으로 반올림
    left_speed = min(speed_levels, key=lambda x: abs(x - left_speed))
    right_speed = min(speed_levels, key=lambda x: abs(x - right_speed))

    return left_speed, right_speed

# PID 컨트롤러 초기화
Kp = 0.005
Ki = 0.0001
Kd = 0.0005
dt = 0.02
target = 112  # 중앙값으로 설정


def lets_dance():
    # 서보모터의 최소 및 최대 각도 설정
    min_angle = -90
    max_angle = 0
    initial_angle_2 = -20
    initial_angle_4 = -20

    # 서보모터 움직임을 위한 각도 설정
    dance_moves = [
        (30, 30),  # (servo2_target_angle, servo4_target_angle)
        (-30, -30),
        (30, 30),
        (-30, -30),
        (30, 30),
        (-30, -30)
    ]

    for move in dance_moves:
        angle_2, angle_4 = move
        
        # 서보모터 2와 4를 설정된 각도로 움직임
        TTLServo.servoAngleCtrl(2, angle_2, 1, 600)
        TTLServo.servoAngleCtrl(4, angle_4, 1, 600)
        
        # 1초 대기
        time.sleep(0.2)
    
    # 초기 위치로 복귀
    TTLServo.servoAngleCtrl(2, initial_angle_2, 1, 600)
    TTLServo.servoAngleCtrl(4, initial_angle_4, 1, 600)
    
    



In [69]:
# 간단한 pid 테스트
x_momment = 140
left_speed, right_speed = pid_control(x_momment, pid, dt)
print(f"x_moment: {x_momment} -> Left Speed: {left_speed}, Right Speed: {right_speed}")



x_moment: 140 -> Left Speed: 0.5, Right Speed: 0.0


### 2. 객체생성과 mqtt 실행

In [None]:
##############################################  Object, widget  ####################################################


# JetBot 카메라 인스턴스, 로봇 객체 생성
camera = Camera()
robot = Robot()
pid = PIDController(Kp, Ki, Kd, target)

servo_ids = [1, 2, 3, 4, 5]
initial_positions = [0, -30, 29, -20, 48]  # 초기 각도 설정

for servo_id, position in zip(servo_ids, initial_positions):
    TTLServo.servoAngleCtrl(servo_id, position, 1, 200)

# 서보모터 준비
TTLServo.servoAngleCtrl(1, 10, 1, 150)
TTLServo.servoAngleCtrl(5, -50, 1, 300)  # 카메라 라인 보기 위해 고개 숙이기
# 서보모터 목표 각도 초기화
servo2_target_angle = -20
servo4_target_angle = -20
servo_moving = False


# 이미지 표시를 위한 위젯 생성
image_widget = widgets.Image(format='jpeg', width=300, height=300)
target_widget = widgets.Image(format='jpeg', width=300, height=300)
x_moment_label = widgets.Label(value="x_moment: 0")
current_position_label = widgets.Label(value="current position: unknown")
last_current_position_label = widgets.Label(value="last current position: unknown")
target_position_label = widgets.Label(value="target position: unknown")
button_command_label = widgets.Label(value="button command: unknown")
voice_command_label = widgets.Label(value="voice command: unknown")
is_voice_label = widgets.Label(value="is voice: unknown")
moter_speed_label = widgets.Label(value="motor speed: unknown")

##############################################  MQTT  ####################################################

# MQTT 클라이언트 설정
broker_address = "192.168.110.103"
topic_img = "agv0/image"
topic_position = "agv0/currentZone"
topic_mode = "agv0/mode"
topic_voice_command = "agv0/voiceCommand"  # voice
topic_button_command = "agv0/command"      # stop, forward, backward, left, emergencyBreak
topic_distance_command = "agv0/distance"

current_zone = None
last_sent_zone = None
target_zone = None
home_zone = "orange"
voice_command_list = []
voice_command = None
last_voice_command = None
button_command = None
current_direction = "clockwise"  # Counterclockwise
last_sent_zone = None
is_voice = False
distance = 0

# 연결됐을때 콜백 함수
def on_connect(client, userdata, flags, reason_code, properties=None):
    client.subscribe("agv0/command")
    client.subscribe("agv0/voiceCommand")
    client.subscribe("agv0/distance")

# MQTT 메시지를 받을 때 호출되는 콜백 함수
def on_message(client, userdata, message):
    global target_zone, current_direction, voice_command_list, button_command, topic_voice_command, topic_button_command, is_voice, last_voice_command, servo2_target_angle, servo4_target_angle, servo_moving, distance, voice_command, last_voice_command
 
    # 서보모터의 최소 및 최대 각도 설정
    min_angle = -90
    max_angle = 0
    
    # 음성명령 
    if message.topic == topic_voice_command:
        str_data = message.payload.decode('utf-8')
        
        print(f"original : {str_data}")
        
        voice_command = parse_and_get_first(str_data)  # 문자열을 리스트로 변환
     
        # 명령어 리스트로 변환
        print(f"Received voice command: {voice_command}")
        
        # if voice_command:
            #voice_command_label.value = f"last voice command: {last_voice_command}, voice_command: {last_voice_command}"
            
        if (last_voice_command != voice_command):


            voice_command_label.value = f"last voice command: {last_voice_command}, voice_command: {last_voice_command}, distance: {distance}"

            if voice_command == 'goToHome':
                target_zone = 'home'
            elif voice_command == 'goToYellow':
                target_zone = 'yellow'
            elif voice_command == 'goToPurple':
                target_zone = 'purple'
            elif voice_command == 'goToRed':
                target_zone = 'red'
            elif voice_command == 'goToBlue':
                target_zone = 'blue'
            elif voice_command == 'goToGreen':
                target_zone = 'green'
            elif voice_command == "dance":
                lets_dance()
            else:
                target_zone = None
            is_voice = True                                                    # 음성명령
            target_position_label.value = f"target position: {target_zone}"
            is_voice_label.value = f"is voice: {is_voice}"
            last_voice_command = voice_command
            
            

    # 버튼 명령
    elif message.topic == topic_button_command:
        button_command = message.payload.decode('utf-8')
        button_command_label.value = f"button command: {message.payload.decode('utf-8')}"

    elif message.topic == topic_distance_command:
        distance = int(message.payload.decode('utf-8'))


# MQTT 전송 및 이미지 처리 콜백함수
def mqtt_execute(camera_image):
    global last_sent_zone

    # 프레임 전송
    frame = np.copy(camera_image)
    if frame is not None:
        img_str = cv2.imencode('.jpeg', frame)[1].tobytes()
        mqttc.publish(topic_img, img_str)
    
    # currentZone 전송 (currentZone이 변경되었을 때만)
    if current_zone != last_sent_zone:
        mqttc.publish(topic_position, current_zone)
        last_sent_zone = current_zone
        last_current_position_label.value = f"last current position: {last_sent_zone}"
     
    return bgr8_to_jpeg(frame)

# MQTT 클라이언트 설정
mqttc = mqtt.Client()
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect(broker_address)

# 비동기 루프 시작
mqttc.loop_start()

# 카메라로 찍은 frame을 띄워줄 image 객체 생성, 크기를 맞출 필요는 없다.
mqtt_widget = widgets.Image(format='jpeg', width=300, height=300)

################################################# Camera update ##################################################

# 카메라 프레임을 업데이트하는 함수
def execute(camera_image):
    global x_moment, y_moment, flag, mid, min_area, global_threshold, current_zone, is_voice, servo2_target_angle, servo4_target_angle, servo_moving, dt, distance, pid 


    
    # 프레임 가져오기
    frame = np.copy(camera_image)
    
    # 캘리브레이션 적용
    undistorted_frame = callibration(frame)

    # ROI 마스크 적용
    roi_frame = region_of_interest(undistorted_frame, vertices)
    mid = int(roi_frame.shape[0] / 2)
    
    # RGB 마스킹 적용
    masked_frame = mask_rgb(roi_frame, lower_rgb, upper_rgb)
    
    # 그레이스케일 변환
    gray_frame = cv2.cvtColor(masked_frame, cv2.COLOR_BGR2GRAY)

    # Gaussian Blur
    blur_frame = cv2.GaussianBlur(gray_frame, (7, 7), 0)

    # Threshold 적용, 흑백으로 변환
    _, threshold_frame = cv2.threshold(blur_frame, 15, 255, cv2.THRESH_BINARY)

    # 외곽선 찾기
    outline_frame = cv2.findContours(threshold_frame.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    outline_list = imutils.grab_contours(outline_frame)

    # 라인 따라가기: 외곽선 검출 코드
    frame_ds = np.zeros((300, 300, 3), dtype=np.uint8)
    
    if len(outline_list) != 0:
        c = max(outline_list, key=cv2.contourArea)

        if cv2.contourArea(c) > min_area:
            cv2.drawContours(frame_ds, [c], -1, (0, 255, 255), 2)

            # 무게중심 찾기 -> 제일 큰 도형의 무게중심
            M = cv2.moments(c)
            if M['m00'] != 0:
                x_moment = int(M['m10'] / M['m00'])
                y_moment = int(M['m01'] / M['m00'])
                frame_ds = cv2.circle(frame_ds, (x_moment, y_moment), 3, (255, 0, 0), -1)
    else:
        x_moment = 150

    # x_moment 값을 레이블에 반영
    x_moment_label.value = f"x_moment: {x_moment} , flag: {flag}, mid: {mid}, ROI shape: {roi_frame.shape}, servo_moving {servo_moving}, distance: {distance}"
    left_speed, right_speed = pid_control(x_moment, pid, dt)
    moter_speed_label.value = f"current speed: left-{left_speed}, right-{right_speed}"
    
    
    # 색상 감지 및 current position 업데이트
    detected_color = "unknown"
    for color_name in colors_hue.keys():
        if color_name == "black":
            continue
        color_masked = mask_hsv(color_name, roi_frame)
        color_gray = cv2.cvtColor(color_masked, cv2.COLOR_BGR2GRAY)
        _, color_threshold = cv2.threshold(color_gray, global_threshold, 255, cv2.THRESH_BINARY)
        
        # 외곽선 찾기
        color_outline_frame = cv2.findContours(color_threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        color_outline_list = imutils.grab_contours(color_outline_frame)
        
        if len(color_outline_list) != 0:
            c = max(color_outline_list, key=cv2.contourArea)
            if cv2.contourArea(c) > min_area:
                detected_color = color_name
                break

    current_position_label.value = f"current position: {detected_color}"
    current_zone = detected_color


    # 프레임을 JPEG로 변환하여 위젯에 설정
    jpeg_image = bgr8_to_jpeg(frame_ds)

    ####################################################### Control ##############################

  
     # 수동 명령
    
    # 서보모터의 최소 및 최대 각도 설정
    min_angle = -90
    max_angle = 0
    
    if not is_voice:
        if (button_command == "stop"):
            robot.set_motors(0, 0)
        elif (button_command == "forward"):
            robot.set_motors(1.0, 1.0)
        elif (button_command == "backward"):
            robot.set_motors(-1.0, -1.0)
        elif (button_command == "left"):
            robot.set_motors(-1.0, 1.0)
        elif (button_command == "right"):
            robot.set_motors(1.0, -1.0)
        elif (button_command == "emergencyBreak"):
            robot.set_motors(0.0, 0.0)
            
        elif button_command == "armUp":
            if servo2_target_angle < max_angle:
                servo2_target_angle += 1  # 서보모터 3번을 천천히 위로 이동
            servo_moving = True
        elif button_command == "armDown":
            if servo2_target_angle > min_angle:
                servo2_target_angle -= 1  # 서보모터 3번을 천천히 아래로 이동
            servo_moving = True
        elif button_command == "grab":
            if servo4_target_angle > min_angle:
                servo4_target_angle -= 1  # 서보모터 5번을 천천히 왼쪽으로 이동
            servo_moving = True
        elif button_command == "release":
            if servo4_target_angle < max_angle:
                servo4_target_angle += 1  # 서보모터 5번을 천천히 오른쪽으로 이동
            servo_moving = True
        elif button_command == "stop":
            servo_moving = False  # 서보모터 움직임 중지
            
            
    
    # 음성 명령 
    # 목적지 도달했을 경우 or 다른 명령일경우 멈춤
    elif is_voice:
        
        if target_zone == current_zone or target_zone is None:
            robot.set_motors(0, 0)
            is_voice = False
        
        elif (button_command == "emergencyBreak"):
            robot.set_motors(0.0, 0.0)        
        
        elif (distance < 10):
            robot.set_motors(0.0, 0.0)  

        else:
            # 목적지가 다를 경우 - 라인따라이동
           
        
            #left_speed, right_speed = pid_control(x_moment, pid, dt)
            robot.set_motors(left_speed, right_speed)
            
    # 서보모터 제어
    if servo_moving:
        TTLServo.servoAngleCtrl(2, servo2_target_angle, 1, 100)
        TTLServo.servoAngleCtrl(4, servo4_target_angle, 1, 100)

        
    #####################################################################################
    return jpeg_image

################################### widget connection + display #############################


time.sleep(1)
traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)
traitlets.dlink((camera, 'value'), (target_widget, 'value'), transform=execute)
traitlets.dlink((camera, 'value'), (mqtt_widget, 'value'), transform=mqtt_execute)
display(widgets.VBox([widgets.HBox([image_widget, target_widget, mqtt_widget]), x_moment_label, current_position_label, last_current_position_label, target_position_label, voice_command_label, button_command_label, is_voice_label, moter_speed_label]))




## 3. 쓰레드, 카메라 객체 종료 코드

In [16]:
# 기존의 링크와 관찰자 제거 함수
def cleanup():
    global camera
    try:
        # 모든 관찰자 제거
        camera.unobserve_all()
        
        # 카메라 정지
        camera.stop()
    except Exception as e:
        print(f"Error during cleanup: {e}")
    finally:
        # 카메라 객체 삭제
        del camera
    
# 기존의 링크와 관찰자 제거, 로봇 제어파트 종료
cleanup()
TTLServo.servoAngleCtrl(5, 0, 1, 300)  # 다시 고개 드세요
robot.stop()