# Package

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import csv

In [None]:
class Network(nn.Module):
    """Minimal two-layer convolutional network.

    Each layer is a 3x3 convolution with stride 1 and padding 1 followed by a
    ReLU, so the spatial size of the input is preserved while the number of
    channels goes 3 -> 16 -> 32.
    """

    def __init__(self):
        super().__init__()
        # Shared geometry of both convolutions.
        conv_kwargs = dict(kernel_size=3, stride=1, padding=1)
        self.conv1 = nn.Conv2d(3, 16, **conv_kwargs)
        self.conv2 = nn.Conv2d(16, 32, **conv_kwargs)

    def forward(self, x):
        """Apply conv1 -> ReLU -> conv2 -> ReLU to a (N, 3, H, W) input."""
        for layer in (self.conv1, self.conv2):
            x = F.relu(layer(x))
        return x

In [None]:
model = Network()

In [None]:
type(model)

In [None]:
for name, param in model.named_parameters():
    print(f"{name}: {param.size()}")

In [None]:
def get_weight(config):
    """Build a float tensor of normalized per-AU weights from the training CSV.

    Reads the CSV at ``config.DATA.SOURCE.TRAIN_LIST``, takes the integer
    columns named ``AU<id>`` for every id in ``config.DATA.AU_LIST``, counts
    the positive labels per column and derives the negatives as
    ``number_of_rows - positives``. Both counts are passed to ``WeightNorm``
    and the result of its ``normalize()`` is returned as a ``torch.FloatTensor``.

    NOTE(review): ``WeightNorm`` is not defined in this part of the notebook;
    its normalization rule should be confirmed at its definition.
    NOTE(review): ``config.DATA.CLASS_NUM`` is presumably equal to
    ``len(config.DATA.AU_LIST)``; otherwise the subtraction fails to broadcast.
    """
    num_classes = config.DATA.CLASS_NUM
    au_columns = [f"AU{au_name!s}" for au_name in config.DATA.AU_LIST]

    with open(config.DATA.SOURCE.TRAIN_LIST, 'r') as csv_file:
        label_rows = [
            [int(record[column]) for column in au_columns]
            for record in csv.DictReader(csv_file)
        ]

    labels = np.array(label_rows)
    positive = np.sum(labels, 0)
    # Every class sees the same number of samples, so negatives = total - positives.
    negative = np.array([len(labels)] * num_classes) - positive

    normalized = WeightNorm(positive.tolist(), negative.tolist()).normalize()
    return torch.FloatTensor(np.array(normalized).tolist())

# 修改tensor的维度的函数

In [None]:
import torch

In [None]:
# 1.reshape

x1 = torch.randn((128, 512, 7, 7))
x2 = torch.randn((128, 512, 7, 7))

In [None]:
torch.stack([x1, x2]).shape  # (2, 128, 512, 7, 7)

# 余弦退火算法

In [None]:
model = nn.Linear(256, 5)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.1)

epochs = 100

scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

In [None]:
# Trace the learning rate produced by the scheduler: `y` starts with the
# initial value and receives one more entry after every epoch, so it ends up
# with `epochs + 1` points.
y = [scheduler.get_last_lr()[0]]
print(f"Initial Learning Rate: {scheduler.get_last_lr()[0]:.6f}")
for epoch in range(epochs):
    # Stand-in for one epoch of training (no gradients are computed here).
    optimizer.step()
    
    # Advance the learning-rate schedule by one epoch.
    scheduler.step()
    
    # Report and record the learning rate that is now in effect.
    print(f"Epoch {epoch+1}/{epochs}, Learning Rate: {scheduler.get_last_lr()[0]:.6f}")
    y.append(scheduler.get_last_lr()[0])

In [None]:
plt.plot(range(epochs+1), y)

# Transforming and augmenting images

In [None]:
import torch
from torchvision.transforms import v2

In [None]:
H, W = 32, 32
img = torch.randint(0, 256, size=(3, H, W), dtype=torch.uint8)

In [None]:
plt.imshow(img.permute(1, 2, 0))

# Cross Entropy

In [None]:
input = torch.randn(3, 3, requires_grad=True)
target = torch.randint(3, (3,), dtype=torch.int64)
loss = F.cross_entropy(input, target, reduction='none')
loss

