In [1]:
import os
import re
import glob
import numpy as np
import pandas as pd
from keras.preprocessing.image import load_img, img_to_array
from keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from sklearn.model_selection import train_test_split

# Mish activation function
def mish(x):
    return x * tf.math.tanh(tf.math.softplus(x))

# 이미지 로드 및 전처리
def load_and_preprocess_image(image_path, target_size=(128, 128)):
    image = load_img(image_path, color_mode='grayscale', target_size=target_size)
    image_array = img_to_array(image)
    image_array /= 255.0
    return image_array

# 파일 이름에서 session과 point 추출
def extract_session_and_point(filename):
    session_match = re.search(r'img_(\d+)', filename)
    point_match = re.search(r'\((\d+)\)', filename)
    session = int(session_match.group(1)) if session_match else None
    point = int(point_match.group(1)) if point_match else None
    return session, point

# 병목 구조
def bottleneck_block(x, filters, downsample=False):
    shortcut = x
    strides = (2, 2) if downsample else (1, 1)

    x = layers.Conv2D(filters, kernel_size=(1, 1), strides=strides, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters, kernel_size=(3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters * 4, kernel_size=(1, 1), padding='same')(x)
    x = layers.BatchNormalization()(x)

    if downsample or shortcut.shape[-1] != filters * 4:
        shortcut = layers.Conv2D(filters * 4, kernel_size=(1, 1), strides=strides, padding='same')(shortcut)
    
    x = layers.add([x, shortcut])
    x = layers.Activation(mish)(x)
    return x

# ResNet-50
def create_resnet50_model(input_shape):
    input_layer = Input(shape=input_shape)

    x = layers.Conv2D(64, kernel_size=(7, 7), strides=(2, 2), padding='same')(input_layer)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)
    x = layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(x)

    filter_sizes = [64, 128, 256, 512]
    num_blocks = [3, 4, 6, 3]  # ResNet-50에서 각 블록별 반복 횟수
    for filters, blocks in zip(filter_sizes, num_blocks):
        for i in range(blocks):
            x = bottleneck_block(x, filters, downsample=(i == 0 and filters != 64))
    
    x = layers.GlobalAveragePooling2D()(x)
    return Model(inputs=input_layer, outputs=x)

# MLP
def create_mlp_model(input_shape):
    input_layer = Input(shape=input_shape)
    x = layers.Dense(128, activation=mish)(input_layer)
    x = layers.Dense(64, activation=mish)(x)
    x = layers.Dense(3, activation=mish)(x)
    x = layers.Flatten()(x)
    return Model(inputs=input_layer, outputs=x)

# LOSO 교차 검증
folder_path = r"C:\Users\admin\Desktop\sihoon\webcam\img"
csv_path = r"C:\Users\admin\Desktop\sihoon\webcam\results"

subject_folders = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
results = []

