In [None]:
# 安装必要库
% pip install opencv-python scikit-learn matplotlib numpy
import os
import random

import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn.metrics import confusion_matrix, accuracy_score, ConfusionMatrixDisplay
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.layers import (
    BatchNormalization,
    Conv2D,
    Dense,
    Dropout,
    Flatten,
    Input,
    Lambda,
    MaxPooling2D,
    ReLU,
)
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.utils import to_categorical

In [None]:
# Mount Google Drive at /content/drive; the dataset path in Config points below this mount.
# The google.colab module is imported here, so this cell is intended for a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#2 全局配置（适配小数据集）
class Config:
    """Paths and hyper-parameters shared by every cell of the notebook.

    All settings have defaults chosen for a small dataset. Any of them can be
    overridden by keyword, e.g. ``Config(epochs=5, save_dir="/tmp/out")``;
    ``Config()`` with no arguments yields exactly the default values.
    """

    def __init__(self, **overrides):
        """Create a configuration, optionally overriding individual settings.

        Args:
            **overrides: setting names mapped to replacement values. Passing
                ``feat_db_path`` pins the feature-database file explicitly;
                otherwise it is derived from the (possibly overridden)
                ``save_dir``.

        Raises:
            TypeError: if an override names a setting that does not exist,
                which catches typos such as ``epoch=`` instead of ``epochs=``.
        """
        self.data_dir = "/content/drive/MyDrive/fingerprint-recognition/data"
        self.save_dir = "/content/fingerprint_results"
        self.img_size = (128, 128)  # spatial size; the model input is (*img_size, 1)
        self.feat_dim = 128  # dimension of the embedding vector
        # Small dataset: sub-sampling disabled (use 100%).
        # NOTE(review): no visible cell reads sample_ratio - confirm it is still needed.
        self.sample_ratio = 1.0
        self.epochs = 15
        self.batch_size = 8  # small dataset: reduced batch size
        self.learning_rate = 0.0001
        self.margin = 0.2  # triplet-loss margin
        self.match_threshold = 0.85  # minimum cosine similarity accepted as a match

        known = set(vars(self)) | {"feat_db_path"}
        unknown = sorted(set(overrides) - known)
        if unknown:
            raise TypeError(f"Unknown Config option(s): {', '.join(unknown)}")

        explicit_db_path = overrides.pop("feat_db_path", None)
        vars(self).update(overrides)

        # Derived last so that an overridden save_dir is honoured.
        self.feat_db_path = explicit_db_path or os.path.join(
            self.save_dir, "fingerprint_feat_db.npz"
        )

# Instantiate the shared configuration and make sure the output directory exists
# (exist_ok=True makes re-running this cell harmless).
config = Config()
os.makedirs(config.save_dir, exist_ok=True)

In [None]:

