In [None]:
import mediapipe as mp
import torch
import numpy as np
import cv2
import math
import matplotlib.pyplot as plt
import os
from time import time
from tqdm import tqdm

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [None]:
%ls -al

In [None]:
# Deadlift 0, Squat 1, BenchPress 2
labels = ['deadlift', 'squat', 'benchpress']
LABEL_DICT = {labels[i]: i for i in range(len(labels))}

# 將每個影片處理成資料集 (角度、人體關鍵點)

## 三維角度計算

In [None]:
def calculateAngle(landmark1, landmark2, landmark3):
    x1, y1, z1 = landmark1.x, landmark1.y, landmark1.z
    x2, y2, z2 = landmark2.x, landmark2.y, landmark2.z
    x3, y3, z3 = landmark3.x, landmark3.y, landmark3.z

    # 計算兩個邊向量
    vec1 = [x1 - x2, y1 - y2, z1 - z2]
    vec2 = [x3 - x2, y3 - y2, z3 - z2]

    # 計算向量長度
    length1 = math.sqrt(vec1[0]**2 + vec1[1]**2 + vec1[2]**2)
    length2 = math.sqrt(vec2[0]**2 + vec2[1]**2 + vec2[2]**2)

    # 計算點乘
    dotProduct = vec1[0]*vec2[0] + vec1[1]*vec2[1] + vec1[2]*vec2[2]

    # 計算夾角(弧度制)
    angleRad = math.acos(dotProduct / (length1 * length2))

    # 將弧度制轉換為角度制
    angleDegree = math.degrees(angleRad)

    if angleDegree < 0:
        angleDegree += 360

    return angleDegree

## 人體大關節角度（非監督模型用）

In [None]:
def getPostureAngles(landmarks):    

    left_elbow_angle = calculateAngle(landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value],
                                      landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value],
                                      landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value])
    
    right_elbow_angle = calculateAngle(landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
                                       landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value],
                                       landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value])   
    
    left_shoulder_angle = calculateAngle(landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value],
                                         landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value],
                                         landmarks[mp_pose.PoseLandmark.LEFT_HIP.value])

    right_shoulder_angle = calculateAngle(landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value],
                                          landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
                                          landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value])

    left_knee_angle = calculateAngle(landmarks[mp_pose.PoseLandmark.LEFT_HIP.value],
                                     landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value],
                                     landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value])

    right_knee_angle = calculateAngle(landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value],
                                      landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value],
                                      landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value])
    
    angles = np.array([left_elbow_angle, right_elbow_angle, left_shoulder_angle, right_shoulder_angle, left_knee_angle, right_knee_angle])    
    
    return angles

## 對每一個影片拆分成frame 100的片段，在進行姿勢辨識

In [None]:
def constructDatasetFromVideo(video, pose, label, dataPath, videoIndex, sliceLength=100):
    
    # 讀取影片
    cap = cv2.VideoCapture(video)
    
    # 取得影片的總幀數，加快處理速度
    totalFrameNumber = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # 32 個 （x, y, z）和 1 個 label
    print(f"video: {video} label: {label} total frame number: {totalFrameNumber}")

    # 將影片分割成多個片段
    sliceVideoFrame = list(range(sliceLength, totalFrameNumber, sliceLength))
    
    # 對片段中的每張frame進行處理
    for sliceIndex in range(len(sliceVideoFrame)):
         
        # 一個片段有 sliceLength 張frame
        video_data_points = np.zeros((sliceLength, 33 * 4))
        video_data_angles = np.zeros((sliceLength, 6))
        
        # 對每個frame進行處理
        for frameNumber in tqdm(range(sliceLength), desc=f"video {videoIndex} slice {sliceIndex}"):
            
            ret, frame = cap.read()
            
            if not ret:
                break
            
            # 跳過某些前面暖身的片段
            if sliceIndex == 0:
                continue
            
            frameResults = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            # 紀錄每個frame的pose
            if frameResults.pose_landmarks:
                video_data_angles[frameNumber] = getPostureAngles(frameResults.pose_landmarks.landmark)
                for i in range(33):
                    poseValue = frameResults.pose_landmarks.landmark[mp_pose.PoseLandmark(i).value]
                    video_data_points[frameNumber][i * 3] = poseValue.x
                    video_data_points[frameNumber][i * 3 + 1] = poseValue.y
                    video_data_points[frameNumber][i * 3 + 2] = poseValue.z
                    video_data_points[frameNumber][i * 3 + 3] = poseValue.visibility
            
        # 跳過某些前面暖身的片段
        if sliceIndex == 0:
            continue
        # 將每個片段的資料存入檔案
        else:
            np.save(f"{dataPath}/{label}/points/{videoIndex}_{sliceIndex}_points.npy", video_data_points)
            np.save(f"{dataPath}/{label}/angles/{videoIndex}_{sliceIndex}_angles.npy", video_data_angles)

    cap.release()