for test_subject in subject_folders:
    print(f"Testing on subject: {test_subject}")
    
    test_folder = os.path.join(folder_path, test_subject)
    test_csv = os.path.join(csv_path, f"{test_subject}.csv")
    test_images = glob.glob(os.path.join(test_folder, '*.jpg'))
    
    test_image_data = []
    for img in test_images:
        session, point = extract_session_and_point(os.path.basename(img))
        test_image_data.append({'Filename': img, 'Session': session, 'Point': point})
    test_df = pd.DataFrame(test_image_data)
    
    if os.path.exists(test_csv):
        test_csv_data = pd.read_csv(test_csv)
        test_merged = pd.merge(test_df, test_csv_data, left_on=['Session', 'Point'], right_on=['session', 'point'])
    else:
        print(f"No CSV data for {test_subject}. Skipping.")
        continue
    
    test_images_array = np.array([load_and_preprocess_image(path) for path in test_merged['Filename']])
    test_features = test_merged.drop(['Filename', 'session', 'point', 'Session', 'Point'], axis=1).values
    test_labels = to_categorical(test_merged['Point'].values)
    
    train_subjects = [s for s in subject_folders if s != test_subject]
    train_images = []
    train_csv_data = pd.DataFrame()
    for train_subject in train_subjects:
        train_folder = os.path.join(folder_path, train_subject)
        train_csv = os.path.join(csv_path, f"{train_subject}.csv")
        train_images.extend(glob.glob(os.path.join(train_folder, '*.jpg')))
        if os.path.exists(train_csv):
            train_csv_data = pd.concat([train_csv_data, pd.read_csv(train_csv)])
    
    train_image_data = []
    for img in train_images:
        session, point = extract_session_and_point(os.path.basename(img))
        train_image_data.append({'Filename': img, 'Session': session, 'Point': point})
    train_df = pd.DataFrame(train_image_data)
    train_merged = pd.merge(train_df, train_csv_data, left_on=['Session', 'Point'], right_on=['session', 'point'])
    
    train_images_array = np.array([load_and_preprocess_image(path) for path in train_merged['Filename']])
    train_features = train_merged.drop(['Filename', 'session', 'point', 'Session', 'Point'], axis=1).values
    train_labels = to_categorical(train_merged['Point'].values)
    
    # 모델 구성
    right_eye_model = create_resnet50_model((128, 128, 1))
    left_eye_model = create_resnet50_model((128, 128, 1))
    mlp_model = create_mlp_model(train_features.shape[1:])
    
    combined_input = layers.concatenate([right_eye_model.output, left_eye_model.output, mlp_model.output])
    x = layers.Dense(256, activation=mish)(combined_input)
    x = layers.Dropout(0.5)(x)
    output_layer = layers.Dense(len(valid_labels), activation='softmax')(x)
    
    combined_model = Model(inputs=[right_eye_model.input, left_eye_model.input, mlp_model.input], outputs=output_layer)
    
    combined_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    
    # 모델 학습
    combined_model.fit([train_images_array, train_images_array, train_features], train_labels, 
                       epochs=10, batch_size=1, verbose=1)
    
    # 테스트 데이터 평가
    loss, accuracy = combined_model.evaluate([test_images_array, test_images_array, test_features], test_labels)
    print(f"Subject {test_subject} - Accuracy: {accuracy:.4f}")
    results.append({'Subject': test_subject, 'Accuracy': accuracy})

# 최종 결과
results_df = pd.DataFrame(results)
results_df.to_csv(os.path.join(csv_path, "loso_results.csv"), index=False)
print("LOSO results saved.")


Testing on subject: hsb
Epoch 1/10




ValueError: Arguments `target` and `output` must have the same shape. Received: target.shape=(1, 46), output.shape=(1, 2)

In [2]:
import os
import re
import glob
import numpy as np
import pandas as pd
from keras.preprocessing.image import load_img, img_to_array
from keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from sklearn.model_selection import train_test_split

# Mish activation function
def mish(x):
    return x * tf.math.tanh(tf.math.softplus(x))

# 이미지 로드 및 전처리
def load_and_preprocess_image(image_path, target_size=(128, 128)):
    image = load_img(image_path, color_mode='grayscale', target_size=target_size)
    image_array = img_to_array(image)
    image_array /= 255.0
    return image_array

# 파일 이름에서 session과 point 추출
def extract_session_and_point(filename):
    session_match = re.search(r'img_(\d+)', filename)
    point_match = re.search(r'\((\d+)\)', filename)
    session = int(session_match.group(1)) if session_match else None
    point = int(point_match.group(1)) if point_match else None
    return session, point

# 병목 구조
def bottleneck_block(x, filters, downsample=False):
    shortcut = x
    strides = (2, 2) if downsample else (1, 1)

    x = layers.Conv2D(filters, kernel_size=(1, 1), strides=strides, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters, kernel_size=(3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters * 4, kernel_size=(1, 1), padding='same')(x)
    x = layers.BatchNormalization()(x)

    if downsample or shortcut.shape[-1] != filters * 4:
        shortcut = layers.Conv2D(filters * 4, kernel_size=(1, 1), strides=strides, padding='same')(shortcut)
    
    x = layers.add([x, shortcut])
    x = layers.Activation(mish)(x)
    return x

# ResNet-50
def create_resnet50_model(input_shape):
    input_layer = Input(shape=input_shape)

    x = layers.Conv2D(64, kernel_size=(7, 7), strides=(2, 2), padding='same')(input_layer)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)
    x = layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(x)

    filter_sizes = [64, 128, 256, 512]
    num_blocks = [3, 4, 6, 3]  # ResNet-50에서 각 블록별 반복 횟수
    for filters, blocks in zip(filter_sizes, num_blocks):
        for i in range(blocks):
            x = bottleneck_block(x, filters, downsample=(i == 0 and filters != 64))
    
    x = layers.GlobalAveragePooling2D()(x)
    return Model(inputs=input_layer, outputs=x)

