##  ivector

In [None]:
import json

import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader

# 解析 ivector.txt 文件
def parse_ivector_file(filepath):
    """Parse a whitespace-delimited i-vector text file into a DataFrame.

    Each non-blank line is read as ``<key> <vector>``: the key is everything
    up to the first run of whitespace, and the subject is the part of the key
    before its first ``/``. The vector is the rest of the line with leading and
    trailing square brackets stripped, split on whitespace and converted to
    floats.

    Blank (or whitespace-only) lines are skipped instead of aborting the whole
    parse with an ``IndexError``.

    Args:
        filepath: Path of the text file to read.

    Returns:
        A ``pandas.DataFrame`` with columns ``Subject`` and ``Features`` (a
        list of floats), one row per parsed line, in file order.

    Raises:
        ValueError: If a non-blank line has a key but no vector, or a vector
            token cannot be converted to ``float``.
    """
    rows = []
    with open(filepath, 'r') as file:
        for line_number, line in enumerate(file, start=1):
            parts = line.strip().split(maxsplit=1)
            if not parts:
                # Blank line: nothing to parse.
                continue
            if len(parts) < 2:
                raise ValueError(f"{filepath}:{line_number}: missing feature vector")
            subject = parts[0].split('/')[0]
            features = [float(token) for token in parts[1].strip('[]').split()]
            rows.append((subject, features))
    return pd.DataFrame(rows, columns=['Subject', 'Features'])

# Load the i-vector file into a DataFrame with 'Subject' and 'Features' columns.
ivector_df = parse_ivector_file('../generated_ivector.txt')
# ivector_df = parse_ivector_file('./ivector_feature/ivector.txt')
# Load new_kin_relationships.csv. The four columns are renamed positionally,
# and the subject ids are stripped of surrounding whitespace so that they can
# be used as join keys below.
new_kin_relationships_file = pd.read_csv('./new_kin_relationships.csv')
new_kin_relationships_file.columns = ['Subject1', 'Subject2', 'Relationship1', 'Relationship2']
new_kin_relationships_file['Subject1'] = new_kin_relationships_file['Subject1'].str.strip()
new_kin_relationships_file['Subject2'] = new_kin_relationships_file['Subject2'].str.strip()

# Load new_genders.csv (two columns, renamed positionally to Subject / Gender).
genders_df = pd.read_csv('./new_genders.csv')
genders_df.columns = ['Subject', 'Gender']
genders_df['Subject'] = genders_df['Subject'].str.strip()

# Attach gender to every i-vector row; the inner join drops subjects that are
# missing from the gender table.
ivector_df = ivector_df.merge(genders_df, on='Subject', how='inner')

# Build the pair table: the first merge attaches Subject/Features/Gender for
# Subject1, the second does the same for Subject2. The overlapping column names
# receive the '_1' / '_2' suffixes (e.g. Features_1, Features_2). Inner joins
# drop pairs for which either subject has no feature row.
merged_df1 = new_kin_relationships_file.merge(ivector_df, left_on='Subject1', right_on='Subject', how='inner')
merged_df = merged_df1.merge(ivector_df, left_on='Subject2', right_on='Subject', how='inner', suffixes=('_1', '_2'))

# Print a preview of the merged table, then the distribution of feature-vector
# lengths (the printed label reads "number of feature arrays per length").
print(merged_df.head())
feature_lengths = merged_df['Features_1'].apply(len)
print("不同长度的特征数组数目：")
print(feature_lengths.value_counts())

# Split by anchor subject: all pairs that share a Subject_1 value end up in
# exactly one of train / validation / test.
# NOTE(review): only Subject_1 drives the split, so a subject that appears as
# Subject_2 can still occur in more than one split - confirm this is intended.
subjects = merged_df['Subject_1'].unique()
train_subjects, test_subjects = train_test_split(subjects, test_size=0.2, random_state=42)
train_subjects, val_subjects = train_test_split(train_subjects, test_size=0.1, random_state=42)
print(len(train_subjects),len(val_subjects),len(test_subjects))
train = merged_df[merged_df['Subject_1'].isin(train_subjects)]
val = merged_df[merged_df['Subject_1'].isin(val_subjects)]
test = merged_df[merged_df['Subject_1'].isin(test_subjects)]


# Standardize features.
# The scaler is fitted exactly once, on the training vectors from both sides of
# every pair. Calling fit_transform separately on Features_1 and Features_2
# would re-fit the scaler on the second call, leaving train_features_1 scaled
# with different statistics than every other array that goes through `scaler`.
scaler = StandardScaler()
scaler.fit(np.vstack([np.vstack(train['Features_1']), np.vstack(train['Features_2'])]))
train_features_1 = scaler.transform(np.vstack(train['Features_1']))
train_features_2 = scaler.transform(np.vstack(train['Features_2']))
val_features_1 = scaler.transform(np.vstack(val['Features_1']))
val_features_2 = scaler.transform(np.vstack(val['Features_2']))
test_features_1 = scaler.transform(np.vstack(test['Features_1']))
test_features_2 = scaler.transform(np.vstack(test['Features_2']))

class KinshipDataset(Dataset):
    """Triplet dataset yielding (anchor, positive, negative) feature vectors.

    Item ``idx`` pairs ``anchor_features[idx]`` with ``positive_features[idx]``
    and draws a random negative subject that is neither the anchor nor the
    positive subject and has the same gender as the positive subject.

    Args:
        anchor_features: Indexable collection of already-scaled anchor vectors.
        positive_features: Indexable collection of already-scaled positive
            vectors, aligned row-by-row with ``anchor_features``.
        relationships: DataFrame with ``Subject1`` / ``Subject2`` columns whose
            row ``idx`` names the subjects of item ``idx``.
        ivector_df: DataFrame with ``Subject``, ``Features`` and ``Gender``
            columns, used to look up negatives.
        scaler: Object with a ``transform`` method; applied to the raw negative
            feature vector.
    """

    def __init__(self, anchor_features, positive_features, relationships, ivector_df, scaler):
        self.anchor_features = anchor_features
        self.positive_features = positive_features
        self.relationships = relationships
        self.ivector_df = ivector_df
        self.scaler = scaler
        self.subjects = list(set(self.relationships['Subject1']).union(set(self.relationships['Subject2'])))
        self.subject_gender = dict(zip(self.ivector_df['Subject'], self.ivector_df['Gender']))

        # Per-row subject ids as plain lists: avoids a DataFrame.iloc lookup
        # for every sample.
        self._anchor_subjects = self.relationships['Subject1'].tolist()
        self._positive_subjects = self.relationships['Subject2'].tolist()

        # First feature vector listed for each subject, so a negative lookup is
        # a dict access instead of a boolean-mask scan of the whole frame.
        first_rows = self.ivector_df.drop_duplicates(subset='Subject', keep='first')
        self._subject_features = dict(zip(first_rows['Subject'], first_rows['Features']))

        # Candidate negatives grouped by gender.
        self._subjects_by_gender = {}
        for subject in self.subjects:
            if subject in self.subject_gender:
                gender = self.subject_gender[subject]
                self._subjects_by_gender.setdefault(gender, []).append(subject)

    def __len__(self):
        return len(self.anchor_features)

    def __getitem__(self, idx):
        """Return ``(anchor, positive, negative, anchor_id, positive_id, negative_id)``.

        The three vectors are ``torch.float32`` tensors.

        Raises:
            ValueError: If this dataset contains no subject of the positive
                subject's gender other than the anchor and the positive. An
                unbounded retry loop would never terminate in that situation.
        """
        anchor = self.anchor_features[idx]
        positive = self.positive_features[idx]
        anchor_subject = self._anchor_subjects[idx]
        positive_subject = self._positive_subjects[idx]
        positive_gender = self.subject_gender[positive_subject]

        # Uniform draw over the valid same-gender negatives.
        candidates = [
            subject
            for subject in self._subjects_by_gender.get(positive_gender, ())
            if subject != anchor_subject and subject != positive_subject
        ]
        if not candidates:
            raise ValueError(
                f"no same-gender negative available for pair "
                f"({anchor_subject!r}, {positive_subject!r})"
            )
        negative_subject = candidates[np.random.randint(len(candidates))]

        negative_features = self._subject_features[negative_subject]
        negative = self.scaler.transform([negative_features])[0]

        return (
            torch.tensor(anchor, dtype=torch.float32),
            torch.tensor(positive, dtype=torch.float32),
            torch.tensor(negative, dtype=torch.float32),
            anchor_subject,
            positive_subject,
            negative_subject
        )

