<a href="https://colab.research.google.com/github/jjangmo91/Cervus-nippon/blob/main/Maxent_cns.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. 환경 설정 및 라이브러리 임포트

In [None]:
# Numpy 버전 2.0 미만 다운그레이드
!pip install 'numpy<2.0' -q

In [None]:
# 필수 패키지 설치
!pip install earthengine-api -U -q
!pip install eeSDM -q
!pip install geemap -U -q
!pip install pandas seaborn matplotlib -q
!pip install scikit-learn-extra -q

In [None]:
# 라이브러리 임포트
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt

import glob
import ee
import geemap
import eeSDM
import seaborn as sns

import sys
import time

from statsmodels.stats.outliers_influence import variance_inflation_factor
from shapely.geometry import Point
from ipyleaflet import WidgetControl
from ipywidgets import Label

# Earth Engine 인증 및 초기화
ee.Authenticate()
ee.Initialize(project='ee-jjangmo91')

# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')
data_path = '/content/drive/MyDrive/KNPS/Deer/Ecotopia_Data_2024_2025/'

#2. 데이터 준비 (AOI, 종 출현, 환경 변수)

In [None]:
# 연구지역(AOI) 설정
protected_areas = ee.FeatureCollection("WCMC/WDPA/current/polygons")
songnisan_park = protected_areas.filter(ee.Filter.eq('WDPAID', 773))
aoi = songnisan_park.geometry().buffer(5000) # 공원 경계 5km까지 완충 지역

# 종 출현(Occurrence) 데이터 설정
base_occurrence_file = '/content/drive/MyDrive/KNPS/Deer/SDM/Data/cervus_nippon_songni.csv'
df_occurrence = pd.read_csv(base_occurrence_file)
print(f"  - 종 출현 데이터: '{base_occurrence_file}'에서 {len(df_occurrence)}개 좌표 로드 완료")

# 환경 변수 설정
TARGET_SCALE = 30  # 최종 해상도를 30m로 통일
TARGET_CRS = 'EPSG:3857' # GEE 표준 좌표계

# (1, 2, 3) 고도, 경사, 사면향
dem = ee.Image('USGS/SRTMGL1_003')
elevation = dem.select('elevation')
slope = ee.Terrain.slope(dem)
aspect = ee.Terrain.aspect(dem)

# (4-8) ESA WorldCover v200 기반 거리 변수 일괄 생성
worldcover = ee.ImageCollection('ESA/WorldCover/v200').first().select('Map')
dist_to_forest = worldcover.eq(10).fastDistanceTransform().sqrt()   # 10: 산림 (Trees)
dist_to_grass = worldcover.eq(30).fastDistanceTransform().sqrt()    # 30: 초지 (Grassland)
dist_to_cropland = worldcover.eq(40).fastDistanceTransform().sqrt() # 40: 경작지 (Cropland)
dist_to_built = worldcover.eq(50).fastDistanceTransform().sqrt()    # 50: 건물밀집지역 (Built-up)
dist_to_water = worldcover.eq(80).fastDistanceTransform().sqrt()    # 80: 수계 (Permanent water bodies)

# 모든 변수를 하나의 이미지로 통합
predictors = ee.Image.cat([
    elevation.rename('elevation'),
    slope.rename('slope'),
    aspect.rename('aspect'),
    dist_to_forest.rename('dist_to_forest'),
    dist_to_grass.rename('dist_to_grass'),
    dist_to_cropland.rename('dist_to_cropland'),
    dist_to_built.rename('dist_to_built'),
    dist_to_water.rename('dist_to_water')
])

# 해상도 및 좌표계 통일 후 AOI에 맞게 자르기
predictors = predictors.setDefaultProjection(crs=TARGET_CRS, scale=TARGET_SCALE).clip(aoi)

print(f"  - 환경 변수: 사용자 정의 8종 구축 완료 (해상도: {TARGET_SCALE}m)")
print(f"  - 구축된 변수: {predictors.bandNames().getInfo()}")

In [None]:
aoi = ee.FeatureCollection("WCMC/WDPA/current/polygons") \
        .filter(ee.Filter.eq('WDPAID', 773)) \
        .geometry() \
        .buffer(5000)

# ESA WorldCover 이미지 불러오기
worldcover = ee.ImageCollection('ESA/WorldCover/v200').first().clip(aoi)