# 1. 解析数据集：读取Google Drive中data文件夹的真实指纹数据
def parse_and_group_fingerprints(data_dir, min_samples=1):
    """Walk ``data_dir`` and group fingerprint image paths by finger ID.

    Two layouts are supported:

    * Sub-folders: ``data/<finger_id>/1.png`` - the ID is the name of the
      folder that directly contains the image (for nested folders this is the
      innermost folder name).
    * Flat files: ``data/finger_001_1.png`` - the ID is the file stem with its
      last ``_``-separated token removed (``finger_001``); a stem without
      ``_`` is used as the ID unchanged.

    Directories and files are visited in sorted order so that the grouping,
    and in particular the first image of every ID, is the same on every run
    regardless of filesystem enumeration order.

    Args:
        data_dir: root directory to scan recursively.
        min_samples: keep only IDs with at least this many images. The
            default of 1 keeps every ID that has any image.

    Returns:
        dict mapping finger ID (str) to a list of image paths (str). Empty if
        the directory is missing or holds no supported images.
    """
    image_exts = (".bmp", ".png", ".jpg", ".jpeg")
    finger_groups = {}

    # os.walk silently yields nothing for a missing directory; say so explicitly
    # so an unmounted Drive is not mistaken for an empty dataset.
    if not os.path.isdir(data_dir):
        print(f"数据目录不存在：{data_dir}")

    for root, dirs, files in os.walk(data_dir):
        dirs.sort()  # in-place sort fixes the order in which os.walk descends
        for file in sorted(files):
            if not file.lower().endswith(image_exts):
                continue

            if root != data_dir:
                # Sub-folder layout: the containing folder names the finger.
                finger_id = os.path.basename(root)
            else:
                # Flat layout: drop the trailing "_<sample index>" token.
                fname = os.path.splitext(file)[0]
                finger_id = fname.rsplit("_", 1)[0] if "_" in fname else fname

            finger_groups.setdefault(finger_id, []).append(os.path.join(root, file))

    finger_groups = {
        fid: paths for fid, paths in finger_groups.items() if len(paths) >= min_samples
    }

    # Report what was found so the user can confirm the dataset was read.
    print(f"解析完成！真实指纹ID数：{len(finger_groups)}")
    for fid, paths in finger_groups.items():
        print(f"指纹ID {fid}：{len(paths)} 个样本")
    return finger_groups

# 2. 图像预处理（不变）
def preprocess_img(img_path, img_size):
    """Load a fingerprint image as a normalised single-channel float array.

    The image is read as grayscale, resized, scaled to [0, 1] and given a
    trailing channel axis.

    Args:
        img_path: path of the image file.
        img_size: target spatial size as ``(height, width)``, matching the
            model input shape ``(*img_size, 1)``.

    Returns:
        ``np.ndarray`` of dtype float32 and shape ``(height, width, 1)``, or
        ``None`` if the file cannot be read or processed (the reason is
        printed).
    """
    try:
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals an unreadable file by returning None.
            print(f"预处理失败：无法读取图片：{img_path}")
            return None

        # cv2.resize expects dsize as (width, height), the reverse of the
        # (rows, cols) order used by the model input; swap so non-square
        # sizes produce an array of shape img_size.
        height, width = img_size
        img = cv2.resize(img, (width, height))

        # float32 is what the network consumes; avoids a float64 intermediate.
        img = img.astype(np.float32) / 255.0
        return np.expand_dims(img, axis=-1)
    except Exception as e:
        # Best effort by design: callers treat None as "skip this image".
        print(f"预处理失败：{e}")
        return None

