In [None]:
# 1. import libraries
#    settomg cuda
# 2. define model
# 3. load_weight
# 4. predict PFT values


In [None]:
# DEFINE global constant 

In [None]:
# We resampled the low-dose computed tomography (LDCT) scans to have 2.5mm isocubic voxels and cropped and padded the images to have an identical size of 180x178x140 (width, height, depth). This results in a sequence of coronal images, each with a width of 180 and height of 178, containing 140 slices.

In [None]:
img_width, img_height = 180,178
slice_interval = 1
NUM_CH = 1
NUM_FRAMES = 140

In [None]:
# 1. import libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import cv2
import os
import glob
import SimpleITK as sitk

import random


In [None]:
# setting cuda environment

In [None]:
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   
os.environ["CUDA_VISIBLE_DEVICES"]="1"  # You can change this based on your GPU working environment.

In [None]:
# define model

In [None]:
import keras
keras.backend.image_data_format()
print(keras.__version__)

In [None]:
# the base code of i3d network in keras is quoted from keras version of i3d in "https://github.com/dlpbc/keras-kinetics-i3d"

In [None]:

import keras
from keras.preprocessing import image
from keras import backend as K

#------
from i3d_inception_last_global import Inception_Inflated3d as I3d
from i3d_inception_last_global import conv3d_bn

#------
from keras.models import Model
from keras import layers
from keras.layers import Activation
from keras.layers import Dense
from keras.layers import Input
from keras.layers import BatchNormalization
from keras.layers import Conv3D
from keras.layers import MaxPooling3D
from keras.layers import AveragePooling3D
from keras.layers import Dropout
from keras.layers import Reshape
from keras.layers import Lambda
from keras.layers import GlobalAveragePooling3D



K.clear_session()

n_classes = 1


###################################
# we changed the last part of base model to global average pooling instead of 1*1*1 convolution
###################################

base_model = I3d(include_top=False,
                weights=None,
                input_shape=(NUM_FRAMES,178,180,NUM_CH),
                endpoint_logit=False, # softmax will be applied
                classes=n_classes
                ) 


x = base_model.output
x = Dense(500, activation = "relu")(x)
prediction = Dense(1,activation = "linear" )(x)

#----------------------------------

model = Model(inputs=base_model.input, outputs=prediction)

In [None]:
# define functions for image input


In [None]:



def preprocess_12bit(input_img):    # 12 bit full range 
    density_low = -1024
    density_high = 3071
  
    output_img = (input_img-density_low) / (density_high-density_low+1)   
    output_img[output_img < 0.] = 0.
    output_img[output_img > 1.] = 1.
    
    return output_img

def preprocess_lung(input_img):    # lung windowing

    LEVEL = -700
    WIDTH = 1500
    density_low = LEVEL - WIDTH/2
    density_high = LEVEL + WIDTH/2
#     print(density_low, density_high)
    
    output_img = (input_img-density_low) / (density_high-density_low+1)   
    output_img[output_img < 0.] = 0.
    output_img[output_img > 1.] = 1.
    
    return output_img


def preprocess_media(input_img):   

    LEVEL = 50
    WIDTH = 500
    density_low = LEVEL - WIDTH/2
    density_high = LEVEL + WIDTH/2

    
    output_img = (input_img-density_low) / (density_high-density_low+1)   
    output_img[output_img < 0.] = 0.
    output_img[output_img > 1.] = 1.
    
    return output_img


In [None]:
def load_image(path,augmentation = False, num_channel = NUM_CH):
    img = sitk.GetArrayFromImage(sitk.ReadImage(path)) # 3d image를 불러 와야 함. (z, y, x)

    if num_channel ==1:
        img_12bit = preprocess_12bit(img) # (z,y,x)  
        # we used the overall 12bit range of CT image
        # you can change the windowing by change the preprocessing function and the the windowing level and width.
        
        img_tmp = np.moveaxis(img_12bit, [0,1], [1,0]) # (y, z, x)
        img_tmp = img_tmp[:, ::-1, : ]# (y, -z, x)
        
        img_tmp = np.expand_dims(img_tmp, axis = -1)
        img_out = img_tmp.copy()

        if augmentation:
            img_out = aug(img_out)
            img_out = np.array(img_out)

    return img_out

