In [None]:
import rasterio             # raster data
import rioxarray as rxr     # rasterio with xarray support
import xarray as xr         # n-dimenzional data
import xvec                 # vector data
import pandas as pd         # data manipulation
import geopandas as gpd     # geospatial data manipulation

import numpy as np              # numerical operations
import matplotlib.pyplot as plt # plotting

import seaborn as sns                                   # statistical data visualization
from sklearn.model_selection import train_test_split    # splitting data into train and test sets
from sklearn.ensemble import RandomForestClassifier     # random forest classification
from sklearn.model_selection import GridSearchCV        # hyperparameter tuning
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay, classification_report, f1_score # model evaluation metrics

In [None]:
# load and open raster data
tiff_file = 'data/imad1Itnew_rescale_SWIR.tif'  
rds_imad = rxr.open_rasterio(tiff_file, masked=True).squeeze()
print(f'Landsat 9 raster dataset with {rds_imad.shape[0]} bands and {rds_imad.shape[1]} rows and {rds_imad.shape[2]} columns.')

# divide bands into separate variables
landsat9 = rds_imad.sel(band = [1, 2, 3, 4, 5, 6])
landsat5 = rds_imad.sel(band = [7, 8, 9, 10, 11, 12])
iamd = rds_imad.sel(band = [13, 14, 15, 16, 17, 18])
nc_pix = rds_imad.sel(band = [19])
srtm_sl_a = rds_imad.sel(band = [20,21,22])
print(f'Dataset consists of {landsat9.shape[0]} bands of Landsat 9, {landsat5.shape[0]} bands of Landsat 5, {iamd.shape[0]} bands of IAMD, \n{nc_pix.shape[0]} band of NC_Pixels and {srtm_sl_a.shape[0]} bands of SRTM - height, slope, aspect.')

In [None]:
# load vector data
gdf_l9 = gpd.read_file('data/SHP/l9_Samp_Intersect_new.shp')

# drop unneccessary columns
un_col = ['FID_Raspol', 'CID', 'FID_Rasp_1', 'Id']
gdf_l9 = gdf_l9.drop(columns=un_col)
print(f'Point dataset with {gdf_l9.shape[0]} rows and {gdf_l9.shape[1]} columns.')

# check nan values
print(f'\nCount of NaN values for L9 data:\n {gdf_l9.isna().sum()}')
# check if there are any duplicates
print(f'\nCount of duplicates for L9 data: {gdf_l9.duplicated().sum()}')

info_gdf = gdf_l9
type_name = {
    1: 'built-up area',
    2: 'water',
    3: 'forests',
    4: 'croplands',
    5: 'grasslands',
}
print(f'\nCount of unique values for landcover: {info_gdf["TYPE"].nunique()}')
info_gdf['TYPE_NAME'] = info_gdf['TYPE'].map(type_name)
info_gdf['TYPE_NAME'].value_counts()

In [None]:
# reproject gdf to the same crs as raster
gdf_imad_l9 = gdf_l9.to_crs(rds_imad.rio.crs)
# check if gdf and raster have the same CRS
print(f'Gdf and raster have the same CRS: {gdf_imad_l9.crs == rds_imad.rio.crs}')

In [None]:
# IMAD raster data select bands and create a cube
invar_pix = rds_imad.sel(band=19)
l9_blue = rds_imad.sel(band=1)
l9_green = rds_imad.sel(band=2)
l9_red = rds_imad.sel(band=3)
l9_nir = rds_imad.sel(band=4)
l9_swir1 = rds_imad.sel(band=5)
l9_swir2 = rds_imad.sel(band=6)

l5_blue = rds_imad.sel(band=7)
l5_green = rds_imad.sel(band=8)
l5_red = rds_imad.sel(band=9)
l5_nir = rds_imad.sel(band= 10)
l5_swir1 = rds_imad.sel(band=11)
l5_swir2 = rds_imad.sel(band=12)

iamd_blue = rds_imad.sel(band=13)
iamd_green = rds_imad.sel(band=14)
iamd_red = rds_imad.sel(band=15)
iamd_nir = rds_imad.sel(band=16)
iamd_swir1 = rds_imad.sel(band=17)
iamd_swir2 = rds_imad.sel(band=18)

elevation_imad = rds_imad.sel(band=20)
slope_imad = rds_imad.sel(band=21)
aspect_imad = rds_imad.sel(band=22)

imad_cube = xr.concat(
    [l9_blue, l9_green, l9_red, l9_nir, l9_swir1, l9_swir2, l5_blue, l5_green, l5_red, l5_nir, l5_swir1, l5_swir2, iamd_blue, iamd_green, iamd_red, iamd_nir, iamd_swir1, iamd_swir2 , invar_pix, elevation_imad, slope_imad, aspect_imad],
    dim=pd.Index(
        ["l9b", "l9g", "l9r", "l9n", "l9s1", "l9s2", "l5b", "l5g", "l5r", "l5n", "l5s1", "l5s2", "ib", "ig", "ir", "in", "is1", "is2", "invar_pix", "elevation", "slope", "aspect"],
        name="measurement",
    ) 
)
#imad_cube

In [None]:
# extract raster values to points
vector_irmad_cube_l9 = imad_cube.drop_vars("spatial_ref").xvec.extract_points(
    points=gdf_imad_l9.geometry,
    x_coords="x",
    y_coords="y",
)
vector_irmad_cube_l9

# convert to geopandas dataframe
gdf_raster_imad_l9 = vector_irmad_cube_l9.xvec.to_geopandas()
print(f'Landsat 9 data shape:{gdf_raster_imad_l9.shape}')
# sjoin 
l9_data = gdf_raster_imad_l9.sjoin(gdf_imad_l9[['geometry', 'TYPE']], how="left", predicate="intersects")
# dorp index_right column
l9_data.drop(columns="index_right", inplace=True)
l9_data = l9_data.dropna()
l9_data.head()

In [None]:
# multiply all l5, l9 and i * 100 - for percentage representation (for plotting data)
inner_cols = ["l9b", "l9g", "l9r", "l9n", "l9s1", "l9s2",
              "l5b", "l5g", "l5r", "l5n", "l5s1", "l5s2",
              "ib",  "ig",  "ir",  "in",  "is1",  "is2"]

l9_data_df = l9_data.copy()
l9_data_df[inner_cols] *= 100
l9_data_df

In [None]:
# Landsat 9 graph
# plotting mean spectral curves by landcover
l9_spect_bands = ['l5b', 'l5g', 'l5r', 'l5n', 'l5s1', 'l5s2']
class_labels = {
    1: ('built-up area', 'red'),
    2: ('water',    "blue"),
    3: ('forests', 'green'),
    4: ('croplands',   'yellow'),
    5: ('grasslands',  '#66c2a5')
}

for code in l9_data_df['TYPE'].unique():
    name, color = class_labels[code]
    subset = l9_data_df[l9_data_df['TYPE'] == code]
    mean_spect = subset[l9_spect_bands].mean()
    plt.plot(l9_spect_bands, mean_spect,
             label=name,                  
             color=color, linewidth=2)

plt.xlabel('band')
plt.ylabel('reflectance')
plt.title('Mean spectral curves by landuse type')
plt.legend(title='Land-use')      
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# sort values by TYPE
l9_data = l9_data.sort_values(by='TYPE')
#l9_data_df = l9_data_df.sort_values(by='TYPE')
count_samples = l9_data['TYPE'].value_counts()
count_samples

In [None]:
# calculate indices NDVI, NDMI, SAVI, MSAVI, EVI, BSI
def ndvi(red, nir):
    """
    Normalized Difference Vegetation Index (NDVI) calculation.
    """
    return (nir - red) / (nir + red)
# calculate NDVI
l9_data['ndvi9'] = ndvi(l9_data['l9r'], l9_data['l9n'])
l9_data['ndvi5'] = ndvi(l9_data['l5r'], l9_data['l5n'])
l9_data['ndviI'] = ndvi(l9_data['ir'], l9_data['in'])

def ndmi(nir, swir):
    """
    Normalized Difference Moisture Index (NDMI) calculation.
    """
    return (nir - swir) / (nir + swir)
# calculate NDMI
l9_data['ndmi9'] = ndmi(l9_data['l9n'], l9_data['l9s1'])
l9_data['ndmi5'] = ndmi(l9_data['l5n'], l9_data['l5s1'])
l9_data['ndmiI'] = ndmi(l9_data['in'], l9_data['is1'])

def savi(red, nir):
    """
    Soil Adjusted Vegetation Index (SAVI) calculation.
    """
    L = 0.5  # Soil brightness correction factor
    return ((nir - red) * (1 + L) / (nir + red + L)) 
