# Our Model

In [4]:
import os
import numpy as np
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import EfficientNetB7
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Concatenate, Input, Dropout, Conv2D, SeparableConv2D, BatchNormalization, Activation
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Lambda, Reshape, Conv2D, Multiply, LayerNormalization, MultiHeadAttention, Add, Flatten
from tensorflow.keras import initializers
from tensorflow.keras.callbacks import ModelCheckpoint

data_path = "Bijie_landslide_dataset/Bijie-landslide-dataset"
img_size = (224, 224)
batch_size = 8
num_classes = 2


def spatial_attention_module(input_feature):
    # 使用一个1x1的卷积层来生成注意力图
    attention = Conv2D(1, (1, 1), activation='sigmoid')(input_feature)
    # 将注意力图乘以原特征图来增强重要特征
    enhanced_feature = Multiply()([input_feature, attention])
    return enhanced_feature

def build_image_model(input_shape):
    img_input = Input(shape=input_shape, name='image_input')
    img_base_model = EfficientNetB7(include_top=False, input_tensor=img_input, weights='imagenet')
    img_features = img_base_model.output
    img_features = spatial_attention_module(img_features)
    img_features = GlobalAveragePooling2D()(img_features)
    return Model(inputs=img_input, outputs=img_features, name='ImageModel')

def load_images(folder, filenames, color_mode='rgb'):
    images = []
    for filename in filenames:
        if filename.startswith('.'):  # 过滤掉隐藏文件
            continue
        img_path = os.path.join(folder, filename)
        img = load_img(img_path, target_size=img_size, color_mode=color_mode)
        img_array = img_to_array(img)
        images.append(img_array)
    return np.array(images)


def transformer_block(x, num_heads=4):
    # Layer Normalization
    x_norm = LayerNormalization()(x)
    # Multi-head self-attention
    attn_output = MultiHeadAttention(num_heads=num_heads, key_dim=64)(x_norm, x_norm)
    # Skip connection
    x = Add()([x, attn_output])
    # Another layer normalization
    x_norm = LayerNormalization()(x)
    # Feed-forward network
    ff_output = Dense(units=512, activation='relu')(x_norm)
    # Final skip connection
    output = Add()([x, ff_output])
    return output

def prepare_dataset(data_path):
    landslide_path = os.path.join(data_path, "landslide")
    non_landslide_path = os.path.join(data_path, "non-landslide")
    
    landslide_filenames = [filename for filename in os.listdir(os.path.join(landslide_path, "image")) if not filename.startswith('.')]
    non_landslide_filenames = [filename for filename in os.listdir(os.path.join(non_landslide_path, "image")) if not filename.startswith('.')]

    landslide_images = load_images(os.path.join(landslide_path, "image"), landslide_filenames)
    landslide_dems = load_images(os.path.join(landslide_path, "dem"), landslide_filenames, color_mode='grayscale')
    non_landslide_images = load_images(os.path.join(non_landslide_path, "image"), non_landslide_filenames)
    non_landslide_dems = load_images(os.path.join(non_landslide_path, "dem"), non_landslide_filenames, color_mode='grayscale')

    landslide_labels = np.ones(len(landslide_filenames), dtype=int)
    non_landslide_labels = np.zeros(len(non_landslide_filenames), dtype=int)

    images = np.concatenate([landslide_images, non_landslide_images], axis=0)
    dems = np.concatenate([landslide_dems, non_landslide_dems], axis=0)
    labels = np.concatenate([landslide_labels, non_landslide_labels], axis=0)

    return images, dems, labels


images, dems, labels = prepare_dataset(data_path)
labels = to_categorical(labels, num_classes=num_classes)

x_img_train, x_img_val, x_dem_train, x_dem_val, y_train, y_val = train_test_split(images, dems, labels, test_size=0.2, random_state=42)

