In [None]:
import numpy as np
import cv2
import mediapipe as mp
import matplotlib.pyplot as plt
from scipy import signal
from scipy.signal import savgol_filter

In [None]:
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Open the input video with OpenCV.
input_video = '../UPDRS_video/arise from chair.mp4'
cap=cv2.VideoCapture(input_video)
if not cap.isOpened():
    # Fail loudly: an unopened capture would otherwise only show up later as
    # detector loops that never process a single frame.
    raise IOError(f"Cannot open input video: {input_video}")

# Frame size as (width, height) and frame rate, both rounded to integers.
RES=(round(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),round(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
fps = round(cap.get(cv2.CAP_PROP_FPS))

# Writer for the annotated video; it uses the input's frame rate and size.
output_video = "../UPDRS_result/result.mp4"
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter(output_video, fourcc, fps, RES)
if not out.isOpened():
    # A writer that failed to open silently drops every frame passed to write().
    raise IOError(f"Cannot open output video for writing: {output_video}")

In [None]:
def angle(vector_1,vector_2):
    """Return the angle between two vectors, in radians.

    Both vectors are scaled to unit length and the angle is the arccosine of
    their dot product, so the result lies in [0, pi].

    Args:
        vector_1: Sequence or array of numbers.
        vector_2: Sequence or array of numbers with the same length.

    Returns:
        The angle in radians. If either vector has zero length the
        normalisation divides by zero and the result is NaN.
    """
    unit_vector_1 = vector_1 / np.linalg.norm(vector_1)
    unit_vector_2 = vector_2 / np.linalg.norm(vector_2)
    # Rounding can push the cosine of (anti)parallel vectors slightly outside
    # [-1, 1], where arccos returns NaN; clip it back into the valid domain.
    cosine = np.clip(np.dot(unit_vector_1, unit_vector_2), -1.0, 1.0)
    return np.arccos(cosine)

In [None]:
# Leg-agility detector: run MediaPipe Pose on each frame, record the y value
# of both foot-index landmarks and write the annotated frame to `out`.
# Only frames with a detected pose are kept in `img_list`; the index of every
# frame without a detection is stored in `lost_frame`.
# NOTE(review): `cap` and `out` are released at the end of this cell, so they
# presumably have to be re-created before another detector cell is run.
# NOTE(review): static_image_mode=True is used on a video stream - confirm
# that per-frame detection (rather than tracking) is intended.

pose = mp_pose.Pose(static_image_mode=True,min_detection_confidence=0.4)

right_foot_index = []
left_foot_index = []
ankle_dis = []
frame_count = 0
lost_frame = []
img_list = []
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        # No more frames could be read.
        break

    # MediaPipe expects RGB input, OpenCV delivers BGR.
    results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    annotated_image = frame.copy()

    if results.pose_landmarks is None:
        lost_frame.append(frame_count)
    else:
        mp_drawing.draw_landmarks(annotated_image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        landmarks = results.pose_landmarks.landmark
        right_foot_index.append(landmarks[mp_pose.PoseLandmark.RIGHT_FOOT_INDEX].y)
        left_foot_index.append(landmarks[mp_pose.PoseLandmark.LEFT_FOOT_INDEX].y)

        img_list.append(annotated_image)

    frame_count += 1
    out.write(annotated_image)

cap.release()
out.release()

In [None]:
# Arise-from-chair detector: run MediaPipe Pose on each frame, record the y
# value of both shoulder landmarks and write the annotated frame to `out`.
# Every frame (detected or not) is kept in `img_list`; the index of each frame
# without a detection is stored in `lost_frame`.
# NOTE(review): `cap` and `out` are released at the end of this cell, so they
# presumably have to be re-created before another detector cell is run.

pose = mp_pose.Pose(static_image_mode=True,min_detection_confidence=0.4)

right_shoulder = []
left_shoulder = []
frame_count = 0
lost_frame = []
img_list = []
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        # No more frames could be read.
        break

    # MediaPipe expects RGB input, OpenCV delivers BGR.
    results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    annotated_image = frame.copy()

    if results.pose_landmarks is None:
        lost_frame.append(frame_count)
    else:
        mp_drawing.draw_landmarks(annotated_image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        landmarks = results.pose_landmarks.landmark
        right_shoulder.append(landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER].y)
        left_shoulder.append(landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].y)

    img_list.append(annotated_image)
    frame_count += 1
    out.write(annotated_image)

cap.release()
out.release()

In [None]:
# One panel per shoulder, each showing the recorded y value per detected frame.
fig, axs = plt.subplots(2)
panels = zip(axs, (right_shoulder, left_shoulder), ("right shoulder", "left shoulder"))
for ax, series, label in panels:
    ax.plot(series)
    ax.title.set_text(label)
fig.tight_layout()
#fig.savefig("UPDRS_result/pronation/patient_wave.png")

In [None]:
# Single-person pass: for every frame with a detected pose, record the right
# heel y value, the right knee angle (degrees) and the horizontal distance
# between the two ankles. Only detected frames are kept in `img_list`.
# NOTE(review): `cap` is released at the end of this cell, so it presumably
# has to be re-created before another detector cell is run.
pose = mp_pose.Pose(static_image_mode=True,min_detection_confidence=0.4)

right_heel = []
right_knee_angle = []
ankle_dis = []
frame_count = 0
lost_frame = []
img_list = []
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        # No more frames could be read.
        break

    # MediaPipe expects RGB input, OpenCV delivers BGR.
    results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    annotated_image = frame.copy()

    if results.pose_landmarks is None:
        lost_frame.append(frame_count)
    else:
        mp_drawing.draw_landmarks(annotated_image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        landmarks = results.pose_landmarks.landmark
        right_heel.append(landmarks[mp_pose.PoseLandmark.RIGHT_HEEL].y)

        hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP]
        knee = landmarks[mp_pose.PoseLandmark.RIGHT_KNEE]
        ankle = landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE]
        right_hip_x, right_hip_y = hip.x, hip.y
        right_knee_x, right_knee_y = knee.x, knee.y
        right_ankle_x, right_ankle_y = ankle.x, ankle.y

        left_ankle_x = landmarks[mp_pose.PoseLandmark.LEFT_ANKLE].x

        # hip -> knee and knee -> ankle; the angle between them is 0 degrees
        # when the three joints are collinear.
        vector_a = [right_knee_x-right_hip_x, right_knee_y-right_hip_y]
        vector_b = [right_ankle_x-right_knee_x, right_ankle_y-right_knee_y]

        right_knee_angle.append(np.degrees(angle(vector_a,vector_b)))
        ankle_dis.append(abs(right_ankle_x-left_ankle_x))

        img_list.append(annotated_image)

    frame_count += 1
    #out.write(annotated_image)

cap.release()
#out.release()
plt.plot(ankle_dis)

In [None]:
# Display the full list of ankle distances collected by the detector loop.
# NOTE(review): this prints every element; consider showing only a slice.
ankle_dis

In [None]:
# Right-shoulder y value per detected frame, saved as a figure.
# MediaPipe landmark coordinates are normalized to [0, 1] by the image size,
# so the axis is labelled as normalized rather than pixel units.
plt.title("right_shoulder(y axis)")
plt.xlabel("frame")
plt.ylabel("normalized coordinate")
plt.plot(right_shoulder)
plt.savefig("../UPDRS_result/arise/right_shoulder.png")

In [None]:
#leg agility
# Find the peaks of the right foot-index y signal (first 390 samples only) and
# mark them on the plot.
copy = right_foot_index[:390].copy()

peaks, _ = signal.find_peaks(copy,height=0.86,prominence=0.01)

plt.plot(copy)
plt.plot(peaks,np.array(copy)[peaks],"x")


# Stomp action times in frames: the first entry is the index of the first
# peak, every following entry is the gap to the previous peak.
# prepend=0 yields exactly that and gives an empty list (instead of an
# IndexError on peaks[0]) when no peak was found.
action_time_list = np.diff(peaks, prepend=0).tolist()

In [None]:
#ankle distance
# Smooth the ankle distance with a Savitzky-Golay filter, detect its peaks and
# save the annotated curve.
copy = ankle_dis.copy()
smooth = savgol_filter(copy,window_length=11,polyorder=3)

peaks, _ = signal.find_peaks(smooth,height=0.08,prominence=0.03)

plt.plot(smooth)
plt.plot(peaks,np.array(smooth)[peaks],"x")

# ankle_dis is a difference of MediaPipe x coordinates, which are normalized
# to [0, 1] by the image width, so the axis is not in pixels.
plt.title("ankle distance(x axis)")
plt.xlabel("frame")
plt.ylabel("normalized distance")
plt.savefig("ankle_dis_peak.png")

In [None]:
# Show the cached frame at index 10; OpenCV stores BGR, matplotlib expects RGB.
copy = img_list[10].copy()
rgb_frame = cv2.cvtColor(copy, cv2.COLOR_BGR2RGB)
plt.imshow(rgb_frame)
#plt.savefig("./UPDRS_gait/heelpeak10.png")

In [None]:
def boxplot_outliers(data, whisker_factor=15):
    """Drop the values of ``data`` that lie beyond the box-plot whiskers.

    The whiskers are placed ``whisker_factor`` interquartile ranges below the
    first quartile and above the third quartile. Values exactly on a whisker
    are kept, so constant data (IQR of 0) is returned unchanged instead of
    being emptied.

    Args:
        data: Sequence of numbers.
        whisker_factor: Multiple of the IQR used for the whiskers. The default
            of 15 keeps the original behaviour; 1.5 is the conventional
            box-plot value.

    Returns:
        A list with the retained values in their original order. An empty
        input yields an empty list.
    """
    if len(data) == 0:
        return []

    # First and third quartile, and the interquartile range between them.
    q1 = np.quantile(data, 0.25)
    q3 = np.quantile(data, 0.75)
    iqr = q3-q1

    # Upper and lower whiskers.
    upper_bound = q3+(whisker_factor*iqr)
    lower_bound = q1-(whisker_factor*iqr)
    print(lower_bound,upper_bound)

    removed = [x for x in data if lower_bound <= x <= upper_bound]

    return removed

def reject_outliers(arr):
    """Keep the values lying strictly within two standard deviations of the mean.

    Args:
        arr: Sequence of numbers.

    Returns:
        A list of the elements of ``arr`` that are greater than
        ``mean - 2*std`` and smaller than ``mean + 2*std``, in original order.
    """
    values = np.array(arr)
    centre = np.mean(values,axis=0)
    spread = np.std(values,axis=0)
    lower = centre - 2*spread
    upper = centre + 2*spread
    return [x for x in arr if lower < x < upper]

def get_median_filtered(signal, threshold=0.1):
    """Replace samples that deviate strongly from the median with the median.

    Each sample is scored as its absolute deviation from the median divided by
    the median absolute deviation (MAD). Samples whose score exceeds
    ``threshold`` are overwritten with the median. When the MAD is 0 every
    score is taken to be 0.

    Args:
        signal: Sequence or array of numbers (lists, tuples and arrays are
            accepted). The input itself is never modified.
        threshold: Score above which a sample is replaced.

    Returns:
        A new NumPy array with the filtered samples.
    """
    # np.array copies its input, so the caller's data stays untouched.
    values = np.array(signal)
    if values.size == 0:
        return values

    median = np.median(values)
    deviation = np.abs(values - median)
    median_deviation = np.median(deviation)
    if median_deviation == 0:
        # Avoid dividing by zero: keep an array of zero scores so the mask
        # below is always an element-wise boolean array.
        scores = deviation * 0.0
    else:
        scores = deviation / float(median_deviation)
    values[scores > threshold] = median
    return values

In [None]:
# Right-heel y values with box-plot outliers removed, saved as a figure.
# NOTE(review): once values are dropped, the x position is the index into the
# filtered list and no longer the original frame number.
removed = boxplot_outliers(right_heel)
plt.title("right heel")
plt.xlabel("frame")
# MediaPipe landmark coordinates are normalized to [0, 1], not pixels.
plt.ylabel("normalized coordinate")
plt.plot(removed)
plt.savefig("heel_removed_outlier.png")

In [None]:
#arise from chair
# Smooth the right-shoulder y signal and locate the first peak (used as the
# start of the rise) and the first valley (used as its end).
copy = savgol_filter(right_shoulder,31,3)
#copy = right_shoulder.copy()


# Midpoint of the raw signal range, and half that range as required prominence.
mid = (min(right_shoulder) + max(right_shoulder))/2
pro = (max(right_shoulder) - mid)

peaks, _ = signal.find_peaks(copy,height=mid,prominence=pro)
# Valleys are the peaks of the negated signal.
valley, _ = signal.find_peaks(-copy,height=-mid)

if len(peaks) == 0 or len(valley) == 0:
    # Explain the failure instead of raising a bare IndexError below.
    raise ValueError(
        f"arise-from-chair detection failed: found {len(peaks)} peak(s) and "
        f"{len(valley)} valley(s) in right_shoulder"
    )

start_frame = peaks[0]
end_frame = valley[0]

# MediaPipe landmark coordinates are normalized to [0, 1], not pixels.
plt.title("right_shoulder")
plt.xlabel("frame")
plt.ylabel("normalized coordinate")

plt.plot(copy)
plt.plot(peaks,copy[peaks],"x")
plt.plot(valley,copy[valley],"o")
plt.savefig("../UPDRS_result/arise/arise_wave.png")

In [None]:
# Smooth the ankle distance and take its peaks as the step events.
smooth = savgol_filter(ankle_dis,11,3)

peaks, _ = signal.find_peaks(smooth,height=0.08,prominence=0.03)
#valley, _ = signal.find_peaks(smooth*-1,height=-0.88,prominence=0.01)

# Name kept for the cells that read it; here it holds the ankle-distance peaks.
fist_closing_frame = peaks

# Action times in frames: the first entry is the index of the first peak,
# every following entry is the gap to the previous peak. prepend=0 yields
# exactly that and gives an empty list (instead of an IndexError) when no
# peak was found.
action_time_list = np.diff(fist_closing_frame, prepend=0).tolist()

# The curve plotted here is the smoothed ankle distance, built from MediaPipe
# x coordinates that are normalized to [0, 1], so title and unit say so.
plt.title("ankle distance(x axis)")
plt.xlabel("frame")
plt.ylabel("normalized distance")

plt.plot(smooth)
plt.plot(peaks,np.array(smooth)[peaks],"x")
#plt.plot(valley,np.array(smooth)[valley],"o")
#plt.savefig("heel_PeakValley.png")


In [None]:
#write result to video (arise from chair)
# Overlay the current state ("sit" -> "arise" -> "stand") and the elapsed
# rise time on every cached frame and write the frames to `out`.
# start_frame / end_frame are compared against `has_result`, a counter that
# only advances on frames not listed in `lost_frame`.
# NOTE(review): `out` must be an open writer when this cell runs - confirm it
# was re-created after the detector cell released it.
action_time = 0
has_result = 0
time_count = 0
state = "sit"
lost_frames = set(lost_frame)  # O(1) membership test inside the loop
for i in range(len(img_list)):
    if i not in lost_frames:
        if has_result == start_frame:
            state = "arise"
        elif has_result == end_frame:
            state = "stand"
        has_result += 1
    if state == "arise":
        time_count+=1

    # Draw on a copy so the cached frames stay clean and re-running this or
    # another "write result" cell does not stack overlays on top of each other.
    labelled = img_list[i].copy()
    cv2.putText(labelled,f'state:{state}',(10, 70), cv2.FONT_HERSHEY_SIMPLEX,1.5, (255, 0, 0), 3, cv2.LINE_AA)
    cv2.putText(labelled,f'action_time:{time_count/fps:.2f}s',(10, 120), cv2.FONT_HERSHEY_SIMPLEX,1.5, (255, 0, 0), 3, cv2.LINE_AA)
    out.write(labelled)

out.release()

In [None]:
#write result to video (leg agility)
# Overlay the running action count and the duration of the latest action on
# every cached frame and write the frames to `out`.
# NOTE(review): relies on `peaks` and `action_time_list` from the leg-agility
# peak cell; several other cells also assign `peaks`, so run order matters.
action_count = 0
action_time = 0
for i in range(len(img_list)):
    # Advance to the next action when the frame index reaches its peak.
    if( len(peaks) > action_count and i == peaks[action_count]):
        action_time = action_time_list[action_count]
        action_count += 1
    # Draw on a copy so the cached frames stay clean and re-running this or
    # another "write result" cell does not stack overlays on top of each other.
    labelled = img_list[i].copy()
    cv2.putText(labelled,f'action_count:{action_count}',(10, 70), cv2.FONT_HERSHEY_SIMPLEX,1.5, (255, 0, 0), 3, cv2.LINE_AA)
    cv2.putText(labelled,f'action_time:{action_time/fps:.2f}s',(10, 120), cv2.FONT_HERSHEY_SIMPLEX,1.5, (255, 0, 0), 3, cv2.LINE_AA)
    out.write(labelled)

out.release()

In [None]:
#write result to video (step length)
# Overlay the running step count and the latest step length on every cached
# frame and write the frames to `out`.
step = 0
step_length = 0
for i in range(len(img_list)):
    if( len(fist_closing_frame) > step and i == fist_closing_frame[step]):
        # Index with the same array that drives the loop: `peaks` is
        # reassigned by several other cells and may no longer match it.
        step_length = ankle_dis[fist_closing_frame[step]]
        step += 1
    # Draw on a copy so the cached frames stay clean and re-running this or
    # another "write result" cell does not stack overlays on top of each other.
    labelled = img_list[i].copy()
    cv2.putText(labelled,f'step:{step}',(10, 70), cv2.FONT_HERSHEY_SIMPLEX,1.5, (255, 0, 0), 3, cv2.LINE_AA)
    cv2.putText(labelled,f'step_length:{step_length:.2f}',(10, 120), cv2.FONT_HERSHEY_SIMPLEX,1.5, (255, 0, 0), 3, cv2.LINE_AA)
    out.write(labelled)

out.release()