# calculate SAVI
l9_data['savi9'] = savi(l9_data['l9r'], l9_data['l9n'])
l9_data['savi5'] = savi(l9_data['l5r'], l9_data['l5n'])
l9_data['saviI'] = savi(l9_data['ir'], l9_data['in'])

def msavi(red, nir):
    """
    Modified Soil Adjusted Vegetation Index (MSAVI) calculation.
    """
    return (2 * nir + 1 - np.sqrt((2 * nir + 1) ** 2 - 8 * (nir - red))) / 2
# calculate MSAVI
l9_data['msavi9'] = msavi(l9_data['l9r'], l9_data['l9n'])
l9_data['msavi5'] = msavi(l9_data['l5r'], l9_data['l5n'])
l9_data['msaviI'] = msavi(l9_data['ir'], l9_data['in'])

def evi(blue, red, nir):
    """
    Enhanced Vegetation Index calculation
    """
    G = 2.5     # Gain factor
    C1 = 6      # Coefficient for aerosol resistance term
    C2 = 7.5    # Coefficient for canopy background adjustment
    L = 1       # Canopy background adjustment   
    return G * (nir - red) / (nir + C1 * red - C2 * blue + L)
# calculate NDBI
l9_data['evi9'] = evi(l9_data['l9b'], l9_data['l9r'], l9_data['l9n'])
l9_data['evi5'] = evi(l9_data['l5b'], l9_data['l5r'], l9_data['l5n'])
l9_data['eviI'] = evi(l9_data['ib'], l9_data['ir'], l9_data['in'])

def bsi(blue, red, nir, swir):
    """
    Bare soil index calculation
    """
    return ((blue + swir) - (red - nir)) / ((blue + swir) + (red + nir))
l9_data['bsi9'] = bsi(l9_data['l9b'], l9_data['l9r'], l9_data['l9n'], l9_data['l9s1'])
l9_data['bsi5'] = bsi(l9_data['l5b'], l9_data['l5r'], l9_data['l5n'], l9_data['l5s1'])
l9_data['bsiI'] = bsi(l9_data['ib'], l9_data['ir'], l9_data['in'], l9_data['is1'])

In [None]:
l9_data.columns

In [None]:
# another spectral bands plots for Landsat 9
l9_spect_bands = ['l9b', 'l9g', 'l9r', 'l9n', 'l9s1', 'l9s2']
land_names = ['1) Built-up', '2) Water', '3) ForestS', '4) Croplands', '5) Grassland']

landcover_types = l9_data['TYPE'].unique() # unique landcover types

n_cols = 3 
n_rows = 2
fig, axes = plt.subplots(2, 3, figsize=(15, 8), squeeze=False)

for idx, landcover_type in enumerate(landcover_types):
    print(idx, landcover_type)

    # calculate row and column index for subplot
    row = idx // n_cols 
    col = idx % n_cols
    ax = axes[row, col]
    
    subset = l9_data[l9_data['TYPE'] == landcover_type] # subset data for each landcover type
    
    # plot each spectral curve - one line = one point
    for _, row_data in subset.iterrows():
        ax.plot(l9_spect_bands, row_data[l9_spect_bands], alpha=0.2, color='gray')
    
    # count the mean and std of the each band
    mean_spect = subset[l9_spect_bands].mean()
    std_spectrum = subset[l9_spect_bands].std()
    ax.plot(l9_spect_bands, mean_spect, color='red', linewidth=2, label='mean')
    ax.plot(l9_spect_bands, mean_spect - 2*std_spectrum, '--', color='darkred', linewidth=2.5, label='± 2 std')
    ax.plot(l9_spect_bands, mean_spect + 2*std_spectrum, '--', color='darkred', linewidth=2.5)
    
    ax.set_title(f'{land_names[idx]}')
    ax.set_xlabel('Band')
    ax.set_ylabel('SR')
    ax.grid(True)
    ax.legend()

fig.delaxes(axes[1,2]) # remove empty subplot
plt.suptitle('Landsat 9 Spectral Curves', fontsize=16, y=1.02)
plt.tight_layout()
plt.show()

In [None]:
# detecting LANDSAT 9 outliers
# data copy
l9_spect_bands = ['l9b', 'l9g', 'l9r', 'l9n', 'l9s1', 'l9s2']
data = l9_data.copy()
outlier_indices = [] # list of outlier indices

# for each landcover type, calculate the mean and find outliers using RMSE
for landcover_type in data['TYPE'].unique():
    subset = data[data['TYPE'] == landcover_type]
    mean_spect = subset[l9_spect_bands].mean() 
    
    # count the distance of each point from the mean curve (Root Mean Square Error - RMSE)
    # RMSE = sqrt(mean((x_i - mean)^2)) -> x_i = each row

    dist = np.sqrt(np.mean((subset[l9_spect_bands] - mean_spect)**2, axis=1))
    
    # threshold - percentile of the distance
    # filtering LANDSAT 9 - set 99
    # filtering LANDSAT 5 - set 96.5
    threshold = np.percentile(dist, 99)
    
    # find outliers - those points that are above the threshold
    outliers = dist[dist > threshold].index # get the index of outliers
    outlier_indices.extend(outliers)

# print total number of outliers and their indices
print(f'Total number of outliers: {len(outlier_indices)}')
print('Outlier indices:', outlier_indices)

# data copy for next analysis
data = l9_data.copy()
landcover_types = data['TYPE'].unique()

# prepare subplot grid
n_cols = 3
n_rows = 2

fig, axes = plt.subplots(2, 3, figsize=(15, 8), squeeze=False)

for idx, landcover_type in enumerate(landcover_types):
    row = idx // n_cols 
    col = idx % n_cols 
    ax = axes[row, col] 
    
    subset = data[data['TYPE'] == landcover_type] # subset of data for each landcover type 
    mean_spect = subset[l9_spect_bands].mean() # mean spectral curve of the subset
    
    for i, (index, row_data) in enumerate(subset.iterrows()):
        # plot each spectral curve, if it's an outlier - color it differently
        if index in outlier_indices:
            ax.plot(l9_spect_bands, row_data[l9_spect_bands], color='orange', alpha=0.7, linewidth=1.5)  
        else:
            ax.plot(l9_spect_bands, row_data[l9_spect_bands], color='lightgray', alpha=0.3)  
    
    # plot the mean spectral curve
    ax.plot(l9_spect_bands, mean_spect, color='red', linewidth=2, label='mean')
    
    land_names = ['1) Built-up', '2) Water', '3) Forests', '4) Croplands', '5) Grasslands']
    ax.set_title(f'{land_names[idx]}')
    ax.set_xlabel('Band')
    ax.set_ylabel('SR - rescaled')
    ax.grid(True)
    ax.legend()


fig.delaxes(axes[1,2])
plt.suptitle('Landsat 9 spectral curves with outliers', fontsize=16, y=1.02)
plt.tight_layout()
plt.show()

# clean data from outliers
clean_l9_data = data.drop(index=outlier_indices)

In [None]:
# detecting LANDSAT 5 outliers
# data copy
l5_spect_bands = ['l5b', 'l5g', 'l5r', 'l5n', 'l5s1', 'l5s2']
data = clean_l9_data.copy()
outlier_indices = [] # list of outlier indices

# for each landcover type, calculate the mean and find outliers using RMSE
for landcover_type in data['TYPE'].unique():
    subset = data[data['TYPE'] == landcover_type]
    mean_spect = subset[l5_spect_bands].mean() 
    
    # count the distance of each point from the mean curve (Root Mean Square Error - RMSE)
    # RMSE = sqrt(mean((x_i - mean)^2)) -> x_i = each row

    dist = np.sqrt(np.mean((subset[l5_spect_bands] - mean_spect)**2, axis=1))
    
    # threshold - percentile of the distance
    threshold = np.percentile(dist, 99)
    
    # find outliers
    outliers = dist[dist > threshold].index # get the index of outliers
    outlier_indices.extend(outliers)

# print total number of outliers and their indices
print(f'Total number of outliers: {len(outlier_indices)}')
print('Outlier indices:', outlier_indices)

# data copy for next analysis
data = clean_l9_data.copy()

# type of classes
landcover_types = data['TYPE'].unique()
fig, axes = plt.subplots(2, 3, figsize=(15, 8), squeeze=False)

