In [1]:
# Mount Google Drive at /content/drive (Colab only). The working directory
# selected in the next cell lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import pickle

from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler

# Switch to the project folder on the mounted Drive. The relative "dataset/..."
# and "./model/..." paths used in later cells are resolved against this directory.
%cd /content/drive/MyDrive/sport/K-NN

/content/drive/MyDrive/sport/K-NN


# KNNモデル作成

### 初期設定等

In [3]:
# Model name -> landmark columns whose sliding windows are pooled to train that model.
# One model is trained per body-part group and per axis (x / y).
target_json = {
    # ankles and heels
    "ANKLE_HEEL_x": [
        "LEFT_ANKLE_x", "RIGHT_ANKLE_x",
        "LEFT_HEEL_x", "RIGHT_HEEL_x",
    ],
    "ANKLE_HEEL_y": [
        "LEFT_ANKLE_y", "RIGHT_ANKLE_y",
        "LEFT_HEEL_y", "RIGHT_HEEL_y",
    ],
    # toes
    "FOOT_INDEX_x": ["LEFT_FOOT_INDEX_x", "RIGHT_FOOT_INDEX_x"],
    "FOOT_INDEX_y": ["LEFT_FOOT_INDEX_y", "RIGHT_FOOT_INDEX_y"],
    # knees
    "KNEE_x": ["LEFT_KNEE_x", "RIGHT_KNEE_x"],
    "KNEE_y": ["LEFT_KNEE_y", "RIGHT_KNEE_y"],
    # upper body: only the right-side landmark is used for training
    "HAND_x": ["RIGHT_WRIST_x"],
    "HAND_y": ["RIGHT_WRIST_y"],
    "SHOULDER_x": ["RIGHT_SHOULDER_x"],
    "SHOULDER_y": ["RIGHT_SHOULDER_y"],
    "ELBOW_x": ["RIGHT_ELBOW_x"],
    "ELBOW_y": ["RIGHT_ELBOW_y"],
}

In [4]:
# Landmark column of interest. NOTE(review): the training loop in the visible
# cells iterates over every column listed in target_json and never reads this value.
column = 'LEFT_ANKLE_y'

# The KNN models are trained on the datasets of two videos.
train_video_name1, train_video_name2 = "IMG_0893_1940_2060", "IMG_0893_7080_7260"

# Each dataset folder provides a correct_position_data.csv file.
train_data1 = pd.read_csv(
    f"dataset/{train_video_name1}/correct_position_data.csv"
)
train_data2 = pd.read_csv(
    f"dataset/{train_video_name2}/correct_position_data.csv"
)

In [5]:
# Standardise every column except the first one, separately for each video.
# fit_transform() refits the scaler on each call, so each frame is scaled with
# its own per-column statistics; after this cell `sc` holds the fit of train_data2.
sc = StandardScaler()

# Select the feature columns from the frame they belong to (the original used
# train_data1's column list for train_data2 as well).
feature_cols1 = train_data1.columns[1:]
feature_cols2 = train_data2.columns[1:]

train_data1_np_std = sc.fit_transform(train_data1[feature_cols1])
train_data2_np_std = sc.fit_transform(train_data2[feature_cols2])

# Carry the original index and column labels so the write-back cannot be
# affected by label alignment.
train_data1[feature_cols1] = pd.DataFrame(
    train_data1_np_std, index=train_data1.index, columns=feature_cols1
)
train_data2[feature_cols2] = pd.DataFrame(
    train_data2_np_std, index=train_data2.index, columns=feature_cols2
)

### 学習データ作成・モデルの訓練

In [6]:
# Sliding-window settings used to turn each time series into fixed-length vectors.
stride = 1       # step between consecutive window starts
window_size = 6  # samples per window (tune as needed); Anomaly_detect_interp uses 6 by default
k = 2            # n_neighbors of the KNN models (tune as needed)


def _make_windows(series, window_size, stride=1):
  """Return a (num_windows, window_size) float array of sliding windows.

  Window ``j`` holds ``series[j*stride : j*stride + window_size]``. A series
  shorter than ``window_size`` yields an array with zero rows.
  """
  values = np.asarray(series, dtype=float)
  starts = np.arange(0, len(values) - window_size + 1, stride)
  # Broadcasting (num_windows, 1) + (window_size,) builds every window index at once.
  return values[starts[:, None] + np.arange(window_size)]


# Number of windows each video contributes per column (stride is now honoured;
# with stride = 1 this equals len(data) - window_size + 1 as before).
num_windows1 = len(range(0, len(train_data1) - window_size + 1, stride))
num_windows2 = len(range(0, len(train_data2) - window_size + 1, stride))
num_windows = num_windows1 + num_windows2

