In [1]:
import cv2
import numpy as np
import threading
import argparse
import sys
import os
import time
import csv
import tensorflow.lite as tflite
import mediapipe as mp
import numpy as np
from pathlib import Path

from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2

from concurrent.futures import ThreadPoolExecutor


# Short aliases for the MediaPipe helpers used when drawing the skeleton:
# the pose connections table, the drawing routines and the default styles.
mp_pose, mp_drawing, mp_drawing_styles = (
    mp.solutions.pose,
    mp.solutions.drawing_utils,
    mp.solutions.drawing_styles,
)

In [18]:
# Number of consecutive landmark rows fed to the classifier per prediction;
# the video loop only predicts once it has buffered this many rows.
n_time_steps = 20
# Load the TFLite fall classifier (path is relative to the working directory)
# and allocate its tensors so it is ready for set_tensor()/invoke().
interpreter = tflite.Interpreter(model_path="model_april.tflite")
interpreter.allocate_tensors()

# Tensor metadata; detect() uses the 'index' entries to feed the input tensor
# and to read the output tensor.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# NOTE(review): a notebook cell only renders its final expression, so the bare
# `input_details` line below shows nothing; only `output_details` is displayed.
input_details
output_details

[{'name': 'StatefulPartitionedCall:0',
  'index': 72,
  'shape': array([1, 1]),
  'shape_signature': array([-1,  1]),
  'dtype': numpy.float32,
  'quantization': (0.0, 0),
  'quantization_parameters': {'scales': array([], dtype=float32),
   'zero_points': array([], dtype=int32),
   'quantized_dimension': 0},
  'sparsity_parameters': {}}]

In [3]:
# Alternative model asset, kept for reference:
#mediapipe_pose_model_asset = "mediapipe_pose_landmarker_model/pose_landmarker_heavy.task"
mediapipe_pose_model_asset = "mediapipe_pose_landmarker_model/pose_landmarker_full.task"
base_options = python.BaseOptions(model_asset_path=mediapipe_pose_model_asset)

# Landmarker settings gathered in one place before building the options object.
_landmarker_settings = {
    "base_options": base_options,
    # VIDEO mode: frames are submitted in order together with a timestamp.
    "running_mode": vision.RunningMode.VIDEO,
    "num_poses": 1,
    "min_pose_detection_confidence": 0.5,
    "min_pose_presence_confidence": 0.5,
    "min_tracking_confidence": 0.5,
    "output_segmentation_masks": False,
}
options = vision.PoseLandmarkerOptions(**_landmarker_settings)


In [4]:
def make_landmark_timestep(results):
    """Flatten the first detected pose into one feature row.

    Args:
        results: Object whose ``pose_landmarks[0]`` is an indexable sequence of
            at least 33 landmarks, each exposing ``x``, ``y``, ``z`` and
            ``visibility``.

    Returns:
        A flat list of 132 values laid out as
        ``[x0, y0, z0, visibility0, x1, y1, ...]`` for landmarks 0..32.

    Raises:
        IndexError: If no pose is present or it has fewer than 33 landmarks.
    """
    first_pose = results.pose_landmarks[0]
    row = []
    for idx in range(33):
        point = first_pose[idx]
        row.extend((point.x, point.y, point.z, point.visibility))
    return row


def draw_landmark_on_image(mpDraw, results, img):
    """Draw the pose skeleton plus a filled dot on every landmark of ``img``.

    Two layouts of ``results.pose_landmarks`` are accepted:

    * a list with one landmark sequence per detected pose (the layout the
      PoseLandmarker results in this notebook have), or
    * a single object exposing a ``.landmark`` sequence.

    Args:
        mpDraw: Drawing helper providing ``draw_landmarks(img, landmarks,
            connections)`` (e.g. ``mp.solutions.drawing_utils``).
        results: Pose detection result with a ``pose_landmarks`` attribute.
        img: Image array to draw on; modified in place.

    Returns:
        The same ``img`` object. It is returned untouched when no pose was
        detected.
    """
    pose_landmarks = results.pose_landmarks
    if pose_landmarks is None:
        return img

    if hasattr(pose_landmarks, "landmark"):
        landmark_lists = [pose_landmarks]
    else:
        # Per-pose landmark sequences must be wrapped in a NormalizedLandmarkList
        # before the drawing helper can consume them.
        landmark_lists = []
        for pose in pose_landmarks:
            proto = landmark_pb2.NormalizedLandmarkList()
            proto.landmark.extend([
                landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z)
                for lm in pose
            ])
            landmark_lists.append(proto)

    # Landmark coordinates are normalised, so scale them by the image size.
    h, w = img.shape[:2]
    for landmark_list in landmark_lists:
        mpDraw.draw_landmarks(img, landmark_list, mp_pose.POSE_CONNECTIONS)
        for lm in landmark_list.landmark:
            cx, cy = int(lm.x * w), int(lm.y * h)
            cv2.circle(img, (cx, cy), 5, (255, 0, 0), cv2.FILLED)
    return img


def draw_class_on_image(label, img):
    """Write ``label`` in green near the top-left corner of ``img``.

    Args:
        label: Text to render.
        img: Image array to draw on; modified in place.

    Returns:
        The same ``img`` object, now carrying the text.
    """
    cv2.putText(
        img,
        label,
        (10, 30),                  # bottom-left corner of the text, in pixels
        cv2.FONT_HERSHEY_SIMPLEX,  # font face
        1,                         # font scale
        (0, 255, 0),               # text colour
        2,                         # stroke thickness
        2,                         # line type
    )
    return img

