In [1]:

import os
import pandas as pd

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# Loading the data

The data is loaded with the same method as proposed.

In [None]:
def read_texts_from_dir(dir_path):
  """
  Reads the text pairs from a given directory into a pd.DataFrame.

  Every immediate sub-directory of `dir_path` is expected to contain
  `file_1.txt` and `file_2.txt`. The last four characters of the
  sub-directory name are parsed as the integer id of the pair.
  Sub-directories that cannot be read (missing file, undecodable content,
  name not ending in four digits) are reported and skipped.

  Params:
    dir_path (str): path to the directory with data

  Returns:
    pd.DataFrame: indexed by 'id', with columns ['file_1', 'file_2'],
      rows in sorted folder-name order.
  """
  # Only immediate sub-directories hold text pairs; nested directories are ignored.
  folder_names = [
    name for name in sorted(os.listdir(dir_path))
    if os.path.isdir(os.path.join(dir_path, name))
  ]
  print(f"Number of directories: {len(folder_names)}")

  # Collect one (id, text1, text2) record per readable directory. Appending
  # (instead of pre-allocating placeholders) guarantees that a skipped
  # directory never leaves a bogus row behind.
  records = []
  for folder_name in folder_names:
    folder_path = os.path.join(dir_path, folder_name)
    try:
      index = int(folder_name[-4:])
      with open(os.path.join(folder_path, 'file_1.txt'), 'r', encoding='utf-8') as f1:
        text1 = f1.read().strip()
      with open(os.path.join(folder_path, 'file_2.txt'), 'r', encoding='utf-8') as f2:
        text2 = f2.read().strip()
    except (OSError, ValueError) as e:
      # OSError: missing/unreadable file; ValueError: bad id suffix or bad UTF-8.
      print(f"Error reading directory {folder_name}: {e}")
      continue
    records.append((index, text1, text2))

  # Change list with results into pandas DataFrame
  df = pd.DataFrame(records, columns=['id', 'file_1', 'file_2']).set_index('id')
  return df


In [None]:
# Read the training pairs first, then the test pairs, into id-indexed DataFrames.
train_path = "/content/data/train"
test_path = "/content/data/test"
df_train, df_test = (read_texts_from_dir(path) for path in (train_path, test_path))

In [None]:
# Ground-truth labels for the training pairs; preview the first rows.
train_labels_csv = "/content/data/train.csv"
df_train_gt = pd.read_csv(train_labels_csv)
df_train_gt.head()

# Loading the model

In [None]:
import torch
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModel
from sklearn.decomposition import PCA
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