for idx, landcover_type in enumerate(landcover_types):
    row = idx // n_cols
    col = idx % n_cols 
    ax = axes[row, col] 
    
    subset = data[data['TYPE'] == landcover_type] # subset of data for each landcover type 
    mean_spect = subset[l5_spect_bands].mean() # mean spectral curve of the subset
    
    for i, (index, row_data) in enumerate(subset.iterrows()):
        # plot each spectral curve
        if index in outlier_indices:
            ax.plot(l5_spect_bands, row_data[l5_spect_bands], color='orange', alpha=0.7, linewidth=1.5)  
        else:
            ax.plot(l5_spect_bands, row_data[l5_spect_bands], color='lightgray', alpha=0.3)  
    
    # plot the mean spectral curve
    ax.plot(l5_spect_bands, mean_spect, color='red', linewidth=2, label='mean')
    
    land_names = ['1) Built-up', '2) Water', '3) Forests', '4) Croplands', '5) Grasslands']
    ax.set_title(f'{land_names[idx]}')
    ax.set_xlabel('Band')
    ax.set_ylabel('SR - rescaled')
    ax.grid(True)
    ax.legend()

fig.delaxes(axes[1, 2])
plt.suptitle('Landsat 5 spectral curves with outliers', fontsize=16, y=1.02)
plt.tight_layout()
plt.show()

# clean data from outliers
clean_l9_data = data.drop(index=outlier_indices)

In [None]:
clean_l9_data.columns

In [None]:
# pairplot for Landsat 9 / Landsat 5 data
pairplot_columns = ['l9b', 'l9g', 'l9r', 'l9n', 'l9s1', 'l9s2', 'l5b', 'l5g', 'l5r', 'l5n', 'l5s1', 'l5s2', 'TYPE']
# ['l5b', 'l5g', 'l5r', 'l5n', 'l5s', 'slope', 'ndvi5', 'ndmi5', 'TYPE']

sns.pairplot(
    clean_l9_data[pairplot_columns],
    hue='TYPE',
    palette='Set2',
    diag_kind='kde',
    height=2.5,
    aspect=1.2,
)
plt.show()

In [None]:
# calculate correlation
corr_col = pairplot_columns[:-1]
correlations = clean_l9_data[corr_col + ['TYPE']].corr()['TYPE'].drop('TYPE')

# sort correlations by value
correlations = correlations.reindex(correlations.abs().sort_values(ascending=False).index)
print(correlations)

### PREPARE DATA FOR TRAINING
- split data for train, validation and test set
    - ensure that sets are stratified
- hypertuning using gridsearch
- train model
    - show most important features
- validate model
    - count error, show confusion matrix and classification report
- classify data with trained model
- test model
    - count error, show confusion matrix and classification report

In [None]:
clean_l9_data.columns

### LANDSAT 5

In [None]:
features = ['l5b', 'l5g', 'l5r', 'l5n', 'l5s1', 'l5s2', 'elevation', 'slope', 'aspect', 'ndvi5', 'ndmi5', 'savi5', 'msavi5', 'evi5', 'bsi5'] # features for Landsat 5 data
X_l5 = clean_l9_data[features] # not cleaned data: l5_data, cleaned data: clean_l5_data
y_l5 = clean_l9_data['TYPE']
print(f' dataset count: {X_l5.shape}, y shape: {y_l5.shape}')

X_temp_l5, X_test_l5, y_temp_l5, y_test_l5 = train_test_split(X_l5, y_l5, test_size=0.25, random_state=42, stratify=y_l5, shuffle=True) # 0.2
X_train_l5, X_valid_l5, y_train_l5, y_valid_l5 = train_test_split(X_temp_l5, y_temp_l5, test_size=0.3333, random_state=42, stratify=y_temp_l5, shuffle=True) # 0.25; 60% train, 20% test, 20% validation

# check the sets shapes
print(f'For L5 data:\nTrain dataset shape: {X_train_l5.shape}, Valid dataset shape: {X_valid_l5.shape}, Test dataset shape: {X_test_l5.shape}')
#print(f'Train labels shape: {y_train_l5.shape}, Test labels shape: {y_test_l5.shape}\n')

print("Train:\n", y_train_l5.value_counts(normalize=True))
print("Valid:\n", y_valid_l5.value_counts(normalize=True))
print("Test:\n", y_test_l5.value_counts(normalize=True))

# count of each type of training data
train_data = pd.concat([X_train_l5, y_train_l5], axis=1)
min_samples = train_data['TYPE'].value_counts()
min_samples


In [None]:
#  create new df of clean_l9_data, but just with y_test_l5 rows
clean_l5_data_test = clean_l9_data.loc[y_test_l5.index, ['geometry', 'TYPE']] # clean_l9_data, l9_data
# we use it in final classification evaluation

In [None]:
max_depth_range = [1,3, 5, 7, 10, 15, 20]
n_estimators = 300 

train_errors = []
valid_errors = []

for d in max_depth_range: # for each max_depth value
    rf = RandomForestClassifier(random_state=42,n_estimators=n_estimators, min_samples_split=4, min_samples_leaf=2, max_depth=d, max_features='sqrt', criterion = 'entropy', class_weight='balanced') 
    
    rf.fit(X_train_l5, y_train_l5)
    y_train_pred = rf.predict(X_train_l5)
    y_valid_pred = rf.predict(X_valid_l5)
    
    train_error = 1 - accuracy_score(y_train_l5, y_train_pred)
    valid_error = 1 - accuracy_score(y_valid_l5, y_valid_pred)

    train_errors.append(train_error)
    valid_errors.append(valid_error)
# plotting train and validation errors
plt.figure(figsize=(8, 6))
plt.plot(max_depth_range, train_errors, label='Train Error', color='salmon')
plt.plot(max_depth_range, valid_errors, label='Validation Error', color='cornflowerblue')
plt.xlabel('max_depth')
plt.ylabel('Error (1 − accuracy)')
plt.title(f'Train/Test Error by max_depth, n_estimators={n_estimators}')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
forest5 = RandomForestClassifier(random_state=42, min_samples_split=4, min_samples_leaf=2, criterion='entropy', max_features= 'sqrt', class_weight='balanced')

# parameters for grid search
param_grid = { # Difference between train and test accuracy: 0.0563 for balanced data
    'n_estimators': [50, 100, 150, 200, 250, 300],   # numeber of trees
    'max_depth': [5, 7, 10, 15, 20],   # maximum depth of the tree
    # 'max_features': [4, 5, 7, 8, 9] # number of features to consider for the best split
    # 'scriterion': ['gini', 'entropy'], # criterion for splitting
    # 'max_features': ['sqrt', 'log2'], # number of features to consider for the best split
}    
# grid search definition
gs = GridSearchCV(estimator=forest5, param_grid=param_grid, cv=5, n_jobs=-1, verbose=2, scoring='accuracy')

# run grid search
gs.fit(X_train_l5, y_train_l5)
# get df from best_parameters
best_params = pd.DataFrame(gs.cv_results_).sort_values(by='rank_test_score')
best_params = best_params[['params', 'mean_test_score', 'std_test_score', 'rank_test_score']]

# get best parameters
print("Best parameters from GridSearch:")
print(gs.best_params_)
best_params.head(15)

In [None]:
best_rf = RandomForestClassifier(random_state=42, n_estimators=100, max_depth=15, min_samples_split=4, min_samples_leaf=2, max_features='sqrt', criterion = 'entropy', class_weight='balanced') # min_samples_split=10, min_samples_leaf=12
l5_model = best_rf.fit(X_train_l5, y_train_l5) 

# Train model on the balanced training set
y_train_pred = l5_model.predict(X_train_l5) 

# training accuracy and error
train_accuracy = accuracy_score(y_train_l5, y_train_pred) 
train_error = 1 - train_accuracy

# feature importances
importances = pd.Series(l5_model.feature_importances_, index=X_train_l5.columns).sort_values(ascending=False) 

print("\nFeature Importances:")
print(f'{importances}\n')
print(f"Training Accuracy: {train_accuracy:.4f}")
print(f"Training Error: {train_error:.4f}")

y_valid_pred = l5_model.predict(X_valid_l5)

# test accuracy and test error
valid_accuracy = accuracy_score(y_valid_l5, y_valid_pred)
valid_error = 1 - valid_accuracy

print(f"Validation Accuracy: {valid_accuracy:.4f}")
print(f"Validation Error: {valid_error:.4f}\n")

# is model well trained? - we can count the difference between train and test accuracy
train_val_diff = train_accuracy - valid_accuracy
print(f"Difference between train and validation accuracy: {train_val_diff:.4f}")

In [None]:
# cm count
l5_c_matrix = confusion_matrix(y_valid_l5, y_valid_pred)
# confusion matrix
class_names = ['built-up', 'water', 'forests', 'croplands', 'grasslands']
disp = ConfusionMatrixDisplay(confusion_matrix=l5_c_matrix, display_labels=class_names)
fig, ax = plt.subplots(figsize=(8, 6))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=20)
plt.title('Confusion Matrix - Landsat 5')
plt.show()

# classification report
report = classification_report(y_valid_l5, y_valid_pred, target_names=[str(cls) for cls in np.unique(y_valid_l5)])
print("Classification Report:")
print(report)# cm count
l5_c_matrix = confusion_matrix(y_valid_l5, y_valid_pred)