# WorldCover 데이터셋의 공식 범례 및 시각화 파라미터 정의
worldcover_vis_params = {
  "bands": ["Map"],
}
worldcover_legend_dict = {
    "Trees": "006400",
    "Shrubland": "ffbb22",
    "Grassland": "ffff4c",
    "Cropland": "f096ff",
    "Built-up": "fa0000",
    "Bare / sparse vegetation": "b4b4b4",
    "Snow and ice": "f0f0f0",
    "Permanent water bodies": "0064c8",
    "Herbaceous wetland": "0096a0",
    "Mangroves": "00cf75",
    "Moss and lichen": "fae6a0",
}

# 지도 생성 및 레이어 추가
m = geemap.Map(center=[36.54, 127.83], zoom=11)
m.add_basemap('HYBRID')

m.addLayer(worldcover, worldcover_vis_params, 'ESA WorldCover 2020')

# add_legend 함수에 'ESA' 템플릿을 명시적으로 지정
m.add_legend(
    title="ESA WorldCover V200",
    legend_dict=worldcover_legend_dict,
    position='bottomright',
    builtin_legend='ESA'
)

# 연구지역 경계선 추가
m.add_layer(ee.Image().paint(aoi, 0, 2), {'palette': 'yellow'}, 'Area of Interest')
m.add_layer_control()

display(m)

#3. 다중공선성 분석 및 변수 선택

In [None]:
!pip install geojson -q

try:
    # CSV의 실제 위도/경도 컬럼명 지정
    LAT_COL = 'latitude'
    LON_COL = 'longitude'

    # 출현 지점의 환경 변수 값 추출
    features = geemap.pandas_to_ee(df_occurrence, latitude_col=LAT_COL, longitude_col=LON_COL)
    sampled_points = predictors.sampleRegions(collection=features, scale=TARGET_SCALE, geometries=False)

    # GEE 결과를 Pandas 데이터프레임으로 변환
    sampled_info = sampled_points.getInfo()
    properties_list = [f['properties'] for f in sampled_info['features']]
    df_predictors = pd.DataFrame(properties_list)[predictors.bandNames().getInfo()].dropna()
    print("환경 값 추출 완료.")

    # 2. VIF 및 상관관계 행렬 계산
    correlation_matrix = df_predictors.corr()
    vif_data = pd.DataFrame()
    vif_data["feature"] = df_predictors.columns
    vif_data["VIF"] = [variance_inflation_factor(df_predictors.values, i) for i in range(len(df_predictors.columns))]

    print("\n[VIF 계산 결과]")
    print(vif_data.to_string(index=False))

    # 3. 결과 시각화
    vif_sorted = vif_data.set_index('feature').sort_values(by='VIF', ascending=False)
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 8))
    fig.suptitle("Initial Multicollinearity Analysis (8 Variables)", fontsize=18)

    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f", ax=ax1, linewidths=.5)
    ax1.set_title('Correlation Matrix', fontsize=14)
    ax1.tick_params(axis='x', rotation=45)

    sns.barplot(x=vif_sorted['VIF'], y=vif_sorted.index, palette='viridis_r', ax=ax2)
    ax2.set_title('Variance Inflation Factor (VIF)', fontsize=14)
    ax2.set_xlabel('VIF Value'); ax2.set_ylabel('')
    ax2.axvline(x=5, color='orange', linestyle='--', label='High Correlation (VIF=5)')
    ax2.axvline(x=10, color='red', linestyle='--', label='Very High Correlation (VIF=10)')
    ax2.legend()

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

except Exception as e:
    print(f"분석 중 오류가 발생했습니다: {e}")

