# In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, concatenate, Dropout, BatchNormalization, GlobalAveragePooling2D, Embedding, Reshape
from tensorflow.keras.optimizers import Adam

# ---------------------------------------------------------
# 0. Configuration and data loading
# ---------------------------------------------------------

# Locations of the training and test CSV files.
train_path = '/content/drive/MyDrive/Toa_sort.csv'
test_path = '/content/drive/MyDrive/Toa_test_sort.csv'

# Regression targets: re_x, re_y.
output_cols = ['re_x', 're_y']

# Input features: CSI_1 .. CSI_128.
csi_cols = [f'CSI_{i}' for i in range(1, 129)]

# The files are read without a header row, so every column name is supplied
# here: the two targets, anchor_id, TOA, then the 128 CSI columns.
columns = [*output_cols, 'anchor_id', 'TOA', *csi_cols]

# Read both files with identical settings.
df_train, df_test = (
    pd.read_csv(csv_path, header=None, names=columns)
    for csv_path in (train_path, test_path)
)

# Grouping scheme: 8 consecutive rows are bundled into one input sample
# (non-overlapping; the label is taken from the last row of each group).
group_size = 8
step = 8  # non-overlapping

# ---------------------------------------------------------
# 1. Data preprocessing (CSI only)
# ---------------------------------------------------------


def _group_rows(features, labels, window, stride):
    """Bundle consecutive rows into fixed-length groups.

    Args:
        features: 2-D array of shape (n_rows, n_features).
        labels: 2-D array of shape (n_rows, n_outputs), row-aligned with
            ``features``.
        window: Number of consecutive rows in each group.
        stride: Row offset between the starts of successive groups.
            ``stride == window`` yields non-overlapping groups.

    Returns:
        A tuple ``(grouped, group_labels)``. ``grouped`` has shape
        (n_groups, window, n_features, 1); the trailing axis is a channel
        axis for the 2-D convolutions. ``group_labels`` has shape
        (n_groups, n_outputs) and holds the label of the last row of each
        group. Trailing rows that do not fill a whole group are dropped.

    Raises:
        ValueError: If ``window`` or ``stride`` is not positive, if the two
            arrays have different row counts, or if there are fewer than
            ``window`` rows.
    """
    if window < 1 or stride < 1:
        raise ValueError(
            f'window and stride must be positive, got {window} and {stride}'
        )
    n_rows = features.shape[0]
    if labels.shape[0] != n_rows:
        raise ValueError(
            f'features has {n_rows} rows but labels has {labels.shape[0]}'
        )
    if n_rows < window:
        raise ValueError(
            f'need at least {window} rows to form one group, got {n_rows}'
        )

    # First row of every group; the last start still leaves a full window.
    starts = np.arange(0, n_rows - window + 1, stride)
    # (n_groups, window) matrix of row indices, one row of indices per group.
    row_idx = starts[:, np.newaxis] + np.arange(window)

    grouped = features[row_idx][..., np.newaxis]
    group_labels = labels[row_idx[:, -1]]
    return grouped, group_labels


# Training data: CSI features and (re_x, re_y) targets.
Xv_csi = df_train[csi_cols].values
yv = df_train[output_cols].values

# CSI tensor of shape (n_groups, group_size, n_csi, 1); each label is the
# coordinate pair of the last row in its group.
X_groups_csi, y_group_labels = _group_rows(Xv_csi, yv, group_size, step)
n_groups = X_groups_csi.shape[0]

# Test data, grouped exactly the same way.
Xv_test_csi = df_test[csi_cols].values
yv_test = df_test[output_cols].values

X_groups_test_csi, y_groups_test = _group_rows(
    Xv_test_csi, yv_test, group_size, step
)
n_groups_test = X_groups_test_csi.shape[0]

# ---------------------------------------------------------
# 2. Normalization (scaling)
# ---------------------------------------------------------
# CSI: divide by the largest absolute value found in the training tensor.
# The same constant is applied to the test tensor. A peak of exactly 0 is
# replaced by 1.0 so the division is always defined.
csi_peak = np.max(np.abs(X_groups_csi))
if csi_peak != 0:
    max_abs_csi = csi_peak
else:
    max_abs_csi = 1.0

X_train_csi_scaled = X_groups_csi / max_abs_csi
X_test_csi_scaled = X_groups_test_csi / max_abs_csi

# Labels: standardize with statistics fitted on the training labels only,
# then apply that same transform to the test labels.
scaler_y = StandardScaler()
scaler_y.fit(y_group_labels)
y_train_scaled = scaler_y.transform(y_group_labels)
y_test_scaled = scaler_y.transform(y_groups_test)

