# 人物の体の向きを予測するモデルAを学習する

#### 手順

1. 元データセット（Market-1501）内の画像全部にmedia pipeを適用し，姿勢が検出された画像ファイルリストを作成する．Find_possible_jp.py
2. １のリストを基に姿勢が検出された画像フォルダを作成する　　Find_possible_jp.py
3. ２のフォルダをlabel-studioにインポートする
4. label-studioでアノテーションする（別のワードファイルで説明）
5. アノテーション結果csvファイルからアノテーション済みの画像ファイルを取得　　Image_list.py
6. 5のパスリストを基にアノテーション済みの画像フォルダを作成する　　Image_list.py
7. 7で作成したフォルダ内の画像にmediapipeを適用する（１．でやっているが，なぜか2回目適用しています．本当は1の結果から抜き出せばいいのに...）　Media_pipe_anotation.py
8. データセット完成
9. 体の向き予測モデルAを学習（損失関数：cross entropy, Arcfaceの2種類を試している）　orient_lerning.py(project10, cross entropy)　orient_lerning_metric.py(project10, ArcFace)
10. 実験で使用する全サンプルにモデルAを適用して，各サンプルの体の向きラベルと体の向き特徴量を取得する．

In [None]:
import os
import glob
import cv2
import mediapipe as mp


def get_image_list(folder_path):
    # フォルダ内の画像ファイルのリストを取得
    image_extensions = ['*.jpg', '*.jpeg', '*.png']
    image_paths = []
    for ext in image_extensions:
        image_paths.extend(glob.glob(os.path.join(folder_path, ext)))
    return image_paths


def check_pose_detection(image_paths):
    mp_pose = mp.solutions.pose
    detected_images = []

    with mp_pose.Pose(static_image_mode=True, model_complexity=2) as pose:
        for image_path in image_paths:
            image = cv2.imread(image_path)
            if image is None:
                continue

            # 画像をRGBに変換
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            results = pose.process(image_rgb)

            if results.pose_world_landmarks:
                detected_images.append(image_path)

    return detected_images


def save_detected_image_list(detected_images, output_file):
    with open(output_file, 'w') as f:
        for image_path in detected_images:
            f.write(image_path + '\n')


if __name__ == "__main__":
    folder_path = "C:/Users/sugie/PycharmProjects/pythonProject1/Market-1501-v15.09.15/Market-1501-v15.09.15/gt_bbox"
    output_file = "detected_images_gt_bbox.txt"

    image_paths = get_image_list(folder_path)
    detected_images = check_pose_detection(image_paths)
    save_detected_image_list(detected_images, output_file)

    print(f"Total images checked: {len(image_paths)}")
    print(f"Total images with detected poses: {len(detected_images)}")

import os
import shutil

## mediapipeで姿勢が検出された画像のみのフォルダを作成する
def load_detected_image_list(input_file):
    with open(input_file, 'r') as f:
        detected_images = [line.strip() for line in f.readlines()]
    return detected_images


def copy_detected_images(detected_images, destination_folder):
    if not os.path.exists(destination_folder):
        os.makedirs(destination_folder)

    for image_path in detected_images:
        if os.path.exists(image_path):
            shutil.copy(image_path, destination_folder)


if __name__ == "__main__":
    input_file = "C:/Users/sugie/PycharmProjects/mediapipe/detected_images_gt_bbox.txt"
    destination_folder = "C:/Users/sugie/PycharmProjects/pythonProject1/Market-1501-v15.09.15/Market-1501-v15.09.15/gt_bbox_detected"

    detected_images = load_detected_image_list(input_file)
    copy_detected_images(detected_images, destination_folder)

    print(f"Total images to copy: {len(detected_images)}")


# 5. アノテーション結果csvファイルからアノテーション済みの画像ファイルを取得　　Image_list.py
label-studioでアノテーションを行った後，label-studioないのコマンドでアノテーション結果csvファイルを出力する．

csvファイルからアノテーション済みの画像リストを作成する

# 6. 5のパスリストを基にアノテーション済みの画像フォルダを作成する　　Image_list.py
以降の学習サンプルを作成するために5で取得した画像リストから，アノテーション済み画像フォルダを作成する


