In [1]:
pip install pymupdf markdown langchain nltk




In [2]:
import os
import fitz  # PyMuPDF for PDFs
import markdown
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [3]:
import nltk
nltk.download('punkt')


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [4]:
def extract_text_from_pdf(pdf_path):
    """Extract the plain text of every page in a PDF file.

    Args:
        pdf_path: Path to the PDF file, passed straight to ``fitz.open``.

    Returns:
        str: The text of all pages, in document order, joined with newlines.
    """
    # The context manager closes the document (and its file handle) even if
    # text extraction raises part-way through.
    with fitz.open(pdf_path) as doc:
        return "\n".join(page.get_text("text") for page in doc)

def extract_text_from_md(md_path):
    """Read a Markdown file, drop leading frontmatter, and render the rest.

    Args:
        md_path: Path to a UTF-8 encoded Markdown file.

    Returns:
        str: The output of ``markdown.markdown`` for the file body.
    """
    with open(md_path, "r", encoding="utf-8") as f:
        text = f.read()
    # Strip a frontmatter block only when the file starts with a line that is
    # exactly `---` and a later line that is exactly `---` closes it. Requiring
    # both delimiters to sit on their own line avoids ending the block early on
    # a value that merely ends in `---`, and avoids eating body text when the
    # file just begins with a longer run of dashes.
    text = re.sub(
        r'\A---[ \t]*\n(?:.*?\n)?---[ \t]*(?:\n|\Z)',
        '',
        text,
        count=1,
        flags=re.DOTALL,
    )
    # NOTE(review): markdown.markdown renders to HTML, so the returned string
    # still contains tags -- confirm whether downstream chunking wants that.
    return markdown.markdown(text)

def preprocess_text(text):
    """Collapse every run of whitespace to one space and trim both ends."""
    # Newlines and tabs count as whitespace here, so the result is one line.
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip()

def chunk_text(text, chunk_size=1024, overlap=100):
    """Split ``text`` into chunks with a RecursiveCharacterTextSplitter.

    Args:
        text: The string to split.
        chunk_size: Forwarded to the splitter as ``chunk_size``.
        overlap: Forwarded to the splitter as ``chunk_overlap``.

    Returns:
        Whatever ``split_text`` returns for ``text``.
    """
    # Separators are tried in this order: newline first, then single space.
    split_points = ["\n", " "]
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=split_points,
    )
    pieces = text_splitter.split_text(text)
    return pieces

In [5]:
def process_documents(folder_path):
    """Extract, clean and chunk every PDF and Markdown file in a folder.

    Only the top level of ``folder_path`` is scanned. Files are visited in
    sorted name order so the resulting chunk order is the same on every run,
    which matters because later cells map index positions back to chunks.

    Args:
        folder_path: Directory to scan.

    Returns:
        list: The chunks of all processed files, concatenated in visit order.
    """
    all_chunks = []
    # os.listdir gives no ordering guarantee; sort for reproducible output.
    for file_name in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file_name)

        # Skip sub-directories (and anything else that is not a regular file),
        # even if the name happens to end in .pdf or .md.
        if not os.path.isfile(file_path):
            continue

        # Match extensions case-insensitively so "REPORT.PDF" is not dropped.
        lowered_name = file_name.lower()
        if lowered_name.endswith(".pdf"):
            text = extract_text_from_pdf(file_path)
        elif lowered_name.endswith(".md"):
            text = extract_text_from_md(file_path)
        else:
            continue  # Not a supported document type

        text = preprocess_text(text)
        all_chunks.extend(chunk_text(text))

    return all_chunks

# Build the chunk corpus from every supported document under `folder_path`.
# `chunks` is module-level state: the embedding cell encodes it and
# retrieve_chunks() indexes into it, so it must be (re)built before those run.
# NOTE(review): "/content/" is a hard-coded absolute path -- presumably the
# Colab working directory; confirm before running elsewhere.
folder_path = "/content/"
chunks = process_documents(folder_path)
print(f"Processed {len(chunks)} text chunks!")




Processed 136 text chunks!


 Preprocessing Steps

In [6]:
!pip install faiss-cpu




In [7]:
!pip install sentence-transformers




In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np