# ---------------------------------------------------------
# 3. CNN model that uses CSI only
# ---------------------------------------------------------

# Per-sample input shape, read from the prepared training tensor instead of
# being repeated as literals, so the network always matches the data that is
# fed to it. With the current grouping this is (8, 128, 1).
csi_input_shape = tuple(X_train_csi_scaled.shape[1:])

# Input: one group of CSI rows, treated as a single-channel 2-D map.
input_csi = Input(shape=csi_input_shape, name='input_csi')

# Stage 1: 32 filters; the 2x2 pooling halves both leading axes.
x = Conv2D(32, (3, 3), padding='same', activation='relu')(input_csi)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2))(x)
x = Dropout(0.2)(x)

# Stage 2: 64 filters; a second 2x2 pooling halves both axes again.
x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2))(x)
x = Dropout(0.2)(x)

# Stage 3: 128 filters, no pooling.
x = Conv2D(128, (3, 3), padding='same', activation='relu')(x)
x = BatchNormalization()(x)
x = Flatten()(x)  # flatten the whole feature map to keep positional information
x = Dense(128, activation='relu')(x)

# Regression head: one more hidden layer, dropout, then a linear 2-unit
# output (one unit per target coordinate).
w = Dense(128, activation='relu')(x)
w = Dropout(0.3)(w)
output = Dense(2, name='output')(w)

model = Model(inputs=input_csi, outputs=output)

# Mean-squared-error loss, with mean absolute error tracked as a metric.
model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])
model.summary()

# ---------------------------------------------------------
# 4. Model training
# ---------------------------------------------------------
# NOTE(review): per the Keras docs, validation_split holds out the trailing
# fraction of the arrays as passed in, before any shuffling. If the rows are
# ordered (the input file names suggest sorted data), that slice may not be
# representative of the training data - confirm this is intended.
fit_options = dict(
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    verbose=1,
)
history = model.fit(X_train_csi_scaled, y_train_scaled, **fit_options)

# ---------------------------------------------------------
# 5. Evaluation and prediction
# ---------------------------------------------------------
from sklearn.metrics import mean_squared_error

# The model was trained on standardized labels, so the loss and MAE returned
# by evaluate() are in standardized units, not in the units of re_x / re_y.
loss, mae = model.evaluate(X_test_csi_scaled, y_test_scaled, verbose=0)
print(f'Test Loss: {loss:.4f}, Test MAE: {mae:.4f}')

# Map predictions and targets back to the original coordinate scale.
predictions = model.predict(X_test_csi_scaled)
predictions_original = scaler_y.inverse_transform(predictions)
actual_original = scaler_y.inverse_transform(y_test_scaled)

# RMSE over all coordinate values (x and y pooled), in original units.
rmse = np.sqrt(mean_squared_error(actual_original, predictions_original))
print(f'RMSE: {rmse:.4f}')

# Additional errors in original units: per-coordinate MAE, and the mean
# straight-line distance between each predicted and actual (x, y) point.
coordinate_errors = predictions_original - actual_original
mae_original = np.mean(np.abs(coordinate_errors))
position_errors = np.linalg.norm(coordinate_errors, axis=1)
print(f'MAE (original units): {mae_original:.4f}')
print(f'Mean distance error (original units): {position_errors.mean():.4f}')

# Save the model.
model.save('cnn_position_model_csi_only.h5')
print("모델이 'cnn_position_model_csi_only.h5'로 저장되었습니다.")

# Save the scaling constants next to the model. Without them the saved
# network cannot be applied to raw CSI, nor can its outputs be converted
# back to coordinates.
np.savez(
    'cnn_position_model_csi_only_scaling.npz',
    max_abs_csi=max_abs_csi,
    y_mean=scaler_y.mean_,
    y_scale=scaler_y.scale_,
)
print("스케일링 값이 'cnn_position_model_csi_only_scaling.npz'로 저장되었습니다.")

# In [None]:
import matplotlib.pyplot as plt

# Overlay the actual and the predicted test positions in one x/y scatter plot.
fig, ax = plt.subplots(figsize=(10, 8))

for points, series_label in (
    (actual_original, 'Actual'),
    (predictions_original, 'Predicted'),
):
    # Column 0 is the x coordinate, column 1 the y coordinate.
    ax.scatter(points[:, 0], points[:, 1], label=series_label, alpha=0.5)

ax.set_xlabel('X Position')
ax.set_ylabel('Y Position')
ax.set_title('Actual vs Predicted Positions')
ax.legend()
plt.show()