In [None]:
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import balanced_accuracy_score, matthews_corrcoef, f1_score, recall_score, precision_score
from sklearn.preprocessing import StandardScaler,OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer

from imblearn.combine import SMOTEENN

import pandas as pd
import numpy as np
import nltk
from sklearn.preprocessing import LabelEncoder
from scipy.sparse import csr_matrix
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
# Dataset selection: keep exactly one `file_path` assignment active.
# file_path = 'Hall_2012_cleaned.csv'
# file_path = 'Jeyaraman_2020_cleaned.csv'
# file_path = 'Radjenovic_2013_cleaned.csv'
file_path = 'Smid_2020_cleaned.csv'

# Load the CSV and drop every row that has a missing value in any column.
df = pd.read_csv(file_path, delimiter=',')
df = df.dropna(axis=0)

# Working frame with a contiguous 0..n-1 index. `reset_index` already returns
# a new DataFrame, so the former `df_sample = df.copy()` that preceded this
# line was a dead store and has been removed.
df_sample = df.reset_index(drop=True)

from nltk.corpus import wordnet
import random

def get_synonyms(word):
    """Return the WordNet synonyms of ``word`` as a list.

    Every lemma name of every synset returned by ``wordnet.synsets(word)`` is
    collected, with underscores replaced by spaces. The input word itself is
    excluded. The list is built from a set, so it has no duplicates and no
    guaranteed order; it is empty when WordNet yields nothing else.
    """
    candidates = {
        lemma.name().replace('_', ' ')
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
    }
    # The word is never a synonym of itself.
    candidates.discard(word)
    return list(candidates)

def synonym_replacement(sentence, n):
    """Replace up to ``n`` distinct words of ``sentence`` with a random synonym.

    The sentence is split on whitespace. Only purely alphabetic tokens are
    candidates; they are de-duplicated and visited in random order. For each
    candidate that has at least one synonym (per ``get_synonyms``), every
    occurrence of that token is replaced by one randomly chosen synonym.

    Args:
        sentence: Text to augment.
        n: Maximum number of distinct words to replace. Values <= 0 replace
            nothing.

    Returns:
        The tokens re-joined with single spaces (so runs of whitespace in the
        input are normalised even when nothing is replaced).
    """
    words = sentence.split()
    new_words = words.copy()
    candidates = list(set(word for word in words if word.isalpha()))
    random.shuffle(candidates)

    num_replaced = 0
    for candidate in candidates:
        # Check the budget *before* replacing. The previous version checked
        # only after a replacement, so n=0 still replaced one word.
        if num_replaced >= n:
            break
        synonyms = get_synonyms(candidate)
        if synonyms:
            synonym = random.choice(synonyms)
            new_words = [synonym if word == candidate else word for word in new_words]
            num_replaced += 1

    return ' '.join(new_words)

def augment_text(df, minority_class, augment_by=0.9):
    """Append synonym-augmented copies of minority-class texts to ``df``.

    Args:
        df: DataFrame with at least the columns ``'Corpus'`` (text) and
            ``'label_included'`` (class label).
        minority_class: Value of ``'label_included'`` identifying the rows to
            augment.
        augment_by: Fraction of the minority-class row count to generate;
            ``int(len(minority_rows) * augment_by)`` new rows are created.

    Returns:
        A new DataFrame holding all rows of ``df`` followed by the generated
        rows, with a fresh 0..n-1 index. Generated rows only carry ``'Corpus'``
        and ``'label_included'``; any other column of ``df`` is left missing
        for them by ``pd.concat``.
    """
    minority_df = df[df['label_included'] == minority_class]
    n_augmentations = int(len(minority_df) * augment_by)

    # Materialise the candidate texts once; converting the column to a list
    # inside the loop repeated the same work on every iteration.
    minority_texts = minority_df['Corpus'].tolist()

    augmented_texts = []
    for _ in range(n_augmentations):
        original_text = random.choice(minority_texts)
        # n=1 swaps a single word per generated text; raise it for stronger augmentation.
        augmented_texts.append(synonym_replacement(original_text, n=1))

    # Add augmented texts to the dataframe
    augmented_df = pd.DataFrame(augmented_texts, columns=['Corpus'])
    augmented_df['label_included'] = minority_class
    return pd.concat([df, augmented_df], ignore_index=True)