# `chunks` is the list of text chunks produced by the document processing
# pipeline in an earlier cell (chunks = process_documents(folder_path)).
# Fail early with a clear message instead of an IndexError on `.shape[1]` below.
if not chunks:
    raise ValueError(
        "No text chunks to embed; run process_documents() on a folder "
        "containing .pdf or .md files first."
    )

# Initialize the embedding model
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# Generate one embedding per chunk. FAISS only accepts C-contiguous float32
# matrices, so normalise dtype and memory layout explicitly rather than relying
# on whatever the encoder happens to return.
chunk_embeddings = embedding_model.encode(chunks, convert_to_tensor=False)
chunk_embeddings = np.ascontiguousarray(chunk_embeddings, dtype=np.float32)

# Rows are chunks, columns are embedding components.
embedding_dim = chunk_embeddings.shape[1]
print(f"Embedding dimension: {embedding_dim}")

# Create a FAISS index using L2 distance. Vector i in the index corresponds to
# chunks[i], which is what retrieve_chunks() relies on.
import faiss
index = faiss.IndexFlatL2(embedding_dim)
index.add(chunk_embeddings)  # Add the chunk embeddings to the index

print(f"FAISS index built with {index.ntotal} vectors.")


In [None]:
def retrieve_chunks(query: str, top_k: int = 5) -> list:
    """
    Retrieves the top_k most relevant text chunks for a given query.

    Relies on the module-level ``embedding_model``, ``index`` and ``chunks``
    built in earlier cells; position ``i`` in the index maps to ``chunks[i]``.

    Args:
        query (str): The input question/query.
        top_k (int): Maximum number of chunks to retrieve.

    Returns:
        list: Retrieved text chunks, in the order returned by the index
        search. Contains fewer than ``top_k`` items when the index holds
        fewer vectors, and is empty when ``top_k`` is not positive.
    """
    # Never ask for more neighbours than the index holds: FAISS pads missing
    # results with -1, and chunks[-1] would silently return the last chunk.
    k = min(top_k, index.ntotal)
    if k <= 0:
        return []

    query_embedding = embedding_model.encode(query, convert_to_tensor=False)
    # FAISS expects a 2-D float32 matrix of shape (n_queries, dim).
    query_embedding = np.asarray(query_embedding, dtype=np.float32).reshape(1, -1)
    _distances, indices = index.search(query_embedding, k)

    # Defensive filter: drop padding (-1) or any id outside the chunk list.
    return [chunks[i] for i in indices[0] if 0 <= i < len(chunks)]


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM


In [None]:
# Load the tokenizer and causal-LM weights for the "Qwen/Qwen2.5-3B-Instruct"
# checkpoint. No dtype or device arguments are passed, so the library defaults
# apply. `tokenizer` and `model` are module-level state used by
# generate_answer() and generate_answer_for_evaluation().
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-3B-Instruct")
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-3B-Instruct")



In [None]:
def generate_answer(query: str) -> str:
    """Answer ``query`` with the language model, grounded in retrieved chunks.

    Retrieves the five most relevant chunks, places them in a prompt together
    with the question, samples a completion from the module-level ``model``,
    and returns only the newly generated text.

    Args:
        query (str): The user's question.

    Returns:
        str: The generated answer, without the prompt, stripped of
        surrounding whitespace.
    """
    # Retrieve relevant context chunks
    retrieved_context = retrieve_chunks(query, top_k=5)
    # Combine retrieved chunks into a single context block
    context = "\n".join(retrieved_context)

    prompt = (
        "You are Qwen, an expert assistant in AI research. Based on the context provided below, "
        "please answer the following question in a clear, concise, and informative manner. "
        "Make sure your answer is accurate and directly addresses the question.\n\n"
        f"Question: {query}\n\n"
        f"Context:\n{context}\n\n"
        "Answer:"
    )

    # Tokenize the prompt and put the tensors on the same device as the model
    # (a no-op when the model is on the CPU).
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs["input_ids"].shape[1]

    # Use max_new_tokens instead of max_length to avoid truncating the prompt
    outputs = model.generate(**inputs, max_new_tokens=256, do_sample=True, temperature=0.7)

    # For a causal LM the output sequence starts with the prompt tokens; keep
    # only what was generated after them so the caller gets the answer alone.
    generated_tokens = outputs[0][prompt_length:]
    answer = tokenizer.decode(generated_tokens, skip_special_tokens=True)
    return answer.strip()


