In [1]:
"""
XGBoost multiclassifier for roof materials in D.C.
"""

import os, sys, glob, time
import geopandas as gpd
import pandas as pd
import numpy as np
import rioxarray as rxr
import rasterio as rio

from sklearn.ensemble import IsolationForest
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import precision_score, recall_score, f1_score, matthews_corrcoef, accuracy_score

import xgboost as xgb

# Project root. Defaults to the original author's local checkout; set the
# OPP_ROOFTOP_MAINDIR environment variable to run the notebook elsewhere
# without editing this cell.
maindir = os.environ.get(
    'OPP_ROOFTOP_MAINDIR',
    '/Users/max/Library/CloudStorage/OneDrive-Personal/mcook/earth-lab/opp-rooftop-mapping'
)

print("Imports successful!")

Imports successful!


In [2]:
# Functions!

# Make the project's helper module (__functions.py) importable.
# Two candidate locations are registered:
#   * <cwd>/code/     -- the original behaviour (notebook started from the repo root)
#   * <maindir>/code  -- works regardless of where the kernel was started
# Paths already on sys.path are skipped so re-running the cell does not pile
# up duplicate entries.
for code_dir in (os.path.join(os.getcwd(),'code/'), os.path.join(maindir, 'code')):
    if code_dir not in sys.path:
        sys.path.append(code_dir)

# NOTE(review): a star import hides where names come from; of the visible
# cells only `print_raster` appears to come from this module -- consider an
# explicit import once that is confirmed.
from __functions import *

print(os.getcwd())

/Users/max/Library/CloudStorage/OneDrive-Personal/mcook/earth-lab/opp-rooftop-mapping/code


In [3]:
# Open the normalized image stack only long enough to print a summary of it
# and to grab its band names; the array itself is released again afterwards.
stack_da_fp = os.path.join(
    maindir,
    'data/spatial/mod/dc_data/planet-data/dc_0623_psscene8b_final_norm.tif'
)
stack_da = rxr.open_rasterio(stack_da_fp, masked=True, cache=False)
stack_da = stack_da.squeeze()  # drop any length-1 dimensions
print_raster(stack_da, open_file=False)

# Band names as stored on the raster (used later to select table columns)
band_names = stack_da.long_name

# The pixel data is not needed beyond this point
del stack_da

shape: (8223, 6714)
resolution: (3.0, -3.0)
bounds: (315267.0, 4294629.0, 335409.0, 4319298.0)
sum: 9.873291015625
CRS: EPSG:32618
NoData: None


In [4]:
# Columns to keep from the training table: the raster band names plus the
# class label ('class_code') and the identifier column ('uid').
# The membership check keeps this cell idempotent: re-running it no longer
# appends the two extra names a second time (which would duplicate columns
# when the list is later used to index the training table).
band_names = list(band_names)
for extra_col in ('class_code', 'uid'):
    if extra_col not in band_names:
        band_names.append(extra_col)
band_names

['nir',
 'NDBIbg',
 'NDBIrg',
 'NISI',
 'MNF1',
 'NISI9x9',
 'NISI27x27',
 'class_code',
 'uid']

In [None]:
# # Load the training data (footprints)
# gdf_path = os.path.join(maindir,'data/spatial/mod/dc_data/training/dc_data_reference_footprints.gpkg')
# ref = gpd.read_file(gdf_path)
# ref.head()

In [None]:
# Load the training data (sampled building footprints)
######################################################

# Footprints (from "sample-stack.py")
ref_tbl_path_fp = os.path.join(maindir,'data/tabular/mod/dc_data/training/dc_data_reference_sampled_footprint.csv')
ref = pd.read_csv(ref_tbl_path_fp)

# Retain samples of band matches.
# * dict.fromkeys() drops repeated names (order preserved) so a re-run of the
#   cell that extends `band_names` cannot produce duplicate columns here.
# * .copy() makes `ref` an independent frame, so adding the 'Y' column below
#   does not raise pandas' SettingWithCopyWarning.
keep_cols = list(dict.fromkeys(band_names))
ref = ref[keep_cols].copy()

