# Cyprus Terrace Detection - Model Training
Two-stage classification: (1) Binary terrace detection, (2) Multiclass terrace type classification

In [None]:
import os
import geopandas as gpd
import pandas as pd
import numpy as np
from autogluon.tabular import TabularPredictor
from sklearn.metrics import (
    precision_recall_curve, roc_curve, roc_auc_score, auc,
    accuracy_score, precision_score, recall_score, f1_score,
    balanced_accuracy_score, matthews_corrcoef
)
import matplotlib.pyplot as plt

# Set working directory
# Switches the process working directory to the project folder, so any relative
# path used later resolves against it.
# NOTE(review): machine-specific absolute path - adjust before running on another machine.
working_directory = "C:/Users/u0148406/OneDrive - KU Leuven/PhD KUL/TerraceDetection"
os.chdir(working_directory)

## 1. Data Preparation

In [None]:
def _finalise_split(frame, landcover):
    """Drop bookkeeping columns and normalise the label columns of one split."""
    # errors='ignore' already skips columns that are absent from the frame
    frame = frame.drop(
        columns=['unique_id', 'selected', 'gridnumber', 'name', 'geometry'],
        errors='ignore',
    )

    # Ensure ancient is 0 where terrace is 0
    frame.loc[frame['terrace'] == 0, 'ancient'] = 0

    # Explicit copy so the assignment below acts on an independent frame
    # (avoids SettingWithCopyWarning on pandas versions without copy-on-write)
    frame = frame.dropna(subset=['ancient']).copy()

    if landcover:
        frame['majority_landcover'] = frame['majority_landcover'].astype(str)

    return frame


def train_test_grid(data, percentage_validation=0.2, random_state=None, landcover=False):
    """Spatial train-test split based on grid numbers to avoid spatial autocorrelation.

    Whole grid cells are assigned to validation, so no grid cell contributes
    rows to both splits. The input frame is left unmodified.

    Parameters
    ----------
    data : DataFrame (or GeoDataFrame)
        Must contain the columns 'terrace', 'selected', 'gridnumber' and
        'ancient'; 'majority_landcover' is required when ``landcover`` is True.
        Only rows with ``terrace`` equal to 0 or 1 are used.
    percentage_validation : float
        Target share of rows used to derive how many grid cells are sampled
        for validation.
    random_state : int or None
        Seed forwarded to ``Series.sample`` for the grid-cell draw.
    landcover : bool
        When True, 'majority_landcover' is cast to ``str`` in both splits.

    Returns
    -------
    (data_train, data_valid) : tuple of DataFrame
        Training rows come from all non-validation grid cells; validation rows
        are the ``selected == 1`` rows of the sampled grid cells. The columns
        'unique_id', 'selected', 'gridnumber', 'name' and 'geometry' are
        removed when present, and rows with a missing 'ancient' are dropped.

    Raises
    ------
    ValueError
        If no usable ``selected == 1`` rows exist, or if the derived number of
        validation grid cells is negative or exceeds the available grid cells.
    """
    # Boolean indexing returns a new frame, so the caller's data is never touched
    labelled = data[data['terrace'].isin([0, 1])]
    selected = labelled[labelled['selected'] == 1]
    if selected.empty:
        raise ValueError(
            "train_test_grid: no rows with selected == 1 and terrace in {0, 1}"
        )

    rows_select = round(percentage_validation * labelled.shape[0])
    valid_no_pergrid = int(round(np.mean(selected['gridnumber'].value_counts())))
    grid_no = int(round(rows_select / valid_no_pergrid)) - 1

    gridnumbers = pd.Series(labelled['gridnumber'].unique())
    if not 0 <= grid_no <= len(gridnumbers):
        raise ValueError(
            f"train_test_grid: cannot sample {grid_no} validation grid cells "
            f"from {len(gridnumbers)} available; adjust percentage_validation"
        )
    gridnumbers_valid = gridnumbers.sample(n=grid_no, random_state=random_state)

    in_valid_grid = labelled['gridnumber'].isin(gridnumbers_valid)
    data_valid = _finalise_split(
        selected[selected['gridnumber'].isin(gridnumbers_valid)], landcover
    )
    data_train = _finalise_split(labelled[~in_valid_grid], landcover)

    print(f"Training samples: {data_train.shape[0]}, Validation samples: {data_valid.shape[0]}")
    print(f"Validation percentage: {round(data_valid.shape[0]*100/(data_train.shape[0]+data_valid.shape[0]), 2)}%")
    print("\nClass distribution (terrace):")
    print("Train:", data_train['terrace'].value_counts().to_dict())
    print("Valid:", data_valid['terrace'].value_counts().to_dict())

    return data_train, data_valid