def build_dem_cnn_model(input_shape):
    inputs = Input(shape=input_shape, name='dem_input')
    x = SeparableConv2D(128, (3, 3), activation='relu', padding='same')(inputs)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = SeparableConv2D(256, (3, 3), activation='relu', padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = SeparableConv2D(512, (3, 3), activation='relu', padding='same')(x)
    x = BatchNormalization()(x)
    x = GlobalAveragePooling2D()(x)
    x = Reshape((1, -1))(x)
    x = transformer_block(x)
    x = Flatten()(x)
    return Model(inputs=inputs, outputs=x, name='Advanced_DEM_CNN')


def build_vae_feature_merger(input_dim):
    input_layer = Input(shape=(input_dim,))
    x = Dense(128, activation='relu')(input_layer)
    z_mean = Dense(64)(x)
    z_log_var = Dense(64)(x)
    
    def sampling(args):
        z_mean, z_log_var = args
        batch = K.shape(z_mean)[0]
        dim = K.int_shape(z_mean)[1]
        epsilon = K.random_normal(shape=(batch, dim))
        return z_mean + K.exp(0.5 * z_log_var) * epsilon

    z = Lambda(sampling)([z_mean, z_log_var])
    encoder = Model(input_layer, [z_mean, z_log_var, z], name='vae_merger')
    return encoder



# Build the dual input model
def build_dual_input_model(input_shape):
    img_model = build_image_model(input_shape)
    img_features = img_model.output

    dem_input_shape = (img_size[0], img_size[1], 1)  
    dem_model = build_dem_cnn_model(dem_input_shape)
    dem_features = dem_model.output

    merged_features = Concatenate()([img_features, dem_features])
    vae_encoder = build_vae_feature_merger(merged_features.shape[1])
    z_mean, z_log_var, z = vae_encoder(merged_features)

    x = Dense(256, activation='relu')(z)
    x = Dropout(0.5)(x)
    output = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=[img_model.input, dem_model.input], outputs=output, name='DualInputModel')
    return model

# Compile the model
model = build_dual_input_model(input_shape=(224, 224, 3))
model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

# Define the checkpoint filepath
checkpoint_filepath = 'best_model.h5'

# Define the ModelCheckpoint callback
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    save_weights_only=False,
    verbose=1
)

# Train the model with the ModelCheckpoint callback
history = model.fit(
    [x_img_train, x_dem_train],
    y_train,
    validation_data=([x_img_val, x_dem_val], y_val),
    batch_size=batch_size,
    epochs=20,
    callbacks=[checkpoint_callback]  # Pass the ModelCheckpoint callback here
)

# Load the best weights
model.load_weights(checkpoint_filepath)

# Predict using the best weights
y_pred_probs = model.predict([x_img_val, x_dem_val])
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_val, axis=1)

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted')
recall = recall_score(y_true, y_pred, average='weighted')
f1 = f1_score(y_true, y_pred, average='weighted')

print("Classification Report:")
print(classification_report(y_true, y_pred, target_names=['non-landslide', 'landslide']))
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


Epoch 1/20
Epoch 1: val_accuracy improved from -inf to 0.94234, saving model to best_model.h5
Epoch 2/20
Epoch 2: val_accuracy improved from 0.94234 to 0.97477, saving model to best_model.h5
Epoch 3/20
Epoch 3: val_accuracy did not improve from 0.97477
Epoch 4/20
Epoch 4: val_accuracy did not improve from 0.97477
Epoch 5/20
Epoch 5: val_accuracy improved from 0.97477 to 0.97838, saving model to best_model.h5
Epoch 6/20
Epoch 6: val_accuracy did not improve from 0.97838
Epoch 7/20
Epoch 7: val_accuracy did not improve from 0.97838
Epoch 8/20
Epoch 8: val_accuracy improved from 0.97838 to 0.98198, saving model to best_model.h5
Epoch 9/20
Epoch 9: val_accuracy did not improve from 0.98198
Epoch 10/20
Epoch 10: val_accuracy improved from 0.98198 to 0.98739, saving model to best_model.h5
Epoch 11/20
Epoch 11: val_accuracy did not improve from 0.98739
Epoch 12/20
Epoch 12: val_accuracy did not improve from 0.98739
Epoch 13/20
Epoch 13: val_accuracy did not improve from 0.98739
Epoch 14/20
Ep