# --- Example Usage ---
# Runs one sample question through retrieval + generation and prints the
# result. `query` and `answer` become module-level names as a side effect.
if __name__ == "__main__":
    query = "What advancements have been made in transformer architectures?"
    answer = generate_answer(query)
    print("Generated Answer:\n", answer)

In [None]:
import nltk
nltk.download('punkt_tab')


In [None]:
import json
import nltk

# Ensure NLTK tokenizer resources are available
nltk.download('punkt')

def load_dataset(file_path: str):
    """Parse the UTF-8 JSON file at ``file_path`` and return its content (QA pairs)."""
    with open(file_path, 'r', encoding='utf-8') as handle:
        qa_pairs = json.load(handle)
    return qa_pairs

def compute_metrics(dataset: list) -> dict:
    """
    Summarise a list of QA-pair dicts.

    Lengths are token counts from ``nltk.word_tokenize``; a missing
    "question" or "answer" key is treated as an empty string.

    Returns a dictionary with:
      - "total_pairs": number of items in ``dataset``
      - "avg_question_length": mean question token count (0 if empty)
      - "avg_answer_length": mean answer token count (0 if empty)
    """
    pair_count = len(dataset)

    # One (question_tokens, answer_tokens) tuple per QA pair.
    token_counts = [
        (
            len(nltk.word_tokenize(item.get("question", ""))),
            len(nltk.word_tokenize(item.get("answer", ""))),
        )
        for item in dataset
    ]
    question_tokens = sum(q_len for q_len, _ in token_counts)
    answer_tokens = sum(a_len for _, a_len in token_counts)

    if pair_count:
        mean_question = question_tokens / pair_count
        mean_answer = answer_tokens / pair_count
    else:
        # Avoid dividing by zero on an empty dataset.
        mean_question = 0
        mean_answer = 0

    summary = {}
    summary["total_pairs"] = pair_count
    summary["avg_question_length"] = mean_question
    summary["avg_answer_length"] = mean_answer
    return summary

# Load the three QA splits. The paths are relative, so the files must sit in
# the kernel's current working directory.
# NOTE(review): each file is presumably a JSON list of dicts with "question"
# and "answer" keys (that is what compute_metrics reads) -- confirm the schema.
train_dataset = load_dataset("train_qa.json")
val_dataset = load_dataset("val_qa.json")
test_dataset = load_dataset("test_qa.json")

# Compute pair counts and average question/answer token lengths per split
train_metrics = compute_metrics(train_dataset)
val_metrics = compute_metrics(val_dataset)
test_metrics = compute_metrics(test_dataset)

# Print the raw metric dictionaries, one line per split
print("Train Metrics:", train_metrics)
print("Validation Metrics:", val_metrics)
print("Test Metrics:", test_metrics)


In [None]:
import json
import string
import re
import nltk

# Ensure NLTK tokenizer resources are available
nltk.download('punkt')

# --- Utility Functions for Normalization and Metrics ---

def normalize_answer(s: str) -> str:
    """
    Canonicalise an answer string for comparison.

    Steps, in order: lowercase, delete every character in
    ``string.punctuation``, replace the standalone words "a", "an" and "the"
    with a space, then collapse all whitespace runs to single spaces.
    """
    lowered = s.lower()
    # Deleting via a translation table removes each punctuation character.
    without_punctuation = lowered.translate(str.maketrans('', '', string.punctuation))
    without_articles = re.sub(r'\b(a|an|the)\b', ' ', without_punctuation)
    return ' '.join(without_articles.split())

