In [1]:
!pip install keras_metrics owlready2 pprint



In [2]:
from dataset_utils_2 import compute_max_length, preprocess_dataset, get_classes_weights
import pandas as pd
from sklearn.utils import shuffle
import numpy as np
from time import time
import tensorflow.keras as keras
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, GaussianNoise
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Conv1D, MaxPooling1D, Embedding
from keras_metrics import precision, recall, f1_score
from tensorflow.keras.callbacks import EarlyStopping
from imblearn.over_sampling import RandomOverSampler
from imblearn.keras import balanced_batch_generator
from nltk.metrics import edit_distance
from keras.utils import plot_model

def create_model(input_length, embedding):
    model = Sequential()
    model.add(Embedding(128, 300, input_length=input_length, weights=[embedding]))
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))
    model.add(AveragePooling1D(pool_size=2))
    
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))
    model.add(AveragePooling1D(pool_size=2))
    
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))
    model.add(AveragePooling1D(pool_size=2))
    
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))
    model.add(AveragePooling1D(pool_size=2))
    
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))
    model.add(AveragePooling1D(pool_size=2))
    
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))
    model.add(AveragePooling1D(pool_size=2))
    
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))
    model.add(AveragePooling1D(pool_size=2))
    
    model.add(Conv1D(64, 5, padding='same'))
    model.add(Activation('relu'))
    model.add(Conv1D(64, 5))
    model.add(Activation('relu'))

    model.add(BatchNormalization())

    model.add(Flatten())
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(500))
    model.add(Activation('relu'))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))

    opt = keras.optimizers.Adam(lr=0.00001, beta_1=0.9, beta_2=0.999, decay=0.0, amsgrad=False)

    model.compile(loss='binary_crossentropy',
                  optimizer=opt,
                  metrics=[binary_precision(), binary_recall(), binary_f1_score()])
    return model

def prepare_embedding(filepath):
    embedding_vectors = {}
    with open(filepath, 'r') as f:
        for line in f:
            line_split = line.strip().split(" ")
            vec = np.array(line_split[1:], dtype=float)
            char = line_split[0]
            embedding_vectors[char] = vec

    embedding_matrix = np.zeros((128, 300))
    for i in range(128):
        embedding_vector = embedding_vectors.get(chr(i))
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector

    return embedding_matrix

def train(model, train_data, valid_data):
    nb_examples = train_data.shape[0]
    x_train = train_data[:nb_examples, :-1]
    y_train = train_data[:nb_examples, -1]
    
#     x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)

    x_valid = valid_data[:, :-1]
    y_valid = valid_data[:, -1]
    
#     x_valid = x_valid.reshape(x_valid.shape[0], x_valid.shape[1], 1)

#     y_train = keras.utils.to_categorical(y_train, 2)
#     y_valid = keras.utils.to_categorical(y_valid, 2)

#     training_generator, steps_per_epoch = balanced_batch_generator(x_train, y_train, sampler=RandomOverSampler(), batch_size=32, random_state=42)

    es = EarlyStopping(monitor='val_loss', min_delta=0, patience=2, verbose=1, mode='min')
    cp = ModelCheckpoint("model.h5", save_best_only=True, monitor='val_loss', mode='min')

#     model.fit_generator(generator=training_generator,
#                         steps_per_epoch=steps_per_epoch,
#                         epochs=50,
#                         validation_data=(x_valid, y_valid),
#                         shuffle=True,
#                         class_weight={0: 1, 1: 5},
#                         callbacks=[es, cp])

    neg_weight = 40

    valid_weights = np.array([neg_weight if y == 0 else 1 for y in y_valid])

    model.fit(x=x_train,
              y=y_train,
              epochs=100,
              batch_size=64,
              validation_data=(x_valid, y_valid, valid_weights),
              shuffle=True,
              class_weight={0: neg_weight, 1: 1},
              callbacks=[es, cp])

def test_model(model, test_datasets, distances=None):
    for i, ds in enumerate(test_datasets):
        x_test = ds[:, :-1]
#         x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)
        y_test = ds[:, -1]
#         y_test = keras.utils.to_categorical(y_test, 2)
#         scores = model.evaluate(x_test, y_test, verbose=1)
#         print('Test loss:', scores[0])
#         print('Test accuracy:', scores[1])
#         print('Test recall:', scores[2])
#         print('Test f1-score:', scores[3])


        preds = model.predict(x_test)
        preds = [1 if preds[i][0] > 0.1 else 0 for i in range(preds.shape[0])]
        tp_ = tp(preds, y_test)
        fp_ = fp(preds, y_test)
        fn_ = fn(preds, y_test)
        p = precision(tp_, fp_)
        r = recall(tp_, fn_)
        f1 = f1_score(p, r)

        print(f"tp: {tp_}, nb_pos: {len([1 for y in y_test if y == 1])}")
        print(f"fp: {fp_}, nb_pos: {len([1 for y in y_test if y == 1])}")
        print(f"fn: {fn_}, nb_neg: {len([1 for y in y_test if y == 0])}")
        print(f"precision: {p}, recall: {r}, f1: {f1}")
    
        if distances:
            thresh = 1
            preds = [preds[j] if distances[i][j] < thresh else 0 for j in range(len(preds))]
            tp_ = tp(preds, y_test)
            fp_ = fp(preds, y_test)
            fn_ = fn(preds, y_test)
            p = precision(tp_, fp_)
            r = recall(tp_, fn_)
            f1 = f1_score(p, r)

            print(f"tp: {tp_}, nb_pos: {len([1 for y in y_test if y == 1])}")
            print(f"fp: {fp_}, nb_pos: {len([1 for y in y_test if y == 1])}")
            print(f"fn: {fn_}, nb_neg: {len([1 for y in y_test if y == 0])}")
            print(f"better: precision: {p}, recall: {r}, f1: {f1}")
            
