In [2]:
# Requirements

from arcpy.da import *
from arcpy.sa import *

import numpy as np
import pandas as pd
import os, sys 
import matplotlib.pyplot as plt
from statsmodels.formula.api import ols
import warnings

from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import cross_validate, cross_val_score,cross_val_predict, KFold, StratifiedKFold, train_test_split, StratifiedShuffleSplit, GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.utils._testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
from sklearn.metrics import accuracy_score, roc_auc_score

Project_path = r'E:\科研工作01\学术论文\01区域尺度滑坡易发性对比分析_英文\代码开源\Machine Learning for LSA\Machine Learning for LSA'
arcpy.env.workspace = os.path.join(Project_path, 'Machine Learning for LSA.gdb')
factors_address = r'E:\科研工作01\学术论文\01区域尺度滑坡易发性对比分析_英文\数据库\秦巴山区要素\秦巴山区要素.gdb'
tabel_address = r'E:\科研工作01\学术论文\01区域尺度滑坡易发性对比分析_英文\统计表格'

## Calculate factors' importance and collinearity

In [None]:
# Covert geographic feature to pd.DataFrame

def CovertFeature2df(feature, fields):
    feature_arr = arcpy.da.FeatureClassToNumPyArray(feature, fields)
    return pd.DataFrame(feature_arr)

In [None]:
# Geo-Environmental factor importance (SA) calculation function

def calculate_important_factor(landslide_feature_df, all_feature_df, feature_field, bin_x = None, Continue = True):
    
    if Continue:
        # If it is a continuous factor, calculate the maximum and minimum values of the factor
        max_value = max(all_feature_df.loc[:, feature_field])
        min_value = min(all_feature_df.loc[:, feature_field])
    else:
        # If it is a discrete factor, calculate the number of features
        unique_feature = len(all_feature_df.loc[:, feature_field].unique())

    # The name of the histogram horizontal axis
    bin_name = feature_field
    
    if bin_x is None:
        if Continue:
            bin_x = np.linspace(min_value, max_value, 30)        
        else:
            bin_x = np.array(range(unique_feature))+1



    # Calculate histogram
    h_land ,_ ,_ = plt.hist(landslide_feature_df.loc[:, feature_field],
                             bins=bin_x)
    
    h_full,_,_ = plt.hist(all_feature_df.loc[:, feature_field],
                              bins=bin_x)
    
    h_full[h_full == 0] = 1e-6 # avoid 0/0
    h_land[h_land == 0] = 1e-8
    
    # Bayes formula
    
    P_R = h_full/h_full.sum()
    P_L = h_land.sum()/h_full.sum()
    P_R_L = h_land/h_land.sum()
    P_L_R = h_land/h_full
    P_L_R[P_L_R > 1] = 1
    bin_width = 1/len(bin_x[0:-1])
    
    # calculate the importance
    Ent_func = lambda Px, Py: -(Px/(Px+Py)*np.log2(Px/(Px+Py))+Py/(Px+Py)*np.log2(Py/(Px+Py)))
    Vector_Ent_func = np.vectorize(Ent_func)
    
    Ent_feature = Vector_Ent_func(h_land, h_full)  
    Gain_temp = Ent_func(h_land.sum(), h_full.sum())- np.sum((h_land+h_full)/np.sum(h_land+h_full) * Ent_feature)
    
    JS_divergence_func = lambda x, y: 0.5*np.sum(x*np.log2(2*x/(x+y)))+0.5*np.sum(y*np.log2(2*y/(x+y)))
    P_R_L[P_R_L == 0]=1e-8 
    JS_temp = JS_divergence_func(P_R_L, P_R)
    
    return JS_temp, Gain_temp

In [None]:
# variance inflation factor calculation formula
def vif(test_data, factor):
    cols = list(test_data.columns)
    cols.remove(factor)
    cols_noti = cols
    formula = factor + '~' + '+'.join(cols_noti)
    r2 = ols(formula, test_data).fit().rsquared
    return 1. / (1. - r2)

In [None]:
# important matrix (SI)

"""

#The point file used to calculate the importance coefficient needs to be prepared in advance. 
#The point file consists of random points generated in the area plus high risk slopes (HRSs). 
#Then the values of each environmental geological factor are extracted into the attribute table of the point feature.

"""

#Sample for important analysis
QinBa_sample = os.path.join(factors_address, 'QinBa_SampleForSA')

#Fields of the sample
QinBa_sample_desc = arcpy.Describe(QinBa_sample)
QinBa_sample_fields = [i.name for i in QinBa_sample_desc.fields]

# Features for sensitivity analysis
fields_list = QinBa_sample_fields[3:]
QinBa_sample_df = CovertFeature2df(QinBa_sample, fields_list)

# Dataset labels, identify HRSs set (y = 1) and random set(y = 0) 
QinBa_sample_label = 'y'
QinBa_label_df = CovertFeature2df(QinBa_sample, [QinBa_sample_label])

#HRSs dataset and random dataset(all_feature)
HRSs_feature_df = QinBa_sample_df.loc[QinBa_label_df[QinBa_sample_label] == 1, :].reset_index()
RandomPoint_feature_df = QinBa_sample_df.loc[QinBa_label_df[QinBa_sample_label] == 0, :].reset_index()

In [None]:
## important matrix (SI)

fields_list = ['TWI', 'SurfaceRoughness', 'SoilType', 'Slope', 'ReliefDegree', 'PlaneCurvature', 'NDVI', 'Map', 'Lithology', 'Landuse', 'FaultDensity', 'DistanceToRoad', 'DistanceToRiver', 'DEM', 'Aspect']
continue_list = [True, True, False, True, True, False, True, True, False, False, True, True, True, True, False]

JS_list = []
Gain_list = []