# 3. 三元组生成器（保留Gemini修复的__getitem__，但恢复读取真实图片）
class TripletGenerator(tf.keras.utils.Sequence):
    """Keras data sequence yielding (anchor, positive, negative) image batches.

    One triplet is drawn per finger ID when the generator is constructed; the
    same triplets are reshuffled at the end of every epoch. Each batch is
    ``((anchors, positives, negatives), dummy_labels)`` where the labels are
    zeros, since the model computes its loss internally.
    """

    def __init__(self, finger_groups, img_size, batch_size, **kwargs):
        """Build the triplet list.

        Args:
            finger_groups: dict mapping finger ID to a list of image paths.
            img_size: spatial image size forwarded to ``preprocess_img``.
            batch_size: number of triplets per batch.
            **kwargs: forwarded to the Keras base class.
        """
        # The Keras 3 data-sequence base class expects subclasses to call its constructor.
        super().__init__(**kwargs)
        self.finger_groups = finger_groups
        self.img_size = img_size
        self.batch_size = batch_size
        self.finger_ids = list(finger_groups.keys())
        self.valid_triplets = self._generate_valid_triplets()
        self.on_epoch_end()

    def _generate_valid_triplets(self):
        """Return a list of ``(anchor_path, positive_path, negative_path)``.

        Falls back to a single ``("dummy", "dummy", "dummy")`` placeholder
        when no ID has any image, so that the pipeline can still be exercised
        with random images.
        """
        triplets = []
        warned_single_id = False

        for anchor_id in self.finger_ids:
            anchor_paths = self.finger_groups[anchor_id]
            if not anchor_paths:
                continue

            anchor_path = random.choice(anchor_paths)

            # Positive: prefer a different image of the same finger; reusing
            # the anchor makes the positive distance trivially zero.
            same_id_others = [p for p in anchor_paths if p != anchor_path]
            pos_path = random.choice(same_id_others) if same_id_others else anchor_path

            # Negative: any image of another finger that actually has images.
            neg_ids = [
                fid for fid in self.finger_ids
                if fid != anchor_id and self.finger_groups[fid]
            ]
            if neg_ids:
                neg_path = random.choice(self.finger_groups[random.choice(neg_ids)])
            else:
                # Only one usable ID: fall back to the anchor itself. The loss
                # then equals the margin and carries no learning signal.
                neg_path = anchor_path
                if not warned_single_id:
                    print("警告：只有一个可用的指纹ID，负样本退化为锚点自身，训练无法学习区分度")
                    warned_single_id = True

            triplets.append((anchor_path, pos_path, neg_path))

        if not triplets:
            return [("dummy", "dummy", "dummy")]

        # Guarantee at least batch_size triplets by cycling through the list
        # (slicing alone cannot grow a list to more than twice its length).
        base_count = len(triplets)
        if base_count < self.batch_size:
            triplets = [triplets[i % base_count] for i in range(self.batch_size)]
        return triplets

    def __len__(self):
        """Number of batches per epoch (at least one)."""
        return max(1, len(self.valid_triplets) // self.batch_size)

    def __getitem__(self, idx):
        """Load and return batch ``idx``.

        Triplets whose images fail to load are skipped, so a batch may hold
        fewer than ``batch_size`` samples.

        Raises:
            RuntimeError: if no triplet in the batch could be loaded.
        """
        start = idx * self.batch_size
        batch_triplets = self.valid_triplets[start:start + self.batch_size]

        # Top up a short batch by cycling from the beginning of the list.
        total = len(self.valid_triplets)
        fill = 0
        while total and len(batch_triplets) < self.batch_size:
            batch_triplets.append(self.valid_triplets[fill % total])
            fill += 1

        anchors, positives, negatives = [], [], []
        for a_path, p_path, n_path in batch_triplets:
            if a_path == "dummy":
                # No data available: random images keep the pipeline runnable.
                images = [
                    np.random.rand(*self.img_size, 1).astype(np.float32)
                    for _ in range(3)
                ]
            else:
                images = [
                    preprocess_img(path, self.img_size)
                    for path in (a_path, p_path, n_path)
                ]
                if any(img is None for img in images):
                    # preprocess_img already printed the reason; a None entry
                    # cannot be converted to a tensor, so drop the triplet.
                    continue

            anchors.append(images[0])
            positives.append(images[1])
            negatives.append(images[2])

        if not anchors:
            raise RuntimeError("当前批次没有可用的三元组：所有图片均读取失败")

        count = len(anchors)
        # Stack first: one contiguous array converts far faster than a list of arrays.
        anchors = tf.convert_to_tensor(np.stack(anchors), dtype=tf.float32)
        positives = tf.convert_to_tensor(np.stack(positives), dtype=tf.float32)
        negatives = tf.convert_to_tensor(np.stack(negatives), dtype=tf.float32)
        return (anchors, positives, negatives), tf.zeros((count,), dtype=tf.float32)

    def on_epoch_end(self):
        """Shuffle the triplet order in place between epochs."""
        random.shuffle(self.valid_triplets)

# Read the real dataset (the data folder on Google Drive) and build the
# triplet generator that feeds training.
finger_groups = parse_and_group_fingerprints(config.data_dir)
train_generator = TripletGenerator(finger_groups, config.img_size, config.batch_size)
print(f"三元组生成器初始化完成！每轮批次数：{len(train_generator)}")

In [None]:
#4
# 基础CNN特征提取骨干网络（修正Input形状）
def build_backbone(img_size, feat_dim, conv_filters=(16, 32, 64), hidden_units=128, dropout_rate=0.3):
    """Build the CNN that maps a grayscale fingerprint to a unit-length embedding.

    Architecture: one Conv(3x3, same) -> BatchNorm -> ReLU -> MaxPool(2x2)
    stage per entry of ``conv_filters``, then Flatten -> Dense(hidden_units)
    -> ReLU -> Dropout -> Dense(feat_dim) -> L2 normalisation.

    Args:
        img_size: spatial input size; the model input is ``(*img_size, 1)``.
        feat_dim: dimension of the output embedding.
        conv_filters: number of filters of each convolutional stage.
        hidden_units: width of the hidden dense layer.
        dropout_rate: dropout rate applied after the hidden dense layer.

    Returns:
        A Keras ``Model`` named ``"fingerprint_backbone"`` whose output rows
        have unit L2 norm.
    """
    # The input shape must include the channel axis.
    inputs = Input(shape=(*img_size, 1))

    x = inputs
    for filters in conv_filters:
        x = Conv2D(filters, (3, 3), padding="same")(x)
        x = BatchNormalization()(x)
        x = ReLU()(x)
        x = MaxPooling2D((2, 2))(x)

    x = Flatten()(x)
    x = Dense(hidden_units)(x)
    x = ReLU()(x)
    x = Dropout(dropout_rate)(x)
    x = Dense(feat_dim)(x)

    # Built-in normalisation layer instead of a Lambda wrapping a Python
    # lambda: Lambda layers are serialised as bytecode, which makes the saved
    # extractor fragile to reload. The layer name is kept unchanged.
    outputs = tf.keras.layers.UnitNormalization(axis=-1, name="l2_normalize")(x)
    return Model(inputs, outputs, name="fingerprint_backbone")

# 核心修复：用Keras层包装损失计算
class TripletLossLayer(tf.keras.layers.Layer):
    """Layer that registers the triplet loss on the model via ``add_loss``.

    For embeddings ``a`` (anchor), ``p`` (positive) and ``n`` (negative) the
    per-sample loss is ``max(||a - p||^2 - ||a - n||^2 + margin, 0)``; the
    batch mean is added to the model's losses.
    """

    def __init__(self, margin=0.2, **kwargs):
        """Args:
            margin: required gap between the squared negative and positive
                distances.
            **kwargs: forwarded to ``tf.keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.margin = margin

    def call(self, inputs):
        """Compute the loss from ``[anchor_feat, pos_feat, neg_feat]``.

        Returns:
            The anchor embeddings unchanged; the output only exists so that
            the model has an output tensor.
        """
        anchor_feat, pos_feat, neg_feat = inputs
        # Squared Euclidean distances, reduced over the feature axis.
        pos_dist = tf.reduce_sum(tf.square(anchor_feat - pos_feat), axis=1)
        neg_dist = tf.reduce_sum(tf.square(anchor_feat - neg_feat), axis=1)
        # Hinge: zero once the negative is farther than the positive by margin.
        loss = tf.maximum(pos_dist - neg_dist + self.margin, 0.0)
        self.add_loss(tf.reduce_mean(loss))
        return anchor_feat

    def get_config(self):
        """Include ``margin`` so a saved or cloned layer keeps its setting."""
        base_config = super().get_config()
        base_config.update({"margin": self.margin})
        return base_config

# Build the triplet model: one shared backbone applied to three image inputs.
backbone = build_backbone(config.img_size, config.feat_dim)
anchor_input = Input(shape=(*config.img_size, 1), name="anchor_input")
pos_input = Input(shape=(*config.img_size, 1), name="pos_input")
neg_input = Input(shape=(*config.img_size, 1), name="neg_input")

# Extract embeddings; calling the same `backbone` object three times shares its weights.
anchor_feat = backbone(anchor_input)
pos_feat = backbone(pos_input)
neg_feat = backbone(neg_input)

# The custom layer computes the triplet loss and registers it on the model.
outputs = TripletLossLayer(margin=config.margin)([anchor_feat, pos_feat, neg_feat])

# Compile without a `loss` argument: the only loss is the one added by TripletLossLayer.
triplet_model = Model(inputs=[anchor_input, pos_input, neg_input], outputs=outputs)
triplet_model.compile(optimizer=Adam(learning_rate=config.learning_rate))

# Print the backbone architecture.
backbone.summary()

In [None]:
#5
# Callbacks: keep the weights of the epoch with the lowest training loss and
# stop early once the loss has not improved for 3 epochs.
callbacks = [
    ModelCheckpoint(
        os.path.join(config.save_dir, "best_backbone.weights.h5"), # weights-only checkpoints use the .weights.h5 suffix
        monitor="loss",
        save_best_only=True,
        save_weights_only=True
    ),
    EarlyStopping(monitor="loss", patience=3, verbose=1)
]

# Start training.
print("开始训练特征提取器...")
history = triplet_model.fit(
    train_generator,
    epochs=config.epochs,
    callbacks=callbacks,
    verbose=1
)

# Restore the best checkpoint:
# 1. Load the saved best weights back into triplet_model.
triplet_model.load_weights(os.path.join(config.save_dir, "best_backbone.weights.h5"))
# 2. Read the weights of the backbone layer from triplet_model.
best_backbone_weights = triplet_model.get_layer("fingerprint_backbone").get_weights()
# 3. Assign those weights to the standalone backbone.
# NOTE(review): `backbone` is the model that was called on the three inputs when
# triplet_model was built and is named "fingerprint_backbone", so steps 2-3 look
# like a round-trip on the same object - confirm whether they are needed.
backbone.set_weights(best_backbone_weights)

# Save the final backbone (feature extractor) model.
backbone.save(os.path.join(config.save_dir, "fingerprint_feat_extractor.h5"))
print("模型训练完成！")

# Plot the training-loss curve and save it next to the model.
plt.plot(history.history["loss"], label="训练损失")
plt.title("三元组损失曲线")
plt.xlabel("轮数")
plt.ylabel("损失")
plt.legend()
plt.savefig(os.path.join(config.save_dir, "triplet_loss_curve.png"))
plt.show()

In [None]:
#6
# 提取单张指纹特征
def extract_fingerprint_feat(img_path, feat_extractor, img_size):
    """Compute the embedding of a single fingerprint image.

    Args:
        img_path: path of the image file.
        feat_extractor: model exposing ``predict``; it receives a batch
            holding the one preprocessed image.
        img_size: spatial size forwarded to ``preprocess_img``.

    Returns:
        The first (only) row of the model's prediction, or ``None`` when the
        image could not be preprocessed.
    """
    processed = preprocess_img(img_path, img_size)
    if processed is None:
        return None

    # Prepend the batch axis expected by the model.
    single_batch = processed[np.newaxis, ...]
    embeddings = feat_extractor.predict(single_batch, verbose=0)
    return embeddings[0]

# 构建特征库
def build_feat_database(finger_groups, feat_extractor, config):
    """Enrol one reference embedding per finger ID and save them to disk.

    For every ID the images are tried in list order and the first one that
    yields an embedding is used, so a single unreadable file does not drop
    the whole ID. IDs without any usable image are skipped and reported.

    Args:
        finger_groups: dict mapping finger ID to a list of image paths.
        feat_extractor: model passed on to ``extract_fingerprint_feat``.
        config: object providing ``img_size`` and ``feat_db_path``.

    Returns:
        dict mapping finger ID to its embedding; the same mapping is written
        to ``config.feat_db_path`` with ``np.savez``.
    """
    feat_db = {}
    print("构建特征库...")
    for fid, img_paths in finger_groups.items():
        for candidate_path in img_paths:
            feat = extract_fingerprint_feat(candidate_path, feat_extractor, config.img_size)
            if feat is not None:
                feat_db[fid] = feat
                break
        else:
            # Reached when the list is empty or every image failed to load.
            print(f"指纹ID {fid}：没有可用的图片，已跳过")

    # IDs become keyword arguments of np.savez, so an ID equal to one of its
    # own parameter names (e.g. "file") would collide with that parameter.
    np.savez(config.feat_db_path, **feat_db)
    print(f"特征库保存完成！共{len(feat_db)}个指纹")
    return feat_db

# Build and persist the feature database using the trained backbone.
feat_db = build_feat_database(finger_groups, backbone, config)

In [None]:
# 加载特征库
def load_feat_database(feat_db_path):
    """Load the feature database written by ``np.savez``.

    Args:
        feat_db_path: path of the ``.npz`` file.

    Returns:
        dict mapping each stored key to its array; an empty dict (after
        printing a notice) when the file does not exist.
    """
    if not os.path.exists(feat_db_path):
        print("特征库不存在！")
        return {}

    # np.load keeps the archive open until closed; the context manager
    # releases the file handle once every array has been read into memory.
    with np.load(feat_db_path) as data:
        return {key: data[key] for key in data.files}

# 比对函数
def match_fingerprint(img_path, feat_extractor, feat_db, config):
    """Identify a fingerprint image against the enrolled feature database.

    The image's embedding is compared with every database entry by cosine
    similarity; the best entry is a match when its similarity reaches
    ``config.match_threshold``.

    Args:
        img_path: path of the query image.
        feat_extractor: model passed on to ``extract_fingerprint_feat``.
        feat_db: dict mapping finger ID to its reference embedding.
        config: object providing ``img_size`` and ``match_threshold``.

    Returns:
        ``(best_id, best_similarity, is_match)``. ``(None, 0.0, False)`` when
        the query embedding cannot be computed or the database is empty.
    """
    # Embed the query image.
    input_feat = extract_fingerprint_feat(img_path, feat_extractor, config.img_size)
    if input_feat is None:
        print("输入指纹特征提取失败！")
        return None, 0.0, False

    # Nothing to compare against.
    if len(feat_db) == 0:
        print("特征库为空！")
        return None, 0.0, False

    # Cosine similarity against all entries at once, using NumPy only.
    fids = list(feat_db.keys())
    query = np.asarray(input_feat, dtype=np.float64).ravel()
    db_matrix = np.stack(
        [np.asarray(feat_db[fid], dtype=np.float64).ravel() for fid in fids]
    )
    dots = db_matrix @ query
    norms = np.linalg.norm(db_matrix, axis=1) * np.linalg.norm(query)
    # A zero-length vector has no direction; score it 0 instead of dividing by 0.
    sims = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)

    # argmax returns the first maximum, so ties resolve in database order.
    best_idx = int(np.argmax(sims))
    best_fid = fids[best_idx]
    best_sim = float(sims[best_idx])
    is_match = best_sim >= config.match_threshold

    # Report the outcome.
    print("\n===== 比对结果 =====")
    print(f"输入指纹：{img_path}")
    print(f"最高相似度：{best_sim:.4f}（阈值：{config.match_threshold}）")
    if is_match:
        print(f"匹配成功！指纹ID：{best_fid}")
    else:
        print("匹配失败：未找到相似指纹")
    return best_fid, best_sim, is_match

# ========== Matching smoke test ==========
# Replace with the path of your own test fingerprint.
# NOTE(review): this path lies outside config.data_dir - confirm the file exists.
test_img_path = "/content/fingerprint_dataset/finger_001_2.png"

# Reload the feature database from disk (replaces the in-memory feat_db).
feat_db = load_feat_database(config.feat_db_path)
# Run the comparison against the trained backbone.
match_id, similarity, is_match = match_fingerprint(test_img_path, backbone, feat_db, config)