- <b>Precision</b> = TP / (TP + FP) → kolik predikcí je správných

- <b>Recall</b> = TP / (TP + FN) → kolik správných případů jsme zachytili

- <b>F1-Score</b> = 2 * (Precision * Recall) / (Precision + Recall)

#### LANDSAT 5 CLASIFICATION

In [None]:
# create function for reshaping the data for classification
def reshape_data(raster_data):
    bands, height, width = raster_data.shape
    reshaped_data = raster_data.reshape(bands, -1).T # (height * width, bands)
    return reshaped_data, height, width # (height * width, bands)

# create function for converting the data into DataFrame
def create_dataframe(reshaped_data):  # , profile
    # create DataFrame from the reshaped data
    df = pd.DataFrame(reshaped_data, columns=[f'band_{i}' for i in range(reshaped_data.shape[1])])
    return df

with rasterio.open('data/imad1Itnew_rescale_SWIR.tif') as src: # data/imad1Itnew_rescale_SWIR.tif
    image = src.read()  # tvar: (bands, height, width)
    profile = src.profile
print(f'image shape: {image.shape}')
print(f'profile: {profile}')

# Landsat 5 bands
landsat5_selected = image[6:12, :, :] # Landsat 5 bands 
srtm_selected = image[19:22, :, :] # SRTM, slope, aspect

print(f'Landsat 5 bands shape: {landsat5_selected.shape}')
print(f'SRTM bands shape: {srtm_selected.shape}')
landsat5_bands = np.concatenate([landsat5_selected, srtm_selected], axis=0)
print(f'final_stack shape: {landsat5_bands.shape}')

reshaped_l5, height_l9, width_l9= reshape_data(landsat5_bands)
df_l5 = create_dataframe(reshaped_l5)
print(f'Landsat 5 DataFrame shape: {df_l5.shape}')

In [None]:
# create mask for NaN values
columns_to_replace = ['band_0', 'band_1', 'band_2', 'band_3', 'band_4', 'band_5', 'band_6']
nan_mask = df_l5[columns_to_replace].isna()
df_l5[columns_to_replace] = df_l5[columns_to_replace].fillna(99999)
print(f'max val for slope and aspect: \n{df_l5[['band_7', 'band_8']].max()}')
print(f'min val for slope and aspect: \n{df_l5[['band_7', 'band_8']].min()}')
print(f'median val for slope and aspect: \n{df_l5[['band_7', 'band_8']].median()}')
# if band_7 or band_8 are NaN values, set them to 0
df_l5['band_7'] = df_l5['band_7'].fillna(0)
df_l5['band_8'] = df_l5['band_8'].fillna(0)
columns = ['l5b', 'l5g', 'l5r', 'l5n', 'l5s1', 'l5s2', 'elevation', 'slope', 'aspect']
# rename columns for landsat 5
df_l5.columns = columns

In [None]:
# calculating ndvi5, ndmi 5, savi5, msavi5, evi5, bsi5
df_l5['ndvi5'] = ndvi(df_l5['l5r'], df_l5['l5n'])
df_l5['ndmi5'] = ndmi(df_l5['l5n'], df_l5['l5s1'])
df_l5['savi5'] = savi(df_l5['l5r'], df_l5['l5n'])
df_l5['msavi5'] = msavi(df_l5['l5r'], df_l5['l5n'])
df_l5['evi5'] = evi(df_l5['l5b'], df_l5['l5r'], df_l5['l5n'])
df_l5['bsi5'] = bsi(df_l5['l5b'], df_l5['l5r'], df_l5['l5n'], df_l5['l5s1'])
df_l5

In [None]:
# check for invalid values
bad_mask = ~np.isfinite(df_l5).all(axis=1)    
print(f"invalid rows count: {bad_mask.sum()}")
df_l5.replace([np.inf, -np.inf, 99999], 99999.0, inplace=True)     
bad_mask = ~np.isfinite(df_l5).all(axis=1)    
print(f"invalid rows count: {bad_mask.sum()}")

In [None]:
X_l5 = df_l5.values
y_pred_l5 = l5_model.predict(X_l5)

In [None]:
prediction_raster_l5 = y_pred_l5.reshape(height_l9, width_l9).astype(rasterio.uint16)

In [None]:
# save the prediction raster
output_file = 'data/l5_pred_new.tif'
with rasterio.open(output_file, 'w', driver='GTiff', height=height_l9, width=width_l9, count=1, dtype=rasterio.uint16, crs=profile['crs'], transform=profile['transform']) as dst:
    dst.write(prediction_raster_l5, 1)
    dst.update_tags(landcover_type='Landsat 5', classifier = 'Random Forest', classes='1=Built-up, 2=Water, 3=Forest, 4=Cropland, 5=Grassland') 

In [None]:
unique, counts = np.unique(prediction_raster_l5, return_counts=True)
for label, count in zip(unique, counts):
    print(f'Class {label}: {count} px')

In [None]:
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap

class_labels = {
    1: ('built-up areas', 'red'),    
    2: ('water', 'blue'),       
    3: ('forests', 'green'),       
    4: ('croplands', 'yellow'),     
    5: ('grasslands', '#66c2a5')     
}

cmap_list = [class_labels[i][1] for i in range(1, 6)]
cmap = ListedColormap(cmap_list)
crop = 3                                    
pred_crop = prediction_raster_l5[1:-crop, 4:-4]

fig, ax = plt.subplots(figsize=(12, 10))
im = ax.imshow(pred_crop, cmap=cmap, interpolation='nearest') # 
ax.set_title('Landsat 5 classification (Random Forest)', fontsize=16)
#ax.set_xlabel('Pixel (X)', fontsize=12)
#ax.set_ylabel('Pixel (Y)', fontsize=12)
ax.axis('off')
ax.grid(False)
# legend
legend_patches = [mpatches.Patch(color=color, label=label) for label, color in [class_labels[i] for i in range(1, 6)]]
ax.legend(handles=legend_patches, loc='lower center', bbox_to_anchor=(0.5, -0.06), ncol=5, fontsize=12, frameon=False)

plt.tight_layout()
plt.show()

#### L5 Classification evaluation 

In [None]:
with rasterio.open('data/l5_pred.tif') as src:
    class_l5 = src.read(1)  # load the first band - classification band
    profile = src.profile
# close file
tiff_file = 'data/l5_pred.tif' 
with rxr.open_rasterio(tiff_file, masked=True).squeeze() as class_l5:
    class_l5 = class_l5.squeeze()  # remove single-dimensional entries from the shape of an array
    print(f'Landsat 5 classification raster shape: {class_l5.shape}')
    #print(class_l5)
gdf_class_l5 = gdf_imad_l9.to_crs(class_l5.rio.crs)
print(f'Same CRS: {gdf_class_l5.crs == class_l5.rio.crs}')  # check if CRS are the same

In [None]:
# create new column with 15 raster values
class_cube = xr.concat(
    [class_l5],
    dim=pd.Index(
        ["class_l5"],
        name="measurement",
    )
)
class_cube
vector_class_cube_l5 = class_cube.drop_vars("spatial_ref").xvec.extract_points(
    points=gdf_imad_l9.geometry,
    x_coords="x",
    y_coords="y",
)

gdf_imad_l9.isna().sum() 
gdf_class_l5= vector_class_cube_l5.xvec.to_geopandas()
print(f'Landsat class 5 data shape:{gdf_class_l5.shape}')

# sjoin
class_comp_l5 = gdf_class_l5.sjoin(clean_l9_data[['geometry', 'TYPE']], predicate="intersects") 
# dorp index_right column
class_comp_l5.drop(columns="index_right", inplace=True)
class_comp_l5 = class_comp_l5.dropna()

#clean_l5_data_test -filter test data 
l5_class_filtered = class_comp_l5.loc[clean_l5_data_test.index]
l5_class_filtered.sort_index(inplace=True)
l5_class_filtered

In [None]:
# check how well the raster classification matches the vector data
test_accuracy = accuracy_score(l5_class_filtered['TYPE'], l5_class_filtered['class_l5'])
test_error = 1 - test_accuracy
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Error: {test_error:.4f}")

# count producer's accuracy
producer_accuracy = l5_class_filtered.groupby('TYPE').apply(lambda x: (x['class_l5'] == x['TYPE']).sum() / len(x))
print("Producer's Accuracy:")
print(producer_accuracy)
# count user's accuracy
user_accuracy = l5_class_filtered.groupby('class_l5').apply(lambda x: (x['class_l5'] == x['TYPE']).sum() / len(x))
print("User's Accuracy:")
print(user_accuracy)
# confusion matrix
class_names = ['built-up', 'water', 'forests', 'croplands', 'grasslands']
l9_c_matrix = confusion_matrix(l5_class_filtered['TYPE'], l5_class_filtered['class_l5'])
disp = ConfusionMatrixDisplay(confusion_matrix=l9_c_matrix, display_labels=class_names)
fig, ax = plt.subplots(figsize=(8, 6))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=20)
plt.title('Confusion Matrix for Landsat 5')
plt.show()