## Other Model

In [4]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用ResNet50）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = ResNet50(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用简单的CNN模型）
input_dem = Input(shape=(image_height, image_width, 3))
dem_conv1 = Conv2D(32, kernel_size=(3, 3), activation='relu')(input_dem)
dem_pool1 = MaxPooling2D(pool_size=(2, 2))(dem_conv1)
dem_conv2 = Conv2D(64, kernel_size=(3, 3), activation='relu')(dem_pool1)
dem_pool2 = MaxPooling2D(pool_size=(2, 2))(dem_conv2)
dem_flat = Flatten()(dem_pool2)
dem_output = Dense(256, activation='relu')(dem_flat)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)




Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.3931 - accuracy: 0.8685 - 629ms/epoch - 35ms/step

Test accuracy: 0.8684684634208679
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.85      1.00      0.92       399
    landslide       0.99      0.54      0.70       156

     accuracy                           0.87       555
    macro avg       0.92      0.77      0.81       555
 w

In [5]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG16, InceptionV3
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用VGG16）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = VGG16(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用InceptionV3）
input_dem = Input(shape=(image_height, image_width, 3))
dem_features = InceptionV3(include_top=False, weights='imagenet', input_tensor=input_dem)(input_dem)
dem_features = GlobalAveragePooling2D()(dem_features)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.2778 - accuracy: 0.9297 - 1s/epoch - 67ms/step

Test accuracy: 0.929729700088501
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.94      0.96      0.95       399
    landslide       0.90      0.84      0.87       156

     accuracy                           0.93       555
    macro avg       0.92      0.90      0.91       555
 weigh

In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50, MobileNetV2
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用ResNet50）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = ResNet50(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用MobileNetV2）
input_dem = Input(shape=(image_height, image_width, 3))
dem_features = MobileNetV2(include_top=False, weights='imagenet', input_tensor=input_dem)(input_dem)
dem_features = GlobalAveragePooling2D()(dem_features)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


2024-05-13 11:35:22.958020: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-05-13 11:35:36.746160: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-05-13 11:35:37.181305: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 22134 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4090, pci bus id: 0000:1b:00.0, compute capability: 8.9


Epoch 1/50


2024-05-13 11:35:51.244375: I tensorflow/stream_executor/cuda/cuda_dnn.cc:384] Loaded cuDNN version 8101
2024-05-13 11:35:51.934578: W tensorflow/stream_executor/gpu/asm_compiler.cc:230] Falling back to the CUDA driver for PTX compilation; ptxas does not support CC 8.9
2024-05-13 11:35:51.934595: W tensorflow/stream_executor/gpu/asm_compiler.cc:233] Used ptxas at ptxas
2024-05-13 11:35:51.934666: W tensorflow/stream_executor/gpu/redzone_allocator.cc:314] UNIMPLEMENTED: ptxas ptxas too old. Falling back to the driver to compile.
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.
2024-05-13 11:35:52.720975: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.3996 - accuracy: 0.9297 - 830ms/epoch - 46ms/step

Test accuracy: 0.929729700088501
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.93      0.97      0.95       399
    landslide       0.93      0.81      0.87       156

     accuracy                           0.93       555
    macro avg       0.93      0.89      0.91       555
 weighted avg 

