<a href="https://colab.research.google.com/github/jjangmo91/flying-squirrel/blob/main/Maxent_flying_squirrel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. 환경 설정 및 라이브러리 임포트

In [None]:
# Numpy 버전 2.0 미만 다운그레이드
# !pip install 'numpy<2.0' -q

In [None]:
# 필수 패키지 설치
# !pip install earthengine-api -U -q
# !pip install geemap -U -q
# !pip install pandas seaborn matplotlib -q
# !pip install scikit-learn-extra -q
!pip install geojson -q

In [None]:
# 라이브러리 임포트
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
import glob
import ee
import geemap
import seaborn as sns
import sys
import time

from statsmodels.stats.outliers_influence import variance_inflation_factor
from shapely.geometry import Point
from ipyleaflet import WidgetControl
from ipywidgets import Label

# Earth Engine 인증 및 초기화
ee.Authenticate()
ee.Initialize(project='ee-jjangmo91')

# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

#2. 데이터 준비 (종 출현, 환경 변수)

In [None]:
import rasterio
import os
from pyproj import Transformer

# 종 출현 데이터 경로 (CSV)
csv_path = '/content/drive/MyDrive/KNPS/Siberian flying squirrel/data/Pteromys volnas.csv'

# 환경 변수 폴더 경로
raster_folder = '/content/drive/MyDrive/KNPS/Siberian flying squirrel/data/'

# 사용할 변수 파일 목록
tif_files = {
    'elevation': 'Clipped_dem.tif',
    'slope': 'Clipped_slope.tif',
    'northness': 'Clipped_Northness.tif',
    'tpi': 'Clipped_TPI_300.tif',
    'twi': 'Clipped_TWI.tif',
    'tree_cover': 'Clipped_treecover_focal300.tif',
    'forest_age': 'Clipped_age.tif',
    'forest_dmcls': 'Clipped_dmcls.tif',
    'forest_type': 'Clipped_frtp.tif'
}

# CSV 읽기
df = pd.read_csv(csv_path)
print(f"1. 원본 데이터 로드 완료: {len(df)}개")

# 좌표 변환기 생성 (WGS84 -> Korea Central Belt 2010)
transformer = Transformer.from_crs("epsg:4326", "epsg:5186", always_xy=True)

# 좌표 변환 실행
xx, yy = transformer.transform(df['longitude'].values, df['latitude'].values)

# 변환된 좌표를 데이터프레임에 추가 (검증용)
df['X_5186'] = xx
df['Y_5186'] = yy

print("2. 좌표 변환 완료 (EPSG:4326 -> EPSG:5186)")
print(f"   - 예시 변환: ({df['longitude'][0]:.4f}, {df['latitude'][0]:.4f}) -> ({df['X_5186'][0]:.1f}, {df['Y_5186'][0]:.1f})")

# 래스터 값 추출
print("\n3. 환경 변수값 추출 시작...")

# 변환된 5186 좌표 리스트 생성
coords_5186 = [(x, y) for x, y in zip(df['X_5186'], df['Y_5186'])]

for var_name, file_name in tif_files.items():
    full_path = os.path.join(raster_folder, file_name)

    if os.path.exists(full_path):
        try:
            with rasterio.open(full_path) as src:
                if var_name == 'elevation':
                    print(f"   - 래스터 좌표계 확인: {src.crs}")
                sampled_values = [x[0] for x in src.sample(coords_5186)]
                df[var_name] = sampled_values
                print(f"   - [성공] {var_name} 추출 완료")

        except Exception as e:
            print(f"   - [에러] {var_name} 처리 중 오류 발생: {e}")
    else:
        print(f"   - [경고] 파일을 찾을 수 없음: {full_path}")

# 데이터 정제(NoData 제거)
original_count = len(df)

if 'elevation' in df.columns:
    df_final = df[df['elevation'] > -9000].copy()
else:
    df_final = df.copy()

# 혹시 모를 결측치(NaN) 제거
df_final = df_final.dropna()

print("\n" + "="*50)
print(f"최종 데이터셋 준비 완료")
print(f" - 원본 좌표 수: {original_count}")
print(f" - 유효 좌표 수: {len(df_final)} (경계 밖/NoData 제거됨)")
print(f" - 추출된 변수: {list(tif_files.keys())}")
print("="*50)

# 결과 미리보기
display(df_final[['longitude', 'latitude', 'elevation', 'forest_type']].head())

# (선택) CSV로 저장하고 싶다면 주석 해제
# df_final.to_csv('/content/drive/MyDrive/KNPS/Deer/SDM/Data/model_input_data.csv', index=False)

In [None]:
if 'df_final' not in locals() or df_final.empty:
    print("Error: df_final data not found.")
else:
    m = geemap.Map(center=[36.54, 127.83], zoom=11)
    m.add_basemap('HYBRID')

    points_ee = geemap.pandas_to_ee(
        df_final,
        latitude='latitude',
        longitude='longitude'
    )

    point_params = {
        'color': 'ff0000',
        'fillColor': 'ff0000',
        'pointSize': 5,
        'width': 1
    }

    m.addLayer(points_ee.style(**point_params), {}, 'Siberian Flying Squirrel')

    legend_dict = {
        "Siberian Flying Squirrel": "ff0000"
    }
    m.add_legend(title="Legend", legend_dict=legend_dict, position='bottomright')

    display(m)

#3. 다중공선성 분석 및 변수 선택

In [None]:
import matplotlib.pyplot as plt
from statsmodels.stats.outliers_influence import variance_inflation_factor

# 데이터 클리닝
cols_to_check = [
    'elevation', 'slope', 'northness', 'tpi', 'twi',
    'tree_cover', 'forest_age', 'forest_dmcls', 'forest_type'
]

