# RAG Retrieval part

In [None]:
!pip install chromadb openai langchain
!pip install transformers torch sentence-transformers

Collecting chromadb
  Downloading chromadb-0.5.18-py3-none-any.whl.metadata (6.8 kB)
Collecting build>=1.0.3 (from chromadb)
  Downloading build-1.2.2.post1-py3-none-any.whl.metadata (6.5 kB)
Collecting chroma-hnswlib==0.7.6 (from chromadb)
  Downloading chroma_hnswlib-0.7.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (252 bytes)
Collecting fastapi>=0.95.2 (from chromadb)
  Downloading fastapi-0.115.4-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn>=0.18.3 (from uvicorn[standard]>=0.18.3->chromadb)
  Downloading uvicorn-0.32.0-py3-none-any.whl.metadata (6.6 kB)
Collecting posthog>=2.4.0 (from chromadb)
  Downloading posthog-3.7.0-py2.py3-none-any.whl.metadata (2.0 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.20.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.28.0-py3

In [None]:
!pip install requests beautifulsoup4



In [None]:
import json
from collections import defaultdict
import hashlib
import torch
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer
import chromadb
import numpy as np

def preprocess_heart_attack_dataset(file_path):
    """Load the question/answer JSON file and flatten it into lookup tables.

    The file must contain a top-level ``'questions'`` list. Every question needs
    ``'id'``, ``'body'``, ``'type'`` and ``'ideal_answer'``. ``'documents'`` (a
    list of URLs) and ``'snippets'`` (a list of dicts) are optional and treated
    as empty when absent.

    Args:
        file_path: Path to the UTF-8 encoded JSON dataset.

    Returns:
        dict with the keys
        ``'questions'``: list of ``{'id', 'body', 'type', 'ideal_answer'}``;
        ``'documents'``: unique document URLs as ``{'id': 'doc_N', 'url': ...}``,
        numbered in order of first appearance;
        ``'snippets'``: unique snippets (de-duplicated on exact text, the first
        occurrence wins) as dicts with id ``'snippet_N'``;
        ``'document_snippets'``: defaultdict mapping document URL -> snippet ids;
        ``'question_documents'``: dict mapping question id -> list of its
        document URLs in order of first appearance.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if a required key is missing.
    """
    # Be explicit about the encoding: open()'s default depends on the locale.
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    processed_data = {
        'questions': [],
        'documents': [],
        'snippets': [],
        'document_snippets': defaultdict(list),
        'question_documents': {}
    }

    seen_documents = set()
    seen_snippet_texts = set()
    # question id -> {url: None}; dict keys give de-duplication *and* a stable,
    # first-seen order (a set would yield a different order on every run).
    question_documents = {}

    for item in data['questions']:
        question_id = item['id']

        processed_data['questions'].append({
            'id': question_id,
            'body': item['body'],
            'type': item['type'],
            'ideal_answer': item['ideal_answer']
        })

        # Process documents
        for doc in item.get('documents', []):
            if doc not in seen_documents:
                seen_documents.add(doc)
                processed_data['documents'].append({
                    'id': f"doc_{len(processed_data['documents'])}",
                    'url': doc
                })
            question_documents.setdefault(question_id, {})[doc] = None

        # Process snippets
        for snippet in item.get('snippets', []):
            snippet_text = snippet['text']
            if snippet_text in seen_snippet_texts:
                # Same text already stored (possibly under another question).
                continue
            seen_snippet_texts.add(snippet_text)

            snippet_doc = snippet['document']
            snippet_id = f"snippet_{len(processed_data['snippets'])}"
            processed_data['snippets'].append({
                'id': snippet_id,
                'text': snippet_text,
                'document': snippet_doc,
                'begin_section': snippet['beginSection'],
                'end_section': snippet['endSection'],
                'offset_begin': snippet['offsetInBeginSection'],
                'offset_end': snippet['offsetInEndSection']
            })
            processed_data['document_snippets'][snippet_doc].append(snippet_id)

    # Plain lists so the result can be JSON-serialized.
    processed_data['question_documents'] = {
        qid: list(urls) for qid, urls in question_documents.items()
    }

    return processed_data



# Load BioBERT model
model_name = "dmis-lab/biobert-v1.1"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

# Function to get BioBERT embeddings
def get_biobert_embedding(text):
    """Embed ``text`` with the module-level BioBERT ``tokenizer`` and ``model``.

    The input is truncated to 512 tokens. The embedding is the mean of the
    last-layer token vectors, weighted by the attention mask so that padding
    positions (present only when several texts of different length are passed
    at once) do not dilute the result. For a single string this equals the
    plain mean over all tokens.

    Args:
        text: A string (or whatever else the tokenizer accepts, e.g. a list of
            strings).

    Returns:
        numpy.ndarray: the pooled embedding with size-1 dimensions squeezed
        out, i.e. a 1-D vector for a single string.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512, padding=True)
    with torch.no_grad():
        outputs = model(**inputs)

    hidden = outputs.last_hidden_state
    # Broadcast the 0/1 mask over the hidden dimension.
    mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
    token_counts = mask.sum(dim=1).clamp(min=1)  # guard against division by zero
    pooled = (hidden * mask).sum(dim=1) / token_counts
    return pooled.squeeze().numpy()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/462 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]



pytorch_model.bin:   0%|          | 0.00/433M [00:00<?, ?B/s]

In [None]:

# Usage
file_path = 'heart_attack_dataset.json'
preprocessed_data = preprocess_heart_attack_dataset(file_path)

# Initialize Chroma client
client = chromadb.Client()

# get_or_create_collection (rather than create_collection) keeps this cell
# re-runnable: create_collection raises once a collection of that name exists.
collection = client.get_or_create_collection(name="heart_attack_data_biobert")

# Prepare documents and snippets for embedding
# NOTE: the "text" embedded for a document is only its URL; the dataset loaded
# here carries no document body.
document_texts = [doc['url'] for doc in preprocessed_data['documents']]
snippet_texts = [snippet['text'] for snippet in preprocessed_data['snippets']]
all_texts = document_texts + snippet_texts

# Get BioBERT embeddings (documents first, then snippets, matching all_texts)
all_embeddings = [get_biobert_embedding(text) for text in all_texts]

# upsert (rather than add) so that re-running the cell overwrites the entries
# with the same ids instead of colliding with them.
# Add documents to the collection
collection.upsert(
    ids=[doc['id'] for doc in preprocessed_data['documents']],
    documents=document_texts,
    embeddings=all_embeddings[:len(document_texts)]
)

# Add snippets to the collection
collection.upsert(
    ids=[snippet['id'] for snippet in preprocessed_data['snippets']],
    documents=snippet_texts,
    embeddings=all_embeddings[len(document_texts):]
)

print(f"Added {len(preprocessed_data['documents'])} documents and {len(preprocessed_data['snippets'])} snippets to the Chroma database.")

# Function to query the collection
def query_collection(query_text, n_results=3):
    """Embed ``query_text`` with BioBERT and look up its nearest neighbours.

    Args:
        query_text: The question to search for.
        n_results: How many entries to request from the collection.

    Returns:
        The result object of ``collection.query`` for this single query.
    """
    embedded_query = get_biobert_embedding(query_text).tolist()
    return collection.query(query_embeddings=[embedded_query], n_results=n_results)


Added 100 documents and 143 snippets to the Chroma database.


In [None]:
# Example query: show the top matches together with their distances
query = "What are the symptoms of a heart attack?"
results = query_collection(query)

print("\nQuery results:")
# Index 0 selects the hits for the first (and only) query that was sent.
top_hits = zip(results['documents'][0], results['distances'][0])
for rank, (doc, distance) in enumerate(top_hits, start=1):
    print(f"Result {rank}:")
    print(f"Document: {doc}")
    print(f"Distance: {distance}")
    print("---")


Query results:
Result 1:
Document: A heart attack is diagnosed using an ECG, blood tests for cardiac enzymes, and imaging tests like angiography.
Distance: 26.08465576171875
---
Result 2:
Document: An ECG detects irregular heart rhythms and damage to heart muscle, which are critical for diagnosing heart attacks.
Distance: 27.27193832397461
---
Result 3:
Document: Silent heart attacks show no obvious symptoms but can still cause significant heart damage and increase future heart attack risk.
Distance: 27.518798828125
---


# RAG Generation part

In [None]:
import json
from collections import defaultdict
import hashlib
import torch
from transformers import AutoTokenizer, AutoModel, pipeline
from sentence_transformers import SentenceTransformer
import chromadb
import numpy as np

def preprocess_heart_attack_dataset(file_path):
    """Load the question/answer JSON file and flatten it into lookup tables.

    The file must contain a top-level ``'questions'`` list. Every question needs
    ``'id'``, ``'body'``, ``'type'`` and ``'ideal_answer'``. ``'documents'`` (a
    list of URLs) and ``'snippets'`` (a list of dicts) are optional and treated
    as empty when absent.

    Args:
        file_path: Path to the UTF-8 encoded JSON dataset.

    Returns:
        dict with the keys
        ``'questions'``: list of ``{'id', 'body', 'type', 'ideal_answer'}``;
        ``'documents'``: unique document URLs as ``{'id': 'doc_N', 'url': ...}``,
        numbered in order of first appearance;
        ``'snippets'``: unique snippets (de-duplicated on exact text, the first
        occurrence wins) as dicts with id ``'snippet_N'``;
        ``'document_snippets'``: defaultdict mapping document URL -> snippet ids;
        ``'question_documents'``: dict mapping question id -> list of its
        document URLs in order of first appearance.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if a required key is missing.
    """
    # Be explicit about the encoding: open()'s default depends on the locale.
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    processed_data = {
        'questions': [],
        'documents': [],
        'snippets': [],
        'document_snippets': defaultdict(list),
        'question_documents': {}
    }

    seen_documents = set()
    seen_snippet_texts = set()
    # question id -> {url: None}; dict keys give de-duplication *and* a stable,
    # first-seen order (a set would yield a different order on every run).
    question_documents = {}

    for item in data['questions']:
        question_id = item['id']

        processed_data['questions'].append({
            'id': question_id,
            'body': item['body'],
            'type': item['type'],
            'ideal_answer': item['ideal_answer']
        })

        # Process documents
        for doc in item.get('documents', []):
            if doc not in seen_documents:
                seen_documents.add(doc)
                processed_data['documents'].append({
                    'id': f"doc_{len(processed_data['documents'])}",
                    'url': doc
                })
            question_documents.setdefault(question_id, {})[doc] = None

        # Process snippets
        for snippet in item.get('snippets', []):
            snippet_text = snippet['text']
            if snippet_text in seen_snippet_texts:
                # Same text already stored (possibly under another question).
                continue
            seen_snippet_texts.add(snippet_text)

            snippet_doc = snippet['document']
            snippet_id = f"snippet_{len(processed_data['snippets'])}"
            processed_data['snippets'].append({
                'id': snippet_id,
                'text': snippet_text,
                'document': snippet_doc,
                'begin_section': snippet['beginSection'],
                'end_section': snippet['endSection'],
                'offset_begin': snippet['offsetInBeginSection'],
                'offset_end': snippet['offsetInEndSection']
            })
            processed_data['document_snippets'][snippet_doc].append(snippet_id)

    # Plain lists so the result can be JSON-serialized.
    processed_data['question_documents'] = {
        qid: list(urls) for qid, urls in question_documents.items()
    }

    return processed_data

# Load BioBERT model
model_name = "dmis-lab/biobert-v1.1"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

# Function to get BioBERT embeddings
def get_biobert_embedding(text):
    """Embed ``text`` with the module-level BioBERT ``tokenizer`` and ``model``.

    The input is truncated to 512 tokens. The embedding is the mean of the
    last-layer token vectors, weighted by the attention mask so that padding
    positions (present only when several texts of different length are passed
    at once) do not dilute the result. For a single string this equals the
    plain mean over all tokens.

    Args:
        text: A string (or whatever else the tokenizer accepts, e.g. a list of
            strings).

    Returns:
        numpy.ndarray: the pooled embedding with size-1 dimensions squeezed
        out, i.e. a 1-D vector for a single string.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512, padding=True)
    with torch.no_grad():
        outputs = model(**inputs)

    hidden = outputs.last_hidden_state
    # Broadcast the 0/1 mask over the hidden dimension.
    mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
    token_counts = mask.sum(dim=1).clamp(min=1)  # guard against division by zero
    pooled = (hidden * mask).sum(dim=1) / token_counts
    return pooled.squeeze().numpy()