for key, field in enumerate(fields_list):
    
    temp_landslide = HRSs_feature_df.loc[:, [field]]
    temp_No_landsldie = RandomPoint_feature_df.loc[:, [field]]
    
    temp_JS,  temp_Gain= calculate_important_factor(temp_landslide, temp_No_landsldie, field, bin_x = None, Continue = continue_list[key])
    JS_list.append(temp_JS)
    Gain_list.append(temp_Gain)
    
sen_pd = pd.DataFrame()
sen_pd['factor_name'] = np.array(fields_list)
sen_pd['JS'] = np.array(JS_list)
sen_pd['Gain'] = np.array(Gain_list)

In [None]:
## Collinearity test for each factor
test_data = RandomPoint_feature_df[fields_list]
for i in test_data.columns:
    print(i, '\t', vif(test_data, i)) 

### processing HRSs and generate non-HRSs, to reassign environmental geological factors and train susceptibility models

In [None]:
# processing HRSs
HRSs_data = os.path.join(factors_address, 'QinBa_HRSs')
arcpy.CopyFeatures_management(HRSs_data, os.path.join(factors_address, 'QinBa_HRSs_Copy'))

desc = arcpy.Describe(os.path.join(factors_address, 'QinBa_HRSs_Copy'))
field_list = [i.name for i in desc.fields]
print(field_list)
    
#Delete redundant fields for HRSs points
try:
    arcpy.DeleteField_management(os.path.join(factors_address, 'QinBa_HRSs_Copy'), 
                             field_list[2:], 'DELETE_FIELDS')
    print("deleted!")
except Exception as e:
    print(f"wrong：{str(e)}")
    
#generate non_HRSs
#first, calculate the No. of HRSs
with arcpy.da.SearchCursor(os.path.join(factors_address, 'QinBa_HRSs_Copy'), '*') as cursor:
    row_count = sum(1 for row in cursor)

no_landslide = arcpy.CreateRandomPoints_management(factors_address, 'Non_HRSs', 
                                    os.path.join(factors_address, 'QinBa_Areas'), "", row_count, "", 
                                    "POINT")
desc = arcpy.Describe(os.path.join(factors_address, 'Non_HRSs'))
field_list = [i.name for i in desc.fields]
print(field_list)
    
#Delete redundant fields for non_HRSs points
try:
    arcpy.DeleteField_management(os.path.join(factors_address, 'Non_HRSs'), 
                             field_list[2:], 'DELETE_FIELDS')
    print("deleted!")
except Exception as e:
    print(f"wrong：{str(e)}")

In [None]:
# add label for HRSs and non_HRSs
HRSs = os.path.join(factors_address, 'QinBa_HRSs_Copy')
Non_HRSs = os.path.join(factors_address, 'Non_HRSs')

field_name = 'label'
field_type = 'SHORT'

try:
    # 添加新字段
    arcpy.AddField_management(HRSs, field_name, field_type)
    arcpy.AddField_management(Non_HRSs, field_name, field_type)

    # 为新字段的所有行设置值为1
    land_field = arcpy.CalculateField_management(HRSs, field_name, 1, "PYTHON")
    noland_field = arcpy.CalculateField_management(Non_HRSs, field_name, 0, "PYTHON")

    print('Successfully added and assigned!')
except Exception as e:
    print(f"wrong：{str(e)}")
    
# Merge the HRSs and Non_HRSs
output_path = os.path.join(factors_address, 'training_data')
try:
    arcpy.management.Merge(inputs = [HRSs, Non_HRSs], output = output_path)
    print(f"Merge seccessful")
except Exception as e:
    print(f"wrong：{str(e)}")

In [None]:
# extract factors values to merged training data

#List the Geo-environmental factors after collinearity filtering
factors_name = ['Factor_QinBa_DEM', 'Factor_QinBa_Slope', 'Factor_QinBa_Aspect', 'Factor_QinBa_PlaneCurvature', 'Factor_QinBa_TWI',
               'Factor_QinBa_DistanceToRiver', 'Factor_QinBa_MAP', 'Factor_QinBa_Lithology', 'Factor_QinBa_FaultDensity', 'Factor_QinBa_NDVI',
               'Factor_QinBa_SoilType', 'Factor_QinBa_DistanceToRoad', 'Factor_QinBa_Landuse']
factors_rename = ['DEM', 'Slope', 'Aspect', 'Curvature', 'TWI', 'River', 'MAP',
                 'Lithology', 'Fault', 'NDVI', 'SoilType', 'Road', 'Landuse']

sample_point = os.path.join(factors_address, 'training_data')
Raster_path_pair = [[os.path.join(factors_address, factors_name[i]), factors_rename[i]] for i in range(len(factors_name))]

arcpy.sa.ExtractMultiValuesToPoints(sample_point, Raster_path_pair, "NONE")

In [None]:
#Convert the attribute table of training data to Pandas DataFrame

sample_point = os.path.join(factors_address, 'training_data')
land_field_list = factors_rename.copy()
land_field_list = land_field_list + ['OBJECTID', 'label']

sample_df = pd.DataFrame(arcpy.da.TableToNumPyArray(sample_point, land_field_list))
sample_df.set_index('OBJECTID', drop = True, inplace = True)

#Note: It is best not to set the fields in the attribute table to short integers. 
#If they are short integers, you need to manually fill in the missing values in the short integers.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(sample_df.iloc[:, :-1], sample_df.iloc[:, [-1]],test_size=0.3, random_state=42)