print(f"정제 전 데이터 수: {len(df_final)}")
df_cleaned = df_final.copy()

for col in cols_to_check:
    if col in df_cleaned.columns:
        # -9000 미만 또는 10000 초과(1e38) 값 제거
        df_cleaned[col] = df_cleaned[col].apply(lambda x: np.nan if (x < -9000 or x > 10000) else x)

# 결측치가 하나라도 있는 행 삭제
df_cleaned = df_cleaned.dropna(subset=cols_to_check)

print(f"정제 후 데이터 수: {len(df_cleaned)}")

In [None]:
# Correlation Matrix
plt.figure(figsize=(12, 10))

# 상관계수 계산
correlation_matrix = df_cleaned[cols_to_check].corr()

# 히트맵 그리기
sns.heatmap(
    correlation_matrix,
    annot=True,
    fmt=".2f",
    cmap='coolwarm',
    vmin=-1, vmax=1,
    linewidths=.5
)

plt.title('Correlation Matrix (Pearson)', fontsize=16, fontweight='bold')
plt.xticks(rotation=45)
plt.show()

In [None]:
from statsmodels.tools.tools import add_constant

# 상수항 추가
df_vif_input = add_constant(df_cleaned[cols_to_check])

# VIF 계산 수행
vif_data = pd.DataFrame()
vif_data["feature"] = df_vif_input.columns
vif_data["VIF"] = [variance_inflation_factor(df_vif_input.values, i)
                   for i in range(len(df_vif_input.columns))]

# 결과 정리
vif_data = vif_data[vif_data['feature'] != 'const']

# 높은 순서대로 정렬
vif_sorted = vif_data.sort_values(by='VIF', ascending=False)

# 표 출력
print("\n[상수항 추가 후 VIF 계산 결과]")
print(vif_sorted.to_string(index=False))

# 그래프
plt.figure(figsize=(10, 6))
sns.barplot(x='VIF', y='feature', data=vif_sorted, palette='viridis')

# 기준선 추가 (5=주의, 10=위험)
plt.axvline(x=5, color='orange', linestyle='--', label='Warning (VIF=5)')
plt.axvline(x=10, color='red', linestyle='--', label='Danger (VIF=10)')

plt.title('Variance Inflation Factor (VIF) - Corrected', fontsize=16, fontweight='bold')
plt.xlabel('VIF Value')
plt.ylabel('')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Histograms
print("\n[변수별 데이터 분포]")
df_cleaned[cols_to_check].hist(bins=30, figsize=(15, 12), layout=(3, 3), color='#2ca02c')
plt.suptitle("Variable Distributions (Cleaned)", fontsize=20)
plt.tight_layout()
plt.show()

#4. Presence-Background 데이터 생성

In [None]:
from pyproj import Transformer

# Background Point 생성 개수 설정
NUM_BACKGROUND = 2000
SEED = 42
np.random.seed(SEED)

print(f"배경점(Pseudo-absence) {NUM_BACKGROUND}개를 생성을 시작합니다...")

# 기준이 되는 래스터 파일
dem_path = os.path.join(raster_folder, 'Clipped_dem.tif')
transformer_inv = Transformer.from_crs("epsg:5186", "epsg:4326", always_xy=True)

# 유효한 픽셀에서 랜덤 좌표 추출
with rasterio.open(dem_path) as src:
    data = src.read(1)
    valid_indices = np.where((data > -9000) & (data < 10000))
    num_valid_pixels = len(valid_indices[0])
    print(f" - 전체 유효 픽셀 수: {num_valid_pixels}개")
    if num_valid_pixels > NUM_BACKGROUND:
        # 중복 없이 인덱스 선택
        random_idx = np.random.choice(num_valid_pixels, NUM_BACKGROUND, replace=False)
        # 선택된 인덱스의 행(row), 열(col) 가져오기
        rand_rows = valid_indices[0][random_idx]
        rand_cols = valid_indices[1][random_idx]
    else:
        print("경고: 배경점 개수가 유효 픽셀 수보다 많습니다. 전체 픽셀을 사용합니다.")
        rand_rows = valid_indices[0]
        rand_cols = valid_indices[1]

    xs, ys = rasterio.transform.xy(src.transform, rand_rows, rand_cols)
    lons, lats = transformer_inv.transform(xs, ys)

# 배경 데이터프레임 초기화
df_background = pd.DataFrame({
    'longitude': lons,
    'latitude': lats,
    'X_5186': xs,
    'Y_5186': ys
})

# 환경 변수 값 추출
coords_5186 = [(x, y) for x, y in zip(df_background['X_5186'], df_background['Y_5186'])]

for var_name, file_name in tif_files.items():
    full_path = os.path.join(raster_folder, file_name)
    if os.path.exists(full_path):
        with rasterio.open(full_path) as src:
            # 좌표 위치의 값 샘플링
            sampled_values = [x[0] for x in src.sample(coords_5186)]
            df_background[var_name] = sampled_values
    else:
        print(f"\n[경고] 파일을 찾을 수 없음: {file_name}")
print("완료!")

# 데이터 병합 및 정리 (Presence + Background)
df_presence = df_cleaned.copy()
df_presence['pa'] = 1  # Presence = 1
df_background['pa'] = 0  # Background = 0
common_cols = list(set(df_presence.columns) & set(df_background.columns))
final_modeling_df = pd.concat([df_presence[common_cols], df_background[common_cols]], ignore_index=True)
cols_env = list(tif_files.keys()) # 환경변수 컬럼들