# classification report
report = classification_report(l5_class_filtered['TYPE'], l5_class_filtered['class_l5'], target_names=[str(cls) for cls in np.unique(l5_class_filtered['TYPE'])])
print("Classification Report for Landsat 5:")
print(report)

### LANDSAT 9

In [None]:
features = ['l9b', 'l9g', 'l9r', 'l9n', 'l9s1', 'l9s2', 'elevation', 'slope', 'aspect', 'ndvi9', 'ndmi9', 'savi9', 'msavi9', 'evi9', 'bsi9'] # features for Landsat 9 data  , 'ndvi9', 'ndmi9', 'savi9', 'msavi9', 'evi9', 'bsi9'
X_l9 = clean_l9_data[features] # not cleaned data: l9_data, cleaned data: clean_l9_data
y_l9 = clean_l9_data['TYPE']
print(f' dataset count: {X_l9.shape}, y shape: {y_l9.shape}')

X_temp_l9, X_test_l9, y_temp_l9, y_test_l9 = train_test_split(X_l9, y_l9, test_size=0.25, random_state=42, stratify=y_l9, shuffle=True) # 0.2
X_train_l9, X_valid_l9, y_train_l9, y_valid_l9 = train_test_split(X_temp_l9, y_temp_l9, test_size=0.3333, random_state=42, stratify=y_temp_l9, shuffle=True) # 0.25; 60% train, 20% test, 20% validation

# check the sets shapes
print(f'For L5 data:\nTrain dataset shape: {X_train_l9.shape}, Valid dataset shape: {X_valid_l9.shape}, Test dataset shape: {X_test_l9.shape}')

In [None]:
print("Train:\n", y_train_l9.value_counts(normalize=True))
print("Valid:\n", y_valid_l9.value_counts(normalize=True))
print("Test:\n", y_test_l9.value_counts(normalize=True))

In [None]:
# the distribution of the classes
y_test_l9.hist()

In [None]:
# count of each type of training data
train_data = pd.concat([X_train_l9, y_train_l9], axis=1)
min_samples = train_data['TYPE'].value_counts()
min_samples

In [None]:
#  create new df of clean_l9_data, but just with y_test_l9 rows
clean_l9_data_test = clean_l9_data.loc[y_test_l9.index, ['geometry', 'TYPE']] # clean_l9_data, l9_data
# we use it in final classification consideration

In [None]:
max_depth_range = [1,3, 5, 7, 10, 15, 20]
n_estimators = 300

train_errors = []
valid_errors = []

for d in max_depth_range:
    rf = RandomForestClassifier(random_state=42,n_estimators=n_estimators, min_samples_split=4, min_samples_leaf=2, max_depth=d, max_features=8, criterion = 'log_loss', class_weight='balanced') 
    
    rf.fit(X_train_l9, y_train_l9)
    y_train_pred = rf.predict(X_train_l9)
    y_valid_pred = rf.predict(X_valid_l9)
    # count train and valid errors
    train_error = 1 - accuracy_score(y_train_l9, y_train_pred)
    valid_error = 1 - accuracy_score(y_valid_l9, y_valid_pred)

    train_errors.append(train_error)
    valid_errors.append(valid_error)

plt.figure(figsize=(8, 6))
plt.plot(max_depth_range, train_errors, label='Train Error', color='salmon')
plt.plot(max_depth_range, valid_errors, label='Validation Error', color='cornflowerblue')
plt.xlabel('max_depth')
plt.ylabel('Chyba (1 − accuracy)')
plt.title(f'Train/Test Error by max_depth, n_estimators={n_estimators}')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# random forest definition
forest9 = RandomForestClassifier(random_state=42, min_samples_split=4, min_samples_leaf=2, max_features= 'sqrt', criterion = 'entropy', class_weight='balanced') # 

# parameters for grid search
param_grid = { 
    'n_estimators': [50, 100, 150, 200, 250, 300],   # number of trees
    'max_depth': [5, 7, 10, 15, 20, 25, 30],   # maximum depth of the tree
    # 'max_features': [4, 5, 7, 8, 9] # number of features to consider for the best split
    # 'scriterion': ['gini', 'entropy'], # criterion for splitting
    # 'max_features': ['sqrt', 'log2'], # number of features to consider for the best split
}    

# grid search definition
gs = GridSearchCV(estimator=forest9, param_grid=param_grid, cv=5, n_jobs=-1, verbose=2, scoring='accuracy')

# run grid search
gs.fit(X_train_l9, y_train_l9)
# get df from best_parameters
best_params = pd.DataFrame(gs.cv_results_).sort_values(by='rank_test_score')
best_params = best_params[['params', 'mean_test_score', 'std_test_score', 'rank_test_score']]

# get best parameters
print("Best parameters from GridSearch:")
print(gs.best_params_)
best_params.head(20)

In [None]:
best_rf = RandomForestClassifier(random_state=42, n_estimators=200, max_depth=10, min_samples_split=4, min_samples_leaf=2, max_features='sqrt', criterion = 'entropy', class_weight='balanced') # min_samples_split=10, min_samples_leaf=12
l9_model = best_rf.fit(X_train_l9, y_train_l9)

In [None]:
# Train model on the balanced training set
y_train_pred = l9_model.predict(X_train_l9) 

# training accuracy and error
train_accuracy = accuracy_score(y_train_l9, y_train_pred) 
train_error = 1 - train_accuracy

# feature importances
importances = pd.Series(l9_model.feature_importances_, index=X_train_l9.columns).sort_values(ascending=False) 

print("\nFeature Importances:")
print(f'{importances}\n')

print(f"Training Accuracy: {train_accuracy:.4f}")
print(f"Training Error: {train_error:.4f}")
y_valid_pred = l9_model.predict(X_valid_l9)

# test accuracy and test error
valid_accuracy = accuracy_score(y_valid_l9, y_valid_pred)
valid_error = 1 - valid_accuracy

print(f"Validation Accuracy: {valid_accuracy:.4f}")
print(f"Validation Error: {valid_error:.4f}\n")

# is model well trained? - we can count the difference between train and test accuracy
train_val_diff = train_accuracy - valid_accuracy
print(f"Difference between train and validation accuracy: {train_val_diff:.4f}")

In [None]:
l9_model.classes_

In [None]:
# cm count
l9_c_matrix = confusion_matrix(y_valid_l9, y_valid_pred)
# confusion matrix
class_names = ['built-up', 'water', 'forests', 'croplands', 'grasslands']
disp = ConfusionMatrixDisplay(confusion_matrix=l9_c_matrix, display_labels=class_names) # np.unique(y_valid_l9)
fig, ax = plt.subplots(figsize=(8, 6))
#ax.tick_params(axis='x', labelrotation=45)
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=20)
plt.title('Confusion Matrix - Landsat 9')
plt.show()

# classification report
report = classification_report(y_valid_l9, y_valid_pred, target_names=[str(cls) for cls in l9_model.classes_], )
print("Classification Report:")
print(report)# cm count
l9_c_matrix = confusion_matrix(y_valid_l9, y_valid_pred)

### Raster data preprocessing

In [None]:
with rasterio.open('data/imad1Itnew_rescale_SWIR.tif') as src: # data/imadMaxItnew_rescale_SWIR.tif, kompozity: data/imad5Itnew_colab_01_composite_mode.tif
    image = src.read()  # shape: (bands, height, width)
    profile = src.profile
print(f'image shape: {image.shape}')
print(f'profile: {profile}')

In [None]:
# visualize the image
plt.figure(figsize=(10, 10))
plt.imshow(image[9], cmap='gray')
plt.title('IRMAD Image - Band NIR')
plt.axis('off')
plt.show()

In [None]:
landsat9_selected = image[0:6, :, :] # Landsat 5 bands 
srtm_selected = image[19:22, :, :] # SRTM, slope, aspect
'''indices_selected = np.stack([ # indices ndvi and ndmi - bands 22,23,26,27 
    image[23, :, :], # ndvi9
    image[24, :, :] # ndmi9
], axis=0)'''

print(f'Landsat 5 bands shape: {landsat9_selected.shape}')
print(f'SRTM bands shape: {srtm_selected.shape}')
#print(f'Indices bands shape: {indices_selected.shape}')
landsat9_bands = np.concatenate([landsat9_selected, srtm_selected], axis=0) # , indices_selected
print(f'final_stack shape: {landsat9_bands.shape}')

