# ___

# [ Machine Learning in Geosciences ]

### | Topic: **GeoAI: Land Cover classification with Sentinel-2 and ST_LUCAS** 

### | Thematic focus: Land Cover

.....................................................................................................................

Author: *Lukas Brodsky* *lukas.brodsky@natur.cuni.cz* 

---

# 1. Problem definition 

### 1.1 Background

Land cover classification is a crucial task in remote sensing and environmental monitoring, providing valuable insights for urban planning, agriculture, forestry, and climate studies. Sentinel-2, a satellite mission by the European Space Agency (ESA), offers high-resolution multispectral imagery suitable for land cover classification. The ST_LUCAS (LUCAS – Land Use/Cover Area frame Survey) dataset provides ground truth reference data, making it a valuable resource for training and validating machine learning models.

By leveraging Sentinel-2 imagery and ST_LUCAS reference data, machine learning models can efficiently classify land cover types, improving accuracy and automation in large-scale environmental analysis. The integration of these datasets enables the development of robust classification models that can generalize across different landscapes.


### 1.2. Objective: 

The primary objective of this task is to develop and evaluate a machine learning model for land cover classification using Sentinel-2 satellite imagery and ST_LUCAS reference data. The key goals include:

* Data Preprocessing – Extract and preprocess Sentinel-2 spectral bands, align them with ST_LUCAS reference points, and apply necessary corrections.
* Sample ST_LUCAS data based on spatial-kfold samling and apply spatial cross-validation. 
* Feature Engineering – Select relevant spectral indices (e.g., NDVI, NDWI) and additional features to enhance classification performance.
* Model Training & Optimization – Train a machine learning model (e.g., Random Forest) using labeled data from ST_LUCAS and optimize its parameters.
* Validation & Accuracy Assessment – Evaluate the model performance.
* Compare model results when using random sampling and spatial sampling. 

### 1.3 Expected results 

* Land Cover Classification Map – A high-resolution map with classified land cover types, generated from Sentinel-2 data, and validated using ST_LUCAS references.
* Performance Metrics – Random and spatial sampling classification accuracy metrics using F1-score.
* Feature Importance Analysis – Identification of the most influential Sentinel-2 bands and spectral indices contributing to classification accuracy.

In [None]:
import os 
import glob
import rasterio
import rasterio.features
from rasterio.enums import MergeAlg
from rasterio.plot import show
from rasterio.features import rasterize
from skimage import exposure
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
import numpy as np
from shapely.geometry import box
import geopandas as gpd 
# from st_lucas import LucasRequest
# from st_lucas import LucasIO

# Random Cross-Validation using KFold
from sklearn.model_selection import KFold
# Stratified Cross-Validation using StratifiedKFold
from sklearn.model_selection import StratifiedKFold

import matplotlib.pyplot as plt 
import matplotlib.patches as mpatches
import matplotlib as mpl

from spatialkfold.blocks import spatial_blocks 
from spatialkfold.datasets import load_ames
from spatialkfold.clusters import spatial_kfold_clusters 
from spatialkfold.plotting import spatial_kfold_plot
from spatialkfold.stats import spatial_kfold_stats

# 2. Data Collection & Preparation
EPSG: 32633 

In [None]:
# Google Drive to download data: 
# https://drive.google.com/drive/folders/1OraTxkjJy597P-vWyFZWVIwDvJ0iX_-u?usp=sharing 

In [None]:
# CONFIG
DIR_PATH = './data'
S2_BANDS_DIR = os.path.join(DIR_PATH, 'S2A_MSIL2A_20180506T100031_N0207_R122_T33UVQ_20180506T105839.SAFE/GRANULE/*/IMG_DATA/R10m') 
LUCAS_SHP = os.path.join(DIR_PATH, 'cz_lucas_region_grow.shp') 
OUTPUT_TIF = os.path.join(DIR_PATH, 'land_cover_cz_33UVQ_2018.tif') 
LABEL_FIELD = 'lc' 
EPSG_CODE = 32633
N_SPLITS = 5
SCALE_FACTOR = 10000

In [None]:
# Load LUCAS polygons
lucas_gdf = gpd.read_file(LUCAS_SHP)
lucas_gdf = lucas_gdf.to_crs(epsg=EPSG_CODE)
print("Number of polygons:", len(lucas_gdf))
print("Columns:", lucas_gdf.columns)

In [None]:
# Load Sentinel-2 bands
band_paths = {
    'B02': glob.glob(os.path.join(S2_BANDS_DIR, '*B02_10m.jp2'))[0],
    'B03': glob.glob(os.path.join(S2_BANDS_DIR, '*B03_10m.jp2'))[0],
    'B04': glob.glob(os.path.join(S2_BANDS_DIR, '*B04_10m.jp2'))[0],
    'B08': glob.glob(os.path.join(S2_BANDS_DIR, '*B08_10m.jp2'))[0]
}

In [None]:
# Read each band and stack them
band_arrays = []
for name, path in band_paths.items():
    with rasterio.open(path) as src:
        band = src.read(1)  # Read the first band (single layer)
        band_arrays.append(band)