# Read the labelled polygons and build the spatial train/validation split
file_path = "C:/Users/u0148406/OneDrive - KU Leuven/PhD KUL/TerraceDetection/Cyprus_large/Cyprus_only_train_polysanc.gpkg"
data = gpd.read_file(file_path)

_split_options = {'percentage_validation': 0.2, 'random_state': 22, 'landcover': True}
data_train_cyprMatch, data_valid_cyprMatch = train_test_grid(data, **_split_options)

# Binary-stage frames: identical rows, minus the 'ancient' column
data_train_cyprMatch1, data_valid_cyprMatch1 = (
    split.drop(columns=['ancient'])
    for split in (data_train_cyprMatch, data_valid_cyprMatch)
)

## 2. Binary Classification - Terrace Detection

In [None]:
# Stage 1: fit the binary terrace / non-terrace predictor
label = 'terrace'
problem_type = 'binary'
eval_metric = 'mcc'
time_limit = 18000
path_binary = 'C:/Workdir/test_autogluon/model_CyprusSENtrain_rs22matchBEST'

# Model families left out of the search (keeps NN models and CatBoost)
excluded_model_types = ['RF', 'XT', 'KNN', 'XGB', 'GBM']

predictor_binary = TabularPredictor(
    label=label,
    problem_type=problem_type,
    eval_metric=eval_metric,
    path=path_binary,
)
# fit() returns the predictor itself, so the name keeps pointing at the fitted object
predictor_binary = predictor_binary.fit(
    train_data=data_train_cyprMatch1,
    excluded_model_types=excluded_model_types,
    presets='best_quality',
    time_limit=time_limit,
)

# Score on the held-out grid cells
_extra_metrics = ['accuracy', 'balanced_accuracy', 'log_loss']
performance = predictor_binary.evaluate(data_valid_cyprMatch1, decision_threshold=0.5)
leaderboard = predictor_binary.leaderboard(data_valid_cyprMatch1, extra_metrics=_extra_metrics)

print(f"Performance: {performance}")
print(f"\nLeaderboard:\n {leaderboard}")

In [None]:
# Restore the fitted binary predictor from disk
path_binary = 'C:/Workdir/test_autogluon/model_CyprusSENtrain_rs22matchBEST'
predictor_binary = TabularPredictor.load(path_binary)

# Re-score on the validation split and persist the per-model leaderboard
_extra_metrics = ['accuracy', 'balanced_accuracy', 'log_loss']
performance = predictor_binary.evaluate(data_valid_cyprMatch1, decision_threshold=0.5)
leaderboard = predictor_binary.leaderboard(data_valid_cyprMatch1, extra_metrics=_extra_metrics)

print(f"Performance: {performance}")
print(f"\nLeaderboard:\n {leaderboard}")

leaderboard_path = 'D:/Cyprus_paper_data/model_eval_autogluon/model_CyprusSENtrain_rs22matchBEST_leaderboard.csv'
leaderboard.to_csv(leaderboard_path)
print(f"\nLeaderboard saved to {leaderboard_path}")

## 2.1 Optimal Threshold Selection (Binary Model)

In [None]:
def find_optimal_threshold(predicted_probabilities, true_labels, target_total):
    """Find the threshold at which the number of predicted positives equals target_total.

    A sample is predicted positive when its probability is ``>= threshold``.
    Candidate thresholds are the distinct probability values, scanned from
    highest to lowest, so the predicted-positive count grows strictly with
    each candidate and at most one candidate can match.

    Parameters
    ----------
    predicted_probabilities : array-like of float
        Positive-class scores, one per sample.
    true_labels : array-like
        Ground-truth labels aligned positionally with the scores; values 1 and
        0 are counted as positive and negative, anything else is ignored.
    target_total : int
        Required value of TP + FP.

    Returns
    -------
    (threshold, TP, FP, FN, TN)
        The matching threshold and its confusion counts, or five ``None``
        values when no candidate yields exactly ``target_total`` predicted
        positives (e.g. because tied scores skip over the target).

    Raises
    ------
    ValueError
        If scores and labels do not have the same shape.
    """
    probs = np.asarray(predicted_probabilities, dtype=float)
    labels = np.asarray(true_labels)
    if probs.shape != labels.shape:
        raise ValueError(
            "predicted_probabilities and true_labels must have the same shape, "
            f"got {probs.shape} and {labels.shape}"
        )

    # NaN scores never satisfy `score >= threshold`, so they are not candidates
    candidates, counts = np.unique(probs[~np.isnan(probs)], return_counts=True)

    # Walking the candidates from high to low, the number of predicted
    # positives is the running total of how often each score occurs
    thresholds_desc = candidates[::-1]
    predicted_positive_counts = np.cumsum(counts[::-1])

    matches = np.flatnonzero(predicted_positive_counts == target_total)
    if matches.size == 0:
        return None, None, None, None, None

    threshold = thresholds_desc[matches[0]]
    predicted_positive = probs >= threshold
    actual_positive = labels == 1
    actual_negative = labels == 0

    # Confusion counts are only needed for the single matching threshold
    TP = int(np.count_nonzero(predicted_positive & actual_positive))
    FP = int(np.count_nonzero(predicted_positive & actual_negative))
    FN = int(np.count_nonzero(~predicted_positive & actual_positive))
    TN = int(np.count_nonzero(~predicted_positive & actual_negative))

    return threshold, TP, FP, FN, TN