In [None]:
# create function for reshaping the data for classification
def reshape_data(raster_data):
    bands, height, width = raster_data.shape
    # reshape the data to 2D array
    # data shape (bands, height, width) to (height * width, bands)
    reshaped_data = raster_data.reshape(bands, -1).T # (height * width, bands)
    return reshaped_data, height, width # (height * width, bands)

# create function for converting the data into DataFrame
def create_dataframe(reshaped_data):  # , profile
    # create DataFrame from the reshaped data
    df = pd.DataFrame(reshaped_data, columns=[f'band_{i}' for i in range(reshaped_data.shape[1])])
    # add the profile information to the DataFrame
    # df['x'] = np.repeat(np.arange(profile['width']), profile['height'])
    # df['y'] = np.tile(np.arange(profile['height']), profile['width'])
    return df

In [None]:
# reshape l5, l9 and irmad data for classification
reshaped_l9, height_l9, width_l9 = reshape_data(landsat9_bands)
#reshaped_l9, height_l9, width_l9 = reshape_data(landsat9_bands)
#reshaped_irmad, height_im, width_im = reshape_data(irmad_bands)

In [None]:
# convert to DataFrame
df_l9 = create_dataframe(reshaped_l9)
#df_l9 = create_dataframe(reshaped_l9)
#df_irmad = create_dataframe(reshaped_irmad)

# check the shape of the DataFrames
print(f'Landsat 9 DataFrame shape: {df_l9.shape}')
#print(f'Landsat 9 DataFrame shape: {df_l9.shape}')
#print(f'IRMAD DataFrame shape: {df_irmad.shape}')

In [None]:
df_l9

In [None]:
columns_to_replace = ['band_0', 'band_1', 'band_2', 'band_3', 'band_4', 'band_5', 'band_6']
nan_mask = df_l9[columns_to_replace].isna()
df_l9[columns_to_replace] = df_l9[columns_to_replace].fillna(99999)
nan_mask

In [None]:
df_l9

In [None]:
# check values for band_6 and band_7
print(f'max val for slope and aspect: \n{df_l9[['band_7', 'band_8']].max()}')
print(f'min val for slope and aspect: \n{df_l9[['band_7', 'band_8']].min()}')
print(f'median val for slope and aspect: \n{df_l9[['band_7', 'band_8']].median()}')

In [None]:
# if band_6 and band_7 are NaN values, set them to 0
df_l9['band_7'] = df_l9['band_7'].fillna(0)
df_l9['band_8'] = df_l9['band_8'].fillna(0)

In [None]:
df_l9

In [None]:
columns = ['l9b', 'l9g', 'l9r', 'l9n', 'l9s1', 'l9s2', 'elevation', 'slope', 'aspect']
# rename columns for landsat 5
df_l9.columns = columns
# check the shape of the DataFrame
df_l9

In [None]:
# calculating ndvi9, ndmi9, savi9, msavi9, ndbi9, bsi9
df_l9['ndvi9'] = ndvi(df_l9['l9r'], df_l9['l9n'])
# calculate NDMI for Landsat 9
df_l9['ndmi9'] = ndmi(df_l9['l9n'], df_l9['l9s1'])

df_l9['savi9'] = savi(df_l9['l9r'], df_l9['l9n'])
# calculate MSAVI for Landsat 9
df_l9['msavi9'] = msavi(df_l9['l9r'], df_l9['l9n'])
# calculate EVI for Landsat 9
df_l9['evi9'] = evi(df_l9['l9b'], df_l9['l9r'], df_l9['l9n'])

# calculate BSI for Landsat 9
df_l9['bsi9'] = bsi(df_l9['l9b'], df_l9['l9r'], df_l9['l9n'], df_l9['l9s1'])
df_l9

In [None]:
#inf/-inf/NaN
bad_mask = ~np.isfinite(df_l9).all(axis=1)    
print(f"invalid rows count: {bad_mask.sum()}")
df_l9.replace([np.inf, -np.inf, 99999], 99999.0, inplace=True)
#X_clean = X_l9.dropna()      

In [None]:
bad_mask = ~np.isfinite(df_l9).all(axis=1)    
print(f"invalid rows count: {bad_mask.sum()}")

### Classification

In [None]:
X_l9 = df_l9#.values
y_pred_l9 = l9_model.predict(X_l9)

In [None]:
prediction_raster_l9 = y_pred_l9.reshape(height_l9, width_l9).astype(rasterio.uint16)

In [None]:
# save the prediction raster
output_file = 'data/l9_pred.tif'
with rasterio.open(output_file, 'w', driver='GTiff', height=height_l9, width=width_l9, count=1, dtype=rasterio.uint16, crs=profile['crs'], transform=profile['transform']) as dst:
    dst.write(prediction_raster_l9, 1)
    dst.update_tags(landcover_type='Landsat 9', classifier = 'Random Forest', classes='1=Built-up, 2=Water, 3=Forest, 4=Cropland, 5=Grassland') 

In [None]:
unique, counts = np.unique(prediction_raster_l9, return_counts=True)
for label, count in zip(unique, counts):
    print(f'Class {label}: {count} px')


In [None]:
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap

class_labels = {
    1: ('built-up areas', 'red'),    
    2: ('water', 'blue'),       
    3: ('forests', 'green'),       
    4: ('croplands', 'yellow'),     
    5: ('grasslands', '#66c2a5')     
}
pred_crop = prediction_raster_l9[1:-3, 4:-4]

cmap_list = [class_labels[i][1] for i in range(1, 6)]
cmap = ListedColormap(cmap_list)

fig, ax = plt.subplots(figsize=(12, 10))
im = ax.imshow(pred_crop, cmap=cmap, interpolation='nearest')
ax.set_title('Landsat 9 classification (Random Forest)', fontsize=16)
#ax.set_xlabel('Pixel (X)', fontsize=12)
#ax.set_ylabel('Pixel (Y)', fontsize=12)
ax.axis('off')
ax.grid(False)

legend_patches = [mpatches.Patch(color=color, label=label) for label, color in [class_labels[i] for i in range(1, 6)]]
ax.legend(handles=legend_patches, loc='lower center', bbox_to_anchor=(0.5, -0.06), ncol=5, fontsize=12, frameon=False)

plt.tight_layout()
plt.show()

#### L9 EVALUATION OF CLASSIFICATION

In [None]:
with rasterio.open('data/l9_pred.tif') as src:
    class_l9 = src.read(1) 
    profile = src.profile

In [None]:
tiff_file = 'data/l9_pred.tif'

# open just one band with rioxarray
class_l9 = rxr.open_rasterio(tiff_file, masked=True).squeeze()

In [None]:
tiff_file = 'data/l9_pred.tif'
with rxr.open_rasterio(tiff_file, masked=True).squeeze() as class_l9:
    class_l9 = class_l9.squeeze()  # remove single-dimensional entries from the shape of an array
    print(class_l9.shape)
    print(class_l9)

In [None]:
# reproject shp to the same CRS as the raster
gdf_class_l9 = gdf_imad_l9.to_crs(class_l9.rio.crs)
gdf_class_l9.crs == class_l9.rio.crs # check if the CRS is the same

# create new column with 19 raster values
class_cube = xr.concat(
    [class_l9],
    dim=pd.Index(
        ["class_l9"],
        name="measurement",
    )
)
class_cube

vector_class_cube_l9 = class_cube.drop_vars("spatial_ref").xvec.extract_points(
    points=gdf_imad_l9.geometry,
    x_coords="x",
    y_coords="y",
)
vector_class_cube_l9

In [None]:
gdf_imad_l9.isna().sum() # check if there are any NaN values in the gdf_imad_l9

In [None]:
# landsat 9
gdf_class_l9= vector_class_cube_l9.xvec.to_geopandas()
print(f'Landsat class 9 data shape:{gdf_class_l9.shape}')

# sjoin
class_comp_l9 = gdf_class_l9.sjoin(clean_l9_data[['geometry', 'TYPE']], predicate="intersects")
# dorp index_right column
class_comp_l9.drop(columns="index_right", inplace=True)
class_comp_l9 = class_comp_l9.dropna()

#clean_l9_data_test - filter test data
l9_class_filtered = class_comp_l9.loc[clean_l9_data_test.index]
l9_class_filtered.sort_index(inplace=True)
l9_class_filtered

In [None]:
# check how well the raster classification matches the vector data
test_accuracy = accuracy_score(l9_class_filtered['TYPE'], l9_class_filtered['class_l9'])
test_error = 1 - test_accuracy
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Error: {test_error:.4f}")

