In [1]:
import transformers
import torch
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSequenceClassification, pipeline
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import mean_squared_error

# Step 1: Load and Deduplicate Questions
def deduplicate_questions(questions):
    seen = set()
    unique_questions = []
    for question in questions:
        if question not in seen:
            seen.add(question)
            unique_questions.append(question)
    return unique_questions

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Step 2: Generate Responses
def generate_responses(prompt, num_samples=4, max_new_tokens=50):
    responses = []
    for _ in range(num_samples):
        outputs = response_generator(prompt, max_length=max_new_tokens, num_return_sequences=1, do_sample=True)
        if outputs and 'generated_text' in outputs[0]:
            responses.append(outputs[0]['generated_text'].strip())
    return responses

In [3]:
# Step 3: Semantic Clustering

def check_entailment(text1, text2):
    inputs = tokenizer(text1, text2, return_tensors='pt', truncation=True, padding=True)
    outputs = nli_model(**inputs)
    probs = torch.softmax(outputs.logits, dim=1)
    return torch.argmax(probs, dim=1).item() == 0

def semantic_clustering(responses):
    clusters = []
    for response in responses:
        found = False
        for cluster in clusters:
            if all(check_entailment(response, member) for member in cluster['responses']):
                cluster['responses'].append(response)
                found = True
                break
        if not found:
            clusters.append({'responses': [response]})
    return clusters


# Step 4: Calculate Entropy
def calculate_entropy(clusters):
    # Debug: Check the structure of clusters
    if not all(isinstance(cluster, dict) and 'responses' in cluster for cluster in clusters):
        raise ValueError("Each cluster should be a dictionary with a 'responses' key.")

    total_responses = sum(len(cluster['responses']) for cluster in clusters)
    cluster_probs = np.array([len(cluster['responses']) / total_responses for cluster in clusters])
    return -np.sum(cluster_probs * np.log(cluster_probs))
# Step 5: Optimal Threshold
def find_optimal_threshold(entropies):
    sorted_entropies = np.sort(entropies)
    best_threshold, min_error = None, float('inf')
    for i in range(1, len(sorted_entropies)):
        threshold = (sorted_entropies[i - 1] + sorted_entropies[i]) / 2
        low_group, high_group = [e for e in entropies if e < threshold], [e for e in entropies if e >= threshold]
        mse_low, mse_high = np.var(low_group), np.var(high_group)
        total_mse = mse_low + mse_high
        if total_mse < min_error:
            min_error, best_threshold = total_mse, threshold
    return best_threshold


# Step 6: Extract Hidden States for SEP Training 
def extract_hidden_states(model, tokenizer, texts, position='TBG'):
    """
    Extract hidden states from a given model at specified token positions.
    
    Args:
    - model (AutoModelForCausalLM): Pre-trained model from Hugging Face.
    - tokenizer (AutoTokenizer): Corresponding tokenizer for the model.
    - texts (list of str): Input texts to process.
    - position (str): Position to extract hidden states from ('TBG' or 'SLT').
    
    Returns:
    - dict of torch.Tensor: Hidden states for each layer at specified position.
    """
    model.eval()  # Ensure the model is in evaluation mode.
    inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
    if 'token_type_ids' in inputs:
        del inputs['token_type_ids']
    inputs = {key: value.to(model.device) for key, value in inputs.items()}
    
    with torch.no_grad():
        outputs = model(**inputs, output_hidden_states=True)
    
    hidden_states = outputs.hidden_states
    if position == 'TBG':
        # Token Before Generating: last token of the input (excluding padding)
        indices = (inputs['input_ids'] != tokenizer.pad_token_id).sum(dim=1) - 1
    elif position == 'SLT':
        # Second Last Token: just before the end-of-sequence token
        indices = inputs['input_ids'].shape[1] - 2
    
    # Extract hidden states for the specified position across all layers
    position_states = {f'Layer_{i}': hidden_states[i][:, indices, :] for i in range(len(hidden_states))}
    
    return position_states