# Load model
path_binary = 'C:/Workdir/test_autogluon/model_CyprusSENtrain_rs22matchBEST'
predictor_binary = TabularPredictor.load(path_binary)

# Get predictions
# Score the same frame the binary model is evaluated on everywhere else in this
# notebook (data_valid_cyprMatch1, i.e. without the 'ancient' column), so the
# terrace-type label is never handed to the terrace detector.
label_column = 'terrace'
true_labels = data_valid_cyprMatch1[label_column]
# Column 1 of the probability array is taken as the positive-class score
predicted_probabilities = predictor_binary.predict_proba(data_valid_cyprMatch1, as_pandas=False)[:, 1]

# Calculate PR and ROC metrics
precision, recall, thresholds_pr = precision_recall_curve(true_labels, predicted_probabilities)
fpr, tpr, thresholds_roc = roc_curve(true_labels, predicted_probabilities)
auc_pr = auc(recall, precision)
auc_roc = roc_auc_score(true_labels, predicted_probabilities)

# Find optimal threshold
# Target: predict exactly as many terraces as the validation set actually contains
target_total = true_labels.sum()
optimal_threshold, TP, FP, FN, TN = find_optimal_threshold(predicted_probabilities, true_labels, target_total)

if optimal_threshold is not None:
    print(f"Optimal Threshold: {optimal_threshold:.4f}")
    print(f"TP: {TP}, FP: {FP}, FN: {FN}, TN: {TN}")
    print(f"Total TP + FP: {TP + FP} (Target: {target_total})")
    print(f"AUC-PR: {auc_pr:.4f}, AUC-ROC: {auc_roc:.4f}")
else:
    print("No optimal threshold found")

In [None]:
# Evaluate metrics across threshold range
thresholds = np.arange(0.1, 1.0, 0.01)

# Model inference does not depend on the threshold, so run it once and only
# re-apply the cut-off inside the loop instead of re-predicting every time.
validation_proba = predictor_binary.predict_proba(data_valid_cyprMatch1)

metrics_list = []
for threshold in thresholds:
    # predict_from_proba applies the decision threshold to already-computed probabilities
    predictions = np.asarray(
        predictor_binary.predict_from_proba(validation_proba, decision_threshold=threshold)
    )

    # zero_division=0 on all ratio metrics: report 0 without warnings when a
    # threshold leaves a denominator empty (e.g. no sample predicted positive)
    metrics_list.append({
        'threshold': threshold,
        'precision': precision_score(true_labels, predictions, zero_division=0),
        'recall': recall_score(true_labels, predictions, zero_division=0),
        'f1': f1_score(true_labels, predictions, zero_division=0),
        'accuracy': accuracy_score(true_labels, predictions),
        'balanced_accuracy': balanced_accuracy_score(true_labels, predictions),
        'mcc': matthews_corrcoef(true_labels, predictions)
    })

metrics_df = pd.DataFrame(metrics_list)
print(metrics_df.head(10))

In [None]:
# Report validation metrics side by side for the tuned and the default cut-off
# NOTE(review): 0.6707 is hard-coded; presumably copied from the threshold
# search output - confirm it still matches the current model.
optimal_threshold = 0.6707
default_threshold = 0.5

# (printed label, scoring function) in report order
_metric_rows = (
    ("  Precision:        ", lambda y_true, y_pred: precision_score(y_true, y_pred, zero_division=0)),
    ("  Recall:           ", recall_score),
    ("  F1-Score:         ", f1_score),
    ("  Accuracy:         ", accuracy_score),
    ("  Balanced Acc:     ", balanced_accuracy_score),
    ("  MCC:              ", matthews_corrcoef),
)

