In [9]:
import glob
import os.path
import os
import random
import numpy as np
from sklearn.model_selection import StratifiedKFold
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.layers import Dense, Flatten, BatchNormalization, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from keras.layers import Input,Dense
INPUT_DATA = 'rock_test_a'
VALIDATION_PERCENTAGE = 10  # 保留10%的数据用于验证
TEST_PERCENTAGE = 10  # 保留10%的数据用于测试
LEARNING_RATE = 0.01#学习率
BATCH = 32  # 迭代样本数量

# 数据增强配置
datagen = ImageDataGenerator(
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=True)

# 载入所有图片路径和标签
def load_images_and_labels(image_lists):
    all_images = []
    all_labels = []
    for label_name, label_lists in image_lists.items():
        for category in ['training', 'testing', 'validation']:
            for image_name in label_lists[category]:
                image_path = os.path.join(INPUT_DATA, label_name, image_name)
                all_images.append(image_path)
                all_labels.append(label_name)
    return np.array(all_images), np.array(all_labels)



def create_rock_classification_model(num_classes):

    # 输入层
    input_tensor = Input(shape=(150, 150, 3))

    # 预训练的InceptionV3模型作为特征提取器，不包括顶部的全连接层
    base_model = InceptionV3(include_top=False, weights='imagenet', input_tensor=input_tensor)
    base_model.trainable = False  # 冻结预训练模型的权重

    # 添加自定义层
    x = base_model.output
    x = GlobalAveragePooling2D()(x)  # 添加全局平均池化层
    x = BatchNormalization()(x)      # 添加批量归一化层
    x = Dense(512, activation='relu', kernel_regularizer=l2(0.01))(x)  # 添加L2权重正则化
    x = Dropout(0.5)(x)              # 添加Dropout层，减少过拟合
    x = BatchNormalization()(x)
    x = Dense(512, activation='relu', kernel_regularizer=l2(0.01))(x)  # 再次添加L2权重正则化
    x = Dropout(0.5)(x)
    
    # 输出层
    predictions = Dense(num_classes, activation='softmax', kernel_regularizer=l2(0.01))(x)  # 添加L2权重正则化

    # 构建最终模型
    model = Model(inputs=input_tensor, outputs=predictions)

    # 编译模型
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# 打印模型摘要
    model.summary()

    return model


# 用于加载和预处理图片的函数
def load_and_preprocess_images(image_paths, target_size):
    images = [img_to_array(load_img(img, target_size=target_size)) for img in image_paths]
    return np.array(images)

# 将数据分割为交叉验证的训练和测试集
def get_data_for_fold(fold_num, skf, all_images, all_labels, num_classes):
    train_indices, test_indices = list(skf.split(all_images, all_labels))[fold_num]
    train_images = [all_images[i] for i in train_indices]
    test_images = [all_images[i] for i in test_indices]
    train_labels = to_categorical([all_labels[i] for i in train_indices], num_classes=num_classes)
    test_labels = to_categorical([all_labels[i] for i in test_indices], num_classes=num_classes)

    # 加载和预处理图片
    train_images = load_and_preprocess_images(train_images, target_size=(150, 150))
    test_images = load_and_preprocess_images(test_images, target_size=(150, 150))

    return train_images, train_labels, test_images, test_labels

