<a href="https://colab.research.google.com/github/tbeucler/2022_ML_Earth_Env_Sci/blob/main/Lab_Notebooks/S3_4_Wildfire_Risk_Italy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This



---



**Credits**

This online tutorial would not be possible without invaluable contributions from [Andrea Trucchia](https://www.researchgate.net/profile/Andrea-Trucchia) (reduced data, methods), [Giorgio Meschi](https://www.linkedin.com/in/giorgio-meschi-86216b180/) (code, methods), and [Marj Tonini](https://www.researchgate.net/profile/Marj-Tonini-2) (presentation, methods). It builds upon the following article:

[Trucchia, A.; Meschi, G.; Fiorucci, P.; Gollini, A.; Negro, D., Defining Wildfire Susceptibility Maps in Italy for Understanding Seasonal Wildfire Regimes at the National Level, *Fire*, (2022)](https://www.mdpi.com/2571-6255/5/1/30) 

which generalizes the study below from the Liguria region to all of Italy:

[Tonini, Marj, et al. "A machine learning-based approach for wildfire susceptibility mapping. The case study of the Liguria region in Italy." *Geosciences* 10.3 (2020): 105.](https://www.mdpi.com/2076-3263/10/3/105)



---



In [None]:
# -*- coding: utf-8 -*-
"""
Created on Thu Feb 17 11:38:08 2022

@author: Giorg
"""
#%% funtions

import geopandas as gpd
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib

# sklearn
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance

# sklearn metrics
from sklearn.metrics import roc_curve
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import roc_curve, roc_auc_score, auc
from sklearn.metrics import mean_squared_error



def process_points(path):
    points_df = pd.read_pickle(path)
    points_df = gpd.GeoDataFrame(points_df, geometry=gpd.points_from_xy(np.float64(points_df.x), np.float64(points_df.y))) 
    return points_df


def one_hot(points_df, column):
    '''

    Parameters
    ----------
    points_df : points dataset
        
    column : string
        a column of points_df in which you want to perform one hot encoding.

    Returns
    -------
    points_df : points dataset updated --> new columns are called is_nameColumns_categoricalName
        

    '''
    
    
    values = points_df[column]
    # integer encode
    label_encoder = LabelEncoder()
    integer_encoded = label_encoder.fit_transform(values)
    # binary encode
    onehot_encoder = OneHotEncoder(sparse=False)
    integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)
    onehot_encoded = onehot_encoder.fit_transform(integer_encoded)
    

    # now this information is added to the dataset, each column gets the name of the categorical type   
    for i in range(onehot_encoded.shape[1]):
        index = np.argwhere(integer_encoded == i)[0]
        cat_type = values[index[0]]
        names = 'is_' + column + '_' + str(cat_type)
        points_df[names.split('.')[0]] = onehot_encoded[:,i]
    
    return points_df

#check if the column of a dataset is an object and does the one hot encoding if so 
def one_hot_automatic(points_df):
    
    for i in points_df.columns:
        type_ = points_df[i].dtype
        if type_ == object:
            print('I found "', i, '" as categorical then I am appling one hot encoding')
            points_df = one_hot(points_df, i)
            print('I am removing the categorical var from the dataset')
            points_df.drop([i], axis = 1, inplace = True)
    
    return points_df

def build_dataset(points_df, fires_df, sea):
    '''
    

    Parameters
    ----------
    points_df : df
        points dataset.
    fires_df : df
        fires dataset.
    sea : int
        a number that identifies a season (1 = winter, 2 = summer).
        this info is stored in the column 'season' of fires_df

    Returns
    -------
    dataset_train : df
        training dataset.
    x_label : array
        the labels related to the training set.
    dataset_test : df
        test dataset.
    y_label : array
        the labels related to the test set.

    '''
    
    try:
        fires_df['point_index'] = np.float64(fires_df.index)
    except KeyError:
        pass
    
    
    season_fires_df = fires_df[fires_df['season'] == sea]
    
    # put 70% of burned point in training dataset
    lenght_training_fires = int(len(season_fires_df) * 70/100)
    
    # at this point you can add a filter for sub sampling the training points --> reducing the number 
    # of burned points will reduce the number of not burned points having so a smaller training dataset.
    
    
    fires_training =  season_fires_df.sample(n = lenght_training_fires)    
    # fires_training = season_fires_df.loc[(season_fires_df['year'] >= year_from) & (season_fires_df['year'] < year_test)]
    fire_points_train = fires_training.point_index 
    presences_train = points_df[points_df['point_index'].isin(fire_points_train)]
    
    absences_df = points_df[~points_df['point_index'].isin(season_fires_df.point_index)]
    
    # select number of non burned points equal to the number of fire points
    sample_train = len(presences_train)
    absence_train = absences_df.sample(n = sample_train)

    # fires_testing = season_fires_df.loc[(season_fires_df['year'] >= year_test)]
    fires_testing = season_fires_df[~season_fires_df['point_index'].isin(fires_training.point_index)]

    presences_testing = points_df[points_df['point_index'].isin(fires_testing.point_index)]                                    
    
    #take test unburned data that are not even in the absence_train 
    sample_test = len(presences_testing)
    absence_test = absences_df[~absences_df['point_index'].isin(absence_train.point_index)].sample(n = sample_test)
    
    # add the binary label --> dependent variable
    pd.set_option('mode.chained_assignment', None)
    presences_train['fires'] = 1
    presences_testing['fires'] = 1

    absence_train['fires'] = 0
    absence_test['fires'] = 0
    
    # merge presences and absences                                
    dataset_train = presences_train.append(absence_train)
    dataset_test = presences_testing.append(absence_test)
 
    print('lenght of training dataset ', len(dataset_train))
    print('lenght of testing dataset ', len(dataset_test))

    print('dataset train - label count ', np.unique(dataset_train.fires, return_counts = True))
    print('dataset test - label count', np.unique(dataset_test.fires, return_counts = True))
    

    x_label = dataset_train.pop('fires')
    y_label = dataset_test.pop('fires')
    
    return dataset_train, x_label, dataset_test, y_label

def seasonal_clim_division(sea, dataset_train, dataset_test,
                           names_clim_summer = ['temp_2', 'prec_2'], 
                           names_clim_winter = ['temp_1', 'prec_1']):
    '''
    

    Parameters
    ----------
    sea : int
        the season of the experiment, 1 if winter, 2 if summer.
    dataset_train : df
        trainig dataset.
    dataset_test : df
        test dataset.
    names_clim_winter : list of objects
        it contains the name of the climate varaibles only for the winter season (i.e ['T_mean_winter']).
    names_clim_summer : list of objects
        it contains the name of the climate varaibles only for the summer season (i.e ['T_mean_summer']).

    Returns
    -------
    dataset_train : df
        the training dataset updated depending on the season of the experiment.
    dataset_test : df
        the test dataset updated depending on the season of the experiment.

    '''
    if names_clim_winter == None:
        pass
    else:
        # in winter the columns referred to summer are dropped
        if sea == 1:
            dataset_train = dataset_train.drop(names_clim_summer, axis = 1)
            dataset_test = dataset_test.drop(names_clim_summer, axis = 1)
        # in summer the columns related to winter are dropped    
        elif sea == 2:            
            dataset_train = dataset_train.drop(names_clim_winter, axis = 1)
            dataset_test = dataset_test.drop(names_clim_winter, axis = 1)
    
    return dataset_train, dataset_test




def test_scores(model, y_test, y_label):
    '''

    Parameters
    ----------
    model : sklearn object
        selected ML algorithm.
    y_test : df
        test dataset.
    y_label : array
        labels of test dataset.

    Returns
    -------
    auc : float
        auc of the model.
    cm : list
        confusion matrix.
    report : 
        some test scores and the general model's accuracy.
    rmse : float
        mean squared error.

    '''
    
        
    #the probability predicion
    p_test = model.predict_proba(y_test)[:,1]
    
    #calculate score
    auc = roc_auc_score(y_label, p_test)
    
    # no skill auc for plotting roc curve
    ns_probs = [0 for _ in range(len(y_label))]
    ns_auc = roc_auc_score(y_label, ns_probs)
    
    # calculate roc curves
    ns_fpr, ns_tpr, _ = roc_curve(y_label, ns_probs)
    fpr, tpr, _ = roc_curve(y_label, p_test)
    # plot the roc curve for the model
    plt.figure()
    plt.plot(ns_fpr, ns_tpr, label='No Skill')
    plt.plot(fpr, tpr, label='Classification model')
    # axis labels
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC curve')
    # show the legend
    plt.legend()
    
    
    
    mse = mean_squared_error(y_label, p_test)
    
    names_ = ['no fire','fire']
    
    #the actulal 0 1 class prediction
    p_test1 = model.predict(y_test)
    
    cm = confusion_matrix(y_label, p_test1, normalize = 'true')
    report = classification_report(y_label, p_test1, target_names = names_) 
    
    
    plt.figure()
    sns.heatmap(cm.T, annot = True, fmt = ".0%", cmap = "cividis", xticklabels = names_, yticklabels = names_)
    plt.xlabel("True label")
    plt.ylabel("Predicted label")    
    plt.title('confusion matrix')

    print('roc_auc_test = ',auc)
    print('report: \n', report)
    print('mse: ', mse)
    
    return auc, cm, report, mse


# just in case a susc map tif format is wanted
def to_raster(path_dem, p_burned, points_df, my_x , my_y, out_path):
    '''
    

    Parameters
    ----------
    path_dem : string
        path of the dem file in tif or tiff format.
    p_burned : array
        array of probabilities for the label=1.
    points_df : df
        points set.
    my_x : array
        array of x coordinates.
    my_y : TYPE
        array of y coordinates.
    out_path : string
        path output file.

    Returns
    -------
    None

    '''
    
    dem = rio.open(path_dem)
    # d = dem.read()
    
    coordx = np.float64(my_x)
    coordy = np.float64(my_y)


    # bounds and resolution that I want for my raster (the dem one)
    xmin = dem.bounds[0]
    xmax = dem.bounds[2]
    ymin = dem.bounds[1]
    ymax = dem.bounds[3]
    
    xres = dem.transform[0]
    yres = -dem.transform[4]

    #create 2D array from 1D array of probabilities
    statistic,x_edge,y_edge,binnumber = binned_statistic_2d(coordx, coordy, p_burned, 'mean',bins =[np.arange(xmin, xmax+xres, xres), np.arange(ymin, ymax+yres, yres)] )
    
    statistic = statistic.T
    # np.shape(statistic)
     
    # the new meta for the new raster has to be equalt to the dem one     
    new_meta = {'driver': 'GTiff',
      'dtype': 'float64',
      'nodata': dem.nodata,
      'width': statistic.shape[1],
      'height': statistic.shape[0],
      'count': 1,
      'crs': dem.crs,
      'transform': (dem.transform[0], dem.transform[1], dem.transform[2], dem.transform[3], -dem.transform[4], dem.bounds[1] )}
    
    #prepare the right shape for exporting the matrix as tif file
    statistics = np.zeros((1,statistic.shape[0], statistic.shape[1]))
    statistics[0,:,:] = statistic
    
    # creating the map --> this has resolution avaraged when upload on qgis
    folder_path = out_path.split('.')[0]
    name_temporary_file = folder_path + '_temporary.tif'
    with rio.open(name_temporary_file, 'w', **new_meta) as dest:
        dest.write(statistics)
    
    # this 2 operations allow to visualize the right resolution on qgis
    
    gdal.Warp(out_path, name_temporary_file, xRes=xres, yRes=yres, geoloc = True)
    gdal.Warp(out_path, name_temporary_file, xRes=xres, yRes=yres)
    
    print('I have created the susceptibility map')
    
    dest.close()
    dem.close() 
    
    os.remove(name_temporary_file)

#%% input data

vs = 'vs0'

# input
complete_df_path = r"C:\Users\Giorg\Dropbox\CIMA\tirocinio\Liguria\liguria\ML_data\ML_datasets\{}\points_liguria_vs0.1.pkl".format(vs) 
fires_df_path    = r"C:\Users\Giorg\Dropbox\CIMA\tirocinio\Liguria\liguria\ML_data\ML_datasets\{}\fires_liguria_vs0.1.pkl".format(vs) 
veg_path = r"C:\Users\Giorg\Dropbox\CIMA\tirocinio\Liguria\liguria\ML_data\veg\{}\raster_ML\vegetation.tiff".format(vs)


# model params
sea = 1          # seasonal analysis --> 1 = winter 2 = summer
ntree = 10       # number of random forest estimators




#%%  model

# load the datasets and apply one hot encoding whenever needed
points_df = process_points(complete_df_path)
points_df = one_hot_automatic(points_df) 
fires_df = process_points(fires_df_path)
   
# building dataset train and test 
x_train, x_label, y_test, y_label = build_dataset(points_df, fires_df, sea)

# keep features related to the season of the analysis 
x_train, y_test = seasonal_clim_division(sea, x_train, y_test)

# selection of feature to drop --> not used by the ML
my_excluded_cols = ['row', 'col', 'x', 'y', 'point_index', 'geometry']
columns =  [col for col in x_train if col not in my_excluded_cols]

print('\nfeatures of the model\n')
for i in columns:
    print(i)


# remove not needed features
x_train = x_train[columns]
y_test = y_test[columns]

# save coordinates for plotting the ssuc maps before dropping them 
my_x = points_df['x']
my_y = points_df['y']


points_df = points_df[columns]

# inizialize the model 
model  = RandomForestClassifier(n_estimators = ntree, verbose = 2)
# fit the model
model.fit(x_train, x_label)
# classification
p = model.predict_proba(points_df)

p_burned = p[:,1]

# some performance indicators
auc, cm, report, mse = test_scores(model, y_test, y_label)


plt.scatter(my_x, my_y, c=p_burned)
plt.legend()


my_theme = {
              'legend.fontsize': 14,
              'xtick.labelsize': 14,
              'ytick.labelsize': 14,
              'axes.labelsize': 16,
              'axes.titlesize': 20,
              }

matplotlib.rcParams.update(my_theme)


f, ax = plt.subplots(figsize= (10,6))

dots = ax.scatter(my_x, my_y, c=p_burned, s=0.5)
f.colorbar(dots)

ax.set_xlabel('coord x')
ax.set_ylabel('coord y')
ax.set_title('Wildfire Susceptibility Map')


#%% if you want to produce a susc map - tif file


# for susc map
from scipy.stats import binned_statistic_2d
import os
import gdal 
import rasterio as rio

# more input
dem_path = r"C:\Users\Giorg\Dropbox\CIMA\tirocinio\Liguria\liguria\ML_data\data_for_server\vs0_marj_gurmej\dtm_liguria_2017_100m_3003.tif"

# output 
output_susc_map = r"C:\Users\Giorg\Dropbox\CIMA\tirocinio\Liguria\liguria\ML_data\susc_maps\{}\susc_map_prova2.tif".format(vs)  


# produce susc map
to_raster(dem_path, p_burned, points_df, my_x , my_y, output_susc_map)





#%%