In [None]:
# Frequency ratio function
def RasterFrequencyRatio(Raster_address,landslide_data, landslide_field, bins_num = 30, bins_input = False, train_ratio = 0.7):
    
    #Geo-environmental factors' information
    Raster_data = Raster(Raster_address)
    
    Raster_data_maximum = Raster_data.maximum
    Raster_data_minmum = Raster_data.minimum
    
    Raster_data_cell_width = Raster_data.meanCellWidth
    Raster_data_cell_height = Raster_data.meanCellHeight
    
    #Convert raster to NumPyArray and calculate histogram
    Raster_array = arcpy.RasterToNumPyArray(Raster_data, nodata_to_value = 999999)
    Raster_array = Raster_array.flatten()
    
    if bins_input:
        hist_raster, bins_raster = np.histogram(Raster_array, bins = bins_input)
    else:
        hist_raster, bins_raster = np.histogram(Raster_array, bins = np.linspace(Raster_data_minmum, Raster_data_maximum+0.1, bins_num))
    
    #Calculate the percentage of rasters within each histogram
    hist_raster_density = hist_raster/hist_raster.sum()
    
    #Calculate the percentage of HRSs within each histogram
    train_set, test_set = train_test_split(landslide_data, test_size = 1-train_ratio, random_state = 42)
    hist_land,_ = np.histogram(train_set.loc[:, landslide_field], bins = bins_raster)#landslide_field为所需统计直方的因子名称
    hist_land_density = hist_land/hist_land.sum()
    
    #Calculate FR
    hist_raster_density[hist_raster_density == 0] = 0.001
    Fr = hist_land_density/hist_raster_density
    record = {'start_point': bins_raster[:-1], 'end_point':bins_raster[1:], 
              'raster_num':hist_raster,'landslide_num':hist_land,
              'raster_percentage':hist_raster_density,'landslide_percentage':hist_land_density,'Fr_value':Fr}
    record_df = pd.DataFrame(record)
    return record_df

In [None]:
# Frequency ratio function
def RasterFrequencyRatio(Raster_address,landslide_data, landslide_field, bins_num = 30, bins_input = False, train_ratio = 0.7):
    
    #Geo-environmental factors' information
    Raster_data = Raster(Raster_address)
    
    Raster_data_maximum = Raster_data.maximum
    Raster_data_minmum = Raster_data.minimum
    
    Raster_data_cell_width = Raster_data.meanCellWidth
    Raster_data_cell_height = Raster_data.meanCellHeight
    
    #Convert raster to NumPyArray and calculate histogram
    Raster_array = arcpy.RasterToNumPyArray(Raster_data, nodata_to_value = 999999)
    Raster_array = Raster_array.flatten()
    
    if bins_input:
        hist_raster, bins_raster = np.histogram(Raster_array, bins = bins_input)
    else:
        hist_raster, bins_raster = np.histogram(Raster_array, bins = np.linspace(Raster_data_minmum, Raster_data_maximum+0.1, bins_num))
    
    #Calculate the percentage of rasters within each histogram
    hist_raster_density = hist_raster/hist_raster.sum()
    
    #Calculate the percentage of HRSs within each histogram
    train_set, test_set = train_test_split(landslide_data, test_size = 1-train_ratio, random_state = 42)
    hist_land,_ = np.histogram(train_set.loc[:, landslide_field], bins = bins_raster)#landslide_field为所需统计直方的因子名称
    hist_land_density = hist_land/hist_land.sum()
    
    #Calculate FR
    hist_raster_density[hist_raster_density == 0] = 0.001
    Fr = hist_land_density/hist_raster_density
    record = {'start_point': bins_raster[:-1], 'end_point':bins_raster[1:], 
              'raster_num':hist_raster,'landslide_num':hist_land,
              'raster_percentage':hist_raster_density,'landslide_percentage':hist_land_density,'Fr_value':Fr}
    record_df = pd.DataFrame(record)
    return record_df

In [None]:
#Define another function to reassign the factors
def reclass_raster(Raster_address, reclass_statistics_table):
    Raster_Raster = Raster(Raster_address)
    rmap = []
    for index in reclass_statistics_table.index:
        temp_list = []
        temp_list = reclass_statistics_table.loc[index, ['start_point', 'end_point']].to_list()
        temp_list.append(int(reclass_statistics_table.loc[index, 'Fr_value']*10000))
        rmap.append(temp_list)
    remap = RemapRange(rmap)
    Raster_reclass_temp = Reclassify(Raster_Raster, 'VALUE', remap)/10000.0
    Raster_reclass_temp.save(str(Raster_address)+'_rereclass')
    print(str(Raster_address)+'_rereclass')

In [None]:
# Reassign continuous factors

continuous_factors = ['Factor_QinBa_DEM', 'Factor_QinBa_Slope', 'Factor_QinBa_TWI', 'Factor_QinBa_DistanceToRiver',
                      'Factor_QinBa_MAP', 'Factor_QinBa_FaultDensity', 'Factor_QinBa_NDVI', 'Factor_QinBa_DistanceToRoad']

continuous_factors_rename = ['DEM', 'Slope', 'TWI', 'River', 'MAP', 'Fault', 'NDVI', 'Road']


for raster_index in range(len(continuous_factors)):
    Raster_path = os.path.join(factors_address, continuous_factors[raster_index])
    HRSs_df = X_train[y_train.label == 1]
    field_name = continuous_factors_rename[raster_index]

    reclass_table = RasterFrequencyRatio(Raster_path, HRSs_df, field_name, bins_num = 30)
    reclass_table.to_excel(os.path.join(tabel_address, continuous_factors[raster_index] +'_1FR.xlsx'))
    reclass_raster(Raster_path, reclass_table) #这是第二函数，用来进行重赋值

In [None]:
#Calculate the frequency ratio of the rock group and reassign it

bins = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5]
factors_name = 'Factor_QinBa_Lithology'
Raster_path = os.path.join(factors_address, factors_name)
field_name = 'Lithology'

reclass_table = RasterFrequencyRatio(Raster_path, HRSs_df, field_name, bins_input = bins)
reclass_table.to_excel(os.path.join(tabel_address, factors_name +'_FR.xlsx'))
reclass_raster(Raster_path, reclass_table)

In [None]:
#Calculate the frequency ratio of the Aspect and reassign it
bins = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5]
factors_name = 'Factor_QinBa_Aspect'
Raster_path = os.path.join(factors_address, factors_name)
field_name = 'Aspect'

