In [1]:
import sys
import os
# Make the project root importable so the `src.*` imports below resolve.
# NOTE: replace the placeholder with the path of the local checkout before running.
project_path = os.path.abspath('/Enter/Your/Project/Path/Here')
# Guard the append so re-running this cell does not stack duplicate sys.path entries.
if project_path not in sys.path:
    sys.path.append(project_path)
from src.API.mongo_utils import get_mongo_client, get_collection
from src.API.constant import AI, HUMAN, REUTER_COLLECTION, OPEN_AI_COLLECTION
# IMPORTANT: 0 is AI, 1 is Human
# Initialize the client
# NOTE(review): `client` is not referenced again in this notebook; presumably
# get_collection() relies on the connection it sets up — confirm.
client = get_mongo_client()
# Fetch collections: REUTER_COLLECTION provides the human-written texts,
# OPEN_AI_COLLECTION the AI-generated ones.
human_collection = get_collection(REUTER_COLLECTION)

AI_collection = get_collection(OPEN_AI_COLLECTION)

# Fetch documents from the collections
# NOTE(review): find() presumably returns a one-shot cursor; if so, the cell that
# consumes these must not be re-run without re-running this cell first — confirm.
AI_documents = AI_collection.find()
human_documents = human_collection.find()

In [2]:
# Names of the text-data attributes that hold the embeddings. They are passed as the
# "field" argument to the clustering, dataloader and plotting helpers below.
# EMBEDDING_CLUSTERS is only referenced from a commented-out alternative in the
# auto-encoder section.
EMBEDDING_CLUSTERS = 'embedding_clustering'
EMBEDDING_CLASSIFICATION = 'embedding_classification'

In [3]:
from src.API.text_classifier import create_text_data_list
# print update
print("Starting to create text data list")

# Create a list of TextData objects
print("Creating text data list for AI")
# Open AI: 99% accuracy
# Gemini : 99.45% accuracy
AI_texts = create_text_data_list(AI_documents, AI)
print("Creating text data list for Human")
human_texts = create_text_data_list(human_documents, HUMAN)
# Shuffle AI_texts with a fixed seed so the subset kept by the later
# `AI_texts[:5000]` truncation is the same on every run (the train/test splits
# already use random_state=42). A dedicated Random instance leaves the global
# random state untouched.
import random
SHUFFLE_SEED = 42
random.Random(SHUFFLE_SEED).shuffle(AI_texts)

In [4]:
# Number of AI texts, then number of human texts, one per line.
print(len(AI_texts), len(human_texts), sep="\n")

In [5]:
# Upper bound on the number of AI texts used from here on. AI_texts was shuffled
# in the cell above, so this keeps a random subset of at most MAX_AI_SAMPLES items.
MAX_AI_SAMPLES = 5000
AI_texts = AI_texts[:MAX_AI_SAMPLES]

In [6]:
from sklearn.model_selection import train_test_split

# Hold out 20% of each class with the same fixed seed, then pool the classes
# per split (AI items first, human items second).
split_options = {"test_size": 0.2, "random_state": 42}
AI_train, AI_test = train_test_split(AI_texts, **split_options)
human_train, human_test = train_test_split(human_texts, **split_options)
training_data = AI_train + human_train
test_data = AI_test + human_test

In [7]:
# Report how many items ended up in each split
for label, split in (
    ("Training data length: ", training_data),
    ("Testing data length: ", test_data),
):
    print(label, len(split))

# **K-Means Clustering**

In [8]:
from src.API.machine_learning import k_mean_cluster

n_clusters = 2
print("Training the KMeans model with classification embeddings")
# k_mean_cluster returns three values, kept for the test, plot and save cells below.
(
    classification_kmeans,
    classification_feature_array,
    classification_cluster_majorities,
) = k_mean_cluster(
    training_data,
    EMBEDDING_CLASSIFICATION,
    'text_type',
    n_clusters,
)

In [9]:
# Show the votes and the true label of the first training item
first_training_item = training_data[0]
print(first_training_item.votes)
print(first_training_item.text_type)

In [10]:
from src.API.machine_learning import plot_latest_result, plot_actual_label, plot_actual_label_3d
%matplotlib notebook
# Print update
print("Visualizing the result")
# Visualize the result
plot_latest_result(training_data, 'KMeans2_embedding_classification', 'embedding_classification')
plot_actual_label(training_data, 'embedding_classification')
plot_actual_label_3d(training_data, 'embedding_classification')