In [None]:
try:
    # 최종 변수 세트 생성
    bands_to_remove = ['dist_to_forest', 'dist_to_cropland']
    bands_to_keep = predictors.bandNames().removeAll(bands_to_remove)
    predictors_final = predictors.select(bands_to_keep)

    final_predictor_names = predictors_final.bandNames().getInfo()
    print(f"제거된 변수: {bands_to_remove}")
    print(f"최종 선택된 변수 ({len(final_predictor_names)}개): {final_predictor_names}")

    # 다중공선성 재검증
    # 최종 변수 세트(predictors_final)를 사용하여 환경 값을 다시 추출
    features = geemap.pandas_to_ee(df_occurrence, latitude_col='latitude', longitude_col='longitude')
    sampled_points_final = predictors_final.sampleRegions(collection=features, scale=TARGET_SCALE, geometries=False)

    # GEE 결과를 Pandas 데이터프레임으로 변환
    sampled_info_final = sampled_points_final.getInfo()
    properties_list_final = [f['properties'] for f in sampled_info_final['features']]
    df_predictors_final = pd.DataFrame(properties_list_final)[final_predictor_names].dropna()

    # VIF와 상관관계 행렬을 다시 계산
    correlation_matrix_final = df_predictors_final.corr()
    vif_data_final = pd.DataFrame()
    vif_data_final["feature"] = df_predictors_final.columns
    vif_data_final["VIF"] = [variance_inflation_factor(df_predictors_final.values, i) for i in range(len(df_predictors_final.columns))]

    print("\n[최종 VIF 계산 결과]")
    print(vif_data_final.to_string(index=False))

    # 최종 결과 시각화
    vif_sorted_final = vif_data_final.set_index('feature').sort_values(by='VIF', ascending=False)
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 8))
    fig.suptitle("Final Multicollinearity Analysis (6 Variables)", fontsize=18)

    # 상관관계 히트맵
    sns.heatmap(correlation_matrix_final, annot=True, cmap='coolwarm', fmt=".2f", ax=ax1, linewidths=.5)
    ax1.set_title('Final Correlation Matrix', fontsize=14)
    ax1.tick_params(axis='x', rotation=45)

    # VIF 막대그래프
    sns.barplot(x=vif_sorted_final['VIF'], y=vif_sorted_final.index, palette='viridis_r', ax=ax2)
    ax2.set_title('Final Variance Inflation Factor (VIF)', fontsize=14)
    ax2.set_xlabel('VIF Value'); ax2.set_ylabel('')
    ax2.axvline(x=5, color='orange', linestyle='--', label='Threshold (VIF=5)')
    ax2.legend()

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

except Exception as e:
    print(f"분석 중 오류가 발생했습니다: {e}")

In [None]:
# 최종 선택된 거리 변수들의 통계치 확인
print(df_predictors_final[['dist_to_grass', 'dist_to_built', 'dist_to_water']].describe())

In [None]:
# 시각화를 위한 geemap 지도 객체 생성
m = geemap.Map(center=[36.54, 127.83], zoom=11)
m.add_basemap('HYBRID')

# 각 변수별 시각화 파라미터 정의
vis_params = {
    'elevation': {'min': 100, 'max': 1500, 'palette': ['#006633', '#E5FFCC', '#662A00', '#D8D8D8', '#F5F5F5']},
    'slope': {'min': 0, 'max': 60, 'palette': ['#FFFFFF', '#F0E3A2', '#F1BC67', '#ED7E3A', '#E94423']},
    'aspect': {'min': 0, 'max': 360, 'palette': ['#FF0000', '#FFFF00', '#00FF00', '#00FFFF', '#0000FF', '#FF00FF', '#FF0000']},
    # 아래 변수들의 max 값을 더 작게 조정하여 세밀한 변화를 확인
    'dist_to_grass': {'min': 0, 'max': 50, 'palette': ['#E6E6FA', '#C0C0C0', '#808080']},
    'dist_to_built': {'min': 0, 'max': 50, 'palette': ['#FFFFCC', '#FEB24C', '#FD8D3C', '#F03B20', '#BD0026']},
    'dist_to_water': {'min': 0, 'max': 150, 'palette': ['#F7FBFF', '#DEEBF7', '#C6DBEF', '#9ECAE1', '#6BAED6']}
}

# 최종 변수들을 반복문을 통해 지도에 추가
for band in predictors_final.bandNames().getInfo():
    m.addLayer(
        predictors_final.select(band), # 개별 밴드 선택
        vis_params[band],              # 해당 밴드의 시각화 파라미터 적용
        band,                          # 레이어 이름
        True                           # 처음에는 레이어를 켜진 상태로 둠
    )

# 연구지역 경계선 및 범례 추가
m.add_layer(ee.Image().paint(aoi, 0, 2), {'palette': 'yellow'}, 'Area of Interest')
m.add_colorbar_branca(colors=vis_params['elevation']['palette'], vmin=vis_params['elevation']['min'], vmax=vis_params['elevation']['max'],
                      layer_name='elevation')