for col in cols_env:
    if col in final_modeling_df.columns:
        final_modeling_df[col] = final_modeling_df[col].apply(lambda x: np.nan if (x < -9000 or x > 10000) else x)

# NaN 제거
before_len = len(final_modeling_df)
final_modeling_df = final_modeling_df.dropna()
print(f" - 최종 정제 후 삭제된 배경점 수: {before_len - len(final_modeling_df)}개")

# 결과 확인 및 저장
print("\n" + "="*50)
print(f"최종 모델링 데이터셋 생성 완료")
print(f" - 총 데이터 수: {len(final_modeling_df)} 개")
print(f" - Presence (1): {len(final_modeling_df[final_modeling_df['pa']==1])} 개 (하늘다람쥐)")
print(f" - Background (0): {len(final_modeling_df[final_modeling_df['pa']==0])} 개 (가짜 부재점)")
print("="*50)

# 저장
SAVE_PATH = '/content/drive/MyDrive/KNPS/Siberian flying squirrel/data/final_modeling_data_v1.csv'
# 폴더가 없으면 생성
os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)

final_modeling_df.to_csv(SAVE_PATH, index=False)
print(f"파일 저장 완료: {SAVE_PATH}")

# 미리보기
display(final_modeling_df.head())
display(final_modeling_df.tail())

#5. Spatial Block Cross-Validation

In [None]:
# 기준 파일 경로
dem_path = os.path.join(raster_folder, 'Clipped_dem.tif')

try:
    with rasterio.open(dem_path) as src:
        local_crs = src.crs.to_string()
        bounds = src.bounds
        bbox = [bounds.left, bounds.bottom, bounds.right, bounds.top]
        print(f" - 참조 파일: {os.path.basename(dem_path)}")
        print(f" - 좌표계: {local_crs}")
        print(f" - 범위(Bounds): {bbox}")

except Exception as e:
    print(f"TIF 파일 읽기 실패: {e}")

# 격자 생성
BLOCK_SCALE = 2000
tif_geometry = ee.Geometry.Rectangle(bbox, proj='EPSG:5186', geodesic=False)
grid = tif_geometry.coveringGrid(proj='EPSG:5186', scale=BLOCK_SCALE)
grid_random = grid.randomColumn(seed=42)

# 7:3 비율로 분할
split_threshold = 0.7
grid_classified = grid_random.map(lambda f: f.set('group', ee.Number(f.get('random')).gte(split_threshold)))
print(f" - 블록 크기: {BLOCK_SCALE}m")
print(" - 격자 생성 완료 (로컬 TIF 범위 기준).")

# 로컬 데이터에 블록 정보 매핑
try:
    # 로컬 데이터프레임 업로드
    points_ee = geemap.pandas_to_ee(
        final_modeling_df[['latitude', 'longitude']],
        latitude_col='latitude',
        longitude_col='longitude'
    )

    # 각 포인트가 속한 격자의 group 값 가져오기
    def get_group(feat):
        intersects = grid_classified.filterBounds(feat.geometry())
        first_grid = ee.Feature(intersects.first())
        group_val = ee.Algorithms.If(first_grid, first_grid.get('group'), -1)
        return feat.set('group', group_val)
    points_with_group = points_ee.map(get_group)
    group_info = points_with_group.getInfo()
    group_values = []
    for f in group_info['features']:
        val = f['properties'].get('group')
        if val is None: val = -1
        group_values.append(val)
    final_modeling_df['group'] = group_values

    # 격자 밖 포인트 확인
    invalid_count = len(final_modeling_df[final_modeling_df['group'] == -1])
    if invalid_count > 0:
        print(f"TIF 범위 밖 포인트 {invalid_count}개 발견. (제외 처리)")
        final_modeling_df = final_modeling_df[final_modeling_df['group'] != -1]
    else:
        print("모든 포인트가 TIF 격자 내에 정상적으로 위치합니다.")

    # 4. 데이터 분할 (Train / Test)
    train_df = final_modeling_df[final_modeling_df['group'] == 0].copy()
    test_df = final_modeling_df[final_modeling_df['group'] == 1].copy()

    # 컬럼 정리
    train_df.drop(columns=['group'], inplace=True)
    test_df.drop(columns=['group'], inplace=True)

    print("\n[공간 분할(Spatial CV) 결과]")
    print(f" - 훈련 세트 (Training): {len(train_df)}개 ({(len(train_df)/len(final_modeling_df))*100:.1f}%)")
    print(f"   ㄴ Presence: {len(train_df[train_df['pa']==1])}, Background: {len(train_df[train_df['pa']==0])}")

    print(f" - 테스트 세트 (Testing): {len(test_df)}개 ({(len(test_df)/len(final_modeling_df))*100:.1f}%)")
    print(f"   ㄴ Presence: {len(test_df[test_df['pa']==1])}, Background: {len(test_df[test_df['pa']==0])}")

except Exception as e:
    print(f"오류 발생: {e}")

# 시각화
m = geemap.Map(center=[36.54, 127.83], zoom=11)
m.add_basemap('HYBRID')

# 훈련용 블록 (파란색)
train_blocks = grid_classified.filter(ee.Filter.eq('group', 0))
m.addLayer(train_blocks.style(**{'color': '0000FF', 'fillColor': '0000FF22'}), {}, 'Training Blocks (Blue)')

# 테스트용 블록 (빨간색)
test_blocks = grid_classified.filter(ee.Filter.eq('group', 1))
m.addLayer(test_blocks.style(**{'color': 'FF0000', 'fillColor': 'FF000022'}), {}, 'Testing Blocks (Red)')

