In [None]:
!pip uninstall mediapipe

In [6]:
!pip install --upgrade pip --user

Collecting pip
  Using cached pip-22.0.4-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
Successfully installed pip-22.0.4
Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'C:\ProgramData\Anaconda3\python.exe -m pip install --upgrade pip' command.


In [9]:
!pip install --user mediapipe





In [10]:
#!/usr/bin/env python3

import cv2
import depthai as dai
from calc import HostSpatialsCalc
from utility import *
import os
import numpy as np
import math
import mediapipe as mp

In [11]:
# Create pipeline
pipeline = dai.Pipeline()


# Properties
queueNames = []
downscaleColor = True
fps = 30
monoResolution = dai.MonoCameraProperties.SensorResolution.THE_720_P

#--- RGB Camera ---#
camRgb = pipeline.create(dai.node.ColorCamera)
camRgb.setBoardSocket(dai.CameraBoardSocket.RGB)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setFps(fps)
if downscaleColor: camRgb.setIspScale(2, 3)
# For now, RGB needs fixed focus to properly align with depth.
# This value was used during calibration
camRgb.initialControl.setManualFocus(130)

# Get RGB Output
rgbOut = pipeline.create(dai.node.XLinkOut)
rgbOut.setStreamName("rgb")
camRgb.isp.link(rgbOut.input)
queueNames.append("rgb")

#--- Depth Camera ---#

monoLeft = pipeline.create(dai.node.MonoCamera)
monoRight = pipeline.create(dai.node.MonoCamera)
stereo = pipeline.create(dai.node.StereoDepth)

monoLeft.setResolution(monoResolution)
monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT)
monoRight.setResolution(monoResolution)
monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT)

stereo.initialConfig.setConfidenceThreshold(255)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
# LR-check is required for depth alignment
stereo.setLeftRightCheck(True)
stereo.setDepthAlign(dai.CameraBoardSocket.RGB)

# Get Depth Ouput
monoLeft.out.link(stereo.left)
monoRight.out.link(stereo.right)

DepthOut = pipeline.create(dai.node.XLinkOut)
DepthOut.setStreamName("depth")
queueNames.append("depth")
stereo.depth.link(DepthOut.input)

dispOut = pipeline.create(dai.node.XLinkOut)
dispOut.setStreamName("disp")
queueNames.append("disp")
stereo.disparity.link(dispOut.input)

# Hand skeleton
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_hands = mp.solutions.hands
mp_pose = mp.solutions.pose
hands = mp_hands.Hands( model_complexity=0,
                        min_detection_confidence=0.5,
                        min_tracking_confidence=0.5)
# Pose skeleton
pose  = mp_pose.Pose( static_image_mode=True,
                      model_complexity=2,
                      enable_segmentation=True,
                      min_detection_confidence=0.5)

TypeError: __init__() got an unexpected keyword argument 'model_complexity'

<img src="pose_tracking_full_body_landmarks.png">

<img src="hand_landmarks.png">

In [3]:
def get_hand_data(index,hand,hand_results) : 
    points = np.zeros((21,3))
    norm_points = np.zeros((21,3))
    Label = None
    for idx,classification in enumerate(hand_results.multi_handedness) : 
        if classification.classification[0].index == index :
            Label = classification.classification[0].label
        for i in range(21) : 
            points[i,:] = np.multiply(np.array((hand.landmark[i].x, hand.landmark[i].y,0)),[1280,720,1])
            norm_points[i:] = np.array((hand.landmark[i].x, hand.landmark[i].y,0))
    return Label,points,norm_points


def get_pose_data(pose_results) :
    points = np.zeros((13,3))  
    norm_points = np.zeros((13,3))
    pose_landmark = pose_results.pose_landmarks.landmark[0:13] 
    for i in range(13) : 
        points[i,:] = np.multiply(np.array((pose_landmark[i].x, pose_landmark[i].y,0)),[1280,720,1])
        norm_points[i,:] = np.array((pose_landmark[i].x, pose_landmark[i].y,0))
    return points,norm_points

def get_spatial(depthFrame,x,y) : 
    spatials, centroid = hostSpatials.calc_spatials(depthFrame, (x,y))
    return spatials,centroid

