In [1]:
import dask.dataframe as dd
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
import pandas as pd
from datetime import datetime
import os
import cv2
import matplotlib.pyplot as plt

class Lettore:
    def __init__(self, filepth, frame_size = (144,112)):
        self.pth = filepth
        self.frame_size = frame_size

    def __enter__(self):
        self.file_img = open( os.path.join( self.pth, 'img.csv' ) , 'r') 
        self.file_label = open(os.path.join( self.pth, 'label.csv' ), 'r') 
        self.file_beh = open(os.path.join( self.pth, 'label_beh.csv' ), 'r') 
        
        # reading header
        line = self.file_img.readline() 
        line = self.file_label.readline() 
        lineb = self.file_beh.readline() 
        
        line = line.strip()
        self.header = dict( [i.split(':') for i in line[1:].split(';')] )
        self.header['ROI_x'] = int(self.header['ROI_x'])
        self.header['ROI_y'] = int(self.header['ROI_y'])
        
        return self
    
    def getFrameNext(self):
        line_img = self.file_img.readline() 
        line_lbl = self.file_label.readline()
        line_beh = self.file_beh.readline()
        
        if line_img:
            line_img = line_img.strip()
            line_lbl = line_lbl.strip()
            line_beh = line_beh.strip()
            
            if line_img[-1]==';':
                array_img = np.array(line_img[:-1].split(';'), dtype=np.float32)
            else:
                array_img = np.array(line_img.split(';'), dtype=np.float32)
                
            if line_lbl[-1]==';':
                array_lbl = np.array(line_lbl[:-1].split(';'), dtype=np.float32)
            else:
                array_lbl = np.array(line_lbl.split(';'), dtype=np.float32)
            
            if line_beh[-1]==';':
                beh = line_beh[:-1].split(';')
            else:
                array_beh = line_beh.split(';')

            frame_id = array_img[0]
            lbl_id = array_lbl[0]
            beh_id = np.float32(array_beh[0])
            
            if not(frame_id == lbl_id and lbl_id== beh_id and frame_id == beh_id):
                print('######### WARNING: ID mismatch ##########')
                print(frame_id, lbl_id, beh_id,'=',frame_id - lbl_id - beh_id)
            
            frame = array_img[1:].reshape([self.header['ROI_x'],self.header['ROI_y']])
            lbl = array_lbl[1:].reshape([self.header['ROI_x'],self.header['ROI_y']])
            beh = array_beh[1]
            
            frame = cv2.resize(frame,dsize=self.frame_size,interpolation=cv2.INTER_CUBIC).T
            lbl = cv2.resize(lbl,dsize=self.frame_size,interpolation=cv2.INTER_NEAREST).T
        else:
            frame_id = None
            frame = None
            lbl = None
            beh = None
            
        return frame_id, frame, lbl, beh

    def __exit__(self, exc_type, exc_value, traceback):
        self.file_img.close()
        self.file_label.close()

In [2]:
pth = Path(r'D:\thermography\thermal_behaviour_72_h_clean\data')
content = os.listdir(pth)
content = list(filter(lambda x: (pth/x).is_dir(), content ) ) 

subjects = dict()
for c in content:
    parts = c.split('-')
    sub_id = parts[2]
    if sub_id not in subjects.keys():
        subjects[sub_id] = dict()
        subjects[sub_id]['id'] = parts[2] 
        subject_parts = subjects[sub_id]['id'].split('_')
        subjects[sub_id]['geno'] = subject_parts[0]
        subjects[sub_id]['number'] = subject_parts[1]
        subjects[sub_id]['path'] = (pth/c).as_posix()
        #subjects[sub_id]['img'] = [(pth/c/'img.csv').as_posix()]
    #else:
        #subjects[sub_id]['label'].append( (pth/c/'label.csv').as_posix() )
        #subjects[sub_id]['img'].append( (pth/c/'img.csv').as_posix() )

subjects = pd.DataFrame.from_dict(subjects).T.reset_index(drop=True)
#subjects['img'] = subjects['img'].apply(sorted)

In [3]:
subjects