# Presence 포인트
presence_viz = geemap.pandas_to_ee(final_modeling_df[final_modeling_df['pa']==1], 'latitude', 'longitude')
m.addLayer(presence_viz.style(**{'color': 'ffff00', 'pointSize': 4}), {}, 'Presence Points')

# 4) TIF 범위 경계 (초록색 선)
tif_boundary_fc = ee.FeatureCollection([ee.Feature(tif_geometry)])
m.addLayer(tif_boundary_fc.style(**{'color': '00FF00', 'fillColor': '00000000', 'width': 2}), {}, 'TIF Bounds')
m.add_legend(title="Spatial CV (Based on TIF)", legend_dict={
    "Training Grid": "0000FF",
    "Testing Grid": "FF0000",
    "Presence": "ffff00",
    "TIF Boundary": "00FF00"
})

display(m)

#6. MaxEnt 하이퍼파라미터 튜닝 및 모델 훈련

In [None]:
from sklearn.metrics import roc_auc_score

try:
    training_fc = geemap.pandas_to_ee(train_df, latitude_col='latitude', longitude_col='longitude')
    testing_fc = geemap.pandas_to_ee(test_df, latitude_col='latitude', longitude_col='longitude')

    print("Pandas 데이터프레임을 GEE FeatureCollection으로 변환 완료.")
    print(f" - 훈련 데이터: {training_fc.size().getInfo()} 개")
    print(f" - 테스트 데이터: {testing_fc.size().getInfo()} 개")

except NameError:
    print(" 오류: 'train_df'나 'test_df'가 없습니다. 바로 앞 단계(공간 분할) 코드를 먼저 실행해주세요.")
except Exception as e:
    print(f" 데이터 변환 중 오류 발생: {e}")

# 하이퍼파라미터 튜닝 설정
if 'cols_to_check' in locals():
    feature_bands = cols_to_check
else:
    # cols_to_check가 없으면 train_df에서 직접 추출
    exclude_cols = ['pa', 'latitude', 'longitude', 'group', 'geometry', 'system:index', 'const']
    feature_bands = [c for c in train_df.columns if c not in exclude_cols]

print(f"학습에 사용할 독립변수 ({len(feature_bands)}개): {feature_bands}")

best_auc = -1
best_params = {}
tuning_results = []

# 튜닝 범위 설정
reg_multipliers = [0.5, 1.0, 1.5, 2.0, 3.0, 5.0]
auto_feature_options = [True, False]

print("\n 하이퍼파라미터 튜닝 루프를 시작합니다.")

# 튜닝 루프 실행
for multiplier in reg_multipliers:
    for auto_feat in auto_feature_options:
        try:
            maxent_model = ee.Classifier.amnhMaxent(
                betaMultiplier=multiplier,
                autoFeature=auto_feat,
                addSamplesToBackground=True
            )

            # 모델 훈련
            trained_model = maxent_model.train(
                features=training_fc,
                classProperty='pa',
                inputProperties=feature_bands
            )

            # 테스트 데이터로 예측
            classified_test = testing_fc.classify(trained_model)

            # 결과 회수
            predicted_df = geemap.ee_to_df(classified_test.select(['pa', 'probability']))

            if predicted_df is not None and not predicted_df.empty:
                true_labels = predicted_df['pa']
                pred_scores = predicted_df['probability']

                # AUC 계산 가능 여부 확인
                if len(true_labels.unique()) < 2:
                    # print(f" [Skip] ... 클래스 불균형")
                    continue

                auc_score = roc_auc_score(true_labels, pred_scores)

                result_str = f" - Beta: {multiplier}, Auto: {auto_feat} -> AUC: {auc_score:.4f}"
                print(result_str)
                tuning_results.append(result_str)

                # 최고 성능 갱신
                if auc_score > best_auc:
                    best_auc = auc_score
                    best_params['betaMultiplier'] = multiplier
                    best_params['autoFeature'] = auto_feat
            else:
                print(" [Error] 예측 결과가 비어 있습니다.")

        except Exception as e:
            print(f" [Error] 튜닝 중 오류 (Beta: {multiplier}): {e}")
            continue

# 최적 모델 확정 및 재훈련
print("\n" + "="*50)
if best_params:
    print(f" - Beta Multiplier: {best_params['betaMultiplier']}")
    print(f" - Auto Feature: {best_params['autoFeature']}")
    print(f" - 최고 AUC 점수: {best_auc:.4f}")
    print("\n최적 파라미터로 '최종 모델(Final Model)'을 생성합니다.")

    # 최적 파라미터 적용하여 최종 모델 생성
    final_model = ee.Classifier.amnhMaxent(
        betaMultiplier=best_params['betaMultiplier'],
        autoFeature=best_params['autoFeature'],
        addSamplesToBackground=True
    ).train(
        features=training_fc,
        classProperty='pa',
        inputProperties=feature_bands
    )

    print("최종 모델 훈련 완료.")
else:
    print("최적의 파라미터를 찾지 못했습니다. 오류 메시지를 확인해주세요.")
print("="*50)

#7. 모델 적합성 평가

In [None]:
from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve

# 테스트 데이터 예측 수행
print("최종 모델을 사용하여 테스트 데이터셋의 예측 확률을 계산합니다.")

try:
    # classify 실행 -> 'probability' 컬럼 생성
    classified_test_final = testing_fc.classify(final_model, 'probability')

    # 필요한 컬럼만 선택해서 가져옴
    predicted_df = geemap.ee_to_df(classified_test_final.select(['pa', 'probability']))

    # 데이터 타입 변환
    predicted_df['pa'] = predicted_df['pa'].astype(int)
    predicted_df['probability'] = predicted_df['probability'].astype(float)

    print(f"예측 완료. (데이터 수: {len(predicted_df)}개)")