In [4]:
def Collect_word(word = None) :
    DATA_PATH = os.path.join('Collected_Data_With_Face') 
    DATA_IMAGE_PATH =os.path.join(DATA_PATH,word,'data_image')
    DATA_NORM_IMAGE_PATH =os.path.join(DATA_PATH,word,'data_norm_image')
    DATA_WORLD_PATH =os.path.join(DATA_PATH,word,'data_world_image')
    img_exist = os.path.exists(DATA_IMAGE_PATH)
    normimg_exist = os.path.exists(DATA_NORM_IMAGE_PATH)
    world_exist = os.path.exists(DATA_WORLD_PATH)
    print(f"You are going to collect '{word}'")
    if not img_exist:
    # Create a new directory because it does not exist 
        os.makedirs(DATA_IMAGE_PATH)
        print("The new directory for image data is created !!!!")
    if not normimg_exist:
    # Create a new directory because it does not exist 
        os.makedirs(DATA_NORM_IMAGE_PATH)
        print("The new directory for normalized image data is created !!!!")
    if not world_exist:
    # Create a new directory because it does not exist 
        os.makedirs(DATA_WORLD_PATH)
        print("The new directory for world data  is created !!!!")
    return word, DATA_IMAGE_PATH , DATA_NORM_IMAGE_PATH, DATA_WORLD_PATH