# Build one triplet dataset per split, then wrap each in a shuffling loader.
train_dataset = KinshipDataset(
    anchor_features=train_features_1,
    positive_features=train_features_2,
    relationships=train,
    ivector_df=ivector_df,
    scaler=scaler,
)
val_dataset = KinshipDataset(
    anchor_features=val_features_1,
    positive_features=val_features_2,
    relationships=val,
    ivector_df=ivector_df,
    scaler=scaler,
)
test_dataset = KinshipDataset(
    anchor_features=test_features_1,
    positive_features=test_features_2,
    relationships=test,
    ivector_df=ivector_df,
    scaler=scaler,
)

train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=True)

# Number of pairs in each split.
print(len(train_dataset), len(val_dataset), len(test_dataset))

# model and loss function 
# improved model
class TripletNet(nn.Module):
    """Embedding network: Linear -> BatchNorm1d -> Dropout -> ReLU -> Linear.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Width of the hidden layer. Defaults to 256.
        embedding_dim: Size of the output embedding. Defaults to 128.
        dropout: Dropout probability applied after batch norm. Defaults to 0.5.
    """

    def __init__(self, input_dim, hidden_dim=256, embedding_dim=128, dropout=0.5):
        super().__init__()
        # Layer creation order is kept stable so that seeded initialisation
        # gives the same parameters for the default sizes.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, embedding_dim)

    def forward(self, x):
        """Map a batch of feature vectors to a batch of embeddings."""
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.dropout(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

class TripletLoss(nn.Module):
    """Margin-based triplet loss with an L2 penalty on the embedding norms.

    ``loss = mean(relu(d(a, p) - d(a, n) + margin)) + lambda_reg * reg`` where
    ``d`` is ``torch.nn.functional.pairwise_distance`` and ``reg`` is the batch
    mean of ``||a|| + ||p|| + ||n||`` computed per sample.

    Args:
        margin: Required gap between negative and positive distance.
        lambda_reg: Weight of the embedding-norm penalty.
    """

    def __init__(self, margin=1.0, lambda_reg=0.01):
        super().__init__()
        self.margin = margin
        self.lambda_reg = lambda_reg

    def forward(self, anchor, positive, negative):
        """Return the scalar loss for a batch of embedded triplets."""
        pos_dist = torch.nn.functional.pairwise_distance(anchor, positive)
        neg_dist = torch.nn.functional.pairwise_distance(anchor, negative)
        triplet_loss = torch.relu(pos_dist - neg_dist + self.margin).mean()

        # Norms are taken per sample (last dimension) and then averaged.
        # Calling .norm(2) without `dim` collapses the whole batch into one
        # Frobenius norm, which makes the penalty grow with the batch size and
        # turns the trailing .mean() into a no-op.
        per_sample_norms = (
            anchor.norm(p=2, dim=-1)
            + positive.norm(p=2, dim=-1)
            + negative.norm(p=2, dim=-1)
        )
        reg_term = per_sample_norms.mean()
        loss = triplet_loss + self.lambda_reg * reg_term
        return loss


def evaluate_model(model, criterion, data_loader):
    """Evaluate ``model`` on triplet batches without tracking gradients.

    The model is switched to eval mode. Only the first three elements of each
    batch (anchor, positive, negative) are used.

    Returns:
        ``(avg_loss, accuracy)``: the mean of the per-batch losses, and the
        fraction of triplets whose anchor-positive distance is strictly
        smaller than the anchor-negative distance.
    """
    distance = torch.nn.functional.pairwise_distance
    model.eval()
    batch_losses = []
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch in data_loader:
            anchor, positive, negative = batch[:3]
            emb_a = model(anchor)
            emb_p = model(positive)
            emb_n = model(negative)
            batch_losses.append(criterion(emb_a, emb_p, emb_n).item())
            closer = distance(emb_a, emb_p) < distance(emb_a, emb_n)
            hits += closer.sum().item()
            seen += anchor.size(0)
    return sum(batch_losses) / len(data_loader), hits / seen

# Embedding network sized to the feature dimension, triplet loss with its
# default margin / regularization weight, and SGD with momentum.
model = TripletNet(input_dim=train_features_1.shape[1])
criterion = TripletLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

num_epochs = 5
for epoch in range(num_epochs):
    # evaluate_model() leaves the model in eval mode, so training mode is
    # re-enabled at the start of every epoch.
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    # Each batch carries three tensors plus three subject-id entries; the ids
    # are not needed for training.
    for anchor, positive, negative, _, _, _ in train_loader:
        optimizer.zero_grad()
        anchor_out = model(anchor)
        positive_out = model(positive)
        negative_out = model(negative)
        loss = criterion(anchor_out, positive_out, negative_out)
        loss.backward()
        optimizer.step()

        # Running statistics; a triplet counts as correct when the positive is
        # strictly closer to the anchor than the negative. These embeddings
        # were produced in training mode.
        total_loss += loss.item()
        pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
        neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
        correct += (pos_dist < neg_dist).sum().item()
        total += anchor.size(0)

    # Mean of the per-batch losses (not weighted by batch size).
    avg_train_loss = total_loss / len(train_loader)
    train_accuracy = correct / total


    # Per-epoch monitoring on the validation and test splits.
    val_loss, val_accuracy = evaluate_model(model, criterion, val_loader)
    test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)

    print(f'Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
    print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
    print(f'Epoch {epoch+1}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

# Final evaluation on the test split. Negatives are re-drawn at random on every
# pass, so this can differ from the last per-epoch test figure.
test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
print(f'Test Loss: {test_loss:.4f}')
print(f'Overall Test Accuracy: {test_accuracy * 100:.2f}%')

def evaluate_accuracy(model, test_loader):
    """Return the fraction of triplets ranked correctly by ``model``.

    The model is switched to eval mode and run without gradient tracking. A
    triplet is correct when the anchor-positive distance is strictly smaller
    than the anchor-negative distance. Only the first three elements of each
    batch are used.
    """
    distance = torch.nn.functional.pairwise_distance
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for batch in test_loader:
            anchor, positive, negative = batch[:3]
            emb_a = model(anchor)
            emb_p = model(positive)
            emb_n = model(negative)
            closer = distance(emb_a, emb_p) < distance(emb_a, emb_n)
            hits += closer.sum().item()
            seen += anchor.size(0)
    return hits / seen
accuracy = evaluate_accuracy(model, test_loader)
print(f'Overall Accuracy: {accuracy * 100:.2f}%')

# (Relationship1, Relationship2) label pair that selects each kinship category.
relationship_pairs = {
    'BB': ['Brother', 'Brother'],
    'SS': ['Sister', 'Sister'],
    'BS': ['Brother', 'Sister'],
    'FD': ['Father', 'Daughter'],
    'FS': ['Father', 'Son'],
    'MD': ['Mother', 'Daughter'],
    'MS': ['Mother', 'Son']
}
# Results live in their own dict. Writing them back into the label dict leaves
# the label list in place for categories with no test pairs, and formatting
# that list with ':.2f' raises TypeError.
relationship_accuracies = {}
for key, value in relationship_pairs.items():
    relationship_df = test[(test['Relationship1'] == value[0]) & (test['Relationship2'] == value[1])]
    if relationship_df.empty:
        continue
    relationship_dataset = KinshipDataset(
        scaler.transform(np.vstack(relationship_df['Features_1'])),
        scaler.transform(np.vstack(relationship_df['Features_2'])),
        relationship_df,
        ivector_df,
        scaler
    )
    relationship_loader = DataLoader(relationship_dataset, batch_size=32, shuffle=True)
    relationship_accuracy = evaluate_accuracy(model, relationship_loader)
    relationship_accuracies[key] = relationship_accuracy * 100

print("Accuracy by Relationship:")
for relationship in relationship_pairs:
    acc = relationship_accuracies.get(relationship)
    if acc is None:
        print(f"{relationship}: n/a (no test pairs)")
    else:
        print(f"{relationship}: {acc:.2f}%")


# Inspect a single test triplet.
sample_idx = 0
anchor, positive, negative, anchor_subject, positive_subject, negative_subject = test_dataset[sample_idx]

# A one-sample batch can only pass through BatchNorm1d in eval mode, so the
# mode is set explicitly instead of relying on an earlier evaluation call.
# Gradients are not needed for inspection.
model.eval()
with torch.no_grad():
    anchor_out = model(anchor.unsqueeze(0))
    positive_out = model(positive.unsqueeze(0))
    negative_out = model(negative.unsqueeze(0))

    pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
    neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)

print(f"Anchor Subject: {anchor_subject}")
print(f"Positive Subject: {positive_subject}")
print(f"Negative Subject: {negative_subject}")
print(f"Positive distance: {pos_dist.item()}")
print(f"Negative distance: {neg_dist.item()}")
print(f"Correct prediction: {pos_dist.item() < neg_dist.item()}")

# original model
# class TripletNet(nn.Module):
#     def __init__(self, input_dim):
#         super(TripletNet, self).__init__()
#         self.fc1 = nn.Linear(input_dim, 256)
#         self.bn1 = nn.BatchNorm1d(256)
#         self.relu = nn.ReLU()
#         self.fc2 = nn.Linear(256, 128)

#     def forward(self, x):
#         x = self.fc1(x)
#         x = self.bn1(x)
#         x = self.relu(x)
#         x = self.fc2(x)
#         return x

# class TripletLoss(nn.Module):
#     def __init__(self, margin=1.0, lambda_reg=0.001):
#         super(TripletLoss, self).__init__()
#         self.margin = margin
#         self.lambda_reg = lambda_reg

#     def forward(self, anchor, positive, negative):
#         pos_dist = torch.nn.functional.pairwise_distance(anchor, positive)
#         neg_dist = torch.nn.functional.pairwise_distance(anchor, negative)
#         triplet_loss = torch.relu(pos_dist - neg_dist + self.margin).mean()

#         reg_term = (anchor.norm(2) + positive.norm(2) + negative.norm(2)).mean()
#         loss = triplet_loss + self.lambda_reg * reg_term
#         return loss



# model = TripletNet(input_dim=train_features_1.shape[1])
# criterion = TripletLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# # 训练模型
# num_epochs = 5
# for epoch in range(num_epochs):
#     model.train()
#     total_loss = 0.0
#     correct = 0
#     total = 0
#     for anchor, positive, negative, _, _, _ in train_loader:
#         optimizer.zero_grad()
#         anchor_out = model(anchor)
#         positive_out = model(positive)
#         negative_out = model(negative)
#         loss = criterion(anchor_out, positive_out, negative_out)
#         loss.backward()
#         optimizer.step()
        
#         total_loss += loss.item()
#         pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
#         neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
#         correct += (pos_dist < neg_dist).sum().item()
#         total += anchor.size(0)
    
#     avg_train_loss = total_loss / len(train_loader)
#     train_accuracy = correct / total
    

#     val_loss, val_accuracy = evaluate_model(model, criterion, val_loader)
#     test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
    
#     print(f'Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
#     print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
#     print(f'Epoch {epoch+1}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')


# test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
# print(f'Test Loss: {test_loss:.4f}')
# print(f'Overall Test Accuracy: {test_accuracy * 100:.2f}%')

# def evaluate_accuracy(model, test_loader):
#     model.eval()
#     correct = 0
#     total = 0
#     with torch.no_grad():
#         for data in test_loader:
#             anchor, positive, negative = data[:3]  
#             anchor_out = model(anchor)
#             positive_out = model(positive)
#             negative_out = model(negative)
#             pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
#             neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
#             correct += (pos_dist < neg_dist).sum().item()
#             total += anchor.size(0)
#     accuracy = correct / total
#     return accuracy
# accuracy = evaluate_accuracy(model, test_loader)
# print(f'Overall Accuracy: {accuracy * 100:.2f}%')
#
# relationship_accuracies = {
#     'BB': ['Brother', 'Brother'],
#     'SS': ['Sister', 'Sister'],
#     'BS': ['Brother', 'Sister'],
#     'FD': ['Father', 'Daughter'],
#     'FS': ['Father', 'Son'],
#     'MD': ['Mother', 'Daughter'],
#     'MS': ['Mother', 'Son']
# }
# for key, value in relationship_accuracies.items():
#     relationship_df = test[(test['Relationship1'] == value[0]) & (test['Relationship2'] == value[1])]
#     if not relationship_df.empty:
#         relationship_dataset = KinshipDataset(
#             scaler.transform(np.vstack(relationship_df['Features_1'])),
#             scaler.transform(np.vstack(relationship_df['Features_2'])),
#             relationship_df,
#             ivector_df,
#             scaler
#         )
#         relationship_loader = DataLoader(relationship_dataset, batch_size=32, shuffle=True)
#         relationship_accuracy = evaluate_accuracy(model, relationship_loader)
#         relationship_accuracies[key] = relationship_accuracy * 100

# print("Accuracy by Relationship:")
# for relationship, acc in relationship_accuracies.items():
#     print(f"{relationship}: {acc:.2f}%")

# sample_idx = 4000  
# anchor, positive, negative, anchor_subject, positive_subject, negative_subject = test_dataset[sample_idx]
# anchor_out = model(anchor.unsqueeze(0))
# positive_out = model(positive.unsqueeze(0))
# negative_out = model(negative.unsqueeze(0))

# pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
# neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)

# print(f"Anchor Subject: {anchor_subject}")
# print(f"Positive Subject: {positive_subject}")
# print(f"Negative Subject: {negative_subject}")
# print(f"Positive distance: {pos_dist.item()}")
# print(f"Negative distance: {neg_dist.item()}")
# print(f"Correct prediction: {pos_dist.item() < neg_dist.item()}") 

## xvector

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


def parse_xvector_file(filepath):
    """Parse a whitespace-delimited x-vector text file into a DataFrame.

    Each non-blank line is read as ``<key> <vector>``. The subject is the key
    with its last ``-``-separated component removed (``rsplit('-', 1)``). The
    vector is the rest of the line with leading and trailing square brackets
    stripped, split on whitespace and converted to a float NumPy array.

    Blank (or whitespace-only) lines are skipped instead of aborting the whole
    parse with an ``IndexError``.

    Args:
        filepath: Path of the text file to read.

    Returns:
        A ``pandas.DataFrame`` with columns ``Subject`` and ``Features`` (a 1-D
        ``numpy.ndarray``), one row per parsed line, in file order.

    Raises:
        ValueError: If a non-blank line has a key but no vector, or a vector
            token cannot be converted to ``float``.
    """
    rows = []
    with open(filepath, 'r') as file:
        for line_number, line in enumerate(file, start=1):
            parts = line.strip().split(maxsplit=1)
            if not parts:
                # Blank line: nothing to parse.
                continue
            if len(parts) < 2:
                raise ValueError(f"{filepath}:{line_number}: missing feature vector")
            name = parts[0].rsplit('-', 1)[0]
            features = np.array([float(token) for token in parts[1].strip('[]').split()])
            rows.append((name, features))
    return pd.DataFrame(rows, columns=['Subject', 'Features'])

# Load the x-vector file into a DataFrame with 'Subject' and 'Features' columns.
xvector_df = parse_xvector_file('../generated_xvector.txt')
# xvector_df = parse_xvector_file('./xvector_feature/xvector.txt')
# Load new_kin_relationships.csv and clean it: the four columns are renamed
# positionally and subject ids are stripped of surrounding whitespace so they
# can be used as join keys below.
new_kin_relationships_file = pd.read_csv('./new_kin_relationships.csv')
new_kin_relationships_file.columns = ['Subject1', 'Subject2', 'Relationship1', 'Relationship2']
new_kin_relationships_file['Subject1'] = new_kin_relationships_file['Subject1'].str.strip()
new_kin_relationships_file['Subject2'] = new_kin_relationships_file['Subject2'].str.strip()

# Load new_genders.csv (two columns, renamed positionally to Subject / Gender).
genders_df = pd.read_csv('./new_genders.csv')
genders_df.columns = ['Subject', 'Gender']
genders_df['Subject'] = genders_df['Subject'].str.strip()

# Attach gender to every x-vector row; the inner join drops subjects that are
# missing from the gender table.
xvector_df = xvector_df.merge(genders_df, on='Subject', how='inner')

# Build the pair table: the first merge attaches Subject/Features/Gender for
# Subject1, the second does the same for Subject2. The overlapping column names
# receive the '_1' / '_2' suffixes (e.g. Features_1, Features_2). Inner joins
# drop pairs for which either subject has no feature row.
merged_df1 = new_kin_relationships_file.merge(xvector_df, left_on='Subject1', right_on='Subject', how='inner')
merged_df = merged_df1.merge(xvector_df, left_on='Subject2', right_on='Subject', how='inner', suffixes=('_1', '_2'))

# Print a preview of the merged table, then the distribution of feature-vector
# lengths (the printed label reads "number of feature arrays per length").
print(merged_df.head())
feature_lengths = merged_df['Features_1'].apply(len)
print("不同长度的特征数组数目：")
print(feature_lengths.value_counts())


# Split by anchor subject: all pairs that share a Subject_1 value end up in
# exactly one of train / validation / test.
# NOTE(review): only Subject_1 drives the split, so a subject that appears as
# Subject_2 can still occur in more than one split - confirm this is intended.
subjects = merged_df['Subject_1'].unique()
train_subjects, test_subjects = train_test_split(subjects, test_size=0.2, random_state=42)
train_subjects, val_subjects = train_test_split(train_subjects, test_size=0.1, random_state=42)
print(len(train_subjects),len(val_subjects),len(test_subjects))
train = merged_df[merged_df['Subject_1'].isin(train_subjects)]
val = merged_df[merged_df['Subject_1'].isin(val_subjects)]
test = merged_df[merged_df['Subject_1'].isin(test_subjects)]


# Standardize features.
# The scaler is fitted exactly once, on the training vectors from both sides of
# every pair. Calling fit_transform separately on Features_1 and Features_2
# would re-fit the scaler on the second call, leaving train_features_1 scaled
# with different statistics than every other array that goes through `scaler`.
scaler = StandardScaler()
scaler.fit(np.vstack([np.vstack(train['Features_1']), np.vstack(train['Features_2'])]))
train_features_1 = scaler.transform(np.vstack(train['Features_1']))
train_features_2 = scaler.transform(np.vstack(train['Features_2']))
val_features_1 = scaler.transform(np.vstack(val['Features_1']))
val_features_2 = scaler.transform(np.vstack(val['Features_2']))
test_features_1 = scaler.transform(np.vstack(test['Features_1']))
test_features_2 = scaler.transform(np.vstack(test['Features_2']))

class KinshipDataset(Dataset):
    """Triplet dataset yielding (anchor, positive, negative) feature vectors.

    Item ``idx`` pairs ``anchor_features[idx]`` with ``positive_features[idx]``
    and draws a random negative subject that is neither the anchor nor the
    positive subject and has the same gender as the positive subject.

    Args:
        anchor_features: Indexable collection of already-scaled anchor vectors.
        positive_features: Indexable collection of already-scaled positive
            vectors, aligned row-by-row with ``anchor_features``.
        relationships: DataFrame with ``Subject1`` / ``Subject2`` columns whose
            row ``idx`` names the subjects of item ``idx``.
        xvector_df: DataFrame with ``Subject``, ``Features`` and ``Gender``
            columns, used to look up negatives.
        scaler: Object with a ``transform`` method; applied to the raw negative
            feature vector.
    """

    def __init__(self, anchor_features, positive_features, relationships, xvector_df, scaler):
        self.anchor_features = anchor_features
        self.positive_features = positive_features
        self.relationships = relationships
        self.xvector_df = xvector_df
        self.scaler = scaler
        self.subjects = list(set(self.relationships['Subject1']).union(set(self.relationships['Subject2'])))
        self.subject_gender = dict(zip(self.xvector_df['Subject'], self.xvector_df['Gender']))

        # Per-row subject ids as plain lists: avoids a DataFrame.iloc lookup
        # for every sample.
        self._anchor_subjects = self.relationships['Subject1'].tolist()
        self._positive_subjects = self.relationships['Subject2'].tolist()

        # First feature vector listed for each subject, so a negative lookup is
        # a dict access instead of a boolean-mask scan of the whole frame.
        first_rows = self.xvector_df.drop_duplicates(subset='Subject', keep='first')
        self._subject_features = dict(zip(first_rows['Subject'], first_rows['Features']))

        # Candidate negatives grouped by gender.
        self._subjects_by_gender = {}
        for subject in self.subjects:
            if subject in self.subject_gender:
                gender = self.subject_gender[subject]
                self._subjects_by_gender.setdefault(gender, []).append(subject)

    def __len__(self):
        return len(self.anchor_features)

    def __getitem__(self, idx):
        """Return ``(anchor, positive, negative, anchor_id, positive_id, negative_id)``.

        The three vectors are ``torch.float32`` tensors.

        Raises:
            ValueError: If this dataset contains no subject of the positive
                subject's gender other than the anchor and the positive. An
                unbounded retry loop would never terminate in that situation.
        """
        anchor = self.anchor_features[idx]
        positive = self.positive_features[idx]
        anchor_subject = self._anchor_subjects[idx]
        positive_subject = self._positive_subjects[idx]
        positive_gender = self.subject_gender[positive_subject]

        # Uniform draw over the valid same-gender negatives.
        candidates = [
            subject
            for subject in self._subjects_by_gender.get(positive_gender, ())
            if subject != anchor_subject and subject != positive_subject
        ]
        if not candidates:
            raise ValueError(
                f"no same-gender negative available for pair "
                f"({anchor_subject!r}, {positive_subject!r})"
            )
        negative_subject = candidates[np.random.randint(len(candidates))]

        negative_features = self._subject_features[negative_subject]
        negative = self.scaler.transform([negative_features])[0]

        return (
            torch.tensor(anchor, dtype=torch.float32),
            torch.tensor(positive, dtype=torch.float32),
            torch.tensor(negative, dtype=torch.float32),
            anchor_subject,
            positive_subject,
            negative_subject
        )


# Build one triplet dataset per split, then wrap each in a shuffling loader.
train_dataset = KinshipDataset(
    anchor_features=train_features_1,
    positive_features=train_features_2,
    relationships=train,
    xvector_df=xvector_df,
    scaler=scaler,
)
val_dataset = KinshipDataset(
    anchor_features=val_features_1,
    positive_features=val_features_2,
    relationships=val,
    xvector_df=xvector_df,
    scaler=scaler,
)
test_dataset = KinshipDataset(
    anchor_features=test_features_1,
    positive_features=test_features_2,
    relationships=test,
    xvector_df=xvector_df,
    scaler=scaler,
)

train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=True)