In [None]:
# 3. load_weight

In [None]:
FEV1_weightpath = 
FVC_weifhtpath = 

In [None]:
model.load_weights(FEV1_weightpath)

In [None]:
# predict PFT values from model image by image

In [None]:
_filename = "####change the path as your environment"

img_test = load_image(_filename)

test_predictions = model.predict(np.expand_dims(img_test, axis=0))

test_predictions_value = model.predict(np.expand_dims(img_test, axis=0))[0][0] # predicted PFT values by the model


In [None]:
# predict PFT values iteratively for the large dataset. 
# evaluate the performance of the model


In [None]:
import numpy as np
from sklearn import metrics
import matplotlib.pyplot as plt
import tqdm
from datetime import datetime
now = datetime.now().strftime("%y%m%d")

In [None]:
def model_prediction(_df, load_func, splitted_set, save_df = False):
    """
    _df : pandas dataframe which contains columns with following information
        ["path_in_workspace"] : path fo resampled CT image.
        ["FEV1"] / ["FVC"] : measured values of FEV1 or FVC in liters.
    load_func : we used previously defined function "load_image"
    splitted_set : name of dataset during experiment --> "training""validation""test", which we used only for setting save filename
    
    save_df : use when you want the dataframe with test result
    
    """
    
    
    _idx = chosen_idx_of_weight

    y= [ ]
    score = [ ]
    totScore=[]
    test_true = []
    test_predictions=[] # 예측값
    test_predictions_value = []

    for _idx, _row in tqdm.tqdm(_df.iterrows()):

        _filename = _row["path_in_workspace"]


        img_test = load_func(_filename)
        test_predictions.append(model.predict(np.expand_dims(img_test, axis=0)))

        test_predictions_value.append(model.predict(np.expand_dims(img_test, axis=0))[0][0])
        test_true.append(_row["FEV1"])
        
    col_y_pred = "modelpredicted(FEV1)"
    col_residual = "residuals"
    col_resid_z = "zscore_residuals"
    _df[col_y_pred] = test_predictions_value
    _df[col_residual] = _df[col_y_pred]  - _df[col_y] 
    _df[col_resid_z] = zscore(_df[col_residual])
    
    if save_df:
        save_path = os.path.join(PATH_trial,f"df_{splitted_set}_result_({now}).xlsx")
        _df.to_excel(save_path)
    return _df, col_y_pred, col_residual, col_resid_z

In [None]:
def get_Bland_Altman_by_sex_v2(target_df, col_measure1, col_measure2, check_criteria = 1.96, xlim_min = 0, xlim_max = 7):
    import statsmodels.api as sm
    import numpy as np
    import matplotlib.pyplot as plt
    target_measure_1 =col_measure1
    target_measure_2 = col_measure2
    target_str = target_measure_1.split("_")[0]

    df_target =target_df.copy() 
    ax_ylim = (df_target[target_measure_1] -df_target[target_measure_2]).abs().max() * 1.1
#     ax_xlim_min = np.round(df_target[target_measure_1].min(), 2)
#     ax_xlim_max = np.round(df_target[target_measure_1].max(), 2)
    ax_xlim_min = xlim_min
    ax_xlim_max = xlim_max
#----------------------
    M2_total = df_target[target_measure_1]
    M1_total = df_target[target_measure_2]
    
    diffs_total = M1_total - M2_total 
    mean_diff_total = np.mean(diffs_total)
    std_diff_total = np.std(diffs_total, axis = 0)
    upper_total = mean_diff_total + check_criteria * std_diff_total
    lower_total = mean_diff_total - check_criteria * std_diff_total
    print(mean_diff_total,std_diff_total,upper_total  , lower_total)
#-----------------
#----------------------------------------------------------------
    f, ax = plt.subplots(1,3, figsize = (21,5), sharex = "row", sharey = "row")   