In [None]:
""""
label-studioから出力されたアノテーションcsvファイルより，元の画像データディレクトリのうちアノテーション済みの
画像パスを取得し，それを基にアノテーション済み画像フォルダを作成するコード
"""


import os
import shutil
import csv

# CSVファイルのパスを指定
csv_file_path = 'C:/Users//sugie/PycharmProjects/mediapipe/project-2-at-2024-07-16-03-40-eeb46e25.csv'

# 抽出されたimageの部分文字列を格納するリスト（アノテーション済み画像リスト）
image_substrings = []

# CSVファイルを読み込んで処理
with open(csv_file_path, newline='', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        # 空行を無視する
        if row['image'] == '':
            continue
        # /data/local-files/?d=gt_bbox_detected%5以降の文字列を抽出
        image_path = row['image']
        substring = image_path.split('%5C', 1)[1]  #(separator, maxsplit)[リスト番号]
        image_substrings.append(substring)

# 抽出された部分文字列のリストを表示
print(image_substrings)

##以下のコードでimage_substringsに含まれる画像（アノテーション済み画像）で構成されるフォルダを作成する

# 元の画像が保存されているフォルダのパス
source_folder = 'C:/Users/sugie/PycharmProjects/pythonProject1/Market-1501-v15.09.15/Market-1501-v15.09.15/gt_bbox_detected'

# 画像をコピーする新しいフォルダのパス
destination_folder = 'C:/Users/sugie/PycharmProjects/pythonProject1/Market-1501-v15.09.15/Market-1501-v15.09.15/gt_bbox_detected_anotation'

# source_folder内のファイルをリスト
files = os.listdir(source_folder)

# image_substringsリストに含まれている画像ファイルをコピーする
for file in files:
    # ファイル名がimage_substringsリストに含まれているかをチェック
    for substring in image_substrings:
        if substring in file:
            # ファイルのフルパスを作成
            source_file = os.path.join(source_folder, file)
            # ファイルを新しいフォルダにコピー
            shutil.copy(source_file, destination_folder)
            print(f"Copied: {source_file} to {destination_folder}")
            break  # マッチする部分文字列が見つかったら次のファイルへ

# 追加のデバッグメッセージ
print("Copy process completed.")



# 7. 6で作成した画像フォルダにmedia_pipeを適用する

In [None]:
"""
アノテーション済みフォルダの画像ファイルにmediapipeを適用して
file_name/choice(正解ラベル)/姿勢情報
をcsvファイルで出力するコード

"""

##以下，全てのサンプルにmediapipeを適用するコード

import os
import glob
import math
import cv2
import csv
import mediapipe as mp
import pandas as pd

# Mediapipe Poseの初期化
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# 入力フォルダのパス
input_folder = 'C:/Users/sugie/PycharmProjects/pythonProject1/Market-1501-v15.09.15/Market-1501-v15.09.15/gt_bbox_detected_anotation'
# 出力CSVファイルのパス
output_csv_file = 'C:/Users/sugie/PycharmProjects/mediapipe/detected_images_gt_bbox_mediapipe.csv'

# 画像のリサイズ設定
DESIRED_HEIGHT = 128
DESIRED_WIDTH = 64

# CSVファイルのパスを指定
csv_file_path = 'C:/Users/sugie/PycharmProjects/mediapipe/project-2-at-2024-07-16-03-40-eeb46e25.csv'

# 抽出されたimageの部分文字列と対応するchoiceの値を格納するリスト
image_substrings = []

# CSVファイルを読み込んで処理
with open(csv_file_path, newline='', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        # 空行を無視する
        if row['image'] == '':
            continue
        # /data/local-files/?d=gt_bbox_detected%5以降の文字列を抽出
        image_path = row['image']
        substring = image_path.split('%5C', 1)[1]  #(separator, maxsplit)[リスト番号]
        # 部分文字列と対応するchoiceの値をタプルとしてリストに追加
        image_substrings.append((substring, row['choice']))

# CSVのヘッダを準備
def fields_name():
    fields = ['file_name', 'choice']
    for i in range(33):
        fields.append(f'{i}_x')
        fields.append(f'{i}_y')
        fields.append(f'{i}_z')
        fields.append(f'{i}_v')
    return fields

# 画像のリストを取得
image_paths = glob.glob(os.path.join(input_folder, '*.jpg'))  # 拡張子がjpgの場合

# 結果をCSVファイルに保存
with open(output_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fields_name())
    writer.writeheader()

    # Mediapipe Poseの適用
    with mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5, model_complexity=2) as pose:
        for image_path in image_paths:
            image = cv2.imread(image_path)
            if image is None:
                print(f"Failed to load image: {image_path}")
                continue

            # 画像をリサイズ
            h, w = image.shape[:2]
            if h < w:
                image = cv2.resize(image, (DESIRED_WIDTH, math.floor(h / (w / DESIRED_WIDTH))))
            else:
                image = cv2.resize(image, (math.floor(w / (h / DESIRED_HEIGHT)), DESIRED_HEIGHT))

            # Mediapipe Poseを適用
            results = pose.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

            if not results.pose_world_landmarks:
                print(f"No landmarks detected for image: {image_path}")
                continue

            # ランドマークが6つ以上検出できないサンプルを消去
            visible_landmarks = [landmark for landmark in results.pose_world_landmarks.landmark if landmark.visibility > 0.5]
            if len(visible_landmarks) < 6:
                print(f"Not enough landmarks detected for image: {image_path}")
                continue

            # 結果をCSVに書き込み
            record = {'file_name': os.path.basename(image_path)}
            # 対応するchoiceの値を追加
            substring_key = os.path.basename(image_path)
            choice_value = next((choice for substring, choice in image_substrings if substring in substring_key), 'N/A')
            record['choice'] = choice_value
            for i, landmark in enumerate(results.pose_world_landmarks.landmark):
                record[f'{i}_x'] = landmark.x
                record[f'{i}_y'] = landmark.y
                record[f'{i}_z'] = landmark.z
                record[f'{i}_v'] = landmark.visibility
            writer.writerow(record)

print("Processing complete.")


# 9. 体の向きを予測するモデルを学習（モデルA）
損失関数：Cross entropy

入力：1枚の人物画像にmediapipe_poseを適用して得た33のランドマークの画像内xyz座標と視認性(visavilitty)の132次元ベクトル

出力：体の向き予測ラベル/前・横・後ろ（one-hot形式）

- Netクラス　（3層の全結合層とドロップアウト層のニューラルネットワーク）
- ラベルの統合とエンコーディング　（アノテーション時はカメラに対して8方向の体の向きラベルを与えていた．今回の学習では前・横・後ろの3種類を予測するため，ラベルを統合ｓいている）


In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
import matplotlib.pyplot as plt
import numpy as np
import joblib

# 早期終了のためのクラス定義
class EarlyStopping:
    def __init__(self, patience=5, verbose=0):
        self.epoch = 0
        self.pre_loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def __call__(self, current_loss):
        if self.pre_loss < current_loss:
            self.epoch += 1
            if self.epoch > self.patience:
                if self.verbose:
                    print('early stopping')
                return True
        else:
            self.epoch = 0
            self.pre_loss = current_loss
        return False

# データセットの定義
class PoseDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# ニューラルネットワークの定義
class Net(nn.Module):
    def __init__(self, num_classes, dropout_rate=0.5):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(132, 100)
        self.fc2 = nn.Linear(100, 50)
        self.bn1 = nn.BatchNorm1d(50)
        self.fc3 = nn.Linear(50, 25)
        self.dropout_1 = nn.Dropout(dropout_rate)
        self.fc4 = nn.Linear(25, num_classes)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.bn1(x)
        x = F.relu(self.fc3(x))
        x = self.dropout_1(x)
        x = self.fc4(x)  # 出力にはsoftmaxを使用しない
        return x

# データの読み込み
data_path = 'C:/Users/sugie/PycharmProjects/mediapipe/detected_images_gt_bbox_mediapipe.csv'
data = pd.read_csv(data_path)

# 特徴量とラベルの分割
X = data.iloc[:, 2:].values  # 3列目以降が特徴量
y = data['choice'].values  # 2列目がラベル

# NaN値を除去
non_nan_indices = [i for i, label in enumerate(y) if label == label]  # NaNでないインデックスを取得
X = X[non_nan_indices]
y = y[non_nan_indices]

# ラベルの統合
label_mapping = {
    'front': 'front',
    'front_left': 'front',
    'front_right': 'front',
    'right': 'side',
    'back_left': 'back',
    'back': 'back',
    'back right': 'back',
    'left': 'side'
}

y = [label_mapping[label] for label in y]

# ラベルのエンコーディング
le = LabelEncoder()
y = le.fit_transform(y)

# データフレームに変換
df = pd.DataFrame(X)
df['label'] = y

# ラベルエンコーダーの保存
joblib.dump(le, 'label_encoder.joblib')

# 各ラベルの最小数に合わせてサンプリングする関数
def resample_to_min_label(df):
    # 各ラベルのサンプル数を確認
    min_label_count = df['label'].value_counts().min()

    # 各ラベルを最小数に合わせてサンプリング
    resampled_df = df.groupby('label').apply(lambda x: x.sample(min_label_count)).reset_index(drop=True)

    return resampled_df

# 最小ラベル数に合わせてデータをサンプリング
resampled_df = resample_to_min_label(df)

# 特徴量とラベルに分割
X = resampled_df.iloc[:, :-1].values
y = resampled_df['label'].values

# クラス数の確認
num_classes = len(le.classes_)
print(num_classes)

# データの標準化
scaler = StandardScaler()
X = scaler.fit_transform(X)

# スケーラーの保存
joblib.dump(scaler, 'scaler.joblib')

# データを訓練データと一時テストデータに分割
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)

