In [None]:
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import numpy as np
from sklearn.model_selection import KFold
import torchvision.models as models

import matplotlib.pyplot as plt

from pytorch_lightning.callbacks import EarlyStopping
import seaborn as sns
from torchvision.models.video import r3d_18

from sklearn.model_selection import TimeSeriesSplit
import torch.optim as optim
from sklearn.metrics import mean_squared_error

In [None]:
from torch.utils.data import Dataset, DataLoader

class VideoRegressor(pl.LightningModule):
    """3D ResNet-18 (torchvision ``r3d_18``) adapted to scalar regression.

    The stem convolution is replaced so the network takes 1 input channel, and
    the final fully connected layer is replaced by a single output unit.
    Training and validation both minimise the mean squared error.

    Args:
        learning_rate: Learning rate passed to the Adam optimizer.
    """

    def __init__(self, learning_rate=3e-3):
        super().__init__()

        # 3D ResNet-18 with randomly initialised weights. Calling r3d_18() with
        # its defaults loads no pretrained weights and avoids the deprecated
        # `pretrained=` keyword.
        self.model = r3d_18()
        #self.model = EfficientNet3D.from_name("efficientnet-b0", override_params={'num_classes':1}, in_channels=1)

        # Replace the first convolution so the network accepts 1 input channel.
        self.model.stem[0] = nn.Conv3d(1, 64, kernel_size=(3, 7, 7), stride=(1, 2, 2), padding=(1, 3, 3), bias=False)

        # A single output unit: the model predicts one scalar per clip.
        self.model.fc = nn.Linear(self.model.fc.in_features, 1)

        self.loss_fn = nn.MSELoss()  # regression loss
        self.learning_rate = learning_rate

    def forward(self, x):
        """Return the raw network output for the input batch ``x``."""
        return self.model(x)

    def _shared_step(self, batch):
        """Compute the MSE loss for one ``(inputs, labels)`` batch."""
        inputs, labels = batch
        # Flatten both sides explicitly. A bare squeeze() would turn a batch of
        # size 1 into a 0-d tensor whose shape no longer matches the labels.
        outputs = self(inputs).reshape(-1)
        return self.loss_fn(outputs, labels.reshape(-1))

    def training_step(self, batch, batch_idx):
        """Log and return the training loss for one batch."""
        loss = self._shared_step(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log and return the validation loss for one batch."""
        loss = self._shared_step(batch)
        self.log('val_loss', loss)  # record the validation loss
        return loss

    def configure_optimizers(self):
        """Use Adam over all parameters with the configured learning rate."""
        return optim.Adam(self.parameters(), lr=self.learning_rate)






In [None]:

def get_predictions_and_targets(model, dataloader):
    """Run ``model`` over ``dataloader`` and collect its outputs and the labels.

    Inference runs on CUDA when it is available and on the CPU otherwise. The
    model is moved to that device and switched to eval mode; both changes
    persist after the call.

    Args:
        model: A ``torch.nn.Module`` mapping an input batch to an output batch.
        dataloader: Iterable yielding ``(inputs, labels)`` tensor pairs.

    Returns:
        ``(predictions, targets)``: two lists with one entry per sample. Each
        entry is the NumPy value obtained by iterating over the first axis of
        the batch output / batch labels.
    """
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print("GPU is available.")
    else:
        device = torch.device("cpu")
        print("GPU not available, using CPU.")

    # The inputs are moved to `device` below, so the model has to live there
    # too. Without this, a model still on the CPU fails with a device mismatch
    # as soon as CUDA is available.
    model.to(device)
    model.eval()

    predictions = []
    targets = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            outputs = model(inputs.to(device))
            predictions.extend(outputs.cpu().numpy())
            # Labels are only collected, never fed to the model, so they do not
            # need a round trip through the device.
            targets.extend(labels.cpu().numpy())
    return predictions, targets


class VideoDataset(Dataset):
    """Map-style dataset pairing each video array with its regression label.

    Args:
        video_list: Indexable collection of NumPy arrays, one per video.
        labels: Indexable collection of regression labels aligned with
            ``video_list``.
    """

    def __init__(self, video_list, labels):
        self.video_list = video_list
        self.labels = labels

    def __len__(self):
        """Number of videos in the dataset."""
        return len(self.video_list)

    def __getitem__(self, idx):
        """Return ``(video, label)`` for index ``idx`` as float32 tensors."""
        video_tensor = torch.from_numpy(self.video_list[idx]).to(torch.float32)
        # The label is cast to float32 as well so it matches the video dtype.
        label_tensor = torch.tensor(self.labels[idx]).to(torch.float32)
        return video_tensor, label_tensor

In [None]:
# Directory holding the .npy arrays. The trailing slash matters: the load below
# appends the file name directly after it.
data_path="../data/train_data/"
# Crop value that is interpolated into the data file names.
crop=10
# QPI time-lapse array of sample B4 (per the file name).
img=np.load(f"{data_path}time_lapse_QPI_B4_crop={crop}.npy")


In [None]:
# Display the shape of the loaded B4 array as the cell output.
img.shape

In [None]:
# Load the B4 "red" array and keep only the entries where it is strictly
# positive. The same boolean mask filters `img`, so the two stay aligned.
# NOTE(review): this overwrites `img` in place, so re-running the cell without
# first re-running the B4 load applies the mask to already-filtered data.
ans = np.load(f"{data_path}/time_lapse_red_B4_crop={crop}.npy")
img, ans = img[ans > 0], ans[ans > 0]

# Scale the targets by their maximum so the largest one becomes 1.
ini = ans.max()
targets = ans / ini

#train_images= torch.tensor(img).float()
#train_targets = torch.tensor(targets).float()



In [None]:
# Display the normalisation constant (maximum of the filtered B4 `ans` values).
ini

In [None]:
# Hold-out set from sample B2, filtered with the same "> 0" rule as the B4 data.
val_videos = np.load(f"{data_path}/time_lapse_QPI_B2_crop={crop}.npy")
ans = np.load(f"{data_path}/time_lapse_red_B2_crop={crop}.npy")
val_videos, ans = val_videos[ans > 0], ans[ans > 0]

# Normalise with the constant computed from the B4 data (`ini`), not with a
# B2-specific maximum, so both sets share one scale.
val_labels = ans / ini

# Fixed-order loader (no shuffling) so predictions stay aligned with `val_labels`.
val_dataset = VideoDataset(np.array(val_videos), np.array(val_labels))
val_dataloader2 = DataLoader(dataset=val_dataset, batch_size=64, shuffle=False)


In [None]:
# Compare the normalised label distributions of the B4 data (`targets`) and the
# B2 hold-out set (`val_labels`). Both are drawn translucent and labelled, so
# the second histogram no longer hides the first and each can be identified.
plt.hist(targets, bins=100, alpha=0.5, label="B4 (targets)")
plt.hist(val_labels, bins=100, alpha=0.5, label="B2 (val_labels)")
plt.xlabel("Label / ini")
plt.ylabel("Count")
plt.legend()
plt.show()

In [None]:
# Buffer for the B2 predictions of every cross-validation fold:
#   axis 0 = B2 sample, axis 1 = fold, axis 2 = (prediction, true label).
# It is sized from `val_labels`, the array it is filled with, rather than from
# the scratch variable `ans`, which other cells reassign.
# The 3 must match `n_splits` in the cross-validation cell.
pred_array=np.zeros((len(val_labels),3,2))

In [None]:
# Select the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("GPU is available." if device.type == "cuda" else "GPU not available, using CPU.")


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold

# The imports, VideoDataset and VideoRegressor used below are defined in earlier cells.

# Stratified K-fold cross-validation on the B4 data. After training, each
# fold's model is scored on its own validation split (left panel) and on the
# B2 hold-out loader `val_dataloader2` (right panel).


def _predict(model, dataloader, device):
    """Return the model's predictions for every sample of ``dataloader`` as a 1-D array."""
    model.to(device)
    model.eval()
    chunks = []
    with torch.no_grad():
        for x, _ in dataloader:
            # reshape(-1) rather than squeeze(): a final batch holding a single
            # sample would otherwise become a 0-d array and be lost per sample.
            chunks.append(model(x.to(device)).reshape(-1).cpu().numpy())
    return np.concatenate(chunks) if chunks else np.empty(0)


def _plot_pred_vs_true(ax, predictions, true_values, value_range, title):
    """Print the MSE and scatter predictions (x) against true values (y) on ``ax``."""
    print("MSE Loss"+f"{mean_squared_error(true_values, predictions):.4f}")
    ax.scatter(predictions, true_values, s=5)
    # Identity line: a perfect model would put every point on it.
    ax.plot(value_range, value_range, '--', color='red')
    ax.set_title(title)
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.set_ylabel('True Values')
    ax.set_xlabel('Predictions')


n_splits = 3
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
train_videos = np.asarray(img)
train_labels = np.asarray(targets)
holdout_labels = np.asarray(val_labels)

# Seed the Python / NumPy / PyTorch RNGs so weight initialisation and batch
# shuffling are repeatable on a fresh kernel.
pl.seed_everything(42)

# Bin the continuous labels into 10 equal-width strata. Only the 9 interior
# edges are passed to np.digitize. With all 11 edges the maximum label would
# fall into its own 11th bin, which can hold fewer samples than n_splits.
bins = np.linspace(train_labels.min(), train_labels.max(), 11)
y_binned = np.digitize(train_labels, bins[1:-1])

# Allocate the result buffer here so its fold axis always matches n_splits:
# axis 0 = hold-out sample, axis 1 = fold, axis 2 = (prediction, true label).
pred_array = np.zeros((len(holdout_labels), n_splits, 2))

# Train and evaluate on the GPU when one exists, otherwise on the CPU. A
# hard-coded accelerator="gpu" raises on CPU-only machines.
use_gpu = torch.cuda.is_available()
eval_device = torch.device("cuda" if use_gpu else "cpu")

train_range = [train_labels.min(), train_labels.max()]
holdout_range = [holdout_labels.min(), holdout_labels.max()]

for fold, (train_idx, val_idx) in enumerate(skf.split(train_videos, y_binned)):
    early_stop_callback = EarlyStopping(
        monitor='val_loss',
        min_delta=0.001,
        patience=30,
        verbose=True,
        mode='min',
    )

    # Datasets and data loaders for this fold.
    train_dataset = VideoDataset(train_videos[train_idx], train_labels[train_idx])
    val_dataset = VideoDataset(train_videos[val_idx], train_labels[val_idx])
    train_dataloader = DataLoader(train_dataset, batch_size=16*32, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=16*8, shuffle=False)

    # A fresh model and trainer for every fold.
    model = VideoRegressor()
    trainer = pl.Trainer(
        max_epochs=100,
        accelerator="gpu" if use_gpu else "cpu",
        callbacks=[early_stop_callback],
    )
    trainer.fit(model, train_dataloader, val_dataloader)

    fig, (ax_fold, ax_holdout) = plt.subplots(1, 2, figsize=(16, 8))

    # Left panel: this fold's validation split.
    fold_predictions = _predict(model, val_dataloader, eval_device)
    _plot_pred_vs_true(ax_fold, fold_predictions, train_labels[val_idx],
                       train_range, f'Fold {fold+1}')

    # Right panel: the B2 hold-out set; the values are also stored in pred_array.
    predictions = _predict(model, val_dataloader2, eval_device)
    pred_array[:, fold, 0] = predictions
    pred_array[:, fold, 1] = holdout_labels
    _plot_pred_vs_true(ax_holdout, predictions, holdout_labels,
                       holdout_range, f'Fold {fold+1}')

    plt.show()
    plt.close(fig)  # release the figure; one is created per fold