In [None]:
# Usage
file_path = 'heart_attack_dataset.json'
preprocessed_data = preprocess_heart_attack_dataset(file_path)

# Initialize Chroma client
client = chromadb.Client()

# get_or_create_collection (rather than create_collection) keeps this cell
# re-runnable: create_collection raises once a collection of that name exists.
collection = client.get_or_create_collection(name="heart_attack_datad_biobeert")

# Prepare documents and snippets for embedding
# NOTE: the "text" embedded for a document is only its URL; the dataset loaded
# here carries no document body.
document_texts = [doc['url'] for doc in preprocessed_data['documents']]
snippet_texts = [snippet['text'] for snippet in preprocessed_data['snippets']]
all_texts = document_texts + snippet_texts

# Get BioBERT embeddings (documents first, then snippets, matching all_texts)
all_embeddings = [get_biobert_embedding(text) for text in all_texts]

# Every stored entry has the shape "Content: <text>\nSource: <url>" so the
# source can be shown next to the retrieved text. upsert (rather than add)
# overwrites entries with the same ids when the cell is run again.
# Documents: content and source are both the URL.
collection.upsert(
    ids=[f"doc_{i}" for i in range(len(document_texts))],
    documents=[f"Content: {url}\nSource: {url}" for url in document_texts],
    embeddings=all_embeddings[:len(document_texts)]
)

# Snippets: content is the snippet text, source is the URL it was taken from.
collection.upsert(
    ids=[f"snippet_{i}" for i in range(len(snippet_texts))],
    documents=[f"Content: {snippet['text']}\nSource: {snippet['document']}" for snippet in preprocessed_data['snippets']],
    embeddings=all_embeddings[len(document_texts):]
)
print(f"Added {len(preprocessed_data['documents'])} documents and {len(preprocessed_data['snippets'])} snippets to the Chroma database.")

# Initialize the text generation model
generator = pipeline('text-generation', model='gpt2')

# Function to query the collection and generate an answer
def query_and_generate(query_text, n_results=3, max_new_tokens=100):
    """Retrieve context for ``query_text`` and let the generator answer it.

    The query is embedded with BioBERT, the ``n_results`` nearest entries are
    fetched from the module-level ``collection`` and pasted into a prompt that
    ends in ``"Answer:"``; the module-level ``generator`` pipeline continues
    that prompt.

    Args:
        query_text: The question to answer.
        n_results: Number of entries to retrieve as context.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        dict with ``'query'`` (the input), ``'retrieved_documents'`` (list of
        retrieved texts) and ``'generated_answer'`` (the model continuation,
        stripped of surrounding whitespace).
    """
    query_embedding = get_biobert_embedding(query_text)
    results = collection.query(
        query_embeddings=[query_embedding.tolist()],
        n_results=n_results
    )
    retrieved_documents = results['documents'][0]

    # Prepare context for generation
    context = "".join(f"Document: {doc}\n\n" for doc in retrieved_documents)

    # Generation part
    prompt = f"Based on the following information:\n{context}\n\nQuestion: {query_text}\nAnswer:"
    generations = generator(
        prompt,
        max_new_tokens=max_new_tokens,
        num_return_sequences=1,
        # Return only the continuation. Splitting the full text on "Answer:"
        # would cut the answer short whenever the model emits that marker again.
        return_full_text=False,
        # Set explicitly so generate() does not warn about the missing pad
        # token on every call.
        pad_token_id=generator.tokenizer.eos_token_id,
    )
    answer = generations[0]['generated_text'].strip()

    return {
        "query": query_text,
        "retrieved_documents": retrieved_documents,
        "generated_answer": answer
    }

