In [1]:
from network import *

In [2]:
# 读取所有设备的IQ数据
device_files = {
    "bladerf": "../dataset/raw data/bladerf/bladerf.iq",
    "hackrf0": "../dataset/raw data/hackrf0/hackrf0.iq",
    "hackrf1": "../dataset/raw data/hackrf1/hackrf1.iq",
    "limesdr": "../dataset/raw data/limesdr/limesdr.iq"
}

all_data = []
all_labels = []

for label_idx, (device_name, file_path) in enumerate(device_files.items()):
    print(f"正在处理 {device_name} 的数据...",end="")
    # 读取数据
    data = load_iq_file(file_path)
    
    # 分段数据（将连续IQ数据分成固定长度的片段）
    segment_length = 1024  # 每个样本的IQ点数，根据需要调整
    num_segments = len(data) // segment_length
    segments = data[:num_segments * segment_length].reshape(num_segments, segment_length)
    
    # 添加到数据集
    all_data.append(segments)
    all_labels.append(np.full(num_segments, label_idx))
    
    print(f"✅处理完成，获取了 {num_segments} 个样本")

正在处理 bladerf 的数据...✅处理完成，获取了 48828 个样本
正在处理 hackrf0 的数据...✅处理完成，获取了 48828 个样本
正在处理 hackrf1 的数据...✅处理完成，获取了 48828 个样本
正在处理 limesdr 的数据...✅处理完成，获取了 48828 个样本


In [3]:
# 合并数据
X = np.vstack(all_data)
y = np.concatenate(all_labels)

# 分割为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

print(f"训练集样本数: {len(X_train)}")
print(f"测试集样本数: {len(X_test)}")

训练集样本数: 156249
测试集样本数: 39063


In [4]:
# 创建数据加载器
batch_size = 32
train_loader, test_loader, scalers = create_dataloaders(
    X_train, y_train, X_test, y_test, batch_size)

In [5]:
# 初始化模型
num_features = X_train.shape[1]  # 每个样本的IQ点数
num_classes = 4  # bladerf, hackrf0, hackrf1, limesdr
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = create_model(num_features, num_classes).to(device)

In [None]:
# 加载之前训练好的模型参数（如果有）
# model.load_state_dict(torch.load('complex_cnn_model.pth'))

# 训练模型并记录准确率
train_accuracies, test_accuracies = train_and_evaluate(model, train_loader, test_loader, device)

# 保存模型
torch.save(model.state_dict(), './models/complex_cnn_model_2.pth')

# 保存标准化器
torch.save(scalers, './models/scalers2.pth')

 # 获取测试集上的预测结果
y_pred, y_true = get_predictions(model, test_loader, device)

In [None]:
 # 绘制混淆矩阵
class_names = ['BladeRF', 'HackRF0', 'HackRF1', 'LimeSDR']
plot_confusion_matrix(y_true, y_pred, class_names)

# 绘制准确率曲线
plot_accuracy_curve(train_accuracies, test_accuracies)

# 计算总体准确率
accuracy = accuracy_score(y_true, y_pred)
print(f"总体准确率: {accuracy:.4f}")

In [6]:
# # 定义损失函数和优化器
# criterion = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# # 训练循环
# num_epochs = 30
# for epoch in range(num_epochs):
#     # 训练
#     model.train()
#     train_loss = 0
#     correct = 0
#     total = 0
    
#     for inputs, labels in train_loader:
#         inputs, labels = inputs.to(device), labels.to(device)
        
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
        
#         train_loss += loss.item()
#         _, predicted = outputs.max(1)
#         total += labels.size(0)
#         correct += predicted.eq(labels).sum().item()
#         # print(inputs,labels)
    
#     print(f'第 {epoch+1}/{num_epochs} 轮, 损失: {train_loss/len(train_loader):.4f}, 准确率: {100.*correct/total:.2f}%')
    
#     # 验证
#     model.eval()
#     test_loss = 0
#     correct = 0
#     total = 0
    
#     with torch.no_grad():
#         for inputs, labels in test_loader:
#             inputs, labels = inputs.to(device), labels.to(device)
#             outputs = model(inputs)
#             loss = criterion(outputs, labels)
            
#             test_loss += loss.item()
#             _, predicted = outputs.max(1)
#             total += labels.size(0)
#             correct += predicted.eq(labels).sum().item()
    
#     print(f'测试损失: {test_loss/len(test_loader):.4f}, 测试准确率: {100.*correct/total:.2f}%')


In [15]:
# 保存模型
torch.save(model.state_dict(), './models/complex_cnn_model.pth')

torch.save(scalers, './models/scalers.pth')

In [2]:
def predict_device_with_unknown_detection(model, iq_file_path, scalers, device, confidence_threshold=0.7):
    """预测设备类型，包含未知设备检测"""
    # 读取数据
    data = load_iq_file(iq_file_path)
    
    # 分段数据
    segment_length = 1024  # 与训练时相同
    num_segments = len(data) // segment_length
    segments = data[:num_segments * segment_length].reshape(num_segments, segment_length)
    
    # 创建数据集和加载器
    dataset = ComplexSignalDataset(segments, np.zeros(len(segments)), scalers[0], scalers[1])
    dataloader = DataLoader(dataset, batch_size=32, shuffle=False)
    
    # 预测
    model.eval()
    all_probs = []
    
    with torch.no_grad():
        for inputs, _ in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            probs = F.softmax(outputs, dim=1)
            all_probs.append(probs.cpu().numpy())
    
    all_probs = np.vstack(all_probs)
    
    # 获取最高概率及其对应的类别
    max_probs = np.max(all_probs, axis=1)
    predictions = np.argmax(all_probs, axis=1)
    
    # 计算每个段的平均置信度
    avg_confidence = np.mean(max_probs)
    
    # 统计各类别的数量
    unique, counts = np.unique(predictions, return_counts=True)
    result = dict(zip(unique, counts))
    
    # 映射到设备名称
    device_names = ['bladerf', 'hackrf0', 'hackrf1', 'limesdr']
    final_result = {device_names[int(k)]: v for k, v in result.items()}
    
    # 判断是否为未知设备
    if avg_confidence < confidence_threshold:
        return "未知设备", final_result, avg_confidence
    else:
        # 返回出现最多的设备类型
        majority_device = max(final_result, key=final_result.get)
        return majority_device, final_result, avg_confidence

In [None]:
# 加载模型和标准化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_features = 1024  # 与训练时相同
num_classes = 4  # 四种已知设备
model = create_model(num_features, num_classes).to(device)
model.load_state_dict(torch.load('./models/complex_cnn_model.pth'))
scalers = torch.load('./models/scalers.pth')

In [None]:
# 使用模型预测未知设备
unknown_file = "../dataset/raw data/bladerf/bladerf.iq"
device_type, class_counts, confidence = predict_device_with_unknown_detection(
    model, unknown_file, scalers, device, confidence_threshold=0.7)

print(f"预测的设备类型: {device_type}")
print(f"各类别的预测数量: {class_counts}")
print(f"平均置信度: {confidence:.4f}")

# 如果是已知设备，可以进一步分析
if device_type != "未知设备":
    print(f"这是一个已知设备: {device_type}")
else:
    print("这可能是一个训练数据中未包含的设备！")