In [None]:
# Cell 1: Setup and Configuration
import io
import json
import os
import shutil
import tempfile
from urllib.parse import urlparse

import pypdf
import requests
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.exceptions import SnowparkSQLException

# --- Configuration ---
# Database and schema that the session is pointed at below; the database is
# created if it does not exist yet.
SNOWFLAKE_DATABASE = "WORLD_HISTORY"
SNOWFLAKE_SCHEMA = "public"
# Role that the external-access setup cell grants the integration to and
# switches back to once its ACCOUNTADMIN work is done.
SNOWFLAKE_ROLE = "SYSADMIN"
# Page that is scraped for links to the PDF files.
TARGET_WEBSITE_URL = "https://glhssocialstudies.weebly.com/world-history-textbook---pdf-copy.html"

# Define stage names WITHOUT a leading '@'
SOURCE_STAGE_NAME = "pdf_documents"

# Dynamic names based on database
# NOTE: the SQL cell that creates get_pdf_links_from_website hard-codes
# WORLD_HISTORY_WEB_ACCESS, so changing SNOWFLAKE_DATABASE also requires
# editing that cell.
EXTERNAL_ACCESS_INTEGRATION_NAME = f"{SNOWFLAKE_DATABASE}_WEB_ACCESS"
NETWORK_RULE_NAME = f"{SNOWFLAKE_DATABASE}_WEBSITE_ACCESS"

# Stage sub-paths that receive the size-limited parts and the single-page PDFs.
ADAPTIVE_SPLIT_TARGET_PATH = f"{SOURCE_STAGE_NAME}/parts"
SINGLE_PAGE_TARGET_PATH = f"{SOURCE_STAGE_NAME}/pages"

# --- Session Initialization ---
# The database must exist before use_database() can select it, hence the
# CREATE ... IF NOT EXISTS first.
session = get_active_session()
session.sql(f"CREATE DATABASE IF NOT EXISTS {SNOWFLAKE_DATABASE};").collect()
session.use_database(SNOWFLAKE_DATABASE)
session.use_schema(SNOWFLAKE_SCHEMA)

# Echo the effective settings. "Current Role" is whatever role the session is
# running under right now; "Configured Role" is only the constant above and is
# not applied in this cell.
print(f"✅ Setup complete.")
print(f"  - Current Role: {session.get_current_role()}")
print(f"  - Configured Role: {SNOWFLAKE_ROLE}")
print(f"  - Database: {session.get_current_database()}")
print(f"  - Schema: {session.get_current_schema()}")
print(f"  - Target Website: {TARGET_WEBSITE_URL}")
print(f"  - Source Stage: @{SOURCE_STAGE_NAME}")
print(f"  - External Access Integration: {EXTERNAL_ACCESS_INTEGRATION_NAME}")
print(f"  - Network Rule: {NETWORK_RULE_NAME}")


In [None]:
# Cell 2: External Access Setup
# Creates the egress network rule and the external access integration needed
# to reach the target website and the PDF host, then grants the integration to
# SNOWFLAKE_ROLE. The work is done as ACCOUNTADMIN; the configured role is
# restored afterwards even when a statement fails.
print("🔐 Setting up external access with proper role management...")

# Switch to ACCOUNTADMIN for setup
session.use_role("ACCOUNTADMIN")
print(f"   Switched to role: {session.get_current_role()}")

# Extract domain from the configured URL
target_domain = urlparse(TARGET_WEBSITE_URL).netloc
print(f"   Target domain: {target_domain}")

# IMPORTANT: Also allow icomets.org where the PDFs are actually hosted
pdf_domain = "icomets.org"
print(f"   PDF domain: {pdf_domain}")

# Only set to True once every statement below has succeeded, so the closing
# message cannot claim success after a failure.
external_access_ready = False

try:
    # Create network rule for BOTH the target website AND the PDF hosting domain
    session.sql(f"""
        CREATE OR REPLACE NETWORK RULE {NETWORK_RULE_NAME}
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('{target_domain}:443', '{target_domain}:80', '{pdf_domain}:443', '{pdf_domain}:80')
    """).collect()
    print(f"✅ Network rule created: {NETWORK_RULE_NAME} for {target_domain} + {pdf_domain}")

    # Create external access integration using configured variables
    session.sql(f"""
        CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION {EXTERNAL_ACCESS_INTEGRATION_NAME}
        ALLOWED_NETWORK_RULES = ({NETWORK_RULE_NAME})
        ENABLED = TRUE
    """).collect()
    print(f"✅ External access integration created: {EXTERNAL_ACCESS_INTEGRATION_NAME}")

    # Grant access for the role to use the access integration
    session.sql(f"""
        GRANT USAGE ON INTEGRATION {EXTERNAL_ACCESS_INTEGRATION_NAME} to {SNOWFLAKE_ROLE}
    """).collect()
    print(f"✅ Integration {EXTERNAL_ACCESS_INTEGRATION_NAME} access granted to {SNOWFLAKE_ROLE}")

    external_access_ready = True

except Exception as e:
    print(f"❌ Error during setup: {str(e)}")

finally:
    # Switch back to configured role
    try:
        session.use_role(SNOWFLAKE_ROLE)
        print(f"✅ Switched back to role: {session.get_current_role()}")
    except Exception as role_error:
        print(f"⚠️  Warning: Could not switch to {SNOWFLAKE_ROLE}: {role_error}")

if external_access_ready:
    print(f"🔧 External access setup completed!")
else:
    print("⚠️  External access setup did NOT complete - fix the error above before running the download cells.")


# Add external integration to the notebook

Now that the external access integration has been created, go to Notebook Settings -> External Access and enable `WORLD_HISTORY_WEB_ACCESS`; otherwise the downloads will fail.

In [None]:
# Make sure the internal stage that holds the source PDFs exists.
# The statement is IF NOT EXISTS, so an already-present stage is left as is;
# a new one is created with server-side encryption and a directory table.
create_stage_sql = f"""
    CREATE STAGE IF NOT EXISTS {SOURCE_STAGE_NAME}
        ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
        DIRECTORY = (ENABLE = TRUE)
        COMMENT = 'Stage for PDF documents'
"""
session.sql(create_stage_sql).collect()
print(f"✅ Stage @{SOURCE_STAGE_NAME} created/verified")

# Upload your PDF Documents to the Stage

You have the option of using Snowflake to download the PDFs, which requires the ACCOUNTADMIN role to enable external access to the website containing the files.

Alternatively, you can use Python locally on your machine: run `python3 pdf_downloader.py` (after installing its dependencies) and then manually upload the files to the stage `@PDF_DOCUMENTS` in the database you created. If you go this route, skip to the "Done with file upload!" section below.

In [None]:
CREATE OR REPLACE FUNCTION get_pdf_links_from_website(website_url STRING)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.12'
EXTERNAL_ACCESS_INTEGRATIONS = (WORLD_HISTORY_WEB_ACCESS)
PACKAGES = ('requests', 'beautifulsoup4', 'lxml')
HANDLER = 'scrape_pdfs'
AS
$$
import requests
import re
import json
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup

def scrape_pdfs(website_url):
    """Fetch ``website_url`` and return the PDF links found on the page.

    Two strategies are used: first every ``<a href>`` that points at a PDF is
    collected; only if that finds nothing is the raw HTML searched with
    regular expressions.

    Returns a dict (surfaced to SQL as a VARIANT):
        success      True when the page was fetched and parsed
        website_url  the URL that was requested
        pdf_count    number of entries in pdf_links
        pdf_links    list of {url, text, filename, method}
        scraped_at   value of the response "date" header (success only)
        error        exception text (failure only)

    Never raises: every exception is reported through the failure dict.
    """
    try:
        # Fetch webpage
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        response = requests.get(website_url, headers=headers, timeout=30)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')
        pdf_links = []

        # Method 1: Direct PDF links
        for link in soup.find_all('a', href=True):
            href = link['href']
            # Also test the URL *path* so links that carry a query string or
            # fragment after ".pdf" (e.g. "chap01.pdf?dl=1") are recognised.
            path = urlparse(href).path
            if href.lower().endswith('.pdf') or path.lower().endswith('.pdf'):
                pdf_links.append({
                    'url': urljoin(website_url, href),
                    'text': link.get_text(strip=True),
                    'filename': path.split('/')[-1],
                    'method': 'direct_link'
                })

        # Method 2: Regex patterns if no direct links found
        if not pdf_links:
            content = response.text
            pdf_patterns = [
                r'https?://[^"\s]+\.pdf',
                r'/files/[^"\s]+\.pdf',
                r'uploads/[^"\s]+\.pdf'
            ]

            found_urls = set()
            for pattern in pdf_patterns:
                for match in re.findall(pattern, content, re.IGNORECASE):
                    # urljoin leaves absolute URLs untouched and resolves both
                    # "/files/..." and "uploads/..." against the page URL, so
                    # every stored URL is absolute and downloadable.
                    found_urls.add(urljoin(website_url, match.strip('"\'')))

            # sorted() keeps the synthetic "Chapter N" numbering stable; plain
            # set iteration order can differ from one run to the next.
            for i, url in enumerate(sorted(found_urls), 1):
                filename = urlparse(url).path.split('/')[-1]
                # Case-insensitive, matching the IGNORECASE regexes above.
                if not filename or not filename.lower().endswith('.pdf'):
                    filename = f"chapter_{i:02d}.pdf"

                pdf_links.append({
                    'url': url,
                    'text': f'Chapter {i}',
                    'filename': filename,
                    'method': 'regex_pattern'
                })

        return {
            "success": True,
            "website_url": website_url,
            "pdf_count": len(pdf_links),
            "pdf_links": pdf_links,
            "scraped_at": str(response.headers.get('date', 'unknown'))
        }

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "website_url": website_url,
            "pdf_count": 0,
            "pdf_links": []
        }