# Cross-validation / reproducibility configuration.
NUM_FOLDS = 5
RANDOM_STATE = 42

# Seed Python's RNG so the synonym augmentation below draws the same random
# choices on every run.
random.seed(RANDOM_STATE)

# Assuming your minority class is identified, for example, as 1
df_augmented = augment_text(df, minority_class=1, augment_by=0.5)

# `augment_text` already returns the original rows followed by the synthetic
# ones. Concatenating that result with `df` again (as was done before) put
# every original row into the dataset twice, so identical rows could land in
# both the train and the test part of a fold.
# NOTE(review): augmentation still happens before the CV split, so a synthetic
# variant of a test document can end up in the training fold - consider
# augmenting inside each fold instead.
df_sample = df_augmented

# Shuffle the dataframe to mix original and augmented examples (optional)
df_sample = df_sample.sample(frac=1, random_state=RANDOM_STATE).reset_index(drop=True)

X = df_sample['Corpus']
y = df_sample['label_included']

# Map the labels to consecutive integers 0..k-1.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# with_mean=False scales to unit variance without centring, which keeps
# sparse TF-IDF matrices sparse.
scaler = StandardScaler(with_mean=False)

# Lists to store evaluation metrics (one entry per fold)
balanced_acc_scores = []
mcc_scores = []
f1_scores = []
recall_scores = []
precision_scores = []

stratified_kfold = StratifiedKFold(n_splits=NUM_FOLDS, shuffle=True, random_state=RANDOM_STATE)





In [None]:
# Hyperparameters shared by the experiments below.
# NOTE(review): in the cells visible here only `embedding_dim`, `batch_size`
# and `epochs` are read; `learning_rate`, `hidden_units`, `projection_units`,
# `dropout_rate` and `temperature` are not referenced (the classifiers
# hard-code Adam(0.001)) - confirm whether they are still needed.
embedding_dim=200  # output size of the Embedding layer in create_simple_encoder
learning_rate = 0.001
batch_size = 2  # passed to the classifier `fit` calls
hidden_units = 128
projection_units = 128
epochs = 10  # passed to the classifier `fit` calls
dropout_rate = 0.3
temperature = 0.1


In [None]:
def create_classifier(encoder, trainable=True):
    """Build and compile a single-layer sigmoid classifier for vectorizer features.

    Args:
        encoder: Fitted vectorizer exposing ``get_feature_names_out()``; the
            number of names it returns is the model's input width.
        trainable: Accepted for interface compatibility; not used.

    Returns:
        A compiled ``keras.Model`` named ``"simple-text-classifier"``: one
        input declared ``sparse=True`` feeding one ``Dense(1, sigmoid)`` unit,
        trained with Adam(0.001) on binary cross-entropy and tracking binary
        accuracy plus the notebook's balanced-accuracy, F1 and MCC metrics.
    """
    n_features = encoder.get_feature_names_out().shape[0]

    features_in = keras.Input(shape=(n_features,), sparse=True, dtype=tf.float32)
    # The whole model is logistic regression on the input features.
    probability = keras.layers.Dense(1, activation="sigmoid")(features_in)
    classifier = keras.Model(inputs=features_in, outputs=probability, name="simple-text-classifier")

    optimizer = keras.optimizers.Adam(0.001)
    tracked_metrics = [
        keras.metrics.BinaryAccuracy(),
        balanced_accuracy_metric,
        f1_score_metric,
        mcc_metric,
    ]
    classifier.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=tracked_metrics)

    return classifier


In [None]:

def create_classifier_2(input_shape, trainable=True):
    """Build and compile a single-layer sigmoid classifier for dense features.

    Args:
        input_shape: Shape tuple of one sample (without the batch dimension),
            forwarded to ``keras.Input``.
        trainable: Accepted for interface compatibility; not used.

    Returns:
        A compiled ``keras.Model`` named ``"simple-text-classifier"`` with one
        ``Dense(1, sigmoid)`` unit, trained with Adam(0.001) on binary
        cross-entropy and tracking binary accuracy plus the notebook's
        balanced-accuracy, F1 and MCC metrics.
    """
    features_in = keras.Input(shape=input_shape, dtype=tf.float32)
    # The whole model is logistic regression on the input features.
    probability = keras.layers.Dense(1, activation="sigmoid")(features_in)
    classifier = keras.Model(inputs=features_in, outputs=probability, name="simple-text-classifier")

    optimizer = keras.optimizers.Adam(0.001)
    tracked_metrics = [
        keras.metrics.BinaryAccuracy(),
        balanced_accuracy_metric,
        f1_score_metric,
        mcc_metric,
    ]
    classifier.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=tracked_metrics)

    return classifier