In [11]:
from src.API.machine_learning import generate_classification_report


def calculate_accuracy_and_generate_report(data_list):
    """
    Print the vote-based accuracy of a collection of text_data objects and
    generate a classification report for it.

    :param data_list: Collection of text_data objects. Each item must provide a
        `classify()` method and a `text_type` attribute. For ``None`` or an empty
        collection only a notice is printed.
    :return: None. The accuracy is printed and the report is delegated to
        `generate_classification_report`.
    """
    if data_list is None or len(data_list) == 0:
        print("The data list is empty")
        return

    # Single pass: classify() runs exactly once per item, before text_type is read.
    label_pairs = [(item.classify(), item.text_type) for item in data_list]
    predicted_labels = [pred for pred, _ in label_pairs]
    true_labels = [true for _, true in label_pairs]
    correct = sum(1 for pred, true in label_pairs if pred == true)

    accuracy = correct / len(data_list)
    print(f"Accuracy: {accuracy}")

    generate_classification_report(true_labels, predicted_labels, target_names=["AI", "HUMAN"])


# Vote-based accuracy and classification report for the training split.
calculate_accuracy_and_generate_report(training_data)

In [12]:
from src.API.machine_learning import k_mean_cluster_test

# Test the model
print("Testing the model")

# Apply the trained KMeans model and its cluster majorities to the held-out data
result_array, cluster_label = k_mean_cluster_test(
    classification_kmeans,
    test_data,
    EMBEDDING_CLASSIFICATION,
    'text_type',
    classification_cluster_majorities,
)

# Visualize the result
plot_latest_result(test_data, 'KMeans2_embedding_classification', EMBEDDING_CLASSIFICATION)
plot_actual_label(test_data, EMBEDDING_CLASSIFICATION)
plot_actual_label_3d(test_data, EMBEDDING_CLASSIFICATION)
# Calculate the accuracy using the vote
calculate_accuracy_and_generate_report(test_data)

In [13]:
# Save the model 
from src.API.machine_learning import save_kmeans_model
# Persist the trained KMeans model with its field name and cluster majorities
save_kmeans_model(
    classification_kmeans,
    EMBEDDING_CLASSIFICATION,
    classification_cluster_majorities,
    "kmean.pkl",
)

# **Hierarchical Clustering**

In [14]:
from src.API.machine_learning import hierarchical_cluster, hierarchical_cluster_test

# NOTE: this rebinds n_clusters (2 in the KMeans section) for the hierarchical run.
n_clusters = 4
distance_threshold = 0.75
# Train the Hierarchical model
print("Training the Hierarchical model")
(
    hierarchical_model,
    hierarchical_feature_array,
    hierarchical_cluster_majorities,
    hierarchical_centroids,
) = hierarchical_cluster(
    training_data,
    EMBEDDING_CLASSIFICATION,
    'text_type',
    n_clusters,
    distance_threshold=distance_threshold,
)

# Test the model
print("Testing the Hierarchical model")
result_array, cluster_label = hierarchical_cluster_test(
    hierarchical_model,
    test_data,
    EMBEDDING_CLASSIFICATION,
    'text_type',
    hierarchical_cluster_majorities,
    hierarchical_centroids,
)

# Calculate the accuracy using the vote
calculate_accuracy_and_generate_report(test_data)
print(test_data[0].votes)

In [15]:
from src.API.machine_learning import save_hierarchical_model
# Persist the hierarchical model together with its majorities and centroids
save_hierarchical_model(
    hierarchical_model,
    EMBEDDING_CLASSIFICATION,
    hierarchical_cluster_majorities,
    hierarchical_centroids,
    "hierarchical.pkl",
)

# **Auto-Encoder**

In [16]:
# Create the dataloader
from src.API.machine_learning import create_dataloader
field = EMBEDDING_CLASSIFICATION #94% accuracy
# field = EMBEDDING_CLUSTERS 89% accuracy
# Both loaders share the same trailing arguments.
# NOTE(review): 64 is presumably the batch size — confirm against create_dataloader.
dataloader_args = (field, 64)
human_dataloader = create_dataloader(human_train, *dataloader_args)
AI_dataloader = create_dataloader(AI_train, *dataloader_args)