# 一時テストデータを検証データとテストデータに分割
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# 各データセットのラベルごとのサンプル数を表示
def print_label_counts(y, dataset_name):
    label_counts = np.bincount(y)
    print(f"{dataset_name} label counts:")
    for i, count in enumerate(label_counts):
        print(f"{le.classes_[i]}: {count}")

print_label_counts(y_train, "Training data")
print_label_counts(y_val, "Validation data")
print_label_counts(y_test, "Test data")

# データローダーの作成
train_dataset = PoseDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
val_dataset = PoseDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
test_dataset = PoseDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# モデル、損失関数、オプティマイザーの定義
model = Net(num_classes=num_classes, dropout_rate=0.5)  # ドロップアウト率を0.5に設定
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)  # L2正則化を追加

# 早期終了の設定
early_stopping = EarlyStopping(patience=5, verbose=1)

# 訓練ループ
num_epochs = 50
train_loss_history = []
val_loss_history = []

for epoch in range(num_epochs):
    model.train()
    running_train_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_train_loss += loss.item()

    epoch_train_loss = running_train_loss / len(train_loader)
    train_loss_history.append(epoch_train_loss)

    model.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_val_loss += loss.item()

    epoch_val_loss = running_val_loss / len(val_loader)
    val_loss_history.append(epoch_val_loss)

    print(
        f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {epoch_train_loss:.4f}, Validation Loss: {epoch_val_loss:.4f}')

    # Early Stoppingのチェック
    if early_stopping(epoch_val_loss):
        break

# 損失合計の表を出力
loss_df = pd.DataFrame({'Train Loss': train_loss_history, 'Validation Loss': val_loss_history})
print(loss_df)

# 損失合計の表をCSVファイルとして保存
loss_df.to_csv('loss_history.csv', index=False)

# 損失履歴のプロット
plt.plot(train_loss_history, label='Train Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss History')
plt.legend()
plt.show()

# モデルの保存
model_path = 'pose_classification_model.pth'
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}')