except Exception as e:
    print(f"테스트 데이터 예측 중 오류 발생: {e}")

# 성능 지표 계산 (AUC, TSS)
true_labels = predicted_df['pa']
pred_scores = predicted_df['probability']

# AUC 점수 계산
print("\n--- [성능 평가 결과] ---")
auc_score = roc_auc_score(true_labels, pred_scores)
print(f"AUC (Area Under the Curve): {auc_score:.4f}")

# 최적의 임계값(Threshold) 및 TSS 찾기
print("최적의 임계값(Threshold) 탐색 중...")
thresholds = np.linspace(0.0, 1.0, 101)
tss_scores = []
sensitivity_scores = []
specificity_scores = []

for t in thresholds:
    # 확률이 t보다 크면 1(출현), 작으면 0(비출현)
    pred_labels = (pred_scores >= t).astype(int)

    # 혼동 행렬 계산
    tn, fp, fn, tp = confusion_matrix(true_labels, pred_labels).ravel()

    # 민감도(Sensitivity) & 특이도(Specificity)
    sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

    # TSS (True Skill Statistic) = 민감도 + 특이도 - 1
    tss = sensitivity + specificity - 1

    tss_scores.append(tss)
    sensitivity_scores.append(sensitivity)
    specificity_scores.append(specificity)

# 최적값 추출
max_tss_index = np.argmax(tss_scores)
optimal_threshold = thresholds[max_tss_index]
max_tss = tss_scores[max_tss_index]

print(f"\n--- 최적 임계값 및 주요 지표 ---")
print(f"최적 임계값 (Max TSS): {optimal_threshold:.4f}")
print(f" - 최대 TSS 점수: {max_tss:.4f}")
print(f" - 민감도 (Sensitivity): {sensitivity_scores[max_tss_index]:.4f} (실제 있는 곳을 맞출 확률)")
print(f" - 특이도 (Specificity): {specificity_scores[max_tss_index]:.4f} (실제 없는 곳을 맞출 확률)")

# \결과 시각화 (ROC Curve & Threshold Plot)
print("\n평가 지표를 시각화합니다...")
fpr, tpr, _ = roc_curve(true_labels, pred_scores)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 7))

# ROC Curve
ax1.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {auc_score:.4f})')
ax1.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
ax1.set_xlim([0.0, 1.0])
ax1.set_ylim([0.0, 1.05])
ax1.set_xlabel('False Positive Rate (1 - Specificity)', fontsize=12)
ax1.set_ylabel('True Positive Rate (Sensitivity)', fontsize=12)
ax1.set_title('Receiver Operating Characteristic (ROC) Curve', fontsize=16)
ax1.legend(loc="lower right")
ax1.grid(True, alpha=0.3)

# Metrics vs. Threshold
ax2.plot(thresholds, tss_scores, label='TSS', color='blue', lw=2)
ax2.plot(thresholds, sensitivity_scores, label='Sensitivity', color='green', linestyle='--', alpha=0.7)
ax2.plot(thresholds, specificity_scores, label='Specificity', color='red', linestyle='--', alpha=0.7)

# 최적 임계값 표시선
ax2.axvline(optimal_threshold, color='purple', linestyle=':', lw=2, label=f'Optimal Threshold ({optimal_threshold:.2f})')

ax2.set_title('Metrics vs. Threshold', fontsize=16)
ax2.set_xlabel('Threshold (Probability Cutoff)', fontsize=12)
ax2.set_ylabel('Score', fontsize=12)
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
import math

# 범주형/순서형
discrete_vars = ['forest_age', 'forest_dmcls', 'forest_type']

# 분석할 전체 변수 목록 가져오기
if 'feature_bands' in locals():
    predictor_names = feature_bands
else:
    # 변수명이 없으면 train_df에서 제외할 것 빼고 가져오기
    exclude_cols = ['pa', 'latitude', 'longitude', 'group', 'geometry', 'system:index', 'const']
    predictor_names = [c for c in train_df.columns if c not in exclude_cols]

# 나머지 변수들의 평균값 계산 (고정값으로 사용)
mean_values = train_df[predictor_names].mean()

print(f"변수 유형 설정 완료")
print(f" - 막대 그래프(Bar): {discrete_vars}")
print(f" - 선 그래프(Line): 나머지 {len(predictor_names) - len(discrete_vars)}개 변수")

# 그래프 설정 (3x3 그리드)
n_cols = 3
n_rows = math.ceil(len(predictor_names) / n_cols)

fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, 5 * n_rows))
axes = axes.flatten()
fig.suptitle('Response Curves (Bar for Categorical, Line for Continuous)', fontsize=22, fontweight='bold')

# 반응 곡선 생성 및 시각화
print("\n반응 곡선 생성을 시작합니다...")