$$;

In [None]:
# Cell 4: Test PDF Discovery
# Calls the get_pdf_links_from_website UDF for TARGET_WEBSITE_URL and prints a
# preview of what it found. Nothing is downloaded or written here.
print(f"🌐 Testing PDF discovery from: {TARGET_WEBSITE_URL}")

try:
    query = f"SELECT get_pdf_links_from_website('{TARGET_WEBSITE_URL}') as result"
    result = session.sql(query).collect()
    
    if result:
        result_raw = result[0]['RESULT']
        print(f"🔍 Raw result type: {type(result_raw)}")
        print(f"🔍 Raw result preview: {str(result_raw)[:100]}...")
        
        # Handle both string and dict results
        if isinstance(result_raw, str):
            try:
                result_data = json.loads(result_raw)
            except json.JSONDecodeError:
                print(f"❌ Failed to parse result as JSON: {result_raw}")
                # None routes execution to the failure branch below.
                result_data = None
        else:
            result_data = result_raw
        
        if result_data and result_data.get('success', False):
            pdf_links = result_data.get('pdf_links', [])
            pdf_count = result_data.get('pdf_count', 0)
            
            print(f"✅ Found {pdf_count} PDF links")
            
            if pdf_links:
                print(f"📄 Available PDFs:")
                for i, pdf_info in enumerate(pdf_links[:10], 1):  # Show first 10
                    filename = pdf_info.get('filename', 'unknown')
                    # Link text is cut to 50 characters to fit the padded column.
                    text = pdf_info.get('text', 'No description')[:50]
                    print(f"   {i:2d}. {text:<50} → {filename}")
                
                if len(pdf_links) > 10:
                    print(f"   ... and {len(pdf_links) - 10} more PDFs")
            else:
                print("⚠️  No PDF links found")
        else:
            # Reached when the UDF reported success=False or the JSON could not be parsed.
            error_msg = result_data.get('error', 'Unknown error') if result_data else 'No result data'
            print(f"❌ PDF discovery failed: {error_msg}")
    else:
        print("❌ No result returned from PDF discovery")
        
except Exception as e:
    # Any failure is reported with a full traceback instead of stopping the notebook.
    print(f"❌ Error during PDF discovery: {str(e)}")
    import traceback
    print(f"🔍 Full traceback:")
    traceback.print_exc()

print(f"📝 Next Steps:")
print(f"1. If PDFs were found above, continue with remaining cells")
print(f"2. Cell 5 will handle download and upload to @{SOURCE_STAGE_NAME}")
print(f"3. Run all cells sequentially for complete processing")


In [None]:
# Cell 6: Diagnose Disk Space Issues
# Reports free space for the temp directory and the working directory, deletes
# leftover "*tmp*.pdf" files found anywhere under the temp directory, and
# prints recommendations based on the free space that was measured.
print("🔍 Diagnosing disk space and temp file issues...")

import shutil
import tempfile
import gc

# Resolved before the try blocks so the temp-file scan can use it even when the
# disk-usage check fails.
temp_dir = tempfile.gettempdir()
# Stays None when the disk check fails, so the recommendations at the end can
# report "unknown" instead of raising NameError.
free = None

# Check available disk space
try:
    print(f"📁 Temp directory: {temp_dir}")

    # Check disk usage
    total, used, free = shutil.disk_usage(temp_dir)
    print(f"💾 Disk usage in temp directory:")
    print(f"   Total: {total / (1024**3):.2f} GB")
    print(f"   Used:  {used / (1024**3):.2f} GB")
    print(f"   Free:  {free / (1024**3):.2f} GB")
    print(f"   Free:  {free / (1024**2):.1f} MB")

    if free < 1024**3:  # Less than 1GB
        print(f"⚠️  WARNING: Only {free / (1024**2):.1f} MB free space!")
        print(f"🔧 Need to optimize approach for limited disk space")

    # Check current working directory space too
    cwd_total, cwd_used, cwd_free = shutil.disk_usage(".")
    print(f"\n💾 Disk usage in current directory:")
    print(f"   Free:  {cwd_free / (1024**2):.1f} MB")

except Exception as e:
    print(f"❌ Error checking disk space: {e}")

# Check for any leftover temp files
try:
    temp_files = []
    for root, dirs, files in os.walk(temp_dir):
        for file in files:
            if file.endswith('.pdf') and 'tmp' in file:
                filepath = os.path.join(root, file)
                try:
                    size = os.path.getsize(filepath)
                except OSError:
                    # The file vanished or is unreadable; skip it rather than
                    # abort the whole scan.
                    continue
                temp_files.append((filepath, size))

    if temp_files:
        print(f"\n🗂️  Found {len(temp_files)} leftover PDF temp files:")
        total_temp_size = 0
        for filepath, size in temp_files:
            print(f"   📄 {os.path.basename(filepath)}: {size / (1024**2):.1f} MB")
            total_temp_size += size
        print(f"   📦 Total temp files: {total_temp_size / (1024**2):.1f} MB")

        # Clean up old temp files
        print(f"🧹 Cleaning up leftover temp files...")
        for filepath, _ in temp_files:
            try:
                os.unlink(filepath)
                print(f"   ✅ Deleted {os.path.basename(filepath)}")
            except Exception as cleanup_error:
                print(f"   ❌ Failed to delete {os.path.basename(filepath)}: {cleanup_error}")
    else:
        print(f"\n✅ No leftover PDF temp files found")

except Exception as e:
    print(f"❌ Error checking temp files: {e}")

# Force garbage collection
gc.collect()
print(f"\n🧹 Forced garbage collection")

print(f"\n💡 Recommendations:")
if free is None:
    print(f"   1. Free space could not be determined - resolve the disk check error above first")
elif free < 500 * 1024**2:  # Less than 500MB
    print(f"   1. Use streaming approach to avoid saving full files to disk")
    print(f"   2. Process files in smaller batches")
    print(f"   3. Force cleanup after each file")
else:
    print(f"   1. Should have enough space - check for cleanup issues")
    print(f"   2. Monitor temp file accumulation")


In [None]:
# Cell 5: Download PDFs with Optimizations
# Empties the source stage, asks the scraping UDF for the PDF links, then for
# each link streams the file to the temp directory, uploads it to
# @SOURCE_STAGE_NAME uncompressed, and deletes the temp copy again.
# Finishes with a LIST of the stage as verification.
print(f"📥 Downloading PDFs to @{SOURCE_STAGE_NAME}")

# Free temp space (MB) below which leftover temp PDFs are purged / the file is skipped.
LOW_SPACE_CLEANUP_MB = 150
MIN_FREE_SPACE_MB = 100
# Anything smaller is treated as an error page rather than a real PDF.
MIN_PDF_BYTES = 10000
DOWNLOAD_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

# Clean up existing stage files first
try:
    session.sql(f"REMOVE @{SOURCE_STAGE_NAME}").collect()
    print(f"🧹 Cleaned up existing stage files")
except Exception as cleanup_error:
    # Best effort: the uploads below overwrite existing files anyway, but the
    # failure is reported instead of being silently swallowed.
    print(f"⚠️  Could not clean stage @{SOURCE_STAGE_NAME}: {cleanup_error}")

