# Benchling Pipeline Test Notebook

This notebook tests the complete Benchling ingestion pipeline:

1. **Extract** - Download files from S3 bucket
2. **Ingest** - Process files (PDF, DOCX, XLSX, PPTX, TXT)
3. **Chunk** - Create text chunks with sliding window
4. **Embed** - Generate embeddings with PubMedBERT
5. **Save** - Store to Delta tables (optional) and JSON files

**Key Features:**
- Uses inline configuration (no YAML files)
- Uses Databricks instance profile for S3 access (no AWS secrets)
- Saves to Delta tables instead of PostgreSQL
- Skips NER annotation steps (not applicable for Benchling)

**S3 Bucket:** `gilead-edp-kite-rd-dev-us-west-2-kite-benchling-text-sql`  
**S3 Prefix:** `benchling_unstructured/`

## Setup and Configuration

In [None]:
# Install requirements if needed
%pip install -r ../requirements.txt -q

In [None]:
import os
import sys
from pathlib import Path
from datetime import datetime
import json
import logging

# The notebook lives one level below the repository root; put the root at the
# front of sys.path so the `src.*` packages can be imported.
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

# Root logger: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

print(f"Project root: {project_root}")
print("Python path configured")

In [None]:
# ============================================================================
# CONFIGURATION - edit the values in this cell to retarget the run
# ============================================================================

# --- S3 source ---------------------------------------------------------------
S3_BUCKET = "gilead-edp-kite-rd-dev-us-west-2-kite-benchling-text-sql"
S3_REGION = "us-west-2"
# May be narrowed to a sub-folder, e.g. "benchling_unstructured/RD_Biovia_ELNs/".
S3_PREFIX = "benchling_unstructured/"

# --- Delta destination (optional) --------------------------------------------
# NOTE(review): the original note says a catalog of None disables Delta writes;
# inside this notebook it is the WRITE_TO_DELTA flag below that is handed to
# the pipeline calls - confirm how the config treats a None catalog.
DELTA_CATALOG = "kite_rd_dev"
DELTA_SCHEMA = "pubtator"
DOCUMENTS_TABLE = "benchling_documents"
CHUNKS_TABLE = "benchling_chunks"

# --- Run settings ------------------------------------------------------------
# The workflow id carries a second-resolution timestamp taken when the cell runs.
WORKFLOW_ID = "benchling_test_" + datetime.now().strftime('%Y%m%d_%H%M%S')
SOURCE = "benchling"
# Extensions that will be picked up from S3.
FILE_TYPES = ["pdf", "docx", "xlsx", "pptx", "txt"]
# Upper bound on files for a test run; None means no limit.
MAX_FILES = 5

# --- Local working directory -------------------------------------------------
# On Databricks use "/dbfs/tmp/benchling_processing" instead.
BASE_PATH = "/tmp/benchling_processing"

# --- Embedding model (name and its S3 location) -------------------------------
EMBEDDINGS_MODEL = "pubmedbert"
EMBEDDINGS_MODEL_S3_PATH = "s3://gilead-edp-kite-rd-dev-us-west-2-kite-benchling-text-sql/models/pubmedbert-base-embeddings/"

# --- Delta switch ------------------------------------------------------------
# Turn on only in Databricks with access to the catalog above.
WRITE_TO_DELTA = False

# Echo the effective settings, one per line.
for line in (
    f"Workflow ID: {WORKFLOW_ID}",
    f"S3 Source: s3://{S3_BUCKET}/{S3_PREFIX}",
    f"File types: {FILE_TYPES}",
    f"Max files: {MAX_FILES or 'all'}",
    f"Write to Delta: {WRITE_TO_DELTA}",
):
    print(line)

In [None]:
# Create configuration object
from src.data_ingestion.ingest_benchling.benchling_config import BenchlingConfig

# Assemble the configuration object from the notebook-level constants.
config = BenchlingConfig.from_dict(
    dict(
        s3_bucket=S3_BUCKET,
        s3_region=S3_REGION,
        s3_prefix=S3_PREFIX,
        delta_catalog=DELTA_CATALOG,
        delta_schema=DELTA_SCHEMA,
        documents_table=DOCUMENTS_TABLE,
        chunks_table=CHUNKS_TABLE,
        base_path=BASE_PATH,
        allowed_file_types=FILE_TYPES,
        embeddings_model=EMBEDDINGS_MODEL,
        embeddings_model_path=EMBEDDINGS_MODEL_S3_PATH,
    )
)

# Resolve the named working paths for this workflow/source pair.
paths = config.paths.get_paths(WORKFLOW_ID, SOURCE)

print("\nConfiguration created successfully!")
print(f"\nPaths for workflow '{WORKFLOW_ID}':")
for path_name, path_value in paths.items():
    print(f"  {path_name}: {path_value}")

## Step 1: Explore S3 Bucket

First, let's explore what files are available in the Benchling S3 bucket.

In [None]:
from src.data_ingestion.ingest_benchling.benchling_s3_client import BenchlingS3Client

# Build the S3 client from the bucket settings on the config object
# (in Databricks the credentials come from the instance profile).
s3_settings = config.s3
s3_client = BenchlingS3Client(
    bucket_name=s3_settings.bucket_name,
    bucket_region=s3_settings.bucket_region,
)

# Report what the client itself says it is connected to.
print(f"Connected to bucket: {s3_client.bucket_name}")
print(f"Region: {s3_client.bucket_region}")

In [None]:
# Ask S3 for every object under the source prefix that has an allowed extension.
all_files = s3_client.list_files(
    file_types=FILE_TYPES,
    prefix=config.s3.source_prefix,
)

print(f"\nFound {len(all_files)} files matching file types {FILE_TYPES}")
print(f"\nFirst 10 files:")
# Show a preview of at most ten keys.
for s3_key in all_files[:10]:
    print(f"  {s3_key}")

# Mention how many keys were left out of the preview, if any.
if len(all_files) > 10:
    print(f"  ... and {len(all_files) - 10} more")

In [None]:
# Analyze file types distribution
from collections import Counter
import pandas as pd

# Extension = lower-cased text after the last dot; names without a dot are skipped.
extensions = [name.rsplit('.', 1)[-1].lower() for name in all_files if '.' in name]
ext_counts = Counter(extensions)

print("\nFile type distribution:")
# One row per extension, most frequent first.
df_types = pd.DataFrame(ext_counts.most_common(), columns=['Extension', 'Count'])
display(df_types)

## Step 2: Extract Files from S3

Download the selected files from S3 to the local staging directory.

In [None]:
from src.data_ingestion.ingest_benchling.benchling_articles_extractor import (
    extract_benchling_articles,
    stable_hash,
)

# Cap the batch for test runs; a falsy MAX_FILES (None or 0) means "take everything".
if MAX_FILES:
    files_to_extract = all_files[:MAX_FILES]
else:
    files_to_extract = all_files

print(f"\nExtracting {len(files_to_extract)} files...")
print(f"Files to extract:")
for s3_key in files_to_extract:
    print(f"  {s3_key}")

In [None]:
# Create a modified config with limited prefix if needed
# For testing, we'll extract files one by one to have more control

from src.data_ingestion.ingest_benchling.benchling_articles_extractor import extract_single_file
import os

# Make sure the local staging directory for this workflow exists.
staging_path = paths["ingestion_path"]
os.makedirs(staging_path, exist_ok=True)

# Maps each successfully extracted S3 path to the document id returned for it.
extracted_files = {}

for s3_path in files_to_extract:
    print(f"\nExtracting: {s3_path}")

    # Files are handled one at a time so that a single failure is reported
    # and the loop moves on to the next file.
    try:
        doc_id = extract_single_file(
            config=config,
            s3_path=s3_path,
            local_staging_path=staging_path,
            workflow_id=WORKFLOW_ID,
            source=SOURCE,
            write_to_delta=WRITE_TO_DELTA,
        )

        # A falsy id is treated as a failed extraction.
        if not doc_id:
            print(f"  -> Failed to extract")
            continue

        extracted_files[s3_path] = doc_id
        print(f"  -> Document ID: {doc_id}")

    except Exception as e:
        print(f"  -> Error: {e}")

print(f"\n\nExtracted {len(extracted_files)} files successfully")

## Step 2.5: Load Benchling JSON Metadata and Update Documents

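Load the Benchling entry metadata from the JSON files in S3, then attach it to the extracted document records. The update step only runs when `WRITE_TO_DELTA` is `True` and metadata was found.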

In [None]:
from src.data_ingestion.ingest_benchling.benchling_metadata_loader import (
    load_benchling_metadata_from_s3,
    get_metadata_for_entry,
)
from src.data_ingestion.ingest_benchling.databricks_delta_handler import DatabricksDeltaHandler
from typing import Optional
import re

# Read the Benchling entry metadata from the configured bucket/prefix.
print("Loading Benchling metadata from S3...")
metadata_by_entry = load_benchling_metadata_from_s3(
    base_prefix=S3_PREFIX,
    bucket_name=S3_BUCKET,
)

print(f"Loaded metadata for {len(metadata_by_entry)} entries")

# Expected key layout: benchling_unstructured/{project}/{entry_id}/{date}/{filename}
# The pattern matches a "/YYYY-MM-DD/" path segment.
# NOTE(review): nothing in the visible notebook references DATE_PATTERN.
DATE_PATTERN = re.compile(r"/\d{4}-\d{2}-\d{2}/")

def extract_entry_id_from_path(s3_path: str, prefix: Optional[str] = None) -> Optional[str]:
    """Return the Benchling entry id embedded in an S3 path, or None.

    The path is expected to look like
    ``{prefix}{project}/{entry_id}/{date}/{filename}``; the entry id is the
    second path segment after the prefix.

    Args:
        s3_path: Object key or URI that contains ``prefix`` somewhere in it.
        prefix: Prefix that precedes the project segment. Defaults to the
            notebook-level ``S3_PREFIX``.

    Returns:
        The entry id when the path has at least three segments after the
        prefix and the second one consists only of letters, digits, ``-``
        and ``_``; otherwise None.
    """
    if prefix is None:
        prefix = S3_PREFIX

    # Locate the prefix instead of assuming it sits at index 0, so that a
    # leading "s3://bucket/" (or any other lead-in) does not shift the slice.
    start = s3_path.find(prefix)
    if start == -1:
        return None

    segments = s3_path[start + len(prefix):].split("/")

    # Need at least project / entry_id / one more segment.
    if len(segments) < 3:
        return None

    # NOTE(review): if the prefix is narrowed to include the project folder,
    # the second segment is no longer the entry id - confirm before doing so.
    entry_id = segments[1]

    # Accept only ids made of alphanumerics plus "-" and "_".
    if entry_id and entry_id.replace("-", "").replace("_", "").isalnum():
        return entry_id

    return None

# Metadata is pushed to the documents table only when Delta writes are
# enabled and at least one metadata entry was loaded.
if not (WRITE_TO_DELTA and metadata_by_entry):
    print("\nSkipping metadata update (WRITE_TO_DELTA=False or no metadata found)")
else:
    print("\nUpdating documents with Benchling metadata...")

    delta_handler = DatabricksDeltaHandler(
        catalog=DELTA_CATALOG,
        schema=DELTA_SCHEMA,
        documents_table=DOCUMENTS_TABLE,
        chunks_table=CHUNKS_TABLE,
    )

    updated_count = 0
    for s3_path, doc_id in extracted_files.items():
        entry_id = extract_entry_id_from_path(s3_path)

        # Skip files whose entry id cannot be derived or has no metadata.
        if not entry_id or entry_id not in metadata_by_entry:
            continue

        metadata = get_metadata_for_entry(entry_id, metadata_by_entry, s3_path)
        if not metadata:
            continue

        # The metadata mapping is expanded into keyword arguments of the update call.
        success = delta_handler.update_document_with_metadata(
            document_grsar_id=doc_id,
            **metadata
        )
        if success:
            updated_count += 1
            print(f"  Updated {doc_id[:16]}... with metadata from entry {entry_id}")

    print(f"\nUpdated {updated_count} documents with Benchling metadata")

In [None]:
# Snapshot the staging directory; the ingestion step iterates over this list.
extracted_local_files = os.listdir(staging_path)

print(f"\nFiles in staging directory ({staging_path}):")
for local_name in extracted_local_files:
    byte_count = os.path.getsize(os.path.join(staging_path, local_name))
    print(f"  {local_name} ({byte_count:,} bytes)")

## Step 3: Ingest Files

Process each extracted file according to its type (convert to BioC XML, extract tables, etc.).

In [None]:
from src.data_ingestion.ingest_benchling.articles_ingestor import BenchlingIngestor

# One ingestor for the whole workflow; file_type="all" is passed so it is not
# tied to a single extension.
ingestor = BenchlingIngestor(
    workflow_id=WORKFLOW_ID,
    config=config,
    file_type="all",
    source=SOURCE,
    write_to_delta=WRITE_TO_DELTA,
)

print("Ingestor initialized")
print(f"\nProcessing summary before ingestion:")
summary = ingestor.get_processing_summary()
for key, value in summary.items():
    # The 'paths' entry is left out of the printout.
    if key == 'paths':
        continue
    print(f"  {key}: {value}")

In [None]:
# Run the ingestor over every staged file, tallying successes and failures.
processed_count = 0
failed_count = 0

for file_name in extracted_local_files:
    print(f"\nProcessing: {file_name}")

    try:
        success = ingestor.process_file(file_name)

        # A falsy result counts as a failure (the file was skipped or not processed).
        if not success:
            failed_count += 1
            print(f"  -> Skipped or failed")
            continue

        processed_count += 1
        print(f"  -> Success")

    except Exception as e:
        # An exception also counts as a failure; keep going with the next file.
        failed_count += 1
        print(f"  -> Error: {e}")

print(f"\n\nIngestion complete!")
print(f"  Processed: {processed_count}")
print(f"  Failed: {failed_count}")

In [None]:
# Print the ingestor's summary again, now that files have been processed.
print("\nProcessing summary after ingestion:")
summary = ingestor.get_processing_summary()
for key, value in summary.items():
    if key == 'paths':
        continue
    print(f"  {key}: {value}")

# BioC XML output directory (listed only if it exists).
bioc_path = paths["bioc_path"]
if os.path.exists(bioc_path):
    bioc_files = os.listdir(bioc_path)
    print(f"\nBioC XML files ({len(bioc_files)}):")
    for bioc_name in bioc_files:
        print(f"  {bioc_name}")

# Metadata output directory (listed only if it exists).
metadata_path = paths["metadata_path"]
if os.path.exists(metadata_path):
    metadata_files = os.listdir(metadata_path)
    print(f"\nMetadata files ({len(metadata_files)}):")
    for metadata_name in metadata_files:
        print(f"  {metadata_name}")

## Step 4: Create Chunks and Embeddings

Process BioC XML files to create chunks and generate embeddings.

In [None]:
# First, download the embeddings model from S3 (if not already available)
import boto3
from urllib.parse import urlparse

def download_model_from_s3(
    s3_uri: str,
    local_dir: str,
) -> None:
    """Mirror the objects under an S3 URI into a local directory.

    Args:
        s3_uri: ``s3://bucket/prefix`` location of the model files.
        local_dir: Destination directory; created if missing. The layout
            below the prefix is reproduced beneath it.

    Objects are skipped when a local file of the same size already exists.
    Keys ending in "/" (folder placeholders) are ignored, as are keys that
    merely share the textual prefix but live outside the prefix folder.
    """
    parsed = urlparse(s3_uri)
    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")

    s3 = boto3.client("s3")
    os.makedirs(local_dir, exist_ok=True)

    print(f"Downloading model from {s3_uri}...")

    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                continue

            relpath = os.path.relpath(key, prefix)

            # The URI named a single object rather than a folder.
            if relpath == os.curdir:
                relpath = os.path.basename(key)

            # S3 prefixes match as plain strings, so "models/bert" also lists
            # "models/bert-v2/...". Such keys resolve to "../..." and would be
            # written outside local_dir - skip them.
            if relpath == os.pardir or relpath.startswith(os.pardir + os.sep):
                continue

            local_path = os.path.join(local_dir, relpath)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)

            # Reuse an existing file only when its size matches the listing;
            # a size mismatch means a stale or truncated copy.
            expected_size = obj.get("Size")
            if os.path.exists(local_path) and (
                expected_size is None
                or os.path.getsize(local_path) == expected_size
            ):
                continue

            s3.download_file(bucket, key, local_path)
            print(f"  Downloaded: {relpath}")

    print("Model download complete.")

# Local copy of the embeddings model, kept inside the project tree.
MODEL_LOCAL_DIR = os.path.join(str(project_root), "src/models/pubmedbert-base-embeddings")

# Download only when the directory is missing or empty.
if os.path.exists(MODEL_LOCAL_DIR) and os.listdir(MODEL_LOCAL_DIR):
    print(f"Model already exists at {MODEL_LOCAL_DIR}")
else:
    download_model_from_s3(EMBEDDINGS_MODEL_S3_PATH, MODEL_LOCAL_DIR)

In [None]:
from src.data_processing.orchestrator_benchling import BenchlingArticleProcessor

# Chunking/embedding processor for this workflow.
# window_size is the chunk size in words; a stride of half the window gives
# a 50% overlap between consecutive chunks.
processor = BenchlingArticleProcessor(
    workflow_id=WORKFLOW_ID,
    config=config,
    source=SOURCE,
    write_to_delta=WRITE_TO_DELTA,
    embeddings_model=EMBEDDINGS_MODEL,
    window_size=512,
    stride=256,
)

print("Processor initialized")

In [None]:
# Run chunking + embedding over everything; JSON output is always written,
# Delta output follows the notebook-level switch.
results = processor.process_all(
    save_to_delta=WRITE_TO_DELTA,
    save_to_json=True,
)

print("\nProcessing Results:")
for result_name, result_value in results.items():
    print(f"  {result_name}: {result_value}")

In [None]:
# Output directories of the chunking and embedding steps.
chunks_path = paths["chunks_path"]
embeddings_path = paths["embeddings_path"]

# Same listing for both directories: heading first, then name and size of
# each entry when the directory exists.
for heading, directory in (
    (f"\nChunks files:", chunks_path),
    (f"\nEmbeddings files:", embeddings_path),
):
    print(heading)
    if not os.path.exists(directory):
        continue
    for entry in os.listdir(directory):
        byte_count = os.path.getsize(os.path.join(directory, entry))
        print(f"  {entry} ({byte_count:,} bytes)")

## Step 5: Inspect Results

Let's look at the generated chunks and embeddings.

In [None]:
# Load the combined chunks file for this workflow and preview its first record.
chunks_file = os.path.join(chunks_path, f"{WORKFLOW_ID}_all_chunks.json")

# Bind the name up front: the following cells test `all_chunks`, and without
# this they raise NameError whenever the chunks file was not produced.
all_chunks = []

if os.path.exists(chunks_file):
    with open(chunks_file, "r") as f:
        all_chunks = json.load(f)

    print(f"Total chunks: {len(all_chunks)}")

    if all_chunks:
        print(f"\nFirst chunk:")
        first_chunk = all_chunks[0]
        for key, value in first_chunk.items():
            if key == 'embeddings':
                # Print only the vector length, not the numbers.
                print(f"  {key}: [{len(value)} dimensions]")
            elif key in ('chunk_text', 'merged_text'):
                # Long text fields are cut to 200 characters for display.
                if len(str(value)) > 200:
                    print(f"  {key}: {value[:200]}...")
                else:
                    print(f"  {key}: {value}")
            else:
                print(f"  {key}: {value}")
else:
    print(f"Chunks file not found: {chunks_file}")

In [None]:
# Tabulate one compact row per chunk (ids shortened to 8 characters).
if all_chunks:
    chunk_summary = [
        {
            'chunk_id': chunk['chunk_id'][:8] + '...',
            'document_id': chunk['document_grsar_id'][:8] + '...',
            'sequence': chunk['chunk_sequence'],
            'type': chunk['chunk_type'],
            # NOTE(review): the column is labelled word_count but is filled
            # from the chunk's 'token_count' field - confirm the unit.
            'word_count': chunk['token_count'],
            'has_embedding': len(chunk.get('embeddings', [])) > 0,
        }
        for chunk in all_chunks
    ]

    df_chunks = pd.DataFrame(chunk_summary)
    print("\nChunks Summary:")
    display(df_chunks)

## Step 6: Test Semantic Search (Optional)

Demonstrate semantic search using the generated embeddings.

In [None]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def semantic_search(query: str, chunks: list, model, tokenizer, top_k: int = 3):
    """Rank chunks by cosine similarity between their embedding and the query's.

    Args:
        query: Free-text search string.
        chunks: Chunk dicts; only those with a non-empty 'embeddings' entry
            take part in the search.
        model: Embedding model handed through to ``get_embeddings``.
        tokenizer: Tokenizer handed through to ``get_embeddings``.
        top_k: Maximum number of results to return.

    Returns:
        A list of ``{'chunk': <chunk dict>, 'similarity': <float>}`` items,
        best match first. Empty when no chunk has an embedding.
    """
    # Filter once and use this same list for both the similarity matrix and
    # the result lookup; indexing back into the unfiltered `chunks` would
    # return the wrong chunk as soon as any entry lacks an embedding.
    embedded_chunks = [c for c in chunks if c.get('embeddings')]

    # Nothing to compare against - return before loading/embedding anything.
    if not embedded_chunks:
        print("No chunks with embeddings found")
        return []

    from src.pubtator_utils.embeddings_handler.embeddings_generator import get_embeddings

    # Embed the query with the same model name the pipeline uses.
    query_embedding = get_embeddings(
        model_name="pubmedbert",
        texts=[query],
        model=model,
        tokenizer=tokenizer,
    )[0]

    chunk_embeddings = np.array([c['embeddings'] for c in embedded_chunks])

    # One row of similarities: query versus every embedded chunk.
    similarities = cosine_similarity([query_embedding], chunk_embeddings)[0]

    # Indices of the highest scores, in descending order.
    top_indices = np.argsort(similarities)[::-1][:top_k]

    return [
        {
            'chunk': embedded_chunks[idx],
            'similarity': float(similarities[idx]),
        }
        for idx in top_indices
    ]

print("Semantic search function defined")

In [None]:
# Run a demo query, provided at least one chunk carries an embedding.
if not (all_chunks and any(c.get('embeddings') for c in all_chunks)):
    print("No chunks with embeddings available for search")
else:
    from transformers import AutoModel, AutoTokenizer

    # Load tokenizer and model strictly from the local model directory.
    print("Loading embeddings model for search...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_LOCAL_DIR, local_files_only=True)
    model = AutoModel.from_pretrained(MODEL_LOCAL_DIR, local_files_only=True)
    print("Model loaded")

    # Example query - adjust to match your data.
    QUERY = "experimental results and data analysis"

    print(f"\nQuery: '{QUERY}'")
    print("=" * 60)

    results = semantic_search(QUERY, all_chunks, model, tokenizer, top_k=3)

    for rank, result in enumerate(results, 1):
        chunk = result['chunk']
        similarity = result['similarity']

        print(f"\nResult #{rank} (similarity: {similarity:.4f})")
        print(f"  Document: {chunk['document_grsar_id'][:16]}...")
        print(f"  Type: {chunk['chunk_type']}")

        # Cut the text to 300 characters for display.
        chunk_text = chunk['chunk_text']
        if len(chunk_text) > 300:
            print(f"  Text: {chunk_text[:300]}...")
        else:
            print(f"  Text: {chunk_text}")

## Summary

This notebook demonstrated the complete Benchling ingestion pipeline:

| Step | Description | Output |
|------|-------------|--------|
| 1. Extract | Download files from S3 | Local staging files |
| 2. Ingest | Process files by type | BioC XML, metadata JSON |
| 3. Chunk | Split into chunks | Chunks JSON |
| 4. Embed | Generate embeddings | Embeddings JSON |
| 5. Save | Store to Delta (optional) | Delta tables |

### Key Differences from Apollo Pipeline:
- **No YAML config** - Uses inline configuration
- **No AWS secrets** - Uses Databricks instance profile
- **No PostgreSQL** - Uses Delta tables
- **No NER annotation** - Skips annotation steps

### Output Files:
```
{base_path}/{workflow_id}/benchling/
├── ingestion/      # Original files from S3
├── interim/        # Intermediate processing files
├── bioc_xml/       # BioC XML documents
├── metadata/       # Document metadata JSON
├── chunks/         # Chunk JSON files
├── embeddings/     # Chunks with embeddings
└── failed/         # Failed processing files
```

In [None]:
# Closing report for the run.
banner = "=" * 60
# The chunk figures fall back to 0 when the chunk-loading cell never bound `all_chunks`.
chunks_known = 'all_chunks' in dir()

print("\n" + banner)
print("PIPELINE EXECUTION SUMMARY")
print(banner)
print(f"\nWorkflow ID: {WORKFLOW_ID}")
print(f"Source: {SOURCE}")
print(f"\nFiles:")
print(f"  Extracted: {len(extracted_files)}")
print(f"  Processed: {processed_count}")
print(f"  Failed: {failed_count}")
print(f"\nChunks:")
print(f"  Total: {len(all_chunks) if chunks_known else 0}")
print(f"  With embeddings: {sum(1 for c in all_chunks if c.get('embeddings')) if chunks_known else 0}")
print(f"\nOutput directory: {BASE_PATH}/{WORKFLOW_ID}")
print("\n" + banner)