for i, var_to_plot in enumerate(predictor_names):
    if var_to_plot in discrete_vars:
        # 범주형: 데이터에 존재하는 고유값(Unique)만 딱 뽑아서 사용
        value_range = np.sort(train_df[var_to_plot].unique())
        is_categorical = True
    else:
        # 연속형
        min_val = train_df[var_to_plot].min()
        max_val = train_df[var_to_plot].max()
        value_range = np.linspace(min_val, max_val, 50)
        is_categorical = False

    features_to_classify = []
    base_properties = mean_values.to_dict()

    for val in value_range:
        properties = base_properties.copy()
        properties[var_to_plot] = val
        features_to_classify.append(ee.Feature(None, properties))

    fc_to_classify = ee.FeatureCollection(features_to_classify)

    try:
        classified_fc = fc_to_classify.classify(final_model, 'probability')
        predictions = classified_fc.aggregate_array('probability').getInfo()

        ax = axes[i]

        if is_categorical:
            bars = ax.bar(value_range, predictions, color='#2ca02c', alpha=0.8, width=0.6, edgecolor='black')
            ax.set_xticks(value_range)

        else:
            ax.plot(value_range, predictions, lw=3, color='#2ca02c')
            ax.fill_between(value_range, predictions, color='#2ca02c', alpha=0.1)

        ax.set_xlabel(var_to_plot, fontsize=12, fontweight='bold')
        ax.set_ylabel('Suitability (Probability)', fontsize=10)
        ax.set_title(f'Response to {var_to_plot}', fontsize=14)
        ax.grid(True, linestyle='--', alpha=0.5, axis='y')
        ax.set_ylim(0, 1.05)

    except Exception as e:
        print(f"\n{var_to_plot} 처리 중 오류 발생: {e}")

for i in range(len(predictor_names), len(axes)):
    fig.delaxes(axes[i])

print("\n모든 반응 곡선 생성이 완료되었습니다!")
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

In [None]:
# Full Jackknife Test (변수 중요도 정밀 분석)
print("\n" + "="*50)
print("--- [Full Jackknife Test] 변수 중요도 분석 시작 ---")
print(" (1) 단독 모델(Solo), (2) 제거 모델(Without), (3) 전체 모델(Total) 비교")

jackknife_results = []

# 기존 섹션 7에서 계산된 전체 모델 AUC 활용 (없을 경우 0 처리)
total_auc_val = auc_score if 'auc_score' in locals() else 0
print(f" - 기준 모델(Total) AUC: {total_auc_val:.4f}")

# 변수 루프 시작
for var in feature_bands:
    # -------------------------------------------------------
    # Solo Model (With Only): 이 변수 하나만 사용
    # -------------------------------------------------------
    try:
        solo_model = ee.Classifier.amnhMaxent(
            betaMultiplier=best_params.get('betaMultiplier', 1.0),
            autoFeature=best_params.get('autoFeature', True),
            addSamplesToBackground=True
        ).train(
            features=training_fc,
            classProperty='pa',
            inputProperties=[var]
        )

        # 예측 및 평가
        c_solo = testing_fc.classify(solo_model, 'probability')
        d_solo = geemap.ee_to_df(c_solo.select(['pa', 'probability']))

        # 데이터프레임 컬럼 타입 강제 변환
        if not d_solo.empty:
             d_solo['pa'] = d_solo['pa'].astype(int)
             d_solo['probability'] = d_solo['probability'].astype(float)

        if not d_solo.empty and len(d_solo['pa'].unique()) > 1:
            solo_auc = roc_auc_score(d_solo['pa'], d_solo['probability'])
        else:
            solo_auc = 0.5 # 데이터 부족 시 랜덤 값 처리

    except Exception as e:
        print(f"   [Solo Error] {var}: {e}")
        solo_auc = 0.0

    # -------------------------------------------------------
    # Without Model (Drop): 이 변수만 빼고 나머지 사용
    # -------------------------------------------------------
    try:
        # 현재 변수(var)를 제외한 나머지 변수 리스트 생성
        drop_features = [b for b in feature_bands if b != var]

        drop_model = ee.Classifier.amnhMaxent(
            betaMultiplier=best_params.get('betaMultiplier', 1.0),
            autoFeature=best_params.get('autoFeature', True),
            addSamplesToBackground=True
        ).train(
            features=training_fc,
            classProperty='pa',
            inputProperties=drop_features
        )

        # 예측 및 평가
        c_drop = testing_fc.classify(drop_model, 'probability')
        d_drop = geemap.ee_to_df(c_drop.select(['pa', 'probability']))

        # 데이터프레임 컬럼 타입 강제 변환
        if not d_drop.empty:
             d_drop['pa'] = d_drop['pa'].astype(int)
             d_drop['probability'] = d_drop['probability'].astype(float)

        if not d_drop.empty and len(d_drop['pa'].unique()) > 1:
            drop_auc = roc_auc_score(d_drop['pa'], d_drop['probability'])
        else:
            drop_auc = 0.5

    except Exception as e:
        print(f"   [Without Error] {var}: {e}")
        drop_auc = 0.0

    # 결과 출력 및 저장
    print(f"   > {var} : Solo={solo_auc:.4f} | Without={drop_auc:.4f}")

    jackknife_results.append({
        'Variable': var,
        'Solo AUC (Blue)': solo_auc,
        'Without AUC (Green)': drop_auc,
        'Total AUC (Red)': total_auc_val
    })

