<a href="https://colab.research.google.com/github/minjae0501/yolo_block/blob/master/lstm_yolo_detecting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

In [1]:
# !git clone https://github.com/minjae0501/yolo_block.git

Cloning into 'yolo_block'...
remote: Enumerating objects: 3012, done.[K
remote: Counting objects: 100% (19/19), done.[K
remote: Compressing objects: 100% (17/17), done.[K
remote: Total 3012 (delta 3), reused 13 (delta 1), pack-reused 2993[K
Receiving objects: 100% (3012/3012), 117.88 MiB | 50.70 MiB/s, done.
Resolving deltas: 100% (50/50), done.


In [1]:
# %pip install ultralytics
# %pip install mediapipe
import ultralytics
# Print the Ultralytics environment summary (package version, Python, torch, CUDA device, CPU/RAM/disk).
ultralytics.checks()

Ultralytics YOLOv8.0.227 🚀 Python-3.9.13 torch-1.12.1+cu116 CUDA:0 (NVIDIA GeForce RTX 2070, 8192MiB)
Setup complete ✅ (8 CPUs, 15.9 GB RAM, 208.7/232.3 GB disk)


In [2]:
import cv2
import numpy as np
import mediapipe as mp
from ultralytics import YOLO
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from PIL import ImageFont, ImageDraw, Image

In [3]:
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

print(device)

cuda:0


In [4]:
class MyDataset(Dataset):
    """Dataset over labelled sequences.

    Each element of ``seq_list`` is a dict with a ``'key'`` entry (the label,
    anything ``int()`` accepts) and a ``'value'`` entry (the sequence data,
    anything ``np.array()`` accepts).
    """

    def __init__(self, seq_list):
        # Read every entry once, label first and data second, then split the pairs.
        pairs = [(entry['key'], entry['value']) for entry in seq_list]
        self.y = [label for label, _ in pairs]
        self.X = [sequence for _, sequence in pairs]

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        """Return ``(data, label)``: a float tensor of the sequence and a 0-d integer tensor."""
        features = torch.Tensor(np.array(self.X[index]))
        target = torch.tensor(np.array(int(self.y[index])))
        return features, target

In [7]:
# ver 9
class hand_LSTM(nn.Module):
    """Stacked bidirectional-LSTM classifier with attention pooling over time.

    Three bidirectional LSTM stages (128, 64 and 32 hidden units per direction)
    are each followed by LayerNorm and Dropout(0.1). The per-timestep outputs of
    the last stage are weighted by a softmax attention score computed along the
    time axis, summed, and mapped to 2 logits by a linear layer.

    LayerNorm is used instead of batch normalisation (original author's note):
    batch normalisation may not work well in recurrent networks such as LSTMs
    because of their dependence across time steps, whereas layer normalisation
    is not affected by that temporal dependence.

    Args:
        num_layers: number of stacked layers inside each ``nn.LSTM`` stage.
    """

    def __init__(self, num_layers=1):
        super().__init__()

        # Bidirectional LSTMs read the sequence forwards and backwards, so each
        # stage emits 2 * hidden_size features; the LayerNorm widths follow that.
        self.lstm1 = nn.LSTM(input_size=67, hidden_size=128, num_layers=num_layers,
                             batch_first=True, bidirectional=True)
        self.layer_norm1 = nn.LayerNorm(256)
        self.dropout1 = nn.Dropout(0.1)

        self.lstm2 = nn.LSTM(input_size=256, hidden_size=64, num_layers=num_layers,
                             batch_first=True, bidirectional=True)
        self.layer_norm2 = nn.LayerNorm(128)
        self.dropout2 = nn.Dropout(0.1)

        self.lstm3 = nn.LSTM(input_size=128, hidden_size=32, num_layers=num_layers,
                             batch_first=True, bidirectional=True)
        self.layer_norm3 = nn.LayerNorm(64)
        self.dropout3 = nn.Dropout(0.1)

        self.attention = nn.Linear(64, 1)
        self.fc = nn.Linear(64, 2)

    def forward(self, x):
        """Map a ``(batch, seq, 67)`` input to ``(batch, 2)`` logits."""
        stages = (
            (self.lstm1, self.layer_norm1, self.dropout1),
            (self.lstm2, self.layer_norm2, self.dropout2),
            (self.lstm3, self.layer_norm3, self.dropout3),
        )
        for recurrent, normalise, drop in stages:
            x, _ = recurrent(x)
            x = drop(normalise(x))

        # Attention mechanism: one score per time step, normalised over the sequence axis.
        scores = self.attention(x)
        weights = torch.softmax(scores, dim=1)
        pooled = (weights * x).sum(dim=1)

        return self.fc(pooled)

In [8]:
def grab_release(image, yolo_model, hand_list , lstm_model, detect_cls, hand_cls, length, xyz_list_list, status_num):
    """Process one frame: detect the target object and the hand, collect features, classify.

    Steps:
      1. Run ``yolo_model`` on ``image`` and draw the box of class ``detect_cls``
         (green) and of class ``hand_cls`` (red) onto ``image``.
      2. Run the MediaPipe ``hands`` solution on ``image`` and draw its landmarks.
      3. When hand landmarks, the object box and the hand box are all present,
         append one feature vector to ``xyz_list_list``: x, y, z of every
         landmark followed by four box-corner distances divided by 640.
      4. Once ``xyz_list_list`` holds exactly ``length`` vectors, feed them to
         ``lstm_model`` as a single sequence, store the arg-max class in
         ``status_num`` and start a fresh, empty window.

    Args:
        image: frame to analyse; it is drawn on in place. The ``/640``
            normalisation assumes the caller resized it to 640x640.
        yolo_model: detector exposing ``predict(image, conf=..., verbose=..., show=...)``.
        hand_list: ``[mp_hands, hands, mp_drawing]``.
        lstm_model: sequence classifier called on a ``(1, length, features)`` tensor.
        detect_cls: class id of the object to track.
        hand_cls: class id of the hand.
        length: number of feature vectors per classified window.
        xyz_list_list: feature vectors collected so far (appended to in place).
        status_num: previous prediction, returned unchanged when no new
            prediction is made.

    Returns:
        ``(image, xyz_list_list, status_num)``. The list is a new empty list
        right after a prediction. Per the original author's notes, class 0
        means "Release" and class 1 means "Grab".
    """
    mp_hands, hands, mp_drawing = hand_list[0], hand_list[1], hand_list[2]

    # YOLO object detection
    box_results = yolo_model.predict(image, conf=0.6, verbose=False, show=False)
    boxes = box_results[0].boxes.xyxy.cpu()
    box_class = box_results[0].boxes.cls.cpu().tolist()

    # ``None`` means "not detected". The previous all-zero sentinel wrongly
    # discarded boxes touching the left/top edge (x1 == 0 or y1 == 0).
    # If a class occurs several times, the last box wins.
    object_box = None
    hand_box = None
    for idx, cls in enumerate(box_class):
        if int(cls) == detect_cls:
            object_box = tuple(int(v) for v in boxes[idx])
            cv2.rectangle(image, object_box[:2], object_box[2:], (0, 255, 0), 2)
        elif int(cls) == hand_cls:
            hand_box = tuple(int(v) for v in boxes[idx])
            cv2.rectangle(image, hand_box[:2], hand_box[2:], (0, 0, 255), 2)

    # MediaPipe hand landmarks
    # NOTE(review): MediaPipe documents an RGB input, while frames read through
    # OpenCV are normally BGR. Left unchanged so the features stay consistent
    # with however the LSTM training data was produced - TODO confirm.
    results = hands.process(image)
    if results.multi_hand_landmarks:
        # Only frames where both the object and the hand were boxed are recorded.
        if object_box is not None and hand_box is not None:
            xyz_list = []
            for x_y_z in results.multi_hand_landmarks:
                for landmark in x_y_z.landmark:
                    xyz_list.extend((landmark.x, landmark.y, landmark.z))

            x1, y1, x2, y2 = object_box
            hx1, hy1, hx2, hy2 = hand_box
            # Corner distances between object and hand boxes, scaled by the frame size.
            xyz_list.append(abs(x1 - hx1) / 640)
            xyz_list.append(abs(x2 - hx2) / 640)
            xyz_list.append(abs(y1 - hy1) / 640)
            xyz_list.append(abs(y2 - hy2) / 640)
            xyz_list_list.append(xyz_list)

        for hand_landmarks in results.multi_hand_landmarks:
            mp_drawing.draw_landmarks(image, hand_landmarks, mp_hands.HAND_CONNECTIONS)

    if len(xyz_list_list) == length:
        # A single sequence as a batch of one: (1, length, features), float32.
        window = torch.Tensor(np.array(xyz_list_list)).unsqueeze(0)
        # Follow the model's own device instead of relying on a notebook global.
        first_param = next(lstm_model.parameters(), None)
        if first_param is not None:
            window = window.to(first_param.device)
        xyz_list_list = []
        with torch.no_grad():
            result = lstm_model(window)
            _, out = torch.max(result, 1)
            status_num = out.item()

    return image, xyz_list_list, status_num



In [9]:
# Render Korean text with a TrueType font
def putText_korean(img, text, position, font_path, font_size, color):
    """Draw ``text`` on a copy of ``img`` using Pillow and return it as a NumPy array.

    Args:
        img: image array accepted by ``Image.fromarray``.
        text: string to draw.
        position: ``(x, y)`` anchor passed to ``ImageDraw.text``.
        font_path: path of the TrueType font file.
        font_size: font size passed to ``ImageFont.truetype``.
        color: fill colour, applied in the channel order of ``img``.
    """
    canvas = Image.fromarray(img)
    pen = ImageDraw.Draw(canvas)
    pen.text(position, text, font=ImageFont.truetype(font_path, font_size), fill=color)
    return np.array(canvas)

In [10]:
# Initialise the YOLO object-detection model (passed to grab_release as yolo_model)
best_model = './block_model/block_best_02.pt'
yolo_model = YOLO(best_model)

# YOLO step-detection model (passed to detect_and_process as step_model)
step_best_model = './block_model/truck_best_04.pt'
step_model = YOLO(step_best_model)

# Load the LSTM model: build the network on `device`, then restore the saved state dict
model_path = './lstm_pth/more_data_lstm_model_ver9.pth'
lstm_model = hand_LSTM().to(device)
lstm_model.load_state_dict(torch.load(model_path, map_location=device))
# Inference only: eval mode turns the Dropout layers of hand_LSTM off.
lstm_model.eval()
print("모델이 성공적으로 불러와졌습니다.")

모델이 성공적으로 불러와졌습니다.


In [11]:
# Initialise the MediaPipe hand-detection module
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(static_image_mode=False, max_num_hands=1, min_detection_confidence=0.3)
mp_drawing = mp.solutions.drawing_utils

# Bundled in the order grab_release unpacks it: [mp_hands, hands, mp_drawing]
hand_list = [mp_hands, hands, mp_drawing]

In [23]:
# Hand class id and dataset (sequence) length
# hand_cls: class id that grab_release compares against yolo_model's predicted classes
# length: number of feature vectors grab_release collects before running the LSTM
hand_cls = 6
length = 10

# Position, font path, font size and colour of the Korean text overlay
position = (15, 40) # where the text is drawn
font_path = './font/KCC-Ganpan.ttf' # path to the Korean font file
font_size = 30 # font size
# NOTE(review): written into the frame array as-is; on a BGR OpenCV frame this
# presumably shows as red in cv2.imshow - confirm.
color = (0, 0, 255)

In [24]:
# Real-time guidance system, wrapped in a function
def detect_and_process(image, step_model, yolo_model, hand_list, lstm_model, step, detect_list, hand_cls, length, xyz_list_list, status_num):
    """Look for the current step's result in the frame, then run grab/release tracking.

    ``step_model`` is run on ``image``; every box whose class equals ``step`` is
    drawn onto ``image`` and the last such box is remembered. ``grab_release``
    is then called with ``detect_list[step]`` as the object class to track.

    Returns:
        ``(image, xyz_list_list, status_num, x1, y1, x2, y2)``. The four
        coordinates are the remembered step box exactly as unpacked from the
        prediction row, or four integer zeros when no box of class ``step``
        was found.
    """
    detections = step_model.predict(image, conf=0.6, verbose=False, show=False)[0].boxes
    step_boxes = detections.xyxy.cpu()
    step_classes = detections.cls.cpu().tolist()

    # Zeros mean "step not detected"; the caller tests for them.
    x1, y1, x2, y2 = 0, 0, 0, 0
    for box, class_id in zip(step_boxes, step_classes):
        if int(class_id) != step:
            continue
        x1, y1, x2, y2 = box
        top_left = (int(x1), int(y1))
        bottom_right = (int(x2), int(y2))
        cv2.rectangle(image, top_left, bottom_right, (225, 0, 0), 2)

    # The object to track in this step is looked up by step index.
    tracked_cls = detect_list[step]
    image, xyz_list_list, status_num = grab_release(
        image, yolo_model, hand_list, lstm_model, tracked_cls,
        hand_cls, length, xyz_list_list, status_num,
    )

    return image, xyz_list_list, status_num, x1, y1, x2, y2

def overlay_text(image, text, position, font_path, font_size, color):
    """Draw ``text`` on ``image`` by delegating to ``putText_korean`` and return its result."""
    return putText_korean(image, text, position, font_path, font_size, color)

In [48]:
# Real-time assembly guide: walk the user through the steps on a live camera feed.
lstm_model.eval()
status_num = -1   # last grab/release prediction; -1 until the LSTM has produced one
step = 0          # index of the assembly step currently being guided
wait_frames = 60  # frames the success message stays up before advancing
cnt_frames = 0
xyz_list_list = []
detect_list = [0, 13, 4, 5, 3]  # object class id to track at each step

# (prompt until a grab is predicted, prompt once a grab is predicted) for each step
step_texts = [
    ('파란 다리를 집어 올리세요.', '빨간 원 위에 파란 다리를 올려놓으세요.'),
    ('노란 다리를 집어 올리세요.', '파란색 다리의 왼쪽에 노란색 다리를 놓으세요.'),
    ('초록색 원을 집어 올리세요.', '노란색 다리 밑에 초록색 원을 넣어주세요.'),
    ('초록색 큐브를 집어 올리세요.', '노란색 다리의 오른쪽 위에 초록색 큐브를 올려놓으세요.'),
    ('파란색 부채꼴을 집어 올리세요.', '초록색 큐브의 왼쪽에 파란색 부채꼴을 놓으세요.'),
]

cap_device = 0
cap = cv2.VideoCapture(cap_device)  # for Mac
# cap = cv2.VideoCapture(0)  # for Windows

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # grab_release normalises box distances by 640, so frames are resized to match.
        image = cv2.resize(frame, (640, 640))

        # Past the last step there is nothing left to detect (and detect_list[step] would be out of range).
        if step < len(detect_list):
            image, xyz_list_list, status_num, x1, y1, x2, y2 = detect_and_process(
                image, step_model, yolo_model, hand_list, lstm_model, step, detect_list,
                hand_cls, length, xyz_list_list, status_num)

        # Pick the prompt for the current step.
        if 0 <= step < len(step_texts):
            pick_text, place_text = step_texts[step]
            text = place_text if status_num == 1 else pick_text
        else:
            x1, y1, x2, y2 = 0, 0, 0, 0  # reset so the success branch below cannot fire
            text = '완료!'

        # A non-zero box means the step model saw the finished step.
        if x1 != 0 and y1 != 0 and x2 != 0 and y2 != 0:
            text = '참 잘했어요!'
            if cnt_frames < wait_frames:
                cnt_frames += 1
            else:
                step += 1
                cnt_frames = 0
                status_num = -1

        image = overlay_text(image, text, position, font_path, font_size, color)

        cv2.imshow('frame', image)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if a frame raised,
    # so the device is not left locked until the kernel restarts.
    cap.release()
    cv2.destroyAllWindows()
    cv2.waitKey(1)  # let the GUI event loop process the window close

-1