for model_name, train_data_li in target_json.items():

  # Row order matches the original layout: for each column, all windows of
  # video 1 followed by all windows of video 2.
  X_train = np.vstack([
      _make_windows(train_data[train_column], window_size, stride)
      for train_column in train_data_li
      for train_data in (train_data1, train_data2)
  ])
  # Only normal data is used for training, so every label is 0.
  y_train = np.zeros(len(X_train))

  print(model_name,len(X_train))

  # Train the model (Minkowski metric with p=2, i.e. Euclidean distance).
  knn = KNeighborsClassifier(n_neighbors=k, p=2, metric = "minkowski")
  knn.fit(X_train,y_train)

  # Save the model; the ./model directory must already exist.
  with open(f'./model/{model_name}.pkl', 'wb') as file:
    pickle.dump(knn, file)

ANKLE_HEEL_x 1168
ANKLE_HEEL_y 1168
FOOT_INDEX_x 584
FOOT_INDEX_y 584
KNEE_x 584
KNEE_y 584
HAND_x 292
HAND_y 292
SHOULDER_x 292
SHOULDER_y 292
ELBOW_x 292
ELBOW_y 292


# テスト

### 異常検出・補間をしたprocessed_position_data.csvと予測ラベルpred_label.csvを作成

#### 入力：異常検出・補間したい動画のデータフレーム
#### 戻り値：processed_position_dataとpred_labelのデータフレーム

In [7]:
def remove_short_nan_sequences(arr, max_length=6):
    """Blank out short runs of valid samples that sit between missing values.

    A "run" is a maximal stretch of consecutive non-NaN samples. The start and
    the end of the array count as run boundaries, so leading and trailing runs
    are candidates as well. A run spanning indices ``start..end`` is replaced
    by NaN when ``end - start <= max_length + 1`` (i.e. when it contains at
    most ``max_length + 2`` samples).

    Args:
        arr: 1-D sequence of numbers. A float ndarray is modified in place;
            any other input is first converted to a new float array.
        max_length: threshold used in the comparison above.

    Returns:
        pd.Series wrapping the processed array (default integer index).
        An input without any NaN is returned unchanged.
    """
    arr = np.asarray(arr, dtype=float)
    valid = ~np.isnan(arr)
    # No missing values (or an empty input): there is nothing to clean up.
    if valid.all():
        return pd.Series(arr)

    # Pad with "invalid" on both sides so every run has an explicit rising and
    # falling edge. The previous np.roll-based detection wrapped around the
    # array ends and mis-paired starts with ends whenever the array both began
    # and ended with valid samples, which silently disabled the clean-up.
    padded = np.concatenate(([False], valid, [False])).astype(np.int8)
    edges = np.diff(padded)
    run_starts = np.flatnonzero(edges == 1)   # first index of each run
    run_stops = np.flatnonzero(edges == -1)   # one past the last index of each run

    for start, stop in zip(run_starts, run_stops):
        end = stop - 1
        if end - start <= max_length + 1:
            arr[start:stop] = np.nan
    return pd.Series(arr)