#         f.suptitle("Bland-Altman Plot\n- First Visit 2018 - weak external validation set", fontsize = 20)


    for _i, (_sex, _color, _marker) in enumerate(zip(["F", "M"],colors, ["+", "x"])) :
        df_sub = target_df[target_df["성별"] == _sex]
        if _sex == "F":
            ax_title = "Female"
        else:
            ax_title = "Male"
        # version 1
    #     m1 =df_sub['FVC_MEAS ']
    #     m2 = df_sub['modelpredicted(FVC_MEAS )']
        # version 2
        m2 =df_sub[target_measure_1]
        m1 = df_sub[target_measure_2]    

        sm.graphics.mean_diff_plot(m1, m2, ax = ax[_i],
                                   sd_limit = check_criteria, 
                                scatter_kwds = {"marker":_marker, 
                                                 "color":_color, 
                                                 "alpha":0.7})
        ax[_i].set_xlabel(f"Mean {target_str} (L)\n(Ground Truths + Predicted value)/2", fontsize = 13)
        ax[_i].set_title(ax_title, fontsize = 15)
        ax[_i].set_ylim(-ax_ylim, ax_ylim)
        ax[_i].set_xlim(ax_xlim_min, ax_xlim_max)




        #----------------------------------------------------------------------
        means = np.mean([m1, m2], axis=0)
        diffs = m1 - m2
        mean_diff = np.mean(diffs)
        std_diff = np.std(diffs, axis=0)

        ax[2].scatter(means, diffs, 
                     marker = _marker,
                     color = _color, 
                     alpha = 0.7, label = _sex)


    # draw line
    ax[2].hlines(mean_diff_total,xmin =ax_xlim_min, xmax = ax_xlim_max,
                color = "k", 
                linestyle = "--", linewidth = 1)
    ax[2].hlines(mean_diff_total + check_criteria *std_diff_total,xmin =ax_xlim_min, xmax = ax_xlim_max,
                color = "k",
                 linestyle = "--", linewidth = 1)
    ax[2].hlines(mean_diff_total - check_criteria *std_diff_total,xmin =ax_xlim_min, xmax = ax_xlim_max,
                color = "k",
                linestyle = "--", linewidth = 1)

    # anootate 
    ax[2].annotate('mean diff:\n{}'.format(np.round(mean_diff_total, 2)),
                xy=(0.99, 0.5),
                horizontalalignment='right',
                verticalalignment='center',
                fontsize=14,
                xycoords='axes fraction')

    ax[2].annotate('-SD{}: {}'.format(check_criteria, np.round(lower_total, 2)),
            xy=(0.99, 0.07),
            horizontalalignment='right',
            verticalalignment='bottom',
            fontsize=14,
            xycoords='axes fraction')
    ax[2].annotate('+SD{}: {}'.format(check_criteria, np.round(upper_total, 2)),
            xy=(0.99, 0.92),
            horizontalalignment='right',
            fontsize=14,
            xycoords='axes fraction')





    ax[1].set_ylabel("")  
    ax[2].set_ylabel("")
    ax[0].set_ylabel("Differences\n(Predicted values - Ground truth)")

    ax[2].tick_params(labelsize = 13)
    ax[2].legend(loc = "upper left")
    ax[2].set_xlabel(f"Mean {target_str} (L)\n(Ground Truth + Predicted values)/2", fontsize = 13)
    ax[2].set_ylim(-ax_ylim,ax_ylim)
    ax[2].set_xlim(ax_xlim_min, ax_xlim_max)

    f.tight_layout()   

    return f, ax


In [None]:
import numpy as np
from math import sqrt
from sklearn.metrics import r2_score, max_error, median_absolute_error
def get_RMSE(Y_TRUE, Y_PRED):
    mse = np.average((Y_PRED - Y_TRUE)**2)
    rmse = sqrt(mse)
    return rmse

def get_MAE(Y_TRUE, Y_PRED):
    mae = np.average(abs(Y_PRED - Y_TRUE))
    return mae

def get_max_error(Y_TRUE, Y_PRED):
    max_er = max_error(Y_PRED , Y_TRUE)
    return max_er
def get_uncentered_R_squared(Y_TRUE, Y_PRED):
    SS_tot_uncentered = (Y_PRED**2).sum()
    SS_res = ((Y_PRED - Y_TRUE)**2).sum()

    R_squared_uncentered = 1- SS_res/SS_tot_uncentered
    return R_squared_uncentered