In [None]:
input

In [None]:
target

In [None]:
target = torch.randint(2, (3, 3), dtype=torch.int64)

In [None]:
target.bool()

In [None]:
row_1 = input[0]

In [None]:
# Log-softmax over the class dimension. Despite its name, the result covers
# every row of `input`, not only the first one.
# `dim=1` is passed explicitly: the implicit choice of dimension is deprecated
# and emits a warning; for a 2-D input it resolves to dim=1, so values are unchanged.
row_1_log_softmax = F.log_softmax(input, dim=1)
row_1_log_softmax

In [None]:
F.nll_loss(row_1_log_softmax, target, reduction='none')

In [None]:
target

In [None]:
target /target.sum(dim=1).unsqueeze(1)

In [None]:
target.sum(dim=1)

In [None]:
row_1_log_softmax[target.bool()]

In [None]:
pos = -torch.where(target.bool(), row_1_log_softmax, torch.tensor(0))

In [None]:
pos

In [None]:
pos.sum(dim=1) 

In [None]:
pos.sum(dim=1)  / target.sum(dim=1)

In [None]:
pos.mean()

In [None]:
torch.div(12, 0+1e-8)

In [None]:
import torch
import pdb

In [None]:
def compare_matrix_rows_no_loop(pseudo_label, labels):
    """Count the rows in which two 2-D tensors agree on every element.

    Args:
        pseudo_label: tensor compared row by row against ``labels``.
        labels: tensor of a shape compatible with ``pseudo_label``.

    Returns:
        The number of fully matching rows as a Python ``int``. The count is
        also printed.
    """
    # A row matches only when all of its entries are equal.
    matching_rows = (pseudo_label == labels).all(dim=1)
    count = matching_rows.sum().item()
    print(f"Count: {count}")
    return count

In [None]:

# Use the first GPU when one is available and fall back to the CPU otherwise,
# so the cell also runs on machines without CUDA.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