# Get PDF links
try:
    result = session.sql(f"SELECT get_pdf_links_from_website('{TARGET_WEBSITE_URL}') as result").collect()

    if result:
        result_raw = result[0]['RESULT']
        # The result may arrive as a JSON string or as an already-parsed object.
        result_data = json.loads(result_raw) if isinstance(result_raw, str) else result_raw

        if result_data and result_data.get('success', False):
            pdf_links = result_data.get('pdf_links', [])
            total_pdfs = len(pdf_links)

            print(f"📊 Found {total_pdfs} PDFs to download")

            success_count = 0
            error_count = 0
            temp_dir = tempfile.gettempdir()

            for i, pdf_info in enumerate(pdf_links, 1):
                pdf_url = pdf_info['url']
                # The name comes from a scraped page: basename() keeps it from
                # pointing outside the temp directory.
                filename = os.path.basename(pdf_info['filename'])

                print(f"\n📄 {i}/{total_pdfs}: {filename}")

                if not filename:
                    print(f"   ❌ Error: no usable filename for {pdf_url}")
                    error_count += 1
                    continue

                # Check available space
                insufficient_space = False
                try:
                    free_mb = shutil.disk_usage(temp_dir).free / (1024**2)

                    if free_mb < LOW_SPACE_CLEANUP_MB:
                        print(f"   ⚠️  Low space ({free_mb:.1f} MB) - cleaning up...")
                        # Clean up any leftover temp files
                        for temp_file in os.listdir(temp_dir):
                            if temp_file.endswith('.pdf') and 'tmp' in temp_file:
                                try:
                                    os.unlink(os.path.join(temp_dir, temp_file))
                                except OSError:
                                    pass  # best effort; the file may be in use or already gone

                        # Measure again so the skip decision reflects the cleanup.
                        free_mb = shutil.disk_usage(temp_dir).free / (1024**2)

                    insufficient_space = free_mb < MIN_FREE_SPACE_MB
                except OSError as space_error:
                    # Cannot measure: report it and attempt the download anyway.
                    print(f"   ⚠️  Could not check free space: {space_error}")

                if insufficient_space:
                    print(f"   ❌ Insufficient space - skipping")
                    error_count += 1
                    continue

                # Temp file keeps the real filename so the staged file is named correctly
                temp_path = os.path.join(temp_dir, filename)
                try:
                    # Streaming download
                    downloaded_bytes = 0
                    with requests.get(pdf_url, headers=DOWNLOAD_HEADERS, timeout=90, stream=True) as response:
                        response.raise_for_status()
                        with open(temp_path, 'wb') as temp_file:
                            for chunk in response.iter_content(chunk_size=8192):
                                if chunk:
                                    temp_file.write(chunk)
                                    downloaded_bytes += len(chunk)

                    print(f"   📦 Downloaded {downloaded_bytes:,} bytes ({downloaded_bytes/1024/1024:.1f} MB)")

                    if downloaded_bytes < MIN_PDF_BYTES:
                        raise ValueError(f"File too small: {downloaded_bytes} bytes")

                    # Upload with proper filename and no compression
                    put_result = session.file.put(
                        local_file_name=temp_path,
                        stage_location=f"@{SOURCE_STAGE_NAME}",
                        auto_compress=False,
                        overwrite=True
                    )

                    if put_result and put_result[0].status == 'UPLOADED':
                        print(f"   ✅ Uploaded {filename}")
                        success_count += 1
                    else:
                        print(f"   ⚠️  Upload failed")
                        error_count += 1

                except Exception as e:
                    print(f"   ❌ Error: {str(e)}")
                    error_count += 1
                finally:
                    # Single cleanup point for both the success and the failure path.
                    if os.path.exists(temp_path):
                        try:
                            os.unlink(temp_path)
                        except OSError:
                            pass

                # Progress every 5 files
                if i % 5 == 0 or i == total_pdfs:
                    print(f"\n📈 Progress: {i}/{total_pdfs}, ✅{success_count} success, ❌{error_count} errors")

            # Final verification
            print(f"\n📁 Stage verification...")
            stage_files = session.sql(f"LIST @{SOURCE_STAGE_NAME}").collect()

            if stage_files:
                print(f"✅ {len(stage_files)} files in @{SOURCE_STAGE_NAME}:")
                stage_total_bytes = 0
                # Derived from the configured stage name rather than hard-coded.
                stage_prefix = f"{SOURCE_STAGE_NAME.lower()}/"

                # Sort files by name for better display
                for file_info in sorted(stage_files, key=lambda row: row['name']):
                    full_path = file_info['name']

                    # Strip the stage prefix when present, otherwise fall back to the last path segment
                    if full_path.startswith(stage_prefix):
                        name = full_path[len(stage_prefix):]
                    else:
                        name = full_path.split('/')[-1]

                    size = file_info['size']
                    stage_total_bytes += size
                    print(f"   📄 {name} - {size:,} bytes ({size/1024/1024:.1f} MB)")

                if error_count == 0:
                    print(f"\n🎉 SUCCESS! Total: {stage_total_bytes:,} bytes ({stage_total_bytes/1024/1024:.1f} MB)")
                else:
                    print(f"\n⚠️  Finished with {error_count} error(s). Total: {stage_total_bytes:,} bytes ({stage_total_bytes/1024/1024:.1f} MB)")
            else:
                print(f"❌ No files in stage")

        else:
            print("❌ Failed to get PDF links")
    else:
        print("❌ No result from discovery")

except Exception as e:
    print(f"❌ Download error: {str(e)}")

print(f"\n✅ Download completed!")


In [None]:
# Create Filename-Chapter Associations Table
# Rebuilds FILENAME_CHAPTER_ASSOCIATIONS from the link text returned by the
# scraping UDF: one row per PDF with the parsed chapter number and title.
import re
print("📋 Creating filename-chapter associations table...")


def parse_chapter_info(original_text, filename):
    """Return ``(chapter_number, chapter_title)`` for one PDF link.

    The number is taken from "Chapter N" in the link text, falling back to
    "chapN" in the filename; it is None when neither is present. The title is
    the text after "Chapter N:" up to the first "(", or the whole link text
    when that pattern is absent, with a trailing "(123KB)"-style size removed.
    """
    chapter_number = None
    chapter_title = original_text

    # Try to extract chapter number
    chapter_match = re.search(r'Chapter\s+(\d+)', original_text, re.IGNORECASE)
    if chapter_match:
        chapter_number = int(chapter_match.group(1))

        # Extract title after the colon, before any parentheses
        title_match = re.search(r'Chapter\s+\d+:\s*([^(]+)', original_text, re.IGNORECASE)
        if title_match:
            chapter_title = title_match.group(1).strip()

    # If no "Chapter X" pattern, try to extract number from filename
    if chapter_number is None:
        filename_match = re.search(r'chap(\d+)', filename, re.IGNORECASE)
        if filename_match:
            chapter_number = int(filename_match.group(1))

    # Clean up chapter title (remove extra spaces, size info)
    chapter_title = re.sub(r'\s*\(\d+[A-Za-z]*\)\s*$', '', chapter_title).strip()
    return chapter_number, chapter_title


# Create the table with separate chapter number and title
try:
    session.sql(f"""
        CREATE OR REPLACE TABLE FILENAME_CHAPTER_ASSOCIATIONS (
            FILENAME VARCHAR(100),
            CHAPTER_NUMBER INTEGER,
            CHAPTER_TITLE VARCHAR(500),
            ORIGINAL_TEXT VARCHAR(500),
            UPLOAD_TIMESTAMP TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
            PRIMARY KEY (FILENAME)
        )
    """).collect()
    print("✅ Table FILENAME_CHAPTER_ASSOCIATIONS created/verified")

    # Get PDF links to extract titles
    result = session.sql(f"SELECT get_pdf_links_from_website('{TARGET_WEBSITE_URL}') as result").collect()

    if result:
        result_raw = result[0]['RESULT']
        result_data = json.loads(result_raw) if isinstance(result_raw, str) else result_raw

        if result_data and result_data.get('success', False):
            pdf_links = result_data.get('pdf_links', [])

            print(f"📥 Parsing and inserting {len(pdf_links)} filename-chapter associations...")

            # Bind variables instead of string formatting: scraped filenames
            # and titles may contain quotes, and None/0 chapter numbers are
            # passed through as NULL/0 without special-casing.
            insert_sql = """
                INSERT INTO FILENAME_CHAPTER_ASSOCIATIONS
                (FILENAME, CHAPTER_NUMBER, CHAPTER_TITLE, ORIGINAL_TEXT)
                VALUES (?, ?, ?, ?)
            """

            # Insert each filename-title mapping with parsed data
            for pdf_info in pdf_links:
                filename = pdf_info['filename']
                # Empty link text falls back to the filename as well.
                original_text = pdf_info.get('text') or filename
                chapter_number, chapter_title = parse_chapter_info(original_text, filename)

                try:
                    session.sql(
                        insert_sql,
                        params=[filename, chapter_number, chapter_title, original_text],
                    ).collect()
                except Exception as e:
                    print(f"   ⚠️  Error inserting {filename}: {str(e)}")

            # Verify the data
            associations = session.sql("SELECT * FROM FILENAME_CHAPTER_ASSOCIATIONS ORDER BY CHAPTER_NUMBER").collect()

            print(f"\n✅ {len(associations)} associations created:")
            for assoc in associations[:10]:  # Show first 10
                filename = assoc['FILENAME']
                chapter_num = assoc['CHAPTER_NUMBER']
                full_title = assoc['CHAPTER_TITLE'] or ''
                title = full_title[:50] + "..." if len(full_title) > 50 else full_title
                # A NULL chapter number cannot be formatted with ":2d".
                chapter_label = f"{chapter_num:2d}" if chapter_num is not None else " ?"
                print(f"   📄 {filename:<15} → Ch.{chapter_label}: {title}")

            if len(associations) > 10:
                print(f"   ... and {len(associations) - 10} more associations")

            print(f"\n🎯 Query examples:")
            print(f"   • SELECT * FROM FILENAME_CHAPTER_ASSOCIATIONS WHERE CHAPTER_NUMBER = 1")
            print(f"   • SELECT FILENAME, CHAPTER_TITLE FROM FILENAME_CHAPTER_ASSOCIATIONS ORDER BY CHAPTER_NUMBER")

        else:
            print("❌ Failed to get PDF links for associations")
    else:
        print("❌ No result from PDF discovery")

except Exception as e:
    print(f"❌ Error creating associations: {str(e)}")

print(f"\n✅ Filename-chapter associations completed!")


# Note to self... 
TODO: do something with the filename-chapter associations above, e.g. add them to the document metadata.

# Done with file upload!

If you manually uploaded files to the Snowflake stage, continue processing the files starting from the cell below.

In [None]:
#
# Helper Function (Adaptive Splitting) (Updated)
#
def get_page_range_desc(page_labels, start_idx, end_idx):
    """Describe an inclusive page range for filenames and for display.

    Each end of the range uses the label at that index in ``page_labels`` when
    one exists; otherwise it falls back to the 1-based page number.

    Returns:
        (clean_desc, readable_desc): ``clean_desc`` looks like
        "pages<start>to<end>" with "/" turned into "_" and spaces removed;
        ``readable_desc`` looks like "pages <start>-<end>".
    """
    def _label_for(idx):
        # Labels may be missing entirely or shorter than the page count.
        if page_labels and idx < len(page_labels):
            return page_labels[idx]
        return idx + 1

    first = _label_for(start_idx)
    last = _label_for(end_idx)

    filename_fragment = f"pages{first}to{last}"
    filename_fragment = filename_fragment.replace('/', '_').replace(' ', '')
    return filename_fragment, f"pages {first}-{last}"