In [None]:
class RealTextManifoldDetector:
    """
    Learn the manifold of real texts and detect fake texts as outliers.

    Pipeline: a per-text embedding is pooled from one hidden-state layer of a
    transformer, standardized, reduced with PCA (95% of variance kept) and
    scored by an EllipticEnvelope fitted on real texts only.

    Conventions used by all methods:
      * `target_layer` indexes the model's hidden states; negative values
        count from the end, as with Python sequences.
      * `labels_df` may carry the pair id either as an 'id' column or as its
        index; both forms are accepted.
    """

    _POOLING_MODES = ('mean', 'cls', 'max')

    def __init__(self, model_name="distilbert-base-uncased", device=None):
        """
        Load the tokenizer/model and create the (unfitted) pipeline components.

        Args:
            model_name: Hugging Face model identifier passed to `from_pretrained`
            device: Torch device string; defaults to 'cuda' when available, else 'cpu'
        """
        self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")

        # Use a more reliable model for embeddings
        print(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(self.device)
        self.model.eval()

        # Add padding token if missing
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        print(f"Model loaded successfully. Hidden size: {self.model.config.hidden_size}")
        print(f"Number of layers: {self.model.config.num_hidden_layers}")

        # Initialize components
        self.scaler = StandardScaler()
        self.pca = PCA(n_components=0.95)  # Keep 95% of variance
        self.outlier_detector = None
        self.real_embeddings = None
        self.real_embeddings_scaled = None
        self.real_embeddings_pca = None
        self.real_centroid = None

    @staticmethod
    def _select_layer(layer_embeddings, target_layer):
        """
        Pick one layer from the dict returned by `get_all_layer_embeddings`.

        The dict is keyed 0..N-1, so negative indices are resolved here
        (e.g. -1 is the last layer) instead of raising KeyError.

        Raises:
            IndexError: if `target_layer` is outside the available layers
        """
        n_layers = len(layer_embeddings)
        resolved = target_layer + n_layers if target_layer < 0 else target_layer
        if resolved not in layer_embeddings:
            raise IndexError(
                f"target_layer {target_layer} is out of range for {n_layers} hidden states"
            )
        return layer_embeddings[resolved]

    @staticmethod
    def _labels_by_id(labels_df):
        """
        Return `labels_df` indexed by pair id.

        A frame read straight from CSV has the id in an 'id' column and a
        positional index; looking ids up in that index would silently match
        row positions instead of ids.
        """
        if 'id' in labels_df.columns:
            return labels_df.set_index('id')
        return labels_df

    def test_model(self, sample_text="This is a test sentence."):
        """
        Test if the model works with a simple text.

        Returns:
            bool: True when embeddings could be extracted, False otherwise
        """
        print("Testing model with sample text...")
        try:
            embeddings = self.get_all_layer_embeddings(sample_text)
            last_layer = self._select_layer(embeddings, -1)
            print(f"Model test successful!")
            print(f"Number of layers: {len(embeddings)}")
            print(f"Embedding shape for layer -1: {last_layer.shape}")
            return True
        except Exception as e:
            print(f" Model test failed: {e}")
            return False

    def get_all_layer_embeddings(self, text, pooling='mean'):
        """
        Extract one pooled embedding per hidden state for a single text.

        Args:
            text: Input string; non-string or blank input is replaced by a
                placeholder text after printing a warning
            pooling: 'mean' (attention-masked mean over tokens), 'cls'
                (first token) or 'max' (attention-masked max over tokens)

        Returns:
            dict: {layer_idx: 1-D numpy embedding}, with layer_idx running
                over every entry of the model's `hidden_states` output

        Raises:
            ValueError: if `pooling` is not one of the supported modes
        """
        if pooling not in self._POOLING_MODES:
            raise ValueError(
                f"Unsupported pooling '{pooling}'; expected one of {self._POOLING_MODES}"
            )

        # Ensure text is string and not empty
        if not isinstance(text, str) or len(text.strip()) == 0:
            print(f"Warning: Invalid text input: {text}")
            text = "Empty text"  # Fallback

        model_inputs = self.tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512
        ).to(self.device)

        input_ids = model_inputs.input_ids.long()
        token_mask = model_inputs.attention_mask

        with torch.no_grad():
            try:
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=token_mask,
                    output_hidden_states=True
                )
            except Exception as e:
                # Print the offending input for diagnosis, then propagate.
                print(f"Model forward pass error: {e}")
                print(f"Input shape: {input_ids.shape}")
                print(f"Input sample: {input_ids[0][:10]}")
                raise

            layer_embeddings = {}
            attention_mask = token_mask.unsqueeze(-1)

            for layer_idx, hidden_state in enumerate(outputs.hidden_states):
                if pooling == 'mean':
                    masked_embeddings = hidden_state * attention_mask
                    # clamp guards against a division by zero on an all-masked row
                    token_counts = attention_mask.sum(dim=1).clamp(min=1)
                    embedding = masked_embeddings.sum(dim=1) / token_counts
                elif pooling == 'cls':
                    embedding = hidden_state[:, 0, :]
                else:  # 'max'
                    # Exclude masked (padding) positions from the maximum.
                    masked_states = hidden_state.masked_fill(attention_mask == 0, float('-inf'))
                    embedding = torch.max(masked_states, dim=1)[0]

                layer_embeddings[layer_idx] = embedding.squeeze(0).cpu().numpy()

        return layer_embeddings

    def extract_real_texts(self, df, labels_df):
        """
        Extract only the real texts from pairs using labels.

        Args:
            df: DataFrame with text pairs (indexed by id)
            labels_df: DataFrame with a 'real_text_id' column (1 or 2) and the
                pair id either in an 'id' column or as the index

        Returns:
            list: Real texts only, in the row order of `df`; pairs without a
                valid label are reported and skipped
        """
        labels = self._labels_by_id(labels_df)
        real_texts = []

        for idx, row in df.iterrows():
            if idx in labels.index:
                real_text_id = labels.loc[idx, 'real_text_id']

                if real_text_id == 1:
                    real_texts.append(row['file_1'])
                elif real_text_id == 2:
                    real_texts.append(row['file_2'])
                else:
                    print(f"Warning: Invalid real_text_id {real_text_id} for index {idx}")
            else:
                print(f"Warning: No label found for index {idx}")

        print(f"Extracted {len(real_texts)} real texts")
        return real_texts

    def learn_real_manifold(self, real_texts, target_layer=-2):
        """
        Learn the manifold of real texts.

        Fits the scaler, the PCA and the outlier detector on embeddings of
        `real_texts`, and stores the centroid of the PCA-projected embeddings.

        Args:
            real_texts: List of real text strings
            target_layer: Which hidden state to use (negative counts from the end)

        Raises:
            ValueError: if `real_texts` is empty
        """
        if len(real_texts) == 0:
            raise ValueError("real_texts is empty; cannot learn a manifold")

        print(f"Learning manifold from {len(real_texts)} real texts...")
        print(f"Using layer {target_layer}")

        # Extract embeddings from target layer
        embeddings_list = []

        for text in tqdm(real_texts, desc="Extracting embeddings"):
            layer_embeddings = self.get_all_layer_embeddings(text)
            embeddings_list.append(self._select_layer(layer_embeddings, target_layer))

        # Convert to numpy array
        self.real_embeddings = np.array(embeddings_list)
        print(f"Real embeddings shape: {self.real_embeddings.shape}")

        # Standardize embeddings
        self.real_embeddings_scaled = self.scaler.fit_transform(self.real_embeddings)

        # Apply PCA for dimensionality reduction
        self.real_embeddings_pca = self.pca.fit_transform(self.real_embeddings_scaled)
        print(f"PCA reduced shape: {self.real_embeddings_pca.shape}")
        print(f"Explained variance ratio: {self.pca.explained_variance_ratio_.sum():.3f}")

        # Compute centroid of real texts
        self.real_centroid = np.mean(self.real_embeddings_pca, axis=0)

        # Fit outlier detection model
        print("Fitting outlier detection model...")
        self.outlier_detector = EllipticEnvelope(contamination=0.1, random_state=42)
        self.outlier_detector.fit(self.real_embeddings_pca)

        print(" Real text manifold learned!")

    def predict_text(self, text, target_layer=-2):
        """
        Predict if a text is real (1) or fake (0).

        Args:
            text: Input text string
            target_layer: Same layer used for training

        Returns:
            int: 1 for real, 0 for fake
            float: Distance from real centroid (in PCA space)
            float: Outlier score (detector decision function; higher is more inlier-like)

        Raises:
            RuntimeError: if `learn_real_manifold` has not been called yet
        """
        if self.outlier_detector is None:
            raise RuntimeError("Call learn_real_manifold() before predict_text()")

        # Get embedding
        layer_embeddings = self.get_all_layer_embeddings(text)
        embedding = self._select_layer(layer_embeddings, target_layer).reshape(1, -1)

        # Transform using learned scaler and PCA
        embedding_scaled = self.scaler.transform(embedding)
        embedding_pca = self.pca.transform(embedding_scaled)

        # Compute distance from centroid
        distance = np.linalg.norm(embedding_pca - self.real_centroid)

        # Predict using outlier detector
        is_real = self.outlier_detector.predict(embedding_pca)[0]
        outlier_score = self.outlier_detector.decision_function(embedding_pca)[0]

        return int(is_real == 1), distance, outlier_score

    def evaluate_pairs(self, df, labels_df, target_layer=-2):
        """
        Evaluate on text pairs.

        For each pair the text with the strictly higher outlier score is
        predicted to be the real one; ties go to text 2.

        Args:
            df: DataFrame with text pairs (indexed by id)
            labels_df: DataFrame with a 'real_text_id' column and the pair id
                either in an 'id' column or as the index
            target_layer: Same layer used for training

        Returns:
            np.array: Predictions (1 or 2 indicating which text is real), in
                the row order of `df`
            list: One dict per pair with scores, distances, the prediction and
                the true label (None when the pair has no label)
        """
        labels = self._labels_by_id(labels_df)
        predictions = []
        details = []

        print("Evaluating text pairs...")
        for idx, row in tqdm(df.iterrows(), total=len(df)):
            # Predict both texts
            pred1, dist1, score1 = self.predict_text(row['file_1'], target_layer)
            pred2, dist2, score2 = self.predict_text(row['file_2'], target_layer)

            # Choose the one more likely to be real (higher score)
            prediction = 1 if score1 > score2 else 2
            predictions.append(prediction)

            true_label = labels.loc[idx, 'real_text_id'] if idx in labels.index else None
            details.append({
                'text1_score': score1,
                'text2_score': score2,
                'text1_distance': dist1,
                'text2_distance': dist2,
                'prediction': prediction,
                'true_label': true_label
            })

        return np.array(predictions), details

    def visualize_manifold(self, df, labels_df, target_layer=-2):
        """
        Visualize the real text manifold and fake text positions.

        Plots the first two PCA components of the fitted real embeddings, the
        real centroid, and the fake texts of the first 20 labeled pairs in `df`.

        Args:
            df: DataFrame with text pairs (indexed by id)
            labels_df: DataFrame with a 'real_text_id' column and the pair id
                either in an 'id' column or as the index
            target_layer: Same layer used for training
        """
        if self.real_embeddings_pca is None:
            print("Manifold not learned yet; call learn_real_manifold() first")
            return

        if self.real_embeddings_pca.shape[1] < 2:
            print("Need at least 2 PCA components for visualization")
            return

        labels = self._labels_by_id(labels_df)

        plt.figure(figsize=(12, 8))

        # Plot real text manifold
        plt.scatter(self.real_embeddings_pca[:, 0], self.real_embeddings_pca[:, 1],
                   alpha=0.6, c='green', label='Real Texts', s=50)

        # Plot centroid
        plt.scatter(self.real_centroid[0], self.real_centroid[1],
                   c='red', s=200, marker='*', label='Real Centroid')

        # Sample some fake texts and plot them
        fake_embeddings = []
        for idx, row in df.head(20).iterrows():  # Sample first 20
            if idx not in labels.index:
                print(f"Warning: No label found for index {idx}")
                continue
            real_text_id = labels.loc[idx, 'real_text_id']
            fake_text = row['file_2'] if real_text_id == 1 else row['file_1']

            layer_embeddings = self.get_all_layer_embeddings(fake_text)
            embedding = self._select_layer(layer_embeddings, target_layer).reshape(1, -1)
            embedding_scaled = self.scaler.transform(embedding)
            embedding_pca = self.pca.transform(embedding_scaled)
            fake_embeddings.append(embedding_pca[0])

        if fake_embeddings:
            fake_embeddings = np.array(fake_embeddings)
            plt.scatter(fake_embeddings[:, 0], fake_embeddings[:, 1],
                       alpha=0.6, c='orange', label='Fake Texts', s=50)

        plt.xlabel('PCA Component 1')
        plt.ylabel('PCA Component 2')
        plt.title('Real vs Fake Text Manifold')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