reclass_table = RasterFrequencyRatio(Raster_path, HRSs_df, field_name, bins_input = bins)
reclass_table.to_excel(os.path.join(tabel_address, factors_name +'_FR.xlsx'))
reclass_raster(Raster_path, reclass_table)

In [None]:
#Calculate the frequency ratio of the PlaneCurvature and reassign it
bins = [0.5, 1.5, 2.5, 3.5]
factors_name = 'Factor_QinBa_PlaneCurvature'
Raster_path = os.path.join(factors_address, factors_name)
field_name = 'Curvature'

reclass_table = RasterFrequencyRatio(Raster_path, HRSs_df, field_name, bins_input = bins)
reclass_table.to_excel(os.path.join(tabel_address, factors_name +'_FR.xlsx'))
reclass_raster(Raster_path, reclass_table)

In [None]:
#Calculate the frequency ratio of the SoilType and reassign it
bins = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5]
factors_name = 'Factor_QinBa_Landuse'
Raster_path = os.path.join(factors_address, factors_name)
field_name = 'SoilType'

reclass_table = RasterFrequencyRatio(Raster_path, HRSs_df, field_name, bins_input = bins)
reclass_table.to_excel(os.path.join(tabel_address, factors_name +'_FR.xlsx'))
reclass_raster(Raster_path, reclass_table)

In [None]:
#Calculate the frequency ratio of the Landuse and reassign it
bins = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5]
factors_name = 'Factor_QinBa_Landuse'
Raster_path = os.path.join(factors_address, factors_name)
field_name = 'Landuse'

reclass_table = RasterFrequencyRatio(Raster_path, HRSs_df, field_name, bins_input = bins)
reclass_table.to_excel(os.path.join(tabel_address, factors_name +'_FR.xlsx'))
reclass_raster(Raster_path, reclass_table)

## Landslide susceptibility training

In [None]:
desc = arcpy.Describe(os.path.join(factors_address, 'training_data'))
field_list = [i.name for i in desc.fields]
print(field_list)

In [None]:
#Extract the reassigned factor values into sample points

#delete the extra fields
try:
    arcpy.DeleteField_management(os.path.join(factors_address, 'training_data'), 
                             field_list[3:], 'DELETE_FIELDS')
    print("deleted!")
except Exception as e:
    print(f"wrong：{str(e)}")
    
factors_name = ['Factor_QinBa_DEM', 'Factor_QinBa_Slope', 'Factor_QinBa_Aspect', 'Factor_QinBa_PlaneCurvature', 'Factor_QinBa_TWI',
               'Factor_QinBa_DistanceToRiver', 'Factor_QinBa_MAP', 'Factor_QinBa_Lithology', 'Factor_QinBa_FaultDensity', 'Factor_QinBa_NDVI',
               'Factor_QinBa_SoilType', 'Factor_QinBa_DistanceToRoad', 'Factor_QinBa_Landuse']
factors_ressign_name = [i+'_rereclass' for i in factors_name]
fields_rename = ['DEM', 'Slope', 'Aspect', 'Curvature', 'TWI', 'River', 'MAP',
                 'Lithology', 'Fault', 'NDVI', 'SoilType', 'Road', 'Landuse']

Raster_path_pair = [[os.path.join(factors_address, factors_ressign_name[i]), fields_rename[i]] for i in range(len(factors_name))]

arcpy.sa.ExtractMultiValuesToPoints(os.path.join(factors_address, 'training_data'), Raster_path_pair, "NONE")

In [None]:
# covert the 'training_datas' to DataFrame
sample_data_fields = arcpy.ListFields(os.path.join(factors_address, 'training_data'))
fields_list = [i.name for i in sample_data_fields]
sample_data = arcpy.da.TableToNumPyArray(os.path.join(factors_address, 'training_data'), fields_list[2:])
sample_data_df = pd.DataFrame(sample_data)
sample_data_df = sample_data_df.fillna(0)

#stratified sampling
split = StratifiedShuffleSplit(n_splits=1, test_size = 0.3, random_state= 42)
for train_index, test_index in split.split(sample_data_df, sample_data_df.loc[:, 'label']):
    train_set = sample_data_df.loc[train_index]
    test_set = sample_data_df.loc[test_index]

In [None]:
# Logistic regression

classifier_LR = make_pipeline(StandardScaler(), LogisticRegression(random_state = 0))

param_grid = {'logisticregression__C': np.logspace(-1, 1, 10)}

grid_search = GridSearchCV(estimator = classifier_LR, param_grid = param_grid,
                          cv=5, scoring='accuracy', verbose=1)

with ignore_warnings(category= ConvergenceWarning):
    grid_search.fit(train_set.iloc[:, 1:], train_set.iloc[:, 0])
    
print("best parameters: ", grid_search.best_params_)

# Use the model with the best parameters to make predictions
best_lr_classifier = grid_search.best_estimator_
y_pred_prob = best_lr_classifier.predict_proba(test_set.iloc[:, 1:])[:, 1]
y_pred = best_lr_classifier.predict(test_set.iloc[:, 1:])

# Calculate model accuracy
accuracy = accuracy_score(test_set.iloc[:, 0], y_pred)
print("Validation set accuracy: {:.2f}%".format(accuracy * 100))

#Calculate AUC
auc = roc_auc_score(test_set.iloc[:, 0], y_pred_prob)
print("AUC of Validation set: {:.2f}%".format(auc*100))

In [None]:
# DA
classifier_DA = make_pipeline(StandardScaler(), LinearDiscriminantAnalysis())
with ignore_warnings(category= ConvergenceWarning):
    classifier_DA.fit(train_set.iloc[:, 1:], train_set.iloc[:, 0])
    

y_pred_prob = classifier_DA.predict_proba(test_set.iloc[:, 1:])[:, 1]
y_pred = classifier_DA.predict(test_set.iloc[:, 1:])

# Calculate model accuracy
accuracy = accuracy_score(test_set.iloc[:, 0], y_pred)
print("Validation set accuracy: {:.2f}%".format(accuracy * 100))