# MLP
def create_mlp_model(input_shape):
    input_layer = Input(shape=input_shape)
    x = layers.Dense(128, activation=mish)(input_layer)
    x = layers.Dense(64, activation=mish)(x)
    x = layers.Dense(3, activation=mish)(x)
    x = layers.Flatten()(x)
    return Model(inputs=input_layer, outputs=x)

# LOSO 교차 검증
folder_path = r"C:\Users\admin\Desktop\sihoon\webcam\img"
csv_path = r"C:\Users\admin\Desktop\sihoon\webcam\results"

subject_folders = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
results = []

for test_subject in subject_folders:
    print(f"Testing on subject: {test_subject}")
    
    test_folder = os.path.join(folder_path, test_subject)
    test_csv = os.path.join(csv_path, f"{test_subject}.csv")
    test_images = glob.glob(os.path.join(test_folder, '*.jpg'))
    
    test_image_data = []
    for img in test_images:
        session, point = extract_session_and_point(os.path.basename(img))
        test_image_data.append({'Filename': os.path.abspath(img), 'Session': session, 'Point': point})
    test_df = pd.DataFrame(test_image_data)
    
    if os.path.exists(test_csv):
        test_csv_data = pd.read_csv(test_csv)
        test_merged = pd.merge(test_df, test_csv_data, left_on=['Session', 'Point'], right_on=['session', 'point'])
    else:
        print(f"No CSV data for {test_subject}. Skipping.")
        continue
    
    test_images_array = np.array([load_and_preprocess_image(path) for path in test_merged['Filename']])
    test_features = test_merged.drop(['Filename', 'session', 'point', 'Session', 'Point'], axis=1).values
    test_labels = to_categorical(test_merged['Point'].values)
    
    train_subjects = [s for s in subject_folders if s != test_subject]
    train_images = []
    train_csv_data = pd.DataFrame()
    for train_subject in train_subjects:
        train_folder = os.path.join(folder_path, train_subject)
        train_csv = os.path.join(csv_path, f"{train_subject}.csv")
        train_images.extend(glob.glob(os.path.join(train_folder, '*.jpg')))
        if os.path.exists(train_csv):
            train_csv_data = pd.concat([train_csv_data, pd.read_csv(train_csv)])
    
    train_image_data = []
    for img in train_images:
        session, point = extract_session_and_point(os.path.basename(img))
        train_image_data.append({'Filename': os.path.abspath(img), 'Session': session, 'Point': point})
    train_df = pd.DataFrame(train_image_data)
    train_merged = pd.merge(train_df, train_csv_data, left_on=['Session', 'Point'], right_on=['session', 'point'])
    
    train_images_array = np.array([load_and_preprocess_image(path) for path in train_merged['Filename']])
    train_features = train_merged.drop(['Filename', 'session', 'point', 'Session', 'Point'], axis=1).values
    train_labels = to_categorical(train_merged['Point'].values)
    
    # 모델 구성
    right_eye_model = create_resnet50_model((128, 128, 1))
    left_eye_model = create_resnet50_model((128, 128, 1))
    mlp_model = create_mlp_model(train_features.shape[1:])
    
    combined_input = layers.concatenate([right_eye_model.output, left_eye_model.output, mlp_model.output])
    x = layers.Dense(256, activation=mish)(combined_input)
    x = layers.Dropout(0.5)(x)
    output_layer = layers.Dense(len(valid_labels), activation='softmax')(x)
    
    combined_model = Model(inputs=[right_eye_model.input, left_eye_model.input, mlp_model.input], outputs=output_layer)
    
    combined_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    
    # 모델 학습
    combined_model.fit([train_images_array, train_images_array, train_features], train_labels, 
                       epochs=10, batch_size=1, verbose=1)
    
    # 테스트 데이터 평가
    predictions = combined_model.predict([test_images_array, test_images_array, test_features])
    predicted_classes = np.argmax(predictions, axis=1)
    actual_classes = np.argmax(test_labels, axis=1)
    
    loss, accuracy = combined_model.evaluate([test_images_array, test_images_array, test_features], test_labels)
    print(f"Subject {test_subject} - Accuracy: {accuracy:.4f}")
    
    # 결과 저장 (절대 경로, 예측값, 실제 값 포함)
    for idx, (image_path, feature, pred_class, actual_class) in enumerate(zip(
        test_merged['Filename'], test_features, predicted_classes, actual_classes)):
        results.append({
            'Subject': test_subject,
            'Test Accuracy': accuracy,
            'Test Loss': loss,
            'Image File': image_path,  # 절대 경로
            'Feature': feature.tolist(),  # Features
            'Predicted Class': pred_class,  # 예측 값
            'Actual Class': actual_class   # 실제 값
        })