# Create a numeric class code (codes are assigned in order of first
# appearance of each class in the table: 0, 1, 2, ...)
class_mapping = {label: idx for idx, label in enumerate(ref['class_code'].unique())}
ref['Y'] = ref['class_code'].map(class_mapping)

ref.head()

In [None]:
# Show how many training rows fall in each class code (class balance check)
print(ref['class_code'].value_counts())

In [None]:
t0 = time.time()

# Set up the model data: 'Y' is the numeric label; every column except the
# label/identifier columns is used as a predictor
y = ref['Y']
X = ref.drop(['class_code', 'uid', 'Y'], axis=1)

# Per-fold outputs are collected in lists and assembled into DataFrames once,
# after the loop, instead of concatenating onto a growing frame every fold
fold_metrics = []      # one dict of performance metrics per fold
fold_imp_frames = []   # one feature-importance frame per fold
fold_prob_frames = []  # one frame of held-out class probabilities per fold

# Calculate class weights (inverse class frequency: rarer classes weigh more)
class_counts = y.value_counts()
total_samples = len(y)
class_weights = {cls: total_samples / count for cls, count in class_counts.items()}
print(f'Class weights: {class_weights}')

# Number of classes is the same for every fold, so compute it once
n_classes = len(np.unique(y))

# Set up the stratified K-fold
# NOTE(review): folds are stratified on table rows. If the table holds more
# than one row per 'uid', rows of the same footprint can end up in both the
# training and the test split -- confirm the sampling unit and consider
# StratifiedGroupKFold with groups=ref['uid'] if so.
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

# Loop the folds. `fold_idx` is kept as a manual counter so that, as before,
# it ends at (number of folds + 1) once the loop has finished.
fold_idx = 1
for train_index, test_index in skf.split(X, y):
    print(f'Fold: {fold_idx}')

    # Split into train/test sets
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    # Map class weights to each sample in the training set
    sample_weights = y_train.map(class_weights).values

    # Initialize the XGBoost classifier for multi-class classification.
    # 'multi:softprob' optimizes the same loss as 'multi:softmax' but exposes
    # per-class probabilities: predict() still returns the most likely class,
    # and predict_proba() works (some XGBoost releases raise a ValueError for
    # predict_proba() on a 'multi:softmax' model).
    xgb_model = xgb.XGBClassifier(
        objective='multi:softprob',
        num_class=n_classes,
        n_estimators=1001,
        learning_rate=0.01,
        max_depth=8,
        random_state=42
    )

    # Fit the model
    xgb_model.fit(X_train, y_train, sample_weight=sample_weights)

    # Store feature importance
    fold_imp_frames.append(pd.DataFrame({
        'Fold': fold_idx,
        'Feature': X.columns,
        'Importance': xgb_model.feature_importances_
    }))

    # Predict on the test set
    y_pred = xgb_model.predict(X_test)

    # Retrieve the accuracy/performance metrics (zero_division=0 everywhere so
    # a class with no predictions scores 0 without emitting a warning)
    fold_metrics.append({
        'Fold': fold_idx,
        'Precision': precision_score(y_test, y_pred, average='weighted', zero_division=0),
        'Recall': recall_score(y_test, y_pred, average='weighted', zero_division=0),
        'F1': f1_score(y_test, y_pred, average='weighted', zero_division=0),
        'MCC': matthews_corrcoef(y_test, y_pred),
        'Accuracy': accuracy_score(y_test, y_pred)
    })

    # Store the probability values for cutoff testing: one row per held-out
    # sample, holding its true label and its vector of class probabilities
    y_pred_proba = xgb_model.predict_proba(X_test)
    fold_prob_frames.append(pd.DataFrame({
        'TrueLabel': y_test.to_numpy(),
        'PredictedProb': list(y_pred_proba),
        'Fold': fold_idx
    }))

    # Report the cumulative run time *before* advancing the counter, so the
    # message names the fold that has just finished
    t1 = (time.time() - t0) / 60
    print(f"Total elapsed time for fold {fold_idx}: {t1:.2f} minutes.")
    print("\n~~~~~~~~~~\n")

    fold_idx += 1