In [None]:
def mcc_metric(y_true, y_pred):
    """Matthews correlation coefficient over one batch, as a Keras metric.

    Confusion-matrix cells are counted by multiplying the (complemented)
    labels and predictions, clipping to [0, 1], rounding and summing. The
    result is ``(TP*TN - FP*FN) / (sqrt((TP+FP)(TP+FN)(TN+FP)(TN+FN)) + eps)``,
    where ``eps`` is the Keras backend epsilon guarding against a zero
    denominator.
    """
    K = tf.keras.backend

    def _count(product):
        # Number of entries whose clipped product rounds to 1.
        return K.sum(K.round(K.clip(product, 0, 1)))

    not_true = 1 - y_true
    not_pred = 1 - y_pred

    tp = _count(y_true * y_pred)
    tn = _count(not_true * not_pred)
    fp = _count(not_true * y_pred)
    fn = _count(y_true * not_pred)

    denominator = K.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    return (tp * tn - fp * fn) / (denominator + K.epsilon())


def balanced_accuracy_metric(y_true, y_pred):
    """Balanced accuracy over one batch, as a Keras metric.

    Computes the mean of the positive-class and negative-class recall, with
    predictions rounded to 0/1. A constant of 1e-7 is added to each
    denominator, so a class that is absent from the batch contributes 0 to
    the average instead of raising a division error.
    """
    eps = 1e-7  # guards against division by zero
    not_true = 1 - y_true

    n_positive = tf.math.reduce_sum(y_true)
    n_negative = tf.math.reduce_sum(not_true)

    hits_positive = tf.math.reduce_sum(y_true * tf.round(y_pred))
    hits_negative = tf.math.reduce_sum(not_true * tf.round(1 - y_pred))

    sensitivity = hits_positive / (n_positive + eps)
    specificity = hits_negative / (n_negative + eps)
    return 0.5 * (sensitivity + specificity)




def f1_score_metric(y_true, y_pred):
    """F1 score of the positive class over one batch, as a Keras metric.

    True positives and predicted positives are counted after clipping to
    [0, 1] and rounding; actual positives are the plain sum of ``y_true``.
    The Keras backend epsilon is added to every denominator to avoid
    division by zero.
    """
    K = tf.keras.backend
    eps = K.epsilon()

    tp = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    n_predicted_positive = K.sum(K.round(K.clip(y_pred, 0, 1)))
    n_actual_positive = K.sum(y_true)

    precision = tp / (n_predicted_positive + eps)
    recall = tp / (n_actual_positive + eps)

    # Harmonic mean of precision and recall.
    return 2 * (precision * recall) / (precision + recall + eps)


In [None]:
def train_evaluate_tf_idf_classifier(X_train, y_train, X_test, y_test, kfold):
    """Train the TF-IDF classifier on one fold and score it on the held-out part.

    Steps: fit a ``TfidfVectorizer`` (2- and 3-grams, at most 1000 features,
    max_df=0.7) on the training texts, scale both splits with the
    module-level ``scaler`` fitted on the training split, densify, train the
    model from ``create_classifier`` for ``epochs`` epochs with
    ``batch_size``, then threshold the predicted probabilities at 0.5.

    Args:
        X_train, X_test: Raw texts of the training / held-out split.
        y_train, y_test: Matching binary labels.
        kfold: Not used by this function.

    Returns:
        Tuple ``(balanced_accuracy, mcc, f1, recall)`` computed with
        scikit-learn on the held-out split.
    """
    # Vectorise: vocabulary and IDF weights come from the training texts only.
    vectorizer = TfidfVectorizer(max_features=1000, ngram_range=(2, 3), max_df=0.7)
    train_tfidf = vectorizer.fit_transform(X_train)
    test_tfidf = vectorizer.transform(X_test)

    # Scale with statistics of the training split.
    train_scaled = scaler.fit_transform(train_tfidf)
    test_scaled = scaler.transform(test_tfidf)

    # The sparse matrices are densified before being handed to Keras.
    train_tensor = tf.convert_to_tensor(train_scaled.toarray(), dtype=tf.float32)
    test_tensor = tf.convert_to_tensor(test_scaled.toarray(), dtype=tf.float32)

    # (SMOTEENN resampling of the training tensor was tried here and is disabled.)

    model = create_classifier(vectorizer, trainable=True)
    model.fit(
        train_tensor,
        y_train,
        validation_data=(test_tensor, y_test),
        epochs=epochs,
        batch_size=batch_size,
        verbose=0,
    )

    # Hard labels from the sigmoid output.
    predicted_labels = (model.predict(test_tensor) > 0.5).astype(int)

    return (
        balanced_accuracy_score(y_test, predicted_labels),
        matthews_corrcoef(y_test, predicted_labels),
        f1_score(y_test, predicted_labels),
        recall_score(y_test, predicted_labels),
    )