#Calculate AUC
auc = roc_auc_score(test_set.iloc[:, 0], y_pred_prob)
print("AUC of Validation set: {:.2f}%".format(auc*100))

In [None]:
# SVM
classifier_svc = make_pipeline(StandardScaler(), SVC(random_state = 0, probability = True))
GridSearch_svc = {'svc__C': [-1, 1, 10],'svc__gamma':[0.01, 0.25, 0.5, 1] }
grid_search = GridSearchCV(estimator = classifier_svc, param_grid = GridSearch_svc, cv = 5)
with ignore_warnings(category= ConvergenceWarning):
    grid_search.fit(train_set.iloc[:, 1:], train_set.iloc[:, 0])
    
print("best parameters: ", grid_search.best_params_)

# Use the model with the best parameters to make predictions
best_svm_classifier = grid_search.best_estimator_
y_pred_prob = best_svm_classifier.predict_proba(test_set.iloc[:, 1:])[:, 1]
y_pred = best_svm_classifier.predict(test_set.iloc[:, 1:])

# Calculate model accuracy
accuracy = accuracy_score(test_set.iloc[:, 0], y_pred)
print("Validation set accuracy: {:.2f}%".format(accuracy * 100))

#Calculate AUC
auc = roc_auc_score(test_set.iloc[:, 0], y_pred_prob)
print("AUC of Validation set: {:.2f}%".format(auc*100))  

In [None]:
#MLP

GridSearch_mlp = {'mlpclassifier__hidden_layer_sizes':[(15,), (20,), (25,), (30)],
             'mlpclassifier__activation': ['tanh', 'relu'],
             'mlpclassifier__alpha':[0.0001, 0.001, 0.01, 0.1],
             'mlpclassifier__learning_rate_init':[0.001, 0.01, 0.1]}


classifier_mlp = make_pipeline(StandardScaler(), MLPClassifier(solver = 'sgd', random_state = 42))

grid_search = GridSearchCV(estimator = classifier_mlp, param_grid = GridSearch_mlp, cv=5 )

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=ConvergenceWarning, module="sklearn")
    grid_search.fit(train_set.iloc[:, 1:], train_set.iloc[:, 0])
    
print('best parameters:', grid_search.best_params_)


# Use the model with the best parameters to make predictions
y_pred_prob = grid_search.predict_proba(test_set.iloc[:, 1:])[:, 1]
y_pred = grid_search.predict(test_set.iloc[:, 1:])

# Calculate model accuracy
accuracy = accuracy_score(test_set.iloc[:, 0], y_pred)
print("Validation set accuracy: {:.2f}%".format(accuracy * 100))

#Calculate AUC
auc = roc_auc_score(test_set.iloc[:, 0], y_pred_prob)
print("AUC of Validation set: {:.2f}%".format(auc*100))  

In [None]:
#RF
rf_classifier = RandomForestClassifier(random_state=42)
param_grid = {
    'n_estimators': [30, 50, 100, 200],
    'max_depth': [None, 5, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'max_features': [1, 2, 4]
}

grid_search = GridSearchCV(estimator=rf_classifier, param_grid=param_grid, 
                           cv=5, scoring='accuracy', verbose=1)

grid_search.fit(train_set.iloc[:, 1:], train_set.iloc[:, 0])

print("best parameters: ", grid_search.best_params_)

# Use the model with the best parameters to make predictions
best_RFC_classifier = grid_search.best_estimator_
y_pred_prob = best_RFC_classifier.predict_proba(test_set.iloc[:, 1:])[:, 1]
y_pred = best_RFC_classifier.predict(test_set.iloc[:, 1:])

# Calculate model accuracy
accuracy = accuracy_score(test_set.iloc[:, 0], y_pred)
print("Validation set accuracy: {:.2f}%".format(accuracy * 100))

#Calculate AUC
auc = roc_auc_score(test_set.iloc[:, 0], y_pred_prob)
print("AUC of Validation set: {:.2f}%".format(auc*100))

In [None]:
#Convert the raster of the study area (30m by 30m) into a point file, 
#extract the factor values into the points, and use four classifiers 
#to calculate the susceptibility index of each point separately

#Four optimal machine learning classifiers were obtained, namely：
best_lr_classifier
best_svm_classifier
classifier_mlp
best_RFC_classifier

# Convert raster to point features
Raster_point = os.path.join(factors_address, 'Raster_point')
arcpy.conversion.RasterToPoint('Factor_QinBa_DEM', os.path.join(factors_address, Raster_point), 'Value')

arcpy.sa.ExtractMultiValuesToPoints(Raster_point, Raster_path_pair, 'None')

In [None]:
# Convert feature point attribute table to pd.DataFrame
field_list = arcpy.ListFields(Raster_point)
field_list = [i.name for i in field_list]
sus_calculate_array = arcpy.da.TableToNumPyArray(Raster_point,[field_list[0]]+field_list[4:])
sus_calculate_df = pd.DataFrame(sus_calculate_array, columns = [field_list[0]]+field_list[4:])

# Fill in NA values
# The filling method is to fill in the nearest neighboring value
sus_calculate_df_withoutNA = sus_calculate_df.fillna(method="ffill")
#The NA value in the first row is filled in with 0
sus_calculate_df_withoutNA = sus_calculate_df_withoutNA.fillna(0)

In [None]:
# Compute susceptibility index of grid points using different models

#LR
prob_LR0_3k = best_lr_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[:30000000,1:])[:,1]
prob_LR3_6k = best_lr_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[30000000:60000000,1:])[:,1]
prob_LR6_9k = best_lr_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[60000000:,1:])[:,1]

prob_LR = np.concatenate((prob_LR0_3k, prob_LR3_6k, prob_LR6_9k))

#SVM
# Takes about 5 hours
prob_SVM0_3k = best_svm_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[:30000000,1:])[:,1]
prob_SVM3_6k = best_svm_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[30000000:60000000,1:])[:,1]
prob_SVM6_9k = best_svm_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[60000000:,1:])[:,1]