# テストループ
model.eval()
correct = 0
total = 0
correct_by_class = [0] * num_classes
total_by_class = [0] * num_classes

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        for label, prediction in zip(labels, predicted):
            total_by_class[label] += 1
            if label == prediction:
                correct_by_class[label] += 1

print(f'Accuracy: {100 * correct / total:.2f}%')

# 各クラスごとの正答率を計算
for i in range(num_classes):
    accuracy = 100 * correct_by_class[i] / total_by_class[i] if total_by_class[i] > 0 else 0
    print(f'Accuracy for class {le.classes_[i]}: {accuracy:.2f}%')


# 9. 体の向きを予測するモデルを学習（モデルA）
損失関数：ArcFace Loss

入力：1枚の人物画像にmediapipe_poseを適用して得た33のランドマークの画像内xyz座標と視認性(visavilitty)の132次元ベクトル

出力：3次元ベクトル　（その後Softmax関数を適用して体の向きラベルを出力している）

- Netクラス　（3層の全結合層とドロップアウト層のニューラルネットワーク）
- ラベルの統合とエンコーディング　（アノテーション時はカメラに対して8方向の体の向きラベルを与えていた．今回の学習では前・横・後ろの3種類を予測するため，ラベルを統合しいている）
- また距離学習の結果を可視化するために，UMAPにて可視化している．

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
import matplotlib.pyplot as plt
import numpy as np
import joblib
from pytorch_metric_learning.losses import ArcFaceLoss