_threshold_cases = (("Optimal", optimal_threshold), ("Default", default_threshold))
for name, thresh in _threshold_cases:
    predictions = predictor_binary.predict(
        data_valid_cyprMatch1,
        as_pandas=False,
        decision_threshold=thresh,
    )

    print(f"\n{name} Threshold: {thresh:.4f}")
    for _prefix, _scorer in _metric_rows:
        _score = _scorer(true_labels, predictions)
        print(f"{_prefix}{_score:.4f}")

## 3. Multiclass Classification - Terrace Type

In [None]:
# Stage 2 input: keep terrace rows only; 'terrace' is constant there, so drop it
_train_is_terrace = data_train_cyprMatch['terrace'] == 1
_valid_is_terrace = data_valid_cyprMatch['terrace'] == 1

train_data_terrace = data_train_cyprMatch.loc[_train_is_terrace].drop(columns=['terrace'])
valid_data_terrace = data_valid_cyprMatch.loc[_valid_is_terrace].drop(columns=['terrace'])

_n_train_terraces = train_data_terrace.shape[0]
_n_valid_terraces = valid_data_terrace.shape[0]
print(f"Training terraces: {_n_train_terraces}, Validation terraces: {_n_valid_terraces}")

print("\nClass distribution (ancient):")
for _split_name, _split_frame in (("Train:", train_data_terrace), ("Valid:", valid_data_terrace)):
    print(_split_name, _split_frame['ancient'].value_counts().to_dict())

In [None]:
# Stage 2: fit the terrace-type ('ancient') classifier on terrace rows only
label = 'ancient'
problem_type = 'multiclass'
eval_metric = 'mcc'
time_limit = 15000
path_multiclass = 'C:/Workdir/test_autogluon/model_CyprusMultinewBEST'

# Model families left out of the search
excluded_model_types = ['RF', 'XT', 'KNN', 'XGB', 'GBM']

predictor_multiclass = TabularPredictor(
    label=label,
    problem_type=problem_type,
    eval_metric=eval_metric,
    path=path_multiclass,
)
# fit() returns the predictor itself, so the name keeps pointing at the fitted object
predictor_multiclass = predictor_multiclass.fit(
    train_data=train_data_terrace,
    excluded_model_types=excluded_model_types,
    presets='best_quality',
    time_limit=time_limit,
)

# Score on the held-out terrace rows
_extra_metrics = ['accuracy', 'balanced_accuracy', 'log_loss']
performance = predictor_multiclass.evaluate(valid_data_terrace)
leaderboard = predictor_multiclass.leaderboard(valid_data_terrace, extra_metrics=_extra_metrics)

print(f"Performance: {performance}")
print(f"\nLeaderboard:\n {leaderboard}")

## 5. Permutation Feature Importance (PFI)

In [None]:
# Restore both fitted predictors from disk
path_binary = 'C:/Workdir/test_autogluon/model_CyprusSENtrain_rs22matchBEST'
path_multiclass = 'C:/Workdir/test_autogluon/model_CyprusMultinewBEST'

predictor_terrace = TabularPredictor.load(path_binary)
predictor_ancient = TabularPredictor.load(path_multiclass)

# Shared permutation settings: 10 shuffle sets, 95% confidence level
_pfi_settings = {'num_shuffle_sets': 10, 'confidence_level': 0.95}

print("Computing feature importance for binary terrace model...")
feature_importance_terrace = predictor_terrace.feature_importance(
    data_valid_cyprMatch1, **_pfi_settings
)

print("Computing feature importance for multiclass terrace type model...")
feature_importance_ancient = predictor_ancient.feature_importance(
    valid_data_terrace, **_pfi_settings
)

# Show each table, then write it next to the paper figures
# NOTE(review): these go to E:/ while the leaderboard CSV goes to D:/ - confirm the drive letters.
_pfi_reports = (
    (
        "\n=== Feature Importance - Binary Terrace Detection ===",
        feature_importance_terrace,
        'E:/Cyprus_paper_data/figures/ModelImpFeatTerrace_feat.csv',
    ),
    (
        "\n=== Feature Importance - Multiclass Terrace Type ===",
        feature_importance_ancient,
        'E:/Cyprus_paper_data/figures/ModelImpFeatAncient_feat.csv',
    ),
)
for _heading, _importance, _csv_path in _pfi_reports:
    print(_heading)
    print(_importance)
    _importance.to_csv(_csv_path)

print("\n✓ Feature importance analysis complete")