# Number of pairs in each split.
print(len(train_dataset), len(val_dataset), len(test_dataset))


# improved model
class TripletNet(nn.Module):
    """Embedding network: Linear -> BatchNorm1d -> Dropout -> ReLU -> Linear.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Width of the hidden layer. Defaults to 256.
        embedding_dim: Size of the output embedding. Defaults to 128.
        dropout: Dropout probability applied after batch norm. Defaults to 0.5.
    """

    def __init__(self, input_dim, hidden_dim=256, embedding_dim=128, dropout=0.5):
        super().__init__()
        # Layer creation order is kept stable so that seeded initialisation
        # gives the same parameters for the default sizes.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, embedding_dim)

    def forward(self, x):
        """Map a batch of feature vectors to a batch of embeddings."""
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.dropout(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

class TripletLoss(nn.Module):
    """Margin-based triplet loss with an L2 penalty on the embedding norms.

    ``loss = mean(relu(d(a, p) - d(a, n) + margin)) + lambda_reg * reg`` where
    ``d`` is ``torch.nn.functional.pairwise_distance`` and ``reg`` is the batch
    mean of ``||a|| + ||p|| + ||n||`` computed per sample.

    Args:
        margin: Required gap between negative and positive distance.
        lambda_reg: Weight of the embedding-norm penalty.
    """

    def __init__(self, margin=1.0, lambda_reg=0.01):
        super().__init__()
        self.margin = margin
        self.lambda_reg = lambda_reg

    def forward(self, anchor, positive, negative):
        """Return the scalar loss for a batch of embedded triplets."""
        pos_dist = torch.nn.functional.pairwise_distance(anchor, positive)
        neg_dist = torch.nn.functional.pairwise_distance(anchor, negative)
        triplet_loss = torch.relu(pos_dist - neg_dist + self.margin).mean()

        # Norms are taken per sample (last dimension) and then averaged.
        # Calling .norm(2) without `dim` collapses the whole batch into one
        # Frobenius norm, which makes the penalty grow with the batch size and
        # turns the trailing .mean() into a no-op.
        per_sample_norms = (
            anchor.norm(p=2, dim=-1)
            + positive.norm(p=2, dim=-1)
            + negative.norm(p=2, dim=-1)
        )
        reg_term = per_sample_norms.mean()
        loss = triplet_loss + self.lambda_reg * reg_term
        return loss



def evaluate_model(model, criterion, data_loader):
    """Evaluate ``model`` on triplet batches without tracking gradients.

    The model is switched to eval mode. Only the first three elements of each
    batch (anchor, positive, negative) are used.

    Returns:
        ``(avg_loss, accuracy)``: the mean of the per-batch losses, and the
        fraction of triplets whose anchor-positive distance is strictly
        smaller than the anchor-negative distance.
    """
    distance = torch.nn.functional.pairwise_distance
    model.eval()
    batch_losses = []
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch in data_loader:
            anchor, positive, negative = batch[:3]
            emb_a = model(anchor)
            emb_p = model(positive)
            emb_n = model(negative)
            batch_losses.append(criterion(emb_a, emb_p, emb_n).item())
            closer = distance(emb_a, emb_p) < distance(emb_a, emb_n)
            hits += closer.sum().item()
            seen += anchor.size(0)
    return sum(batch_losses) / len(data_loader), hits / seen

# Embedding network sized to the feature dimension, triplet loss with its
# default margin / regularization weight, and SGD with momentum.
model = TripletNet(input_dim=train_features_1.shape[1])
criterion = TripletLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)