# 최종 결과
results_df = pd.DataFrame(results)

# 결과 저장
results_df.to_csv(os.path.join(csv_path, "loso_results_with_details.csv"), index=False, encoding='utf-8')
print("LOSO results with detailed information saved.")


Testing on subject: hsb


NameError: name 'valid_labels' is not defined

In [3]:
import os
import re
import glob
import numpy as np
import pandas as pd
from keras.preprocessing.image import load_img, img_to_array
from keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from sklearn.model_selection import train_test_split

# Mish activation function
def mish(x):
    return x * tf.math.tanh(tf.math.softplus(x))

# 이미지 로드 및 전처리
def load_and_preprocess_image(image_path, target_size=(128, 128)):
    image = load_img(image_path, color_mode='grayscale', target_size=target_size)
    image_array = img_to_array(image)
    image_array /= 255.0
    return image_array

# 파일 이름에서 session과 point 추출
def extract_session_and_point(filename):
    session_match = re.search(r'img_(\d+)', filename)
    point_match = re.search(r'\((\d+)\)', filename)
    session = int(session_match.group(1)) if session_match else None
    point = int(point_match.group(1)) if point_match else None
    return session, point

# 병목 구조
def bottleneck_block(x, filters, downsample=False):
    shortcut = x
    strides = (2, 2) if downsample else (1, 1)

    x = layers.Conv2D(filters, kernel_size=(1, 1), strides=strides, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters, kernel_size=(3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters * 4, kernel_size=(1, 1), padding='same')(x)
    x = layers.BatchNormalization()(x)

    if downsample or shortcut.shape[-1] != filters * 4:
        shortcut = layers.Conv2D(filters * 4, kernel_size=(1, 1), strides=strides, padding='same')(shortcut)
    
    x = layers.add([x, shortcut])
    x = layers.Activation(mish)(x)
    return x

# ResNet-50
def create_resnet50_model(input_shape):
    input_layer = Input(shape=input_shape)

    x = layers.Conv2D(64, kernel_size=(7, 7), strides=(2, 2), padding='same')(input_layer)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)
    x = layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(x)

    filter_sizes = [64, 128, 256, 512]
    num_blocks = [3, 4, 6, 3]  # ResNet-50에서 각 블록별 반복 횟수
    for filters, blocks in zip(filter_sizes, num_blocks):
        for i in range(blocks):
            x = bottleneck_block(x, filters, downsample=(i == 0 and filters != 64))
    
    x = layers.GlobalAveragePooling2D()(x)
    return Model(inputs=input_layer, outputs=x)

# MLP
def create_mlp_model(input_shape):
    input_layer = Input(shape=input_shape)
    x = layers.Dense(128, activation=mish)(input_layer)
    x = layers.Dense(64, activation=mish)(x)
    x = layers.Dense(3, activation=mish)(x)
    x = layers.Flatten()(x)
    return Model(inputs=input_layer, outputs=x)

# 유효한 라벨 정의
valid_labels = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45]

# LOSO 교차 검증
folder_path = r"C:\Users\admin\Desktop\sihoon\webcam\img"
csv_path = r"C:\Users\admin\Desktop\sihoon\webcam\results"