# count producer's accuracy
producer_accuracy = l9_class_filtered.groupby('TYPE').apply(lambda x: (x['class_l9'] == x['TYPE']).sum() / len(x))
print("Producer's Accuracy:")
print(producer_accuracy)
# count user's accuracy
user_accuracy = l9_class_filtered.groupby('class_l9').apply(lambda x: (x['class_l9'] == x['TYPE']).sum() / len(x))
print("User's Accuracy:")
print(user_accuracy)
# confusion matrix
class_names = ['built-up', 'water', 'forests', 'croplands', 'grasslands']
l9_c_matrix = confusion_matrix(l9_class_filtered['TYPE'], l9_class_filtered['class_l9'])
disp = ConfusionMatrixDisplay(confusion_matrix=l9_c_matrix, display_labels=class_names)
fig, ax = plt.subplots(figsize=(8, 6))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=20)
plt.title('Confusion Matrix for Landsat 9')
plt.show()

# classification report
report = classification_report(l9_class_filtered['TYPE'], l9_class_filtered['class_l9'], target_names=[str(cls) for cls in np.unique(l9_class_filtered['TYPE'])])
print("Classification Report for Landsat 9:")
print(report)

In [None]:
# check how well the raster classification matches the vector data
test_accuracy = accuracy_score(l9_class_filtered['TYPE'], l9_class_filtered['class_l9'])
test_error = 1 - test_accuracy
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Error: {test_error:.4f}")

### CLASSIFICATION OF NORMALIZED IMAGE

In [None]:
with rasterio.open('data/imad1Itnew_rescale_SWIR.tif') as src: # , data/imad1Itnew_rescale_SWIR.tif, data/imad15Itnew_colab_01.tif, imad10Itnew_colab_01.tif, imad1Itnew_rescale_SWIR.tif
    image = src.read()  # tvar: (bands, height, width)
    profile = src.profile
print(f'image shape: {image.shape}')
print(f'profile: {profile}')

In [None]:
# pro Python
iamd_selected = image[12:18, :, :] # Landsat 5 bands 
srtm_selected = image[19:22, :, :] # SRTM, slope, aspect

print(f'IMAD bands shape: {iamd_selected.shape}')
print(f'SRTM bands shape: {srtm_selected.shape}')
#print(f'Indices bands shape: {indices_selected.shape}')
iamd_bands = np.concatenate([iamd_selected, srtm_selected], axis=0) #, indices_selected
print(f'final_stack shape: {iamd_bands.shape}')

# reshape l5, l9 and irmad data for classification
reshaped_imad, height_l9, width_l9 = reshape_data(iamd_bands)
df_imad = create_dataframe(reshaped_imad)

# check the shape of the DataFrames
print(f'IRMAD DataFrame shape: {df_imad.shape}')

columns_to_replace = ['band_0', 'band_1', 'band_2', 'band_3', 'band_4', 'band_5', 'band_6']
nan_mask = df_imad[columns_to_replace].isna()
df_imad[columns_to_replace] = df_imad[columns_to_replace].fillna(99999)
nan_mask

In [None]:
# check values for band_6 and band_7
print(f'max val for slope and aspect: \n{df_imad[['band_7', 'band_8']].max()}')
print(f'min val for slope and aspect: \n{df_imad[['band_7', 'band_8']].min()}')
print(f'median val for slope and aspect: \n{df_imad[['band_7', 'band_8']].median()}')
# if band_6 and band_7 are NaN values, set them to 0
df_imad['band_7'] = df_imad['band_7'].fillna(0)
df_imad['band_8'] = df_imad['band_8'].fillna(0)

In [None]:
columns = ['l9b', 'l9g', 'l9r', 'l9n', 'l9s1', 'l9s2', 'elevation', 'slope', 'aspect']
# rename columns for 'irmad'
df_imad.columns = columns
# check the shape of the DataFrame
df_imad

In [None]:
# calculating ndvi9,ndmi9, savi9, msavi9, ndbi9, bsi9
df_imad['ndvi9'] = ndvi(df_imad['l9r'], df_imad['l9n'])
# calculate NDMI for Landsat 9
df_imad['ndmi9'] = ndmi(df_imad['l9n'], df_imad['l9s1'])
# calculate SAVI for Landsat 9
df_imad['savi9'] = savi(df_imad['l9r'], df_imad['l9n'])
# calculate MSAVI for Landsat 9
df_imad['msavi9'] = msavi(df_imad['l9r'], df_imad['l9n'])
# calculate EVI for Landsat 9
df_imad['evi9'] = evi(df_imad['l9b'], df_imad['l9r'], df_imad['l9n'])
# calculate BSI for Landsat 9
df_imad['bsi9'] = bsi(df_imad['l9b'], df_imad['l9r'], df_imad['l9n'], df_imad['l9s1'])
df_imad

In [None]:
# CLASSIFICATION OF NORMALIZED IMAGE
X_imad = df_imad.values
y_pred_imad = l9_model.predict(X_imad)

In [None]:
prediction_raster_imad = y_pred_imad.reshape(height_l9, width_l9).astype(rasterio.uint16)

In [None]:
# save the prediction raster
output_file = 'data/imad_pred_10it_new.tif'
with rasterio.open(output_file, 'w', driver='GTiff', height=height_l9, width=width_l9, count=1, dtype=rasterio.uint16, crs=profile['crs'], transform=profile['transform']) as dst:
    dst.write(prediction_raster_imad, 1)
    dst.update_tags(landcover_type='IMAD', classifier = 'Random Forest', classes='1=Built-up, 2=Water, 3=Forest, 4=Cropland, 5=Grassland')

In [None]:
unique, counts = np.unique(prediction_raster_imad, return_counts=True)
for label, count in zip(unique, counts):
    print(f'Class {label}: {count} px')

In [None]:
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap

class_labels = {
    1: ('built-up areas', 'red'),    
    2: ('water', 'blue'),       
    3: ('forests', 'green'),       
    4: ('croplands', 'yellow'),     
    5: ('grasslands', '#66c2a5')     
}
pred_crop = prediction_raster_imad[1:-3, 4:-4]

cmap_list = [class_labels[i][1] for i in range(1, 6)]
cmap = ListedColormap(cmap_list)

fig, ax = plt.subplots(figsize=(12, 10))
im = ax.imshow(pred_crop, cmap=cmap, interpolation='nearest')
ax.set_title('MAD classification', fontsize=16) # IRMAD (5 it.)
#ax.set_xlabel('Pixel (X)', fontsize=12)
#ax.set_ylabel('Pixel (Y)', fontsize=12)
ax.axis('off')
ax.grid(False)

legend_patches = [mpatches.Patch(color=color, label=label) for label, color in [class_labels[i] for i in range(1, 6)]]
ax.legend(handles=legend_patches, loc='lower center', bbox_to_anchor=(0.5, -0.06), ncol=5, fontsize=12, frameon=False)

plt.tight_layout()
plt.show()

### CLASSIFICATION RESULTS

In [None]:
tiff_file = 'data/imad_pred_1it_new.tif' 

# open just one band with rioxarray
with rxr.open_rasterio(tiff_file, masked=True).squeeze() as class_imad:
    class_imad = class_imad.squeeze()  # remove single-dimensional entries from the shape of an array
    print(class_imad.shape)
    print(class_imad)
#class_imad = rxr.open_rasterio(tiff_file, masked=True).squeeze()

# create new column with 19 raster values
class_imad_cube = xr.concat(
    [class_imad],
    dim=pd.Index(
        ["class_imad"],
        name="measurement",
    )
)
# class_imad_cube
vector_class_cube_imad = class_imad_cube.drop_vars("spatial_ref").xvec.extract_points(
    points=gdf_imad_l9.geometry,
    x_coords="x",
    y_coords="y",
)
#vector_class_cube_imad

# landsat 9
gdf_class_imad = vector_class_cube_imad.xvec.to_geopandas()
print(f'Landsat class 9 data shape:{gdf_class_l9.shape}')
# sjoin - prepared databefore filtering
class_comp_imad = gdf_class_imad.sjoin(l9_data[['geometry', 'TYPE']], predicate="intersects") # clean_l9_data_test
# dorp index_right column
class_comp_imad.drop(columns="index_right", inplace=True)
class_comp_imad = class_comp_imad.dropna()

#clean_l9_data_test - filtered test data with geometry prepared for comparison
imad_class_filtered = class_comp_imad.loc[clean_l9_data_test.index] # 
imad_class_filtered.sort_index(inplace=True)
imad_class_filtered.head(5)

In [None]:
# check how well the raster classification matches the vector data
test_accuracy = accuracy_score(imad_class_filtered['TYPE'], imad_class_filtered['class_imad'])
test_error = 1 - test_accuracy
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Error: {test_error:.4f}")