# 레이어 컨트롤 추가 및 지도 표시
m.add_layer_control()
display(m)

#4. Presence-Background 데이터 생성

In [None]:
# Background 포인트 생성 및 환경 값 추출
print("Background 데이터 생성 및 Google Drive Export를 시작합니다...")
background_points = ee.FeatureCollection.randomPoints(region=aoi, points=5000, seed=0)
background_values = predictors_final.sampleRegions(
    collection=background_points,
    scale=TARGET_SCALE,
    geometries=False
)

# Google Drive로 Export 작업 설정 (최상위 폴더에 저장)
EXPORT_FILENAME = 'background_data_cns'

task = ee.batch.Export.table.toDrive(
    collection=background_values,
    description='Export_Background_Points_to_CSV',
    folder='',  # <-- 폴더를 비워두어 '내 드라이브' 최상위에 저장
    fileNamePrefix=EXPORT_FILENAME,
    fileFormat='CSV'
)

# 작업 시작 및 Colab 내에서 완료 여부 확인
task.start()
print(f"\n작업이 시작되었습니다. (Task ID: {task.id})")
print(f"'내 드라이브'에 '{EXPORT_FILENAME}.csv' 파일이 생성될 때까지 기다려주세요.")

while task.active():
  print('작업 실행 중...')
  time.sleep(30)

final_status = task.status()['state']
if final_status == 'COMPLETED':
  print(f"작업 완료! 이제 다음 셀을 실행하여 파일을 이동하고 로드하세요. (상태: {final_status})")
else:
  print(f"작업이 완료되지 않았습니다. (상태: {final_status})")
  print(f"오류 메시지: {task.status()['error_message']}")

In [None]:
# 최종 경로에서 Background 데이터 로드
BACKGROUND_CSV_PATH = '/content/drive/MyDrive/KNPS/Deer/SDM/Data/background_data_cns.csv'

try:
    background_df = pd.read_csv(BACKGROUND_CSV_PATH)
    if 'system:index' in background_df.columns:
        background_df = background_df.drop(columns=['system:index', '.geo'])
    print(f"Google Drive에서 Background 데이터 로드 완료: {len(background_df)}개")
except FileNotFoundError:
    print(f"오류: '{BACKGROUND_CSV_PATH}' 파일을 찾을 수 없습니다.")
    print("파일을 정확한 위치로 옮겼는지 확인 후 다시 시도해주세요.")
    sys.exit() # <-- 파일이 없으면 여기서 실행을 중단합니다.


# Presence 데이터 환경 값 추출 및 통합
features = geemap.pandas_to_ee(df_occurrence, latitude_col='latitude', longitude_col='longitude')
presence_values = predictors_final.sampleRegions(collection=features, scale=TARGET_SCALE, geometries=False)
presence_info = presence_values.getInfo()
properties_list_pr = [f['properties'] for f in presence_info['features']]
presence_df = pd.DataFrame(properties_list_pr)

background_df.dropna(inplace=True)
presence_df.dropna(inplace=True)

background_df['pa'] = 0
presence_df['pa'] = 1

final_modeling_df = pd.concat([presence_df, background_df], ignore_index=True)

# 최종 데이터 확인
print(f"\n최종 Presence-Background 데이터 생성 완료:")
print(f" - 총 데이터 수: {len(final_modeling_df)} 개")
print(f" - Presence (pa=1): {len(final_modeling_df[final_modeling_df['pa']==1])} 개")
print(f" - Background (pa=0): {len(final_modeling_df[final_modeling_df['pa']==0])} 개")
print("\n최종 데이터 샘플:")
display(final_modeling_df.head())

#5. Spatial Thinning & Spatial Block Corss-Validation

