In [1]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D
import matplotlib.pyplot as plt
import pickle
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

In [None]:
# Distribution strategy whose scope create_large_model() builds and compiles
# its network in. The device list contains only "/cpu:0", so a single CPU
# device is the only replica target.
strategy = tf.distribute.MirroredStrategy(devices=["/cpu:0"])

In [7]:
def r2_keras(y_true, y_pred):
    """Coefficient of determination (R^2), written with Keras backend ops.

    Evaluates ``1 - SS_res / (SS_tot + epsilon)`` over every element of the
    tensors it receives, where ``SS_res`` is the sum of squared residuals and
    ``SS_tot`` is the sum of squared deviations of ``y_true`` from its own
    mean. ``K.epsilon()`` is added to the denominator so the ratio stays
    finite when ``y_true`` is constant.

    Args:
        y_true: Ground-truth values.
        y_pred: Predicted values, broadcast-compatible with ``y_true``.

    Returns:
        A scalar tensor holding the R^2 value.
    """
    from tensorflow.keras import backend as K

    residual_ss = K.sum(K.square(y_true - y_pred))
    deviation = y_true - K.mean(y_true)
    total_ss = K.sum(K.square(deviation))
    return 1 - residual_ss / (total_ss + K.epsilon())

def load_data(num_samples=10000, is3d=False,
              images_path="data_large_fixed.pkl",
              scores_path="adrp_adpr_A_cat_sorted.csv",
              return_scaler=False):
    """Load the image stack and the min-max scaled ``Chemgauss4`` scores.

    Images and scores are paired purely by position: the first
    ``num_samples`` entries of each source are taken.
    NOTE(review): presumably the CSV row order matches the pickle order --
    verify against whatever produced the two files.

    Args:
        num_samples: Number of leading entries to take from both the image
            sequence and the score column.
        is3d: If True, a copy of channel 1 of every image is appended as an
            extra trailing channel (e.g. 2 -> 3 channels), matching the
            3-channel input of ``create_large_model``.
        images_path: Pickle file; its first element (``arr[0]``) is used as
            the sequence of image arrays.
        scores_path: CSV file containing a ``Chemgauss4`` column.
        return_scaler: If True, the fitted ``MinMaxScaler`` is returned as a
            third value so predictions can be mapped back with
            ``scaler.inverse_transform``.

    Returns:
        ``(images, scaled_scores)`` by default, or
        ``(images, scaled_scores, scaler)`` when ``return_scaler`` is True.
        ``scaled_scores`` has shape ``(n, 1)`` and holds the absolute score
        values after min-max scaling.
    """
    # SECURITY: pickle.load can execute arbitrary code embedded in the file;
    # only point images_path at files from a trusted source.
    with open(images_path, 'rb') as fh:  # context manager closes the handle
        arr = pickle.load(fh)

    raw_scores = pd.read_csv(scores_path)['Chemgauss4'].iloc[0:num_samples].to_numpy()

    samples = arr[0][0:num_samples]
    if is3d:
        # img[:, :, 1:2] keeps the channel axis, so no hard-coded image size
        # is needed to append channel 1 as an additional channel.
        samples = [np.concatenate((img, img[:, :, 1:2]), axis=2) for img in samples]
    images = np.stack(samples, axis=0)

    # The scaler is fitted on absolute values, so inverse_transform yields
    # |score| rather than the signed score.
    scaler = MinMaxScaler()
    scaled_scores = scaler.fit_transform(np.abs(raw_scores).reshape(-1, 1))

    if return_scaler:
        return images, scaled_scores, scaler
    return images, scaled_scores

def create_large_model():
    """Build and compile a ResNet101-based regressor under ``strategy.scope()``.

    The network is a ``ResNet101`` backbone created with
    ``weights="imagenet"`` and ``include_top=False`` for 64x64x3 inputs,
    followed by global average pooling and a single linear output unit.
    No layer is frozen here, so the whole backbone is trainable.

    It is compiled with mean-squared-error loss, Adam at a learning rate of
    1e-4, and reports ``mean_squared_error`` plus ``r2_keras``.

    Relies on the module-level ``strategy`` and ``r2_keras``.

    Returns:
        The compiled ``tf.keras.Sequential`` model.
    """
    with strategy.scope():
        backbone = tf.keras.applications.ResNet101(
            weights="imagenet", input_shape=(64, 64, 3), include_top=False)
        model = tf.keras.Sequential([
            backbone,
            tf.keras.layers.GlobalAveragePooling2D(),
            tf.keras.layers.Dense(1),
        ])
        # `learning_rate` replaces the deprecated `lr` keyword, which newer
        # Keras releases no longer accept.
        model.compile(loss=tf.keras.losses.mean_squared_error,
                      optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                      metrics=['mean_squared_error', r2_keras])
    return model

def create_small_model():
    """Build and compile a small CNN regressor for 64x64x2 inputs.

    Architecture: four ReLU convolutions (8 filters 6x6 stride 2, 32 filters
    3x3 stride 2, 32 filters 3x3, 8 filters 3x3), flatten, two ReLU dense
    layers (64 and 16 units) and a single linear output unit.

    It is compiled with mean-squared-error loss, SGD at a learning rate of
    1e-3, and reports ``mean_squared_error`` plus ``r2_keras``. Unlike
    ``create_large_model`` it is not built inside a distribution strategy
    scope.

    Returns:
        The compiled ``Sequential`` model.
    """
    # Dropout(0.1) after Flatten and after each hidden Dense layer is
    # deliberately left out (those layers were disabled in this notebook).
    model = Sequential([
        Conv2D(8, kernel_size=(6, 6),
               activation='relu',
               input_shape=(64, 64, 2), strides=2),
        Conv2D(32, (3, 3), activation='relu', strides=2),
        Conv2D(32, (3, 3), activation='relu'),
        Conv2D(8, (3, 3), activation='relu'),
        Flatten(),
        Dense(64, activation='relu'),
        Dense(16, activation='relu'),
        Dense(1),
    ])

    # `learning_rate` replaces the deprecated `lr` keyword, which newer Keras
    # releases no longer accept.
    model.compile(loss=tf.keras.losses.mean_squared_error,
                  optimizer=tf.keras.optimizers.SGD(learning_rate=1e-3),
                  metrics=['mean_squared_error', r2_keras])
    return model