In [17]:
# Initialize the model
# The input width is read from the `field` attribute of the first human sample
sample_embedding = getattr(human_texts[0], field)
input_dim = len(sample_embedding)  # Size of the embeddings
print(input_dim)
latent_dim = 128  # Dimensionality of the latent space

# Create 2 encoders (one per class) from identical arguments
from src.API.machine_learning import create_autoencoder
# NOTE(review): 0.001 is presumably the optimizer learning rate — confirm.
autoencoder_args = (input_dim, latent_dim, 0.001)
h_encoder, h_criterion, h_optimizer = create_autoencoder(*autoencoder_args)
AI_encoder, AI_criterion, AI_optimizer = create_autoencoder(*autoencoder_args)

In [18]:
# Train human encoder
from src.API.machine_learning import train_autoencoder
# Human encoder first, then AI encoder; each trains on its own class's dataloader.
# NOTE(review): the trailing 20 is presumably the number of epochs — confirm.
training_runs = (
    ("Training human encoder", h_encoder, h_criterion, h_optimizer, human_dataloader),
    ("Training AI encoder", AI_encoder, AI_criterion, AI_optimizer, AI_dataloader),
)
for banner, *train_args in training_runs:
    print(banner)
    train_autoencoder(*train_args, 20)

In [19]:
import numpy as np
# Test the encoder
# Build the evaluation set as a plain Python list (human items first, then AI items),
# the same way test_data is built. np.concatenate would first coerce the text-data
# objects into a NumPy object array, which is unnecessary and breaks if the objects
# ever behave like sequences.
encoder_test_set = human_test + AI_test

# x holds the `field` value of every sample, y the matching true label
x = [getattr(text_data, field) for text_data in encoder_test_set]
y = [text_data.text_type for text_data in encoder_test_set]

In [20]:
import torch
# Convert the test data to tensor: `x` is the list of per-sample `field` values
# collected in the previous cell.
# NOTE(review): assumes every entry of x has the same length so that the result is a
# 2-D (n_samples, input_dim) float32 tensor — confirm.
X_test_tensor = torch.tensor(x, dtype=torch.float32)

In [21]:
# Evaluate each test embedding
method_name = "Auto-Encoder"+"_"+field
predictions = []
with torch.no_grad():
    # X_test_tensor rows and encoder_test_set items are in the same order
    for embedding, sample in zip(X_test_tensor, encoder_test_set):
        # Reconstruction error of the same embedding under each class's model
        error_human = h_criterion(h_encoder(embedding), embedding).item()
        error_ai = AI_criterion(AI_encoder(embedding), embedding).item()

        # Human only when the human error is strictly smaller; ties go to AI
        classification = HUMAN if error_human < error_ai else AI
        predictions.append(classification)
        # Add the vote
        sample.add_vote(method_name, classification)

In [22]:
from sklearn.metrics import classification_report

# Calculate the accuracy using the vote
print("Vote Accuracy\n")
calculate_accuracy_and_generate_report(encoder_test_set)
# Convert predictions to a NumPy array
predictions = np.array(predictions)

# Print evaluation metrics
print("Auto_encoder Accuracy\n")
# target_names are matched positionally to the label order. Labels are 0 = AI and
# 1 = Human, so the former ["Human", "AI"] swapped the row names of the report.
# Passing labels=[AI, HUMAN] pins the order explicitly instead of relying on sorting.
print(classification_report(y, predictions, labels=[AI, HUMAN], target_names=["AI", "Human"]))

In [23]:
# Visualize the result
print("".join(("method_name: ", method_name, " field: ", field)))
plot_latest_result(encoder_test_set, method_name, field)
plot_actual_label(encoder_test_set, field)

In [24]:
def is_wrongly_classified(text_data):
    """
    Tell whether the classification of a text_data object disagrees with its label.

    :param text_data: Object providing a `classify()` method and a `text_type` attribute.
    :return: Result of `classify() != text_type`.
    """
    predicted = text_data.classify()
    return predicted != text_data.text_type

In [25]:
# Print id, true label and votes of every misclassified item, framed by separator
# lines, then the total number of misclassified items.
inaccurate = 0
separator = '*'*50
for text_data in encoder_test_set:
    if not is_wrongly_classified(text_data):
        continue
    inaccurate += 1
    print(separator)
    print(text_data.id)
    print(text_data.text_type)
    print(text_data.votes)
    print(separator)