In [4]:
# Load tokenizer and models
tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-base")
nli_model = AutoModelForSequenceClassification.from_pretrained('microsoft/deberta-base')
text_model = AutoModelForCausalLM.from_pretrained('meta-llama/Meta-Llama-3-8B-Instruct')

# Step 1: Load and Deduplicate Questions
# Load and deduplicate questions
dataset = load_dataset("kroshan/BioASQ")
questions = dataset['train']['question']
unique_questions = deduplicate_questions(questions)


Some weights of DebertaForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Loading checkpoint shards: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████

In [None]:
# Step 2: Generate Responses
model_id = "meta-llama/Meta-Llama-3-8B-Instruct"
response_generator = transformers.pipeline(
    "text-generation",
    model=model_id,
    model_kwargs={"torch_dtype": torch.bfloat16},
    device_map="cuda",#cpu
)
# responses = [generate_responses(question) for question in unique_questions[:3]]

responses = [generate_responses(question) for question in unique_questions[:10]]## For a minimal example set to 10 questions

In [None]:
#Save responses
import csv
import pandas as pd

# Define the file path where you want to save the CSV file
file_path = 'questions_and_responses.csv'

# Save the questions and responses to a CSV file
with open(file_path, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Question', 'Response'])  # Write the header
    writer.writerows(zip(unique_questions[:5], responses))  # Write each question and response

print(f"Questions and responses saved to {file_path}")

In [22]:
##Uncomment to load saved responses

# # Define the file path where you want to save the CSV file
# file_path = 'questions_and_responses.csv'

# ques_and_response = pd.read_csv(file_path)
# responses = ques_and_response['Response']
# responses = responses.tolist()
# responses = [item.strip("[]").replace("'", "").split(", ") for item in responses]


In [None]:
# Semantic clustering
clustered_responses = [semantic_clustering(response_set) for response_set in responses]

# Calculate entropy for each cluster
entropies = [calculate_entropy(cluster) for cluster in clustered_responses]

# Find optimal threshold for entropy
optimal_threshold = find_optimal_threshold(entropies)


In [None]:
def flatten_hidden_states(hidden_states):
    # Flatten the tensor and convert to numpy array
    return np.concatenate([hidden_states[layer].cpu().numpy().flatten() for layer in hidden_states], axis=0)

# Collecting and preparing training data
training_data = []
labels = []

for text in unique_questions[:10]:
    hidden_states_tbg = extract_hidden_states(text_model, tokenizer, [text], 'TBG')
    hidden_states_slt = extract_hidden_states(text_model, tokenizer, [text], 'SLT')

    # Flatten the hidden states for TBG and SLT
    features_tbg = flatten_hidden_states(hidden_states_tbg)
    features_slt = flatten_hidden_states(hidden_states_slt)

    # Assume generate_responses and calculate_entropy are defined
    # responses = generate_responses(text)
    clusters = semantic_clustering(responses)
    entropy = calculate_entropy(clusters)
    
    # Binarize entropy based on the previously found optimal threshold
    label = 1 if entropy > optimal_threshold else 0

    # Append features and labels to the training data
    training_data.append(features_tbg)
    training_data.append(features_slt)
    labels.extend([label, label])

# Convert list to numpy arrays for training
X = np.array(training_data)
y = np.array(labels)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the logistic regression model
clf = LogisticRegression().fit(X_train, y_train)

# Evaluate the model
accuracy = clf.score(X_test, y_test)
print(f"Model Accuracy: {accuracy}")

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Predict and plot confusion matrix
y_pred = clf.predict(X_test)
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.show()

# Plotting semantic entropy vs. hidden state values
plt.scatter(X_train[:, 0], y_train, c='blue', label='Training Data')
plt.scatter(X_test[:, 0], y_pred, c='red', label='Predicted Data')
plt.xlabel('Hidden State Value')
plt.ylabel('Semantic Entropy (Binarized)')
plt.legend()
plt.show()
