# Импорт библиотек

In [4]:
import numpy as np
import matplotlib.pyplot as plt
import os
import nibabel as nib
import torch
import torchvision.transforms as transforms
import cv2
from PIL import Image
from sklearn.linear_model import LinearRegression
from scipy.ndimage import gaussian_filter
from scipy.signal import convolve
import scipy.signal as signal
import random
from sklearn.metrics import r2_score
import seaborn as sns
import pickle

ModuleNotFoundError: No module named 'nibabel'

# Задание окружения

In [14]:
figures = os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), "figures")

# Работа с данными

## Видеоряд

In [15]:
video_path = os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), "src", "Film stimulus.mp4")

In [16]:
def video_to_frames():
    videocap = cv2.VideoCapture(video_path)
    success, frame = videocap.read()
    count = 1
    while success:
        cv2.imwrite(os.path.join(os.path.dirname(os.path.dirname(os.getcwd())),
                                "src", "frames", f"frame_{count}.jpg"), frame)    
        success, frame = videocap.read()
        count += 1

In [17]:
#video_to_frames()

In [18]:
model = torch.hub.load('pytorch/vision:v0.6.0', 'resnet152', pretrained=True, verbose=False)

for param in model.parameters():
    param.requires_grad = False

model.fc = torch.nn.Linear(2048, 2048)



In [19]:
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

In [20]:
def frames_to_tensors():
    for i in range(1, 9751):
        frame_path = os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), "src", "frames", f"frame_{i}.jpg")
        frame = Image.open(frame_path)
        frame_tensor = preprocess(frame)
        frame_tensor = frame_tensor.unsqueeze(0)
        yield frame_tensor

In [21]:
def tensors_to_vectors():
    for frame_tensor in frames_to_tensors():
        # передача картинки в модель и получение выходных данных
        with torch.no_grad():
            output = model(frame_tensor)
        # преобразование выходных данных в вектор
        vector = output.numpy().flatten()
        yield vector

In [22]:
#vector_list = [vector for vector in tensors_to_vectors()]
#np.save("vector_list", vector_list)
vector_list = np.load(os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), "src", "vector_list.npy"))

## Снимки фМРТ

In [1]:
class Sub:

    subs_with_fmri = ['04', '07', '08', '09', '11', '13', '14', '15', '16', '18',\
                  '22', '24', '27', '28', '29', '31', '35', '41', '43', '44',\
                  '45', '46', '47', '51', '52', '53', '55', '56', '60', '62']

    def __init__(self, number):
        if not number in Sub.subs_with_fmri:
            raise ValueError(f"У {number} испытуемого отсутствуют снимки фМРТ")
        else:
            self.number = number
        self.path = os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), "src", "ds003688-download", f"sub-{self.number}",\
                                    "ses-mri3t", "func", f"sub-{self.number}_ses-mri3t_task-film_run-1_bold.nii.gz")
        self.scan = nib.load(self.path)
        self.data = self.scan.get_fdata()
        self.tensor = torch.tensor(self.data)
        self.tensor_np = self.tensor.numpy()

In [3]:
sub = Sub('04')

NameError: name 'os' is not defined

In [45]:
sub.tensor.shape

torch.Size([40, 64, 64, 641])

# Построение линейной модели

In [5]:
class Preprocessor:

    def __init__(self, sub, dt, coef, train_size=0.7):
        self.sub = sub
        self.dt = dt
        self.coef = coef
        self.train_size = train_size

        self.nu = 25 # частота видео
        self.mu = 641. / 390. # частота снимков фМРТ
        self.d1 = self.sub.tensor.shape[0] # размерности снимка фМРТ до сжатия
        self.d2 = self.sub.tensor.shape[1]
        self.d3 = self.sub.tensor.shape[2]
        self.d4 = self.sub.tensor.shape[3]
        self.d = 2048 # длина вектора признакового описания изображения
        self.N = 641 - int(self.mu * self.dt) # N - количество снимков fMRI

        self.train, self.test = self.get_train_test()
        self.X_train, self.Y_train, self.X_test, self.Y_test = self.get_XY()

    @staticmethod
    def preprocess(v):
        return (v - v.min()) / (v.max() - v.min())

    @staticmethod
    def MSE(A):
        m, n = A.shape
        return 1 / n * np.linalg.norm(A, "fro") ** 2 # усреднение по столбцам матрицы

    def get_train_test(self):
        pairs = [(int(i * self.nu / self.mu), int(self.mu * self.dt + i)) for i in range(self.N)] # (номер изображения, номер снимка)

        if (self.coef > 1): # сжатие снимка фМРТ
            maxpool = torch.nn.MaxPool3d(kernel_size=self.coef, stride=self.coef)
            input_tensor = self.sub.tensor.permute(3, 0, 1, 2)
            output_tensor = maxpool(input_tensor).permute(1, 2, 3, 0)
            self.sub._tensor = output_tensor
        else:
            self.sub._tensor = self.sub.tensor
        
        self._d1 = self.sub._tensor.shape[0]
        self._d2 = self.sub._tensor.shape[1]
        self._d3 = self.sub._tensor.shape[2]
        self._d4 = self.sub._tensor.shape[3]
        
        scans_list = [self.sub._tensor[:, :, :, i] for i in range(self.d4)] # список тензоров снимков фМРТ
        voxels = [scan.reshape(self._d1 * self._d2 * self._d3).numpy() for scan in scans_list] # список снимков фМРТ, развернутых в векторы
        data = [(vector_list[n], voxels[k]) for n, k in pairs] # (изображение, снимок)

        # train, test
        l = int(self.train_size * self.d4) # размер обучающей выборки
        train, test = data[:l], data[l:]
            
        train = [(pair[0], self.preprocess(pair[1])) for pair in train]
        test = [(pair[0], self.preprocess(pair[1])) for pair in test]

        return train, test

    def get_XY(self):
        X_train = np.array([pair[0] for pair in self.train])
        Y_train = np.array([pair[1] for pair in self.train]).T
        X_test = np.array([pair[0] for pair in self.test])
        Y_test = np.array([pair[1] for pair in self.test]).T
        return X_train, Y_train, X_test, Y_test
    

