In [1]:
# InfoNetEstimator.py

import numpy as np
from scipy.stats import rankdata
import torch


class InfoNetEstimator:
    def __init__(self, model, proj_num=1024, seq_len=2000, batchsize=32, device=None):
        """
        InfoNet互信息估计器

        :param model: 已加载的InfoNet模型（手动加载后传入）
        :param proj_num: 投影数量（高维互信息估计用）
        :param seq_len: 采样点数
        :param batchsize: 批大小
        :param device: 设备（'cuda'或'cpu'），默认自动检测
        """
        self.model = model
        self.proj_num = proj_num
        self.seq_len = seq_len
        self.batchsize = batchsize
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")

    def estimate_1d(self, x, y):
        """
        一维互信息估计
        :param x: shape [N,]
        :param y: shape [N,]
        :return: 互信息估计值
        """
        model = self.model
        model.eval()
        x = rankdata(x) / len(x)
        y = rankdata(y) / len(y)
        batch = (
            torch.stack(
                (
                    torch.tensor(x, dtype=torch.float32),
                    torch.tensor(y, dtype=torch.float32),
                ),
                dim=1,
            )
            .unsqueeze(0)
            .to(self.device)
        )
        with torch.no_grad():
            mi_lb = model(batch)
        return mi_lb.squeeze().cpu().numpy()

    def estimate_highd(self, sample_x, sample_y):
        """
        高维互信息估计（Sliced MI）
        :param sample_x: shape [N, dx]
        :param sample_y: shape [N, dy]
        :return: 互信息估计值
        """
        dx = sample_x.shape[1]
        dy = sample_y.shape[1]
        results = []
        for i in range(self.proj_num // self.batchsize):
            batch = np.zeros((self.batchsize, self.seq_len, 2))
            for j in range(self.batchsize):
                theta = np.random.randn(dx)
                phi = np.random.randn(dy)
                x_proj = np.dot(sample_x, theta)
                y_proj = np.dot(sample_y, phi)
                x_proj = rankdata(x_proj) / self.seq_len
                y_proj = rankdata(y_proj) / self.seq_len
                xy = np.column_stack((x_proj, y_proj))
                batch[j, :, :] = xy
            batch_tensor = torch.tensor(batch, dtype=torch.float32, device=self.device)
            with torch.no_grad():
                mi_lb = self.model(batch_tensor)
                mean_infer1 = torch.mean(mi_lb).cpu().numpy()
            results.append(mean_infer1)
        return float(np.mean(np.array(results)))


In [3]:
from infonet.infer import load_model
model = load_model('../../configs/infonet/config.yaml', '../../data/checkpoint/infonet_cp/model_5000_32_1000-720--0.16.pt')
estimator = InfoNetEstimator(model, proj_num=512, seq_len=1000, batchsize=16)
x = np.random.randn(1000)
y = np.random.randn(1000)
mi_1d = estimator.estimate_1d(x, y)
print("一维互信息估计:", mi_1d)
X = np.random.randn(1000, 10)
Y = np.random.randn(1000, 10)
mi_hd = estimator.estimate_highd(X, Y)
print("高维互信息估计:", mi_hd)

一维互信息估计: 0.019440778
高维互信息估计: 0.039300866425037384


In [4]:
# paramter法查看model是否是cuda设备
def is_model_cuda(model):
    return next(model.parameters()).is_cuda
print("Model is on CUDA:", is_model_cuda(model))

Model is on CUDA: True


In [8]:
from utils.model import light
from torch import nn
from torch import Tensor

class LeNet(light.ModernModule):
    def __init__(self, config: light.ModelConfig):
        super().__init__(config)
        self._is_classification = True
        self.net = nn.Sequential(
            nn.LazyConv2d(6, kernel_size=5, stride=1, padding=2),
            nn.Sigmoid(),
            nn.AvgPool2d(kernel_size=2, stride=2),
            nn.LazyConv2d(
                16,
                kernel_size=5,
            ),
            nn.Sigmoid(),
            nn.AvgPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
            nn.LazyLinear(120),
            nn.Sigmoid(),
            nn.LazyLinear(84),
            nn.Sigmoid(),
            nn.LazyLinear(config.output_size),
        )

    def compute_loss(self, y_hat, y) -> Tensor:
        return nn.functional.cross_entropy(y_hat, y)

    def predict(self, x: Tensor):
        return self.net(x).argmax(dim=1)

    def predict_class(self, x: Tensor) -> Tensor:
        return self.predict(x).item()

    def evaluate(self, X, y):
        # 评估模型性能
        # 整体正确率 - FP - FN - F1score - AUC
        with torch.no_grad():
            y_hat = self.net(X)
            loss = self.compute_loss(y_hat, y)
            predicted_class = self.predict(X)
            accuracy = (predicted_class == y).float().mean().item()
            return {
                "loss": loss.item(),
                "accuracy": accuracy,
                "predicted_class": predicted_class,
            }


In [None]:
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms

datapath = "../../data/"

# 数据加载
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)

train_dataset = torchvision.datasets.MNIST(
    root=datapath, train=True, download=True, transform=transform
)
test_dataset = torchvision.datasets.MNIST(
    root=datapath, train=False, download=True, transform=transform
)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# 查看数据集是否在CUDA上运行
def is_dataset_on_cuda(dataset):
    return next(iter(dataset))[0].is_cuda
print("Train dataset on CUDA:", is_dataset_on_cuda(train_loader))
print("Test dataset on CUDA:", is_dataset_on_cuda(test_loader))
# 转移数据到CUDA设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Train dataset on CUDA: False
Test dataset on CUDA: False


In [None]:
# 互信息计算回调
import pytorch_lightning as pl

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class TestCallback(pl.Callback):
    def on_train_epoch_end(self, trainer, pl_module):
        # 在每个训练周期结束时计算互信息
        # 写入到 ./logs/infometrics.txt
        # 没有自动创建


In [14]:
# 从数据集读取class
num_classes = len(train_dataset.classes)
print("Number of classes in dataset:", num_classes)

config = light.ModelConfig(
    lr=0.001,  # 较小的学习率
    num_hiddens=256,
    output_size=num_classes,
    optimizer="sgd",  
    weight_decay=1e-4,  # 正则化
    momentum=0.9,  # 动量
    
)


trainer_lenet = light.TrainerFactory.basic(
    max_epochs=10,
)
trainer_lenet.add_callback(TestCallback())
trainer_lenet.fit(
    model=LeNet(config),
    train_loader=train_loader,
    val_loader=test_loader,
)

💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name | Type       | Params | Mode 
--------------------------------------------
0 | net  | Sequential | 0      | train
--------------------------------------------
0         Trainable params
0         Non-trainable params
0         Total params
0.000     Total estimated model params size (MB)
13        Modules in train mode
0         Modules in eval mode


Number of classes in dataset: 10
                                                                            

c:\Src\SevFoxie\SevFoxie\Study\2025Summer\DeepLearning\code\.venv\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:425: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.
c:\Src\SevFoxie\SevFoxie\Study\2025Summer\DeepLearning\code\.venv\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:425: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Epoch 0: 100%|██████████| 938/938 [00:15<00:00, 62.12it/s, v_num=2, train_loss=2.290, train_acc=0.0938, val_loss=2.300, val_acc=0.113]

FileNotFoundError: [Errno 2] No such file or directory: './logs/infometrics.txt'