num_epochs = 5
for epoch in range(num_epochs):
    # evaluate_model() leaves the model in eval mode, so training mode is
    # re-enabled at the start of every epoch.
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    # Each batch carries three tensors plus three subject-id entries; the ids
    # are not needed for training.
    for anchor, positive, negative, _, _, _ in train_loader:
        optimizer.zero_grad()
        anchor_out = model(anchor)
        positive_out = model(positive)
        negative_out = model(negative)
        loss = criterion(anchor_out, positive_out, negative_out)
        loss.backward()
        optimizer.step()

        # Running statistics; a triplet counts as correct when the positive is
        # strictly closer to the anchor than the negative. These embeddings
        # were produced in training mode.
        total_loss += loss.item()
        pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
        neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
        correct += (pos_dist < neg_dist).sum().item()
        total += anchor.size(0)

    # Mean of the per-batch losses (not weighted by batch size).
    avg_train_loss = total_loss / len(train_loader)
    train_accuracy = correct / total

    # Evaluate on the validation split (and the test split, for monitoring).
    val_loss, val_accuracy = evaluate_model(model, criterion, val_loader)
    test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)

    print(f'Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
    print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
    print(f'Epoch {epoch+1}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

# Final evaluation on the test split. Negatives are re-drawn at random on every
# pass, so this can differ from the last per-epoch test figure.
test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
print(f'Test Loss: {test_loss:.4f}')
print(f'Overall Test Accuracy: {test_accuracy * 100:.2f}%')

def evaluate_accuracy(model, test_loader):
    """Return the fraction of triplets ranked correctly by ``model``.

    The model is switched to eval mode and run without gradient tracking. A
    triplet is correct when the anchor-positive distance is strictly smaller
    than the anchor-negative distance.
    """
    distance = torch.nn.functional.pairwise_distance
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for batch in test_loader:
            # Only the first three elements (the tensors) are needed.
            anchor, positive, negative = batch[:3]
            emb_a = model(anchor)
            emb_p = model(positive)
            emb_n = model(negative)
            closer = distance(emb_a, emb_p) < distance(emb_a, emb_n)
            hits += closer.sum().item()
            seen += anchor.size(0)
    return hits / seen
accuracy = evaluate_accuracy(model, test_loader)
print(f'Overall Accuracy: {accuracy * 100:.2f}%')
# Accuracy broken down by kinship relationship.
# (Relationship1, Relationship2) label pair that selects each kinship category.
relationship_pairs = {
    'BB': ['Brother', 'Brother'],
    'SS': ['Sister', 'Sister'],
    'BS': ['Brother', 'Sister'],
    'FD': ['Father', 'Daughter'],
    'FS': ['Father', 'Son'],
    'MD': ['Mother', 'Daughter'],
    'MS': ['Mother', 'Son']
}
# Results live in their own dict. Writing them back into the label dict leaves
# the label list in place for categories with no test pairs, and formatting
# that list with ':.2f' raises TypeError.
relationship_accuracies = {}
for key, value in relationship_pairs.items():
    relationship_df = test[(test['Relationship1'] == value[0]) & (test['Relationship2'] == value[1])]
    if relationship_df.empty:
        continue
    relationship_dataset = KinshipDataset(
        scaler.transform(np.vstack(relationship_df['Features_1'])),
        scaler.transform(np.vstack(relationship_df['Features_2'])),
        relationship_df,
        xvector_df,
        scaler
    )
    relationship_loader = DataLoader(relationship_dataset, batch_size=32, shuffle=True)
    relationship_accuracy = evaluate_accuracy(model, relationship_loader)
    relationship_accuracies[key] = relationship_accuracy * 100

print("Accuracy by Relationship:")
for relationship in relationship_pairs:
    acc = relationship_accuracies.get(relationship)
    if acc is None:
        print(f"{relationship}: n/a (no test pairs)")
    else:
        print(f"{relationship}: {acc:.2f}%")

# Pick a single test triplet to inspect.
sample_idx = 0  # may be changed to any other valid index
anchor, positive, negative, anchor_subject, positive_subject, negative_subject = test_dataset[sample_idx]

# A one-sample batch can only pass through BatchNorm1d in eval mode, so the
# mode is set explicitly instead of relying on an earlier evaluation call.
# Gradients are not needed for inspection.
model.eval()
with torch.no_grad():
    anchor_out = model(anchor.unsqueeze(0))
    positive_out = model(positive.unsqueeze(0))
    negative_out = model(negative.unsqueeze(0))

    # Distances between the embedded anchor and the positive / negative.
    pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
    neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)

print(f"Anchor Subject: {anchor_subject}")
print(f"Positive Subject: {positive_subject}")
print(f"Negative Subject: {negative_subject}")
print(f"Positive distance: {pos_dist.item()}")
print(f"Negative distance: {neg_dist.item()}")
print(f"Correct prediction: {pos_dist.item() < neg_dist.item()}")

# original model
# class TripletNet(nn.Module):
#     def __init__(self, input_dim):
#         super(TripletNet, self).__init__()
#         self.fc1 = nn.Linear(input_dim, 256)
#         self.bn1 = nn.BatchNorm1d(256)
#         self.relu = nn.ReLU()
#         self.fc2 = nn.Linear(256, 128)

#     def forward(self, x):
#         x = self.fc1(x)
#         x = self.bn1(x)
#         x = self.relu(x)
#         x = self.fc2(x)
#         return x

# class TripletLoss(nn.Module):
#     def __init__(self, margin=1.0, lambda_reg=0.001):
#         super(TripletLoss, self).__init__()
#         self.margin = margin
#         self.lambda_reg = lambda_reg

#     def forward(self, anchor, positive, negative):
#         pos_dist = torch.nn.functional.pairwise_distance(anchor, positive)
#         neg_dist = torch.nn.functional.pairwise_distance(anchor, negative)
#         triplet_loss = torch.relu(pos_dist - neg_dist + self.margin).mean()


#         reg_term = (anchor.norm(2) + positive.norm(2) + negative.norm(2)).mean()
#         loss = triplet_loss + self.lambda_reg * reg_term
#         return loss



# model = TripletNet(input_dim=train_features_1.shape[1])
# criterion = TripletLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)


# num_epochs = 5
# for epoch in range(num_epochs):
#     model.train()
#     total_loss = 0.0
#     correct = 0
#     total = 0
#     for anchor, positive, negative, _, _, _ in train_loader:
#         optimizer.zero_grad()
#         anchor_out = model(anchor)
#         positive_out = model(positive)
#         negative_out = model(negative)
#         loss = criterion(anchor_out, positive_out, negative_out)
#         loss.backward()
#         optimizer.step()
        
#         total_loss += loss.item()
#         pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
#         neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
#         correct += (pos_dist < neg_dist).sum().item()
#         total += anchor.size(0)
    
#     avg_train_loss = total_loss / len(train_loader)
#     train_accuracy = correct / total
    
#     # 在验证集上进行评估
#     val_loss, val_accuracy = evaluate_model(model, criterion, val_loader)
#     test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
    
#     print(f'Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
#     print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
#     print(f'Epoch {epoch+1}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

# # 测试模型
# test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
# print(f'Test Loss: {test_loss:.4f}')
# print(f'Overall Test Accuracy: {test_accuracy * 100:.2f}%')

# def evaluate_accuracy(model, test_loader):
#     model.eval()
#     correct = 0
#     total = 0
#     with torch.no_grad():
#         for data in test_loader:
#             anchor, positive, negative = data[:3] 
#             anchor_out = model(anchor)
#             positive_out = model(positive)
#             negative_out = model(negative)
#             pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
#             neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
#             correct += (pos_dist < neg_dist).sum().item()
#             total += anchor.size(0)
#     accuracy = correct / total
#     return accuracy
# accuracy = evaluate_accuracy(model, test_loader)
# print(f'Overall Accuracy: {accuracy * 100:.2f}%')

# relationship_accuracies = {
#     'BB': ['Brother', 'Brother'],
#     'SS': ['Sister', 'Sister'],
#     'BS': ['Brother', 'Sister'],
#     'FD': ['Father', 'Daughter'],
#     'FS': ['Father', 'Son'],
#     'MD': ['Mother', 'Daughter'],
#     'MS': ['Mother', 'Son']
# }
# for key, value in relationship_accuracies.items():
#     relationship_df = test[(test['Relationship1'] == value[0]) & (test['Relationship2'] == value[1])]
#     if not relationship_df.empty:
#         relationship_dataset = KinshipDataset(
#             scaler.transform(np.vstack(relationship_df['Features_1'])),
#             scaler.transform(np.vstack(relationship_df['Features_2'])),
#             relationship_df,
#             xvector_df,
#             scaler
#         )
#         relationship_loader = DataLoader(relationship_dataset, batch_size=32, shuffle=True)
#         relationship_accuracy = evaluate_accuracy(model, relationship_loader)
#         relationship_accuracies[key] = relationship_accuracy * 100

# print("Accuracy by Relationship:")
# for relationship, acc in relationship_accuracies.items():
#     print(f"{relationship}: {acc:.2f}%")


# sample_idx = 4000  
# anchor, positive, negative, anchor_subject, positive_subject, negative_subject = test_dataset[sample_idx]
# anchor_out = model(anchor.unsqueeze(0))
# positive_out = model(positive.unsqueeze(0))
# negative_out = model(negative.unsqueeze(0))


# pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
# neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)

# print(f"Anchor Subject: {anchor_subject}")
# print(f"Positive Subject: {positive_subject}")
# print(f"Negative Subject: {negative_subject}")
# print(f"Positive distance: {pos_dist.item()}")
# print(f"Negative distance: {neg_dist.item()}")
# print(f"Correct prediction: {pos_dist.item() < neg_dist.item()}") 


### wav2vec features

In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

# Parse a resvector-style feature text file
def parse_resvector_file(filepath):
    """Parse a text file of per-utterance feature vectors into a DataFrame.

    Each non-blank line is expected to hold a '/'-separated path-like key,
    whitespace, and then a bracketed list of numbers, e.g.
    ``a/b/<subject>/<age>/utt.wav [0.1, 0.2, 0.3]``.  The third and fourth
    '/'-separated components of the key are taken as the subject and the
    (integer) age.

    Args:
        filepath: Path of the text file to read.

    Returns:
        A ``pandas.DataFrame`` with columns ``Subject`` (str), ``Age`` (int)
        and ``Features`` (list of float), one row per parsed line.

    Raises:
        IndexError: If a line has no feature part or the key has fewer than
            four '/'-separated components.
        ValueError: If the age or a feature value is not numeric.
    """
    data = []
    with open(filepath, 'r') as file:
        for line in file:
            stripped = line.strip()
            if not stripped:
                # Blank lines (e.g. a trailing newline at end of file) carry
                # no record; skipping them avoids an IndexError below.
                continue
            parts = stripped.split(maxsplit=1)
            name_age = parts[0].split('/')
            subject = name_age[2]
            age = int(name_age[3])
            # Accept "1, 2", "1,2", "1 2" and a trailing comma alike by
            # treating commas as plain whitespace before tokenising.
            tokens = parts[1].strip('[]').replace(',', ' ').split()
            features = [float(token) for token in tokens]
            data.append((subject, age, features))
    return pd.DataFrame(data, columns=['Subject', 'Age', 'Features'])

# Load the wav2vec feature file (despite the variable name, this cell reads
# generated_wav2vec_pretrained.txt, not an ivector/xvector file).
# Result: one row per parsed line with columns Subject, Age, Features.
xvector_df = parse_resvector_file('../generated_wav2vec_pretrained.txt')
# xvector_df = parse_resvector_file('./generated_wav2vec_pretrained.txt')

# Read new_kin_relationships.csv, give its four columns fixed names, and
# strip surrounding whitespace from the subject IDs so they match on merge.
new_kin_relationships_file = pd.read_csv('./new_kin_relationships.csv')
new_kin_relationships_file.columns = ['Subject1', 'Subject2', 'Relationship1', 'Relationship2']
new_kin_relationships_file['Subject1'] = new_kin_relationships_file['Subject1'].str.strip()
new_kin_relationships_file['Subject2'] = new_kin_relationships_file['Subject2'].str.strip()

# Read new_genders.csv (renamed to Subject / Gender) and clean the IDs.
genders_df = pd.read_csv('./new_genders.csv')
genders_df.columns = ['Subject', 'Gender']
genders_df['Subject'] = genders_df['Subject'].str.strip()

# Attach the gender to every feature row; the inner join drops feature rows
# whose subject has no gender entry.
xvector_df = xvector_df.merge(genders_df, on='Subject', how='inner')

# Join the kin pairs with the feature rows of both members. The second merge
# suffixes the overlapping columns, giving e.g. Subject_1 / Features_1 for
# Subject1 and Subject_2 / Features_2 for Subject2.
merged_df1 = new_kin_relationships_file.merge(xvector_df, left_on='Subject1', right_on='Subject', how='inner')
merged_df = merged_df1.merge(xvector_df, left_on='Subject2', right_on='Subject', how='inner', suffixes=('_1', '_2'))

# Print a preview of the merged data and how many rows have each
# feature-vector length.
print(merged_df.head())
feature_lengths = merged_df['Features_1'].apply(len)
print("不同长度的特征数组数目：")
print(feature_lengths.value_counts())




# Split by subject: 20% of the unique Subject_1 values go to test, then 10%
# of the remainder to validation; rows follow their Subject_1.
# NOTE(review): only Subject_1 decides the split, so a person may still
# appear as Subject_2 in a different split - confirm this is acceptable.
subjects = merged_df['Subject_1'].unique()
train_subjects, test_subjects = train_test_split(subjects, test_size=0.2, random_state=42)
train_subjects, val_subjects = train_test_split(train_subjects, test_size=0.1, random_state=42)
print(len(train_subjects),len(val_subjects),len(test_subjects))
train = merged_df[merged_df['Subject_1'].isin(train_subjects)]
val = merged_df[merged_df['Subject_1'].isin(val_subjects)]
test = merged_df[merged_df['Subject_1'].isin(test_subjects)]


# Standardize the features.
# The scaler is fitted exactly once, on the training features of BOTH pair
# members. Calling fit_transform separately on Features_1 and Features_2
# would refit the scaler the second time, leaving train_features_1 scaled
# with different statistics than every later scaler.transform(...) call
# (validation, test and the negatives drawn inside the dataset).
scaler = StandardScaler()
scaler.fit(np.vstack([np.vstack(train['Features_1']), np.vstack(train['Features_2'])]))
train_features_1 = scaler.transform(np.vstack(train['Features_1']))
train_features_2 = scaler.transform(np.vstack(train['Features_2']))
val_features_1 = scaler.transform(np.vstack(val['Features_1']))
val_features_2 = scaler.transform(np.vstack(val['Features_2']))
test_features_1 = scaler.transform(np.vstack(test['Features_1']))
test_features_2 = scaler.transform(np.vstack(test['Features_2']))

class KinshipDataset(Dataset):
    """Triplet dataset yielding (anchor, positive, negative) feature vectors.

    Row ``idx`` of ``relationships`` names the anchor (``Subject1``) and the
    positive (``Subject2``); their already-scaled vectors are
    ``anchor_features[idx]`` and ``positive_features[idx]``.  The negative is
    drawn at random, on every access, from the subjects that occur in
    ``relationships``, have the same gender as the positive and are neither
    the anchor nor the positive.

    Args:
        anchor_features: Indexable of scaled anchor vectors, aligned with
            the rows of ``relationships``.
        positive_features: Indexable of scaled positive vectors, aligned
            with the rows of ``relationships``.
        relationships: DataFrame with at least ``Subject1`` and ``Subject2``.
        xvector_df: DataFrame with ``Subject``, ``Gender`` and ``Features``
            columns; supplies gender and raw features for negatives.
        scaler: Object with a ``transform`` method, applied to the raw
            features of the sampled negative.
    """

    def __init__(self, anchor_features, positive_features, relationships, xvector_df, scaler):
        self.anchor_features = anchor_features
        self.positive_features = positive_features
        self.relationships = relationships
        self.xvector_df = xvector_df
        self.scaler = scaler
        # De-duplicate while keeping first-seen order so the candidate order
        # (and therefore seeded sampling) does not depend on set iteration.
        self.subjects = list(dict.fromkeys(
            list(self.relationships['Subject1']) + list(self.relationships['Subject2'])
        ))
        self.subject_gender = dict(zip(self.xvector_df['Subject'], self.xvector_df['Gender']))

        # First feature row per subject; replaces a full DataFrame scan on
        # every __getitem__ call.
        self._features_by_subject = {}
        for subject, features in zip(self.xvector_df['Subject'], self.xvector_df['Features']):
            self._features_by_subject.setdefault(subject, features)

        # Candidate negatives grouped by gender (subjects without a known
        # gender can never match and are left out).
        self._subjects_by_gender = {}
        for subject in self.subjects:
            if subject in self.subject_gender:
                gender = self.subject_gender[subject]
                self._subjects_by_gender.setdefault(gender, []).append(subject)

    def __len__(self):
        return len(self.anchor_features)

    def __getitem__(self, idx):
        """Return ``(anchor, positive, negative, anchor_id, positive_id, negative_id)``.

        The three vectors are float32 tensors.

        Raises:
            ValueError: If no subject of the positive's gender other than the
                anchor and the positive is available as a negative.
        """
        anchor = self.anchor_features[idx]
        positive = self.positive_features[idx]
        pair = self.relationships.iloc[idx]
        anchor_subject = pair['Subject1']
        positive_subject = pair['Subject2']
        positive_gender = self.subject_gender[positive_subject]

        # Uniform draw over the valid negatives. Unlike rejection sampling in
        # a `while True` loop, this cannot spin forever when none exists.
        candidates = [
            subject
            for subject in self._subjects_by_gender.get(positive_gender, ())
            if subject != anchor_subject and subject != positive_subject
        ]
        if not candidates:
            raise ValueError(
                f"No negative candidate with gender {positive_gender!r} for pair "
                f"({anchor_subject!r}, {positive_subject!r})"
            )
        negative_subject = np.random.choice(candidates)

        negative_features = self._features_by_subject[negative_subject]
        negative = self.scaler.transform([negative_features])[0]

        return (
            torch.tensor(anchor, dtype=torch.float32),
            torch.tensor(positive, dtype=torch.float32),
            torch.tensor(negative, dtype=torch.float32),
            anchor_subject,
            positive_subject,
            negative_subject
        )

# Build one dataset and one shuffled loader (batch size 32) per split
train_dataset, val_dataset, test_dataset = (
    KinshipDataset(anchor_block, positive_block, pair_frame, xvector_df, scaler)
    for anchor_block, positive_block, pair_frame in (
        (train_features_1, train_features_2, train),
        (val_features_1, val_features_2, val),
        (test_features_1, test_features_2, test),
    )
)

train_loader, val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=32, shuffle=True)
    for split_dataset in (train_dataset, val_dataset, test_dataset)
)