prob_SVM = np.concatenate((prob_SVM0_3k, prob_SVM3_6k, prob_SVM6_9k))

#MLP
prob_MLP0_3k = classifier_mlp.predict_proba(sus_calculate_df_withoutNA.iloc[:30000000,1:])[:,1]
prob_MLP3_6k = classifier_mlp.predict_proba(sus_calculate_df_withoutNA.iloc[30000000:60000000,1:])[:,1]
prob_MLP6_9k = classifier_mlp.predict_proba(sus_calculate_df_withoutNA.iloc[60000000:,1:])[:,1]

prob_MLP = np.concatenate((prob_MLP0_3k, prob_MLP3_6k, prob_MLP6_9k))

#RF
# take about 5 minutes
prob_RFC0_3k = best_RFC_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[:30000000,1:])[:,1]
prob_RFC3_6k = best_RFC_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[30000000:60000000,1:])[:,1]
prob_RFC6_9k = best_RFC_classifier.predict_proba(sus_calculate_df_withoutNA.iloc[60000000:,1:])[:,1]

prob_RFC = np.concatenate((prob_RFC0_3k, prob_RFC3_6k, prob_RFC6_9k))

sus_calculate_df_withoutNA['prob_LR'] = prob_LR
sus_calculate_df_withoutNA['prob_SVM'] = prob_SVM
sus_calculate_df_withoutNA['prob_MLP'] = prob_MLP
sus_calculate_df_withoutNA['prob_RFC'] = prob_RFC

#Extract the latitude and longitude coordinates of each raster point
sus_calculate_array = arcpy.da.TableToNumPyArray(Raster_point,['SHAPE@X','SHAPE@Y']+[field_list[0]])
sus_calculate_df = pd.DataFrame(sus_calculate_array)
data = sus_calculate_df.merge(sus_calculate_df_withoutNA, left_on = 'OBJECTID', right_on = 'OBJECTID')

In [None]:
#Using gdal to generate susceptibility distribution plots
def conduct_LSM(data_array, Raster_model, save_path, file_name):
    #Identify longitude and latitude grid coordinates
    var_lon = data_array.columns.map(float)
    var_lat = data_array.index
    
    #Identify the susceptibility value of each grid point
    var_value = data_array.values
    #data_arr = np.asarray(gdal_array)
    
    #Identify the scope of LSM
    LonMin, LatMax, LonMax, LatMin = [var_lon.min(), var_lat.max(), var_lon.max(), var_lat.min()]
    N_Lat = len(var_lat)
    N_Lon = len(var_lon)

    #Calculate raster spatial resolution
    Lon_Res = (var_lon[1:].values - var_lon[:-1].values)[0]
    Lat_Res = (var_lat[1:].values - var_lat[:-1].values)[0]
    
    Lon_Res = abs(Lon_Res)
    Lat_Res = abs(Lat_Res)
    
    # Create a grid driver
    driver = gdal.GetDriverByName('GTiff')
    
    #Create raster
    out_tif = driver.Create(os.path.join(save_path, file_name), N_Lon, N_Lat, 1, gdal.GDT_Float32)
    
    # Set projection and conversion information for an image
    
    #Get projection and transformation information
    GeoTransform = (LonMin, Lon_Res, 0, LatMax, 0, -Lat_Res)
    GeoProjection = gdal.Open(Raster_model).GetProjection()
    #
    out_tif.SetGeoTransform(GeoTransform)
    out_tif.SetProjection(GeoProjection)
    
    # Data writing
    out_tif.GetRasterBand(1).WriteArray(var_value)  # 将数据写入内存，此时没有写入硬盘
    out_tif.FlushCache()  # 将数据写入硬盘
    out_tif = None  # 注意必须关闭tif文件

In [None]:
# generate LSM
gdal_array_MLP = data.pivot(index = 'SHAPE@Y', columns = 'SHAPE@X', values='prob_MLP')
gdal_array_LR = data.pivot(index = 'SHAPE@Y', columns = 'SHAPE@X', values='prob_LR')
gdal_array_RFC = data.pivot(index = 'SHAPE@Y', columns = 'SHAPE@X', values='prob_RFC')
gdal_array_SVM = data.pivot(index = 'SHAPE@Y', columns = 'SHAPE@X', values='prob_SVM')

file_path = r'E:\LSM'
raster_model = 'DEM.tif'
file_name_LR = 'LSM_LR.tif'
file_name_MLP = 'LSM_MLP.tif'
file_name_RFC = 'LSM_RFC.tif'
file_name_SVM = 'LSM_SVM.tif'
conduct_LSM(gdal_array_MLP.iloc[::-1], os.path.join(file_path, raster_model), file_path, file_name_MLP)
conduct_LSM(gdal_array_LR.iloc[::-1], os.path.join(file_path, raster_model), file_path, file_name_LR)
conduct_LSM(gdal_array_RFC.iloc[::-1], os.path.join(file_path, raster_model), file_path, file_name_RFC)
conduct_LSM(gdal_array_SVM.iloc[::-1], os.path.join(file_path, raster_model), file_path, file_name_SVM)

## performance evaluation