In [None]:
# Stack the bands into a 3D array: (height, width, num_bands)
stacked_img = np.stack(band_arrays, axis=-1)
print("stacked shape:", stacked_img.shape)  # Should be (height, width, num_bands)

### Feature engineering 

In [None]:
# Extract individual bands for index computation
B03, B04, B08 = band_arrays[1], band_arrays[2], band_arrays[3]

In [None]:
# NDVI = (NIR - Red) / (NIR + Red)
ndvi = (B08 - B04) / (B08 + B04 + 1e-6) * SCALE_FACTOR

In [None]:
# NDWI = (Green - NIR) / (Green + NIR)
ndwi = (B03 - B08) / (B03 + B08 + 1e-6) * SCALE_FACTOR

In [None]:
# Add indices to the stack
stacked_img = np.concatenate([stacked_img, ndvi[..., np.newaxis], ndwi[..., np.newaxis]], axis=-1)

### Data harmonization

In [None]:
# Get raster bounds and load raster base layer for visualization
with rasterio.open(band_paths['B02']) as src:
    raster_bounds = src.bounds
    raster_image = src.read(1)
    raster_shape = src.shape
    raster_transform = src.transform
    extent = [raster_bounds.left, raster_bounds.right, raster_bounds.bottom, raster_bounds.top]

In [None]:
# Convert bounds to shapely box for filtering
raster_bbox = box(*raster_bounds)

In [None]:
# Filter only LUCAS polygons intersecting the raster
lucas_subset = lucas_gdf[lucas_gdf.intersects(raster_bbox)]

In [None]:
# Check unique values to confirm it's correct
print("LUCAS land cover classes:", lucas_gdf[LABEL_FIELD].unique())

In [None]:
# Prepare shapes for rasterization
shapes = ((geom, int(val)) for geom, val in zip(lucas_gdf.geometry, lucas_gdf[LABEL_FIELD]))
# Rasterize with original raster shape and transform
rasterized = rasterize(
    shapes=shapes,
    out_shape=raster_shape,
    transform=raster_transform,
    fill=0,  # 0 = NoData
    dtype='uint8'
)

In [None]:
# Flatten stacked_img to (num_pixels, num_features)

valid_mask = (rasterized > 0) # & (group_raster > 0)

stacked_flat = stacked_img.reshape(-1, stacked_img.shape[-1])
valid_mask_flat = valid_mask.flatten()
rasterized_flat = rasterized.flatten()

In [None]:
# ML-ready arrays
print("stacked shape:", stacked_img.shape)
print("stacked_flat shape:", stacked_flat.shape)
print("valid_mask_flat shape:", valid_mask_flat.shape)

In [None]:
X = stacked_flat[valid_mask_flat]  # Features based on valid mask
y = rasterized_flat[valid_mask_flat]  # Corresponding labels

# 3. Exploratory Data Analysis (EDA)


In [None]:
# View image and samples
fig, ax = plt.subplots(figsize=(10, 10))
img = ax.imshow(raster_image, cmap='gray', vmin=100, vmax=1000, extent=extent, origin='upper')
lucas_subset.boundary.plot(ax=ax, edgecolor='red', linewidth=1)
ax.set_title('Sentinel-2 Tile with LUCAS Polygon Overlap')
ax.set_xlabel('Easting')
ax.set_ylabel('Northing')
plt.grid(True)
plt.show()

In [None]:
# Nomenclature coding 
nomenclature ={
        1: "artificial land ",
        2: "agriculture",
        3: "woodland",
        4: "shrubland",
        5: "grassland",
        6: "bareland", 
        7: "water", 
        8: "wetlands"
}

In [None]:
colours = {
    1: (255./255,   0./255,   0./255, 0.5),   # red 
    2: (255./255, 255./255,   0./255, 0.5),   # yeallow
    3: ( 51./255, 160./255,  44./255, 1  ),   # dark green
    4: (223./255, 223./255, 138./255, 1  ),   # light greyish green
    5: (  0./255, 223./255,   0./255, 0.5),   # green
    6: (211./255, 211./255, 211./255, 1  ),   # grey
    7: (  0./255,   0./255, 255./255, 0.5),   # blue
    8: (238./255, 130./255, 238./255, 0.5)    # violet
}


In [None]:
# Feature scatter plot
# plt.scatter(s2array_flat[2, :], s2array_flat[4, :], s=1, alpha=0.7, 
#             c=lucas_array_flat, cmap=cmap)
# plt.xlabel('RED')
# plt.ylabel('NIR')
# plt.colorbar()

# 4. Model Selection & Training

# 5. Model fine-tuning 
* hyperparameters fine-tuning 
* random and spatial sampling comparison  
* models evaluation

### Random sampling


In [None]:
### Training and validation
def evaluate_model(clf, X_train, y_train, X_test, y_test):
    y_train_pred = clf.predict(X_train)
    y_test_pred = clf.predict(X_test)
    f1_train = f1_score(y_train, y_train_pred, average='weighted')
    f1_test = f1_score(y_test, y_test_pred, average='weighted')
    return f1_train, f1_test