# Example query: run the full retrieve-then-generate pipeline once and report it
query = "What are the symptoms of a heart attack?"
result = query_and_generate(query)

print("\nQuery:", result["query"])
print("\nRetrieved Documents:")
for retrieved in result["retrieved_documents"]:
    print(f"- {retrieved}")
print("\nGenerated Answer:", result["generated_answer"])

Added 100 documents and 143 snippets to the Chroma database.


Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.



Query: What are the symptoms of a heart attack?

Retrieved Documents:
- Content: A heart attack is diagnosed using an ECG, blood tests for cardiac enzymes, and imaging tests like angiography.
Source: http://www.ncbi.nlm.nih.gov/pubmed/16563933
- Content: An ECG detects irregular heart rhythms and damage to heart muscle, which are critical for diagnosing heart attacks.
Source: http://www.ncbi.nlm.nih.gov/pubmed/20672792
- Content: Silent heart attacks show no obvious symptoms but can still cause significant heart damage and increase future heart attack risk.
Source: http://www.ncbi.nlm.nih.gov/pubmed/3185760

Generated Answer: The patient must have severe muscle weakness or other cardiac arrhythmia. They may also experience blood clots in their muscles, especially as the heart beats less frequently. A heart attack is a more extreme type of heart attack and is characterized by the sudden death of the heart.

A heart attack has been identified by researchers because it can take several m

In [None]:
# Second example: a question whose answer is present in the indexed snippets
query = "What is the role of nitric oxide in heart health?"
result = query_and_generate(query)

print("\nQuery:", result["query"])
print("\nRetrieved Documents:")
for retrieved in result["retrieved_documents"]:
    print(f"- {retrieved}")
print("\nGenerated Answer:", result["generated_answer"])

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.



Query: What is the role of nitric oxide in heart health?

Retrieved Documents:
- Content: Emerging research highlights the role of gut health and microbiota in cardiovascular health and heart attack prevention.
Source: http://www.ncbi.nlm.nih.gov/pubmed/19081153
- Content: Vitamin D supports heart health by reducing inflammation, improving blood pressure, and enhancing vascular function.
Source: http://www.ncbi.nlm.nih.gov/pubmed/37087452
- Content: Nitric oxide helps dilate blood vessels, improving blood flow and reducing blood pressure, thereby protecting against heart attacks.
Source: http://www.ncbi.nlm.nih.gov/pubmed/20672792

Generated Answer: Nitric oxide deficiency contributes to an increased risk for heart attacks and sudden deaths (T4DM) by anaerobic bacteria, or bacteria that are resistant to Nitric Oxides. A study published in this journal revealed that nitric oxide deficiency, when treated with antioxidants, reduces T4DM risk. Nitric oxide deficiency may be particularly b

# RAG Evaluation