## Предсказание снимка

In [6]:
class LinearPredictor(Preprocessor):
    
    def __init__(self, sub, dt, coef, alpha, train_size=0.7):
        super().__init__(sub, dt, coef, train_size)
        self.delta = False
        self.alpha = alpha

    def predict(self):
        W = [] # матрица весов модели

        if (self.alpha > 0):
            A = np.linalg.inv(self.X_train.T @ self.X_train + self.alpha * np.identity(self.X_train.shape[1])) @ self.X_train.T
        else:
            A = np.linalg.pinv(self.X_train)
        
        for i in range(self._d1 * self._d2 * self._d3):
            Y_train_vector = self.Y_train[i]
            w = A @ Y_train_vector
            W.append(w)
            
        self.W = np.array(W) # w будут строками

        self.Y_train_predicted = W @ self.X_train.T
        self.Y_test_predicted = W @ self.X_test.T

        self.MSE_train = self.MSE(self.Y_train_predicted - self.Y_train)
        self.MSE_test = self.MSE(self.Y_test_predicted - self.Y_test)

## Предсказание разницы между снимками

In [7]:
class LinearDeltaPredictor(Preprocessor):
    def __init__(self, sub, dt, coef, alpha, train_size=0.7):
        super().__init__(sub, dt, coef, train_size)
        self.delta = True
        self.alpha = alpha
        self.deltaY_train, self.deltaY_test = self.get_deltaY()

    def get_deltaY(self):
        delta_train = [(self.train[n][0], self.train[n][1] - self.train[n-1][1]) for n in range(1, len(self.train))]
        delta_test = [(self.test[n][0], self.test[n][1] - self.test[n-1][1]) for n in range(1, len(self.test))]
        deltaY_train = np.array([pair[1] for pair in delta_train]).T
        deltaY_test = np.array([pair[1] for pair in delta_test]).T

        return deltaY_train, deltaY_test

    def predict(self):
        W = [] # матрица весов модели

        if (self.alpha > 0):
            A = np.linalg.inv(self.X_train.T @ self.X_train + self.alpha * np.identity(self.X_train.shape[1])) @ self.X_train.T
        else:
            A = np.linalg.pinv(self.X_train)
        
        for i in range(self._d1 * self._d2 * self._d3):
            deltaY_train_vector = self.Y_train[i]
            w = A @ deltaY_train_vector
            W.append(w)
            
        self.W = np.array(W) # w будут строками

        self.deltaY_train_predicted = W @ self.X_train.T
        self.deltaY_test_predicted = W @ self.X_test.T
        self.Y_train_predicted = np.delete(self.Y_train, -1, 1) + self.deltaY_train_predicted
        self.Y_test_predicted = np.delete(self.Y_test, -1, 1) + self.deltaY_test_predicted
    
        self.MSE_train = self.MSE(self.Y_train_predicted - np.delete(self.Y_train, 0, 1))
        self.MSE_test = self.MSE(self.Y_test_predicted - np.delete(self.Y_test, 0, 1))

In [167]:
sub = Sub('04')

In [188]:
delta = LinearDeltaPredictor(sub=sub, dt=4, coef=1, alpha=1000)

In [189]:
delta.predict()

MemoryError: Unable to allocate 2.50 GiB for an array with shape (163840, 2048) and data type float64

In [187]:
linpred.MSE_test

153.45747869048895

## Построение срезов снимков