Unnamed: 0,id,geno,number,path
0,KO_70,KO,70,D:/thermography/thermal_behaviour_72_h_clean/d...
1,KO_69,KO,69,D:/thermography/thermal_behaviour_72_h_clean/d...
2,WT_74,WT,74,D:/thermography/thermal_behaviour_72_h_clean/d...
3,WT_81,WT,81,D:/thermography/thermal_behaviour_72_h_clean/d...
4,KO_75,KO,75,D:/thermography/thermal_behaviour_72_h_clean/d...
5,KO_72,KO,72,D:/thermography/thermal_behaviour_72_h_clean/d...
6,WT_79,WT,79,D:/thermography/thermal_behaviour_72_h_clean/d...


In [4]:

input_dir = 'D:\\thermography\\thermal_behaviour_72_h_clean\\thermals\\'
target_dir = 'D:\\thermography\\thermal_behaviour_72_h_clean\\masks\\'
beh_dir = 'D:\\thermography\\thermal_behaviour_72_h_clean\\labels\\'

img_size = (144, 112)
num_classes = 1
batch_size = 32

input_img_paths = sorted(
    [
        os.path.join(input_dir, fname)
        for fname in os.listdir(input_dir)
        if fname.endswith(".npy")
    ]
)

target_img_paths = sorted(
    [
        os.path.join(target_dir, fname)
        for fname in os.listdir(target_dir)
        if fname.endswith(".npy") and not fname.startswith(".")
    ]
)

target_beh_paths = sorted(
    [
        os.path.join(beh_dir, fname)
        for fname in os.listdir(target_dir)
        if fname.endswith(".npy") and not fname.startswith(".")
    ]
)

print("Number of samples:", len(input_img_paths))

for input_path, target_path, beh_path in zip(input_img_paths[:10], target_img_paths[:10], target_beh_paths[:10]):
    print(input_path, "|", target_path, '|', beh_path)

Number of samples: 60358
D:\thermography\thermal_behaviour_72_h_clean\thermals\KO_69_1000206.npy | D:\thermography\thermal_behaviour_72_h_clean\masks\KO_69_1000206.npy | D:\thermography\thermal_behaviour_72_h_clean\labels\KO_69_1000206.npy
D:\thermography\thermal_behaviour_72_h_clean\thermals\KO_69_100025.npy | D:\thermography\thermal_behaviour_72_h_clean\masks\KO_69_100025.npy | D:\thermography\thermal_behaviour_72_h_clean\labels\KO_69_100025.npy
D:\thermography\thermal_behaviour_72_h_clean\thermals\KO_69_1000465.npy | D:\thermography\thermal_behaviour_72_h_clean\masks\KO_69_1000465.npy | D:\thermography\thermal_behaviour_72_h_clean\labels\KO_69_1000465.npy
D:\thermography\thermal_behaviour_72_h_clean\thermals\KO_69_1000726.npy | D:\thermography\thermal_behaviour_72_h_clean\masks\KO_69_1000726.npy | D:\thermography\thermal_behaviour_72_h_clean\labels\KO_69_1000726.npy
D:\thermography\thermal_behaviour_72_h_clean\thermals\KO_69_1000983.npy | D:\thermography\thermal_behaviour_72_h_clean

In [5]:
from tensorflow import keras
import numpy as np



class ThermoMouse(keras.utils.Sequence):
    """Helper to iterate over the data (as Numpy arrays)."""

    def __init__(self, batch_size, img_size, input_img_paths, target_img_paths, target_beh_paths):
        self.batch_size = batch_size
        self.img_size = img_size
        self.input_img_paths = input_img_paths
        self.target_img_paths = target_img_paths
        self.target_beh_paths = target_beh_paths

    def __len__(self):
        return len(self.target_img_paths) // self.batch_size
    
    def __getitem__(self, idx):
        """Returns tuple (input, target) correspond to batch #idx."""
        i = idx * self.batch_size
        batch_input_img_paths = self.input_img_paths[i : i + self.batch_size]
        batch_target_img_paths = self.target_img_paths[i : i + self.batch_size]
        batch_target_beh_paths = self.target_beh_paths[i : i + self.batch_size]
        
        x = np.zeros((self.batch_size,) + self.img_size + (1,), dtype="float32")
        for j, path in enumerate(batch_input_img_paths):
            img = np.load(path)
            img = (img - np.min(img)) / (np.max(img) - np.min(img))
            img[np.isnan(img)] = 0
            x[j] = np.expand_dims(img, 2)
            
        beh_outputs = np.zeros((self.batch_size), dtype="float32")
        lbl_outputs = np.zeros((self.batch_size,) + self.img_size + (1,), dtype="float32")
        for j, (path_mask, path_beh) in enumerate(zip(batch_target_img_paths, batch_target_beh_paths)):
            lbl = np.load(path_mask)
            lbl = (lbl - np.min(lbl)) / (np.max(lbl) - np.min(lbl))
            lbl[np.isnan(lbl)] = 0
            
            beh = np.load(path_beh)
            if beh == 'SLEEP':
                beh = 0
            elif beh == 'ACTIVE':
                beh = 1
            else:
                print('############ WARNING ################ beh level not expected: ', beh)
            
            lbl_outputs[j] = np.expand_dims(lbl, 2)
            #beh_outputs[j] = beh 
            
        return x, lbl_outputs #, beh_outputs]