In [None]:
# Spatial Thinning
def spatial_thinning(df, distance_m, lat_col='latitude', lon_col='longitude'):
    """
    주어진 거리(m)를 기준으로 종 출현 데이터의 공간 씬닝을 수행

    :param df: Pandas 데이터프레임 (위도, 경도 컬럼 포함)
    :param distance_m: 최소 허용 거리 (미터 단위)
    :param lat_col: 위도 컬럼명
    :param lon_col: 경도 컬럼명
    :return: 씬닝이 적용된 Pandas 데이터프레임
    """
    # 데이터프레임을 GeoDataFrame으로 변환
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs="EPSG:4326"
    )
    # 미터 단위 계산을 위해 UTM 좌표계로 변환 (한국 중부원점: EPSG:5186)
    gdf_proj = gdf.to_crs("EPSG:5186")

    # 제거할 포인트의 인덱스를 저장할 집합
    to_remove_indices = set()

    # 각 포인트에 대해 반복
    for index, point in gdf_proj.iterrows():
        if index in to_remove_indices:
            continue

        # 현재 포인트로부터 지정된 거리 내에 있는 다른 포인트들을 찾음
        buffer = point.geometry.buffer(distance_m)
        nearby_points_indices = gdf_proj.sindex.query(buffer, predicate='intersects')

        # 자기 자신을 제외한 인덱스만 필터링
        nearby_points_indices = [i for i in nearby_points_indices if i != index]

        # 제거할 인덱스에 추가
        to_remove_indices.update(nearby_points_indices)

    # 제거할 인덱스를 제외하고 최종 데이터프레임 생성
    thinned_df = df.drop(index=list(to_remove_indices)).reset_index(drop=True)

    return thinned_df

# 공간 씬닝 실행
THINNING_DISTANCE = 500  # 씬닝 거리: 500m

# df_occurrence는 이전에 로드한 원본 출현 데이터
df_occurrence_thinned = spatial_thinning(df_occurrence, THINNING_DISTANCE)

print(f"공간 씬닝(Spatial Thinning) 완료 (기준 거리: {THINNING_DISTANCE}m)")
print(f" - 원본 데이터 수: {len(df_occurrence)} 개")
print(f" - 씬닝 후 데이터 수: {len(df_occurrence_thinned)} 개")
print(f" - 제거된 데이터 수: {len(df_occurrence) - len(df_occurrence_thinned)} 개")

# 씬닝 후 데이터 샘플 확인
display(df_occurrence_thinned.head())

In [None]:
# Spatial Block Corss-Validation
presence_fc = geemap.pandas_to_ee(df_occurrence_thinned, latitude_col='latitude', longitude_col='longitude')
presence_fc = presence_fc.map(lambda ft: ft.set('pa', 1))
background_points = ee.FeatureCollection.randomPoints(region=aoi, points=5000, seed=0)
background_fc = background_points.map(lambda ft: ft.set('pa', 0))
full_fc = presence_fc.merge(background_fc)

# 공간 블록 생성
scale = 2000  # 2km
grid = aoi.coveringGrid(scale=scale, proj='EPSG:4326')

# 3. 공간 블록을 훈련(70%) 및 테스트(30%)용으로 임의 분할
split = 0.7
training_grid = grid.randomColumn(seed=0).filter(ee.Filter.lt('random', split))
testing_grid = grid.randomColumn(seed=0).filter(ee.Filter.gte('random', split))

# 4. 분할된 블록을 기준으로 훈련/테스트 데이터 세트 생성
training_data = full_fc.filter(ee.Filter.bounds(training_grid))
testing_data = full_fc.filter(ee.Filter.bounds(testing_grid))

# 5. 각 데이터 세트에 환경 변수 값 추출
bands = predictors_final.bandNames()
training_df = ee.data.computeFeatures({
    'expression': predictors_final.sampleRegions(collection=training_data, properties=['pa'], scale=30),
    'fileFormat': 'PANDAS_DATAFRAME'
})
testing_df = ee.data.computeFeatures({
    'expression': predictors_final.sampleRegions(collection=testing_data, properties=['pa'], scale=30),
    'fileFormat': 'PANDAS_DATAFRAME'
})

print("GEE 기반 공간 분할 및 데이터 준비 완료.")
print(f" - 훈련 데이터 수: {len(training_df)}")
print(f" - 테스트 데이터 수: {len(testing_df)}")

# 6. 시각화
m = geemap.Map(center=[36.54, 127.83], zoom=11) # 확대 수준 조정
m.add_basemap('HYBRID')
m.addLayer(training_grid, {'color': 'blue', 'fillColor': 'blue_01'}, 'Training Blocks')
m.addLayer(testing_grid, {'color': 'red', 'fillColor': 'red_01'}, 'Testing Blocks')
m.addLayer(presence_fc, {'color': 'yellow'}, 'Thinned Presence Points')
m.add_layer_control()
display(m)