def compute_em_f1(prediction: str, ground_truth: str) -> "tuple[int, float]":
    """
    Compute Exact Match (EM) and token-level F1 between a prediction and a
    ground truth answer.

    Both strings are passed through ``normalize_answer`` first. F1 is computed
    over whitespace-separated tokens, counting each token at most as often as
    it appears in both strings.

    Args:
        prediction (str): The model's answer.
        ground_truth (str): The reference answer.

    Returns:
        tuple[int, float]: ``(em, f1)`` where ``em`` is 1 if the normalised
        strings are identical and 0 otherwise, and ``f1`` is always a float
        in [0.0, 1.0].
    """
    # Local import: this cell's import block does not provide Counter.
    from collections import Counter

    norm_pred = normalize_answer(prediction)
    norm_gt = normalize_answer(ground_truth)
    em = int(norm_pred == norm_gt)

    pred_tokens = norm_pred.split()
    gt_tokens = norm_gt.split()

    # If either side has no tokens, F1 is 1 only when both are empty.
    if not pred_tokens or not gt_tokens:
        return em, float(pred_tokens == gt_tokens)

    # Multiset intersection keeps min(count_in_pred, count_in_gt) per token.
    overlap = Counter(pred_tokens) & Counter(gt_tokens)
    num_same = sum(overlap.values())
    if num_same == 0:
        return em, 0.0

    precision = num_same / len(pred_tokens)
    recall = num_same / len(gt_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return em, f1

# --- Load the Test Dataset ---

def load_dataset(file_path: str) -> list:
    """Return the parsed content of a UTF-8 JSON file of QA pairs.

    Re-defines the ``load_dataset`` from an earlier cell with the same
    behaviour, so this cell can run without it.
    """
    with open(file_path, mode='r', encoding='utf-8') as source:
        records = json.load(source)
    return records

# Load the held-out split from the current working directory; the evaluation
# loop further down iterates over `test_dataset`.
test_dataset = load_dataset("test_qa.json")
print(f"Loaded {len(test_dataset)} QA pairs from test_qa.json.")

# --- Define the Inference Function for Evaluation ---

# Assumes you've already loaded your model and tokenizer.
# For example:
# from transformers import AutoTokenizer, AutoModelForCausalLM
# model_name = "Qwen/Qwen2.5-3B-Instruct"
# model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto", device_map="auto")
# tokenizer = AutoTokenizer.from_pretrained(model_name)

def generate_answer_for_evaluation(qa_pair: dict) -> str:
    """
    Construct a prompt using the QA pair's context and question,
    generate an answer using the model, and return the generated answer.

    Args:
        qa_pair (dict): Must contain the keys "question" and "context".

    Returns:
        str: Only the newly generated text (prompt excluded), stripped of
        surrounding whitespace.

    Raises:
        KeyError: If "question" or "context" is missing from ``qa_pair``.
    """
    prompt = f"Question: {qa_pair['question']}\nContext:\n{qa_pair['context']}\nAnswer:"
    model_inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = model_inputs["input_ids"].shape[1]
    # Generate answer without sampling (deterministic)
    generated_ids = model.generate(**model_inputs, max_new_tokens=128, do_sample=False)
    # Drop the prompt by token position rather than by string prefix: decoding
    # does not always reproduce the prompt text exactly, and a failed prefix
    # match would leave the whole prompt in the prediction and skew EM/F1.
    new_token_ids = generated_ids[0][prompt_length:]
    output = tokenizer.decode(new_token_ids, skip_special_tokens=True)
    return output.strip()

# --- Evaluate the Model on the Test Set ---

# Running sums of the per-example scores; averaged after the loop.
total_em = 0
total_f1 = 0
num_examples = len(test_dataset)

print("\nEvaluating on Test Set:")
for qa in test_dataset:
    # A missing "answer" key is scored against the empty string.
    gt_answer = qa.get("answer", "")
    # One model.generate call per example.
    pred_answer = generate_answer_for_evaluation(qa)
    em, f1 = compute_em_f1(pred_answer, gt_answer)
    total_em += em
    total_f1 += f1
    # Per-example trace of question, reference, prediction and scores.
    print("Question:", qa["question"])
    print("Ground Truth Answer:", gt_answer)
    print("Model Prediction:", pred_answer)
    print("EM:", em, "F1:", f1, "\n")

# Average over the examples; an empty test set yields 0 instead of dividing by zero.
avg_em = total_em / num_examples if num_examples > 0 else 0
avg_f1 = total_f1 / num_examples if num_examples > 0 else 0
print("Average Exact Match (EM):", avg_em)
print("Average F1 Score:", avg_f1)
