#### 训练集：
	建议利用序号1-200图片
#### 测试集：
	建议利用序号201-236图片

### 项目要求：
	(1) 设计CNN神经网络模型，计算每张图片中煤的占比(煤炭在前景图像中的比例)
	(2) 撰写项目报告，主要包含如下章节结构
		Abstract：高度概括研究动机、思路、结果
		Introduction：研究背景与意义、主要创新点
		Related Research：相关研究调研
		Methods：采用的方法的详细说明，网络结构等
		Results：数据集，训练细节，在测试集上的实验结果等(ROC PR曲线)
		Conclusion


In [1]:
import random
import numpy as np
import matplotlib.pyplot as plt


plt.rcParams['figure.figsize'] = (10.0, 8.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

# for auto-reloading extenrnal modules
# see http://stackoverflow.com/questions/1907993/autoreload-of-modules-in-ipython



In [2]:
import os
from PIL import Image
from load_data import load_data

X_train,y_train,X_val,y_val,X_test,y_test = load_data()
X_test1 = np.copy(X_test)
print(X_train.shape)


In [3]:
import matplotlib.pyplot as plt

# 设置显示大小
plt.figure(figsize=(20, 10))

# 展示前10个训练样本
for i in range(10):
    # 获取训练集中的图像和标签
    image = X_train[i]
    label = y_train[i]

    # 绘制图像
    plt.subplot(2, 10, i + 1)
    plt.imshow(image)
    plt.title(f'Train Image {i+1}')
    plt.axis('off')

    # 绘制标签
    plt.subplot(2, 10, i + 11)
    plt.imshow(label)
    plt.title(f'Train Label {i+1}')
    plt.axis('off')

# 调整子图间距
plt.subplots_adjust(wspace=0.2, hspace=0.3)

plt.tight_layout()
plt.show()


In [4]:
from torch.utils.data import DataLoader, TensorDataset
import torch
import numpy as np
from load_data import load_data

X_train,y_train,X_val,y_val,X_test,y_test = load_data()

# def print_arrays_containing_value(y_train, value=28):
#     # 遍历y_train中的每个子数组
#     for idx, y in enumerate(y_train):
#         if value in y:
#             print(f"Array index {idx} contains value {value}:\n{y}\n")


X_train = np.transpose(X_train, (0, 3, 1, 2))
X_val = np.transpose(X_val, (0, 3, 1, 2))
X_test = np.transpose(X_test, (0, 3, 1, 2))
X_train_tensor = torch.FloatTensor(X_train)
Y_train_tensor = torch.LongTensor(y_train)
X_val_tensor = torch.FloatTensor(X_val)
Y_val_tensor = torch.LongTensor(y_val)
X_test_tensor = torch.FloatTensor(X_test)
Y_test_tensor = torch.LongTensor(y_test)

# 定义数据加载器
train_dataset = TensorDataset(X_train_tensor, Y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, Y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, Y_test_tensor)
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



In [4]:
import torch

# 检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
torch.cuda.empty_cache()  # 清理未使用的缓存

import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'
torch.cuda.memory_summary(device=None, abbreviated=False)


In [None]:
import gc
gc.collect()

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from fcn import MyUNet

# 定义模型、损失函数和优化器
model = MyUNet(n_channels=3, n_classes=3)
criterion = nn.CrossEntropyLoss()  # 使用交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001)  # 使用Adam优化器

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

def train(model, device, train_loader, optimizer, criterion, n_epochs, val_loader=None):
    train_losses = []  # 记录每个epoch的训练损失
    val_losses = []    # 记录每个epoch的验证损失
    for epoch in range(n_epochs):
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * images.size(0)
        epoch_train_loss = running_loss / len(train_loader.dataset)
        train_losses.append(epoch_train_loss)
        print(f'Epoch {epoch+1} Training Loss: {epoch_train_loss:.4f}')

        if val_loader is not None:
            val_loss = validate(model, device, val_loader, criterion)
            val_losses.append(val_loss)
        # 绘制训练和验证损失曲线
    plt.figure(figsize=(10, 5))
    plt.plot(range(1, n_epochs+1), train_losses, label='Training Loss')
    if val_loader is not None:
        plt.plot(range(1, n_epochs+1), val_losses, label='Validation Loss')
    plt.title('Loss over epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
    return train_losses, val_losses

def validate(model, device, val_loader, criterion):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.numel()
            correct += (predicted == labels).sum().item()
            print(correct)

    val_loss /= len(val_loader.dataset)
    accuracy = 100 * correct / total
    print(f'Validation Loss: {val_loss:.4f}, Accuracy: {correct}/ {total}/{accuracy:.2f}%')
    return val_loss

# 假设train_loader和val_loader是你的训练集和验证集数据加载器
train_losses, val_losses = train(model, device, train_loader, optimizer, criterion, n_epochs=10, val_loader=val_loader)

# 打印训练损失和验证损失的变化
print("Training Losses:")
for epoch, loss in enumerate(train_losses):
    print(f"Epoch {epoch+1}: {loss:.4f}")

print("\nValidation Losses:")
for epoch, loss in enumerate(val_losses):
    print(f"Epoch {epoch+1}: {loss:.4f}")



In [8]:
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict()
}, 'model_and_optimizer.pth')

In [5]:
import torch
import torch.nn as nn
from fcn import MyUNet
import torch.optim as optim
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 加载模型参数和优化器状态
checkpoint = torch.load('model_and_optimizer.pth')
model = MyUNet(n_channels=3, n_classes=3)
optimizer = optim.Adam(model.parameters(), lr=0.001)

model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

model.to(device)  # 不要忘记将模型发送到正确
model.eval()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def label_mapping(label):
    if label == 0:
        return "背景"
    elif label == 1:
        return "煤"
    elif label == 2:
        return "煤矸石"