# Number of pairs in each split
print(len(train_dataset), len(val_dataset), len(test_dataset))


# improved model
class TripletNet(nn.Module):
    """Two-layer embedding network: input_dim -> 256 -> 128.

    The hidden layer is followed by batch normalisation, dropout (p=0.5)
    and ReLU, in that order; the output layer is a plain linear projection.
    """

    def __init__(self, input_dim):
        super().__init__()
        hidden_dim, embedding_dim = 256, 128
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, embedding_dim)

    def forward(self, x):
        """Map a batch of feature vectors to 128-dimensional embeddings."""
        normalised = self.bn1(self.fc1(x))
        activated = self.relu(self.dropout(normalised))
        return self.fc2(activated)

class TripletLoss(nn.Module):
    """Margin-based triplet loss with an L2 penalty on the embedding norms.

    loss = mean(relu(d(a, p) - d(a, n) + margin))
           + lambda_reg * mean(||a|| + ||p|| + ||n||)

    where ``d`` is the Euclidean pairwise distance and the norms are taken
    per sample.

    Args:
        margin: Required gap between negative and positive distances.
        lambda_reg: Weight of the embedding-norm penalty.
    """

    def __init__(self, margin=1.0, lambda_reg=0.01):
        super().__init__()
        self.margin = margin
        self.lambda_reg = lambda_reg

    def forward(self, anchor, positive, negative):
        """Return the scalar loss for a batch of embedded triplets."""
        pos_dist = torch.nn.functional.pairwise_distance(anchor, positive)
        neg_dist = torch.nn.functional.pairwise_distance(anchor, negative)
        triplet_loss = torch.relu(pos_dist - neg_dist + self.margin).mean()

        # Regularisation term: per-sample L2 norms averaged over the batch.
        # A bare `tensor.norm(2)` would give one norm over the whole batch,
        # which makes `.mean()` a no-op and lets the penalty grow with the
        # batch size (so a short final batch would be weighted differently).
        reg_term = (
            anchor.norm(2, dim=-1)
            + positive.norm(2, dim=-1)
            + negative.norm(2, dim=-1)
        ).mean()
        loss = triplet_loss + self.lambda_reg * reg_term
        return loss