# 創立資料夾

In [None]:
sliceL = [70, 100, 120, 150]

for i in sliceL:
    dataPath = "data" + str(i)

    if not os.path.exists(dataPath):
        os.makedirs(dataPath)
        
    for label in labels:
        if not os.path.exists(f"{dataPath}/{label}"):
            os.makedirs(f"{dataPath}/{label}")
            os.makedirs(f"{dataPath}/{label}/points")
            os.makedirs(f"{dataPath}/{label}/angles")

In [None]:
sliceL = [70, 100, 120, 150]

for i in sliceL:
    dataPath = "data" + str(i)
    print(dataPath)

# 處理每個影片並歸類

In [None]:
# 設定pose model，影片、鬱值等
pose = mp_pose.Pose(static_image_mode = False, min_detection_confidence = 0.4, min_tracking_confidence = 0.4, model_complexity = 2)

# 對每個label的資料夾內的影片進行處理
for label in labels:
    directoryPath = f'{label}_raw'
    video_paths = [os.path.join(directoryPath, f) for f in os.listdir(directoryPath) if f.endswith('.mp4')]
    
    for video in video_paths:
        for i in sliceL:
            dataPath = "data" + str(i)
            constructDatasetFromVideo(video, pose, label, dataPath, video_paths.index(video), i)

# 處理資料成數據集

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
import pandas as pd
import seaborn as sns

In [None]:
data_dict = {
    70: {"points": [], "labels": [], "angles": []},
    100: {"points": [], "labels": [], "angles": []},
    120: {"points": [], "labels": [], "angles": []},
    150: {"points": [], "labels": [], "angles": []}
}

for label in labels:
    for slice_length in sliceL:
        data_path = f"data{slice_length}"
        
        for data_type in ["points", "angles"]:
            files = os.listdir(f"{data_path}/{label}/{data_type}")
            files.sort()
            
            for file in files:
                data = np.load(f"{data_path}/{label}/{data_type}/{file}")
                
                if data_type == "points":
                    data_dict[slice_length]["points"].append(data)
                    data_dict[slice_length]["labels"].append(LABEL_DICT[label])
                else:
                    data_dict[slice_length]["angles"].append(data)

In [None]:
points_data = np.array(data_dict[150]["points"])
angles_data = np.array(data_dict[150]["angles"])
labels_data = to_categorical(data_dict[150]["labels"]).astype(int)
print(points_data.shape, angles_data.shape, labels_data.shape)

print(labels_data)

In [None]:
points_train, points_test,  angles_train, angles_test,  labels_train, labels_test  = train_test_split(points_data, angles_data, labels_data, test_size=0.10)
points_train, points_valid, angles_train, angles_valid, labels_train, labels_valid = train_test_split(points_train, angles_train, labels_train, test_size=0.10)

# LSTM 訓練

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


In [None]:
if not os.path.exists('Logs'):
    os.makedirs('Logs')

print(np.shape(points_train[0]))
print(np.shape(angles_train[0]))
print(np.shape(labels_train[0]))