In [None]:
# Define calculation functions for ROC, AUC, Acc, Precision, Recall, F-measure and other indicators
def calculate_common_indicators(feature_class, prob_field, label, bins = np.linspace(0, 1, 50)):
    # feature class is a combined data set of landslide and non-landslide points, which is used to test the performance of LSM
    # prob_field is the field name of the landslide susceptibility index in the data set
    # label indicates whether the columns is a landslide or a non-landslide.
    
    feature_df = arcpy.da.TableToNumPyArray(feature_class, [prob_field, label])
    feature_df = pd.DataFrame(feature_df)
    feature_df.fillna(0.5, inplace = True)
    tpr_list = []
    fpr_list = []
    for index in bins:
        temp_tp = feature_df.loc[(feature_df.loc[:, prob_field] >= index) & (feature_df.loc[:, label] == 1)].shape[0]
        temp_fp = feature_df.loc[(feature_df.loc[:, prob_field] >= index) & (feature_df.loc[:, label] == 0)].shape[0]
        temp_fn = feature_df.loc[(feature_df.loc[:, prob_field] <  index) & (feature_df.loc[:, label] == 1)].shape[0]
        temp_tn = feature_df.loc[(feature_df.loc[:, prob_field] <  index) & (feature_df.loc[:, label] == 0)].shape[0]
        
        temp_TPR = temp_tp/(temp_tp+temp_fn)
        temp_FPR = temp_fp/(temp_fp+temp_tn)
        
        tpr_list.append(temp_TPR)
        fpr_list.append(temp_FPR)
        
    index = 0.5
    tp = feature_df.loc[(feature_df.loc[:, prob_field] >= index) & (feature_df.loc[:, label] == 1)].shape[0]
    fp = feature_df.loc[(feature_df.loc[:, prob_field] >= index) & (feature_df.loc[:, label] == 0)].shape[0]
    fn = feature_df.loc[(feature_df.loc[:, prob_field] <  index) & (feature_df.loc[:, label] == 1)].shape[0]
    tn = feature_df.loc[(feature_df.loc[:, prob_field] <  index) & (feature_df.loc[:, label] == 0)].shape[0]
    
    
    ACC = (tn + tp)/(tn + tp + fn + fp)
    Precision = tp/(tp + fp)
    Recall = tp/(tp + fn)
    F_measure = (2*tp)/(2*tp + fn + fp)
    
    ROC = pd.DataFrame()    
    ROC[prob_field+ '_FPR'] = fpr_list
    ROC[prob_field+ '_TPR'] = tpr_list
    
    
    bins_width = np.array(fpr_list[:-1]) - np.array(fpr_list[1:])
    tpr_hight = np.array(tpr_list[:-1]) + (np.array(tpr_list[1:]) - np.array(tpr_list[:-1]))/2
    AUC = sum(bins_width*tpr_hight)
    return AUC, ROC, ACC, Precision, Recall, F_measure

In [None]:
# Define prediction rate curve calculation function
# The number of grid cells in each bin is equal
def Calculate_PredicCurve(landslide_class, field_name, LSM_raster, bins = 30):
    
    temp_raster = Raster(LSM_raster)
    
    #Calculate the maximum and minimum values of a raster
    max_boundary = temp_raster.maximum
    min_boundary = temp_raster.minimum
    
    # Convert raster to numpy array
    arr = arcpy.RasterToNumPyArray(temp_raster,nodata_to_value=-99999)
    
    # Flatten and sort numpy array
    arr = arr.flatten()
    arr = np.sort(arr)
    
    # Get valid values (remove the value of Nodata)
    arr = arr[arr>= min_boundary]
    
    # Count the number of rasters within a bin cell
    raster_num = len(arr)
    bin_width = raster_num//bins + 1
    bins_edge = bin_width*np.arange(0, bins+1)
    
    # Calculate the boundary value of bin
    bins_edge = [arr[i] for i in bins_edge[:-1]]
    bins_edge.append(max_boundary)
    
    bins_edge[1:] = [values + 0.00000001*(key+1) for key, values in enumerate(bins_edge[1:])]
    
    Raster_num = pd.cut(pd.Series(arr), bins_edge, right = False).value_counts(normalize=True,sort = False)
    
    # Count the number of landslide points in each bin 
    feature_df = arcpy.da.TableToNumPyArray(landslide_class, [field_name])
    feature_df = pd.DataFrame(feature_df)
    feature_df.fillna(0.5, inplace = True)
    landslide_num = pd.cut(feature_df[field_name], bins_edge).value_counts(sort = False).values
    
    #Output results
    statistic_infor = pd.DataFrame(Raster_num.values*100, index = Raster_num.index, columns = ['Raster_num / %'])
    
    statistic_infor['LSI'] = (np.array(bins_edge[:-1]) + np.array(bins_edge[1:]))/2
    statistic_infor['landslide_num'] = landslide_num
    statistic_infor['landslide_perc'] = landslide_num/landslide_num.sum()*100
    statistic_infor['landslide_cum_perc'] = landslide_num.cumsum()/landslide_num.sum()
    statistic_infor['landslide_cumsum'] = landslide_num.cumsum()
    statistic_infor['FR'] = statistic_infor['landslide_perc']/statistic_infor['Raster_num / %']
    
    
    # Calculate the area under the prediction rate curve, which is the indicator PAS
    area_perc = landslide_num[::-1]*np.linspace(1, 0, bins)
    PAS = area_perc.sum()/landslide_num.sum()
    
    return PAS, statistic_infor