pseudo_label = torch.tensor([[0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
                             [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
                             [1., 0., 0., 0., 0., 0., 0., 0., 0., 0.]], device=device)
labels = torch.tensor([[0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
                       [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
                       [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.]], device=device)

# Only the first row is identical in both matrices, so the expected count is 1.
compare_matrix_rows_no_loop(pseudo_label, labels)

# CLIP

In [11]:
import clip
import torch

In [12]:
dir(clip)

['__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 'available_models',
 'clip',
 'load',
 'model',
 'simple_tokenizer',
 'tokenize']

In [13]:
clip.available_models()

['RN50',
 'RN101',
 'RN50x4',
 'RN50x16',
 'RN50x64',
 'ViT-B/32',
 'ViT-B/16',
 'ViT-L/14',
 'ViT-L/14@336px']

In [14]:
# RN50中没有dropout
# 参数冻结的CLIP模型中
clip_model, _ = clip.load('RN101', device='cpu')

In [15]:
data_input = torch.randn(1, 3, 224, 224)

In [16]:
x1, x2, x3, x4, out = clip_model.visual(data_input)

In [18]:
print(f"The shape of x1 is: {x1.shape}")
print(f"The shape of x2 is: {x2.shape}")
print(f"The shape of x3 is: {x3.shape}")
print(f"The shape of x4 is: {x4.shape}")

The shape of x1 is: torch.Size([1, 256, 56, 56])
The shape of x2 is: torch.Size([1, 512, 28, 28])
The shape of x3 is: torch.Size([1, 1024, 14, 14])
The shape of x4 is: torch.Size([1, 2048, 7, 7])


In [None]:
# clip_model

In [None]:
# clip_model
# clip
# ----visual
# ----transformer
# ----token_embedding
# ----ln_final

In [None]:
for module in clip_model.modules():
    if isinstance(module, nn.Dropout):
        print(f"{module}")

In [None]:
# clip_model.visual

In [None]:
images = torch.randn(1, 3, 224, 224)
output = clip_model.encode_image(images)
output.shape

# 学习率调度策略

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
# Record the learning rate used in each epoch under exponential decay
# (lr is multiplied by gamma=0.9 after every epoch).
lr_list = []

epochs = 30
model = nn.Linear(200, 10)
optimizer = optim.SGD(model.parameters(), lr=0.1)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
for epoch in range(epochs):
    current_lr = scheduler.get_last_lr()
    print(f"E({epoch}): lr:{current_lr}")
    lr_list.append(current_lr[0])
    # PyTorch expects optimizer.step() before scheduler.step() and warns
    # otherwise. No gradients exist here, so this call leaves the weights untouched.
    optimizer.step()
    scheduler.step()

In [None]:
plt.plot(lr_list)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
import matplotlib.pyplot as plt

# 假设你已经定义好了模型、数据集和数据加载器
# 这里的 dataloader 是一个模拟，实际使用时请替换
class DummyDataset(torch.utils.data.Dataset):
    """Synthetic stand-in dataset that produces random samples on the fly.

    It reports 100 samples. That is the number of samples, not of batches:
    a DataLoader with ``batch_size=10`` therefore yields 10 batches per epoch.
    """

    def __len__(self):
        # Number of samples in the dataset.
        return 100

    def __getitem__(self, idx):
        # `idx` is ignored; every access draws a fresh random sample.
        features = torch.randn(10)           # fake input vector of length 10
        label = torch.randint(0, 2, (1,))    # fake binary label, shape (1,)
        return features, label

train_dataloader = torch.utils.data.DataLoader(DummyDataset(), batch_size=10) # 假设 batch_size 为 10

# 1. 定义超参数
total_epochs = 50 # 总的训练 epoch 数，请根据你的任务调整
initial_lr = 0.0001 # 直接从这个学习率开始衰减
eta_min_cosine = 0 # Cosine Annealing 衰减到的最小学习率 (可以设为0或一个很小的值)

# 假设模型的参数数量为 10
dummy_params = [torch.randn(10, 1, requires_grad=True)]

# 2. 初始化优化器
# 优化器直接使用初始学习率
optimizer = optim.Adam(dummy_params, lr=initial_lr)

# 3. 计算总的迭代步数
steps_per_epoch = len(train_dataloader)
total_iterations = total_epochs * steps_per_epoch

print(f"Steps per epoch: {steps_per_epoch}")
print(f"Total iterations: {total_iterations}")

# 4. 初始化 CosineAnnealingLR 调度器
# T_max 是 Cosine Annealing 周期所需的总步数
scheduler = CosineAnnealingLR(optimizer, T_max=total_iterations, eta_min=eta_min_cosine)

# 5. Training loop
# The scheduler is stepped once per iteration (not once per epoch), matching
# T_max = total_iterations above.
global_step = 0 # global iteration counter
lr_history_cosine = [] # learning rate recorded after every iteration

print("\nSimulating training steps with Cosine Annealing Decay:")
for epoch in range(total_epochs):
    # model.train() # put the model into training mode
    # for i, (inputs, labels) in enumerate(train_dataloader):
    for i in range(steps_per_epoch): # simulated loop, no data is actually read
        # Forward pass, loss computation and backward pass would go here (simulated)
        # optimizer.zero_grad()
        # loss.backward()

        optimizer.step() # simulated parameter update
        optimizer.zero_grad() # simulated gradient reset

        # --- scheduler update ---
        # scheduler.step() is called after optimizer.step()
        scheduler.step()

        # Record the learning rate now in effect (i.e. the value after this step,
        # so the initial learning rate itself is not part of the history).
        current_lr = optimizer.param_groups[0]['lr']
        lr_history_cosine.append(current_lr)

        # Advance the global iteration counter
        global_step += 1

        # Print the current learning rate (optional, for debugging)
        # if (global_step) % 10 == 0 or global_step <= 5 or global_step > total_iterations - 5:
        #      print(f"Step {global_step}: LR = {current_lr:.8f}")

    print(f"Epoch {epoch} finished. Current LR: {optimizer.param_groups[0]['lr']:.8f}")

print("\nCosine Annealing Decay Simulation finished.")

# 6. 可视化学习率变化
plt.figure(figsize=(12, 6))
plt.plot(range(1, total_iterations + 1), lr_history_cosine)
plt.title('Learning Rate Schedule: Cosine Annealing Decay')
plt.xlabel('Steps')
plt.ylabel('Learning Rate')
plt.grid(True)
plt.show()

# 领域自适应中遍历数据集的写法

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
np.arange(10)

In [None]:
class DataSetTest(Dataset):
    """Toy dataset that pairs every element of ``data`` with one shared label.

    Args:
        data: indexable collection of samples; its length is the dataset size.
        label: single value returned alongside every sample (used in this
            notebook as a domain id).
    """

    def __init__(self, data, label):
        super().__init__()
        self.data, self.label = data, label

    def __len__(self):
        """Return the number of samples in ``data``."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(data[index], label)``."""
        return self.data[index], self.label

In [None]:
source_dataset = DataSetTest(np.arange(50), 0)
target_dataset = DataSetTest(np.arange(120), 1)

In [None]:
source_loader = DataLoader(source_dataset, batch_size=25)
target_loader = DataLoader(target_dataset, batch_size=25)

In [None]:
for x,y in source_loader:
    print(f"{x}")

In [None]:
for x,y in target_loader:
    print(f"{x}")

In [None]:
# This approach does not work: zip() stops as soon as the shorter loader is
# exhausted (here the source loader, 2 batches vs. 5 target batches), so the
# remaining batches of the longer loader are never visited.
for x,y in zip(source_loader, target_loader):
    print(f"{y}")

In [None]:
# Re-create the source iterator whenever it runs out, so that every target
# batch is paired with a source batch.
source_iter = iter(source_loader)
# Iterate over the target loader to use as much of the unlabeled target-domain data as possible.
for batch_idx, (x_t, y_t) in enumerate(target_loader):
    try:
        x_s, y_s = next(source_iter)
    except StopIteration:
        # Only exhaustion restarts the loader; any other error (or a keyboard
        # interrupt) must propagate instead of being silently swallowed.
        source_iter = iter(source_loader)
        x_s, y_s = next(source_iter)
    print(f"batch: {batch_idx}")
    print(f"t_x: {x_t}")
    print(f"s_x: {x_s}")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [None]:
acc_matrix = np.load('/mnt/sda/yiren/code/uda/Unsupervised-Domain-Adaptation/results/gft2bp4d+/two_train_cross_fold_3_seed_1000_date_2025-05-10_12_16-59_PM/acc_matrix.npy')

In [None]:
for acc in acc_matrix:
    print(acc)

In [None]:
plt.figure(figsize=(8, 6)) # 设置图像大小
sns.heatmap(
    acc_matrix*100,
    annot=True,        # 在单元格上显示数值
    fmt=".2f",         # 格式化字符串，.2f 表示保留两位小数的浮点数
    cmap="viridis",    # 选择一个颜色映射 (e.g., "viridis", "coolwarm", "YlGnBu")
    linewidths=.5,     # 单元格之间的线条宽度
    cbar=True,         # 显示颜色条
    square=True        # 使单元格为正方形 (如果行数和列数相近)
)
plt.title("Heatmap of acc_matrix (保留两位小数)")
plt.xlabel("列索引 (Column Index)") # 或者你的列标签
plt.ylabel("行索引 (Row Index)")   # 或者你的行标签

# 如果你有行和列的标签，可以这样设置：
# row_labels = [f"Row {i+1}" for i in range(acc_matrix.shape[0])]
# col_labels = [f"Col {j+1}" for j in range(acc_matrix.shape[1])]
# sns.heatmap(..., xticklabels=col_labels, yticklabels=row_labels)

plt.show()

# 伪标签筛选

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [None]:
# All arrays of this run live in one results directory; keep the path in a
# single place so that switching to another run is a one-line change.
RESULT_DIR = '/mnt/sda/yiren/code/uda/Unsupervised-Domain-Adaptation/results/disfa2bp4d/two_train_cross_fold_2_seed_1000_date_2025-05-09_05_12-06_PM'


def _load_result(name):
    """Load ``<RESULT_DIR>/<name>.npy`` and return its content."""
    # NOTE(review): allow_pickle=True can execute arbitrary code while
    # unpickling; only load files produced by trusted runs.
    return np.load(f'{RESULT_DIR}/{name}.npy', allow_pickle=True)


out_std_list = _load_result('out_std_list')
out_prob_list = _load_result('out_prob_list')

images_list = _load_result('images_list')
truth_labels = _load_result('truth_labels')
pseudo_labels = _load_result('pseudo_labels')

acc_matrix = _load_result('acc_matrix')

In [None]:
out_std_list.shape, out_prob_list.shape, images_list.shape, truth_labels.shape, pseudo_labels.shape, acc_matrix.shape

In [None]:
# Number of completely correct samples: 36004
# Proportion of completely correct samples: 0.38669903121173715
# accuracy_per_au: [0.7886388  0.8292054  0.81597316 0.78358    0.8070801 ]

In [None]:
np.max(out_std_list), np.min(out_std_list)

In [None]:
tau_p = 0.7     # a positive pseudo-label needs a probability of at least this value
kappa_p = 0.05  # ... and a standard deviation of at most this value

# Positive pseudo-label mask: low uncertainty AND high confidence.
# NOTE: this overwrites the `pseudo_labels` array loaded from disk.
pseudo_labels = (out_std_list <= kappa_p) & (out_prob_list >= tau_p)

# Per class: number of true positives that were also selected as pseudo-labels,
# and their share among all true positives of that class.
# NOTE(review): a class without any positive ground-truth label yields a
# division by zero (nan/inf) here.
n_correct_pos = (truth_labels * pseudo_labels).sum(0)
positive_recall = n_correct_pos / truth_labels.sum(0)

# Pseudo-label accuracy: share of samples whose whole label vector matches the ground truth.
completely_correct_samples_mask = np.all(pseudo_labels == truth_labels, axis=1)
num_completely_correct_samples = np.sum(completely_correct_samples_mask)
pl_acc = num_completely_correct_samples/len(pseudo_labels)
print(f"伪标签准确率: {pl_acc}")
print(f"真实标签中值为1且预测为1的准确率：{positive_recall}")

In [None]:
tau_p = 0.7
kappa_p = 0.05
tau_n = 0.05
kappa_n = 0.005

mean_probs = out_prob_list
std_probs = out_std_list
# [batch_size, num_classes] boolean tensor for selected positive pseudo-labels
# 条件1: 置信度足够高
cond_confidence_pos = (mean_probs >= tau_p)
# 条件2: 不确定性足够低 (如果启用)
cond_uncertainty_pos = (std_probs < kappa_p)
selected_positive_pseudo_labels_mask = cond_confidence_pos & cond_uncertainty_pos

# selected_positive_pseudo_labels_mask 现在是一个布尔张量，True表示该类别被选为正伪标签

# --- 2. 负伪标签筛选 (Negative Pseudo-label Selection) ---

# [batch_size, num_classes] boolean tensor for selected negative pseudo-labels
# 条件1: (负)置信度足够高，即存在概率足够低
cond_confidence_neg = (mean_probs < tau_n)
# 条件2: 不确定性足够低 (如果启用)
cond_uncertainty_neg = (std_probs < kappa_n) # Assuming std_probs for negative is calculated similarly
selected_negative_pseudo_labels_mask = cond_confidence_neg & cond_uncertainty_neg

# 冲突解决: 如果一个类别被选为正，则不能被选为负
selected_negative_pseudo_labels_mask[selected_positive_pseudo_labels_mask] = False


# --- 3. Evaluate pseudo-label quality ---
# No tensor conversion takes place here: the names below are plain aliases of
# the masks computed above and of `truth_labels`.
# NOTE(review): the evaluation code that follows assumes these are NumPy
# arrays (they are expected to come from np.load earlier in the notebook).

# Positive pseudo-labels
pseudo_labels_pos_np = selected_positive_pseudo_labels_mask
truth_labels_np = truth_labels # ground-truth labels used as the reference

# Negative pseudo-labels
pseudo_labels_neg_np = selected_negative_pseudo_labels_mask


# --- 评估正伪标签 ---
# "真实标签中值为1且预测为1的准确率" (This is essentially Precision for the positive class predictions)
# More accurately, this is recall of positive labels among those selected as pseudo-labels,
# or precision of pseudo-labels if we consider pseudo_labels_pos_np as predictions.
# Let's re-interpret based on common metrics.

# True Positives (TP): GT is 1, Pseudo is 1
tp_pos = (truth_labels_np == 1) & (pseudo_labels_pos_np == 1)
# Predicted Positives (PP): Pseudo is 1 (i.e., selected as positive pseudo-label)
pp_pos = (pseudo_labels_pos_np == 1)
# Actual Positives (AP): GT is 1
ap_pos = (truth_labels_np == 1)

# Precision of positive pseudo-labels: Of all labels chosen as positive pseudo-labels, how many were actually positive?
# (Sum over all samples and classes)
precision_positive_pseudo = tp_pos.sum() / pp_pos.sum() if pp_pos.sum() > 0 else 0.0
print(f"正伪标签精确率 (Precision for positive pseudo-labels): {precision_positive_pseudo * 100:.2f}%")

# Recall of positive pseudo-labels: Of all actual positive labels, how many were selected as positive pseudo-labels?
recall_positive_pseudo = tp_pos.sum() / ap_pos.sum() if ap_pos.sum() > 0 else 0.0
print(f"正伪标签召回率 (Recall for positive pseudo-labels): {recall_positive_pseudo * 100:.2f}%")


# "伪标签准确率" (Sample-level exact match for positive pseudo-labels)
# This metric considers a sample's positive pseudo-labels entirely correct ONLY IF the selected positive pseudo-label mask
# for that sample exactly matches the ground truth mask FOR THE POSITIONS WHERE PSEUDO-LABELS WERE MADE.
# Or, if you mean exact match of the full multi-hot vector:
# For positive pseudo-labels, this is tricky. `pseudo_labels_pos_np` only marks selected positives.
# A more meaningful sample-level accuracy for multi-label is often Jaccard index or Hamming accuracy.
# Your definition "completely_correct_samples_mask = np.all(pseudo_labels == truth_labels, axis=1)"
# implies comparing the *full* pseudo_label mask (which might include 0s where no decision was made or decision was negative)
# with the *full* truth_label mask.
# Let's define `pseudo_labels` as a combined decision for clarity for this metric.
# For simplicity, let's evaluate based on the *selected positive pseudo-labels* only.
# If you want an overall pseudo-label (combining positive and implicitly negative where not positive):
combined_pseudo_for_acc = pseudo_labels_pos_np # Where 1 is positive, 0 is not selected as positive.
completely_correct_samples_mask_pos = np.all(combined_pseudo_for_acc == truth_labels_np, axis=1)
num_completely_correct_samples_pos = np.sum(completely_correct_samples_mask_pos)
pl_acc_sample_level_pos = num_completely_correct_samples_pos / len(truth_labels_np) if len(truth_labels_np) > 0 else 0.0
print(f"基于正伪标签的样本级完全匹配准确率: {pl_acc_sample_level_pos * 100:.2f}%")
# This sample-level exact match is very strict for multi-label.


# --- 评估负伪标签 ---
# True Negatives (TN): GT is 0, Pseudo_neg is 1 (selected as negative)
tn_neg = (truth_labels_np == 0) & (pseudo_labels_neg_np == 1)
# Predicted Negatives (PN_sel): Pseudo_neg is 1 (selected as negative)
pn_sel_neg = (pseudo_labels_neg_np == 1)
# Actual Negatives (AN): GT is 0
an_neg = (truth_labels_np == 0)

# Accuracy of negative pseudo-labels: Of all labels chosen as negative pseudo-labels, how many were actually negative?
accuracy_negative_pseudo = tn_neg.sum() / pn_sel_neg.sum() if pn_sel_neg.sum() > 0 else 0.0
print(f"负伪标签准确率 (Accuracy for negative pseudo-labels): {accuracy_negative_pseudo * 100:.2f}%")

# Recall for negative pseudo-labels (how many of the true negatives were captured by negative pseudo-labeling)
recall_negative_pseudo = tn_neg.sum() / an_neg.sum() if an_neg.sum() > 0 else 0.0
print(f"负伪标签召回率 (Recall for negative pseudo-labels): {recall_negative_pseudo * 100:.2f}%")


# --- 整合到返回字典 (示例) ---
# 筛选后的正伪标签及其原始索引
# (假设你有一个 `indexs_tensor` 对应当前批次的原始样本索引)
# samples_with_any_pos_pseudo = selected_positive_pseudo_labels_mask.any(dim=1)
# selected_pos_indices_in_batch = samples_with_any_pos_pseudo.nonzero(as_tuple=True)[0]
# original_indices_for_pos = indexs_tensor[selected_pos_indices_in_batch]
# final_positive_pseudo_targets = selected_positive_pseudo_labels_mask[selected_pos_indices_in_batch]

# 筛选后的负伪标签及其原始索引
# samples_with_any_neg_pseudo = selected_negative_pseudo_labels_mask.any(dim=1)
# selected_neg_indices_in_batch = samples_with_any_neg_pseudo.nonzero(as_tuple=True)[0]
# original_indices_for_neg = indexs_tensor[selected_neg_indices_in_batch]
# final_negative_pseudo_masks = selected_negative_pseudo_labels_mask[selected_neg_indices_in_batch]

# pseudo_label_dict = {
#     'pseudo_idx_positive': original_indices_for_pos.cpu().tolist() if samples_with_any_pos_pseudo.any() else [],
#     'pseudo_target_positive': final_positive_pseudo_targets.cpu().tolist() if samples_with_any_pos_pseudo.any() else [],
#     'pseudo_idx_negative': original_indices_for_neg.cpu().tolist() if samples_with_any_neg_pseudo.any() else [],
#     'pseudo_mask_negative': final_negative_pseudo_masks.cpu().tolist() if samples_with_any_neg_pseudo.any() else []
# }

# print("Generated pseudo_label_dict:", pseudo_label_dict)

In [None]:
pseudo_labels_pos_np.shape

# 真实标签的分布

In [None]:
# Stacked bar chart of the ground-truth labels: for every AU, how many samples
# are labelled 0 and how many are labelled 1.
au_names = ('AU 1', 'AU 2', 'AU 4', 'AU 6', 'AU 12')
# Columns 0..4 of `truth_labels` correspond to the AUs listed above.
au_labels = truth_labels[:, :len(au_names)]
label_counts = {
    'number 0': np.sum(au_labels == 0, axis=0),
    'number 1': np.sum(au_labels == 1, axis=0),
}
width = 0.6  # the width of the bars: can also be len(x) sequence


fig, ax = plt.subplots()
bottom = np.zeros(len(au_names))

for label_name, counts in label_counts.items():
    bars = ax.bar(au_names, counts, width, label=label_name, bottom=bottom)
    # Stack the next series on top of the one just drawn.
    bottom += counts

    ax.bar_label(bars, label_type='center')

ax.set_title('Ground-truth label counts per AU')
ax.set_ylabel('Number of samples')
ax.legend()

plt.show()

# 伪标签的分布

In [None]:
# Stacked bar chart of the pseudo-labels: for every AU, how many samples carry
# the pseudo-label 0 and how many carry the pseudo-label 1.
au_names = ('AU 1', 'AU 2', 'AU 4', 'AU 6', 'AU 12')
# Columns 0..4 of `pseudo_labels` correspond to the AUs listed above.
au_pseudo_labels = pseudo_labels[:, :len(au_names)]
pseudo_label_counts = {
    'number 0': np.sum(au_pseudo_labels == 0, axis=0),
    'number 1': np.sum(au_pseudo_labels == 1, axis=0),
}
width = 0.6  # the width of the bars: can also be len(x) sequence


fig, ax = plt.subplots()
bottom = np.zeros(len(au_names))

for label_name, counts in pseudo_label_counts.items():
    bars = ax.bar(au_names, counts, width, label=label_name, bottom=bottom)
    # Stack the next series on top of the one just drawn.
    bottom += counts

    ax.bar_label(bars, label_type='center')

ax.set_title('Pseudo-label counts per AU')
ax.set_ylabel('Number of samples')
ax.legend()

plt.show()

In [None]:
n_correct_pos

In [None]:
truth_labels.sum(0)

# 深度可分离卷积

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torchinfo import summary

In [None]:
# Run on the GPU when available, otherwise on the CPU, and keep the input and
# both layers on the same device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

data_input = torch.randn(1, 512, 224, 224).to(device)
# Grouped 3x3 convolution: with 512 input channels and groups=256, every group
# maps 2 input channels to 1 output channel (weight shape (256, 2, 3, 3)).
d_conv = nn.Conv2d(512, 256, kernel_size=3, padding=3//2,groups=256).to(device)
# Pointwise (1x1) convolution that mixes the 256 channels.
n_conv = nn.Conv2d(256, 256, kernel_size=1).to(device)

In [None]:
print(summary(d_conv, input_size=(1, 512, 56, 56)))

In [None]:
print(summary(n_conv, input_size=(1, 256, 56, 56)))

In [None]:
class VisionAdapterV2(nn.Module):
    """Multi-scale depthwise adapter.

    Pipeline: BatchNorm -> three parallel depthwise convolutions (3x3, 5x5,
    7x7) whose outputs are averaged and added to the normalized input ->
    1x1 convolution (in_dim -> out_dim) -> ReLU -> average pooling.

    Args:
        in_dim: number of input channels.
        out_dim: number of output channels of the 1x1 convolution.
        pool_kernel: kernel size of the final average pooling.
        pool_stride: stride of the final average pooling.
    """
    def __init__(self, in_dim, out_dim, pool_kernel, pool_stride):
        super().__init__()

        def depthwise(kernel_size):
            # One filter per channel; the padding keeps the spatial size unchanged.
            return nn.Conv2d(in_dim, in_dim, kernel_size=kernel_size,
                             padding=kernel_size // 2, groups=in_dim)

        self.bn = nn.BatchNorm2d(in_dim)
        self.dw_conv1 = depthwise(3)
        self.dw_conv2 = depthwise(5)
        self.dw_conv3 = depthwise(7)

        self.conv1x1 = nn.Conv2d(in_dim, out_dim, kernel_size=1)

        self.relu = nn.ReLU(inplace=True)
        # Plain average pooling; the output size depends on pool_kernel / pool_stride.
        self.pool = nn.AvgPool2d(kernel_size=pool_kernel, stride=pool_stride)

    def forward(self, x):
        """Map a (N, in_dim, H, W) input to the pooled (N, out_dim, H', W') output."""
        normed = self.bn(x)
        # Average of the three receptive fields plus a residual connection
        # from the normalized input.
        branch_sum = self.dw_conv1(normed) + self.dw_conv2(normed) + self.dw_conv3(normed)
        fused = branch_sum / 3.0 + normed
        return self.pool(self.relu(self.conv1x1(fused)))

In [None]:
x = torch.randn(1, 2048, 7, 7)
in_dim = 2048
out_dim = 256

In [None]:
adapter = VisionAdapterV2(in_dim, out_dim, pool_kernel=1, pool_stride=1)

In [None]:
adapter(x).shape

# 池化

In [None]:
x = torch.randn(1, 1, 3, 3)

In [None]:
pool = nn.AdaptiveAvgPool2d((1, 1))

In [None]:
x

In [None]:
pool(x)

# Tensorboard

In [None]:
import torch
from torch.utils.tensorboard import SummaryWriter

In [None]:
writer = SummaryWriter(log_dir=f'res/disfa')

In [None]:
# The cell previously held the unfinished expression `writer.`, a SyntaxError
# that stops "Run All". Show where the writer stores its event files instead.
writer.log_dir

In [None]:
x = torch.arange(-5, 5, 0.1).view(-1, 1)
y = -5 * x + 0.1 * torch.randn(x.size())

model = torch.nn.Linear(1, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr = 0.1)

def train_model(iter):
    """Run ``iter`` full-batch gradient steps and log the loss of each one.

    Uses the notebook-level ``model``, ``criterion``, ``optimizer``, ``x`` and
    ``y``, and writes the loss of every step to ``writer`` under the tag
    "Loss/train".

    Args:
        iter: number of training steps (the name shadows the built-in
            ``iter`` inside this function).
    """
    for step in range(iter):
        prediction = model(x)
        loss = criterion(prediction, y)
        # Logged before the update, i.e. this is the loss of the current weights.
        writer.add_scalar("Loss/train", loss, step)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

train_model(10)
writer.flush()

In [None]:
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import time

# Log a synthetic loss curve to TensorBoard, using a global step that keeps
# increasing across epochs (epoch * steps_per_epoch + i).
writer = SummaryWriter('runs/method_one_example')

num_epochs = 3
steps_per_epoch = 5

for epoch in range(num_epochs):
    for i in range(steps_per_epoch):
        # Simulated training loss that decays as the step count grows
        loss = np.exp(-(epoch * steps_per_epoch + i) / 5.0)
        global_step = epoch * steps_per_epoch + i
        writer.add_scalar('Loss/train', loss, global_step)
        time.sleep(0.1) # simulate the time taken by a training step

writer.close()