In [None]:
model = Sequential()
model.add(LSTM(256, return_sequences=True, activation='relu', input_shape=(np.shape(points_train[0]))))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(16, return_sequences=True, activation='relu'))
model.add(LSTM(32, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(3, activation='softmax'))

In [None]:
# model = Sequential()
# model.add(LSTM(256, return_sequences=True, activation='relu', input_shape=(np.shape(points_train[0]))))
# model.add(LSTM(16, return_sequences=True, activation='relu'))
# model.add(LSTM(16, return_sequences=True, activation='relu'))
# model.add(LSTM(64, return_sequences=False, activation='relu'))
# model.add(Dense(64, activation='relu'))
# model.add(Dense(32, activation='relu'))
# model.add(Dense(8, activation='relu'))
# model.add(Dense(3, activation='softmax'))

In [None]:
# optimizer = Adam(learning_rate=0.001, clipvalue=0.5)
optimizer = Adam(learning_rate=0.00001)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# 設置 TensorBoard 
tb_callback = TensorBoard(log_dir='./logs', histogram_freq=1, write_graph=True, write_images=True)

In [None]:
# 訓練模型並進行驗證集驗證
batch_size = 8
epochs = 200

model.fit(points_train, labels_train,
          batch_size=batch_size,
          epochs=epochs,
          validation_data=(points_valid, labels_valid),
          callbacks=[tb_callback])



In [None]:
model.summary()


In [None]:
model.save('broad150.keras')

In [None]:
# model.load_weights('best_80.keras')

# 預測 & 評估


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, classification_report, roc_curve, auc

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc

# 在測試集上評估模型
loss, accuracy = model.evaluate(points_test, labels_test, verbose=0)
print(f'Test Loss: {loss:.4f}')
print(f'Test Accuracy: {accuracy:.4f}')

# 獲取測試集的預測結果
y_pred = model.predict(points_test)
y_pred_labels = np.argmax(y_pred, axis=1)
y_true_labels = np.argmax(labels_test, axis=1)

# 創建一個包含5個子圖的圖形
fig, axes = plt.subplots(1, 5, figsize=(20, 4))

# 計算混淆矩陣並繪製圖形
cm = confusion_matrix(y_true_labels, y_pred_labels)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0])
axes[0].set_xlabel('Predicted Labels')
axes[0].set_ylabel('True Labels')
axes[0].set_title('Confusion Matrix')

# 計算精確率、召回率、F1分數並繪製圖形
report = classification_report(y_true_labels, y_pred_labels, output_dict=True)
sns.barplot(x=list(report.keys())[:-3], y=[report[k]['precision'] for k in list(report.keys())[:-3]], ax=axes[1])
axes[1].set_xlabel('Classes')
axes[1].set_ylabel('Precision')
axes[1].set_title('Precision')

sns.barplot(x=list(report.keys())[:-3], y=[report[k]['recall'] for k in list(report.keys())[:-3]], ax=axes[2])
axes[2].set_xlabel('Classes')
axes[2].set_ylabel('Recall')
axes[2].set_title('Recall')

sns.barplot(x=list(report.keys())[:-3], y=[report[k]['f1-score'] for k in list(report.keys())[:-3]], ax=axes[3])
axes[3].set_xlabel('Classes')
axes[3].set_ylabel('F1 Score')
axes[3].set_title('F1 Score')

