In [1]:
from dataset import CarbonDataset
from torch.utils.data import DataLoader
from torchvision import transforms
import torch
import torch.utils.data as data
from torch.utils.data.sampler import WeightedRandomSampler


fp = "Dataset/Training/image/AP25_Forest_IMAGE"

label_size = 256//2
label_transform = transforms.Compose([
    transforms.Resize((label_size, label_size)),  # 라벨 크기 조정
])
    
# 데이터셋을 위한 변환 정의
image_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])
sh_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])
# 데이터셋 및 데이터 로더 생성
dataset1 = CarbonDataset(fp, image_transform, sh_transform, label_transform,mode="Train")
dataset2 = CarbonDataset(fp, image_transform,sh_transform, label_transform,mode="Valid")
# 데이터 크기 기반 샘플링 비율 계산
num_samples1 = len(dataset1)
num_samples2 = len(dataset2)
sample_ratio = num_samples2 / num_samples1

# WeightedRandomSampler 사용
sampler1 = torch.utils.data.sampler.SequentialSampler(dataset1)
sampler2 = WeightedRandomSampler(weights=[1.0] * num_samples2, num_samples=int(num_samples1 * sample_ratio))

dataloader1 = DataLoader(dataset1, batch_size=1, shuffle=True,num_workers=8,pin_memory=True,sampler=sampler1)
dataloader2 = DataLoader(dataset2, batch_size=1, shuffle=False,num_workers=8,pin_memory=True,sampler=sampler2)

for batch1, batch2 in zip(dataloader1, dataloader2):
    image1, sh1 , carbon1 , gt1 = batch1
    image2, sh2 , carbon2 , gt2 = batch2

    patches1 = extract_patches(image1, 4)
    patches2 = extract_patches(image2, 4)

    shuffled_patches1 = patches1[torch.randperm(patches1.size(0))]
    shuffled_patches2 = patches2[torch.randperm(patches2.size(0))]

    # 셔플된 패치로 새로운 배치 구성
    new_batch = torch.cat((shuffled_patches1, shuffled_patches2), dim=0)
    new_labels = torch.cat((labels1, labels2), dim=0)


Training Set
Dataset/Training/image/AP25_Forest_IMAGE Done./
Dataset/Training/image/AP25_Forest_SH Done./
Dataset/Training/label/AP25_Forest_Carbon Done./
Dataset/Training/label/AP25_Forest_GT Done./
Validation Set
Dataset/Validation/image/AP25_Forest_IMAGE Done./
Dataset/Validation/image/AP25_Forest_SH Done./
Dataset/Validation/label/AP25_Forest_Carbon Done./
Dataset/Validation/label/AP25_Forest_GT Done./
[tensor([[[[0.2941, 0.3216, 0.3647,  ..., 0.3608, 0.3725, 0.4157],
          [0.3059, 0.3176, 0.3255,  ..., 0.3725, 0.4000, 0.4196],
          [0.3294, 0.3608, 0.3569,  ..., 0.3765, 0.3961, 0.4235],
          ...,
          [0.3137, 0.2824, 0.2824,  ..., 0.4431, 0.3961, 0.3294],
          [0.2706, 0.2824, 0.2863,  ..., 0.3922, 0.4745, 0.4196],
          [0.2588, 0.2863, 0.2941,  ..., 0.4745, 0.4118, 0.3451]],

         [[0.3608, 0.3882, 0.4275,  ..., 0.3608, 0.3725, 0.4078],
          [0.3647, 0.3765, 0.3843,  ..., 0.3725, 0.3961, 0.4118],
          [0.3843, 0.4118, 0.4039,  ..., 0.3

In [44]:
import torch.nn.functional as F

def swap_random_patches(image_batch1, image_batch2, patch_size):
    """
    두 이미지 배치에서 특정 패치들을 랜덤하게 선택하여 서로 교환한 후 배치 차원으로 합친 후 반환하는 함수.

    Args:
        image_batch1 (torch.Tensor): 첫 번째 이미지 배치 텐서 (B, C, H, W).
        image_batch2 (torch.Tensor): 두 번째 이미지 배치 텐서 (B, C, H, W).
        patch_size (tuple): 추출할 패치의 크기를 나타내는 튜플 (h, w).

    Returns:
        torch.Tensor: 패치를 교환하고 배치 차원으로 합친 이미지 배치 텐서.
    """
    # 이미지 배치를 unfold하여 패치 추출
    unfolded1 = F.unfold(image_batch1, kernel_size=patch_size, stride=patch_size)
    unfolded2 = F.unfold(image_batch2, kernel_size=patch_size, stride=patch_size)

    # 랜덤한 패치 인덱스를 생성
    num_patches = unfolded1.size(1)
    permuted_indices = torch.randperm(num_patches)

    # 두 이미지의 패치를 서로 교환
    unfolded1_swapped = unfolded1.clone()
    unfolded2_swapped = unfolded2.clone()
    unfolded1_swapped[:, permuted_indices, :] = unfolded2[:, permuted_indices, :]
    unfolded2_swapped[:, permuted_indices, :] = unfolded1[:, permuted_indices, :]

    # 패치를 이미지 배치로 복원
    restored_batch1 = F.fold(unfolded1_swapped, output_size=image_batch1.shape[-2:], kernel_size=patch_size, stride=patch_size)
    restored_batch2 = F.fold(unfolded2_swapped, output_size=image_batch2.shape[-2:], kernel_size=patch_size, stride=patch_size)

    # 두 이미지 배치를 배치 차원으로 합침
    swapped_and_concatenated_batch = torch.cat((restored_batch1, restored_batch2), dim=0)

    return swapped_and_concatenated_batch

# 사용 예제
image_batch1 = torch.zeros(2, 3, 256, 256)  # 첫 번째 이미지 배치 (2개의 이미지, 3 채널, 256x256)
image_batch2 = torch.ones(2, 3, 256, 256)  # 두 번째 이미지 배치 (2개의 이미지, 3 채널, 256x256)
patch_size = (4, 4)  # 추출할 패치의 크기

# 함수 호출하여 이미지 배치에서 랜덤하게 패치 교환하고 배치 차원으로 합침
swapped_and_concatenated_batch = swap_random_patches(image_batch1, image_batch2, patch_size)

# 확인을 위해 교환 및 합친 후의 이미지 배치의 크기 출력
print(swapped_and_concatenated_batch.size())  # 결과: torch.Size([4, 3, 256, 256])
print(swapped_and_concatenated_batch)

torch.Size([4, 3, 256, 256])
tensor([[[[1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          ...,
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.]],

         [[1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          ...,
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.]],

         [[1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          ...,
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.]]],


        [[[1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          [1., 1., 1.,  ..., 1., 1., 1.],
          ...,
          [1., 1., 1.