In [None]:
# Stratified cross-validation of the TF-IDF classifier: one train/evaluate
# round per fold, collecting the per-fold scores in the module-level lists.
fold_splits = stratified_kfold.split(X, y_encoded)
for fold_idx, (train_index, test_index) in enumerate(fold_splits, start=1):
    X_train_fold = X.iloc[train_index]
    X_test_fold = X.iloc[test_index]
    y_train_fold = y_encoded[train_index]
    y_test_fold = y_encoded[test_index]

    # Train and evaluate TF-IDF classifier model
    fold_results = train_evaluate_tf_idf_classifier(
        X_train_fold, y_train_fold, X_test_fold, y_test_fold, stratified_kfold
    )
    balanced_acc_fold, mcc_fold, f1_fold, recall_fold = fold_results

    # Append each score to its list (same order as the returned tuple).
    score_lists = (balanced_acc_scores, mcc_scores, f1_scores, recall_scores)
    for score_list, fold_value in zip(score_lists, fold_results):
        score_list.append(fold_value)

    # Print results for the fold
    print(f"Fold {fold_idx}:")
    print(f"  Balanced Accuracy: {balanced_acc_fold:.2f}")
    print(f"  F1-Score: {f1_fold:.2f}")
    print(f"  Matthew's Correlation Coefficient: {mcc_fold:.2f}")
    print(f"  Recall: {recall_fold:.2f}")

# Print average scores, one per line, in the order:
# balanced accuracy, F1, MCC, recall.
print("Average Scores sentences:")
print(f"{np.mean(balanced_acc_scores):.2f}")
print(f"{np.mean(f1_scores):.2f}")
print(f"{np.mean(mcc_scores):.2f}")
print(f"{np.mean(recall_scores):.2f}")

In [None]:
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import GlobalAveragePooling1D
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Start the embedding experiment with fresh, independent per-fold score lists
# so the results of the TF-IDF run are not mixed into its averages.
balanced_acc_scores, mcc_scores, f1_scores = [], [], []
recall_scores, precision_scores = [], []

In [None]:
def create_simple_encoder():
    """Build the (uncompiled) text encoder: embedding lookup + mean pooling.

    Reads the module-level ``vocab_size``, ``embedding_dim`` and
    ``max_sequence_length`` at call time. The model maps a padded sequence of
    token ids to one ``embedding_dim``-sized vector by averaging the token
    embeddings over the sequence axis.

    NOTE(review): no ``mask_zero`` is passed to the Embedding layer, so padded
    positions presumably take part in the average - confirm this is intended.
    """
    encoder = Sequential()
    encoder.add(
        Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=max_sequence_length)
    )
    encoder.add(GlobalAveragePooling1D())
    return encoder

In [None]:
# Turn every text in X into a sequence of integer token ids.
# NOTE(review): the tokenizer is fitted on all of X, i.e. before the
# cross-validation split below.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(X)
X_sequences = tokenizer.texts_to_sequences(X)

# Define vocabulary size and embedding dimension
# (+1 because the padded sequences use id 0, which is not in word_index).
vocab_size = len(tokenizer.word_index) + 1
embedding_dim = 200