# 시각화 (3색 막대 그래프)
if jackknife_results:
    df_res = pd.DataFrame(jackknife_results)
    # Solo AUC(파란색) 기준으로 정렬
    df_res = df_res.sort_values(by='Solo AUC (Blue)', ascending=True)

    fig, ax = plt.subplots(figsize=(12, max(6, len(feature_bands) * 0.5))) # 변수 개수에 따라 높이 조절

    y_pos = range(len(df_res))
    height = 0.25

    # 막대 그리기
    # Total (배경 기준선 역할)
    ax.barh([y + height for y in y_pos], df_res['Total AUC (Red)'], height,
            label='With All Variables (Red)', color='#ff6666', alpha=0.6, edgecolor='white')

    # Without (Drop)
    ax.barh([y for y in y_pos], df_res['Without AUC (Green)'], height,
            label='Without Variable (Green)', color='#66cc66', alpha=0.8, edgecolor='white')

    # Solo (Only)
    ax.barh([y - height for y in y_pos], df_res['Solo AUC (Blue)'], height,
            label='With Only Variable (Blue)', color='#6666ff', alpha=0.8, edgecolor='white')

    # 축 및 레이블 꾸미기
    ax.set_yticks(y_pos)
    ax.set_yticklabels(df_res['Variable'], fontsize=11, fontweight='bold')
    ax.set_xlabel('AUC Score', fontsize=12)
    ax.set_title('Jackknife Test of Variable Importance', fontsize=16, fontweight='bold')

    # X축 범위 설정
    try:
        min_auc = min(df_res['Without AUC (Green)'].min(), df_res['Solo AUC (Blue)'].min())
        ax.set_xlim(max(0.4, min_auc - 0.05), 1.02)
    except:
        ax.set_xlim(0.4, 1.0)

    ax.legend(loc='lower right', frameon=True, shadow=True)
    ax.grid(True, axis='x', linestyle='--', alpha=0.5)

    plt.tight_layout()
    plt.show()

    print("해석 팁:")
    print(" 1. 파란 막대가 길수록: 해당 변수 자체의 정보량이 많음 (중요 변수)")
    print(" 2. 초록 막대가 빨간 막대보다 현저히 짧을수록: 해당 변수가 없으면 모델 성능이 하락함 (대체 불가 변수)")
else:
    print("Jackknife 결과를 생성하지 못했습니다.")
print("="*50)

#8. 예측 지도 생성

In [None]:
# GEE Asset 불러오기
user_asset_path = "projects/ee-jjangmo91/assets"

asset_files = {
    'elevation': 'elevation',
    'slope': 'slope',
    'northness': 'northness',
    'tpi': 'tpi',
    'twi': 'twi',
    'tree_cover': 'tree_cover',
    'forest_age': 'forest_age',
    'forest_dmcls': 'forest_dmcls',
    'forest_type': 'forest_type'
}

print("GEE Asset에서 이미지를 불러와 병합합니다.")

img_list = []
valid_bands = []

for var_name, file_name in asset_files.items():
    full_path = f"{user_asset_path}/{file_name}"
    try:
        img = ee.Image(full_path).rename(var_name).toFloat()
        img_list.append(img)
        valid_bands.append(var_name)
    except Exception as e:
        print(f" [로드 실패] {var_name}: {e}")

if not img_list:
    print(" 불러온 이미지가 없습니다.")
else:
    predictors_final = ee.Image.cat(img_list)
    print(f" 이미지 병합 완료! (밴드 수: {len(img_list)}개)")

    # 서식지 적합성 지도 생성
    print("\nMaxEnt 모델을 사용하여 서식지 적합성 지도를 그립니다.")

    # 예측 수행
    raw_output = predictors_final.select(valid_bands).classify(final_model)

    # 밴드 확인(디버깅)
    output_bands = raw_output.bandNames().getInfo()
    print(f"모델 출력 밴드 목록: {output_bands}")
    suitability_map = raw_output.select(0).rename('probability') \
                                .reproject(crs='EPSG:3857', scale=30)
    vis_params = {
        'min': 0, 'max': 1,
        'palette': ['#440154', '#482677', '#404788', '#33638D', '#287D8E',
                    '#1F968B', '#29AF7F', '#55C667', '#95D840', '#DCE319']
    }

    m = geemap.Map(center=[36.54, 127.83], zoom=11)
    m.add_basemap('HYBRID')
    m.addLayer(suitability_map, vis_params, 'Habitat Suitability (Prob)')

    # 이미지 경계선
    tif_boundary = ee.Image().byte().paint(predictors_final.geometry(), 0, 2)
    m.addLayer(tif_boundary, {'palette': 'black'}, 'Study Area Boundary')

    # 출현 지점
    if 'presence_fc' in locals():
        m.addLayer(presence_fc.style(**{'color': 'red', 'pointSize': 3}), {}, 'Presence Points')

    m.add_colorbar(vis_params, label="Probability (0~1)", orientation="vertical", layer_name="Habitat Suitability (Prob)")

    print("지도 생성이 완료되었습니다.")
    display(m)

In [None]:
# 이진(Binary) 지도 생성
if 'optimal_threshold' not in locals():
    optimal_threshold = 0.510

print(f" 계산된 최적 임계값({optimal_threshold})을 적용하여 서식지를 구분합니다.")
print(f" (확률이 {optimal_threshold * 100:.1f}% 이상인 곳만 '서식지'로 판단)")

# 확률 >= 임계값
potential_distribution = suitability_map.gt(optimal_threshold)

# 시각화
binary_vis = {'min': 0, 'max': 1, 'palette': ['#D3D3D3', '#006400']}

m2 = geemap.Map(center=[36.54, 127.83], zoom=11)
m2.add_basemap('HYBRID')

m2.addLayer(potential_distribution, binary_vis, 'Potential Distribution (Binary)')
m2.addLayer(ee.Image().byte().paint(predictors_final.geometry(), 0, 2), {'palette': 'black'}, 'Study Area Boundary')

if 'presence_fc' in locals():
    m2.addLayer(presence_fc.style(**{'color': 'red', 'pointSize': 3}), {}, 'Presence Points')

# 범례
legend_keys = ['Suitable Habitat', 'Unsuitable Habitat', 'Presence Points']
legend_colors = ['#006400', '#D3D3D3', '#FF0000']
m2.add_legend(title="Distribution Legend", labels=legend_keys, colors=legend_colors)

print("잠재 분포 지도 생성이 완료되었습니다.")
display(m2)

# 9. 서식지 적합성 5등급 분류 및 면적 산출


In [None]:
# 분석 범위 설정
if 'predictors_final' in locals():
    study_geometry = predictors_final.geometry()