print(inaccurate)

In [26]:
# Save Auto-Encoders
from src.API.machine_learning import save_autoencoders
# Persist both auto-encoders, their optimizers and the layer sizes in one file
save_autoencoders(
    h_encoder,
    AI_encoder,
    h_optimizer,
    AI_optimizer,
    input_dim,
    latent_dim,
    filename="autoencoders.pth",
)

# **Simple POS Transition Matrix**

In [27]:
# Enable IPython's autoreload extension so edits to imported project modules are
# picked up without restarting the kernel (mode 2).
# NOTE(review): this only takes effect from this cell onwards; consider moving it to
# the top of the notebook next to the imports.
%load_ext autoreload
%autoreload 2

In [28]:
from src.API.machine_learning import compute_transition_matrix, evaluate_sequence

def _index_by_pos_tags(text_items):
    """Map tuple(pos_tags) -> text_data object; a later item replaces an earlier one with identical tags."""
    return {tuple(item.pos_tags): item for item in text_items}


def _pos_tags_of(text_items):
    """Collect the pos_tags attribute of every item, in order."""
    return [item.pos_tags for item in text_items]


def _score_sequences(sequences, transition_matrix):
    """Evaluate every sequence against one transition matrix, in order."""
    return [evaluate_sequence(seq, transition_matrix) for seq in sequences]


# Create mappings from pos_tags (as tuples) to text_data objects.
# NOTE: texts that share the exact same tag sequence collapse into a single entry.
human_pos_to_text_data = _index_by_pos_tags(human_train + human_test)
AI_pos_to_text_data = _index_by_pos_tags(AI_train + AI_test)


# fetch the attribute pos_tags for all the text_data objects
human_train_pos = _pos_tags_of(human_train)
AI_train_pos = _pos_tags_of(AI_train)
human_test_pos = _pos_tags_of(human_test)
AI_test_pos = _pos_tags_of(AI_test)

# Compute the transition matrix of each class from its training sequences only
human_transition_matrix = compute_transition_matrix(human_train_pos)
AI_transition_matrix = compute_transition_matrix(AI_train_pos)

# Evaluate every test sequence under both matrices
# (own-class matrix first, then the other class's matrix)
human_normalized_log_likelihoods = _score_sequences(human_test_pos, human_transition_matrix)
human_Ai_normalized_log_likelihoods = _score_sequences(human_test_pos, AI_transition_matrix)
AI_normalized_log_likelihoods = _score_sequences(AI_test_pos, AI_transition_matrix)
Ai_human_normalized_log_likelihoods = _score_sequences(AI_test_pos, human_transition_matrix)


In [29]:
# Decision rule, identical for both groups and independent of the true label:
# predict HUMAN only when the human-matrix score is strictly higher, otherwise AI.
# Previously a tie was resolved as AI for human samples but as HUMAN for AI samples,
# i.e. the prediction depended on the ground truth. This now matches the auto-encoder
# rule, where ties also go to AI.

# Compare human test sequences: tuples are (human_ll, ai_ll, predicted)
human_comparisons = [
    (human_ll, ai_ll, HUMAN if human_ll > ai_ll else AI)
    for human_ll, ai_ll in zip(human_normalized_log_likelihoods, human_Ai_normalized_log_likelihoods)
]

# Compare AI test sequences: tuples are (ai_ll, human_ll, predicted)
ai_comparisons = [
    (ai_ll, human_ll, HUMAN if human_ll > ai_ll else AI)
    for ai_ll, human_ll in zip(AI_normalized_log_likelihoods, Ai_human_normalized_log_likelihoods)
]

In [30]:
# Assign votes to the test text data.
# The comparison lists were built from human_test / AI_test in order, so each
# comparison is paired with its object positionally. Looking the object up through
# tuple(pos_tags) sends votes to the wrong object whenever two texts share the same
# tag sequence (one object gets several votes, the other none).
method_name = "Simple_POS_Transition_Matrix"

# Assign votes to human test text data (the predicted label is the third tuple element)
for text_data, comparison in zip(human_test, human_comparisons):
    text_data.add_vote(method_name, comparison[2])

# Assign votes to AI test text data
for text_data, comparison in zip(AI_test, ai_comparisons):
    text_data.add_vote(method_name, comparison[2])