def split_large_pdf_on_stage(session, file_content_stream, original_filename, target_stage_path, max_size_mb=25):
    """Split a PDF that exceeds ``max_size_mb`` into parts and upload them to a stage.

    Args:
        session: Snowpark session; only used (``session.file.put``) when the
            file actually has to be split.
        file_content_stream: ``io.BytesIO`` holding the complete PDF.
        original_filename: Name of the source file; its stem prefixes every part.
        target_stage_path: Stage location the parts are uploaded to.
        max_size_mb: Size limit per part, and the threshold below which the
            file is left untouched.

    Returns:
        A list of dicts with keys ``filename``, ``page_range`` and ``size_mb``.
        A file at or under the limit yields a single entry describing the
        original (``page_range`` is ``'all'``) and nothing is uploaded.
    """
    original_size_mb = file_content_stream.getbuffer().nbytes / (1024 * 1024)
    print(f"\n📄 Processing {original_filename} ({original_size_mb:.1f} MB)")

    # Small files are already in the right stage: do not copy them.
    # The threshold follows max_size_mb instead of a hard-coded 25.
    if original_size_mb <= max_size_mb:
        print(f"✅ File is under {max_size_mb}MB and in the correct stage. No action needed.")
        return [{'filename': original_filename, 'page_range': 'all', 'size_mb': original_size_mb}]

    base_name = original_filename.rsplit('.', 1)[0]
    output_parts = []

    with tempfile.TemporaryDirectory() as temp_dir:
        reader = pypdf.PdfReader(file_content_stream)
        total_pages, page_labels = len(reader.pages), reader.page_labels
        print(f"Total pages: {total_pages}. Starting adaptive split...")

        start_page_of_chunk, part_num = 0, 1

        while start_page_of_chunk < total_pages:
            # Grow a trial document one page at a time and measure its
            # serialized size; stop before the page that pushes it over the
            # limit. A chunk always keeps at least one page, even if that
            # single page is itself larger than the limit.
            size_test_writer = pypdf.PdfWriter()
            end_page_of_chunk = start_page_of_chunk - 1
            for i in range(start_page_of_chunk, total_pages):
                size_test_writer.add_page(reader.pages[i])
                with io.BytesIO() as buffer:
                    size_test_writer.write(buffer)
                    if buffer.tell() / (1024 * 1024) > max_size_mb and i > start_page_of_chunk:
                        break
                end_page_of_chunk = i

            # Rebuild the chunk without the page that overflowed the trial writer.
            final_writer = pypdf.PdfWriter()
            for i in range(start_page_of_chunk, end_page_of_chunk + 1):
                final_writer.add_page(reader.pages[i])

            clean_desc, readable_desc = get_page_range_desc(page_labels, start_page_of_chunk, end_page_of_chunk)
            output_filename = f"{base_name}_{clean_desc}.pdf"

            temp_part_path = os.path.join(temp_dir, output_filename)
            with open(temp_part_path, "wb") as f:
                final_writer.write(f)

            final_size_mb = os.path.getsize(temp_part_path) / (1024 * 1024)
            # The stage location is the directory only; the staged file takes
            # the local file's name.
            session.file.put(
                local_file_name=temp_part_path,
                stage_location=f"{target_stage_path}",
                auto_compress=False, overwrite=True
            )
            print(f"  - Part {part_num}: Uploaded {output_filename} ({final_size_mb:.1f} MB, {readable_desc})")

            output_parts.append({'filename': output_filename, 'page_range': readable_desc, 'size_mb': final_size_mb})
            part_num += 1
            start_page_of_chunk = end_page_of_chunk + 1

    return output_parts

In [None]:
#
# Main Execution (Adaptive Splitting for Large Files)
#
print("--- Starting Task 1: Adaptive splitting for files > 25MB ---")
# Maps a source file's stem to its uploaded parts; only files that were
# actually split into more than one part are recorded.
all_results = {}
try:
    # The pattern limits the listing to the stage root so files already placed
    # in sub-directories are not processed again.
    staged_files = session.sql(f"LS @{SOURCE_STAGE_NAME} PATTERN='[^/]+'").collect()

    # Keep PDFs only, and skip anything whose path mentions 'pages'.
    files_to_process = []
    for staged_file in staged_files:
        staged_name = staged_file["name"]
        if staged_name.lower().endswith('.pdf') and 'pages' not in staged_name:
            files_to_process.append(staged_name)

    if files_to_process:
        print(f"Found {len(files_to_process)} PDF(s) to check...")
        for file_path_on_stage in sorted(files_to_process):
            try:
                file_name_only = file_path_on_stage.split('/')[-1]
                stage_file_path = f"@{file_path_on_stage}"

                with session.file.get_stream(stage_file_path) as instream:
                    # Read the whole file into memory before handing it to the splitter.
                    pdf_bytes_io = io.BytesIO(instream.read())
                    parts = split_large_pdf_on_stage(
                        session=session,
                        file_content_stream=pdf_bytes_io,
                        original_filename=file_name_only,
                        target_stage_path=ADAPTIVE_SPLIT_TARGET_PATH,
                    )
                    if len(parts) > 1:
                        all_results[file_name_only.rsplit('.', 1)[0]] = parts
            except Exception as e:
                # One bad file must not stop the remaining files.
                print(f"❌ Error processing {file_path_on_stage}: {e}")
    else:
        print("No matching PDF files found in the stage to process.")
except SnowparkSQLException as e:
    print(f"❌ SQL Error: Could not list files in stage '@{SOURCE_STAGE_NAME}'. Please check permissions.")
    print(e)
print("\n--- Task 1 Complete ---")

In [None]:
#
# Cell 4: Helper Functions (Single-Page Splitting) (New)
#
def get_page_label(pdf_reader, page_num):
    """Return the label stored for ``page_num`` in ``pdf_reader.page_labels``.

    Gives back ``None`` instead of raising when the lookup fails with an
    ``IndexError`` or ``KeyError``.
    """
    try:
        label = pdf_reader.page_labels[page_num]
    except (IndexError, KeyError):
        label = None
    return label

def _single_page_filename(base_name, page_label, page_num):
    """Build the stage file name for one extracted page.

    Args:
        base_name: Source file name without its extension.
        page_label: Label read from the PDF for this page; None or empty when absent.
        page_num: 0-based index of the page inside the source PDF.

    Returns:
        ``<base>_page<NNNN>.pdf`` when the label is an integer,
        ``<base>_page<label>.pdf`` when it is not, and
        ``<base>_page<NNNN>_nolabel.pdf`` (1-based index) when there is no label.
    """
    if not page_label:
        return f"{base_name}_page{page_num+1:04d}_nolabel.pdf"

    # Path separators in a label would otherwise create nested paths.
    clean_label = str(page_label).replace('/', '_').replace('\\', '_')
    try:
        return f"{base_name}_page{int(clean_label):04d}.pdf"
    except ValueError:
        # Non-numeric label (e.g. roman numerals): keep it verbatim.
        return f"{base_name}_page{clean_label}.pdf"


def split_pdf_to_single_pages_on_stage(session, file_content_stream, original_filename, target_stage_path):
    """Split a PDF read from a stream into one-page PDFs and upload each to a stage directory.

    Args:
        session: Snowpark session whose ``file.put`` performs the uploads.
        file_content_stream: Seekable binary stream holding the source PDF.
        original_filename: Name of the source file; its stem prefixes every page file.
        target_stage_path: Stage location (stage name plus optional prefix) receiving the pages.

    Returns:
        A list with one dict per uploaded page, each holding ``'filename'`` (the
        uploaded file name) and ``'page_label'`` (the label read from the PDF, or None).
    """
    print(f"\n📄 Splitting {original_filename} into single pages...")

    with tempfile.TemporaryDirectory() as temp_dir:
        reader = pypdf.PdfReader(file_content_stream)
        total_pages = len(reader.pages)
        print(f"Total pages: {total_pages}")

        # Read the labels once: the property is evaluated on every access, so
        # looking it up inside the loop repeats the same work for each page.
        try:
            page_labels = list(reader.page_labels)
        except (IndexError, KeyError):
            page_labels = []

        base_name = original_filename.rsplit('.', 1)[0]
        page_upload_info = []
        used_filenames = set()

        for page_num in range(total_pages):
            pdf_writer = pypdf.PdfWriter()
            pdf_writer.add_page(reader.pages[page_num])

            page_label = page_labels[page_num] if page_num < len(page_labels) else None
            output_filename = _single_page_filename(base_name, page_label, page_num)

            # Labels are not guaranteed unique within a document. Because uploads
            # use overwrite=True, a repeated name would silently replace an earlier
            # page, so make the later one distinct using its physical position.
            if output_filename in used_filenames:
                stem, ext = os.path.splitext(output_filename)
                deduped_filename = f"{stem}_dup{page_num+1:04d}{ext}"
                print(f"  ⚠️ Duplicate page label {page_label!r}; writing {deduped_filename} instead of {output_filename}")
                output_filename = deduped_filename
            used_filenames.add(output_filename)

            temp_page_path = os.path.join(temp_dir, output_filename)
            with open(temp_page_path, "wb") as f:
                pdf_writer.write(f)

            # The stage location is the target directory; the uploaded file keeps
            # the local file's name.
            session.file.put(
                local_file_name=temp_page_path,
                stage_location=f"{target_stage_path}",
                auto_compress=False, overwrite=True
            )
            page_upload_info.append({'filename': output_filename, 'page_label': page_label})

            if (page_num + 1) % 20 == 0 or page_num == total_pages - 1:
                print(f"  ...uploaded {page_num+1}/{total_pages} pages")

    print(f"✅ Split into {len(page_upload_info)} individual pages in @{target_stage_path}")
    return page_upload_info