#6. MaxEnt 하이퍼파라미터 튜닝 및 모델 훈련

In [None]:
# GEE FeatureCollection을 직접 사용하여 변환 완료
training_fc = training_data
testing_fc = testing_data

print("GEE FeatureCollection을 직접 사용하여 변환 완료.")
print(f" - 훈련 데이터: {training_fc.size().getInfo()} 개")
print(f" - 테스트 데이터: {testing_fc.size().getInfo()} 개")

In [None]:
# 데이터 준비 및 하이퍼파라미터 튜닝 통합
from sklearn.metrics import roc_auc_score
import ee
import geemap

print("데이터 준비 및 하이퍼파라미터 튜닝을 시작합니다...")

# 'pa' (presence/absence)를 제외한 최종 변수 목록
feature_bands = predictors_final.bandNames().getInfo()

# 공간 분할된 데이터에 환경 변수 값 추출
# training_data와 testing_data에 환경 변수 값을 추가합니다.
training_data_with_vars = predictors_final.sampleRegions(
    collection=training_data,
    properties=['pa'],
    scale=30
)
testing_data_with_vars = predictors_final.sampleRegions(
    collection=testing_data,
    properties=['pa'],
    scale=30
)

# 데이터 준비 완료 확인
print(f" - 훈련 데이터: {training_data_with_vars.size().getInfo()} 개")
print(f" - 테스트 데이터: {testing_data_with_vars.size().getInfo()} 개")

# 하이퍼파라미터 튜닝 루프
best_auc = -1
best_params = {}

reg_multipliers = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]
auto_feature_options = [True, False]

for multiplier in reg_multipliers:
    for auto_feat in auto_feature_options:
        try:
            maxent_model = ee.Classifier.amnhMaxent(
                betaMultiplier=multiplier,
                autoFeature=auto_feat
            )

            trained_model = maxent_model.train(
                features=training_data_with_vars,
                classProperty='pa',
                inputProperties=feature_bands
            )

            classified_test = testing_data_with_vars.classify(trained_model, 'probability')
            predicted_df = geemap.ee_to_df(classified_test)

            if predicted_df is not None and not predicted_df.empty:
                auc_score = roc_auc_score(predicted_df['pa'], predicted_df['probability'])
                print(f" - 베타 승수: {multiplier}, 자동 특징: {auto_feat}, AUC: {auc_score:.4f}")

                if auc_score > best_auc:
                    best_auc = auc_score
                    best_params['betaMultiplier'] = multiplier
                    best_params['autoFeature'] = auto_feat

        except Exception as e:
            print(f" - 오류 발생: {e}")
            continue

print("\n하이퍼파라미터 튜닝 완료.")
print(f"최적의 하이퍼파라미터: {best_params}")
print(f"최고 AUC 점수: {best_auc:.4f}")

if best_params:
    best_beta_multiplier = best_params.get('betaMultiplier')
    best_auto_feature = best_params.get('autoFeature')
else:
    best_beta_multiplier = None
    best_auto_feature = None

#7. 예측 지도 생성

In [None]:
# 최적의 하이퍼파라미터로 최종 모델을 생성
final_maxent_model = ee.Classifier.amnhMaxent(
    betaMultiplier=best_params['betaMultiplier'],
    autoFeature=best_params['autoFeature']
).train(
    features=training_data_with_vars,
    classProperty='pa',
    inputProperties=feature_bands
)

# 훈련된 최종 모델로 예측 변수 이미지(predictors) 전체를 분류
suitability_map = predictors.classify(final_maxent_model)

# 예측 결과 시각화
vis_params = {
    'min': 0,
    'max': 1,
    'palette': ['#440154', '#482677', '#404788', '#33638D',
                '#287D8E', '#1F968B', '#29AF7F', '#55C667',
                '#95D840', '#DCE319']
}

# 지도 객체를 초기화하고 생성
Map = geemap.Map(center=[36.54, 127.83], zoom=11)

# 서식지 적합성 지도 레이어 추가
Map.addLayer(suitability_map.select('probability'), vis_params, 'Habitat Suitability')
Map.add_colorbar(vis_params, label="Habitat Suitability", orientation="vertical", layer_name="Habitat Suitability")