# 使用交叉验证进行模型训练和评估的函数
def cross_validate(image_lists, num_classes, k=5):
    all_images, all_labels = load_images_and_labels(image_lists)
    label_to_int = {label: index for index, label in enumerate(np.unique(all_labels))}
    all_labels = np.array([label_to_int[label] for label in all_labels])

    skf = StratifiedKFold(n_splits=k, shuffle=True)
    fold_number = 0
    for fold in range(k):
        # 获取当前fold的数据
        train_images, train_labels, test_images, test_labels = get_data_for_fold(fold, skf, all_images, all_labels, num_classes)

        # 创建数据生成器
        train_datagen = datagen.flow(train_images, train_labels, batch_size=BATCH)
        test_datagen = ImageDataGenerator(rescale=1./255).flow(test_images, test_labels, batch_size=BATCH)

        # 创建新的模型实例
        model = create_rock_classification_model(num_classes)

        # 训练模型
        model.fit(
        train_datagen,
        steps_per_epoch=len(train_images) // BATCH,
        epochs=20,
        validation_data=test_datagen,
        validation_steps=len(test_images) // BATCH)

       # 在测试集上进行预测
        test_predictions = model.predict(test_images)  # 直接使用测试集图像，不再使用 test_datagen
        predicted_labels = np.argmax(test_predictions, axis=1)
        true_labels = np.argmax(test_labels, axis=1)

        # 计算混淆矩阵和每个类别的准确率
        conf_matrix = confusion_matrix(true_labels, predicted_labels)
        class_report = classification_report(true_labels, predicted_labels)

        print(f"Confusion Matrix for fold {fold_number}:\n", conf_matrix)
        print(f"Classification Report for fold {fold_number}:\n", class_report)

        # 计算整体准确率
        overall_accuracy = np.sum(np.diag(conf_matrix)) / np.sum(conf_matrix)
        print(f'Overall Accuracy for fold {fold_number}: {overall_accuracy}')

        fold_number += 1
        model.save(f'rock_fold_{fold_number}.h5')





def create_image_lists(validation_pct, test_pct):
    # 初始化一个空字典以存储图像列表
    result = {}
    
    # 获取指定输入目录中所有子目录（文件夹）的列表
    sub_dirs = [x[0] for x in os.walk(INPUT_DATA)]
    
    # 标志，用于检查当前子目录是否是根目录
    is_root_dir = True
    
    # 遍历每个子目录
    for sub_dir in sub_dirs:
        # 跳过根目录
        if is_root_dir:
            is_root_dir = False
            continue
        
        # 定义要考虑的文件扩展名列表（图片）
        extensions = ['jpg', 'jpeg', 'JPG', 'JPEG']
        
        # 初始化一个空列表以存储文件路径
        file_list = []
        
        # 从当前子目录中提取基本名称
        dir_name = os.path.basename(sub_dir)
        
        # 遍历每个文件扩展名
        for extension in extensions:
            # 创建文件glob模式，以匹配具有当前扩展名的文件
            file_glob = os.path.join(sub_dir, '*.' + extension)
            
            # 使用glob获取与模式匹配的文件路径列表
            file_list.extend(glob.glob(file_glob))
        
        # 检查当前子目录中是否有任何文件
        if not file_list:
            continue
        
        # 从当前子目录中提取标签名称
        label_name = os.path.basename(sub_dir)
        
        # 初始化列表以存储训练、测试和验证图像
        training_images = []
        testing_images = []
        validation_images = []
        
        # 遍历文件列表中的每个文件
        for file_name in file_list:
            # 提取文件的基本名称
            base_name = os.path.basename(file_name)
            
            # 生成介于0和99之间的随机数
            chance = np.random.randint(100)
            
            # 根据随机数将文件分配到训练、测试或验证集中
            if chance < validation_pct:
                validation_images.append(base_name)
            elif chance < (test_pct + validation_pct):
                testing_images.append(base_name)
            else:
                training_images.append(base_name)
        
        # 将结果存储在当前标签的字典中
        result[label_name] = {
            'dir': dir_name,
            'training': training_images,
            'testing': testing_images,
            'validation': validation_images,
        }
    
    # 返回包含图像列表的最终字典
    return result



if __name__ == '__main__':
    # 读取所有的图片并创建图像列表
    image_lists = create_image_lists(VALIDATION_PERCENTAGE, TEST_PERCENTAGE)
    n_classes = len(image_lists.keys())
    # 应用交叉验证来训练和评估模型
    cross_validate(image_lists, n_classes, k=5)


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_5 (InputLayer)        [(None, 150, 150, 3)]        0         []                            
                                                                                                  
 conv2d_376 (Conv2D)         (None, 74, 74, 32)           864       ['input_5[0][0]']             
                                                                                                  
 batch_normalization_381 (B  (None, 74, 74, 32)           96        ['conv2d_376[0][0]']          
 atchNormalization)                                                                               
                                                                                                  
 activation_376 (Activation  (None, 74, 74, 32)           0         ['batch_normalization_38

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  saving_api.save_model(


Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_6 (InputLayer)        [(None, 150, 150, 3)]        0         []                            
                                                                                                  
 conv2d_470 (Conv2D)         (None, 74, 74, 32)           864       ['input_6[0][0]']             
                                                                                                  
 batch_normalization_477 (B  (None, 74, 74, 32)           96        ['conv2d_470[0][0]']          
 atchNormalization)                                                                               
                                                                                                  
 activation_470 (Activation  (None, 74, 74, 32)           0         ['batch_normalization_47

KeyboardInterrupt: 