In [None]:
# Setup for random sampling
scores_random = []
kf_random = KFold(n_splits=N_SPLITS, shuffle=True, random_state=42)


In [None]:
# Train the model using RandomForestClassifier and perform cross-validation
param_grid = {
    'n_estimators': [100, 200],
    'max_depth': [10, 20, 30, 50, 100]
}

In [None]:
# training 
for train_idx, test_idx in kf_random.split(X, y):
    # break
    X_train, y_train = X[train_idx], y[train_idx]
    X_test, y_test = X[test_idx], y[test_idx]
    
    # Grid search with 3-fold CV
    rf_base = RandomForestClassifier(n_jobs=8, random_state=42)
    grid_search = GridSearchCV(rf_base, param_grid, cv=3, scoring='f1_weighted', n_jobs=os.cpu_count()-2, verbose=1)
    grid_search.fit(X_train, y_train)

    # Best model
    clf = grid_search.best_estimator_
    print("Best Parameters:", grid_search.best_params_)
    f1_train, f1_test = evaluate_model(clf, X[train_idx], y[train_idx], X[test_idx], y[test_idx])
    scores_random.append((f1_train, f1_test))


In [None]:
# Output Random CV Results
print(f"Random Cross-Validation F1-scores: {scores_random}")


### Spatial cross-validatin 

In [None]:
# Create blocks using centroid-based selection
blocks = spatial_blocks(gdf=lucas_subset, width=10000, height=10000, 
                        method='random', nfolds=N_SPLITS, 
                        random_state=42)

In [None]:
# Function to rasterize blocks for a given fold
def rasterize_fold(selected_blocks, fold_value, raster_shape, raster_transform):
    
    # replace to union_all()
    selected_geom = selected_blocks.unary_union
    selected_lucas = lucas_subset[lucas_subset.intersects(selected_geom)]
    
    # Prepare shapes for rasterization
    shapes = ((geom, int(val)) for geom, val in zip(selected_lucas.geometry, selected_lucas[LABEL_FIELD]))
    
    rasterized_fold = rasterize(
        shapes=shapes,
        out_shape=raster_shape,
        transform=raster_transform,
        fill=0,  # 0 = NoData
        dtype='uint8'
    )
    return rasterized_fold

In [None]:
def spatial_cross_validation(blocks, raster_shape, raster_transform, n_splits):
    scores = []
    for fold_value in range(1, n_splits + 1):
        print(f"Processing fold {fold_value}...")
        train_blocks = blocks[blocks['folds'] != fold_value]
        test_blocks = blocks[blocks['folds'] == fold_value]
        train_raster = rasterize_fold(train_blocks, fold_value, raster_shape, raster_transform)
        test_raster = rasterize_fold(test_blocks, fold_value, raster_shape, raster_transform)
        # Best model
        clf = grid_search.best_estimator_
        print("Best Parameters:", grid_search.best_params_)
        f1_train, f1_test = evaluate_model(clf, X_train, y_train, X_test, y_test)
        scores.append((f1_train, f1_test))

    return scores

In [None]:
# Run spatial cross-validation
scores_spatial = spatial_cross_validation(blocks, raster_shape, raster_transform, N_SPLITS)


In [None]:
# Output results
print(f"Spatial Cross-Validation F1-scores: {scores_spatial}")

In [None]:
# Combine scores and calculate means
cv_results = {
    'Random': scores_random,
    'Spatial': scores_spatial, 
}   

In [None]:
# Print average scores 
for method, scores in cv_results.items():
    train_avg = np.mean([s[0] for s in scores])
    test_avg = np.mean([s[1] for s in scores])
    print(f"{method} CV → Avg Train F1: {train_avg:.3f}, Avg Test F1: {test_avg:.3f}")


In [None]:
# cv_results

# 6. Results presentation and interpretation 

In [None]:
### Explanations
import shap

In [None]:
# xAI: apply SHAP for model interpretation
explainer_shap = shap.TreeExplainer(clf)   

In [None]:
shap_values = explainer_shap.shap_values(X_test[:100])

In [None]:
# plot 
shap.initjs()
shap.force_plot(explainer_shap.expected_value[0], shap_values[..., 0], X_test)

In [None]:
### Predictions 

In [None]:
# Predict the entire image using the final model
predictions = clf.predict(stacked_flat)

In [None]:
# Save the result to a GeoTIFF
with rasterio.open(band_paths['B02']) as src:
    profile = src.profile
    profile.update(dtype=rasterio.uint8, count=1, compress='lzw')

    with rasterio.open(OUTPUT_TIF, 'w', **profile) as dst:
        dst.write(predictions_raster, 1)

print("Land cover map saved as:", OUTPUT_TIF)

In [None]:
# Plot
plt.figure(figsize=(10, 10))
im = plt.imshow(predictions_raster, cmap=colormap, interpolation='none')
cbar = plt.colorbar(im, ticks=unique_labels)
cbar.set_label('Land Cover Class')
plt.title('Land Cover Classification Map')
plt.xlabel('Easting')
plt.ylabel('Northing')
plt.grid(True)
plt.show()

In [None]:
# Resutls interpretation