def Anomaly_detect_interp(df, window_size=6, threshold=0.3, n_neighbors=None, model_dir='./model'): #df is the position_data frame
  """Detect anomalous landmark samples with the saved KNN models and interpolate them.

  Every column except the first is standardised. Each landmark column listed
  in the model mapping below is cut into sliding windows (stride 1), and the
  mean distance from a window to its nearest training windows is used as the
  anomaly score. Row ``i`` is labelled 1 when the window starting at row ``i``
  scores above ``threshold``. The last ``window_size - 1`` rows have no full
  window and are always labelled 1. Labelled samples are replaced by NaN,
  short valid runs between gaps are dropped with
  ``remove_short_nan_sequences``, and the gaps are filled by cubic interpolation.

  Args:
    df: position data; the first column is excluded from standardisation.
    window_size: samples per window; must match the window length the models
      were trained with.
    threshold: anomaly-score cut-off.
    n_neighbors: neighbours used for the score. ``None`` uses the value stored
      in each loaded model, so the function no longer depends on a global ``k``.
    model_dir: directory holding the ``<model_name>.pkl`` files.

  Returns:
    (df_processed_position_data, df_pred_label): two copies of ``df``. In the
    first, the checked columns hold the cleaned/interpolated values. The
    second is zero everywhere except for the 0/1 labels of the checked columns.

  Raises:
    ValueError: if ``df`` has fewer rows than ``window_size``.
  """
  num_windows = len(df) - window_size + 1
  if num_windows < 1:
    raise ValueError(
        f"df has {len(df)} rows but at least window_size={window_size} rows are required"
    )

  df_processed_position_data = df.copy() #becomes processed_position_data.csv
  df_pred_label = df.copy() #becomes pred_label.csv
  df_pred_label[:] = 0

  #Standardise each column (the first column is left untouched)
  feature_cols = df.columns[1:]
  sc = StandardScaler()
  np_std = sc.fit_transform(df[feature_cols])
  df_std = df.copy()
  #Keep df's own index/column labels so the write-back cannot be misaligned
  df_std[feature_cols] = pd.DataFrame(np_std, index=df.index, columns=feature_cols)

  #Model name -> columns checked with that model. Models are trained on a
  #subset of landmarks and applied here to the wider list of columns below.
  model_columns = {
      "ANKLE_HEEL_x":["LEFT_ANKLE_x", "RIGHT_ANKLE_x", "LEFT_HEEL_x", "RIGHT_HEEL_x"],
      "ANKLE_HEEL_y":["LEFT_ANKLE_y", "RIGHT_ANKLE_y", "LEFT_HEEL_y", "RIGHT_HEEL_y"],
      "FOOT_INDEX_x":["LEFT_FOOT_INDEX_x", "RIGHT_FOOT_INDEX_x"],
      "FOOT_INDEX_y":["LEFT_FOOT_INDEX_y", "RIGHT_FOOT_INDEX_y"],
      "KNEE_x":["LEFT_KNEE_x", "RIGHT_KNEE_x"],
      "KNEE_y":["LEFT_KNEE_y", "RIGHT_KNEE_y"],
      "HAND_x":["LEFT_WRIST_x","RIGHT_WRIST_x","LEFT_PINKY_x", "RIGHT_PINKY_x", "LEFT_INDEX_x", "RIGHT_INDEX_x", "LEFT_THUMB_x", "RIGHT_THUMB_x"],
      "HAND_y":["LEFT_WRIST_y","RIGHT_WRIST_y","LEFT_PINKY_y", "RIGHT_PINKY_y", "LEFT_INDEX_y", "RIGHT_INDEX_y", "LEFT_THUMB_y", "RIGHT_THUMB_y"],
      "SHOULDER_x":["LEFT_SHOULDER_x","RIGHT_SHOULDER_x"],
      "SHOULDER_y":["LEFT_SHOULDER_y","RIGHT_SHOULDER_y"],
      "ELBOW_x":["LEFT_ELBOW_x","RIGHT_ELBOW_x"],
      "ELBOW_y":["LEFT_ELBOW_y","RIGHT_ELBOW_y"],
  }

  #Index matrix of shape (num_windows, window_size): row i selects samples i .. i+window_size-1
  window_index = np.arange(num_windows)[:, None] + np.arange(window_size)
  #Rows without a full window are always labelled anomalous
  tail_labels = [1] * (len(df) - num_windows)

  for model_name, pred_data_li in model_columns.items():

    #Load the KNN model.
    #SECURITY: pickle.load can execute arbitrary code; only load model files you created yourself.
    with open(f'{model_dir}/{model_name}.pkl', 'rb') as file:
      knn = pickle.load(file)

    for column in pred_data_li:
      #---------------------preprocessing---------------------------
      #Cut the standardised series into windows; NaN becomes 0 here
      #(the original author expects such windows to be flagged as anomalous)
      std_values = df_std[column].to_numpy(dtype=float)
      X = np.nan_to_num(std_values[window_index], nan=0.0)

      #---------------------anomaly detection---------------------------
      #neigh_dist has shape (num_windows, n_neighbors); its row mean is the anomaly score.
      #n_neighbors=None makes scikit-learn use the value the model was built with.
      neigh_dist, _ = knn.kneighbors(X, n_neighbors=n_neighbors, return_distance=True)
      anomaly_score = np.mean(neigh_dist, axis=1)
      pred_label = (anomaly_score > threshold).astype(int).tolist() + tail_labels

      df_pred_label[column] = pred_label

      #---------------------removal / interpolation---------------------------
      #Blank out the samples labelled anomalous in the raw (non-standardised) data
      cleaned = df[column].to_numpy(dtype=float, copy=True)
      cleaned[np.asarray(pred_label) == 1] = np.nan
      #Drop short valid runs that sit between gaps
      cleaned_series = remove_short_nan_sequences(cleaned)
      #Cubic interpolation of the gaps (pandas delegates this method to SciPy)
      interp_error_data = cleaned_series.interpolate("cubic")

      #Assign by position: the interpolated Series has a fresh 0..n-1 index
      df_processed_position_data[column] = interp_error_data.to_numpy()

  return df_processed_position_data, df_pred_label

###実行

In [20]:
# Dataset (video) on which anomaly detection and interpolation are run.
video_name = "side_1-10"
position_data = pd.read_csv(f"dataset/{video_name}/position_data.csv")

# Run detection and interpolation.
processed_position_data, pred_label = Anomaly_detect_interp(position_data)

# Save both results next to the input file.
processed_position_data.to_csv(f"dataset/{video_name}/processed_position_data.csv",index=False)
pred_label.to_csv(f"dataset/{video_name}/pred_label.csv",index=False)
