In [None]:
from typing import NewType
import argparse
import os
import shutil
import time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim as optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.models as models
import matplotlib.pyplot as plt
import math
from tqdm import tqdm
import random
from torch.utils.data.sampler import SubsetRandomSampler as Subset
from copy import deepcopy
import torch.nn.functional as F
import pandas as pd
from sklearn.metrics import mean_absolute_error, r2_score
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer, mean_squared_error
import gc

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the data paths used by later cells live under it.
drive.mount('/content/drive')

In [None]:
# Fix the seed of every random number generator used in this notebook
# (Python's `random`, NumPy, PyTorch on CPU and on CUDA).
seed = 1234
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

# cuDNN: request deterministic algorithms and disable benchmark mode.
cudnn.benchmark = False
cudnn.deterministic = True

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Presumably an optimizer momentum coefficient (it is not referenced by the
# visible cells). The original identifier was misspelled ("momemtum"); the
# correctly spelled name is now the primary one and the old spelling is kept as
# an alias so that any existing reference keeps working.
momentum = 0.9
momemtum = momentum

In [None]:
import torch
import pandas as pd
import tifffile as tiff
import numpy as np

# Load the label table from the CSV file on the mounted Drive.
# Its 'OSmonth' column is read into the survival-time tensor by a later cell.
labels_df = pd.read_csv('/content/drive/MyDrive/pro626/train/train_data.csv')

# Assuming the TIFF files are named with their patient ID and stored in 'images/' directory
def load_tiff_images_to_tensor(num_patients, num_channels=52, image_size=(150, 150),
                               image_dir='/content/drive/MyDrive/pro626/train/images'):
    """Read ``<image_dir>/<id>.tiff`` for id = 1..num_patients into one float tensor.

    Args:
        num_patients: number of images to load; files are looked up by the
            1-based ids ``1 .. num_patients``.
        num_channels: expected number of channels of every image.
        image_size: expected ``(height, width)`` of every image.
        image_dir: directory that contains the ``<id>.tiff`` files. The default
            is the location that used to be hard-coded in the function body.

    Returns:
        A float32 tensor of shape ``(num_patients, num_channels, *image_size)``
        in which row ``i`` holds the image of patient id ``i + 1``.

    Raises:
        ValueError: if a file does not decode to ``(num_channels, *image_size)``.
    """
    expected_shape = (num_channels, *image_size)
    # Initialize a tensor to hold the image data
    images_tensor = torch.zeros((num_patients, *expected_shape))

    for patient_id in range(1, num_patients + 1):
        image_path = f'{image_dir}/{patient_id}.tiff'
        # Convert to float32 in NumPy before handing the array to torch:
        # torch.from_numpy does not accept every NumPy dtype (e.g. uint16 on
        # older PyTorch releases), whereas a float32 array is always accepted.
        image_data = np.asarray(tiff.imread(image_path), dtype=np.float32)

        # Fail loudly on an unexpected layout. A plain slice assignment would
        # silently broadcast e.g. a single (H, W) plane into every channel.
        if image_data.shape != expected_shape:
            raise ValueError(
                f'{image_path}: expected image of shape {expected_shape}, '
                f'got {image_data.shape}'
            )

        images_tensor[patient_id - 1] = torch.from_numpy(image_data)

    return images_tensor

# Load images into a tensor
# NOTE(review): row i of images_tensor holds the file named "<i + 1>.tiff"; the labels
# are later taken from labels_df in row order, so this presumably relies on the CSV
# rows being ordered by patient id — confirm against the CSV.
num_patients = 225  # Total number of patients
images_tensor = load_tiff_images_to_tensor(num_patients)

# Check if the tensor dimensions match your expectation
print(images_tensor.shape)  # Should print torch.Size([225, 52, 150, 150])


In [None]:
# Show the shape of the loaded image tensor as the cell output.
images_tensor.shape

### Gaussian Blur
##### preprocessing

In [None]:
import torch
import torch.nn as nn
import math

class GaussianBlur(nn.Module):
    """Gaussian smoothing applied to each channel independently.

    The blur is a bias-free ``nn.Conv2d`` with ``groups=in_channels`` whose
    weights are fixed to a normalized 2-D Gaussian and excluded from gradient
    updates. The convolution pads the input by ``kernel_size // 2`` per side.

    Args:
        kernel_size: side length of the square Gaussian kernel.
        sigma: standard deviation of the Gaussian, in pixels.
        in_channels: number of channels of the tensors that will be blurred.
    """

    def __init__(self, kernel_size, sigma, in_channels):
        super().__init__()
        self.kernel_size, self.sigma, self.in_channels = kernel_size, sigma, in_channels
        self.padding = kernel_size // 2
        self.kernel = self.create_kernel(kernel_size, sigma, in_channels)

    def create_kernel(self, kernel_size, sigma, in_channels):
        """Build the frozen depthwise convolution that performs the blur.

        Returns:
            An ``nn.Conv2d`` whose weight has shape
            ``(in_channels, 1, kernel_size, kernel_size)`` and holds the same
            Gaussian (summing to 1) for every channel.
        """
        centre = (kernel_size - 1) / 2.
        variance = sigma ** 2.

        # Squared distance of every kernel cell from the kernel centre:
        # entry [row, col] is (col - centre)^2 + (row - centre)^2.
        squared_offsets = (torch.arange(kernel_size).float() - centre) ** 2.
        squared_distance = squared_offsets.view(1, -1) + squared_offsets.view(-1, 1)

        # Evaluate the 2-D Gaussian density on the grid ...
        density = (1. / (2. * math.pi * variance)) * torch.exp(-squared_distance / (2 * variance))
        # ... and rescale it so that the kernel entries sum to 1.
        density = density / density.sum()

        # One identical (1, k, k) filter per channel, as a depthwise conv expects.
        depthwise_weight = density[None, None].repeat(in_channels, 1, 1, 1)

        blur_conv = nn.Conv2d(
            in_channels=in_channels,
            out_channels=in_channels,
            kernel_size=kernel_size,
            groups=in_channels,
            bias=False,
            padding=self.padding,
        )
        blur_conv.weight.data = depthwise_weight
        blur_conv.weight.requires_grad = False

        return blur_conv

    def forward(self, x):
        """Blur ``x`` (shape ``(N, in_channels, H, W)``) and return the result."""
        return self.kernel(x)

# Apply Gaussian blur to the image tensor.
# Here images_tensor_normalized is a tensor of shape (N, C, H, W), where C is the number of channels.
gaussian_blur = GaussianBlur(kernel_size=5, sigma=1.0, in_channels=52)
# images_tensor_blurred = gaussian_blur(images_tensor_normalized)


In [None]:
# Blur the raw (not yet normalized) image tensor.
# NOTE(review): images_tensor_blurred is not read by any later cell visible in this
# notebook — the normalization cell works from images_tensor. Confirm whether the
# blurred tensor was meant to feed the rest of the pipeline.
images_tensor_blurred = gaussian_blur(images_tensor)

In [None]:
# Convert survival times (the 'OSmonth' column of labels_df) into a float32 tensor
survival_times = torch.tensor(labels_df['OSmonth'].values).float()

# Each image tensor at index i corresponds to the survival time at index i in survival_times
# (assumes the rows of labels_df are in the same order as the image ids — TODO confirm)


# Normalization

In [None]:
# Standardize the images with a single global mean and standard deviation taken
# over every patient, channel and pixel of images_tensor.
# NOTE(review): one scalar pair is shared by all 52 channels; if the channels
# have different intensity ranges, per-channel statistics may be preferable — confirm.
mean = images_tensor.mean()
std = images_tensor.std()

# Normalize
images_tensor_normalized = (images_tensor - mean) / std

# Report the statistics of the un-normalized tensor that were used above. The
# already computed values are reused instead of reducing the whole tensor twice
# more; the printed text is unchanged.
print(f"Mean: {mean}, Std: {std}")


# Splitting the Data

In [None]:
from torch.utils.data import TensorDataset, DataLoader, random_split

# Create a dataset from tensors: item i is the pair
# (images_tensor_normalized[i], survival_times[i]).
full_dataset = TensorDataset(images_tensor_normalized, survival_times)


# Data Loading and Training

In [None]:
class SEBlock(nn.Module):
    """Squeeze-and-Excitation style channel gate.

    Every channel is reduced to its spatial mean, the resulting vector goes
    through a two-layer bias-free bottleneck ending in a sigmoid, and the
    input is rescaled channel-wise by the resulting weights in (0, 1).

    Args:
        channel: number of channels of the input feature map.
        reduction: ratio by which the bottleneck narrows ``channel``. The
            bottleneck width is ``channel // reduction`` but never less than 1.
    """

    def __init__(self, channel, reduction=16):
        super(SEBlock, self).__init__()
        # Floor the bottleneck at one unit. With channel < reduction the plain
        # integer division gives 0, i.e. two empty Linear layers whose output is
        # always 0, so the gate would be a constant sigmoid(0) = 0.5.
        hidden = max(1, channel // reduction)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` (shape ``(B, C, H, W)``) scaled by its per-channel gate."""
        b, c, _, _ = x.size()
        # Squeeze: (B, C, H, W) -> (B, C) channel means.
        y = self.avg_pool(x).view(b, c)
        # Excite: per-channel weights, reshaped so they broadcast over H and W.
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

원래꺼

MSE: 1306.7742668761416

MAE: 30.26724149007083

R^2: 0.0696811053030808

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ImageRegressionCNN(nn.Module):
    """Small CNN for single-channel images.

    Layout: conv(1 -> 16) -> ReLU -> 2x2 max-pool -> SE gate -> conv(16 -> 32)
    -> ReLU -> 2x2 max-pool -> flatten -> fc(1024) -> ReLU -> fc(128).
    The first fully connected layer expects 32 * 37 * 37 inputs, which is what
    a 150 x 150 image yields after the two poolings (150 -> 75 -> 37).

    Args:
        reduction: bottleneck reduction ratio forwarded to the SE block.
    """

    def __init__(self, reduction = 16):
        super(ImageRegressionCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2)  # 1 input channel, 16 output channels
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Forward the constructor argument; it used to be ignored in favour of
        # a hard-coded 16 (the default, so default behaviour is unchanged).
        self.se = SEBlock(16, reduction=reduction)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2) # 32 output channels
        self.fc1 = nn.Linear(32 * 37 * 37, 1024)  # 37 x 37 spatial size: 150 x 150 input pooled twice
        # NOTE(review): this head emits 128 values per image, although the
        # original comment described it as a single predicted value — confirm
        # which of the two is intended before relying on the output shape.
        self.fc2 = nn.Linear(1024, 128)



    def forward(self, x):
        """Map a batch of shape ``(B, 1, 150, 150)`` to an output of shape ``(B, 128)``."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.se(x)
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # Flatten
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # no activation on the last layer

        return x

새로운것

In [None]:
from torchvision import transforms

# Define the transforms used for data augmentation
# NOTE(review): in the cells visible here `transform` is never passed to a dataset
# (extract_channel_features builds its datasets with transform=None) — confirm
# whether augmentation is still meant to be applied.
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # horizontal flip
    transforms.RandomRotation(15),  # rotate by up to 15 degrees
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # shift by up to 10%
    transforms.RandomApply([transforms.GaussianBlur(3)]),  # apply Gaussian blur with some probability
])

# CustomDataset is the same as the CustomDataset class described earlier and takes a transform parameter.
# (No class named CustomDataset is visible in this notebook; presumably SingleChannelDataset is meant — TODO confirm.)


In [None]:
# Sizes of the two splits. test_size is 0 here, so the second split is empty and
# train_val_dataset receives every sample of full_dataset.
full_dataset_size = len(full_dataset)
test_size = 0
train_val_size = full_dataset_size - test_size
# Splitting the dataset into training+validation and test

train_val_dataset, test_dataset = random_split(full_dataset, [train_val_size, test_size])



print(f"Size of training+validation set: {len(train_val_dataset)}")
print(f"Size of test set: {len(test_dataset)}")

In [None]:
class SingleChannelDataset(Dataset):
    """Single-channel view of a dataset of ``(image, label)`` pairs.

    Each item is the selected channel of the underlying image, returned with a
    leading axis of size 1, together with the unchanged label.

    Args:
        full_dataset (Dataset): source dataset yielding ``(image, label)``.
        channel_index (int): which channel of every image to expose.
        transform (callable, optional): applied to the single-channel image.
        indices (list of int, optional): positions of ``full_dataset`` to
            expose; every position is used when omitted.
    """

    def __init__(self, full_dataset, channel_index, transform=None, indices=None):
        if indices is None:
            indices = range(len(full_dataset))
        self.indices = indices
        self.transform = transform
        self.channel_index = channel_index
        self.full_dataset = full_dataset

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        # Translate the position in this view into a position in the source dataset.
        image, label = self.full_dataset[self.indices[idx]]
        # Pick the channel and put back a channel axis: (H, W) -> (1, H, W).
        single_channel = image[self.channel_index, :, :][None]
        if not self.transform:
            return single_channel, label
        return self.transform(single_channel), label


In [None]:
import torch
import torchvision.models as models
from torch.utils.data import DataLoader

class FeatureExtractor(torch.nn.Module):
    """Runs a model without its last two child modules and flattens the result.

    Args:
        pretrained_model: module whose direct children, except the final two,
            are chained into ``self.features``. For ResNet18 this extracts the
            features produced before the avgpool and fc layers.
    """

    def __init__(self, pretrained_model):
        super().__init__()
        trunk = list(pretrained_model.children())[:-2]
        self.features = torch.nn.Sequential(*trunk)

    def forward(self, x):
        """Return the trunk output with all non-batch dimensions flattened."""
        return self.features(x).flatten(1)

def extract_channel_features(dataset, num_channels):
    """Extract pretrained ResNet-18 features channel by channel and join them per sample.

    For every channel index in ``range(num_channels)`` the channel is exposed
    through a ``SingleChannelDataset``, repeated three times along the channel
    axis (the backbone takes 3-channel input) and passed through a
    ``FeatureExtractor`` wrapped around an ImageNet-pretrained ResNet-18.

    Args:
        dataset: dataset yielding ``(image, label)`` pairs.
        num_channels: number of image channels to process, starting at channel 0.

    Returns:
        Tuple ``(features, labels)``. ``features`` is a 2-D NumPy array with one
        row per sample, holding the per-channel feature vectors concatenated
        along axis 1 in channel order. ``labels`` is a NumPy array with the
        labels in dataset order.
    """
    # Load the pretrained resnet18 model. Recent torchvision releases deprecate
    # `pretrained=True` in favour of the `weights=` argument; the legacy call is
    # kept as a fallback for releases that predate the weights enums.
    weights_enum = getattr(models, "ResNet18_Weights", None)
    if weights_enum is not None:
        pretrained_model = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
    else:
        pretrained_model = models.resnet18(pretrained=True)
    feature_extractor = FeatureExtractor(pretrained_model)
    feature_extractor.eval()  # evaluation mode

    # Use the GPU when available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    feature_extractor.to(device)

    extracted_features = []
    sample_labels = []
    for channel_index in range(num_channels):
        channel_dataset = SingleChannelDataset(
            full_dataset=dataset,
            channel_index=channel_index,
            transform=None
        )

        # shuffle=False keeps the sample order identical for every channel, so
        # the per-channel feature blocks line up row by row.
        data_loader = DataLoader(channel_dataset, batch_size=64, shuffle=False)

        channel_features = []
        with torch.no_grad():
            for inputs, labels in data_loader:
                inputs = inputs.repeat(1, 3, 1, 1)
                inputs = inputs.to(device)
                features = feature_extractor(inputs)
                channel_features.append(features.cpu().numpy())
                # Labels do not depend on the channel, so collect them during
                # the first pass only instead of rebuilding the list every time.
                if channel_index == 0:
                    sample_labels.extend(labels.cpu().numpy())

        # Add the features of the current channel to the overall list
        extracted_features.append(np.concatenate(channel_features, axis=0))

    return np.concatenate(extracted_features, axis=1), np.array(sample_labels)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from torch.utils.data import Dataset, DataLoader, random_split, SubsetRandomSampler
from copy import deepcopy
import numpy as np
from sklearn.model_selection import train_test_split

# The image dataset and its labels are assumed to have been prepared by earlier cells
# images_tensor: all image data (N, C, H, W)
# survival_times: label of each image (N,)

num_channels = 52  # total number of channels

# Pass the variable rather than repeating the literal 52, so the two cannot drift apart.
extracted_features, extracted_labels = extract_channel_features(dataset=train_val_dataset, num_channels=num_channels)
print(type(extracted_features))
print(len(extracted_features))
if len(extracted_features) > 0:
    print(type(extracted_features[0]))
    print(np.array(extracted_features[0]).shape)

X_train, X_test, y_train, y_test = train_test_split(
    extracted_features,  # feature matrix extracted from the images, used as is
    extracted_labels,    # label / target value of each sample
    test_size=0.2,       # hold out 20% of the samples as the test set
    random_state=42      # fixed seed so the split is reproducible
)


In [None]:
# Wider search space, kept for reference:
# param_grid = {
#     'n_estimators': [50, 100, 150, 200],  # number of trees
#     'max_depth': [None, 10, 20, 30],  # maximum depth of a tree
#     'min_samples_split': [2, 5, 10],  # minimum number of samples needed to split a node
#     'min_samples_leaf': [1, 2, 4],  # minimum number of samples in a leaf
#     'max_features': [1.0, 'sqrt']  # number of features considered at each split
# }
param_grid = {
    'n_estimators': [100],  # number of trees
    'max_depth': [30],  # maximum depth of a tree
    'min_samples_split': [5],  # minimum number of samples needed to split a node
    'min_samples_leaf':  [4],  # minimum number of samples in a leaf
    # 1.0 means "consider every feature at each split", which is what the former
    # 'auto' setting meant for a regressor. 'auto' itself is no longer accepted
    # by recent scikit-learn releases, so fitting with it fails there.
    'max_features': [1.0]
}
# Use MSE as the scoring function (negated by make_scorer because lower is better)
mse_scorer = make_scorer(mean_squared_error, greater_is_better=False)

# Create the GridSearchCV object
grid_search = GridSearchCV(RandomForestRegressor(random_state=42), param_grid, cv=3, scoring=mse_scorer, n_jobs=-1, verbose=2)

# Run the grid search on the training part of the extracted features
grid_search.fit(X_train, y_train)

# Print the best hyperparameters
print("Best Parameters:", grid_search.best_params_)

# Predict on the test data with the best model
best_rf = grid_search.best_estimator_

print("shape of ensemble_features:", extracted_features.shape)
print("shape of ensemble_labels:", extracted_labels.shape)

predicted_labels = best_rf.predict(X_test)

# Compute the performance metrics
mse = mean_squared_error(y_test, predicted_labels)
mae = mean_absolute_error(y_test, predicted_labels)
r2 = r2_score(y_test, predicted_labels)

print(y_test)
print(predicted_labels)

print(f"Test MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R^2: {r2:.4f}")

# # Optionally, evaluate the ensemble model
# test_datasets = [SingleChannelDataset(test_dataset, i) for i in range(num_channels)]
# # 각 채널별로 DataLoader 생성
# test_loaders = [DataLoader(dataset, batch_size=8, shuffle=False) for dataset in test_datasets]

# # 각 채널에 대한 예측을 수집합니다.
# test_features = []
# all_test_labels = []   # 실제 레이블은 한 번만 저장하기 위한 변수

# for channel_index, loader in enumerate(test_loaders):
#     # 채널별 모델을 불러옵니다.
#    # 채널별 모델 인스턴스를 생성합니다.
#     model = ImageRegressionCNN()  # 또는 적절한 모델 클래스 사용
#     # 저장된 모델 상태를 불러와 모델 인스턴스에 적용합니다.
#     model.load_state_dict(best_models_per_channel[channel_index])
#     # 모델을 계산 장치에 할당합니다.
#     model = model.to(device)
#     model.eval()

#     channel_predictions = []
#     channel_predictions = []
#     with torch.no_grad():
#         for images, labels in loader:
#             images = images.to(device)
#             output = model(images)
#             channel_predictions.append(output.cpu().numpy())
#             if channel_index == 0:  # Collect labels only once (assuming labels are same for all channels)
#                 all_test_labels.extend(labels.cpu().numpy())


#         # 채널별 예측값을 수집합니다.
#     test_features.append(np.concatenate(channel_predictions, axis=0))


# # 모든 채널의 예측값을 가로로 결합하여 하나의 특성 벡터로 만듭니다.
# all_test_labels = np.array(all_test_labels)


# # 예측값을 가로로 결합하여 최종 특성 행렬을 구성합니다.
# test_features = np.hstack(test_features)

# print(test_features.shape)
# print(all_test_labels.shape)
# # assert test_features.shape[0] == all_test_labels.shape[0], "Mismatch in the number of labels and predictions."



# # RandomForestRegressor로 최종 예측 및 평가
# # predicted_labels = rf.predict(test_features)
# predicted_labels = best_rf.predict(test_features)
# print("all_test_labels", all_test_labels)
# print("predicted_labels", predicted_labels)

# print("MSE:", mean_squared_error(all_test_labels, predicted_labels))
# print("MAE:", mean_absolute_error(all_test_labels, predicted_labels))
# print("R^2:", r2_score(all_test_labels, predicted_labels))

In [None]:
# Print the held-out targets followed by the random-forest predictions
# (the same two arrays are also printed by the evaluation cell).
print(y_test)
print(predicted_labels)