else:
    print("주의: 이미지 범위를 찾을 수 없어 기본 설정을 사용합니다.")

# 등급 분류 (Reclassification)
classified_map = suitability_map.expression(
    "(b('probability') > 0.8) ? 1 :" +
    "(b('probability') > 0.6) ? 2 :" +
    "(b('probability') > 0.4) ? 3 :" +
    "(b('probability') > 0.2) ? 4 : 5"
).rename('grade').toInt()

# 면적 산출 (Area Calculation)
print("등급별 면적을 계산 중입니다... (약간의 시간이 소요됩니다)")

# 픽셀 면적 이미지 생성 및 결합
pixel_area = ee.Image.pixelArea()
classification_with_area = pixel_area.addBands(classified_map)

# reduceRegion 실행
area_stats = classification_with_area.reduceRegion(
    reducer=ee.Reducer.sum().group(
        groupField=1, groupName='grade'
    ),
    geometry=study_geometry,
    scale=30,      # 해상도 30m
    maxPixels=1e13
)

# 결과 정리
stats_list = area_stats.get('groups').getInfo()
df_area = pd.DataFrame(stats_list)

if not df_area.empty:
    # 단위 변환 (m^2 -> km^2)
    df_area['area_km2'] = df_area['sum'] / 1_000_000
    df_area['grade'] = df_area['grade'].astype(int)

    # 비율(%) 계산
    total_area = df_area['area_km2'].sum()
    df_area['ratio_percent'] = (df_area['area_km2'] / total_area) * 100

    # 정렬 및 라벨링
    df_area = df_area.sort_values('grade')
    grade_labels = {
        1: 'Grade 1 (0.8~1.0)',
        2: 'Grade 2 (0.6~0.8)',
        3: 'Grade 3 (0.4~0.6)',
        4: 'Grade 4 (0.2~0.4)',
        5: 'Grade 5 (0.0~0.2)'
    }
    df_area['label'] = df_area['grade'].map(grade_labels)

    print("\n[등급별 면적 산출 결과]")
    print(df_area[['label', 'area_km2', 'ratio_percent']].to_string(index=False, float_format="%.2f"))
    print(f"\n - 분석된 총 면적: {total_area:.2f} km²")
else:
    print("면적 계산 결과가 없습니다.")

# 10. 결과 시각화 (면적 그래프 & 등급 지도)

In [None]:
# 그래프 설정
plt.figure(figsize=(10, 6))

# 색상 지정 (1등급: 진녹색 -> 5등급: 회색)
color_palette = ['#006400', '#74C476', '#FFFF00', '#FF7F00', '#A9A9A9']
bars = plt.bar(df_area['label'], df_area['area_km2'],
               color=color_palette, edgecolor='black', alpha=0.8)
plt.title('Area by Habitat Suitability Grade', fontsize=15, fontweight='bold')
plt.xlabel('Suitability Grade', fontsize=12)
plt.ylabel('Area (km²)', fontsize=12)
plt.grid(axis='y', linestyle='--', alpha=0.5)

total_area = df_area['area_km2'].sum()
for bar in bars:
    height = bar.get_height()
    pct = (height / total_area) * 100
    plt.text(bar.get_x() + bar.get_width()/2.0, height,
             f'{height:.1f} km²\n({pct:.1f}%)',
             ha='center', va='bottom', fontsize=10)

plt.tight_layout()
plt.show()

In [None]:
m = geemap.Map(center=[36.54, 127.83], zoom=11, height='600px', width='1000px')
m.add_basemap('HYBRID')

grade_vis_params = {
    'min': 1,
    'max': 5,
    'palette': ['#006400', '#74C476', '#FFFF00', '#FF7F00', '#A9A9A9']
}

m.addLayer(classified_map, grade_vis_params, 'Suitability Grades (1-5)')

if 'predictors_final' in locals():
    boundary_img = ee.Image().byte().paint(predictors_final.geometry(), 0, 2)
    m.addLayer(boundary_img, {'palette': 'black'}, 'Study Area Boundary')

legend_dict = {
    'Grade 1 (High Prob)': '#006400',
    'Grade 2': '#74C476',
    'Grade 3': '#FFFF00',
    'Grade 4': '#FF7F00',
    'Grade 5 (Low Prob)': '#A9A9A9'
}
m.add_legend(title="Habitat Grades", legend_dict=legend_dict)
m.add_layer_control()
display(m)

# 11. 결과 내보내기

In [None]:
print("\n--- 결과 내보내기 (GeoTIFF) 시작 ---")

# 저장할 경로 설정
output_folder = '/content/drive/MyDrive/KNPS/Siberian flying squirrel/data/'
output_tif_path = os.path.join(output_folder, 'result_suitability_grade_v1.tif')

# 폴더가 없으면 생성
os.makedirs(output_folder, exist_ok=True)

print(f"파일 저장 경로:\n{output_tif_path}")

# 내보낼 영역 설정 (이미지 범위 사용)
if 'predictors_final' in locals():
    export_region = predictors_final.geometry()
else:
    print("predictors_final 변수가 없어 region 설정에 실패했습니다.")

# 내보내기 수행
if export_region:
    print("지도 데이터를 다운로드 중입니다... (진행률이 표시됩니다)")

    try:
        geemap.ee_export_image(
            classified_map,
            filename=output_tif_path,
            scale=30,
            region=export_region,
            crs='EPSG:5186',
            file_per_band=False
        )
        print("\n TIF 저장 완료! 드라이브 폴더를 확인해보세요.")

    except Exception as e:
        print(f"\n 저장 중 오류 발생: {e}")
else:
    print(" 내보낼 영역(region)이 정의되지 않아 취소되었습니다.")