In [53]:
!pip install mediapipe

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [54]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd

In [55]:
train_data = pd.read_csv("data/train_data.csv")

In [56]:
train_data

Unnamed: 0,degree,Diff_X,Diff_Y,label
0,3.739419,-6.855240,104.812789,1
1,3.901521,-7.255754,106.307044,1
2,5.526133,-12.400465,127.970481,1
3,2.210225,4.927044,127.628975,1
4,4.202948,-9.186401,124.894252,1
...,...,...,...,...
295,5.622362,-20.578928,208.701453,0
296,0.120880,0.420895,199.499416,0
297,3.458771,12.508659,206.832533,0
298,0.571113,2.089586,209.623003,0


In [57]:
file = np.genfromtxt('data/train_data.csv', delimiter=',')
file = np.delete(file, (0), axis = 0)

In [58]:
feature = file[:,:-1].astype(np.float32)
label = file[:, -1].astype(np.float32)

In [59]:
knn = cv2.ml.KNearest_create()
knn.train(feature, cv2.ml.ROW_SAMPLE, label)

True

In [60]:
def Nose_Point(result, image_width, image_height) :
    Nose_X = result.pose_landmarks.landmark[mp_pose.PoseLandmark.NOSE].x * image_width
    Nose_Y = result.pose_landmarks.landmark[mp_pose.PoseLandmark.NOSE].y * image_height

    return Nose_X, Nose_Y

def Compute_Neck(result, image_width, image_height) :
    L_Shoulder_X = result.pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER].x * image_width
    R_Shoulder_X = result.pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER].x * image_width
    Neck_X = (L_Shoulder_X + R_Shoulder_X) / float(2.0)
    
    R_Shoulder_Y = result.pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER].y * image_height
    L_Shoulder_Y = result.pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER].y * image_height
    Neck_Y = (L_Shoulder_Y + R_Shoulder_Y) / float(2.0)

    return Neck_X, Neck_Y

def Compute_Diff(Nose_X, Nose_Y, Neck_X, Neck_Y):
    Diff_X = Neck_X - Nose_X
    Diff_Y = Neck_Y - Nose_Y
    return Diff_X, Diff_Y

def Comput_Degree(Nose_X, Nose_Y, Neck_X, Neck_Y):
    AB = np.sqrt((Nose_X - Neck_X) ** 2 + (Nose_Y - Neck_Y) ** 2)
    AC = np.abs(Nose_X - Neck_X)
    cos = AC / AB
    degree = np.degrees(cos)
    return degree

In [61]:
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose

cap = cv2.VideoCapture(0)
with mp_pose.Pose(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as pose:
  while cap.isOpened():
    success, image = cap.read()
    if not success:
      print("Ignoring empty camera frame.")
      # If loading a video, use 'break' instead of 'continue'.
      continue

    # To improve performance, optionally mark the image as not writeable to
    # pass by reference.
    image.flags.writeable = False
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_height, image_width, _ = image.shape
    result = pose.process(image)
    
    # compute degree and diff
    Nose_X, Nose_Y = Nose_Point(result, image_width, image_height)
    Neck_X, Neck_Y = Compute_Neck(result, image_width, image_height)
    Diff_X, Diff_Y = Compute_Diff(Nose_X, Nose_Y, Neck_X, Neck_Y)
    degree = Comput_Degree(Nose_X, Nose_Y, Neck_X, Neck_Y)
    data = np.array([degree, Diff_X, Diff_Y], dtype = np.float32)
    data = data.reshape(1,3)
    
    ret, label, neighbours, dist = knn.findNearest(data, 2)    
    

    # Draw the pose annotation on the image.
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    mp_drawing.draw_landmarks(
        image,
        result.pose_landmarks,
        mp_pose.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
    # Flip the image horizontally for a selfie-view display.
    if label == 1:
        text = "Drowsy"
    else:
        text = "NonDorwsy"
    
    cv2.putText(image, text, (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.imshow('MediaPipe Pose', image)
    if cv2.waitKey(5) & 0xFF == 27:
      break
cap.release()

In [62]:
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from base64 import b64decode, b64encode
import cv2
import numpy as np
import PIL
import io
import html
import time

In [63]:
# function to convert the JavaScript object into an OpenCV image
def js_to_image(js_reply):
  """
  Params:
          js_reply: JavaScript object containing image from webcam
  Returns:
          img: OpenCV BGR image
  """
  # decode base64 image
  image_bytes = b64decode(js_reply.split(',')[1])
  # convert bytes to numpy array
  jpg_as_np = np.frombuffer(image_bytes, dtype=np.uint8)
  # decode numpy array into OpenCV BGR image
  img = cv2.imdecode(jpg_as_np, flags=1)

  return img

# function to convert OpenCV Rectangle bounding box image into base64 byte string to be overlayed on video stream
def bbox_to_bytes(bbox_array):
  """
  Params:
          bbox_array: Numpy array (pixels) containing rectangle to overlay on video stream.
  Returns:
        bytes: Base64 image byte string
  """
  # convert array into PIL image
  bbox_PIL = PIL.Image.fromarray(bbox_array, 'RGB')
  iobuf = io.BytesIO()
  # format bbox into png for return
  bbox_PIL.save(iobuf, format='png')
  # format return string
  bbox_bytes = 'data:image/png;base64,{}'.format((str(b64encode(iobuf.getvalue()), 'utf-8')))

  return bbox_bytes

In [64]:
# JavaScript to properly create our live video stream using our webcam as input
def video_stream():
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;
    
    var pendingResolve = null;
    var shutdown = false;
    
    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }
    
    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }
    
    async function createDom() {
      if (div !== null) {
        return stream;
      }

      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);
      
      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>Status:</span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);
           
      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "environment"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);
      
      const instruction = document.createElement('div');
      instruction.innerHTML = 
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };
      
      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 640; //video.videoWidth;
      captureCanvas.height = 480; //video.videoHeight;
      window.requestAnimationFrame(onAnimationFrame);
      
      return stream;
    }
    async function stream_frame(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }

      var preCreate = Date.now();
      stream = await createDom();
      
      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }
            
      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }
      
      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;
      
      return {'create': preShow - preCreate, 
              'show': preCapture - preShow, 
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')

  display(js)
  
def video_frame(label, overay_img):
  data = eval_js('stream_frame("{}", "{}")'.format(label, overay_img))
  return data

In [65]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd

file = np.genfromtxt('data/train_data.csv', delimiter=',')
file = np.delete(file, (0), axis = 0)

feature = file[:,:-1].astype(np.float32)
label = file[:, -1].astype(np.float32)

knn = cv2.ml.KNearest_create()
knn.train(feature, cv2.ml.ROW_SAMPLE, label)

def Nose_Point(result, image_width, image_height) :
    Nose_X = result.pose_landmarks.landmark[mp_pose.PoseLandmark.NOSE].x * image_width
    Nose_Y = result.pose_landmarks.landmark[mp_pose.PoseLandmark.NOSE].y * image_height

    return Nose_X, Nose_Y

def Compute_Neck(result, image_width, image_height) :
    L_Shoulder_X = result.pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER].x * image_width
    R_Shoulder_X = result.pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER].x * image_width
    Neck_X = (L_Shoulder_X + R_Shoulder_X) / float(2.0)
    
    R_Shoulder_Y = result.pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER].y * image_height
    L_Shoulder_Y = result.pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER].y * image_height
    Neck_Y = (L_Shoulder_Y + R_Shoulder_Y) / float(2.0)

    return Neck_X, Neck_Y

def Compute_Diff(Nose_X, Nose_Y, Neck_X, Neck_Y):
    Diff_X = Neck_X - Nose_X
    Diff_Y = Neck_Y - Nose_Y
    return Diff_X, Diff_Y

def Comput_Degree(Nose_X, Nose_Y, Neck_X, Neck_Y):
    AB = np.sqrt((Nose_X - Neck_X) ** 2 + (Nose_Y - Neck_Y) ** 2)
    AC = np.abs(Nose_X - Neck_X)
    cos = AC / AB
    degree = np.degrees(cos)
    return degree

mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose

video_stream()

cap = cv2.VideoCapture(0)
with mp_pose.Pose(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as pose:
  while cap.isOpened():
    success, image = cap.read()
    if not success:
      print("Ignoring empty camera frame.")
      # If loading a video, use 'break' instead of 'continue'.
      continue

    # To improve performance, optionally mark the image as not writeable to
    # pass by reference.
    image.flags.writeable = False
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_height, image_width, _ = image.shape
    result = pose.process(image)
    
    # compute degree and diff
    Nose_X, Nose_Y = Nose_Point(result, image_width, image_height)
    Neck_X, Neck_Y = Compute_Neck(result, image_width, image_height)
    Diff_X, Diff_Y = Compute_Diff(Nose_X, Nose_Y, Neck_X, Neck_Y)
    degree = Comput_Degree(Nose_X, Nose_Y, Neck_X, Neck_Y)
    data = np.array([degree, Diff_X, Diff_Y], dtype = np.float32)
    data = data.reshape(1,3)
    
    ret, label, neighbours, dist = knn.findNearest(data, 2)    
    

    # Draw the pose annotation on the image.
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    mp_drawing.draw_landmarks(
        image,
        result.pose_landmarks,
        mp_pose.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
    # Flip the image horizontally for a selfie-view display.
    if label == 1:
        text = "Drowsy"
    else:
        text = "NonDorwsy"
    
    cv2.putText(image, text, (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.imshow('MediaPipe Pose', image)
    if cv2.waitKey(5) & 0xFF == 27:
      break
cap.release()

<IPython.core.display.Javascript object>