In [9]:
class Vizualizer:

    def __init__(self, predictor):
        self.predictor = predictor
        self.filename = f"sub-{self.predictor.sub.number}-{self.predictor.dt}-{self.predictor.coef}"
        if self.predictor.alpha > 0:
            self.filename += f"-{self.predictor.alpha}"

    def show_scan_slices(self, scan: int, dim: int, slice: int):
        scan_test = self.predictor.Y_test.T[scan].reshape((self.predictor._d1, self.predictor._d2, self.predictor._d3))
        scan_predicted = self.predictor.Y_test_predicted.T[scan].reshape((self.predictor._d1, self.predictor._d2, self.predictor._d3))
        scan_difference = abs(scan_test - scan_predicted)

        if dim == 0:
            slice_test = scan_test[slice, :, :].T
            slice_predicted = scan_predicted[slice, :, :].T
            slice_difference = scan_difference[slice, :, :].T
            slices = f"-{slice}-_-_"
        elif dim == 1:
            slice_test = scan_test[:, slice, :].T
            slice_predicted = scan_predicted[:, slice, :].T
            slice_difference = scan_difference[:, slice, :].T
            slices = f"-_-{slice}-_"
        elif dim == 2:
            slice_test = scan_test[:, :, slice].T
            slice_predicted = scan_predicted[:, :, slice].T
            slice_difference = scan_difference[:, :, slice].T
            slices = f"-_-_-{slice}"

        filename_test = self.filename + slices + "-test.png"
        filename_predicted = self.filename + slices + "-predicted.png"
        filename_difference = self.filename + slices + "-difference.png"
        figures = "figures_delta" if delta else "figures"
        folder_path = os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename)

        if not os.path.exists(folder_path):
            os.makedirs(folder_path)

        os.chmod(folder_path, 0o400)

        print("ORIGINAL")
        plt.imshow(slice_test, cmap="gray", origin="lower")
        plt.colorbar()
        plt.savefig(
                os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename, filename_test),
                dpi=300,
                bbox_inches="tight")
        plt.show()
            
        print("PREDICTED")
        plt.imshow(slice_predicted, cmap="gray", origin="lower")
        plt.colorbar()
        plt.savefig(
                os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename, filename_predicted),
                dpi=300,
                bbox_inches="tight")
        plt.show()

        print("DIFFERENCE")
        plt.imshow(slice_difference, cmap="gray", origin="lower")
        plt.colorbar()
        plt.savefig(
                os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename, filename_difference),
                dpi=300,
                bbox_inches="tight")
        plt.show()

    def show_recovered_scan_slices(self, scan: int, dim: int, slice: int):
        if self.predictor.delta == False:
            raise ValueError("Данный метод доступен только для LinearDeltaPredictor()")
        scan_test = self.predictor.Y_test.T[scan].reshape((self.predictor._d1, self.predictor._d2, self.predictor._d3))
        scan_predicted = (self.predictor.Y_test.T[0] + np.sum(self.predictor.deltaY_test_predicted.T[:scan], axis = 0)).reshape((self.predictor._d1, self.predictor._d2, self.predictor._d3))
        scan_delta = np.sum(self.predictor.deltaY_test_predicted.T[:scan], axis = 0).reshape((self.predictor._d1, self.predictor._d2, self.predictor._d3))
        scan_difference = abs(scan_test - scan_predicted)

        if dim == 0:
            slice_test = scan_test[slice, :, :].T
            slice_predicted = scan_predicted[slice, :, :].T
            slice_delta = scan_delta[slice, :, :].T
            slice_difference = scan_difference[slice, :, :].T
            slices = f"-{slice}-_-_"
        elif dim == 1:
            slice_test = scan_test[:, slice, :].T
            slice_predicted = scan_predicted[:, slice, :].T
            slice_delta = scan_delta[:, slice, :].T
            slice_difference = scan_difference[:, slice, :].T
            slices = f"-_-{slice}-_"
        elif dim == 2:
            slice_test = scan_test[:, :, slice].T
            slice_predicted = scan_predicted[:, :, slice].T
            slice_delta = scan_delta[:, :, slice].T
            slice_difference = scan_difference[:, :, slice].T
            slices = f"-_-_-{slice}"
        
        filename_test = self.filename + slices + "-recovered-test.png"
        filename_predicted = self.filename + slices + "-recovered-predicted.png"
        filename_delta = self.filename + slices + "-recovered-delta.png"
        filename_difference = self.filename + slices + "-recovered-difference.png"
        figures = "figures_delta" if delta else "figures"
        folder_path = os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename)

        if not os.path.exists(folder_path):
            os.makedirs(folder_path)

        os.chmod(folder_path, 0o400)

        print("ORIGINAL")
        plt.imshow(slice_test, cmap="gray", origin="lower")
        plt.colorbar()
        plt.savefig(
                os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename, filename_test),
                dpi=300,
                bbox_inches="tight")
        plt.show()
            
        print("PREDICTED")
        plt.imshow(slice_predicted, cmap="gray", origin="lower")
        plt.colorbar()
        plt.savefig(
                os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename, filename_predicted),
                dpi=300,
                bbox_inches="tight")
        plt.show()

        print("PREDICTED")
        plt.imshow(slice_delta, cmap="gray", origin="lower")
        plt.colorbar()
        plt.savefig(
                os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename, filename_delta),
                dpi=300,
                bbox_inches="tight")
        plt.show()

        print("DIFFERENCE")
        plt.imshow(slice_difference, cmap="gray", origin="lower")
        plt.colorbar()
        plt.savefig(
                os.path.join(os.path.dirname(os.path.dirname(os.getcwd())), figures, self.filename, filename_difference),
                dpi=300,
                bbox_inches="tight")
        plt.show()