In [None]:
#
# Cell 5: Main Execution (Single-Page Splitting) (Updated)
#
print("\n--- Starting Task 2: Splitting all original files into single pages ---")
# Keyed by source file name; each value is the per-page upload info returned
# by the splitter. The final summary cell reads this dict.
single_page_results = {}

try:
    # The PATTERN limits the listing to the stage root; without it files in
    # sub-directories would be selected and split again.
    staged_files = session.sql(f"LS @{SOURCE_STAGE_NAME} PATTERN='[^/]+'").collect()
    # Keep PDFs only, skipping any name that contains '_pages'.
    files_to_process = [
        f["name"]
        for f in staged_files
        if f["name"].lower().endswith('.pdf') and '_pages' not in f["name"]
    ]

    if files_to_process:
        print(f"Found {len(files_to_process)} original PDF(s) to split into single pages...")
        for file_path_on_stage in sorted(files_to_process):
            try:
                # Prefix '@' so the listed name can be used as a stage path.
                stage_file_path = f"@{file_path_on_stage}"
                file_name_only = file_path_on_stage.split('/')[-1]

                with session.file.get_stream(stage_file_path) as instream:
                    # Buffer the full file in memory before handing it to the splitter.
                    pdf_bytes_io = io.BytesIO(instream.read())
                    page_upload_info = split_pdf_to_single_pages_on_stage(
                        session=session,
                        file_content_stream=pdf_bytes_io,
                        original_filename=file_name_only,
                        target_stage_path=SINGLE_PAGE_TARGET_PATH,
                    )
                    single_page_results[file_name_only] = page_upload_info
            except Exception as err:
                # One bad file must not stop the remaining files from being processed.
                print(f"❌ Error splitting {file_path_on_stage} into single pages: {err}")
    else:
        print("No original PDF files found in the stage to process.")
except SnowparkSQLException as err:
    print(f"❌ SQL Error: Could not list files in stage '@{SOURCE_STAGE_NAME}'. Please check permissions.")
    print(err)
print("\n--- Task 2 Complete ---")

In [None]:
#
# Cell 6: Final Summary
#
print("\n" + "="*60)
print("✅ All Tasks Complete")
print("="*60)

# Task 1 recap: all_results only holds files that were split into more than one part.
print("\n📋 Summary of Adaptive Splitting (Task 1)")
if not all_results:
    print("  - No files required adaptive splitting.")
else:
    total_parts = sum(map(len, all_results.values()))
    print(f"  - Total: {len(all_results)} large PDF(s) were split into {total_parts} parts.")
    print(f"  - Destination: @{ADAPTIVE_SPLIT_TARGET_PATH}/")

# Task 2 recap: one entry per source PDF, each listing its uploaded pages.
print("\n📋 Summary of Single-Page Splitting (Task 2)")
if not single_page_results:
    print("  - Single-page splitting was not run or no files were processed.")
else:
    total_pages_created = sum(map(len, single_page_results.values()))
    print(f"  - Total: {len(single_page_results)} original PDF(s) were split into {total_pages_created} single pages.")
    print(f"  - Destination: @{SINGLE_PAGE_TARGET_PATH}/")

print("\n" + "="*60)

# Start Table Creation

In [None]:
-- Main table for individual document pages: one row per single-page PDF loaded from the stage.
-- NOTE: CREATE OR REPLACE rebuilds the table, so re-running this statement discards rows already loaded.
CREATE OR REPLACE TABLE DOCUMENT_PAGES (
    PAGE_ID STRING PRIMARY KEY,             -- Built by the load statement as 'CH<2-digit chapter>_P<4-digit page>'
    CHAPTER_NUMBER INTEGER,                 -- Parsed from the 'chap<N>' part of the file name; 0 when absent
    PAGE_NUMBER INTEGER,                    -- Page number parsed from the 'page<N>' part of the file name
    PART_IDENTIFIER STRING,                 -- Links pages to their parent parts (e.g., 'pages64to76')
    CHAPTER_TITLE STRING,                   -- The load statement writes 'Chapter <N>'
    -- UNIT_NUMBER INTEGER,
    PAGE_CONTENT_RAW TEXT,                  -- Text returned by PARSE_DOCUMENT for the page
    PAGE_CONTENT TEXT,                      -- Content for this specific page (currently the same value as PAGE_CONTENT_RAW)
    PAGE_SUMMARY TEXT,                      -- AI-generated page summary (not written by the load statement)
    PAGE_KEYWORDS ARRAY,                    -- Key terms on this page (not written by the load statement)
    --CONTENT_VECTOR VECTOR(FLOAT, 768),     -- Page embedding for search
    
    -- Content metadata
    CONTENT_TYPE STRING,                    -- 'text_light', 'text_heavy', 'mixed' (length-based buckets set by the load statement)
    WORD_COUNT INTEGER,
    CHAR_COUNT INTEGER,
    
    -- URL and citation
    CHAPTER_PDF_URL STRING,                 -- Presigned URL to PDF (generated with a 604800-second expiry)
    SOURCE_FILENAME STRING,                 -- Original source filename (RELATIVE_PATH)
    ENHANCED_CITATION STRING,               -- Human-readable citation text built by the load statement
    
    -- Processing metadata
    PROCESSING_STATUS STRING DEFAULT 'PENDING',     -- The load statement writes 'PROCESSED'
    PROCESSING_TIMESTAMP TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);

-- ================================================================================
-- HIERARCHICAL RAG STAGING TABLE - For chapter and part-level content
-- ================================================================================

-- Staging table for chapter and multi-page part PDFs: one row per PDF under the stage's parts/ directory.
-- NOTE: CREATE OR REPLACE rebuilds the table, so re-running this statement discards rows already loaded.
CREATE OR REPLACE TABLE DOCUMENT_PARTS (
    CHAPTER_NUMBER INTEGER,      -- Parsed from the 'chap<N>' part of the file name; 0 when absent
    PART_IDENTIFIER STRING,      -- e.g., 'pages64to76', 'full' for complete chapters
    PART_CONTENT_RAW STRING,     -- Raw text extracted from PDF
    SOURCE_FILENAME STRING,      -- Stage-relative path of the part file (RELATIVE_PATH); used as the load's idempotency key
    PDF_URL STRING,              -- Presigned URL to the source PDF
    
    -- Processing metadata
    WORD_COUNT INTEGER,
    CHAR_COUNT INTEGER,
    ENHANCED_CITATION STRING,    -- Human-readable citation text built by the load statement
    PROCESSING_STATUS STRING DEFAULT 'PENDING',     -- The load statement writes 'PROCESSED'
    CREATED_TIMESTAMP TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
)
COMMENT = 'Staging table for chapter-level and multi-page part PDFs used in hierarchical RAG pipeline';