from pytorch_metric_learning import losses, distances, regularizers
import umap

# 早期終了のためのクラス定義（変更なし）
class EarlyStopping:
    def __init__(self, patience=15, verbose=0):
        self.epoch = 0
        self.pre_loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def __call__(self, current_loss):
        if self.pre_loss < current_loss:
            self.epoch += 1
            if self.epoch > self.patience:
                if self.verbose:
                    print('early stopping')
                return True
        else:
            self.epoch = 0
            self.pre_loss = current_loss
        return False

# データセットの定義（変更なし）
class PoseDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# ニューラルネットワークの定義
class Net(nn.Module):
    def __init__(self, embedding_size=3):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(132, 100)
        self.fc2 = nn.Linear(100, 50)
        self.bn1 = nn.BatchNorm1d(50)
        self.fc3 = nn.Linear(50, 25)
        self.fc4 = nn.Linear(25, embedding_size)  # 出力を3次元の埋め込みサイズに設定

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.bn1(x)
        x = F.relu(self.fc3(x))
        x = self.fc4(x)  # 出力にはsoftmaxを使用しない
        return x



# データの読み込み
data_path = 'C:/Users/sugie/PycharmProjects/mediapipe/detected_images_gt_bbox_mediapipe.csv'
data = pd.read_csv(data_path)

# 特徴量とラベルの分割
X = data.iloc[:, 2:].values  # 3列目以降が特徴量
y = data['choice'].values  # 2列目がラベル

# NaN値を除去
non_nan_indices = [i for i, label in enumerate(y) if label == label]  # NaNでないインデックスを取得
X = X[non_nan_indices]
y = y[non_nan_indices]

# ラベルの統合
label_mapping = {
    'front': 'front',
    'front_left': 'front',
    'front_right': 'front',
    'right': 'side',
    'back_left': 'back',
    'back': 'back',
    'back right': 'back',
    'left': 'side'
}

y = [label_mapping[label] for label in y]

# ラベルのエンコーディング
le = LabelEncoder()
y = le.fit_transform(y)

# データフレームに変換
df = pd.DataFrame(X)
df['label'] = y

# ラベルエンコーダーの保存
joblib.dump(le, 'label_encoder.joblib')

# 各ラベルの最小数に合わせてサンプリングする関数
def resample_to_min_label(df):
    # 各ラベルのサンプル数を確認
    min_label_count = df['label'].value_counts().min()

    # 各ラベルを最小数に合わせてサンプリング
    resampled_df = df.groupby('label').apply(lambda x: x.sample(min_label_count)).reset_index(drop=True)

    return resampled_df

# 最小ラベル数に合わせてデータをサンプリング
resampled_df = resample_to_min_label(df)

# 特徴量とラベルに分割
X = resampled_df.iloc[:, :-1].values
y = resampled_df['label'].values

# クラス数の確認
num_classes = len(le.classes_)
print(num_classes)

# データの標準化
scaler = StandardScaler()
X = scaler.fit_transform(X)