In [6]:
import random
val_samples = 1000
random.Random(1337).shuffle(input_img_paths)
random.Random(1337).shuffle(target_img_paths)
random.Random(1337).shuffle(target_beh_paths)

train_input_img_paths = input_img_paths[:-val_samples]
train_target_img_paths = target_img_paths[:-val_samples]
train_target_beh_paths = target_beh_paths[:-val_samples]

val_input_img_paths = input_img_paths[-val_samples:]
val_target_img_paths = target_img_paths[-val_samples:]
val_target_beh_paths = target_beh_paths[-val_samples:]

# Instantiate data Sequences for each split
batch_size = 8
train_gen = ThermoMouse(
    batch_size, img_size, train_input_img_paths, train_target_img_paths, train_target_beh_paths
)
val_gen = ThermoMouse(batch_size, img_size, val_input_img_paths, val_target_img_paths, val_target_beh_paths)

In [7]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow import keras

def unet_model(input_shape):
    inputs = tf.keras.Input(shape=input_shape)
    
    # Encoder
    conv1 = layers.Conv2D(2, 3, activation="relu", padding="same")(inputs)
    pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = layers.Conv2D(4, 3, activation="relu", padding="same")(pool1)
    pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)
    
    # Bottleneck
    conv3 = layers.Conv2D(8, 3, activation="relu", padding="same")(pool2)
    
    # Decoder
    up1 = layers.Conv2DTranspose(4, 2, strides=(2, 2), padding="same")(conv3)
    merge1 = layers.concatenate([conv2, up1], axis=3)
    conv4 = layers.Conv2D(4, 3, activation="relu", padding="same")(merge1)

    up2 = layers.Conv2DTranspose(2, 2, strides=(2, 2), padding="same")(conv4)
    merge2 = layers.concatenate([conv1, up2], axis=3)
    conv5 = layers.Conv2D(2, 3, activation="relu", padding="same")(merge2)
    
    # Output
    out_frame = layers.Conv2D(1, 1, activation="sigmoid", name='frame')(conv5)
    
    # Create the model
    model = tf.keras.Model(inputs=inputs, outputs=out_frame)
    
    return model


# Create the U-Net model
input_shape = (144, 112,1)
keras.backend.clear_session()
model = unet_model(input_shape)

from focal_loss import BinaryFocalLoss

# clipnorm=1.
model.compile(optimizer= keras.optimizers.Adam(0.0001), loss=BinaryFocalLoss(gamma=2), metrics='Accuracy') #BalancedSparseCategoricalAccuracy())

# Print the model summary
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 144, 112, 1  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 144, 112, 2)  20          ['input_1[0][0]']                
                                                                                                  
 max_pooling2d (MaxPooling2D)   (None, 72, 56, 2)    0           ['conv2d[0][0]']                 
                                                                                                  
 conv2d_1 (Conv2D)              (None, 72, 56, 4)    76          ['max_pooling2d[0][0]']      

In [None]:
epochs = 15
model.fit(train_gen,  epochs=epochs, validation_data=val_gen)

Epoch 1/15
Epoch 2/15
 140/7419 [..............................] - ETA: 58:13 - loss: 0.0617 - Accuracy: 0.9698

In [None]:
model_path = 'segmentation_unet_mouse_rsmall.h5'
model.save(model_path, save_format='h5')