# 計算ROC曲線並繪製圖形
fpr = {}
tpr = {}
roc_auc = {}
for i in range(len(np.unique(y_true_labels))):
    fpr[i], tpr[i], _ = roc_curve(labels_test[:, i], y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

for i in range(len(np.unique(y_true_labels))):
    axes[4].plot(fpr[i], tpr[i], label=f'Class {i} (AUC = {roc_auc[i]:.2f})')
axes[4].plot([0, 1], [0, 1], 'k--')
axes[4].set_xlabel('False Positive Rate')
axes[4].set_ylabel('True Positive Rate')
axes[4].set_title('ROC Curve')
axes[4].legend(loc='lower right')

# 調整子圖之間的間距
plt.tight_layout()

# 顯示圖形
plt.show()

In [None]:
frame_lengths = [70, 100, 120, 150]
test_losses = [0.4708, 1.1183, 1.1026, 50372.9922]
test_accuracies = [0.8052, 0.1961, 0.4048, 0.4062]

fig, ax1 = plt.subplots(figsize=(8, 6))

color1 = 'tab:red'
ax1.set_xlabel('Frame Length')
ax1.set_ylabel('Test Loss', color=color1)
ax1.plot(frame_lengths, test_losses, color=color1, marker='o', linestyle='-', linewidth=2)
ax1.tick_params(axis='y', labelcolor=color1)

ax2 = ax1.twinx()

color2 = 'tab:blue'
ax2.set_ylabel('Test Accuracy', color=color2)
ax2.plot(frame_lengths, test_accuracies, color=color2, marker='o', linestyle='-', linewidth=2)
ax2.tick_params(axis='y', labelcolor=color2)

fig.tight_layout()
plt.title('Test Loss and Test Accuracy vs Frame Length')
plt.xticks(frame_lengths)
plt.grid(True)
plt.show()

# 建立隨機森林模型（使用關鍵點的角度、不使用關鍵點本身）

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
angles_data = np.array(data_dict[70]["angles"])
labels_data = np.array(data_dict[70]["labels"]).astype(int)
angles_data_2d = angles_data.reshape(angles_data.shape[0], -1)

print(angles_data_2d.shape, labels_data.shape)

angles_train, angles_test,  labels_train, labels_test  =  train_test_split(angles_data_2d, labels_data,  test_size=0.10)
angles_train, angles_valid, labels_train, labels_valid =  train_test_split(angles_train,   labels_train, test_size=0.10)

In [None]:
# 創建隨機森林分類器
rf_model = RandomForestClassifier(n_estimators=100)

# 訓練模型
rf_model.fit(angles_train, labels_train)

# 在驗證集上進行預測
y_pred_valid = rf_model.predict(angles_valid)

# 在測試集上進行預測
y_pred_test = rf_model.predict(angles_test)

In [None]:
# 創建一個包含5個子圖的圖形
fig, axes = plt.subplots(1, 5, figsize=(20, 4))

accuracy_test = accuracy_score(labels_test, y_pred_test)
print(f'Test Accuracy: {accuracy_test:.4f}')

# 計算混淆矩陣並繪製圖形
cm = confusion_matrix(labels_test, y_pred_test)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0])
axes[0].set_xlabel('Predicted Labels')
axes[0].set_ylabel('True Labels')
axes[0].set_title('Confusion Matrix')

# 計算精確率、召回率、F1分數並繪製圖形
report = classification_report(labels_test, y_pred_test, output_dict=True)
sns.barplot(x=list(report.keys())[:-3], y=[report[k]['precision'] for k in list(report.keys())[:-3]], ax=axes[1])
axes[1].set_xlabel('Classes')
axes[1].set_ylabel('Precision')
axes[1].set_title('Precision')

sns.barplot(x=list(report.keys())[:-3], y=[report[k]['recall'] for k in list(report.keys())[:-3]], ax=axes[2])
axes[2].set_xlabel('Classes')
axes[2].set_ylabel('Recall')
axes[2].set_title('Recall')

sns.barplot(x=list(report.keys())[:-3], y=[report[k]['f1-score'] for k in list(report.keys())[:-3]], ax=axes[3])
axes[3].set_xlabel('Classes')
axes[3].set_ylabel('F1 Score')
axes[3].set_title('F1 Score')

# 計算ROC曲線並繪製圖形
fpr = {}
tpr = {}
roc_auc = {}
for i in range(len(np.unique(labels_test))):
    fpr[i], tpr[i], _ = roc_curve(labels_test, y_pred_test, pos_label=i)
    roc_auc[i] = auc(fpr[i], tpr[i])

for i in range(len(np.unique(labels_test))):
    axes[4].plot(fpr[i], tpr[i], label=f'Class {i} (AUC = {roc_auc[i]:.2f})')
axes[4].plot([0, 1], [0, 1], 'k--')
axes[4].set_xlabel('False Positive Rate')
axes[4].set_ylabel('True Positive Rate')
axes[4].set_title('ROC Curve')
axes[4].legend(loc='lower right')

# 調整子圖之間的間距
plt.tight_layout()

# 顯示圖形
plt.show()

# 非監督式學習 使用 K-Means

In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

In [None]:
# 創建 KMeans 聚類模型
n_clusters = len(np.unique(labels_data))  # 設置聚類數量為標籤的唯一值數量
kmeans = KMeans(n_clusters=n_clusters)

# 訓練模型
kmeans.fit(angles_data_2d)

# 獲取聚類標籤
labels_pred = kmeans.labels_

# 評估聚類質量
silhouette_avg = silhouette_score(angles_data_2d, labels_pred)
print(f"Silhouette Score: {silhouette_avg:.4f}")

# 可視化聚類結果
fig, ax = plt.subplots(figsize=(8, 6))