In [5]:
# Number of consecutive predictions classified as a fall; reset by any
# "NORMAL" prediction.
fall_continous_count = 0


def detect(interpreter, lm_list):
    """Classify one window of landmark rows as "FALL" or "NORMAL".

    The window is converted to a float32 array with a leading batch axis of
    size one, pushed through ``interpreter``, and the first output value is
    compared against 0.5. Input and output tensors are located through the
    module-level ``input_details`` / ``output_details``.

    Side effects: updates the module-level ``label`` and
    ``fall_continous_count``, and prints progress / alarm messages.

    Args:
        interpreter: TFLite interpreter with tensors already allocated.
        lm_list: Sequence of landmark rows (one row per time step).

    Returns:
        "FALL" when the score is above 0.5, otherwise "NORMAL".
    """
    global label, fall_continous_count

    batch = np.array(lm_list, dtype=np.float32)[np.newaxis, ...]
    interpreter.set_tensor(input_details[0]['index'], batch)
    interpreter.invoke()
    score = interpreter.get_tensor(output_details[0]['index'])[0][0]

    if not score > 0.5:
        # Any non-fall prediction breaks the streak.
        label = "NORMAL"
        fall_continous_count = 0
        return label

    label = "FALL"
    # The streak is printed before it is incremented, so the first fall logs 0.
    print("falll ", fall_continous_count)
    fall_continous_count += 1
    if fall_continous_count >= 3:
        print("Sending alarm...", fall_continous_count)
    return label

In [6]:
# Single-worker pool intended for running detection off the main loop.
# NOTE(review): the only visible use is a commented-out `executor.submit(...)`
# call in the video loop, and the pool is never shut down in this notebook.
executor = ThreadPoolExecutor(max_workers=1)

def threaded_detect(interpreter, lm_data):
    """Run ``detect`` and store its result in the module-level ``label``.

    Written as a target for ``executor.submit``; it returns nothing, so the
    caller reads the outcome from the global ``label``.

    Args:
        interpreter: Interpreter object forwarded to ``detect``.
        lm_data: Window of landmark rows forwarded to ``detect``.
    """
    global label
    label = detect(interpreter, lm_data)
    # Consider adding any post-detection logic here, if needed


In [21]:
file_name = "evaluation/recorded5.mp4"
out_path = "evaluation/out.mp4"

video_reader = cv2.VideoCapture(file_name)
if not video_reader.isOpened():
    # Fail loudly instead of silently writing an empty output video.
    raise IOError(f"Could not open input video: {file_name}")

# The output video keeps the source resolution and frame rate.
original_video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
original_video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Lower-case 'mp4v' is the tag used for .mp4 output; OpenCV's FFmpeg backend
# warns about the upper-case 'MP4V' spelling and falls back to 'mp4v' anyway.
video_writer = cv2.VideoWriter(
    out_path,
    cv2.VideoWriter_fourcc(*"mp4v"),
    video_reader.get(cv2.CAP_PROP_FPS),
    (original_video_width, original_video_height),
)

lm_list = []       # sliding window of landmark rows awaiting classification
label = "N/A"      # shown on the frames until the first prediction is made
current_frame = 0
# detect() keeps its fall streak in this global; reset it so re-running the
# cell does not inherit the streak from a previous run.
fall_continous_count = 0

try:
    # The landmarker is a context manager, so it is closed even if the loop raises.
    with vision.PoseLandmarker.create_from_options(options) as detector:
        while True:
            success, frame = video_reader.read()
            if not success:
                break

            rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_image)

            # VIDEO mode needs a per-frame timestamp in milliseconds.
            # (Original note: on the Raspberry Pi the maximum speed is about 5 fps.)
            timestamp_ms = int(video_reader.get(cv2.CAP_PROP_POS_MSEC))
            results = detector.detect_for_video(mp_image, timestamp_ms)

            if results.pose_landmarks:
                # For recorded video only every second frame is fed to the model.
                # On the Raspberry Pi this check is unnecessary because the
                # pipeline is already slow enough; feed every frame there.
                if current_frame % 2 == 0:
                    lm_list.append(make_landmark_timestep(results))
                    if len(lm_list) >= n_time_steps:
                        lm_data_to_predict = lm_list[-n_time_steps:]
                        label = detect(interpreter, lm_data_to_predict)
                        # Slide the window forward by one time step.
                        del lm_list[0]

                for pose_landmarks in results.pose_landmarks:
                    # The drawing helper expects a NormalizedLandmarkList proto.
                    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
                    pose_landmarks_proto.landmark.extend([
                        landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y,
                                                        z=landmark.z)
                        for landmark in pose_landmarks
                    ])
                    mp_drawing.draw_landmarks(
                        frame,
                        pose_landmarks_proto,
                        mp_pose.POSE_CONNECTIONS,
                        mp_drawing_styles.get_default_pose_landmarks_style())
            else:
                # Losing the pose breaks the sequence, so start a fresh window.
                lm_list = []

            current_frame += 1
            frame = draw_class_on_image(label, frame)
            cv2.imshow("Image", frame)
            video_writer.write(frame)
            if cv2.waitKey(1) == ord('q'):
                break
finally:
    # Always finalise the output file and close the preview window, even when
    # the loop is interrupted or raises.
    video_reader.release()
    video_writer.release()
    cv2.destroyAllWindows()

print(current_frame)

falll  0
falll  1
469