def tp(preds, targets):
    tp = len([1 for i in range(len(preds)) if preds[i] == 1 and targets[i] == 1])
    
    return tp

def fp(preds, targets):
    fp = len([1 for i in range(len(preds)) if preds[i] == 1 and targets[i] == 0])
    
    return fp

def tn(preds, targets):
    tn = len([1 for i in range(len(preds)) if preds[i] == 0 and targets[i] == 0])

def fn(preds, targets):
    fn = len([1 for i in range(len(preds)) if preds[i] == 0 and targets[i] == 1])
    
    return fn
    
def precision(tp_, fp_):    
    return tp_ / (tp_ + fp_)

def recall(tp_, fn_):
    return tp_ / (tp_ + fn_)

def f1_score(p, r):
    return 2 * p * r / (p + r)

def test_batch(models, test_datasets):
    best_model = None
    best_error = float("inf")
    for m in models:
        error = 0
    for ds in test_datasets:
        x_test = ds[:, :-1]
    #       x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)
        y_test = ds[:, -1]
    #       y_test = keras.utils.to_categorical(y_test, 2)
        scores = model.evaluate(x_test, y_test, verbose=1)
        loss = scores[0]
        error += loss

    error = error / len(test_datasets)
    if error < best_error:
        best_error = error
        best_model = model

    best_model.save("best_model.h5")
    test_model(best_model, test_datasets)
    
def explore_results(model, dataset, text):
    x_test = dataset[:, :-1]
    y_test = dataset[:, -1]
    
    preds = model.predict_classes(x_test)
    
    for i, y in enumerate(y_test):
        if y == 0 and y != preds[i]:
            print(text.iloc[i, :])
    
def test_dataset(model, dataset, max_length, nb_subclasses, nb_superclasses):
    
    test_df = pd.DataFrame()
    
    for _, row in dataset.iterrows():
        if row["class_1"] != row["class_2"]:
            test_df = test_df.append(row, ignore_index=True)
            
    test_df.to_csv("temp.csv", index=False)
            
    _, _, test_dataset = preprocess_dataset(None, None, ["temp.csv"],
                                            max_length, nb_subclasses, nb_superclasses)
    
    test_dataset = test_dataset[0]
            
    x_test = test_dataset[:, :-1]
    y_test = test_dataset[:, -1]
    
    preds = model.predict_classes(x_test)
    
    tp_ = len(dataset) - len(test_df) + tp(preds, y_test)
    fn_ = fn(preds, y_test)
    
    print(tp_)
    print(fn_)

if __name__ == '__main__':
    # max_length = compute_max_length(dataset)
    max_length = 150

    nb_subclasses = 0
    nb_superclasses = 5
    
    load = False
    
#     print("Loading training and validation dataset...")

    if load:
        (training_dataset, valid_dataset, test_dataset) = preprocess_dataset(
            "Training Dataset.csv",
            "Validation Dataset.csv",
            ["Test Dataset.csv"], max_length, nb_subclasses, nb_superclasses
        )

        np.save("training", training_dataset)
        np.save("validation", valid_dataset)

    else:
        training_dataset = np.load("/content/training.npy")
        valid_dataset = np.load("/content/validation.npy")

    print("Loading test datasets...")
    
    if load:
        (_, _, test_dataset) = preprocess_dataset(None, None, [
            "Test Dataset.csv"], max_length, nb_subclasses, nb_superclasses)

        np.save("test", test_dataset)

    else:
        test_dataset = np.load("test.npy", allow_pickle=True)
    
    test_files = [
        "Dataset Mouse-Human2.csv",
        "Training FMA-NCI 2.csv",
        "Training FMA-SNOMED 2.csv",
        "Training SNOMED-NCI 2.csv",
        "Dataset envo-sweet.csv",
        "Dataset flopo-pto.csv"]
       
    distances = []
    for f in test_files:
        ds = pd.read_csv(f)
        dist = []
        for _, row in ds.iterrows():
            c1 = row["class_1"].lower().replace("_", " ")
            c2 = row["class_2"].lower().replace("_", " ")

            d = edit_distance(c1, c2) / min(len(c1), len(c2))
            dist.append(d)
        distances.append(dist)

    training_dataset = training_dataset[np.random.choice(training_dataset.shape[0], 40000, replace=False)]
    embedding = prepare_embedding("glove.840B.300d-char.txt")

    model = create_model(2*(1 + nb_subclasses + nb_superclasses)*max_length, embedding)
    train(model, training_dataset, valid_dataset)
    # model.load_weights("best_model.h5")
    test_model(model, test_dataset)
    explore_results(model, test_dataset[0], pd.read_csv("Test Dataset.csv"))
    model.save("model_test_full_2.h5")
    test_ds = pd.read_csv("Positive Dataset.csv")
    test_dataset(model, test_ds, max_length, nb_subclasses, nb_superclasses)



Using TensorFlow backend.


FileNotFoundError: [Errno 2] No such file or directory: '/content/training.npy'

# Créer dataset de training