subject_folders = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
results = []

for test_subject in subject_folders:
    print(f"Testing on subject: {test_subject}")
    
    test_folder = os.path.join(folder_path, test_subject)
    test_csv = os.path.join(csv_path, f"{test_subject}.csv")
    test_images = glob.glob(os.path.join(test_folder, '*.jpg'))
    
    test_image_data = []
    for img in test_images:
        session, point = extract_session_and_point(os.path.basename(img))
        if point in valid_labels:  # valid_labels로 필터링
            test_image_data.append({'Filename': os.path.abspath(img), 'Session': session, 'Point': point})
    test_df = pd.DataFrame(test_image_data)
    
    if os.path.exists(test_csv):
        test_csv_data = pd.read_csv(test_csv)
        test_merged = pd.merge(test_df, test_csv_data, left_on=['Session', 'Point'], right_on=['session', 'point'])
    else:
        print(f"No CSV data for {test_subject}. Skipping.")
        continue
    
    # valid_labels로 다시 필터링
    test_merged = test_merged[test_merged['Point'].isin(valid_labels)]
    
    test_images_array = np.array([load_and_preprocess_image(path) for path in test_merged['Filename']])
    test_features = test_merged.drop(['Filename', 'session', 'point', 'Session', 'Point'], axis=1).values
    test_labels = test_merged['Point'].map({label: idx for idx, label in enumerate(valid_labels)}).values
    test_labels = to_categorical(test_labels, num_classes=len(valid_labels))
    
    train_subjects = [s for s in subject_folders if s != test_subject]
    train_images = []
    train_csv_data = pd.DataFrame()
    for train_subject in train_subjects:
        train_folder = os.path.join(folder_path, train_subject)
        train_csv = os.path.join(csv_path, f"{train_subject}.csv")
        train_images.extend(glob.glob(os.path.join(train_folder, '*.jpg')))
        if os.path.exists(train_csv):
            train_csv_data = pd.concat([train_csv_data, pd.read_csv(train_csv)])
    
    train_image_data = []
    for img in train_images:
        session, point = extract_session_and_point(os.path.basename(img))
        if point in valid_labels:  # valid_labels로 필터링
            train_image_data.append({'Filename': os.path.abspath(img), 'Session': session, 'Point': point})
    train_df = pd.DataFrame(train_image_data)
    train_merged = pd.merge(train_df, train_csv_data, left_on=['Session', 'Point'], right_on=['session', 'point'])
    
    # valid_labels로 다시 필터링
    train_merged = train_merged[train_merged['Point'].isin(valid_labels)]
    
    train_images_array = np.array([load_and_preprocess_image(path) for path in train_merged['Filename']])
    train_features = train_merged.drop(['Filename', 'session', 'point', 'Session', 'Point'], axis=1).values
    train_labels = train_merged['Point'].map({label: idx for idx, label in enumerate(valid_labels)}).values
    train_labels = to_categorical(train_labels, num_classes=len(valid_labels))
    
    # 모델 구성
    right_eye_model = create_resnet50_model((128, 128, 1))
    left_eye_model = create_resnet50_model((128, 128, 1))
    mlp_model = create_mlp_model(train_features.shape[1:])
    
    combined_input = layers.concatenate([right_eye_model.output, left_eye_model.output, mlp_model.output])
    x = layers.Dense(256, activation=mish)(combined_input)
    x = layers.Dropout(0.5)(x)
    output_layer = layers.Dense(len(valid_labels), activation='softmax')(x)
    
    combined_model = Model(inputs=[right_eye_model.input, left_eye_model.input, mlp_model.input], outputs=output_layer)
    
    combined_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    
    # 모델 학습
    combined_model.fit([train_images_array, train_images_array, train_features], train_labels, 
                       epochs=10, batch_size=1, verbose=1)
    
    # 테스트 데이터 평가
    predictions = combined_model.predict([test_images_array, test_images_array, test_features])
    predicted_classes = np.argmax(predictions, axis=1)
    actual_classes = np.argmax(test_labels, axis=1)
    
    loss, accuracy = combined_model.evaluate([test_images_array, test_images_array, test_features], test_labels)
    print(f"Subject {test_subject} - Accuracy: {accuracy:.4f}")
    
    # 결과 저장 (절대 경로, 예측값, 실제 값 포함)
    for idx, (image_path, feature, pred_class, actual_class) in enumerate(zip(
        test_merged['Filename'], test_features, predicted_classes, actual_classes)):
        results.append({
            'Subject': test_subject,
            'Test Accuracy': accuracy,
            'Test Loss': loss,
            'Image File': image_path,  # 절대 경로
            'Feature': feature.tolist(),  # Features
            'Predicted Class': pred_class,  # 예측 값
            'Actual Class': actual_class   # 실제 값
        })