In [5]:
# Connect to device and start pipeline
Show_depth = True
with dai.Device(pipeline) as device:
    # Output queue will be used to get the depth frames from the outputs defined above
    # Print Myriad X Id (MxID), USB speed, and available cameras on the device
    print('MxId:',device.getDeviceInfo().getMxId())
    print('USB speed:',device.getUsbSpeed())
    print('Connected cameras:',device.getConnectedCameras())
    
    # Define output Queue
    rgbQueue = device.getOutputQueue(name = "rgb")
    depthQueue = device.getOutputQueue(name="depth")
    dispQueue = device.getOutputQueue(name="disp")
    # Name windows
    rgbWindowName = "rgb"
    depthWindowName = "depth"
    cv2.namedWindow(rgbWindowName)
    cv2.namedWindow(depthWindowName)

    text = TextHelper()
    hostSpatials = HostSpatialsCalc(device)
    delta = 10
    hostSpatials.setDeltaRoi(delta)        
    DisplayRect = False            
                                                        
    previousTime = 0
    currentTime = 0

    image_sequence      = []
    norm_image_sequence = []
    world_sequence      = []
    # Left(0->20),Right(21->41),Face(42->54)
    # data_image          = {'Left': np.zeros((21,3)),'Right': np.zeros((21,3))}
    # data_norm_image     = {'Left': np.zeros((21,3)),'Right': np.zeros((21,3))}
    # data_world          = {'Left': np.zeros((21,3)),'Right': np.zeros((21,3))}

    while True:
        latestPacket = {}
        latestPacket["rgb"] = None
        latestPacket["disp"] = None
        latestPacket['depth'] = None
        queueEvents = device.getQueueEvents(('rgb','disp','depth'))
        for queueName in queueEvents :
            packets = device.getOutputQueue(queueName).tryGetAll()
            if len(packets) > 0 :
                latestPacket[queueName] = packets[-1]
        data_image          = np.zeros((55,3)) # Contain x y with 1280x720 resolution and z in metre.
        data_norm_image     = np.zeros((55,3)) # Contain normalized x y in range (0,1) and z in metre.
        data_world          = np.zeros((55,3)) # Contain x y and z in metre.
        #---- RGB ----#
        if latestPacket["rgb"] is not None:
            rgbFrame = latestPacket["rgb"].getCvFrame()
            # Pre-process
            rgbFrame.flags.writeable = False
            rgbFrame = cv2.cvtColor(rgbFrame, cv2.COLOR_BGR2RGB)
            # Note that handedness is determined assuming the input image is mirrored, 
            # i.e., taken with a front-facing/selfie camera with images flipped horizontally. 
            # If it is not the case, please swap the handedness output in the application.
            rgbFrame = cv2.flip(rgbFrame, 1)
            hand_results = hands.process(rgbFrame)
            pose_results = pose.process(rgbFrame)
            # Draw the hand annotations on the image.
            rgbFrame.flags.writeable = True
            rgbFrame = cv2.cvtColor(rgbFrame, cv2.COLOR_RGB2BGR)

            #--- Depth ---#
            if latestPacket["depth"] is not None:
                depthFrame = latestPacket["depth"].getCvFrame()
                depthFrame = cv2.flip(depthFrame, 1)
                # Get disparity frame for nicer depth visualization
                if latestPacket["disp"] is not None:
                    dispFrame = latestPacket["disp"].getFrame()
                    maxDisparity = stereo.initialConfig.getMaxDisparity()

                
                    dispFrame = (dispFrame * (255 / maxDisparity)).astype(np.uint8)
                    dispFrame = cv2.applyColorMap(dispFrame, cv2.COLORMAP_JET)
                    dispFrame = np.ascontiguousarray(dispFrame)
                    dispFrame = cv2.flip(dispFrame, 1)
                    if hand_results.multi_hand_landmarks:
                        for index, hand in enumerate(hand_results.multi_hand_landmarks) :
                            # Draw hand on RGB Frame
                            mp_drawing.draw_landmarks(  rgbFrame,
                                                        hand,
                                                        mp_hands.HAND_CONNECTIONS,
                                                        mp_drawing_styles.get_default_hand_landmarks_style(),
                                                        mp_drawing_styles.get_default_hand_connections_style())
                            # Draw hand on Depth Frame
                            mp_drawing.draw_landmarks(  dispFrame,
                                                        hand,
                                                        mp_hands.HAND_CONNECTIONS,
                                                        mp_drawing_styles.get_default_hand_landmarks_style(),
                                                        mp_drawing_styles.get_default_hand_connections_style())
                            # get points of each hand
                            Label,hand_points,hand_norm_points = get_hand_data(index,hand,hand_results)
                            if Label == 'Left' : 
                                data_image[0:21,:] = hand_points
                                data_norm_image[0:21,:] = hand_norm_points
                            else :
                                data_image[21:42,:] = hand_points
                                data_norm_image[21:42,:] = hand_norm_points
                    if pose_results.pose_landmarks : 
                        # Draw Pose on RGB Frame
                        mp_drawing.draw_landmarks(  rgbFrame,
                                                    pose_results.pose_landmarks,
                                                    mp_pose.POSE_CONNECTIONS,
                                                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
                        # Draw Pose on Depth Frame
                        mp_drawing.draw_landmarks(  dispFrame,
                                                    pose_results.pose_landmarks,
                                                    mp_pose.POSE_CONNECTIONS,
                                                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
                        # get points of pose
                        pose_points,pose_norm_points = get_pose_data(pose_results)
                        data_image[42:55,:] = pose_points
                        data_norm_image[42:55,:] = pose_norm_points
                    for i in range(55) : 
                        x = int(data_image[i,0])
                        y = int(data_image[i,1]) 
                        spatials, centroid = get_spatial(depthFrame,x,y)
                        if not math.isnan(float(spatials['x']))  : data_world[i,0] = spatials['x']/1000
                        if not math.isnan(float(spatials['y']))  : data_world[i,1] = spatials['y']/1000
                        if not math.isnan(float(spatials['z']))  : 
                            data_world[i,2]    = spatials['z']/1000
                            data_norm_image[i,2] = spatials['z']/1000
                            data_image[i,2]      = spatials['z']/1000
                        if DisplayRect : text.rectangle(dispFrame, (x-delta, y-delta), (x+delta, y+delta))
                    image_sequence.append(data_image)
                    norm_image_sequence.append(data_norm_image)
                    world_sequence.append(data_world)
                    # Calculate FPS
                    currentTime = time.time()
                    fps = 1 / (currentTime-previousTime)
                    previousTime = currentTime
                    # Display FPS on the image
                    cv2.putText(rgbFrame, f" FPS : {int(fps)}", (10, 70), cv2.FONT_HERSHEY_COMPLEX, 1, (0,255,0), 2)
                    cv2.putText(dispFrame, f" FPS : {int(fps)}", (10, 70), cv2.FONT_HERSHEY_COMPLEX, 1, (255,0,0), 2)
            
                    # Display the resulting image
                    cv2.imshow(rgbWindowName, rgbFrame)
                    if Show_depth : cv2.imshow(depthWindowName, dispFrame)
                    rgbFrame = None
                    depthFrame = None
                    dispFrame = None
        if cv2.waitKey(1) & 0xFF == ord('q'):
            cv2.destroyAllWindows()
            break
      

MxId: 14442C1071FD60D700
USB speed: UsbSpeed.SUPER
Connected cameras: [<CameraBoardSocket.RGB: 0>, <CameraBoardSocket.LEFT: 1>, <CameraBoardSocket.RIGHT: 2>]


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [None]:
cv2.destroyAllWindows()