# Assemble the per-fold pieces into the result frames used by later cells
results = pd.DataFrame(fold_metrics)  # model performance metrics, one row per fold
feat_imps = pd.concat(fold_imp_frames, axis=0)  # feature importances (per-fold index kept)
prob_preds = pd.concat(fold_prob_frames, ignore_index=True)  # for testing optimum cutoff

t2 = (time.time() - t0) / 60
print(f"Total elapsed time: {t2:.2f} minutes.")

# # Append the feature set-specific results to the overall results dataframes
# all_results = pd.concat([all_results, results], ignore_index=True)
# all_feat_imps = pd.concat([all_feat_imps, feat_imps], ignore_index=True)
# all_prob_preds = pd.concat([all_prob_preds, prob_preds], ignore_index=True)

# del results, feat_imps, prob_preds

In [None]:
# Preview the per-fold performance metrics collected during cross-validation
results.head()

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Combine true labels and predicted labels across all folds.
# `prob_preds` already holds every fold's held-out rows, so the labels are
# read straight from it; this no longer relies on the loop counter left over
# from the training cell. The predicted class is the column with the highest
# probability.
all_true_labels = prob_preds['TrueLabel'].tolist()
all_pred_labels = np.argmax(np.vstack(prob_preds['PredictedProb'].tolist()), axis=1).tolist()

# Numeric codes and their class names, both ordered by numeric code, so the
# matrix rows/columns and the tick labels are guaranteed to line up
label_ids = sorted(class_mapping.values())
label_names = [label for label, idx in sorted(class_mapping.items(), key=lambda item: item[1])]

# Create the confusion matrix (explicit `labels` keeps a row/column for every
# class even if one never occurs in the pooled labels)
cm = confusion_matrix(all_true_labels, all_pred_labels, labels=label_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=label_names)

# Plot the confusion matrix
fig, ax = plt.subplots(figsize=(10, 10))
disp.plot(ax=ax)
ax.set_title('Confusion Matrix')

# Make sure the output folder exists before writing the figure
fig_path = os.path.join(maindir,'figures/FigX_xgboost_confusion_matrix.png')
os.makedirs(os.path.dirname(fig_path), exist_ok=True)
fig.savefig(fig_path, dpi=300, bbox_inches='tight')

plt.show()

In [None]:
from sklearn.metrics import classification_report

# Class names and their numeric codes, both ordered by numeric code
cor_labels = [label for label, idx in sorted(class_mapping.items(), key=lambda item: item[1])]
cor_ids = sorted(class_mapping.values())

# Create a classification report and convert it to a DataFrame.
# `labels` is passed alongside `target_names` so each name is tied to its own
# numeric code even if a class is absent from the pooled labels;
# zero_division=0 matches the setting used for the per-fold metrics.
cr_df = pd.DataFrame(
    classification_report(
        all_true_labels,
        all_pred_labels,
        labels=cor_ids,
        target_names=cor_labels,
        output_dict=True,
        zero_division=0
    )
).transpose()

# Unweighted mean of the per-class rows of the report. The report is built
# from the predictions pooled over all folds, so this is an average across
# classes, not an average across folds.
average_metrics = cr_df.loc[cor_labels].mean()

# Display the DataFrame
cr_df

In [None]:
# Save out the results: each table is written as a CSV directly under `maindir`
# NOTE(review): 'prob_peds' looks like a typo for 'prob_preds'; the filename is
# left unchanged in case other code reads it -- confirm before renaming.
# NOTE(review): the 'PredictedProb' column holds one array per row, which is
# presumably written to the CSV as its text representation -- verify it can be
# parsed back as needed.
csv_outputs = {
    'xgboost_folds_results.csv': results,
    'xgboost_folds_feat_imps.csv': feat_imps,
    'xgboost_folds_prob_peds.csv': prob_preds,
    'xgboost_classification_report_avg.csv': cr_df,
}
for csv_name, table in csv_outputs.items():
    table.to_csv(os.path.join(maindir, csv_name))