def prepare_for_training(X, y, cache=True, shuffle_buffer_size=1000, batch_size=64):
    """Wrap ``X`` and ``y`` in a batched, prefetching ``tf.data.Dataset``.

    Elements are produced by a Python generator that walks ``X`` and ``y`` in
    lockstep, are cast to ``float32``, grouped into batches of ``batch_size``
    and prefetched with an auto-tuned buffer. The per-element shapes are taken
    from the inputs themselves, so both 2- and 3-channel image stacks work.

    Args:
        X: Array-like of inputs, first axis indexing samples.
        y: Array-like of targets, first axis indexing samples.
        cache: Accepted for interface compatibility; currently has no effect
            (no caching is applied).
        shuffle_buffer_size: Accepted for interface compatibility; currently
            has no effect (elements keep their input order).
        batch_size: Number of elements per batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(inputs, targets)`` batches.
    """
    X = np.asarray(X)
    y = np.asarray(y)

    def generator():
        for sample, target in zip(X, y):
            yield sample, target

    # Derive element shapes from the data instead of hard-coding (64, 64, 3),
    # which did not match the 2-channel stack load_data returns by default.
    element_shapes = (tf.TensorShape(X.shape[1:]), tf.TensorShape(y.shape[1:]))
    ds = tf.data.Dataset.from_generator(generator, (tf.float32, tf.float32),
                                        output_shapes=element_shapes)
    ds = ds.batch(batch_size)
    ds = ds.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    return ds

def train(images, scores, isSmall=True, batch_size=64, num_epochs=25,
          random_state=None, return_artifacts=False):
    """Split the data 80/20, build a model and fit it.

    Args:
        images: Input array; must match the chosen model's input shape
            (64x64x2 for the small model, 64x64x3 for the large one).
        scores: Regression targets aligned with ``images``.
        isSmall: If True use ``create_small_model``, otherwise
            ``create_large_model``.
        batch_size: Batch size passed to ``model.fit``.
        num_epochs: Number of training epochs.
        random_state: Seed forwarded to ``train_test_split``. The default
            ``None`` keeps the split non-deterministic, as before; pass an
            int for a reproducible split.
        return_artifacts: If True, also return the fitted model and the
            held-out split so they can be evaluated afterwards.

    Returns:
        The Keras ``History`` object by default, or
        ``(history, model, X_test, y_test)`` when ``return_artifacts`` is True.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        images, scores, test_size=0.2, shuffle=True, random_state=random_state)
    model = create_small_model() if isSmall else create_large_model()
    # The held-out 20% doubles as validation data during fitting.
    history = model.fit(X_train, y_train, validation_data=(X_test, y_test),
                        epochs=num_epochs, verbose=1, batch_size=batch_size)
    if return_artifacts:
        return history, model, X_test, y_test
    return history

In [8]:
# Load the first 10000 samples without the extra channel (is3d=False); the
# resulting 2-channel stack is the input shape create_small_model() declares.
image, scores = load_data(num_samples=10000,is3d=False)
# Sanity check: print the shapes of the image stack and the score array.
print(image.shape, scores.shape)

(10000, 64, 64, 2) (10000, 1)


In [None]:
# Fit with train()'s defaults (small CNN, batch size 64, 25 epochs); only the
# returned training history is kept at notebook level.
history = train(image,scores)

In [None]:
# Plot training & validation curves, one figure per tracked quantity: the
# r2_keras metric first, then the loss. Every curve skips its first epoch
# ([1:]); the 'Test' legend entry is the validation split passed to fit().
for history_key, figure_title, y_label in (
        ('r2_keras', 'Model r2', 'r2'),
        ('loss', 'Model loss', 'Loss'),
):
    plt.plot(history.history[history_key][1:])
    plt.plot(history.history['val_' + history_key][1:])
    plt.title(figure_title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()

In [None]:
# Score the model on held-out data: predict, undo the score scaling on both
# predictions and targets, then print scikit-learn's R^2.
# NOTE(review): `model`, `X_test`, `y_test` and `scaler` are not defined at
# notebook level in the visible cells -- the first three are locals of
# train() and `scaler` is a local of load_data(), while the cells above keep
# only `image`, `scores` and `history`. On a fresh kernel this cell raises
# NameError unless those names are created in a cell not shown here.
from sklearn.metrics import r2_score
y_pred = model.predict(X_test)
# Assumes `scaler` is the MinMaxScaler fitted on the absolute scores in
# load_data() -- TODO confirm; inverse_transform then yields |score| units.
y_pred_inv = scaler.inverse_transform(y_pred)
y_test_inv = scaler.inverse_transform(y_test.reshape(-1,1))
print(r2_score(y_test_inv, y_pred_inv))

In [None]:
from scipy.stats import pearsonr

# Pearson correlation between the inverse-transformed targets and predictions
# computed in the previous cell; both are flattened to 1-D first.
targets_flat = y_test_inv.flatten()
predictions_flat = y_pred_inv.flatten()
print(pearsonr(targets_flat, predictions_flat))