# 최종 결과
results_df = pd.DataFrame(results)

# 결과 저장
results_df.to_csv(os.path.join(csv_path, "loso_results_with_valid_labels.csv"), index=False, encoding='utf-8')
print("LOSO results with valid labels saved.")


Testing on subject: hsb
Epoch 1/10




[1m  628/18400[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m1:40:05[0m 338ms/step - accuracy: 0.0499 - loss: 7.6274

KeyboardInterrupt: 

In [4]:
# 1. 각 서브젝트 폴더에서 수집된 파일 개수 확인
for subject in subject_folders:
    subject_folder = os.path.join(folder_path, subject)
    subject_files = glob.glob(os.path.join(subject_folder, '*.jpg'))
    print(f"Subject {subject} 파일 개수: {len(subject_files)}")

# 2. train_merged 크기 확인
print(f"train_merged 데이터 개수: {train_merged.shape[0]}")

# 3. train_left_features와 train_right_features 분리 후 크기 확인
train_left_features = train_merged[train_merged['Filename'].str.contains('left', case=False)]
train_right_features = train_merged[train_merged['Filename'].str.contains('right', case=False)]
print(f"train_left_features 개수: {train_left_features.shape[0]}")
print(f"train_right_features 개수: {train_right_features.shape[0]}")

# 4. 최종 배열 크기 확인
print(f"훈련 데이터 Left 이미지 개수: {train_left_images_array.shape[0]}")
print(f"훈련 데이터 Right 이미지 개수: {train_right_images_array.shape[0]}")
print(f"훈련 데이터 Features 개수: {train_features.shape[0]}")


Subject hsb 파일 개수: 2250
Subject hsh 파일 개수: 2250
Subject hyh 파일 개수: 2250
Subject lgj 파일 개수: 2250
Subject scy 파일 개수: 2250
train_merged 데이터 개수: 4600
train_left_features 개수: 2300
train_right_features 개수: 2300
훈련 데이터 Left 이미지 개수: 2300
훈련 데이터 Right 이미지 개수: 2300
훈련 데이터 Features 개수: 2300


In [None]:
import os
import re
import glob
import numpy as np
import pandas as pd
from keras.preprocessing.image import load_img, img_to_array
from keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras import layers, Model, Input

# Mish activation function
def mish(x):
    return x * tf.math.tanh(tf.math.softplus(x))

# 이미지 로드 및 전처리
def load_and_preprocess_image(image_path, target_size=(128, 128)):
    image = load_img(image_path, color_mode='grayscale', target_size=target_size)
    image_array = img_to_array(image)
    image_array /= 255.0
    return image_array

# 파일 이름에서 session과 point 추출
def extract_session_and_point(filename):
    session_match = re.search(r'img_(\d+)', filename)
    point_match = re.search(r'\((\d+)\)', filename)
    session = int(session_match.group(1)) if session_match else None
    point = int(point_match.group(1)) if point_match else None
    return session, point

# ResNet-50
def create_resnet50_model(input_shape):
    input_layer = Input(shape=input_shape)
    x = layers.Conv2D(64, kernel_size=(7, 7), strides=(2, 2), padding='same')(input_layer)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)
    x = layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(x)

    filter_sizes = [64, 128, 256, 512]
    num_blocks = [3, 4, 6, 3]
    for filters, blocks in zip(filter_sizes, num_blocks):
        for i in range(blocks):
            x = bottleneck_block(x, filters, downsample=(i == 0 and filters != 64))
    x = layers.GlobalAveragePooling2D()(x)
    return Model(inputs=input_layer, outputs=x)

# 병목 구조
def bottleneck_block(x, filters, downsample=False):
    shortcut = x
    strides = (2, 2) if downsample else (1, 1)
    x = layers.Conv2D(filters, kernel_size=(1, 1), strides=strides, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters, kernel_size=(3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(mish)(x)

    x = layers.Conv2D(filters * 4, kernel_size=(1, 1), padding='same')(x)
    x = layers.BatchNormalization()(x)

    if downsample or shortcut.shape[-1] != filters * 4:
        shortcut = layers.Conv2D(filters * 4, kernel_size=(1, 1), strides=strides, padding='same')(shortcut)
    x = layers.add([x, shortcut])
    x = layers.Activation(mish)(x)
    return x

# MLP
def create_mlp_model(input_shape):
    input_layer = Input(shape=input_shape)
    x = layers.Dense(128, activation=mish)(input_layer)
    x = layers.Dense(64, activation=mish)(x)
    x = layers.Dense(3, activation=mish)(x)
    x = layers.Flatten()(x)
    return Model(inputs=input_layer, outputs=x)

# 유효한 라벨 정의
valid_labels = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45]

# LOSO 교차 검증
folder_path = r"C:\Users\admin\Desktop\sihoon\webcam\img"
csv_path = r"C:\Users\admin\Desktop\sihoon\webcam\results"
subject_folders = [f for f in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, f))]
results = []

for test_subject in subject_folders:
    print(f"Testing on subject: {test_subject}")

    # 테스트 데이터 준비
    test_folder = os.path.join(folder_path, test_subject)
    test_csv = os.path.join(csv_path, f"{test_subject}.csv")
    test_images = glob.glob(os.path.join(test_folder, '*.jpg'))

    test_image_data = []
    for img in test_images:
        session, point = extract_session_and_point(os.path.basename(img))
        if point in valid_labels:
            subject_name = os.path.basename(os.path.dirname(img))  # 폴더 이름
            unique_filename = f"{subject_name}_{os.path.basename(img)}"
            test_image_data.append({
                'Filename': os.path.abspath(img),
                'UniqueFilename': unique_filename,
                'Session': session,
                'Point': point
            })
    test_df = pd.DataFrame(test_image_data)


    if os.path.exists(test_csv):
        test_csv_data = pd.read_csv(test_csv)
        test_csv_data.rename(columns={'session': 'Session', 'point': 'Point'}, inplace=True)
        test_merged = pd.merge(test_df, test_csv_data, on=['Session', 'Point'])
    else:
        print(f"No CSV data for {test_subject}. Skipping.")
        continue

    test_merged = test_merged.drop_duplicates(subset=['UniqueFilename', 'Session', 'Point'])

    # 테스트 데이터 필터링
    test_merged = test_merged[test_merged['Point'].isin(valid_labels)]
    test_left_features = test_merged[test_merged['Filename'].str.contains('left', case=False)]
    test_images_array = np.array([load_and_preprocess_image(path) for path in test_left_features['Filename']])
    test_features = test_left_features.drop(['Filename', 'UniqueFilename', 'Session', 'Point'], axis=1).values
    test_labels = test_left_features['Point'].map({label: idx for idx, label in enumerate(valid_labels)}).values
    test_labels = to_categorical(test_labels, num_classes=len(valid_labels))

    # 학습 데이터 준비
    train_subjects = [s for s in subject_folders if s != test_subject]
    train_images = []
    train_csv_data = pd.DataFrame()
    for train_subject in train_subjects:
        train_folder = os.path.join(folder_path, train_subject)
        train_csv = os.path.join(csv_path, f"{train_subject}.csv")
        train_images.extend(glob.glob(os.path.join(train_folder, '*.jpg')))
        if os.path.exists(train_csv):
            train_csv_data = pd.concat([train_csv_data, pd.read_csv(train_csv)])

    train_csv_data.rename(columns={'session': 'Session', 'point': 'Point'}, inplace=True)
    train_image_data = []
    for img in train_images:
        session, point = extract_session_and_point(os.path.basename(img))
        if point in valid_labels:
            subject_name = os.path.basename(os.path.dirname(img))  # 폴더 이름
            unique_filename = f"{subject_name}_{os.path.basename(img)}"
            train_image_data.append({
                'Filename': os.path.abspath(img),
                'UniqueFilename': unique_filename,
                'Session': session,
                'Point': point
            })
    train_df = pd.DataFrame(train_image_data)

    train_merged = pd.merge(train_df, train_csv_data, on=['Session', 'Point'])
    train_merged = train_merged.drop_duplicates(subset=['UniqueFilename', 'Session', 'Point'])

    # 학습 데이터 필터링
    train_left_features = train_merged[train_merged['Filename'].str.contains('left', case=False)]
    train_images_array = np.array([load_and_preprocess_image(path) for path in train_left_features['Filename']])
    train_features = train_left_features.drop(['Filename', 'Session', 'Point', 'UniqueFilename'], axis=1).values
    train_labels = train_left_features['Point'].map({label: idx for idx, label in enumerate(valid_labels)}).values
    train_labels = to_categorical(train_labels, num_classes=len(valid_labels))

    # 모델 구성
    right_eye_model = create_resnet50_model((128, 128, 1))
    left_eye_model = create_resnet50_model((128, 128, 1))
    mlp_model = create_mlp_model(train_features.shape[1:])
    combined_input = layers.concatenate([right_eye_model.output, left_eye_model.output, mlp_model.output])
    x = layers.Dense(256, activation=mish)(combined_input)
    x = layers.Dropout(0.5)(x)
    output_layer = layers.Dense(len(valid_labels), activation='softmax')(x)
    combined_model = Model(inputs=[right_eye_model.input, left_eye_model.input, mlp_model.input], outputs=output_layer)

    combined_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # 모델 학습
    combined_model.fit([train_images_array, train_images_array, train_features], train_labels, 
                       epochs=50, batch_size=1, verbose=1)

    # 테스트 데이터 평가
    predictions = combined_model.predict([test_images_array, test_images_array, test_features])
    predicted_classes = np.argmax(predictions, axis=1)
    actual_classes = np.argmax(test_labels, axis=1)

    loss, accuracy = combined_model.evaluate([test_images_array, test_images_array, test_features], test_labels)
    print(f"Subject {test_subject} - Accuracy: {accuracy:.4f}")

    model_save_path = os.path.join(csv_path, f"1124_model_subject_{test_subject}.h5")
    combined_model.save(model_save_path)
    print(f"Model for subject {test_subject} saved at: {model_save_path}")

    # 결과 저장
    for idx, (image_path, feature, pred_class, actual_class) in enumerate(zip(
        test_left_features['Filename'], test_features, predicted_classes, actual_classes)):
        results.append({
            'Subject': test_subject,
            'Test Accuracy': accuracy,
            'Test Loss': loss,
            'Image File': image_path,
            'Feature': feature.tolist(),
            'Predicted Class': pred_class,
            'Actual Class': actual_class
        })

# 최종 결과 저장
results_df = pd.DataFrame(results)
results_df.to_csv(os.path.join(csv_path, "1124_result.csv"), index=False, encoding='utf-8')
print("LOSO results with valid labels saved.")


Testing on subject: hsb
Epoch 1/50




[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m799s[0m 329ms/step - accuracy: 0.0509 - loss: 4.6492
Epoch 2/50
[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m756s[0m 329ms/step - accuracy: 0.0416 - loss: 3.1582
Epoch 3/50
[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m757s[0m 329ms/step - accuracy: 0.0303 - loss: 3.1676
Epoch 4/50
[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m757s[0m 329ms/step - accuracy: 0.0433 - loss: 3.1710
Epoch 5/50
[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m757s[0m 329ms/step - accuracy: 0.0330 - loss: 3.1377
Epoch 6/50
[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m773s[0m 336ms/step - accuracy: 0.0421 - loss: 3.1368
Epoch 7/50
[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m766s[0m 333ms/step - accuracy: 0.0367 - loss: 3.1381
Epoch 8/50
[1m2300/2300[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m757s[0m 329ms/step - accuracy: 0.0507 - loss: 3.1363
Epo