def get_CCC(Y_TRUE, Y_PRED):

    x_bar = np.mean(Y_TRUE)
    y_bar = np.mean(Y_PRED)

    var_x = ((Y_TRUE - x_bar)**2).sum() / len(Y_TRUE)
    var_y = ((Y_PRED - y_bar)**2).sum() / len(Y_PRED)

    covar = ((Y_TRUE - x_bar)*(Y_PRED - y_bar)).sum()/ len(Y_TRUE)

    rho_c= (2 * covar) / (var_x + var_y + (x_bar - y_bar)**2)
    
    return rho_c




def get_evaluation_metrics(Y_TRUE, Y_PRED):
#     print(tr_num, "\n", _target_col)
    rmse = get_RMSE(Y_TRUE, Y_PRED)
    
    mae_value = get_MAE(Y_TRUE, Y_PRED)
    abs_max_error_value = get_max_error(Y_TRUE, Y_PRED)
    median_abs_error_value = median_absolute_error(Y_TRUE, Y_PRED)
    
    r2_scipy_value = r2_score(Y_TRUE, Y_PRED)
    
#     correlation_matrix = np.corrcoef(Y_TRUE, Y_PRED)
#     correlation_xy = correlation_matrix[0,1]
#     r2_inverse = correlation_xy**2
    r2_inverse = r2_score( Y_PRED, Y_TRUE)
    resid = Y_PRED - Y_TRUE
    r2_uncentered = get_uncentered_R_squared( Y_TRUE, Y_PRED)
    Lin_concor = get_CCC(Y_TRUE, Y_PRED)
    
    print("RMSE: ", rmse)
    print("MAE: ", mae_value)
    print("R2_scipy: ", r2_scipy_value)
    print("R2_scipy_inverse:", r2_inverse)
    print("R2_uncentered: ", r2_uncentered)
    print("Lin Concordance correlation coefficient: ", Lin_concor)
    
    print("\nmax resid: ", abs(resid).max())
    print("mean resid: ", abs(resid).mean())
    print("std resid: ", resid.std())
    print("median resid: ", abs(resid).median())
    
    return rmse, mae_value, abs_max_error_value, median_abs_error_value, Lin_concor

In [None]:
df_target, col_y_pred, col_residual, col_resid_z = model_prediction(df_target, load_image,"test", save_df = True)
    

In [None]:
_true = df_target["FEV1"]
_pred = _true = df_target["modelpredicted(FEV1)"]
_rmse, _mae, _max_abser, _median_abser, _CC = get_evaluation_metrics(_true,_pred)

In [None]:
## plotting scatter
import string
alphabet_list = list(string.ascii_lowercase)
#----------------------------------------



metric_font_style = {"fontsize": 18,
                     "transform": "ax.transAxes"}
bbox = dict(boxstyle="round,pad=0.5,rounding_size=0.2", 
            fc="white", 
            ec ="lightgray"
           )

_x, _y = _true,_pred
_name = "FEV1"

#==============================================================================
_rmse, _mae, _max_abser, _median_abser, _CC = get_evaluation_metrics(_x,_y)

fig, ax = plt.subplots(nrows = 1, ncols = 1,figsize = (10, 10))
label_text = f"CCC  : {_CC:,.4f}\nMAE  : {_mae:,.4f}\nRMSE: {_rmse:,.4f}"
ax.annotate(label_text, (0.05, 0.85),
                xycoords='axes fraction',
        **metric_font_style, 
        bbox = bbox)

ax.scatter(_x, _y, marker = "o", 
           alpha = 0.2,
          s = 20)
ax.plot([0, 150], [0, 150],
     **line_styles, 
     )



ax.tick_params(axis = "both",size = 10, labelsize = 18)
ax.set_xlabel("Ground truth", fontsize = 20)
ax.set_ylabel("Predicted values", fontsize = 20)


if (i ==0) or (i==1):
    ax.set_xlim(r1_min,r1_max)
    ax.set_ylim(r1_min,r1_max)


else:
    ax.set_xlim(r2_min,r2_max)
    ax.set_ylim(r2_min,r2_max)
plt.savefig("put save path here", 
            dpi = 300, bbox_inches = "tight")
plt.show()