In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2, Xception
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用MobileNetV2）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = MobileNetV2(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用Xception）
input_dem = Input(shape=(image_height, image_width, 3))
dem_features = Xception(include_top=False, weights='imagenet', input_tensor=input_dem)(input_dem)
dem_features = GlobalAveragePooling2D()(dem_features)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


2024-05-13 11:59:58.884454: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.




2024-05-13 12:00:12.912068: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-05-13 12:00:13.332656: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 22134 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4090, pci bus id: 0000:39:00.0, compute capability: 8.9


Epoch 1/50


2024-05-13 12:00:25.916309: I tensorflow/stream_executor/cuda/cuda_dnn.cc:384] Loaded cuDNN version 8101
2024-05-13 12:00:26.561397: W tensorflow/stream_executor/gpu/asm_compiler.cc:230] Falling back to the CUDA driver for PTX compilation; ptxas does not support CC 8.9
2024-05-13 12:00:26.561414: W tensorflow/stream_executor/gpu/asm_compiler.cc:233] Used ptxas at ptxas
2024-05-13 12:00:26.561497: W tensorflow/stream_executor/gpu/redzone_allocator.cc:314] UNIMPLEMENTED: ptxas ptxas too old. Falling back to the driver to compile.
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.
2024-05-13 12:00:27.376543: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.5129 - accuracy: 0.9459 - 864ms/epoch - 48ms/step

Test accuracy: 0.9459459185600281
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.98      0.95      0.96       399
    landslide       0.88      0.94      0.91       156

     accuracy                           0.95       555
    macro avg       0.93      0.94      0.93       555
 weighted avg