# 속리산 국립공원 경계 레이어 추가
protected_areas = ee.FeatureCollection("WCMC/WDPA/current/polygons")
songnisan_park = protected_areas.filter(ee.Filter.eq('WDPAID', 773))
park_boundary_style = {
    'color': '000000',
    'width': 2,
    'fillColor': '00000000'
}
Map.addLayer(songnisan_park.style(**park_boundary_style), {}, 'Songnisan Park Boundary')

# 출현 지점 레이어 추가
presence_style = {
    'color': 'FF0000', # 빨간색 점
    'pointSize': 5
}
Map.addLayer(presence_fc.style(**presence_style), {}, 'Thinned Presence Points')

# 최종 지도를 출력
Map

In [None]:
# 잠재적 분포 지도
optimal_threshold = 0.6

# suitability_map에 임계값을 적용하여 이진 지도
potential_distribution_map = suitability_map.gt(optimal_threshold)

# 이진 지도를 위한 시각화 팔레트
binary_vis_params = {
    'min': 0,
    'max': 1,
    'palette': ['white', 'green'] # 비서식지(흰색), 서식지(초록색)
}

# 지도 객체를 초기화하고 생성
Map = geemap.Map(center=[36.54, 127.83], zoom=11)

# 잠재적 분포 지도 레이어만 추가
Map.addLayer(potential_distribution_map.select('probability'), binary_vis_params, 'Potential Distribution Map')

# 속리산 국립공원 경계 레이어 추가
protected_areas = ee.FeatureCollection("WCMC/WDPA/current/polygons")
songnisan_park = protected_areas.filter(ee.Filter.eq('WDPAID', 773))
park_boundary_style = {
    'color': '000000',
    'width': 2,
    'fillColor': '00000000'
}
Map.addLayer(songnisan_park.style(**park_boundary_style), {}, 'Songnisan Park Boundary')

# 출현 지점 레이어 추가
presence_style = {
    'color': 'FF0000', # 빨간색 점
    'pointSize': 5
}
Map.addLayer(presence_fc.style(**presence_style), {}, 'Thinned Presence Points')

# 지도 레이어 컨트롤 추가 및 출력
Map.add_layer_control()
Map

#8. 모델 적합성 평가

In [None]:
# 모델 적합성 평가 (TSS, Kappa)
from sklearn.metrics import confusion_matrix
import numpy as np

try:
    # 최적의 임계값(Threshold) 찾기
    # Maxent 모델의 기본 최적 임계값은 Maximize sum of sensitivity and specificity를 사용
    # 이를 수동으로 찾아 TSS, Kappa를 계산
    y_true = predicted_df['pa']
    y_pred_proba = predicted_df['probability']

    thresholds = np.arange(0, 1, 0.01)
    best_threshold = 0
    max_tss = -1

    for threshold in thresholds:
        y_pred_binary = (y_pred_proba > threshold).astype(int)

        # sklearn.metrics.confusion_matrix는 True Negative, False Positive, False Negative, True Positive를 반환
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred_binary).ravel()

        # 민감도(sensitivity)와 특이도(specificity) 계산
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        # TSS (True Skill Statistic) 계산
        tss = sensitivity + specificity - 1

        if tss > max_tss:
            max_tss = tss
            best_threshold = threshold

    # 최적 임계값으로 최종 지표 계산
    y_pred_binary_final = (y_pred_proba > best_threshold).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred_binary_final).ravel()

    # TSS (True Skill Statistic) 계산
    sensitivity_final = tp / (tp + fn) if (tp + fn) > 0 else 0
    specificity_final = tn / (tn + fp) if (tn + fp) > 0 else 0
    tss_final = sensitivity_final + specificity_final - 1

    # Kappa 계산
    p_o = (tp + tn) / (tp + tn + fp + fn)
    p_e = ((tp + fn)*(tp + fp) + (fp + tn)*(fn + tn)) / ((tp + tn + fp + fn)**2)
    kappa_final = (p_o - p_e) / (1 - p_e) if p_e < 1 else 0

    print("\n[모델 적합성 지표 계산 결과]")
    print(f" - 최적 임계값: {best_threshold:.4f}")
    print(f" - TSS (True Skill Statistic): {tss_final:.4f}")
    print(f" - Kappa: {kappa_final:.4f}")

except Exception as e:
    print(f"모델 적합성 평가 중 오류 발생: {e}")