# Task 3: Helper notebook for loading the data and saving the predictions

In [45]:
import pickle
import gzip
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from typing import List, Tuple
from sklearn.model_selection import train_test_split

### Helper functions

In [3]:
def load_zipped_pickle(filename):
    with gzip.open(filename, 'rb') as f:
        loaded_object = pickle.load(f)
        return loaded_object

In [4]:
def save_zipped_pickle(obj, filename):
    with gzip.open(filename, 'wb') as f:
        pickle.dump(obj, f, 2)

### Load data, make predictions and save prediction in correct format

In [5]:
# load data
train_data = load_zipped_pickle("train.pkl")
test_data = load_zipped_pickle("test.pkl")
samples = load_zipped_pickle("sample.pkl")

In [6]:
train_data[0]['video'].shape

(112, 112, 334)

## Visualization with images and videos

In [7]:
with open('images_test', 'wb') as file:
    frames = 0
    for frames in range(train_data[0]['video'].shape[2]):
        im = Image.fromarray(train_data[0]['video'][:,:,frames])
        im.save(f"image_test/image{frames}.png")

In [8]:
frameSize = (112, 112)

out = cv2.VideoWriter('output_video.avi',cv2.VideoWriter_fourcc(*'DIVX'), 30, frameSize)

for frames in range(train_data[0]['video'].shape[2]):
    filename = f"image_test/image{frames}.png"
    img = cv2.imread(filename)
    out.write(img)

out.release()

In [9]:
train_images = [train_data[video]['video'][:,:,frame] for video in range(len(train_data)) for frame in range(train_data[video]['video'].shape[2])]
train_images

[array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 0, ..., 0, 1, 1],
        [0, 1, 0, ..., 1, 0, 0],
        [0, 0, 3, ..., 0, 0, 0]], dtype=uint8),
 array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 2, 1],
        [0, 0, 2, ..., 0, 7, 3]], dtype=uint8),
 array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [1, 2, 1, ..., 1, 0, 0],
        [0, 0, 0, ..., 1, 0, 0],
        [0, 0, 0, ..., 1, 0, 0]], dtype=uint8),
 array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 0, ..., 1, 0, 0],
        [2, 0, 0, ..., 1, 0, 0],
        [0, 0, 4, ..., 0, 0, 0]], dtype=uint8),
 array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0

In [10]:
idx_expert = []
for i, image in enumerate(train_data):
    if image['dataset'] == 'expert':
        idx_expert.append(i)

idx_expert

[46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64]

In [11]:
def select_dataset(data:list, quality:str) -> list:
    return [dict_frames for dict_frames in data if dict_frames["dataset"]==quality]

In [12]:
train_data_expert = select_dataset(train_data, "expert")
train_data_expert[0]["video"].shape


(587, 647, 76)

In [13]:
def select_only_label_images(data:List[dict])->dict:
    images_labelized = {}
    for video in data:
        indices_labelized_frames = video["frames"]
        for index in indices_labelized_frames:
            images_labelized[video["name"]+"_"+str(index)] = {"image":video["video"][:,:,index], "label":video["label"][:,:,index]}
    return images_labelized

In [51]:
def labelize_image(image:np.array, label:np.array) -> np.array:
    """Labelize image (put the Mitral Valve detected fron label in white)

    Args:
        image (np.array): image
        label (np.array): label

    Returns:
        np.array: labelized image
    """
    labelized_image = image.copy()
    label = label.astype(np.uint8)  #convert to an unsigned byte
    label*=255
    labelized_image[label == 255] = 255
    cv2.imshow("labelized", labelized_image)
    cv2.waitKey(0)
    return 

In [14]:
train_data_expert = select_dataset(train_data, "expert")
train_data_expert_labelized = select_only_label_images(train_data_expert)

In [15]:
train_data_expert_labelized

{'YEZ7BPLZW0_20': {'image': array([[0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0],
         ...,
         [0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0]], dtype=uint8),
  'label': array([[False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         ...,
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False]])},
 'YEZ7BPLZW0_34': {'image': array([[0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0],
         ...,
         [0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 0]], dtype=uint8),
  'label': array([[False, False, False, ..., False, False, False],
         [False, False, False, ..., False

In [16]:
def create_X_y(data_labelized:dict) -> Tuple[np.array, np.array]:
    X, y = [], []
    for name in data_labelized.keys():
        X.append(data_labelized[name]["image"])
        y.append(data_labelized[name]["label"])
    return np.array(X), np.array(y)

In [17]:
train_img_exp, train_lab_exp = create_X_y(train_data_expert_labelized)
train_img_exp.shape[0]

  return np.array(X), np.array(y)


57

In [52]:
labelize_image(train_img_exp[0], train_lab_exp[0])

In [18]:
X = []
Y = []
for i in range(train_img_exp.shape[0]):
    X.append(train_img_exp[i].shape[0])
    Y.append(train_img_exp[i].shape[1])
    # print(train_img_exp[i].shape)
X_max = max(X)
Y_max = max(Y)
print(X_max,Y_max)

731 863


In [35]:
image = train_img_exp[0]
label = train_lab_exp[0].astype(float)
min_shape = 900

resized_image = cv2.resize(image, (min_shape,min_shape), interpolation=cv2.INTER_LANCZOS4)
resized_label = cv2.resize(label, (min_shape,min_shape), interpolation=cv2.INTER_LANCZOS4)

norm_image = cv2.normalize(resized_image,  np.zeros(resized_image.shape), 0, 255, cv2.NORM_MINMAX)


In [44]:
norm_img = []
resized_lab = []
min_shape = 900

for img in range(train_img_exp.shape[0]):
    image = train_img_exp[img]
    label = train_lab_exp[img].astype(float)

    resized_image = cv2.resize(image, (min_shape,min_shape), interpolation=cv2.INTER_LANCZOS4)
    resized_label = cv2.resize(label, (min_shape,min_shape), interpolation=cv2.INTER_LANCZOS4)
    norm_image = cv2.normalize(resized_image,  np.zeros(resized_image.shape), 0, 255, cv2.NORM_MINMAX)

    norm_img.append(norm_image)
    resized_lab.append(resized_label)

print(len(norm_img),norm_img[0].shape)
print(len(resized_lab),resized_lab[0].shape)

57 (900, 900)
57 (900, 900)


In [36]:
norm_image
# cv2.imwrite("image.png",image)

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]], dtype=uint8)

In [48]:


x_train, x_test, y_train, y_test = train_test_split(norm_img, resized_lab, test_size=0.05, random_state=0)

print("x_train: ", len(x_train))
print("y_train: ", len(y_train))
print("x_test: ", len(x_test))
print("y_test: ", len(y_test))

x_train:  54
y_train:  54
x_test:  3
y_test:  3


In [53]:
labelize_image(x_train[0], y_train[0])

## Help prediction

In [None]:
# make prediction for test
predictions = []
for d in test_data:
    prediction = np.array(np.zeros_like(d['video']), dtype=np.bool)
    height = prediction.shape[0]
    width = prediction.shape[1]
    prediction[int(height/2)-50:int(height/2+50), int(width/2)-50:int(width/2+50)] = True
    
    # DATA Strucure
    predictions.append({
        'name': d['name'],
        'prediction': prediction
        }
    )

In [None]:
# save in correct format
save_zipped_pickle(predictions, 'my_predictions.pkl')