-- AI analysis results for cross-reference extraction: one row per analysed page.
-- NOTE: CREATE OR REPLACE rebuilds the table, so re-running this statement discards existing rows.
-- NOTE(review): no statement in this part of the notebook writes to this table — confirm where it is populated.
CREATE OR REPLACE TABLE DOCUMENT_ANALYSIS (
    PAGE_ID STRING PRIMARY KEY,            -- presumably the same identifier as DOCUMENT_PAGES.PAGE_ID — TODO confirm
    CHAPTER_NUMBER INTEGER,
    PAGE_NUMBER INTEGER,
    PAGE_CONTENT TEXT,
    EXTRACTED_REFERENCES VARIANT,          -- JSON array of cross-references
    AI_MODEL STRING,
    PROCESSING_TIMESTAMP TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Knowledge graph edges between pages/chapters: one row per cross-reference
-- from a source page to a destination page or chapter.
-- NOTE: CREATE OR REPLACE rebuilds the table, so re-running this statement discards existing edges.
CREATE OR REPLACE TABLE DOCUMENT_EDGES (
    -- Generated identifier. A UUID is used instead of a random number drawn from
    -- a ~1e9 range, which starts colliding after only tens of thousands of rows;
    -- the 'EDGE_' prefix is kept so the id shape stays recognisable.
    EDGE_ID STRING DEFAULT CONCAT('EDGE_', UUID_STRING()) PRIMARY KEY,
    
    -- Source page
    SRC_PAGE_ID STRING,
    SRC_CHAPTER_NUMBER INTEGER,
    SRC_PAGE_NUMBER INTEGER,
    
    -- Destination page (may not exist yet)
    DST_PAGE_ID STRING,                     -- NULL if referenced page doesn't exist
    DST_CHAPTER_NUMBER INTEGER,
    DST_PAGE_NUMBER INTEGER,
    
    -- Reference details
    REFERENCE_TYPE STRING,                  -- 'page_reference', 'chapter_reference', 'figure_reference'
    REFERENCE_CONTEXT STRING,              -- "see page 759", "discussed in Chapter 24"
    REFERENCE_EXPLANATION STRING,          -- AI explanation of the connection
    CONFIDENCE_SCORE FLOAT,                -- Consumers in this notebook keep only edges scoring above 0.5
    
    PROCESSING_TIMESTAMP TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);

-- ================================================================================
-- HIERARCHICAL RAG TABLE - Final unified table for multi-granularity RAG
-- ================================================================================

-- Final table for hierarchical multi-hop RAG - stores content at all granularity levels.
-- It is the source of the Cortex Search service and of the MULTIHOP_SEARCH_RESULTS view.
-- NOTE: CREATE OR REPLACE rebuilds the table, so re-running this statement discards all
-- previously generated text and summaries.
CREATE OR REPLACE TABLE WORLD_HISTORY_RAG (
    CHAPTER_NUMBER INTEGER,
    PART_IDENTIFIER STRING,     -- e.g., 'pages64to76', 'full', or NULL for chapter-wide summaries
    PAGE_NUMBER INTEGER,        -- The specific page number, or NULL for part/chapter summaries
    CONTENT_TYPE STRING,        -- 'ChapterSummary', 'PartSummary', 'PageSummary', 'RawText'
    TEXT_CONTENT STRING,        -- The actual text or summary
    SOURCE_FILENAME STRING,     -- The original file the content was extracted from
    PDF_URL STRING,             -- A presigned URL to the source PDF (rewritten by the DAILY_PDF_URL_REFRESH task)
    PAGE_ID STRING,             -- The page ID used in multi-hop search
    ENHANCED_CITATION STRING,   -- Citation text copied from the staging tables by the load statements
    -- Processing metadata
    CREATED_TIMESTAMP TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
)    
    COMMENT = 'Hierarchical RAG table storing content at multiple granularities: raw text, page summaries, part summaries, and chapter summaries';


In [None]:
-- Load one DOCUMENT_PAGES row per single-page PDF found under pages/ on the stage.
-- Pages whose PAGE_ID is already present are skipped, so the statement can be re-run
-- after new files are staged.
INSERT INTO DOCUMENT_PAGES (
    PAGE_ID, CHAPTER_NUMBER, PAGE_NUMBER, PART_IDENTIFIER, CHAPTER_TITLE,
    PAGE_CONTENT_RAW, PAGE_CONTENT, CONTENT_TYPE, WORD_COUNT, CHAR_COUNT, CHAPTER_PDF_URL,
    SOURCE_FILENAME, ENHANCED_CITATION, PROCESSING_STATUS
)
WITH page_processing AS (
    SELECT
        -- Chapter and page numbers come from the file name. A missing 'chap<N>'
        -- marker becomes chapter 0, so chapter_number is never NULL.
        ZEROIFNULL(REGEXP_SUBSTR(RELATIVE_PATH, 'chap(\\d+)', 1, 1, 'ie', 1))::INT AS chapter_number,
        REGEXP_SUBSTR(RELATIVE_PATH, 'page[A-Z]*(\\d+)', 1, 1, 'ie', 1)::INT AS page_number,

        -- Part identifier, when the page file carries a page-range marker (usually NULL
        -- for individual pages). The 'pages' prefix is kept so the value has the same
        -- shape as DOCUMENT_PARTS.PART_IDENTIFIER (e.g. 'pages64to76') and can link to it.
        REGEXP_SUBSTR(RELATIVE_PATH, '(pages\\d+to\\d+)', 1, 1, 'ie', 1) AS part_identifier,

        -- Consistent page ID: 'CH<2-digit chapter>_P<4-digit page>'
        'CH' || LPAD(ZEROIFNULL(REGEXP_SUBSTR(RELATIVE_PATH, 'chap(\\d+)', 1, 1, 'ie', 1))::INT, 2, '0') ||
        '_P' || LPAD(REGEXP_SUBSTR(RELATIVE_PATH, 'page[A-Z]*(\\d+)', 1, 1, 'ie', 1)::INT, 4, '0') AS page_id,

        -- Parse PDF content
        SNOWFLAKE.CORTEX.PARSE_DOCUMENT('@PDF_DOCUMENTS', RELATIVE_PATH, {'mode': 'LAYOUT'}) AS parse_result,
        RELATIVE_PATH,

        -- Presigned URL, valid for 604800 seconds (7 days)
        GET_PRESIGNED_URL('@PDF_DOCUMENTS', RELATIVE_PATH, 604800) AS pdf_url
    FROM
        DIRECTORY(@PDF_DOCUMENTS)
    WHERE
        RELATIVE_PATH LIKE 'pages/%.pdf'
        AND page_id
            NOT IN (SELECT PAGE_ID FROM DOCUMENT_PAGES WHERE PAGE_ID IS NOT NULL)  -- Idempotency check
),
parsed_pages AS (
    -- Resolve the parsed text once instead of repeating the COALESCE in every expression.
    SELECT
        page_id,
        chapter_number,
        page_number,
        part_identifier,
        RELATIVE_PATH,
        pdf_url,
        COALESCE(parse_result['content']::STRING, '') AS page_text
    FROM page_processing
)
SELECT
    -- Core identifiers
    page_id,
    chapter_number,
    page_number,
    part_identifier,
    'Chapter ' || chapter_number AS chapter_title,

    -- Content fields
    page_text AS page_content_raw,
    page_text AS page_content,  -- Same as raw for now

    -- Content metadata: length-based buckets
    CASE
        WHEN LENGTH(TRIM(page_text)) < 100 THEN 'text_light'
        WHEN LENGTH(TRIM(page_text)) > 2000 THEN 'text_heavy'
        ELSE 'mixed'
    END AS content_type,

    -- Counts. Words are runs of non-whitespace, so newlines and repeated spaces in
    -- the LAYOUT output neither merge words nor count as empty ones.
    REGEXP_COUNT(page_text, '\\S+') AS word_count,
    LENGTH(page_text) AS char_count,

    -- URLs and citations
    pdf_url AS chapter_pdf_url,
    RELATIVE_PATH AS source_filename,
    -- 'Chapter N, Pages A-B' when the file name carries an 'AtoB' range; otherwise fall
    -- back to 'Chapter N, Page P'. The first branch is NULL without a range, and that
    -- used to leave the citation NULL for ordinary single-page files.
    COALESCE(
        'Chapter ' || chapter_number || ', Pages ' ||
            REPLACE(REGEXP_SUBSTR(RELATIVE_PATH, '\\d+to\\d+'), 'to', '-'),
        'Chapter ' || chapter_number || ', Page ' || page_number
    ) AS enhanced_citation,

    -- Processing status
    'PROCESSED' AS processing_status

FROM parsed_pages
WHERE page_number IS NOT NULL
  AND LENGTH(TRIM(page_text)) > 50;  -- Quality filter


In [None]:
-- Load one DOCUMENT_PARTS row per PDF found under parts/ on the stage.
-- Files whose path is already recorded in SOURCE_FILENAME are skipped, so the
-- statement can be re-run after new files are staged.
INSERT INTO DOCUMENT_PARTS (CHAPTER_NUMBER, PART_IDENTIFIER, PART_CONTENT_RAW, SOURCE_FILENAME, PDF_URL, WORD_COUNT, CHAR_COUNT, ENHANCED_CITATION, PROCESSING_STATUS)
WITH parsed_docs AS (
  SELECT
    RELATIVE_PATH,
    -- A missing 'chap<N>' marker becomes chapter 0, so this is never NULL and no
    -- "valid chapter" filter is needed afterwards.
    ZEROIFNULL(REGEXP_SUBSTR(RELATIVE_PATH, 'chap(\\d+)', 1, 1, 'ie', 1))::INT AS chapter_number,
    -- 'pages<N>to<M>' when the file name carries a page range; 'full' for a file
    -- without one, as documented on the PART_IDENTIFIER column. Leaving it NULL
    -- produced NULL citations and NULL-unsafe comparisons further down the pipeline.
    COALESCE(REGEXP_SUBSTR(RELATIVE_PATH, '(pages\\d+to\\d+)', 1, 1, 'ie', 1), 'full') AS part_identifier,
    SNOWFLAKE.CORTEX.PARSE_DOCUMENT('@PDF_DOCUMENTS', RELATIVE_PATH, {'mode': 'LAYOUT'})['content']::STRING AS content_raw
  FROM DIRECTORY(@PDF_DOCUMENTS)
  WHERE
    RELATIVE_PATH LIKE 'parts/%.pdf' -- Only files in parts subdirectory
    AND RELATIVE_PATH NOT IN (SELECT SOURCE_FILENAME FROM DOCUMENT_PARTS WHERE SOURCE_FILENAME IS NOT NULL) -- Idempotency check
)
SELECT
    pd.chapter_number,
    pd.part_identifier,
    pd.content_raw,
    pd.RELATIVE_PATH,
    -- Presigned URL, valid for 604800 seconds (7 days)
    GET_PRESIGNED_URL('@PDF_DOCUMENTS', pd.RELATIVE_PATH, 604800),
    -- Words are runs of non-whitespace, so newlines and repeated spaces in the
    -- LAYOUT output neither merge words nor count as empty ones.
    REGEXP_COUNT(pd.content_raw, '\\S+'),
    LENGTH(pd.content_raw),
    -- Whole-chapter files are cited by chapter only; ranged parts add the part identifier.
    IFF(
        pd.part_identifier = 'full',
        'Chapter ' || pd.chapter_number,
        'Chapter ' || pd.chapter_number || ', Part ' || pd.part_identifier
    ),
    'PROCESSED'
FROM parsed_docs AS pd;

In [None]:
-- Insert the LLM-cleaned raw text of every processed page as a 'RawText' row.
-- A page that already has a page-level RawText row is skipped, so re-running the
-- cell neither duplicates rows nor repeats the LLM call for finished pages.
INSERT INTO WORLD_HISTORY_RAG (CHAPTER_NUMBER, PART_IDENTIFIER, PAGE_NUMBER, CONTENT_TYPE, TEXT_CONTENT, SOURCE_FILENAME, PDF_URL, PAGE_ID, ENHANCED_CITATION)
SELECT
    p.CHAPTER_NUMBER, 
    p.PART_IDENTIFIER, 
    p.PAGE_NUMBER, 
    'RawText' AS CONTENT_TYPE,
    AI_COMPLETE(
        'llama3.3-70b', 
        'Return the raw text of the page with obvious errors fixed.  This includes spelling errors, hyphenated words, combined words, etc.  If and only a part of the text is not clear you can try to figure out what it means, but if there is not enough context then just return the raw text.  If any historical names, dates, etc seem way off try to correct it but do not make up anything.  Do NOT add any additional text or commentary like "Here is the text with obvious errors fixed".  The text will be used later in a search engine so it should be as close to the original text as possible. :\n' || p.PAGE_CONTENT_RAW
    ) AS TEXT_CONTENT,
    p.SOURCE_FILENAME, 
    p.CHAPTER_PDF_URL,
    'CH' || LPAD(p.CHAPTER_NUMBER, 2, '0') || '_P' || LPAD(p.PAGE_NUMBER, 4, '0') AS PAGE_ID,
    p.ENHANCED_CITATION
FROM DOCUMENT_PAGES p
WHERE p.PROCESSING_STATUS = 'PROCESSED'
  AND p.PAGE_CONTENT_RAW IS NOT NULL
  AND LENGTH(p.PAGE_CONTENT_RAW) > 50  -- Only process pages with substantial content
  AND NOT EXISTS (
      -- Idempotency check. Part-level RawText rows carry a NULL PAGE_NUMBER, so the
      -- equality below can only match page-level rows.
      SELECT 1
      FROM WORLD_HISTORY_RAG r
      WHERE r.CONTENT_TYPE = 'RawText'
        AND r.CHAPTER_NUMBER = p.CHAPTER_NUMBER
        AND r.PAGE_NUMBER = p.PAGE_NUMBER
  );


In [None]:
-- Insert a 1-2 sentence LLM summary of every processed page as a 'PageSummary' row.
-- Pages that already have a summary are skipped.
INSERT INTO WORLD_HISTORY_RAG (CHAPTER_NUMBER, PART_IDENTIFIER, PAGE_NUMBER, CONTENT_TYPE, TEXT_CONTENT, SOURCE_FILENAME, PDF_URL, PAGE_ID, ENHANCED_CITATION)
SELECT
    p.CHAPTER_NUMBER, 
    p.PART_IDENTIFIER, 
    p.PAGE_NUMBER, 
    'PageSummary' AS CONTENT_TYPE,
    AI_COMPLETE(
        'llama3.3-70b', 
        'Do not add any additional text or commentary like "Here is the summary".  The text will be used later in a search engine so it should be as close to the original text as possible.  Summarize this page content in 1-2 concise sentences:\n' || p.PAGE_CONTENT_RAW
    ) AS TEXT_CONTENT,
    p.SOURCE_FILENAME, 
    p.CHAPTER_PDF_URL,
    'CH' || LPAD(p.CHAPTER_NUMBER, 2, '0') || '_P' || LPAD(p.PAGE_NUMBER, 4, '0') AS PAGE_ID,
    p.ENHANCED_CITATION
FROM DOCUMENT_PAGES p
WHERE p.PROCESSING_STATUS = 'PROCESSED'
  AND p.PAGE_CONTENT_RAW IS NOT NULL
  AND LENGTH(p.PAGE_CONTENT_RAW) > 50  -- Only process pages with substantial content
  AND NOT EXISTS (
      -- Idempotency check for page summaries. Every existing summary must be
      -- considered: capping the lookup at a handful of rows let almost every page
      -- be summarised (and billed) again on each run.
      SELECT 1
      FROM WORLD_HISTORY_RAG r
      WHERE r.CONTENT_TYPE = 'PageSummary'
        AND r.CHAPTER_NUMBER = p.CHAPTER_NUMBER
        AND r.PAGE_NUMBER = p.PAGE_NUMBER
  );

In [None]:
-- Insert the LLM-cleaned raw text of every staged part as a part-level 'RawText' row.
-- Parts that already have such a row are skipped.
-- NOTE(review): a part can be far longer than a page — confirm the text fits the
-- model's context window.
INSERT INTO WORLD_HISTORY_RAG (CHAPTER_NUMBER, PART_IDENTIFIER, PAGE_NUMBER, CONTENT_TYPE, TEXT_CONTENT, SOURCE_FILENAME, PDF_URL, PAGE_ID, ENHANCED_CITATION)
SELECT
    p.CHAPTER_NUMBER,
    p.PART_IDENTIFIER,
    NULL AS PAGE_NUMBER,  -- Parts don't have specific page numbers
    'RawText' AS CONTENT_TYPE,
    AI_COMPLETE(
      'llama3.3-70b', 
      'Return the raw text of the page with obvious errors fixed.  This includes spelling errors, hyphenated words, combined words, etc.  If and only a part of the text is not clear you can try to figure out what it means, but if there is not enough context then just return the raw text.  If any historical names, dates, etc seem way off try to correct it but do not make up anything.  Do NOT add any additional text or commentary like "Here is the text with obvious errors fixed".  The text will be used later in a search engine so it should be as close to the original text as possible. :\n' ||  p.PART_CONTENT_RAW
    ) AS TEXT_CONTENT,
    p.SOURCE_FILENAME,
    p.PDF_URL,
    -- Same 'CH<chapter>_<part>' shape as the PartSummary row for this part, so both
    -- rows describing one part share a single PAGE_ID (as page-level rows already do).
    'CH' || LPAD(p.CHAPTER_NUMBER, 2, '0') || '_' || p.PART_IDENTIFIER AS PAGE_ID,
    p.ENHANCED_CITATION
FROM DOCUMENT_PARTS p
WHERE p.PART_CONTENT_RAW IS NOT NULL
  -- AND LENGTH(TRIM(p.PART_CONTENT_RAW)) > 100  -- Quality filter
  AND NOT EXISTS (
      -- Idempotency check. Part-level rows are the RawText rows with a NULL
      -- PAGE_NUMBER. EQUAL_NULL keeps the comparison well-defined when
      -- PART_IDENTIFIER is NULL, where a NOT IN over concatenated keys would
      -- evaluate to NULL and drop the row.
      SELECT 1
      FROM WORLD_HISTORY_RAG r
      WHERE r.CONTENT_TYPE = 'RawText'
        AND r.PAGE_NUMBER IS NULL
        AND r.CHAPTER_NUMBER = p.CHAPTER_NUMBER
        AND EQUAL_NULL(r.PART_IDENTIFIER, p.PART_IDENTIFIER)
  );


In [None]:
-- Insert a 2-3 paragraph LLM summary of every staged part as a 'PartSummary' row.
-- Parts that already have a summary are skipped.
-- NOTE(review): a part can be far longer than a page — confirm the text fits the
-- model's context window.
INSERT INTO WORLD_HISTORY_RAG (CHAPTER_NUMBER, PART_IDENTIFIER, PAGE_NUMBER, CONTENT_TYPE, TEXT_CONTENT, SOURCE_FILENAME, PDF_URL, PAGE_ID, ENHANCED_CITATION)
SELECT
    p.CHAPTER_NUMBER,
    p.PART_IDENTIFIER,
    NULL AS PAGE_NUMBER,  -- Parts don't have specific page numbers
    'PartSummary' AS CONTENT_TYPE,
    AI_COMPLETE(
        'llama3.3-70b',
        'Do not add any additional text or commentary like "Here is the summary".  If there is not enough context to summarize the part then just return the raw text.  Summarize the following history textbook section in 2-3 concise paragraphs. Focus on the main themes, key events, important people, and historical significance:\n\n' || 
        p.PART_CONTENT_RAW ||  
        '\n\nProvide a clear, educational summary suitable for a history student.'
    ) AS TEXT_CONTENT,
    p.SOURCE_FILENAME,
    p.PDF_URL,
    'CH' || LPAD(p.CHAPTER_NUMBER, 2, '0') || '_' || p.PART_IDENTIFIER AS PAGE_ID,
    p.ENHANCED_CITATION
FROM DOCUMENT_PARTS p
WHERE p.PART_CONTENT_RAW IS NOT NULL
  -- AND LENGTH(TRIM(p.PART_CONTENT_RAW)) > 100  -- Quality filter
  AND NOT EXISTS (
      -- Idempotency check. EQUAL_NULL keeps the comparison well-defined when
      -- PART_IDENTIFIER is NULL, where a NOT IN over concatenated keys would
      -- evaluate to NULL and drop the row.
      SELECT 1
      FROM WORLD_HISTORY_RAG r
      WHERE r.CONTENT_TYPE = 'PartSummary'
        AND r.CHAPTER_NUMBER = p.CHAPTER_NUMBER
        AND EQUAL_NULL(r.PART_IDENTIFIER, p.PART_IDENTIFIER)
  );

In [None]:
-- Build one 'ChapterSummary' row per chapter by asking the LLM to merge that chapter's
-- part summaries. Chapters that already have a chapter summary are skipped.
-- PAGE_ID and ENHANCED_CITATION are filled in so these rows can be cited and looked up
-- by id like every other row in the table, instead of being left NULL.
INSERT INTO WORLD_HISTORY_RAG (CHAPTER_NUMBER, PART_IDENTIFIER, PAGE_NUMBER, CONTENT_TYPE, TEXT_CONTENT, SOURCE_FILENAME, PDF_URL, PAGE_ID, ENHANCED_CITATION)
WITH aggregated_part_summaries AS (
    SELECT
        CHAPTER_NUMBER,
        -- Any one of the chapter's part files stands in as the chapter's source and URL.
        ANY_VALUE(PDF_URL) AS PDF_URL,
        ANY_VALUE(SOURCE_FILENAME) AS SOURCE_FILENAME,
        LISTAGG(TEXT_CONTENT, '\n\n---\n\n') WITHIN GROUP (ORDER BY PART_IDENTIFIER) AS full_chapter_text,
        COUNT(*) AS part_count
    FROM WORLD_HISTORY_RAG
    WHERE CONTENT_TYPE = 'PartSummary'
      AND CHAPTER_NUMBER NOT IN (
          SELECT DISTINCT CHAPTER_NUMBER 
          FROM WORLD_HISTORY_RAG 
          WHERE CONTENT_TYPE = 'ChapterSummary'
      ) -- Idempotency check for chapter summaries
    GROUP BY CHAPTER_NUMBER
)
SELECT
    a.CHAPTER_NUMBER, 
    NULL AS PART_IDENTIFIER, 
    NULL AS PAGE_NUMBER, 
    'ChapterSummary' AS CONTENT_TYPE,
    AI_COMPLETE(
        'llama3.3-70b', 
        'You are given several high-level summaries from different sections of a history chapter. Combine them into a single, comprehensive 2-3 paragraph summary of the entire chapter.  Do not add any additional text or commentary like "Here is the summary".  The text will be used later in a search engine so it should be as close to the original text as possible. :\n\n' || 
        a.full_chapter_text ||
        '\n\nCreate a cohesive narrative that synthesizes the key themes, events, and historical significance covered throughout this chapter.'
    ) AS TEXT_CONTENT,
    a.SOURCE_FILENAME, 
    a.PDF_URL,
    -- Chapter-level id: the 'CH<2-digit chapter>' prefix shared by page and part ids.
    'CH' || LPAD(a.CHAPTER_NUMBER, 2, '0') AS PAGE_ID,
    'Chapter ' || a.CHAPTER_NUMBER AS ENHANCED_CITATION
FROM aggregated_part_summaries a
WHERE a.part_count > 0;  -- Only process chapters that have part summaries

In [None]:
-- Serverless task that regenerates the presigned PDF URL of every WORLD_HISTORY_RAG row
-- once a day. The UPDATE has no WHERE clause, so all rows are rewritten on each run;
-- every new URL is valid for 604800 seconds (7 days).
CREATE OR REPLACE TASK DAILY_PDF_URL_REFRESH
    USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
    SCHEDULE = 'USING CRON 0 6 * * * UTC'  -- Every day at 06:00 UTC
    COMMENT = 'Refresh presigned URLs that expire within 24 hours'
AS
    UPDATE WORLD_HISTORY_RAG
    SET PDF_URL = GET_PRESIGNED_URL(@WORLD_HISTORY.PUBLIC.PDF_DOCUMENTS, SOURCE_FILENAME, 604800);

-- The task is resumed explicitly so that its schedule takes effect.
ALTER TASK DAILY_PDF_URL_REFRESH RESUME;

# Create Cortex Search Service

In [None]:
-- Cortex Search service over WORLD_HISTORY_RAG.TEXT_CONTENT. Rows with NULL or very
-- short text (50 characters or fewer after trimming) are excluded from the index.
-- CHAPTER_NUMBER, PAGE_NUMBER, SOURCE_FILENAME and CONTENT_TYPE are exposed as attributes.
-- NOTE(review): PDF_URL values are presigned for 604800 seconds elsewhere in this
-- notebook, while TARGET_LAG lets the service trail its source by up to 30 days —
-- confirm the URLs it serves are still valid.
CREATE OR REPLACE CORTEX SEARCH SERVICE WORLD_HISTORY_RAG_SEARCH
    ON TEXT_CONTENT
    ATTRIBUTES CHAPTER_NUMBER, PAGE_NUMBER, SOURCE_FILENAME, CONTENT_TYPE
    WAREHOUSE = WAREHOUSE_XL_G2
    TARGET_LAG = '30 days'
    EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
    AS (
        SELECT
            rag.TEXT_CONTENT,
            rag.CHAPTER_NUMBER,
            rag.PAGE_NUMBER,
            rag.PART_IDENTIFIER,
            rag.CONTENT_TYPE,
            rag.SOURCE_FILENAME,
            rag.PDF_URL,
            rag.PAGE_ID,
            rag.ENHANCED_CITATION
        FROM WORLD_HISTORY_RAG rag
        WHERE LENGTH(TRIM(rag.TEXT_CONTENT)) > 50
          AND rag.TEXT_CONTENT IS NOT NULL
    );

In [None]:
-- View pairing every searchable WORLD_HISTORY_RAG row with the pages it references.
-- Output columns: the search_base columns, then connected_pages (array of objects,
-- empty when there are no edges) and connection_count (0 when there are no edges).
CREATE OR REPLACE VIEW MULTIHOP_SEARCH_RESULTS AS
WITH search_base AS (
    -- All searchable content with page IDs, a display citation and a 500-character preview
    SELECT
        page_id,
        rag.CHAPTER_NUMBER,
        rag.PAGE_NUMBER,
        rag.CONTENT_TYPE,
        'Chapter ' || rag.CHAPTER_NUMBER || COALESCE(', Page ' || rag.PAGE_NUMBER, '') || ' (' || rag.CONTENT_TYPE || ')' AS citation,
        LEFT(rag.TEXT_CONTENT, 500) AS content_preview,
        rag.PDF_URL
    FROM WORLD_HISTORY_RAG rag
    WHERE rag.TEXT_CONTENT IS NOT NULL
),
connected AS (
    -- Outgoing edges per source page, keeping only those with confidence above 0.5
    SELECT
        edge.SRC_PAGE_ID,
        ARRAY_AGG(OBJECT_CONSTRUCT(
            'page_id', edge.DST_PAGE_ID,
            'citation', 'Chapter ' || edge.DST_CHAPTER_NUMBER || ', Page ' || edge.DST_PAGE_NUMBER,
            'context', edge.REFERENCE_CONTEXT,
            'confidence', edge.CONFIDENCE_SCORE
        )) AS related_pages,
        COUNT(*) AS connection_count
    FROM DOCUMENT_EDGES edge
    WHERE edge.CONFIDENCE_SCORE > 0.5
    GROUP BY edge.SRC_PAGE_ID
)
SELECT
    sb.*,
    COALESCE(connected.related_pages, ARRAY_CONSTRUCT()) AS connected_pages,
    COALESCE(connected.connection_count, 0) AS connection_count
FROM search_base sb
LEFT JOIN connected
    ON sb.page_id = connected.SRC_PAGE_ID;

-- Function intended to be used as a tool; it takes exactly one page id and returns
-- every MULTIHOP_SEARCH_RESULTS row with that id as an array of objects (one object
-- per row, keyed by column name).
CREATE OR REPLACE FUNCTION MULTIHOP_SEARCH_RESULTS_FN(page_id_param STRING)
RETURNS ARRAY
LANGUAGE SQL
AS
$$
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
    FROM MULTIHOP_SEARCH_RESULTS
    WHERE page_id = page_id_param
$$;

In [None]:
-- Walks DOCUMENT_EDGES outward from one page and returns every page reached.
--   starting_page_id : PAGE_ID of the page to start from (must exist in DOCUMENT_PAGES)
--   max_hops         : maximum number of edges to follow from the starting page
-- Returns an array with one object per reached page (the starting page itself is
-- excluded), ordered by hop count, then destination chapter and page. Only edges with
-- a confidence score above 0.5 are followed. Referenced pages that have no
-- DST_PAGE_ID get a synthetic 'MISSING_CH<chapter>_P<page>' id.
-- NOTE(review): the traversal does not track visited pages; cyclic references are
-- bounded only by max_hops.
CREATE OR REPLACE FUNCTION FIND_CONNECTED_PAGES(starting_page_id STRING, max_hops INT)
RETURNS array
LANGUAGE SQL
AS
$$
    WITH RECURSIVE page_traversal AS (
        -- Starting page (hop 0)
        SELECT 
            page_id as dest_page_id,
            chapter_number as dest_chapter_number,
            page_number as dest_page_number,
            enhanced_citation,
            ARRAY_CONSTRUCT('Starting page: ' || enhanced_citation) as connection_path,
            0 as hop_count
        FROM DOCUMENT_PAGES 
        WHERE page_id = starting_page_id

        UNION ALL

        -- Connected pages (hop 1+)
        SELECT 
            COALESCE(e.DST_PAGE_ID, 'MISSING_CH' || e.DST_CHAPTER_NUMBER || '_P' || LPAD(e.DST_PAGE_NUMBER, 4, '0')) as dest_page_id,
            e.DST_CHAPTER_NUMBER as dest_chapter_number,
            e.DST_PAGE_NUMBER as dest_page_number,
            COALESCE(dp.ENHANCED_CITATION, 'Referenced: Chapter ' || e.DST_CHAPTER_NUMBER || ', Page ' || e.DST_PAGE_NUMBER) as enhanced_citation,
            ARRAY_APPEND(pt.connection_path, e.REFERENCE_EXPLANATION || ' (' || e.REFERENCE_CONTEXT || ')') as connection_path,
            pt.hop_count + 1
        FROM page_traversal pt
        JOIN DOCUMENT_EDGES e ON pt.dest_page_id = e.SRC_PAGE_ID
        LEFT JOIN DOCUMENT_PAGES dp ON e.DST_PAGE_ID = dp.PAGE_ID
        WHERE pt.hop_count < max_hops
          AND e.CONFIDENCE_SCORE > 0.5
    )
    SELECT 
        -- The ordering has to live inside the aggregate: an outer ORDER BY on a
        -- single-row aggregate cannot order the array's elements and would refer
        -- to columns that are no longer available after aggregation.
        ARRAY_AGG(OBJECT_CONSTRUCT(*))
            WITHIN GROUP (ORDER BY hop_count, dest_chapter_number, dest_page_number)
    FROM page_traversal 
    WHERE hop_count > 0  -- Exclude starting page
$$;