def evaluate_model(model, criterion, data_loader):
    """Evaluate ``model`` on triplet batches without tracking gradients.

    Each batch must start with (anchor, positive, negative); any further
    elements are ignored.  A triplet counts as correct when the anchor's
    embedding is closer to the positive's than to the negative's.

    Returns:
        ``(avg_loss, accuracy)``: the mean of the per-batch losses and the
        fraction of correct triplets.
    """
    distance = torch.nn.functional.pairwise_distance
    loss_sum, hits, seen = 0.0, 0, 0

    model.eval()
    with torch.no_grad():
        for batch in data_loader:
            anchor, positive, negative = batch[:3]
            emb_anchor = model(anchor)
            emb_positive = model(positive)
            emb_negative = model(negative)

            loss_sum += criterion(emb_anchor, emb_positive, emb_negative).item()

            closer_to_positive = distance(emb_anchor, emb_positive) < distance(emb_anchor, emb_negative)
            hits += closer_to_positive.sum().item()
            seen += anchor.size(0)

    return loss_sum / len(data_loader), hits / seen

# Build the embedding network (input width = number of feature columns),
# the triplet loss with its default margin/regularisation, and an SGD
# optimiser with momentum.
model = TripletNet(input_dim=train_features_1.shape[1])
criterion = TripletLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