# count producer's accuracy
producer_accuracy = imad_class_filtered.groupby('TYPE').apply(lambda x: (x['class_imad'] == x['TYPE']).sum() / len(x))
print("Producer's Accuracy:")
print(producer_accuracy)
# count user's accuracy
user_accuracy = imad_class_filtered.groupby('class_imad').apply(lambda x: (x['class_imad'] == x['TYPE']).sum() / len(x))
print("User's Accuracy:")
print(user_accuracy)
# confusion matrix
class_names = ['built-up', 'water', 'forests', 'croplands', 'grasslands']
l9_c_matrix = confusion_matrix(imad_class_filtered['TYPE'], imad_class_filtered['class_imad'])
disp = ConfusionMatrixDisplay(confusion_matrix=l9_c_matrix, display_labels=class_names)
fig, ax = plt.subplots(figsize=(8, 6))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=20)
plt.title('Confusion Matrix for MAD') # IRMAD (5 it.) X MAD
plt.show()
# classification report
report = classification_report(imad_class_filtered['TYPE'], imad_class_filtered['class_imad'])
print("Classification Report for IMAD:")
print(report)

### Ověření na Landsat 5
 - pokud budou výsledky klasifikace Landsat 5 na modelu z roku 2023 horší než u IRMAD lze považovat za úspěch

In [None]:
# Landsat 5 bands
landsat5_selected = image[6:12, :, :] # Landsat 5 bands 
srtm_selected = image[19:22, :, :] # SRTM, slope, aspect

print(f'Landsat 5 bands shape: {landsat5_selected.shape}')
print(f'SRTM bands shape: {srtm_selected.shape}')
#print(f'Indices bands shape: {indices_selected.shape}')
landsat5_bands = np.concatenate([landsat5_selected, srtm_selected], axis=0) #, indices_selected
print(f'final_stack shape: {landsat5_bands.shape}')

reshaped_l5, height_l9, width_l9= reshape_data(landsat5_bands)
df_l5 = create_dataframe(reshaped_l5)
print(f'Landsat 5 DataFrame shape: {df_l5.shape}')

columns_to_replace = ['band_0', 'band_1', 'band_2', 'band_3', 'band_4', 'band_5', 'band_6']
nan_mask = df_l5[columns_to_replace].isna()
df_l5[columns_to_replace] = df_l5[columns_to_replace].fillna(99999)
nan_mask

In [None]:
# check values for band_6 and band_7
print(f'max val for slope and aspect: \n{df_l5[['band_7', 'band_8']].max()}')
print(f'min val for slope and aspect: \n{df_l5[['band_7', 'band_8']].min()}')
print(f'median val for slope and aspect: \n{df_l5[['band_7', 'band_8']].median()}')
# if band_6 and band_7 are NaN values, set them to 0
df_l5['band_7'] = df_l5['band_7'].fillna(0)
df_l5['band_8'] = df_l5['band_8'].fillna(0)

columns = ['l9b', 'l9g', 'l9r', 'l9n', 'l9s1', 'l9s2', 'elevation', 'slope', 'aspect']
# features for classification need to be the same as for Landsat 9 model
df_l5.columns = columns
df_l5

In [None]:
# calculating ndvi9,ndmi9, savi9, msavi9, ndbi9, bsi9 - but for landsat 5 
# features for classification need to be the same as for Landsat 9 model
df_l5['ndvi9'] = ndvi(df_l5['l9r'], df_l5['l9n'])
df_l5['ndmi9'] = ndmi(df_l5['l9n'], df_l5['l9s1'])
df_l5['savi9'] = savi(df_l5['l9r'], df_l5['l9n'])
df_l5['msavi9'] = msavi(df_l5['l9r'], df_l5['l9n'])
df_l5['evi9'] = evi(df_l5['l9b'], df_l5['l9r'], df_l5['l9n'])
df_l5['bsi9'] = bsi(df_l5['l9b'], df_l5['l9r'], df_l5['l9n'], df_l5['l9s1'])
df_l5

In [None]:
X_l5 = df_l5.values
y_pred_l5 = l9_model.predict(X_l5)

In [None]:
prediction_raster_l5 = y_pred_l5.reshape(height_l9, width_l9).astype(rasterio.uint16)

In [None]:
# save classification
# save the prediction raster
output_file = 'data/l5_pred_new.tif'
with rasterio.open(output_file, 'w', driver='GTiff', height=height_l9, width=width_l9, count=1, dtype=rasterio.uint16, crs=profile['crs'], transform=profile['transform']) as dst:
    dst.write(prediction_raster_l5, 1)
    dst.update_tags(landcover_type='Landsat 9', classifier = 'Random Forest', classes='1=Built-up, 2=Water, 3=Forest, 4=Cropland, 5=Grassland') 

In [None]:
unique, counts = np.unique(prediction_raster_l5, return_counts=True)
for label, count in zip(unique, counts):
    print(f'Class {label}: {count} px')

In [None]:
class_labels = {
    1: ('built-up areas', 'red'),    
    2: ('water', 'blue'),       
    3: ('forests', 'green'),       
    4: ('croplands', 'yellow'),     
    5: ('grasslands', '#66c2a5')     
}
pred_crop = prediction_raster_l5[1:-3, 4:-4]
cmap_list = [class_labels[i][1] for i in range(1, 6)]
cmap = ListedColormap(cmap_list)

fig, ax = plt.subplots(figsize=(12, 10))
im = ax.imshow(pred_crop, cmap=cmap, interpolation='nearest') # 
ax.set_title('Landsat 5 classification (according to the Landsat 9 model)', fontsize=16)
#ax.set_xlabel('Pixel (X)', fontsize=12)
#ax.set_ylabel('Pixel (Y)', fontsize=12)
ax.grid(False)
ax.axis('off')

legend_patches = [mpatches.Patch(color=color, label=label) for label, color in [class_labels[i] for i in range(1, 6)]]
ax.legend(handles=legend_patches, loc='lower center', bbox_to_anchor=(0.5, -0.06), ncol=5, fontsize=12, frameon=False)

plt.tight_layout()
plt.show()

In [None]:
tiff_file = 'data/l5_pred_new.tif' 

with rxr.open_rasterio(tiff_file, masked=True).squeeze() as class_imad:
    class_imad = class_imad.squeeze()  # remove single-dimensional entries from the shape of an array
    print(class_imad.shape)
    print(class_imad)
# create new column with 19 raster values
class_imad_cube = xr.concat(
    [class_imad],
    dim=pd.Index(
        ["class_imad"],
        name="measurement",
    )
)
# gdf_class_l5 to same CRS as class_imad 
gdf_class_l5 = gdf_imad_l9.to_crs(class_imad.rio.crs)

# class_imad_cube
vector_class_cube_imad = class_imad_cube.drop_vars("spatial_ref").xvec.extract_points(
    points=gdf_class_l5.geometry, 
    x_coords="x",
    y_coords="y",
)

gdf_class_imad = vector_class_cube_imad.xvec.to_geopandas()
print(f'Landsat class 5 data shape:{gdf_class_l5.shape}')
# sjoin
class_comp_imad = gdf_class_imad.sjoin(gdf_class_l5[['geometry', 'TYPE']], predicate="intersects")
# dorp index_right column
class_comp_imad.drop(columns="index_right", inplace=True)
class_comp_imad = class_comp_imad.dropna()

#clean_l9_data_test
imad_class_filtered = class_comp_imad.loc[clean_l9_data_test.index] # 
imad_class_filtered.sort_index(inplace=True)
imad_class_filtered.head(5)

In [None]:
# check how well the raster classification matches the vector data - tady porovnáno na všech datech včetně trénovací množiny
test_accuracy = accuracy_score(imad_class_filtered['TYPE'], imad_class_filtered['class_imad'])
test_error = 1 - test_accuracy
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Error: {test_error:.4f}")

# count producer's accuracy
producer_accuracy = imad_class_filtered.groupby('TYPE').apply(lambda x: (x['class_imad'] == x['TYPE']).sum() / len(x))
print("Producer's Accuracy:")
print(producer_accuracy)
# count user's accuracy
user_accuracy = imad_class_filtered.groupby('class_imad').apply(lambda x: (x['class_imad'] == x['TYPE']).sum() / len(x))
print("User's Accuracy:")
print(user_accuracy)
# confusion matrix
class_names = ['built-up', 'water', 'forests', 'croplands', 'grasslands']
l9_c_matrix = confusion_matrix(imad_class_filtered['TYPE'], imad_class_filtered['class_imad'])
disp = ConfusionMatrixDisplay(confusion_matrix=l9_c_matrix, display_labels=class_names)
fig, ax = plt.subplots(figsize=(8, 6))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=20)
plt.title('Confusion Matrix for Landsat 5')
plt.show()
# classification report
report = classification_report(imad_class_filtered['TYPE'], imad_class_filtered['class_imad'])
print("Classification Report for Landsat 5:")
print(report)