In [None]:
# Backbone used for embeddings. Previously tried alternatives are kept as
# comments rather than as assignments that are immediately overwritten:
#   "meta-llama/Llama-3.1-8B"
#   "Qwen/Qwen2-1.5B"
model_id = "Qwen/Qwen2-0.5B"

detector = RealTextManifoldDetector(model_id)


In [None]:
# Keep only the text labelled as real from every training pair.
real_texts = detector.extract_real_texts(df=df_train, labels_df=df_train_gt)

In [None]:
# Fit scaler, PCA and outlier detector on hidden state 15 of the real texts.
detector.learn_real_manifold(real_texts=real_texts, target_layer=15)

In [None]:
# Evaluate
# NOTE: this scores the same training pairs whose real texts were used to fit
# the manifold, so the number below is a training accuracy, not a held-out one.
predictions, details = detector.evaluate_pairs(df_train, df_train_gt, target_layer=15)

# Predictions follow the row order of df_train, so look the ground truth up by
# id in that same order instead of relying on the CSV row order. A missing id
# raises KeyError rather than silently misaligning labels and predictions.
true_labels = df_train_gt.set_index('id')['real_text_id'].loc[df_train.index].values

accuracy = accuracy_score(true_labels, predictions)
print(f"\n🎯 Accuracy: {accuracy:.4f}")