In [6]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNet, DenseNet121
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用MobileNet）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = MobileNet(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用DenseNet121）
input_dem = Input(shape=(image_height, image_width, 3))
dem_features = DenseNet121(include_top=False, weights='imagenet', input_tensor=input_dem)(input_dem)
dem_features = GlobalAveragePooling2D()(dem_features)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.1397 - accuracy: 0.9730 - 741ms/epoch - 41ms/step

Test accuracy: 0.9729729890823364
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.98      0.98      0.98       399
    landslide       0.96      0.94      0.95       156

     accuracy                           0.97       555
    macro avg       0.97      0.96      0.97       555
 w

In [3]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2, InceptionResNetV2
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用MobileNetV2）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = MobileNetV2(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用InceptionResNetV2）
input_dem = Input(shape=(image_height, image_width, 3))
dem_features = InceptionResNetV2(include_top=False, weights='imagenet', input_tensor=input_dem)(input_dem)
dem_features = GlobalAveragePooling2D()(dem_features)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_resnet_v2/inception_resnet_v2_weights_tf_dim_ordering_tf_kernels_notop.h5
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 1.7403 - accuracy: 0.8468 - 1s/epoch - 64ms/step

Test accuracy: 0.8468468189239502
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.96      0.82      0.88       399
    

In [6]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import DenseNet121, VGG16
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用DenseNet121）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = DenseNet121(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用VGG16）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = VGG16(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.6326 - accuracy: 0.8919 - 872ms/epoch - 48ms/step

Test accuracy: 0.8918918967247009
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.87      1.00      0.93       399
    landslide       1.00      0.62      0.76       156

     accuracy                           0.89       555
    macro avg       0.93      0.81      0.85       555
 w

In [9]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import DenseNet121, VGG16
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用DenseNet121）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = DenseNet121(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用VGG16）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = VGG16(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 1.1681 - accuracy: 0.8523 - 839ms/epoch - 47ms/step

Test accuracy: 0.8522522449493408
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.83      0.99      0.91       399
    landslide       0.97      0.49      0.65       156

     accuracy                           0.85       555
    macro avg       0.90      0.74      0.78       555
 w

In [7]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import InceptionV3, MobileNetV2
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用InceptionV3）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = InceptionV3(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用MobileNetV2）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = MobileNetV2(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.2057 - accuracy: 0.9495 - 637ms/epoch - 35ms/step

Test accuracy: 0.9495495557785034
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.96      0.96      0.96       399
    landslide       0.91      0.91      0.91       156

     accuracy                           0.95       555
    macro avg       0.94      0.94      0.94       555
 w

In [8]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import DenseNet121, VGG16
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用DenseNet121）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = DenseNet121(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用VGG16）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = VGG16(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.4962 - accuracy: 0.8505 - 860ms/epoch - 48ms/step

Test accuracy: 0.8504504561424255
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.97      0.81      0.89       399
    landslide       0.67      0.94      0.78       156

     accuracy                           0.85       555
    macro avg       0.82      0.88      0.83       555
 w

In [10]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG19, MobileNetV2
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用VGG19）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = VGG19(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用MobileNetV2）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = MobileNetV2(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.3033 - accuracy: 0.8775 - 811ms/epoch - 45ms/step

Test accuracy: 0.8774774670600891
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.94      0.88      0.91       399
    landslide       0.74     

In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import InceptionV3, DenseNet121
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用InceptionV3）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = InceptionV3(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用DenseNet121）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = DenseNet121(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


2024-05-13 13:34:26.356605: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-05-13 13:34:39.622451: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-05-13 13:34:40.061789: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 22134 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4090, pci bus id: 0000:39:00.0, compute capability: 8.9


Epoch 1/50


2024-05-13 13:35:01.083177: I tensorflow/stream_executor/cuda/cuda_dnn.cc:384] Loaded cuDNN version 8101
2024-05-13 13:35:01.742881: W tensorflow/stream_executor/gpu/asm_compiler.cc:230] Falling back to the CUDA driver for PTX compilation; ptxas does not support CC 8.9
2024-05-13 13:35:01.742895: W tensorflow/stream_executor/gpu/asm_compiler.cc:233] Used ptxas at ptxas
2024-05-13 13:35:01.742940: W tensorflow/stream_executor/gpu/redzone_allocator.cc:314] UNIMPLEMENTED: ptxas ptxas too old. Falling back to the driver to compile.
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.
2024-05-13 13:35:02.814401: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.1392 - accuracy: 0.9676 - 1s/epoch - 81ms/step

Test accuracy: 0.9675675630569458
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.99      0.96      0.98       399
    landslide       0.91      0.98      0.94       156

     accuracy                           0.97       555
    macro avg       0.95      0.97      0.96       555
 weighted avg   

In [2]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2, Xception
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用MobileNetV2）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = MobileNetV2(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用Xception）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = Xception(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 6.8317 - accuracy: 0.6577 - 900ms/epoch - 50ms/step

Test accuracy: 0.6576576828956604
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.99      0.53      0.69       399
    landslide       0.45      0.98      0.62       156

     accuracy                           0.66       555
    macro avg       0.72      0.76      0.65       555
 w

In [3]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import EfficientNetB0, ResNet152V2
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用EfficientNetB0）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = EfficientNetB0(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用ResNet152V2）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = ResNet152V2(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152v2_weights_tf_dim_ordering_tf_kernels_notop.h5
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 0.2105 - accuracy: 0.9550 - 1s/epoch - 73ms/step

Test accuracy: 0.954954981803894
Classification Report:
                precision    recall  f1-sc

In [5]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import NASNetMobile, Xception
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Concatenate
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 数据集路径
data_dir = "Bijie_landslide_dataset/Bijie-landslide-dataset"

# 定义数据生成器
datagen = ImageDataGenerator(rescale=1./255)

# 加载数据集
batch_size = 32
image_height = 224
image_width = 224

# 加载landslide数据
landslide_dir = os.path.join(data_dir, "landslide")
landslide_dem_dir = os.path.join(landslide_dir, "dem")
landslide_image_dir = os.path.join(landslide_dir, "image")
landslide_mask_dir = os.path.join(landslide_dir, "mask")

# 加载non-landslide数据
non_landslide_dir = os.path.join(data_dir, "non-landslide")
non_landslide_dem_dir = os.path.join(non_landslide_dir, "dem")
non_landslide_image_dir = os.path.join(non_landslide_dir, "image")

# 读取并预处理数据函数
def read_and_preprocess_data(image_dir, dem_dir=None):
    image_paths = [os.path.join(image_dir, filename) for filename in os.listdir(image_dir) if not filename.startswith('.')]
    images = [tf.keras.preprocessing.image.load_img(img_path, target_size=(image_height, image_width)) for img_path in image_paths]
    images = [tf.keras.preprocessing.image.img_to_array(img) for img in images]
    images = np.array(images)

    if dem_dir:
        dem_paths = [os.path.join(dem_dir, filename) for filename in os.listdir(dem_dir) if not filename.startswith('.')]
        dem_images = [tf.keras.preprocessing.image.load_img(dem_path, target_size=(image_height, image_width)) for dem_path in dem_paths]
        dem_images = [tf.keras.preprocessing.image.img_to_array(dem) for dem in dem_images]
        dem_images = np.array(dem_images)
        return images, dem_images
    else:
        return images

# 读取并预处理landslide数据
landslide_images, landslide_dem_images = read_and_preprocess_data(landslide_image_dir, landslide_dem_dir)
landslide_labels = np.ones(len(landslide_images))

# 读取并预处理non-landslide数据
non_landslide_images = read_and_preprocess_data(non_landslide_image_dir)
non_landslide_labels = np.zeros(len(non_landslide_images))

# 合并数据集
images = np.concatenate((landslide_images, non_landslide_images), axis=0)
dem_images = np.concatenate((landslide_dem_images, np.zeros_like(non_landslide_images)), axis=0)  # 对于non-landslide数据，DEM图像全为0
labels = np.concatenate((landslide_labels, non_landslide_labels), axis=0)

# 切分数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

# 构建图像处理模型（使用NASNetMobile）
input_image = Input(shape=(image_height, image_width, 3))
base_model_image = NASNetMobile(include_top=False, weights='imagenet', input_tensor=input_image)
image_features = GlobalAveragePooling2D()(base_model_image.output)
image_output = Dense(256, activation='relu')(image_features)

# 构建DEM处理模型（使用Xception）
input_dem = Input(shape=(image_height, image_width, 3))
base_model_dem = Xception(include_top=False, weights='imagenet', input_tensor=input_dem)
dem_features = GlobalAveragePooling2D()(base_model_dem.output)
dem_output = Dense(256, activation='relu')(dem_features)

# 将两个模型输出连接
merged = Concatenate()([image_output, dem_output])

# 最终的分类层
output = Dense(1, activation='sigmoid')(merged)

# 定义模型
model = Model(inputs=[input_image, input_dem], outputs=output)

# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 训练模型
model.fit([X_train, dem_images[:len(X_train)]], y_train, batch_size=batch_size, epochs=50, validation_split=0.2)

# 评估模型
test_loss, test_acc = model.evaluate([X_test, dem_images[len(X_train):]], y_test, verbose=2)
print('\nTest accuracy:', test_acc)

# 模型预测
y_pred = model.predict([X_test, dem_images[len(X_train):]])
y_pred_classes = np.round(y_pred)

# 输出性能报告结果
report = classification_report(y_test, y_pred_classes, target_names=['non-landslide', 'landslide'])
print("Classification Report:\n", report)

# 输出其他性能指标
accuracy = np.mean(y_test == y_pred_classes)
precision = np.mean(y_pred_classes[y_test == 1] == 1)
recall = np.mean(y_pred_classes[y_test == 1] == y_test[y_test == 1])
f1_score = 2 * precision * recall / (precision + recall)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/nasnet/NASNet-mobile-no-top.h5
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
18/18 - 1s - loss: 3.7473 - accuracy: 0.5928 - 1s/epoch - 57ms/step

Test accuracy: 0.592792809009552
Classification Report:
                precision    recall  f1-score   support

non-landslide       0.68      0.81      0.74       399
    landslide       0.08      0.04      0.06       156

  