# 繪製散點圖，根據聚類標籤設置顏色
unique_labels = np.unique(labels_pred)
colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
for label, color in zip(unique_labels, colors):
    mask = labels_pred == label
    ax.scatter(angles_data_2d[mask, 0], angles_data_2d[mask, 1], c=[color], label=f"Cluster {label}", alpha=0.8)

# 繪製聚類中心
centers = kmeans.cluster_centers_
ax.scatter(centers[:, 0], centers[:, 1], c='black', s=200, alpha=0.5, marker='x', label='Cluster Centers')

ax.set_xlabel('Feature 1')
ax.set_ylabel('Feature 2')
ax.set_title('K-means Clustering')
ax.legend()

plt.tight_layout()
plt.show()

# 計算聚類標籤與真實標籤的匹配情況
from sklearn.metrics import accuracy_score
from scipy.optimize import linear_sum_assignment

# 計算聚類標籤與真實標籤的混淆矩陣
cm = np.zeros((n_clusters, n_clusters))
for i in range(n_clusters):
    for j in range(n_clusters):
        cm[i, j] = np.sum((labels_pred == i) & (labels_data == j))

# 使用匈牙利算法找到最佳的聚類標籤與真實標籤的匹配
row_ind, col_ind = linear_sum_assignment(-cm)
labels_pred_matched = np.zeros_like(labels_pred)
for i in range(n_clusters):
    labels_pred_matched[labels_pred == row_ind[i]] = col_ind[i]

# 計算匹配後的準確率
accuracy = accuracy_score(labels_data, labels_pred_matched)
print(f"Clustering Accuracy: {accuracy:.4f}")

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

In [None]:
# 對特徵進行標準化
scaler = StandardScaler()
angles_data_scaled = scaler.fit_transform(angles_data_2d)

# 使用 PCA 進行降維
pca = PCA(n_components=0.95)  # 保留 95% 的方差
angles_data_pca = pca.fit_transform(angles_data_scaled)

# 創建 KMeans 聚類模型
n_clusters = len(np.unique(labels_data))  # 設置聚類數量為標籤的唯一值數量
kmeans = KMeans(n_clusters=n_clusters)

# 訓練模型
kmeans.fit(angles_data_pca)

# 獲取聚類標籤
labels_pred = kmeans.labels_

# 評估聚類質量
silhouette_avg = silhouette_score(angles_data_pca, labels_pred)
print(f"Silhouette Score: {silhouette_avg:.4f}")

# 可視化聚類結果
fig, ax = plt.subplots(figsize=(8, 6))

# 繪製散點圖，根據聚類標籤設置顏色
unique_labels = np.unique(labels_pred)
colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
for label, color in zip(unique_labels, colors):
    mask = labels_pred == label
    ax.scatter(angles_data_pca[mask, 0], angles_data_pca[mask, 1], c=[color], label=f"Cluster {label}", alpha=0.8)

# 繪製聚類中心
centers = kmeans.cluster_centers_
ax.scatter(centers[:, 0], centers[:, 1], c='black', s=200, alpha=0.5, marker='x', label='Cluster Centers')

ax.set_xlabel('PCA Component 1')
ax.set_ylabel('PCA Component 2')
ax.set_title('K-means Clustering (with PCA)')
ax.legend()

plt.tight_layout()
plt.show()

# 計算聚類標籤與真實標籤的匹配情況
from sklearn.metrics import accuracy_score
from scipy.optimize import linear_sum_assignment

# 計算聚類標籤與真實標籤的混淆矩陣
cm = np.zeros((n_clusters, n_clusters))
for i in range(n_clusters):
    for j in range(n_clusters):
        cm[i, j] = np.sum((labels_pred == i) & (labels_data == j))

# 使用匈牙利算法找到最佳的聚類標籤與真實標籤的匹配
row_ind, col_ind = linear_sum_assignment(-cm)
labels_pred_matched = np.zeros_like(labels_pred)
for i in range(n_clusters):
    labels_pred_matched[labels_pred == row_ind[i]] = col_ind[i]

# 計算匹配後的準確率
accuracy = accuracy_score(labels_data, labels_pred_matched)
print(f"Clustering Accuracy: {accuracy:.4f}")

In [None]:
%load_ext tensorboard
%tensorboard --logdir logs --port=12345