# スケーラーの保存
joblib.dump(scaler, 'scaler.joblib')

# データを訓練データと一時テストデータに分割
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)

# 一時テストデータを検証データとテストデータに分割
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# 各データセットのラベルごとのサンプル数を表示
def print_label_counts(y, dataset_name):
    label_counts = np.bincount(y)
    print(f"{dataset_name} label counts:")
    for i, count in enumerate(label_counts):
        print(f"{le.classes_[i]}: {count}")

print_label_counts(y_train, "Training data")
print_label_counts(y_val, "Validation data")
print_label_counts(y_test, "Test data")

# データローダーの作成
train_dataset = PoseDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
val_dataset = PoseDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
test_dataset = PoseDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# モデルの定義
model = Net(embedding_size=3)  # 埋め込みサイズを3に設定

# ArcFaceの定義
distance = distances.CosineSimilarity()
regularizer = regularizers.RegularFaceRegularizer()
arcface_loss = ArcFaceLoss(num_classes=num_classes, embedding_size=3, margin=28.6, scale=64,
                           weight_regularizer=regularizer, distance=distance)

# オプティマイザーの定義（変更なし）
optimizer = optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-5)

# 早期終了の設定（変更なし）
early_stopping = EarlyStopping(patience=5, verbose=1)

# 訓練ループの修正
num_epochs = 300
train_loss_history = []
val_loss_history = []

for epoch in range(num_epochs):
    model.train()
    running_train_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        embeddings = model(inputs)
        loss = arcface_loss(embeddings, labels)
        loss.backward()
        optimizer.step()
        running_train_loss += loss.item()

    epoch_train_loss = running_train_loss / len(train_loader)
    train_loss_history.append(epoch_train_loss)

    model.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            embeddings = model(inputs)
            loss = arcface_loss(embeddings, labels)
            running_val_loss += loss.item()

    epoch_val_loss = running_val_loss / len(val_loader)
    val_loss_history.append(epoch_val_loss)

    print(
        f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {epoch_train_loss:.4f}, Validation Loss: {epoch_val_loss:.4f}')

    # Early Stoppingのチェック
    if early_stopping(epoch_val_loss):
        break

# 損失合計の表を出力
loss_df = pd.DataFrame({'Train Loss': train_loss_history, 'Validation Loss': val_loss_history})
print(loss_df)

# 損失合計の表をCSVファイルとして保存
loss_df.to_csv('loss_history.csv', index=False)

# 損失履歴のプロット
plt.plot(train_loss_history, label='Train Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss History')
plt.legend()
plt.show()

# モデルの保存
model_path = 'pose_classification_model_metric.pth'
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}')

# テストデータの可視化のためのUMAPの適用とプロット
def plot_umap(embeddings, labels, n_components=2):
    reducer = umap.UMAP(n_components=n_components, random_state=42)
    umap_embeddings = reducer.fit_transform(embeddings)

    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(umap_embeddings[:, 0], umap_embeddings[:, 1], c=labels, cmap='Spectral', s=5)
    plt.colorbar(scatter, boundaries=np.arange(num_classes+1)-0.5).set_ticks(np.arange(num_classes))
    plt.title('UMAP projection of the embeddings')
    plt.show()



# テストループ
model.eval()
correct = 0
total = 0
correct_by_class = [0] * num_classes
total_by_class = [0] * num_classes

with torch.no_grad():
    for inputs, labels in test_loader:
        embeddings = model(inputs)
        outputs = arcface_loss.get_logits(embeddings)  # ArcFaceの出力からlogitsを取得
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()


        for label, prediction in zip(labels, predicted):
            total_by_class[label] += 1
            if label == prediction:
                correct_by_class[label] += 1

print(f'Accuracy: {100 * correct / total:.2f}%')

# 各クラスごとの正答率を計算（変更なし）
for i in range(num_classes):
    accuracy = 100 * correct_by_class[i] / total_by_class[i] if total_by_class[i] > 0 else 0
    print(f'Accuracy for class {le.classes_[i]}: {accuracy:.2f}%')