In [None]:
!pip install scikit-learn rouge-score numpy

Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: rouge-score
  Building wheel for rouge-score (setup.py) ... [?25l[?25hdone
  Created wheel for rouge-score: filename=rouge_score-0.1.2-py3-none-any.whl size=24935 sha256=373634d02f2aca1eeb6b277e475235c22880853f88b26bc3691f19baf785501d
  Stored in directory: /root/.cache/pip/wheels/5f/dd/89/461065a73be61a532ff8599a28e9beef17985c9e9c31e541b4
Successfully built rouge-score
Installing collected packages: rouge-score
Successfully installed rouge-score-0.1.2


In [None]:

import chromadb
from sklearn.metrics import precision_score, recall_score, f1_score
from rouge_score import rouge_scorer
import numpy as np
import random
from transformers import pipeline
from sentence_transformers import SentenceTransformer

# Assuming you have functions: preprocess_heart_attack_dataset, get_biobert_embedding, query_and_generate

def evaluate_rag(preprocessed_data, query_and_generate_func, num_queries=10):
    """
    Evaluates the Retrieval-Augmented Generation (RAG) pipeline.

    Retrieval is scored per question as precision / recall / F1 between the set
    of gold document URLs and the set of sources of the retrieved entries.
    Gold URLs are taken from the question's own 'context_documents' when that
    key is present, otherwise from
    preprocessed_data['question_documents'][question['id']].
    A retrieved entry that ends in a line "Source: <url>" is reduced to <url>;
    any other entry is compared verbatim.

    Generation is scored with ROUGE-1/2/L F-measure between the question's
    ideal answer (a string, or a list of strings joined by spaces) and the
    generated answer.

    Questions are drawn with the global `random` state; seed it beforehand for
    reproducible numbers.

    Args:
        preprocessed_data (dict): The preprocessed dataset containing questions, documents, and snippets.
        query_and_generate_func (function): Called with the question body; must return a dict
            with the keys 'retrieved_documents' and 'generated_answer'.
        num_queries (int): The number of queries to sample for evaluation.

    Returns:
        dict: {'retrieval': {'precision', 'recall', 'f1'},
               'generation': {'rouge1', 'rouge2', 'rougeL'}} with the averages as floats
        (0.0 for every metric when no question was evaluated).
    """
    source_marker = "\nSource: "

    def _source_of(retrieved):
        """Return the URL after the last source marker, else the text itself."""
        _, found, tail = retrieved.rpartition(source_marker)
        return tail.strip() if found else retrieved

    def _average(values):
        """Mean as a plain float; 0.0 for an empty list instead of NaN."""
        return float(np.mean(values)) if values else 0.0

    # Initialize metrics
    retrieval_metrics = {
        'precision': [],
        'recall': [],
        'f1': []
    }
    generation_metrics = {
        'rouge1': [],
        'rouge2': [],
        'rougeL': []
    }

    # Initialize ROUGE scorer
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

    # Sample questions for evaluation
    questions = preprocessed_data['questions']
    sampled_questions = random.sample(questions, min(num_queries, len(questions)))
    question_documents = preprocessed_data.get('question_documents', {})

    for question in sampled_questions:
        # Get RAG results
        rag_result = query_and_generate_func(question['body'])

        # Evaluate retrieval
        gold = question.get('context_documents') or question_documents.get(question.get('id'), [])
        relevant_docs = set(gold)
        retrieved_docs = {_source_of(doc) for doc in rag_result['retrieved_documents']}

        true_positives = len(relevant_docs & retrieved_docs)
        false_positives = len(retrieved_docs - relevant_docs)
        false_negatives = len(relevant_docs - retrieved_docs)

        precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
        recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        retrieval_metrics['precision'].append(precision)
        retrieval_metrics['recall'].append(recall)
        retrieval_metrics['f1'].append(f1)

        # Evaluate generation
        ideal_answer = question['ideal_answer']
        # Joining a plain string would put a space between every character.
        reference_answer = ideal_answer if isinstance(ideal_answer, str) else ' '.join(ideal_answer)
        generated_answer = rag_result['generated_answer']

        rouge_scores = scorer.score(reference_answer, generated_answer)

        generation_metrics['rouge1'].append(rouge_scores['rouge1'].fmeasure)
        generation_metrics['rouge2'].append(rouge_scores['rouge2'].fmeasure)
        generation_metrics['rougeL'].append(rouge_scores['rougeL'].fmeasure)

    # Calculate average metrics
    return {
        'retrieval': {k: _average(v) for k, v in retrieval_metrics.items()},
        'generation': {k: _average(v) for k, v in generation_metrics.items()}
    }

# Usage
# Needs `preprocessed_data` and `query_and_generate` to be defined already.
evaluation_results = evaluate_rag(preprocessed_data, query_and_generate, num_queries=10)
for heading, section in (("Retrieval Metrics:", 'retrieval'), ("Generation Metrics:", 'generation')):
    print(heading, evaluation_results[section])

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Retrieval Metrics: {'precision': 0.0, 'recall': 0.0, 'f1': 0.0}
Generation Metrics: {'rouge1': 0.0961400765116628, 'rouge2': 0.013030654981874496, 'rougeL': 0.06914815387730222}


In [None]:
# import json
# from collections import defaultdict
# import hashlib
# import torch
# from transformers import AutoTokenizer, AutoModel, pipeline
# import chromadb
# import numpy as np

# def preprocess_heart_attack_dataset(file_path):
#     with open(file_path, 'r') as f:
#         data = json.load(f)

#     processed_data = {
#         'questions': [],
#         'documents': [],
#         'snippets': [],
#         'document_snippets': defaultdict(list),
#         'question_documents': defaultdict(set)
#     }

#     document_set = set()
#     snippet_set = set()

#     for item in data['questions']:
#         question_id = item['id']
#         question_body = item['body']

#         processed_data['questions'].append({
#             'id': question_id,
#             'body': question_body,
#             'type': item['type'],
#             'ideal_answer': item['ideal_answer']
#         })

#         # Process documents
#         for doc in item['documents']:
#             if doc not in document_set:
#                 document_set.add(doc)
#                 processed_data['documents'].append({
#                     'id': f"doc_{len(processed_data['documents'])}",
#                     'url': doc
#                 })
#             processed_data['question_documents'][question_id].add(doc)

#         # Process snippets
#         for snippet in item['snippets']:
#             snippet_text = snippet['text']
#             snippet_doc = snippet['document']
#             snippet_hash = hashlib.md5(snippet_text.encode()).hexdigest()

#             if snippet_hash not in snippet_set:
#                 snippet_set.add(snippet_hash)
#                 snippet_id = f"snippet_{len(processed_data['snippets'])}"
#                 processed_data['snippets'].append({
#                     'id': snippet_id,
#                     'text': snippet_text,
#                     'document': snippet_doc,
#                     'begin_section': snippet['beginSection'],
#                     'end_section': snippet['endSection'],
#                     'offset_begin': snippet['offsetInBeginSection'],
#                     'offset_end': snippet['offsetInEndSection']
#                 })
#                 processed_data['document_snippets'][snippet_doc].append(snippet_id)

#     # Convert sets to lists for JSON serialization
#     processed_data['question_documents'] = {k: list(v) for k, v in processed_data['question_documents'].items()}

#     return processed_data


# # Load BioBERT model
# model_name = "dmis-lab/biobert-v1.1"
# tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = AutoModel.from_pretrained(model_name)

# # Function to get BioBERT embeddings
# def get_biobert_embedding(text):
#     inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512, padding=True)
#     with torch.no_grad():
#         outputs = model(**inputs)
#     return outputs.last_hidden_state.mean(dim=1).squeeze().numpy()



# # Function to aggregate snippets for each document
# def aggregate_document_content(preprocessed_data):
#     document_content = defaultdict(str)
#     for snippet in preprocessed_data['snippets']:
#         document_content[snippet['document']] += snippet['text'] + " "
#     return document_content





In [None]:
# # Usage
# file_path = 'heart_attack_dataset.json'
# preprocessed_data = preprocess_heart_attack_dataset(file_path)

# # Aggregate document content
# document_content = aggregate_document_content(preprocessed_data)

# # Initialize Chroma client
# client = chromadb.Client()

# # Create a collection
# collection = client.create_collection(name="heart_attack_data_biobert_content")

# # Prepare documents for embedding
# document_texts = []
# document_urls = []
# for doc in preprocessed_data['documents']:
#     url = doc['url']
#     content = document_content[url]
#     if content:  # Only include documents with content
#         document_texts.append(content)
#         document_urls.append(url)

# # Get BioBERT embeddings for document content
# document_embeddings = [get_biobert_embedding(text) for text in document_texts]


# # Add documents to the collection
# collection.add(
#     ids=[f"doc_{i}" for i in range(len(document_texts))],
#     documents=[f"Content: {text}\nSource: {url}" for text, url in zip(document_texts, document_urls)],
#     embeddings=[embedding.tolist() for embedding in document_embeddings]
# )

# print(f"Added {len(document_texts)} documents to the Chroma database.")

# # Initialize the text generation model
# generator = pipeline('text-generation', model='gpt2')

# # Function to query the collection and generate an answer
# def query_and_generate(query_text, n_results=3, max_new_tokens=100):
#     query_embedding = get_biobert_embedding(query_text)
#     results = collection.query(
#         query_embeddings=[query_embedding.tolist()],
#         n_results=n_results
#     )

#     # Prepare context for generation
#     context = ""
#     for doc in results['documents'][0]:
#         context += f"{doc}\n\n"

#     # Generation part
#     prompt = f"Based on the following information:\n{context}\n\nQuestion: {query_text}\nAnswer:"
#     generated_text = generator(prompt, max_new_tokens=max_new_tokens, num_return_sequences=1)[0]['generated_text']

#     # Extract the generated answer (everything after "Answer:")
#     answer = generated_text.split("Answer:")[-1].strip()

#     return {
#         "query": query_text,
#         "retrieved_documents": results['documents'][0],
#         "generated_answer": answer
#     }

# # Example queries
# queries = [
#     "What are the symptoms of a heart attack?",
#     "What is the role of nitric oxide in heart health?"
# ]

# for query in queries:
#     result = query_and_generate(query)
#     print("\nQuery:", result["query"])
#     print("\nRetrieved Documents:")
#     for doc in result["retrieved_documents"]:
#         print(f"- {doc}")
#     print("\nGenerated Answer:", result["generated_answer"])

In [None]:
import requests
from bs4 import BeautifulSoup
import time

In [None]:
def fetch_document_content(url, max_retries=3, delay=1):
    """Download an article page and return its metadata and abstract as text.

    Title, authors and abstract are read from the ``citation_title``,
    ``citation_author`` and ``description`` meta tags. The "main content" is the
    text of ``<div name="abstract">`` or, when that is absent, of the first
    match among the abstract selectors used by the scraper class later in this
    notebook.

    Args:
        url: Address of the page to fetch.
        max_retries: Maximum number of attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        str: ``"Title: ...\\n\\nAuthors: ...\\n\\nAbstract: ...\\n\\nMain Content: ..."``
        (stripped), or ``""`` when every attempt failed or ``max_retries`` is
        not positive. Errors are printed, never raised.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Extract title (.get: a meta tag without a content attribute is not an error)
            title_tag = soup.find('meta', {'name': 'citation_title'})
            title = title_tag.get('content', '') if title_tag else ''

            # Extract authors
            author_tags = soup.find_all('meta', {'name': 'citation_author'})
            authors = [tag['content'] for tag in author_tags if tag.get('content')]

            # Extract abstract
            abstract_tag = soup.find('meta', {'name': 'description'})
            abstract = abstract_tag.get('content', '') if abstract_tag else ''

            # Extract main content. The original selector is tried first; the
            # fallbacks are a best guess and may still need adjusting per site.
            main_content = soup.find('div', {'name': 'abstract'})
            if main_content is None:
                for selector in ('div.abstract', 'abstract', 'div.article-abstract'):
                    main_content = soup.select_one(selector)
                    if main_content is not None:
                        break
            # A separator keeps the text of adjacent elements from running together.
            main_text = main_content.get_text(' ', strip=True) if main_content is not None else ''

            content = f"Title: {title}\n\nAuthors: {', '.join(authors)}\n\nAbstract: {abstract}\n\nMain Content: {main_text}"

            return content.strip()
        except Exception as e:
            print(f"Error fetching content from {url}: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(delay)

    # Reached when all attempts failed, or when no attempt was made at all.
    return ""

# Example usage: fetch a single article page over the network and print the
# extracted text (an empty string is printed if the download failed).
url = "https://pmc.ncbi.nlm.nih.gov/articles/PMC6820920/"
content = fetch_document_content(url)
print(content)

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): pmc.ncbi.nlm.nih.gov:443
DEBUG:urllib3.connectionpool:https://pmc.ncbi.nlm.nih.gov:443 "GET /articles/PMC6820920/ HTTP/11" 200 None


Title: Remotely controlled mandibular positioning of oral appliance therapy during polysomnography and drug-induced sleep endoscopy compared with conventional subjective titration in patients with obstructive sleep apnea: protocol for a randomized crossover trial

Authors: Marijke Dieltjens, Marc J Braem, Sara Op de Beeck, Anneclaire V M T Vroegop, Elahe Kazemeini, Eli Van de Perck, Jolien Beyers, Chloé Kastoer, Kristien Wouters, Marc Willemen, Johan A Verbraecken, Olivier M Vanderveken

Abstract: The amount of mandibular protrusion is a key factor in optimizing the efficacy of mandibular advancement device (MAD) therapy in an individual patient diagnosed with obstructive sleep apnea. This process is called titration and is generally based on ...

Main Content:


In [None]:
pip install firecrawl beautifulsoup4 aiohttp



In [None]:
!pip install firecrawl-py



In [None]:
pip install scrapy

Collecting scrapy
  Using cached Scrapy-2.11.2-py2.py3-none-any.whl.metadata (5.3 kB)
Collecting itemloaders>=1.0.1 (from scrapy)
  Using cached itemloaders-1.3.2-py3-none-any.whl.metadata (3.9 kB)
Collecting parsel>=1.5.0 (from scrapy)
  Using cached parsel-1.9.1-py2.py3-none-any.whl.metadata (11 kB)
Collecting service-identity>=18.1.0 (from scrapy)
  Using cached service_identity-24.2.0-py3-none-any.whl.metadata (5.1 kB)
Collecting tldextract (from scrapy)
  Using cached tldextract-5.1.3-py3-none-any.whl.metadata (11 kB)
Collecting requests-file>=1.4 (from tldextract->scrapy)
  Using cached requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Using cached Scrapy-2.11.2-py2.py3-none-any.whl (290 kB)
Using cached itemloaders-1.3.2-py3-none-any.whl (12 kB)
Using cached parsel-1.9.1-py2.py3-none-any.whl (17 kB)
Using cached service_identity-24.2.0-py3-none-any.whl (11 kB)
Using cached tldextract-5.1.3-py3-none-any.whl (104 kB)
Using cached requests_file-2.1.0-py2.py3-none-any.whl (

In [None]:
!pip install firecrawl # installing firecrawl for AsyncCrawler



In [None]:
import json
import logging
import time
from pathlib import Path
from typing import Dict, List
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configure logging
# force=True replaces any handlers that are already attached to the root
# logger. Without it basicConfig() silently does nothing when the notebook
# host (or an earlier cell) has configured logging before.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True
)

class WebScraper:
    """Blocking scraper that downloads article pages and stores extracted text as JSON.

    Every successfully processed URL is written to
    ``<output_dir>/<article id>.json``. All requests go through one
    ``requests.Session`` that sends a browser-like User-Agent header.
    """

    def __init__(self, output_dir: str = "scraped_data"):
        """Create the output directory and the HTTP session.

        Args:
            output_dir: Directory that receives one JSON file per article.
        """
        self.output_dir = Path(output_dir)
        # parents=True so that a nested output path does not raise.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # Not written by any method of this class; kept for existing callers.
        self.results = []

    @staticmethod
    def _article_id_from_url(url: str) -> str:
        """Return the last non-empty path segment of ``url`` for use as a file stem.

        Query strings, fragments and trailing slashes are ignored so that
        ``.../PMC123/`` and ``.../PMC123?x=1`` both map to ``PMC123``.
        """
        path = url.split('#', 1)[0].split('?', 1)[0].rstrip('/')
        return path.rsplit('/', 1)[-1] or 'article'

    def extract_content(self, url: str) -> Dict:
        """Fetch ``url``, extract title/abstract/sections and save them as JSON.

        Args:
            url: Page to download.

        Returns:
            A dict with ``url``, ``title``, ``abstract`` and ``sections`` (plus
            ``main_text`` when no sections were found). On any failure the
            exception is logged and ``{'url': url, 'error': <message>}`` is
            returned instead; this method does not raise.
        """
        logging.info(f"Processing URL: {url}")

        try:
            # Add delay between requests
            time.sleep(1)

            # Fetch the webpage
            response = self.session.get(url, timeout=30)
            response.raise_for_status()

            # Parse with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')

            # Extract title (first selector that matches wins)
            title = None
            for title_selector in ['h1.content-title', 'div.content-title', 'h1', 'title']:
                title = soup.select_one(title_selector)
                if title:
                    break

            # Extract abstract (first selector that matches wins)
            abstract = None
            for abstract_selector in ['div.abstract', 'abstract', 'div.article-abstract']:
                abstract = soup.select_one(abstract_selector)
                if abstract:
                    break

            # Initialize content dictionary
            content = {
                'url': url,
                'title': title.text.strip() if title else None,
                'abstract': abstract.text.strip() if abstract else None,
                'sections': []
            }

            # Extract sections; a section is kept only if it has both a
            # heading and a content element.
            for section in soup.find_all(['div', 'section'], class_=['section', 'sec']):
                section_title = section.find(['h2', 'h3', 'title'])
                section_content = section.find(['p', 'div'], class_=['section-content', 'p'])

                if section_title and section_content:
                    content['sections'].append({
                        'title': section_title.text.strip(),
                        'content': section_content.text.strip()
                    })

            # If no sections found, fall back to the paragraphs of the main container.
            if not content['sections']:
                # find() takes tag names, so 'div.content' passed to it could
                # never match; a CSS selector group expresses the intent.
                main_content = soup.select_one('article, main, div.content')
                if main_content:
                    paragraphs = main_content.find_all('p')
                    content['main_text'] = '\n'.join(p.text.strip() for p in paragraphs)

            # Save individual article
            article_id = self._article_id_from_url(url)
            output_path = self.output_dir / f"{article_id}.json"

            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(content, f, indent=2, ensure_ascii=False)

            logging.info(f"Successfully processed: {url}")
            return content

        except Exception as e:
            # Deliberate best-effort: one bad URL must not abort the batch.
            logging.error(f"Error processing {url}: {str(e)}")
            return {'url': url, 'error': str(e)}

    def process_urls(self, urls: List[str], max_workers: int = 2) -> List[Dict]:
        """Run ``extract_content`` for every URL on a thread pool.

        Args:
            urls: Pages to download.
            max_workers: Size of the thread pool.

        Returns:
            The non-empty results in completion order (not input order).
        """
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_url = {executor.submit(self.extract_content, url): url for url in urls}

            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    if result:
                        results.append(result)
                except Exception as e:
                    logging.error(f"Error processing {url}: {str(e)}")

        return results

def process_dataset(dataset_path: str, output_dir: str = "scraped_data"):
    """Scrape every document URL referenced by a question dataset.

    The JSON file at ``dataset_path`` is read, the ``documents`` entries of
    all items under ``questions`` are de-duplicated, the URLs are handed to a
    ``WebScraper`` and the combined results are written to
    ``<output_dir>/all_articles.json``.

    Returns:
        The list of per-URL result dicts, or ``[]`` if anything raised (the
        error and its traceback are logged rather than propagated).
    """
    import traceback

    try:
        # Read dataset
        with open(dataset_path, 'r') as handle:
            dataset = json.load(handle)

        # Collect unique URLs across all questions
        unique_urls = {
            doc_url
            for question in dataset['questions']
            for doc_url in question['documents']
        }
        logging.info(f"Found {len(unique_urls)} unique URLs to process")

        # Scrape them
        results = WebScraper(output_dir=output_dir).process_urls(list(unique_urls))

        # Save all results in one file
        combined_path = Path(output_dir) / "all_articles.json"
        with open(combined_path, 'w', encoding='utf-8') as handle:
            json.dump(results, handle, indent=2, ensure_ascii=False)

        logging.info(f"Successfully processed {len(results)} articles")
        return results

    except Exception as e:
        logging.error(f"Error in process_dataset: {str(e)}")
        logging.error(traceback.format_exc())
        return []

if __name__ == "__main__":
    # Scrape every article referenced by the dataset.
    dataset_path = "heart-attack-pmc-json.json"
    results = process_dataset(dataset_path)

    # Summarise: a result counts as successful when it carries no 'error' key.
    successful = sum('error' not in r for r in results)
    logging.info(f"Successfully processed {successful} out of {len(results)} articles")

INFO:root:Found 7 unique URLs to process
2024-11-06 03:46:59 [root] INFO: Found 7 unique URLs to process
INFO:root:Processing URL: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
2024-11-06 03:46:59 [root] INFO: Processing URL: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
INFO:root:Processing URL: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6450699
2024-11-06 03:46:59 [root] INFO: Processing URL: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6450699
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): www.ncbi.nlm.nih.gov:443
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (2): www.ncbi.nlm.nih.gov:443
DEBUG:urllib3.connectionpool:https://www.ncbi.nlm.nih.gov:443 "GET /pmc/articles/PMC6820920 HTTP/11" 301 215
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): pmc.ncbi.nlm.nih.gov:443
DEBUG:urllib3.connectionpool:https://www.ncbi.nlm.nih.gov:443 "GET /pmc/articles/PMC6450699 HTTP/11" 301 216
DEBUG:urllib3.connectionpool:Starting new HTTPS 

In [None]:
import json
import asyncio
from typing import List, Dict
# Try importing directly from firecrawl or upgrade the package using !pip install firecrawl-py --upgrade
from firecrawl.crawler import AsyncCrawler
from bs4 import BeautifulSoup
import aiohttp
import logging
import time
from pathlib import Path
# Configure logging (once; a second identical basicConfig() call is a no-op).
# force=True replaces any handlers already attached to the root logger (for
# example by the notebook runtime or an earlier cell). Without it,
# basicConfig() silently does nothing when the root logger is already
# configured, so neither the INFO level nor the format below would apply.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)

class PMCCrawler:
    """Crawl PubMed Central article pages and store the extracted text as JSON.

    NOTE(review): this class depends on ``AsyncCrawler`` imported from
    ``firecrawl.crawler``. The recorded run of this cell ended with
    ``ModuleNotFoundError: No module named 'firecrawl.crawler'``, so the
    crawler API used here (constructor keywords, ``crawl`` as an async
    iterator, ``status``/``text``/``url`` on responses) is unverified —
    confirm against the installed package.
    """

    # Articles crawled when no dataset file is supplied.
    DEFAULT_URLS = (
        "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920",
        "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5752199",
        "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6450699",
        "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5332475",
    )

    def __init__(self, output_dir: str = "scraped_data"):
        """Create the crawler and the output directory.

        Args:
            output_dir: Directory that receives one JSON file per article.
        """
        self.crawler = AsyncCrawler(
            concurrent_requests=2,  # Respect rate limits
            delay_between_requests=1.0  # 1 second delay between requests
        )
        self.output_dir = Path(output_dir)
        # parents=True so that a nested output path does not raise.
        self.output_dir.mkdir(parents=True, exist_ok=True)

    async def extract_article_content(self, html: str, url: str) -> Dict:
        """Extract title, abstract and sections from PMC article HTML.

        Args:
            html: Raw page markup.
            url: Address the markup was fetched from; copied into the result.

        Returns:
            A dict with ``url``, ``title``, ``abstract`` (``None`` when the
            element is missing) and ``sections`` (list of title/content dicts).
        """
        soup = BeautifulSoup(html, 'html.parser')

        # Extract article metadata
        title = soup.find('h1', {'class': 'content-title'})
        abstract = soup.find('div', {'class': 'abstract'})

        content = {
            'url': url,
            'title': title.text.strip() if title else None,
            'abstract': abstract.text.strip() if abstract else None,
            'sections': []
        }

        # Extract main content sections; keep only those with heading and body.
        for section in soup.find_all('div', {'class': 'section'}):
            section_title = section.find('h2')
            section_content = section.find('div', {'class': 'section-content'})

            if section_title and section_content:
                content['sections'].append({
                    'title': section_title.text.strip(),
                    'content': section_content.text.strip()
                })

        return content

    async def process_dataset(self, dataset_path: str = None):
        """Crawl all unique article URLs and save the extracted content.

        Args:
            dataset_path: Optional JSON dataset whose ``questions[*].documents``
                lists supply the URLs. When omitted, ``DEFAULT_URLS`` is used.

        Returns:
            List of content dicts for every response with status 200 that was
            extracted without error. Each is also written to
            ``<output_dir>/<article id>.json`` and the full list to
            ``<output_dir>/all_articles.json``.
        """
        if dataset_path is not None:
            with open(dataset_path, 'r') as f:
                dataset = json.load(f)

            # Collect unique URLs
            unique_urls = set()
            for question in dataset['questions']:
                unique_urls.update(question['documents'])
        else:
            # set() accepts a single iterable; passing the URLs as separate
            # positional arguments raises TypeError.
            unique_urls = set(self.DEFAULT_URLS)

        logging.info(f"Found {len(unique_urls)} unique URLs to process")

        # Crawl each URL (sorted for a reproducible request order)
        results = []
        async for response in self.crawler.crawl(sorted(unique_urls)):
            if response.status == 200:
                try:
                    content = await self.extract_article_content(
                        response.text,
                        response.url
                    )
                    results.append(content)

                    # Save individual article. Strip a trailing slash first so
                    # the file stem is never empty.
                    article_id = response.url.rstrip('/').rsplit('/', 1)[-1]
                    output_path = self.output_dir / f"{article_id}.json"
                    with open(output_path, 'w', encoding='utf-8') as f:
                        json.dump(content, f, indent=2, ensure_ascii=False)

                    logging.info(f"Successfully processed {response.url}")

                except Exception as e:
                    # Best-effort: one bad article must not abort the crawl.
                    logging.error(f"Error processing {response.url}: {str(e)}")
            else:
                logging.warning(f"Failed to fetch {response.url}: {response.status}")

        # Save complete results
        with open(self.output_dir / "all_articles.json", 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        return results

async def main():
    """Crawl the articles referenced by the dataset and log how many succeeded.

    Returns:
        The list returned by ``PMCCrawler.process_dataset``, or ``[]`` when
        processing raised (the error is logged, not propagated).
    """
    # Initialize crawler
    crawler = PMCCrawler(output_dir="pmc_articles")

    # Process dataset. The path must be passed explicitly: calling
    # process_dataset() without it fails on the missing required argument.
    dataset_path = "heart-attack-pmc-json.json"  # Update with your dataset path
    try:
        results = await crawler.process_dataset(dataset_path)
        logging.info(f"Successfully processed {len(results)} articles")
        return results
    except Exception as e:
        logging.error(f"Error processing dataset: {str(e)}")
        return []

if __name__ == "__main__":
    # asyncio.run() raises RuntimeError when an event loop is already running
    # in the current thread, which is typically the case inside a notebook
    # kernel. Only start a new loop when none is running; otherwise schedule
    # main() on the existing one.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is None:
        asyncio.run(main())
    else:
        # Keep a reference so the task is not garbage-collected before it ends.
        main_task = running_loop.create_task(main())

ModuleNotFoundError: No module named 'firecrawl.crawler'

In [None]:
!pip install firecrawl beautifulsoup4 aiohttp



In [None]:
!pip install firecrawl-py # install the firecrawl-py package



In [None]:
!pip install firecrawl-py --upgrade  # Try upgrading the package



In [None]:
from firecrawl import AsyncCrawler  # Try importing directly from firecrawl

ImportError: cannot import name 'AsyncCrawler' from 'firecrawl' (/usr/local/lib/python3.10/dist-packages/firecrawl/__init__.py)

In [None]:
!pip install firecrawl-py




In [None]:
import json
import asyncio
from typing import List, Dict
from bs4 import BeautifulSoup
import aiohttp
import logging
import nest_asyncio
from pathlib import Path

# Enable nested async support for Jupyter.
# NOTE(review): nothing in this cell calls asyncio.run() or
# run_until_complete(); the final `await run_scraper()` relies on the
# notebook's top-level await. Confirm this patch is still needed.
nest_asyncio.apply()

# Configure logging.
# force=True replaces any handlers already attached to the root logger (for
# example by the notebook runtime or an earlier cell). Without it,
# basicConfig() silently does nothing when the root logger is already
# configured, so neither the INFO level nor the format below would apply.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)

class AsyncWebScraper:
    """Asynchronous scraper that downloads article pages and stores extracted text as JSON.

    At most two requests are in flight at a time (see ``self.semaphore``) and
    each request is preceded by ``request_delay`` seconds of sleep. Every
    successfully processed URL is written to ``<output_dir>/<article id>.json``.
    """

    def __init__(self, output_dir: str = "scraped_data", request_delay: float = 1.0):
        """Create the output directory and the concurrency limiter.

        Args:
            output_dir: Directory that receives one JSON file per article.
            request_delay: Seconds to sleep before each request.
        """
        self.output_dir = Path(output_dir)
        # parents=True so that a nested output path does not raise.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.semaphore = asyncio.Semaphore(2)  # Limit concurrent requests
        self.request_delay = request_delay

    @staticmethod
    def _article_id_from_url(url: str) -> str:
        """Return the last non-empty path segment of ``url`` for use as a file stem.

        Query strings, fragments and trailing slashes are ignored so that
        ``.../PMC123/`` and ``.../PMC123?x=1`` both map to ``PMC123``.
        """
        path = url.split('#', 1)[0].split('?', 1)[0].rstrip('/')
        return path.rsplit('/', 1)[-1] or 'article'

    # The session annotation is a string so it is not evaluated when the class
    # is defined.
    async def fetch_url(self, session: "aiohttp.ClientSession", url: str) -> Dict:
        """Fetch and process a single URL with rate limiting.

        Args:
            session: Open client session used for the GET request.
            url: Page to download.

        Returns:
            The extracted content dict on HTTP 200, otherwise
            ``{'url': url, 'error': <message>}``. Exceptions are logged and
            converted to the error form; this coroutine does not raise.
        """
        async with self.semaphore:  # Limit concurrent requests
            try:
                # Add delay for rate limiting
                await asyncio.sleep(self.request_delay)

                # Log before issuing the request so the message reflects when
                # the fetch starts rather than when the response arrives.
                logging.info(f"Fetching {url}")
                async with session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        logging.info(f"Successfully fetched {url}")
                        content = await self.extract_article_content(html, url)

                        # Save individual article
                        article_id = self._article_id_from_url(url)
                        output_path = self.output_dir / f"{article_id}.json"

                        with open(output_path, 'w', encoding='utf-8') as f:
                            json.dump(content, f, indent=2, ensure_ascii=False)

                        logging.info(f"Successfully processed {url}")
                        return content
                    else:
                        error_msg = f"Failed to fetch {url}: HTTP {response.status}"
                        logging.warning(error_msg)
                        return {"url": url, "error": error_msg}

            except Exception as e:
                # Best-effort: one bad URL must not abort the batch.
                error_msg = f"Error processing {url}: {str(e)}"
                logging.error(error_msg)
                return {"url": url, "error": error_msg}

    async def extract_article_content(self, html: str, url: str) -> Dict:
        """Extract title, abstract and sections from HTML.

        Args:
            html: Raw page markup.
            url: Address the markup was fetched from; copied into the result.

        Returns:
            A dict with ``url``, ``title``, ``abstract`` and ``sections``, plus
            ``main_text`` when no sections were found but a main container was.
        """
        soup = BeautifulSoup(html, 'html.parser')
        logging.info(f"Extracting content from {url}")

        # Try multiple selectors for title
        title = None
        for selector in ['h1.content-title', 'div.content-title', 'h1', '.article-title']:
            title = soup.select_one(selector)
            if title:
                logging.info(f"Found title using selector: {selector}")
                break

        # Try multiple selectors for abstract.
        # NOTE(review): "body main-article-body" is parsed as a descendant
        # selector for a <main-article-body> element inside <body>; it looks
        # like a copied class attribute value — confirm the intended selector.
        abstract = None
        for selector in ['div.abstract', 'abstract', '.article-abstract', '#abstract',"body main-article-body"]:
            # Use the whole selector. Indexing it (selector[4]) picks a single
            # character, e.g. 'a' for 'div.abstract', which selects the
            # first link on the page instead of the abstract.
            abstract = soup.select_one(selector)
            if abstract:
                logging.info(f"Found abstract using selector: {selector}")
                break

        content = {
            'url': url,
            'title': title.text.strip() if title else None,
            'abstract': abstract.text.strip() if abstract else None,
            'sections': []
        }

        # Extract main content sections; keep only those with heading and body.
        sections = soup.find_all(['div', 'section'], class_=['section', 'sec'])

        for section in sections:
            section_title = section.find(['h2', 'h3', 'title'])
            section_content = section.find(['p', 'div'], class_=['section-content', 'p'])

            if section_title and section_content:
                content['sections'].append({
                    'title': section_title.text.strip(),
                    'content': section_content.text.strip()
                })
                logging.info(f"Found section: {section_title.text.strip()[:50]}...")

        # If no sections found, try to get main text
        if not content['sections']:
            logging.info("No sections found, trying to extract main text")
            # find() takes tag names, so 'div.content' passed to it could
            # never match; a CSS selector group expresses the intent.
            main_content = soup.select_one('article, main, div.content')
            if main_content:
                paragraphs = main_content.find_all('p')
                content['main_text'] = '\n'.join(p.text.strip() for p in paragraphs)

        return content

    async def process_urls(self, urls: List[str]) -> List[Dict]:
        """Process multiple URLs concurrently.

        Args:
            urls: Pages to download.

        Returns:
            One result dict per URL, in input order. The same list is written
            to ``<output_dir>/all_articles.json``.
        """
        timeout = aiohttp.ClientTimeout(total=60)  # 60 seconds timeout

        async with aiohttp.ClientSession(
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            },
            timeout=timeout
        ) as session:
            tasks = []
            for url in urls:
                tasks.append(asyncio.ensure_future(self.fetch_url(session, url)))

            results = await asyncio.gather(*tasks)

            # Save complete results
            with open(self.output_dir / "all_articles.json", 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2, ensure_ascii=False)

            return results

# Create an instance of the scraper. This also creates the "pmc_articles"
# directory, because AsyncWebScraper.__init__ calls mkdir on output_dir.
scraper = AsyncWebScraper(output_dir="pmc_articles")

# Define URLs to process
# NOTE(review): the recorded output of this cell logs PMC6820920, which is not
# in this list — the saved output appears to come from an earlier version of
# the list; re-run the cell to refresh it.
urls = [
    "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5752199",
    "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6450699",
    "https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5332475"
]

# Driver coroutine for the scraper
async def run_scraper():
    """Scrape the module-level ``urls`` with the module-level ``scraper``.

    Logs how many results carry no ``'error'`` key and the title of the first
    result that has a non-empty one.

    Returns:
        The list of result dicts, or ``[]`` if anything raised (the error is
        logged rather than propagated).
    """
    try:
        results = await scraper.process_urls(urls)

        # Summary line
        successful = sum(1 for item in results if 'error' not in item)
        logging.info(f"Successfully processed {successful} out of {len(results)} articles")

        # Show one sample title, if any result has a non-empty one
        first_titled = next(
            (item for item in results if 'title' in item and item['title']),
            None,
        )
        if first_titled is not None:
            logging.info(f"Sample article title: {first_titled['title']}")

        return results

    except Exception as e:
        logging.error(f"Error in run_scraper: {str(e)}")
        return []

# To run in Jupyter: top-level `await` is only accepted inside an
# IPython/Jupyter cell; in a plain Python script, wrap the call as
# asyncio.run(run_scraper()) instead.
results = await run_scraper()

INFO:root:Fetching https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5752199
2024-11-06 05:05:15 [root] INFO: Fetching https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5752199
INFO:root:Fetching https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
2024-11-06 05:05:17 [root] INFO: Fetching https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
INFO:root:Successfully fetched https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
2024-11-06 05:05:17 [root] INFO: Successfully fetched https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
INFO:root:Extracting content from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
2024-11-06 05:05:17 [root] INFO: Extracting content from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6820920
INFO:root:Found title using selector: h1
2024-11-06 05:05:17 [root] INFO: Found title using selector: h1
INFO:root:Found abstract using selector: div.abstract
2024-11-06 05:05:17 [root] INFO: Found abstract using selector: div.abstract
INFO:root:No sections found, trying to ext

In [None]:
!pip install aiohttp beautifulsoup4 nest_asyncio



In [None]:
!pip install selenium webdriver_manager

Collecting selenium
  Downloading selenium-4.26.1-py3-none-any.whl.metadata (7.1 kB)
Collecting webdriver_manager
  Downloading webdriver_manager-4.0.2-py2.py3-none-any.whl.metadata (12 kB)
Collecting trio~=0.17 (from selenium)
  Downloading trio-0.27.0-py3-none-any.whl.metadata (8.6 kB)
Collecting trio-websocket~=0.9 (from selenium)
  Downloading trio_websocket-0.11.1-py3-none-any.whl.metadata (4.7 kB)
Collecting python-dotenv (from webdriver_manager)
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting sortedcontainers (from trio~=0.17->selenium)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting outcome (from trio~=0.17->selenium)
  Downloading outcome-1.3.0.post0-py2.py3-none-any.whl.metadata (2.6 kB)
Collecting wsproto>=0.14 (from trio-websocket~=0.9->selenium)
  Downloading wsproto-1.2.0-py3-none-any.whl.metadata (5.6 kB)
Downloading selenium-4.26.1-py3-none-any.whl (9.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
# import json
# import os
# import time
# from pathlib import Path
# import undetected_chromedriver as uc
# from selenium.webdriver.common.by import By
# from selenium.webdriver.support.ui import WebDriverWait
# from selenium.webdriver.support import expected_conditions as EC
# from selenium.common.exceptions import TimeoutException, NoSuchElementException
# import logging
# import re



# # Configure logging
# logging.basicConfig(
#     level=logging.INFO,
#     format='%(asctime)s - %(levelname)s - %(message)s'
# )

# class PMCPDFDownloader:
#     def __init__(self, download_dir: str = "pmc_pdfs"):
#         self.download_dir = Path(download_dir).absolute()
#         self.download_dir.mkdir(exist_ok=True)

#         # Setup Chrome options
#         self.options = uc.ChromeOptions()
#         self.options.add_argument('--headless')  # Run in headless mode
#         self.options.add_argument('--no-sandbox')
#         self.options.add_argument('--disable-dev-shm-usage')
#         self.options.add_experimental_option(
#             "prefs",
#             {
#                 "download.default_directory": str(self.download_dir),
#                 "download.prompt_for_download": False,
#                 "download.directory_upgrade": True,
#                 "plugins.always_open_pdf_externally": True,
#                 "profile.default_content_settings.popups": 0
#             }
#         )

#     def setup_driver(self):
#         """Initialize and return undetected ChromeDriver"""
#         try:
#             driver = uc.Chrome(
#                 options=self.options,
#                 driver_executable_path='/usr/bin/chromedriver'
#             )
#             return driver
#         except Exception as e:
#             logging.error(f"Error setting up Chrome driver: {str(e)}")
#             raise

#     def extract_pmcid(self, url: str) -> str:
#         """Extract PMCID from URL"""
#         match = re.search(r'PMC\d+', url)
#         return match.group(0) if match else None

#     def get_direct_pdf_link(self, driver, url: str) -> str:
#         """Get direct PDF download link from PMC page"""
#         try:
#             driver.get(url)
#             wait = WebDriverWait(driver, 10)
#             pdf_link = wait.until(
#                 EC.presence_of_element_located((By.CSS_SELECTOR, "a[title='Download PDF']"))
#             )
#             return pdf_link.get_attribute('href')
#         except Exception as e:
#             logging.error(f"Error getting PDF link from {url}: {str(e)}")
#             return None

#     def download_pdf(self, driver, url: str, question_id: str) -> bool:
#         """Download PDF for a single PMC article"""
#         try:
#             logging.info(f"Processing {url} for question {question_id}")

#             # Get direct PDF link
#             pdf_url = self.get_direct_pdf_link(driver, url)
#             if not pdf_url:
#                 logging.error(f"Could not get PDF link for {url}")
#                 return False

#             # Get PMCID for filename
#             pmcid = self.extract_pmcid(url)
#             if not pmcid:
#                 logging.error(f"Could not extract PMCID from {url}")
#                 return False

#             # Construct the new filename
#             new_filename = f"{pmcid}_{question_id}.pdf"
#             output_path = self.download_dir / new_filename

#             # Use wget to download the PDF
#             !wget -O "{output_path}" "{pdf_url}"

#             if output_path.exists() and output_path.stat().st_size > 0:
#                 logging.info(f"Successfully downloaded {new_filename}")
#                 return True
#             else:
#                 logging.error(f"Failed to download {new_filename}")
#                 return False

#         except Exception as e:
#             logging.error(f"Error downloading PDF from {url}: {str(e)}")
#             return False

#     def process_dataset(self, dataset_path: str):
#         """Process entire dataset and download PDFs"""
#         try:
#             # Read dataset
#             with open(dataset_path, 'r') as f:
#                 dataset = json.load(f)

#             # Initialize web driver
#             driver = self.setup_driver()

#             try:
#                 # Process each question and its documents
#                 for question in dataset['questions']:
#                     question_id = question['id']

#                     for url in question['documents']:
#                         success = self.download_pdf(driver, url, question_id)
#                         if not success:
#                             logging.warning(f"Failed to download PDF for {url}")

#                         # Add delay between downloads
#                         time.sleep(2)

#             finally:
#                 # Always close the driver
#                 driver.quit()

#         except Exception as e:
#             logging.error(f"Error processing dataset: {str(e)}")
#             import traceback
#             logging.error(traceback.format_exc())

# # Function to run the downloader
# def run_downloader(dataset_path: str):
#     # Initialize downloader
#     downloader = PMCPDFDownloader(download_dir="pmc_pdfs")

#     # Process dataset
#     downloader.process_dataset(dataset_path)

# # Example usage:
# dataset_path = "heart-attack-pmc-json.json"
# run_downloader(dataset_path)

In [None]:
# # First, install required packages
# !pip install undetected-chromedriver
# !apt-get update
# !apt install chromium-chromedriver
# !cp /usr/lib/chromium-browser/chromedriver /usr/bin
# !pip install selenium