In [None]:
# Define LSM sensitivity index(SI) calculation function
def FrequencyRatio_Curve(landslide_class, field_name, LSM_raster, bins = 30):
    
    temp_raster = Raster(LSM_raster)
    
    max_boundary = temp_raster.maximum
    min_boundary = temp_raster.minimum
    
    arr = arcpy.RasterToNumPyArray(temp_raster,nodata_to_value=-99999)
    
    arr = arr.flatten()
    arr = np.sort(arr)
    
    arr = arr[arr>= min_boundary]
        
    bins_edge = np.linspace(0, 1.0001, bins)
    
    Raster_num_norm = pd.cut(pd.Series(arr), bins_edge, right = False).value_counts(normalize=True,sort = False)
    Raster_num = pd.cut(pd.Series(arr), bins_edge, right = False).value_counts(sort = False)
    
    feature_df = arcpy.da.TableToNumPyArray(landslide_class, [field_name])
    feature_df = pd.DataFrame(feature_df)
    feature_df.fillna(0.5, inplace = True)
    landslide_num = pd.cut(feature_df[field_name], bins_edge).value_counts(sort = False).values
    
    
    # Calculate sensitivity coefficient
    h_full = Raster_num.values
    h_land = landslide_num
    
    h_full[h_full == 0] = 100 # avoid 0/0
    h_land[h_land == 0] = 1
    
    # Parts of Bayesian Formula
    P_R = h_full/h_full.sum()
    P_L = h_land.sum()/h_full.sum()
    P_R_L = h_land/h_land.sum()
    P_L_R = h_land/h_full
    P_L_R[P_L_R > 1] = 1
    bin_width = 1/len(bins_edge[0:-1])
    
    # Calculate SI
    Ent_func = lambda Px, Py: -(Px/(Px+Py)*np.log2(Px/(Px+Py))+Py/(Px+Py)*np.log2(Py/(Px+Py)))
    Vector_Ent_func = np.vectorize(Ent_func)
    
    Ent_feature = Vector_Ent_func(h_land, h_full)  
    Gain_temp = Ent_func(h_land.sum(), h_full.sum())- np.sum((h_land+h_full)/np.sum(h_land+h_full) * Ent_feature)
    
    JS_divergence_func = lambda x, y: 0.5*np.sum(x*np.log2(2*x/(x+y)))+0.5*np.sum(y*np.log2(2*y/(x+y)))
    P_R_L[P_R_L == 0]=1e-8 
    JS_temp = JS_divergence_func(P_R_L, P_R)
    
    
    #output results
    statistic_infor = pd.DataFrame(Raster_num_norm.values*100, index = Raster_num_norm.index, columns = ['Raster_num / %'])
    
    statistic_infor['LSI'] = (np.array(bins_edge[:-1]) + np.array(bins_edge[1:]))/2
    statistic_infor['landslide_num'] = landslide_num
    statistic_infor['landslide_perc'] = landslide_num/landslide_num.sum()*100
    statistic_infor['landslide_cum_perc'] = landslide_num.cumsum()/landslide_num.sum()
    statistic_infor['landslide_cumsum'] = landslide_num.cumsum()
    statistic_infor['FR'] = statistic_infor['landslide_perc']/statistic_infor['Raster_num / %']
    
    return JS_temp, Gain_temp, statistic_infor

In [None]:
# Extract the value of LSM to the landslide point
landslides_data = 'landslides_data'
Merged_data = 'performance_evaluated_data'

LSM_results = ['LSM_AHP', 'LSM_LR', 'LSM_DA', 'LSM_SVM', 'LSM_MLP', 'LSM_RFC', 'LSM_AlexNet', 'LSM_CNN1D', 'LSM_CNN2D', 'LSM_LSTM']

fields_rename = ['AHP', 'LR', 'DA','SVM', 'MLP', 'RFC', 'AlexNet', 'CNN1D', 'CNN2D', 'LSTM']

Raster_field_pair = [[os.path.join(factors_address, LSM_results[i]), fields_rename[i]] for i in np.arange(0,len(LSM_results))]
arcpy.sa.ExtractMultiValuesToPoints(landslides_data, Raster_field_pair, 'NONE')
arcpy.sa.ExtractMultiValuesToPoints(Merged_data, Raster_field_pair, 'NONE')

In [None]:
# Calculate ROC, AUC, ACC and other indices

data_fields = arcpy.ListFields(Merged_data)
fields_list = [i.name for i in data_fields]
fields_list = fields_list[3:]
print(fields_list)

label = 'label'
AUC_list = []
ACC_list = []
Precision_list = []
Recall_list = []
F_list = [] 
ROC_df = pd.DataFrame()

for field in fields_list:
    AUC, ROC, ACC, Precision, Recall, F_measure= calculate_ROC(Merged_data, field, label)
    
    AUC_list.append(AUC)
    ACC_list.append(ACC)
    Precision_list.append(Precision)
    Recall_list.append(Recall)
    F_list.append(F_measure)
    
    ROC_df = pd.concat([ROC_df, ROC], axis = 1)
    
matrixs_df = pd.DataFrame()
matrixs_df['field'] = fields_list
matrixs_df['AUC'] = AUC_list
matrixs_df['ACC'] = ACC_list
matrixs_df['Precision'] = Precision_list
matrixs_df['Recall'] = Recall_list
matrixs_df['F_measure'] = F_list

In [None]:
#Calculate PAS

data_fields = arcpy.ListFields(landslides_data)
fields_list = [i.name for i in data_fields]
fields_list = fields_list[3:]

area_under_PredicCurve = []
area_under_PredicCurve_df = pd.DataFrame()
#for field in fields_list[-1]:
for field in Raster_field_pair:
    print('Start calculating the prediction rate curve of {}'.format(field[1]))
    area, statistic_infor = Calculate_PredicCurve(landslides_data, field[1], field[0], bins = 30)
    area_under_PredicCurve.append(area)
    statistic_infor.to_excel(os.path.join(table_address, field[1]+'.xlsx'))
    print('completed')
area_under_PredicCurve_df['LSM_type'] = fields_list
area_under_PredicCurve_df['Area'] = area_under_PredicCurve

In [None]:
# calculate SI

data_fields = arcpy.ListFields(landslides_data)
fields_list = [i.name for i in data_fields]
fields_list = fields_list[3:]

SI_JS = []
SI_Gain = []
SI_df = pd.DataFrame()
#for field in fields_list[-1]:
for field in Raster_field_pair:
    print('Start calculating the frequence of {}'.format(field[1]))
    temp_JS, temp_Gain, statistic_infor = FrequencyRatio_Curve(landslides_data, field[1], field[0], bins = 30)
    SI_JS.append(temp_JS)
    SI_Gain.append(temp_Gain)
    #statistic_infor.to_excel(os.path.join(table_address, field[1]+'_FR.xlsx'))
    print('completed')
    
SI_df['LSM_type'] = fields_list
SI_df['JS'] = SI_JS
SI_df['Gain'] = SI_Gain