# テストループ
model.eval()
test_embeddings = []
test_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        embeddings = model(inputs)
        test_embeddings.append(embeddings.cpu().numpy())
        test_labels.append(labels.cpu().numpy())

test_embeddings = np.concatenate(test_embeddings)
test_labels = np.concatenate(test_labels)

# UMAPを使用して可視化
plot_umap(test_embeddings, test_labels, n_components=2)  # 2次元での可視化

# 10.モデルAを適用して，各サンプルの体の向きラベルと体の向き特徴量を取得する
このコードはArcFacelossで学習したモデルAを使用している．

以降の処理で自前の実験用のデータセットを使用するので，全サンプルにモデルAを適用する．

#### result_dfについて
- "label" 人物ラベル（体の向きラベルでないことに注意！！）
- "predicted_1"-"predicted_3"　体の向き特徴量
- 'predicted_label' 体の向きラベル


（注意）

現行の実験では以下のコードを使用していない．外観特徴量にCeoss entropyで出力した予測ラベルとArcFaceLossで出力した体の向き特徴量を統合している．（本当はどっちかに統一する必要あり）

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import StandardScaler, LabelEncoder
import joblib
import numpy as np

# データセットの定義
class PoseDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# ニューラルネットワークの定義
class Net(nn.Module):
    def __init__(self, embedding_size=3):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(132, 100)
        self.fc2 = nn.Linear(100, 50)
        self.bn1 = nn.BatchNorm1d(50)
        self.fc3 = nn.Linear(50, 25)
        self.fc4 = nn.Linear(25, embedding_size)  # 出力を3次元の埋め込みサイズに設定

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.bn1(x)
        x = F.relu(self.fc3(x))
        x = self.fc4(x)  # 出力にはsoftmaxを使用しない
        return x

# ラベルエンコーダーと標準化スケーラーのロード
le = joblib.load('label_encoder.joblib')
scaler = joblib.load('scaler.joblib')

# クラス数の確認
num_classes = len(le.classes_)

# モデルのロード
model_path = 'pose_classification_model_metric.pth'
# モデルの定義
model = Net(embedding_size=3)  # 埋め込みサイズを3に設定
model.load_state_dict(torch.load(model_path))
model.eval()

# 新しいデータの読み込み
new_data_path = 'C:/Users/sugie/PycharmProjects/mediapipe/orientation/exp1_3Dlandmark_csv/exp13Dlandmark_all.csv'
new_data = pd.read_csv(new_data_path)

# データの内容を確認
print(new_data.head())
print(new_data.columns)

# ファイル名の抽出
file_names = new_data['file_name'].values  # ファイル名

# labelの抽出
label_names = new_data['label'].values  # ラベル名

#　point_IDの抽出
point_ID = new_data["point_ID"].values  #point_ID

# 特徴量の抽出（数値データのみ）
new_X = new_data.iloc[:, 3:].values

# データの標準化
new_X = scaler.transform(new_X)

# データローダーの作成
new_dataset = PoseDataset(torch.tensor(new_X, dtype=torch.float32), torch.tensor([0]*len(new_X), dtype=torch.long))
new_loader = DataLoader(new_dataset, batch_size=32, shuffle=False)

# ラベルをエンコードされた数値に変換し、結果データフレームに追加する部分
predicted_labels = []

with torch.no_grad():
    for inputs, _ in new_loader:
        outputs = model(inputs)
        predictions.extend(outputs.numpy())
        # 各ベクトルを最も近いクラスに分類
        _, predicted = torch.max(outputs, 1)
        predicted_labels.extend(predicted.numpy())

# 予測ラベルをデコードし、テキストラベルに変換
predicted_labels = le.inverse_transform(predicted_labels)

# 結果をデータフレームに追加
results_df = pd.DataFrame({
    "label": label_names,
    "point_ID": point_ID,
    'file_name': file_names[:len(pred_1)],
    'predicted_1': pred_1,
    'predicted_2': pred_2,
    'predicted_3': pred_3,
    'predicted_label': predicted_labels  # 予測ラベルを追加
})

# CSVファイルに出力
results_df.to_csv('predicted_exp1_metric.csv', index=False)
print('Predictions saved to predicted_exp1_metric.csv')