In [31]:
# List every misclassified test item, then report on the same collection.
inaccurate = 0
for text_data in test_data:
    if is_wrongly_classified(text_data):
        print('*'*50)
        inaccurate += 1
        print(text_data.id)
        print(text_data.votes)
print("Inaccurate = " + str(inaccurate))
# Calculate the accuracy using the vote.
# Report on test_data, the collection inspected by the loop above, rather than on
# encoder_test_set (the same items in a different order), so that the count and the
# report describe the same variable.
calculate_accuracy_and_generate_report(test_data)

In [32]:
# Predicted label of every comparison (third tuple element)
human_predictions = [predicted for _, _, predicted in human_comparisons]
ai_predictions = [predicted for _, _, predicted in ai_comparisons]

# Fraction of each group predicted as its own class, making sure to use float division
human_accuracy = sum(1.0 for predicted in human_predictions if predicted == HUMAN) / len(human_comparisons)
ai_accuracy = sum(1.0 for predicted in ai_predictions if predicted == AI) / len(ai_comparisons)

# Number of times human is predicted as AI and vice versa
human_accuracy_wrong = sum(1 for predicted in human_predictions if predicted == AI)
ai_accuracy_wrong = sum(1 for predicted in ai_predictions if predicted == HUMAN)

In [33]:
# Print the results
for label, value in (
    ("Human accuracy:", human_accuracy),
    ("AI accuracy:", ai_accuracy),
    ("Number of time Human is predicted as AI:", human_accuracy_wrong),
    ("Number of time AI is predicted as Human:", ai_accuracy_wrong),
):
    print(label, value)

In [34]:
def _ratio_or_zero(numerator, denominator):
    """Return numerator / denominator, or 0 when the denominator is not positive."""
    return numerator / denominator if denominator > 0 else 0


def _f1_or_zero(precision, recall):
    """Return the harmonic mean of precision and recall, or 0 when their sum is not positive."""
    return 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0


# Compute the confusion matrix components
TP_human = sum(1 for _, _, predicted in human_comparisons if predicted == HUMAN)  # Human correctly predicted as Human
FN_human = sum(1 for _, _, predicted in human_comparisons if predicted == AI)     # Human incorrectly predicted as AI

TP_ai = sum(1 for _, _, predicted in ai_comparisons if predicted == AI)          # AI correctly predicted as AI
FN_ai = sum(1 for _, _, predicted in ai_comparisons if predicted == HUMAN)       # AI incorrectly predicted as Human

# A miss of one class is a false positive of the other
FP_human = FN_ai  # AI misclassified as Human
FP_ai = FN_human  # Human misclassified as AI

# Calculate Precision
precision_human = _ratio_or_zero(TP_human, TP_human + FP_human)
precision_ai = _ratio_or_zero(TP_ai, TP_ai + FP_ai)

# Calculate Recall
recall_human = _ratio_or_zero(TP_human, TP_human + FN_human)
recall_ai = _ratio_or_zero(TP_ai, TP_ai + FN_ai)

# Calculate F1-score
f1_human = _f1_or_zero(precision_human, recall_human)
f1_ai = _f1_or_zero(precision_ai, recall_ai)

# Calculate Overall Accuracy
total_samples = len(human_comparisons) + len(ai_comparisons)
accuracy = _ratio_or_zero(TP_human + TP_ai, total_samples)

# Display results
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Human): {precision_human:.4f}")
print(f"Recall (Human): {recall_human:.4f}")
print(f"F1-score (Human): {f1_human:.4f}")
print(f"Precision (AI): {precision_ai:.4f}")
print(f"Recall (AI): {recall_ai:.4f}")
print(f"F1-score (AI): {f1_ai:.4f}")

In [35]:
# Save POS Matrix
from src.API.machine_learning import save_transition_matrix
# Persist both transition matrices, human first
for matrix, filename in (
    (human_transition_matrix, "human_matrix.pkl"),
    (AI_transition_matrix, "AI_matrix.pkl"),
):
    save_transition_matrix(matrix, filename)

In [36]:
# Final vote-based accuracy and classification report for the test split, using
# whatever votes the sections above have recorded on the test items.
calculate_accuracy_and_generate_report(test_data)

In [37]:
# Show the votes and the true label of the first test item
first_test_item = test_data[0]
print(first_test_item.votes)
print(first_test_item.text_type)