num_epochs = 5
for epoch in range(num_epochs):
    # Training mode: dropout is active and batch-norm uses batch statistics.
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    # Each batch is (anchor, positive, negative, three subject-ID fields);
    # the IDs are not needed for training.
    for anchor, positive, negative, _, _, _ in train_loader:
        optimizer.zero_grad()
        anchor_out = model(anchor)
        positive_out = model(positive)
        negative_out = model(negative)
        loss = criterion(anchor_out, positive_out, negative_out)
        loss.backward()
        optimizer.step()
        
        # Running statistics. Training accuracy is measured on the same
        # train-mode forward pass (i.e. with dropout applied).
        total_loss += loss.item()
        pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
        neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
        correct += (pos_dist < neg_dist).sum().item()
        total += anchor.size(0)
    
    # Mean of the per-batch losses and fraction of correctly ordered triplets.
    avg_train_loss = total_loss / len(train_loader)
    train_accuracy = correct / total
    
    # Evaluate on the validation set (and, for monitoring, on the test set).
    val_loss, val_accuracy = evaluate_model(model, criterion, val_loader)
    test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
    
    print(f'Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
    print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
    print(f'Epoch {epoch+1}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

# Final evaluation on the test set. Negatives are re-sampled on every pass,
# so the numbers can differ slightly from the last epoch's test line.
test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
print(f'Test Loss: {test_loss:.4f}')
print(f'Overall Test Accuracy: {test_accuracy * 100:.2f}%')

def evaluate_accuracy(model, test_loader):
    """Return the fraction of triplets the model orders correctly.

    Each batch must start with (anchor, positive, negative); any further
    elements are ignored.  A triplet is correct when the anchor's embedding
    is closer to the positive's than to the negative's.  Runs in eval mode
    without gradient tracking.
    """
    distance = torch.nn.functional.pairwise_distance
    hits, seen = 0, 0

    model.eval()
    with torch.no_grad():
        for batch in test_loader:
            # Only the first three elements are tensors to embed.
            anchor, positive, negative = batch[:3]
            emb_anchor = model(anchor)
            emb_positive = model(positive)
            emb_negative = model(negative)
            closer_to_positive = distance(emb_anchor, emb_positive) < distance(emb_anchor, emb_negative)
            hits += closer_to_positive.sum().item()
            seen += anchor.size(0)

    return hits / seen
accuracy = evaluate_accuracy(model, test_loader)
print(f'Overall Accuracy: {accuracy * 100:.2f}%')

# Accuracy per kin relationship.
# The label pairs and the computed accuracies live in separate dicts: if a
# relationship has no test rows, its entry must not remain a list of labels,
# because formatting a list with ':.2f' raises TypeError.
relationship_labels = {
    'BB': ['Brother', 'Brother'],
    'SS': ['Sister', 'Sister'],
    'BS': ['Brother', 'Sister'],
    'FD': ['Father', 'Daughter'],
    'FS': ['Father', 'Son'],
    'MD': ['Mother', 'Daughter'],
    'MS': ['Mother', 'Son']
}
relationship_accuracies = {}
for key, value in relationship_labels.items():
    relationship_df = test[(test['Relationship1'] == value[0]) & (test['Relationship2'] == value[1])]
    if relationship_df.empty:
        # Nothing to evaluate for this relationship in the test split.
        continue
    relationship_dataset = KinshipDataset(
        scaler.transform(np.vstack(relationship_df['Features_1'])),
        scaler.transform(np.vstack(relationship_df['Features_2'])),
        relationship_df,
        xvector_df,
        scaler
    )
    relationship_loader = DataLoader(relationship_dataset, batch_size=32, shuffle=True)
    relationship_accuracy = evaluate_accuracy(model, relationship_loader)
    relationship_accuracies[key] = relationship_accuracy * 100

print("Accuracy by Relationship:")
for relationship in relationship_labels:
    acc = relationship_accuracies.get(relationship)
    if acc is None:
        print(f"{relationship}: N/A (no test pairs)")
    else:
        print(f"{relationship}: {acc:.2f}%")

# Inspect a single triplet from the test set
sample_idx = 0  # change to look at a different test pair
anchor, positive, negative, anchor_subject, positive_subject, negative_subject = test_dataset[sample_idx]

# A batch of one only works with BatchNorm in eval mode, so set it here
# explicitly rather than relying on an earlier evaluation call; gradients
# are not needed for this inspection either.
model.eval()
with torch.no_grad():
    anchor_out = model(anchor.unsqueeze(0))
    positive_out = model(positive.unsqueeze(0))
    negative_out = model(negative.unsqueeze(0))

    # Distances between the embeddings
    pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
    neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)

print(f"Anchor Subject: {anchor_subject}")
print(f"Positive Subject: {positive_subject}")
print(f"Negative Subject: {negative_subject}")
print(f"Positive distance: {pos_dist.item()}")
print(f"Negative distance: {neg_dist.item()}")
print(f"Correct prediction: {pos_dist.item() < neg_dist.item()}")  # correct when the positive is closer than the negative

# original model
# class TripletNet(nn.Module):
#     def __init__(self, input_dim):
#         super(TripletNet, self).__init__()
#         self.fc1 = nn.Linear(input_dim, 256)
#         self.bn1 = nn.BatchNorm1d(256)
#         self.relu = nn.ReLU()
#         self.fc2 = nn.Linear(256, 128)

#     def forward(self, x):
#         x = self.fc1(x)
#         x = self.bn1(x)
#         x = self.relu(x)
#         x = self.fc2(x)
#         return x

# class TripletLoss(nn.Module):
#     def __init__(self, margin=1.0, lambda_reg=0.001):
#         super(TripletLoss, self).__init__()
#         self.margin = margin
#         self.lambda_reg = lambda_reg

#     def forward(self, anchor, positive, negative):
#         pos_dist = torch.nn.functional.pairwise_distance(anchor, positive)
#         neg_dist = torch.nn.functional.pairwise_distance(anchor, negative)
#         triplet_loss = torch.relu(pos_dist - neg_dist + self.margin).mean()


#         reg_term = (anchor.norm(2) + positive.norm(2) + negative.norm(2)).mean()
#         loss = triplet_loss + self.lambda_reg * reg_term
#         return loss

# def evaluate_model(model, criterion, data_loader):
#     model.eval()
#     total_loss = 0.0
#     correct = 0
#     total = 0
#     with torch.no_grad():
#         for data in data_loader:
#             anchor, positive, negative = data[:3]  
#             anchor_out = model(anchor)
#             positive_out = model(positive)
#             negative_out = model(negative)
#             loss = criterion(anchor_out, positive_out, negative_out)
#             total_loss += loss.item()
#             pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
#             neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
#             correct += (pos_dist < neg_dist).sum().item()
#             total += anchor.size(0)
#     avg_loss = total_loss / len(data_loader)
#     accuracy = correct / total
#     return avg_loss, accuracy


# model = TripletNet(input_dim=train_features_1.shape[1])
# criterion = TripletLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)


# num_epochs = 5
# for epoch in range(num_epochs):
#     model.train()
#     total_loss = 0.0
#     correct = 0
#     total = 0
#     for anchor, positive, negative, _, _, _ in train_loader:
#         optimizer.zero_grad()
#         anchor_out = model(anchor)
#         positive_out = model(positive)
#         negative_out = model(negative)
#         loss = criterion(anchor_out, positive_out, negative_out)
#         loss.backward()
#         optimizer.step()
        
#         total_loss += loss.item()
#         pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
#         neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
#         correct += (pos_dist < neg_dist).sum().item()
#         total += anchor.size(0)
    
#     avg_train_loss = total_loss / len(train_loader)
#     train_accuracy = correct / total
    
#     # Evaluate on the validation set
#     val_loss, val_accuracy = evaluate_model(model, criterion, val_loader)
#     test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
    
#     print(f'Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
#     print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
#     print(f'Epoch {epoch+1}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

# # Test the model
# test_loss, test_accuracy = evaluate_model(model, criterion, test_loader)
# print(f'Test Loss: {test_loss:.4f}')
# print(f'Overall Test Accuracy: {test_accuracy * 100:.2f}%')

# def evaluate_accuracy(model, test_loader):
#     model.eval()
#     correct = 0
#     total = 0
#     with torch.no_grad():
#         for data in test_loader:
#             anchor, positive, negative = data[:3]
#             anchor_out = model(anchor)
#             positive_out = model(positive)
#             negative_out = model(negative)
#             pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
#             neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
#             correct += (pos_dist < neg_dist).sum().item()
#             total += anchor.size(0)
#     accuracy = correct / total
#     return accuracy
# accuracy = evaluate_accuracy(model, test_loader)
# print(f'Overall Accuracy: {accuracy * 100:.2f}%')
# # Compute accuracy per kin relationship
# relationship_accuracies = {
#     'BB': ['Brother', 'Brother'],
#     'SS': ['Sister', 'Sister'],
#     'BS': ['Brother', 'Sister'],
#     'FD': ['Father', 'Daughter'],
#     'FS': ['Father', 'Son'],
#     'MD': ['Mother', 'Daughter'],
#     'MS': ['Mother', 'Son']
# }
# for key, value in relationship_accuracies.items():
#     relationship_df = test[(test['Relationship1'] == value[0]) & (test['Relationship2'] == value[1])]
#     if not relationship_df.empty:
#         relationship_dataset = KinshipDataset(
#             scaler.transform(np.vstack(relationship_df['Features_1'])),
#             scaler.transform(np.vstack(relationship_df['Features_2'])),
#             relationship_df,
#             xvector_df,
#             scaler
#         )
#         relationship_loader = DataLoader(relationship_dataset, batch_size=32, shuffle=True)
#         relationship_accuracy = evaluate_accuracy(model, relationship_loader)
#         relationship_accuracies[key] = relationship_accuracy * 100

# print("Accuracy by Relationship:")
# for relationship, acc in relationship_accuracies.items():
#     print(f"{relationship}: {acc:.2f}%")

# # Pick a single triplet to test
# sample_idx = 4000 
# anchor, positive, negative, anchor_subject, positive_subject, negative_subject = test_dataset[sample_idx]
# anchor_out = model(anchor.unsqueeze(0))
# positive_out = model(positive.unsqueeze(0))
# negative_out = model(negative.unsqueeze(0))


# pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
# neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)

# print(f"Anchor Subject: {anchor_subject}")
# print(f"Positive Subject: {positive_subject}")
# print(f"Negative Subject: {negative_subject}")
# print(f"Positive distance: {pos_dist.item()}")
# print(f"Negative distance: {neg_dist.item()}")
# print(f"Correct prediction: {pos_dist.item() < neg_dist.item()}") 