# Pad every sequence at the end up to the length of the longest one.
max_sequence_length = max(map(len, X_sequences))
X_padded = pad_sequences(X_sequences, maxlen=max_sequence_length, padding='post')

In [None]:
# Stratified cross-validation of the embedding pipeline. Per fold:
#   1. train a fresh embedding + mean-pooling encoder on the training split,
#   2. encode both splits,
#   3. scale the embeddings with the module-level `scaler`,
#   4. train the single-layer classifier on the training embeddings,
#   5. score hard predictions on the held-out split with scikit-learn.
for fold_idx, (train_index, test_index) in enumerate(stratified_kfold.split(X_padded, y_encoded), 1):
    X_train_fold, X_test_fold = X_padded[train_index], X_padded[test_index]
    y_train_fold, y_test_fold = y_encoded[train_index], y_encoded[test_index]

    # Create and compile the simple encoder model.
    # The encoder ends in mean-pooled embeddings, i.e. unbounded real values
    # with no softmax. They must therefore be treated as logits; the string
    # loss 'sparse_categorical_crossentropy' assumes probabilities and would
    # clip these outputs before taking the log.
    simple_encoder = create_simple_encoder()
    simple_encoder.compile(
        optimizer='adam',
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )

    # Train the simple encoder on the training data
    simple_encoder.fit(X_train_fold, y_train_fold, epochs=5, batch_size=32, verbose=0)

    # Get embeddings for the training and test data
    X_train_embeddings_fold = simple_encoder.predict(X_train_fold)
    X_test_embeddings_fold = simple_encoder.predict(X_test_fold)

    # Standardize the features unless the training embeddings already have
    # (overall) mean 0 and standard deviation 1. The scaler is fitted on the
    # training split only and then applied to the held-out split.
    already_standardized = (
        np.isclose(np.mean(X_train_embeddings_fold), 0.0, atol=1e-4)
        and np.isclose(np.std(X_train_embeddings_fold), 1.0, atol=1e-4)
    )
    if not already_standardized:
        X_train_embeddings_fold = scaler.fit_transform(X_train_embeddings_fold)
        X_test_embeddings_fold = scaler.transform(X_test_embeddings_fold)

    # Create and compile the classifier model
    classifier_model = create_classifier_2((X_test_embeddings_fold.shape[1],))

    # Train the classifier model on this fold's training embeddings
    # (no resampling is applied).
    classifier_model.fit(
        X_train_embeddings_fold,
        y_train_fold,
        epochs=epochs,
        batch_size=batch_size,
        verbose=0
    )

    # Threshold the sigmoid output at 0.5 and flatten the (n, 1) column to
    # the 1-D label vector scikit-learn expects.
    y_pred_fold = (classifier_model.predict(X_test_embeddings_fold) > 0.5).astype(int).ravel()

    # Calculate evaluation metrics for this fold
    balanced_acc_fold = balanced_accuracy_score(y_test_fold, y_pred_fold)
    mcc_fold = matthews_corrcoef(y_test_fold, y_pred_fold)
    f1_fold = f1_score(y_test_fold, y_pred_fold)
    precision_fold = precision_score(y_test_fold, y_pred_fold)
    recall_fold = recall_score(y_test_fold, y_pred_fold)

    # Append the scores to the lists
    balanced_acc_scores.append(balanced_acc_fold)
    mcc_scores.append(mcc_fold)
    f1_scores.append(f1_fold)
    precision_scores.append(precision_fold)
    recall_scores.append(recall_fold)

    # Print the evaluation metrics for this fold
    print(f"Fold {fold_idx}:")
    print(f"Balanced Accuracy: {balanced_acc_fold:.2f}")
    print(f"F1-Score: {f1_fold:.2f}")
    print(f"Matthew's Correlation Coefficient: {mcc_fold:.2f}")
    print(f"precision: {precision_fold:.2f}")
    print(f"Recall: {recall_fold:.2f}")
    print()



In [None]:
# Mean of each metric over all folds of the embedding experiment, printed one
# per line in the order: MCC, balanced accuracy, F1, precision, recall.
print("Average Scores sentences:")
for fold_scores in (mcc_scores, balanced_acc_scores, f1_scores, precision_scores, recall_scores):
    print(f"{np.mean(fold_scores):.2f}")