def print_portion(prediction):
    # 计算预测结果中各标签的占比
    unique, counts = np.unique(prediction, return_counts=True)
    label_counts = dict(zip(unique, counts))
    total_pixels = prediction.size
    proportions = {label: count / total_pixels for label, count in label_counts.items()}

    # 打印标签占比
    print("标签占比:")
    for label1, proportion in proportions.items():
        # Ensure label mapping does not fail with a KeyError
        label_name = label_mapping(label1) if label1 in [0, 1, 2] else "未知"
        print(f"标签 {label_name}: {proportion:.2%}")

    if 1 in proportions and 2 in proportions:
        print(f"煤矸石占比为: {proportions[2] / (proportions[1] + proportions[2]):.2%}")
    else:
        print("煤或煤矸石未出现在预测中。")

def visualize_prediction(model, data_loader, device):
    model.eval()  # 确保模型处于评估模式
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            preds = outputs.argmax(dim=1)

            # 遍历每张图片
            for idx in range(images.size(0)):
                label = labels[idx].cpu().numpy()
                prediction = preds[idx].cpu().numpy()

                print(f"===== 图片 {idx+1} 原始label =====")
                print_portion(label)

                print(f"===== 图片 {idx+1} 预测结果 =====")
                print_portion(prediction)

                # 可视化，只展示Ground Truth和Prediction
                fig, ax = plt.subplots(1, 2, figsize=(5, 2.5))  # 改为显示两列
                ax[0].imshow(label, cmap='gray')
                ax[0].set_title('Ground Truth')
                ax[1].imshow(prediction, cmap='gray')
                ax[1].set_title('Prediction')
                plt.show()

# 测试并可视化结果
visualize_prediction(model, test_loader, device)


In [14]:
import numpy as np
import torch
from sklearn.metrics import roc_curve, precision_recall_curve, auc
import matplotlib.pyplot as plt
from fcn import MyUNet
import torch.optim as optim


# 设置打印选项，禁用省略
np.set_printoptions(threshold=1000)

def get_predictions_and_labels(model, data_loader, device):
    model.eval()
    all_labels = []
    all_scores = []
    
    with torch.no_grad():
        for images, labels in data_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            # 获取预测分数（softmax后的概率）
            scores = torch.softmax(outputs, dim=1)  # (batch_size, num_classes, H, W)
            max_scores, max_indices = torch.max(scores, dim=1)
            #max_indices_np = max_indices.cpu().numpy()  # 如果是在 PyTorch 中

            # 选择要统计的图像索引，例如第一张图像
            #selected_image = max_indices_np[0]
            
            # 使用 np.unique 计算每个类别索引的出现次数
            #unique_values, counts = np.unique(selected_image, return_counts=True)
            
            # 打印结果
            #for value, count in zip(unique_values, counts):
            ##    print(f"Value {value} appears {count} times")
            #break
            #print(labels.cpu().numpy().shape)
            all_labels.append(labels.cpu().numpy().flatten())
            all_scores.append(max_indices.cpu().numpy().flatten())
    
    all_labels = np.concatenate(all_labels)
    all_scores = np.concatenate(all_scores)
    
    return all_labels, all_scores

def plot_roc_pr_curves(labels, predictions, num_classes):
    fig, axs = plt.subplots(num_classes, 2, figsize=(12, 18))

    for i in range(num_classes):
        # 生成每个类别的伪概率
        # 对于每个类别i，预测为i的设置为1，否则为0
        pseudo_probabilities = (predictions == i).astype(float)

        # 将标签转换为二分类格式：当前类别为1，其他类别为0
        binary_labels = (labels == i).astype(int)

        # 计算 ROC 曲线
        fpr, tpr, _ = roc_curve(binary_labels, pseudo_probabilities)
        roc_auc = auc(fpr, tpr)

        # 计算 PR 曲线
        precision, recall, _ = precision_recall_curve(binary_labels, pseudo_probabilities)
        pr_auc = auc(recall, precision)

        # 绘制 ROC 曲线
        axs[i, 0].plot(fpr, tpr, label=f'ROC curve (area = {roc_auc:.2f})')
        axs[i, 0].plot([0, 1], [0, 1], 'k--')
        axs[i, 0].set_xlim([0.0, 1.0])
        axs[i, 0].set_ylim([0.0, 1.05])
        axs[i, 0].set_xlabel('False Positive Rate')
        axs[i, 0].set_ylabel('True Positive Rate')
        axs[i, 0].set_title(f'ROC - Class {i}')
        axs[i, 0].legend(loc="lower right")
        
        # 绘制 PR 曲线
        axs[i, 1].plot(recall, precision, label=f'PR curve (area = {pr_auc:.2f})')
        axs[i, 1].set_xlim([0.0, 1.0])
        axs[i, 1].set_ylim([0.0, 1.05])
        axs[i, 1].set_xlabel('Recall')
        axs[i, 1].set_ylabel('Precision')
        axs[i, 1].set_title(f'PR - Class {i}')
        axs[i, 1].legend(loc="lower right")
    
    plt.tight_layout()
    plt.show()

# 加载模型和优化器状态
checkpoint = torch.load('model_and_optimizer.pth')
model = MyUNet(n_channels=3, n_classes=3)
optimizer = optim.Adam(model.parameters(), lr=0.001)

model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)  # 不要忘记将模型发送到正确的设备
model.eval()

# 获取测试集上的预测结果和真实标签
labels, scores = get_predictions_and_labels(model, test_loader, device)
# 确保长度一致
assert len(labels) == scores.shape[0], "Labels and scores length mismatch"

# 绘制 ROC 曲线和 PR 曲线
plot_roc_pr_curves(labels, scores, num_classes=3)