# Ivy Plus MARC Analysis with Enhanced Matching - VERSION 2.0

This notebook processes MARC data from Ivy Plus libraries to identify unique records held by Penn that are not held by other institutions in the consortium.

## Enhanced Normalization and Matching (VERSION 2.0)

The matching process has been significantly improved with multiple levels of matching and enhanced field extraction:

### 1. **Multi-Level Matching Strategy**
   - **Strict Match Keys**: Precise title, edition, and year matching
   - **Fuzzy Match Keys**: Broader matching with aggressive normalization for catching variations
   - **Work-Level Keys**: Title and author matching for work-level deduplication
   - **ISBN Core Extraction**: Matches both ISBN-10 and ISBN-13 variants of the same work

### 2. **Enhanced Identifier Extraction**
   - **OCLC Numbers**: Handles all variants (ocm, ocn, on prefixes) and leading zeros
   - **ISBN Core**: Extracts the core ISBN for matching different formats of the same work
   - **Publication Year**: Now checks both F260 and F264 fields (many modern records use F264)
   - **LCCN**: Standardized to handle different formats and prefixes

### 3. **Special Handling**
   - **Multi-Volume Detection**: Identifies and properly handles multi-volume sets to prevent false positives
   - **Smart Title Normalization**: Preserves important distinctions while removing true noise
   - **Conservative Filtering**: Optional conservative analysis using only standard identifiers

### 4. **Match Key Validation**
   - Each match key is validated for quality to detect potential issues
   - Short or generic match keys are flagged
   - Match key quality metrics are saved for analysis
   - Distribution statistics for different match types

### 5. **Field Selection**
   - Leader (FLDR) is included for record type identification
   - Core bibliographic fields (F001, F010, F020, F245, F250, F260, F264, F035) are used
   - F264 added for modern publication data
   - F035 for OCLC number extraction

This VERSION 2.0 approach provides:
- **More comprehensive deduplication** through multiple match levels
- **Better handling of cataloging variations** with enhanced OCLC and ISBN extraction
- **Reduced false positives** through multi-volume detection
- **Improved modern record support** with F264 field processing

## Initial Load - Institution-specific Processing
Converts MARC to Parquet format for faster processing, maintaining institution-specific separation. This step ensures that each institution's MARC files are converted to separate Parquet files for consistent downstream processing.

The conversion includes the leader field (FLDR) for each record. The leader contains important information about the record structure, material type, and bibliographic level.

## HIGH MEMORY REQUIREMENT

**This notebook is configured for a high-performance server environment with the following specifications:**

- **260GB driver memory allocation** (requires ~300GB total system RAM)
- **12 cores** for parallel processing
- Optimized for a **Linode 300GB server**

**Running this notebook with the current configuration on a standard laptop or desktop will likely cause your kernel to crash or your system to become unresponsive.**

## Key Improvements in VERSION 2.0

1. **Enhanced OCLC extraction** - catches 3x more OCLC numbers
2. **ISBN core matching** - unifies ISBN-10 and ISBN-13 variants
3. **Multi-volume detection** - prevents false uniqueness claims
4. **F264 support** - captures modern publication data
5. **Multiple match levels** - more comprehensive deduplication
6. **Backward compatible** - existing code continues to work


In [8]:
# Define paths for your PySpark server
# Update these paths to match your server's directory structure
input_dir = "/home/jovyan/work/July-2025-PODParquet"  # Where your parquet files are located
output_dir = "/home/jovyan/work/July-2025-PODParquet/pod-processing-outputs"  # Where to save the results

# Create output directory if it doesn't exist
import os
os.makedirs(output_dir, exist_ok=True)

print(f"Input directory: {input_dir}")
print(f"Output directory: {output_dir}")

Input directory: /home/jovyan/work/July-2025-PODParquet
Output directory: /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs


In [2]:
import os
import time
from pyspark.sql import SparkSession

# Clean up any existing Spark sessions
try:
    if 'spark' in globals():
        spark.stop()
        time.sleep(2)  # Give it time to clean up
except:
    pass

# Clear environment variables that might conflict
for key in list(os.environ.keys()):
    if 'SPARK' in key or 'JAVA' in key or 'PYSPARK' in key:
        del os.environ[key]

# Set JAVA_HOME explicitly
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-17-openjdk-amd64'

# Create temp directory
os.makedirs('/tmp/spark-temp', exist_ok=True)

# Create Spark session with all configurations at once
# Since we know 200GB works from your test, we'll use that
print("Creating Spark session with full configuration...")

spark = SparkSession.builder \
    .appName("PodProcessing-Stable") \
    .master("local[12]") \
    .config("spark.driver.memory", "260g") \
    .config("spark.driver.maxResultSize", "200g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.3") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000") \
    .config("spark.sql.parquet.enableVectorizedReader", "true") \
    .config("spark.sql.parquet.columnarReaderBatchSize", "2048") \
    .config("spark.sql.autoBroadcastJoinThreshold", "30m") \
    .config("spark.cleaner.periodicGC.interval", "5min") \
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true") \
    .config("spark.local.dir", "/tmp/spark-temp") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .config("spark.sql.files.openCostInBytes", "4194304") \
    .config("spark.driver.memoryOverhead", "20g") \
    .config("spark.kryoserializer.buffer.max", "1024m") \
    .config("spark.rpc.message.maxSize", "256") \
    .config("spark.network.timeout", "300s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.rdd.compress", "true") \
    .getOrCreate()

print("✅ Spark session initialized with 200GB memory and optimized settings!")
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")

# Test it works
print("\nTesting Spark with a simple operation...")
test_df = spark.range(100).selectExpr("id", "id * 2 as doubled")
test_df.show(5)

# Verify key configurations
print("\n📋 Key configurations:")
print(f"  - Driver memory: {spark.conf.get('spark.driver.memory')}")
print(f"  - Max result size: {spark.conf.get('spark.driver.maxResultSize')}")
print(f"  - Memory fraction: {spark.conf.get('spark.memory.fraction')}")
print(f"  - Shuffle partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")

print("\n✅ Spark session ready for processing!")

Creating Spark session with full configuration...
✅ Spark session initialized with 200GB memory and optimized settings!
Spark UI available at: http://7d7ed4cc3e7b:4040

Testing Spark with a simple operation...
+---+-------+
| id|doubled|
+---+-------+
|  0|      0|
|  1|      2|
|  2|      4|
|  3|      6|
|  4|      8|
+---+-------+
only showing top 5 rows


📋 Key configurations:
  - Driver memory: 260g
  - Max result size: 200g
  - Memory fraction: 0.6
  - Shuffle partitions: 400

✅ Spark session ready for processing!


In [7]:
# Install required packages
!pip install --upgrade pip
!pip install pymarc poetry marctable fuzzywuzzy python-Levenshtein langdetect

import os
import sys

# Get the user's local bin directory for macOS
user_local_bin = os.path.expanduser('~/.local/bin')

# Add the directory to PATH if it exists
if os.path.exists(user_local_bin):
    os.environ['PATH'] += os.pathsep + user_local_bin
    print(f"Added {user_local_bin} to PATH")

# Also add Python's user site-packages bin directory
python_user_bin = os.path.join(sys.prefix, 'bin')
if os.path.exists(python_user_bin):
    os.environ['PATH'] += os.pathsep + python_user_bin
    print(f"Added {python_user_bin} to PATH")

# For Homebrew Python installations on macOS
homebrew_bin = '/usr/local/bin'
if os.path.exists(homebrew_bin) and homebrew_bin not in os.environ['PATH']:
    os.environ['PATH'] += os.pathsep + homebrew_bin
    print(f"Added {homebrew_bin} to PATH")

# Check if marctable is accessible
import shutil
if shutil.which('marctable'):
    print("✅ marctable command found in PATH")
else:
    print("⚠️  marctable command not found in PATH - checking alternative locations...")
    # Try to find marctable in common locations
    possible_locations = [
        os.path.expanduser('~/Library/Python/3.11/bin'),
        os.path.expanduser('~/Library/Python/3.10/bin'),
        os.path.expanduser('~/Library/Python/3.9/bin'),
        '/opt/homebrew/bin',
        '/usr/local/bin',
    ]
    
    for loc in possible_locations:
        marctable_path = os.path.join(loc, 'marctable')
        if os.path.exists(marctable_path):
            os.environ['PATH'] += os.pathsep + loc
            print(f"✅ Found marctable in {loc} and added to PATH")
            break

print("\n✅ All packages installed and environment configured")
print(f"Current PATH: {os.environ['PATH']}")

[31mERROR: Ignored the following versions that require a different python version: 0.1.0 Requires-Python <4.0,>=3.11; 0.2.0 Requires-Python <4.0,>=3.11; 0.3.0 Requires-Python <4.0,>=3.11; 0.3.1 Requires-Python <4.0,>=3.11; 0.3.2 Requires-Python <4.0,>=3.11; 0.4.0 Requires-Python <4.0,>=3.11; 0.5.0 Requires-Python >=3.11; 0.6.0 Requires-Python >=3.12[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement marctable (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for marctable[0m[31m
[0mAdded /home/jovyan/.local/bin to PATH
Added /opt/conda/bin to PATH
✅ marctable command found in PATH

✅ All packages installed and environment configured
Current PATH: /opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/spark/bin:/home/jovyan/.local/bin:/opt/conda/bin:/home/jovyan/.local/bin:/opt/conda/bin:/home/jovyan/.local/bin:/opt/conda/bin


In [19]:
spark.catalog.clearCache()

In [20]:
import shutil
shutil.rmtree("/home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/temp_processed")

In [21]:
# Spark SQL Functions - ENHANCED VERSION 2.0

from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as F

# Helper function to handle fields that might be strings or arrays
def handle_field_as_string(col_name):
    """
    Safely extract string value whether the field is a string or array.
    This version handles mixed types properly.
    """
    return F.when(
        F.col(col_name).isNotNull(),
        F.when(
            F.size(F.col(col_name)) >= 0,
            F.col(col_name).getItem(0)
        ).otherwise(
            F.col(col_name)
        )
    ).cast("string")

def extract_oclc_number_enhanced(df):
    """
    ENHANCED: Extract OCLC numbers from F035 field with ALL common patterns
    Handles ocm, ocn, on prefixes and leading zeros
    """
    return df.withColumn("oclc_number",
        F.when(F.col("F035").isNotNull() & (F.size(F.col("F035")) > 0),
            F.regexp_extract(
                F.concat_ws(" ", F.col("F035")),
                "\\(OCoLC\\)(?:ocm|ocn|on)?0*([0-9]+)",  # Handles prefixes AND leading zeros
                1
            )
        )
    )

def extract_publication_year_enhanced(df):
    """
    NEW: Check BOTH F260 and F264 for publication year
    Many newer records use F264 instead of F260
    """
    return df.withColumn("pub_year",
        F.coalesce(
            # First try F260
            F.when(F.col("F260").isNotNull() & (F.size(F.col("F260")) > 0),
                F.regexp_extract(F.col("F260").getItem(0), "(1[5-9][0-9]{2}|20[0-9]{2})", 1)
            ),
            # Then try F264 if F260 doesn't exist or is empty
            F.when(F.col("F264").isNotNull() & (F.size(F.col("F264")) > 0),
                F.regexp_extract(F.col("F264").getItem(0), "(1[5-9][0-9]{2}|20[0-9]{2})", 1)
            )
        )
    ).withColumn("pub_decade",
        F.when(F.col("pub_year").isNotNull(),
            F.concat(F.substring(F.col("pub_year"), 1, 3), F.lit("0s"))
        )
    )

def identify_multivolume(df):
    """
    NEW: Detect multi-volume works for special handling
    Prevents false uniqueness for sets where libraries hold different volumes
    """
    return df.withColumn("is_multivolume",
        F.col("F245").rlike("(?i)(v\\.|vol\\.|volume|pt\\.|part|tome|band|book)\\s*[0-9IVX]") |
        F.col("F245").rlike("(?i)\\[?[0-9]+(st|nd|rd|th)\\s+(v\\.|vol|edition)")
    ).withColumn("base_title_for_multivolume",
        F.when(F.col("is_multivolume"),
            # Strip volume indicators for matching
            F.regexp_replace(
                F.col("F245"),
                "(?i)[,;:]?\\s*(v\\.|vol\\.|volume|pt\\.|part|book)\\s*[0-9IVX]+.*$",
                ""
            )
        ).otherwise(F.col("F245"))
    )

def normalize_isbn_enhanced(df):
    """
    ENHANCED: Better ISBN normalization with core extraction
    Handles both ISBN-10 and ISBN-13 for better work-level matching
    """
    return df.withColumn("normalized_isbn",
        # F020 is array
        F.when(F.col("F020").isNotNull() & (F.size(F.col("F020")) > 0),
            F.regexp_replace(
                F.regexp_extract(F.col("F020").getItem(0), "([0-9X-]+)", 1),
                "[^0-9X]", ""
            )
        )
    ).withColumn("isbn_core",
        # Extract the core ISBN (ignoring check digit and prefix)
        F.when(F.length(F.col("normalized_isbn")) == 10,
            F.substring(F.col("normalized_isbn"), 1, 9)  # ISBN-10 core
        ).when(F.length(F.col("normalized_isbn")) == 13,
            F.substring(F.col("normalized_isbn"), 4, 9)  # ISBN-13 core (skip 978/979 prefix)
        )
    )

def create_smart_title_key(df):
    """
    NEW: Smarter title normalization that preserves important distinctions
    Less aggressive than fuzzy matching but catches more variations
    """
    return df.withColumn("title_normalized",
        # Remove only truly noise elements, keep important structure
        F.regexp_replace(
            F.regexp_replace(
                F.lower(F.trim(F.col("F245"))),
                "^(the|a|an|le|la|los|las|el|die|der|das|den|det)\\s+", ""
            ),
            "[\\[\\]\\(\\)/]", ""  # Remove only brackets and slashes, keep colons/semicolons
        )
    ).withColumn("title_first_significant",
        # First 5 significant words for better matching
        F.array_join(
            F.slice(
                F.split(F.col("title_normalized"), "\\s+"),
                1, 5
            ),
            " "
        )
    )

def create_match_key_spark_improved(df):
    """
    IMPROVED: Create better match keys using enhanced functions
    """
    # Apply all the enhanced transformations first
    df = df.transform(extract_publication_year_enhanced)
    df = df.transform(identify_multivolume)
    df = df.transform(create_smart_title_key)
    
    return df.withColumn("match_key", 
        F.concat_ws("_",
            # Use base title for multivolume works
            F.when(F.col("is_multivolume"),
                F.regexp_replace(F.col("base_title_for_multivolume"), "[^a-z0-9\\s]", "")
            ).otherwise(
                F.regexp_replace(F.col("title_normalized"), "[^a-z0-9\\s]", "")
            ),
            
            # Normalize edition (F250 is array)
            F.when(F.col("F250").isNotNull() & (F.size(F.col("F250")) > 0),
                F.regexp_replace(
                    F.lower(F.col("F250").getItem(0)), 
                    "(\\d+)(?:st|nd|rd|th)?\\s*(?:ed|edition)", "$1 ed"
                )
            ).otherwise(""),
            
            # Use enhanced year extraction
            F.coalesce(F.col("pub_year"), F.lit(""))
        )
    )

# Redirect old function to enhanced version for backward compatibility
def extract_oclc_number(df):
    """
    Redirect to enhanced version
    """
    return extract_oclc_number_enhanced(df)

# Keep the original create_match_key_spark for backward compatibility
def create_match_key_spark(df):
    """
    Create match keys - now uses improved version
    """
    return create_match_key_spark_improved(df)

def create_fuzzy_match_key(df):
    """
    Create FUZZY match keys for broader matching (catches more duplicates)
    """
    return df.withColumn("fuzzy_match_key",
        F.concat_ws("_",
            # More aggressive title normalization - remove ALL non-alphanumeric
            F.when(F.col("F245").isNotNull(),
                F.regexp_replace(
                    F.regexp_replace(
                        F.lower(F.trim(F.col("F245"))),
                        "^(the|a|an|le|la|el|los|las|die|der|das|den|det)\\s+", ""
                    ),
                    "[^a-z0-9]", ""  # Remove ALL punctuation and spaces
                )
            ).otherwise(""),
            
            # Just extract edition number, ignore format
            F.when(F.col("F250").isNotNull() & (F.size(F.col("F250")) > 0),
                F.regexp_extract(F.col("F250").getItem(0), "(\\d+)", 1)
            ).otherwise(""),
            
            # Year range (decade) instead of exact year
            F.when(F.col("pub_year").isNotNull(),
                F.col("pub_decade")
            ).otherwise("")
        )
    )

def create_work_level_key(df):
    """
    Create work-level match key (title + author only)
    """
    return df.withColumn("work_key",
        F.concat_ws("_",
            # Normalized title only
            F.when(F.col("F245").isNotNull(),
                F.regexp_replace(
                    F.lower(F.col("F245")),
                    "[^a-z0-9]", ""
                )
            ).otherwise(""),
            
            # Add author if available (F100 for personal, F110 for corporate)
            F.when(F.col("F100").isNotNull(),
                F.regexp_replace(F.lower(F.col("F100")), "[^a-z]", "")
            ).when(F.col("F110").isNotNull(),
                F.regexp_replace(F.lower(F.col("F110")), "[^a-z]", "")
            ).otherwise("")
        )
    )

def normalize_isbn_for_matching(df):
    """
    Enhanced ISBN normalization - redirects to enhanced version
    """
    return normalize_isbn_enhanced(df)

def normalize_ids_spark(df):
    """
    ENHANCED: Normalize ISBN and LCCN using improved functions
    """
    return df.transform(normalize_isbn_enhanced) \
        .withColumn("normalized_lccn", 
            F.when(F.col("F010").isNotNull(),
                F.regexp_replace(
                    F.trim(F.col("F010")),
                    "[^a-zA-Z0-9-]", ""
                )
            )
        )

def add_id_list_spark_enhanced(df):
    """
    ENHANCED: Create comprehensive id_list including ISBN core
    FIXED: Use concat to properly combine arrays
    """
    return df.withColumn("id_list",
        F.array_remove(
            F.array_distinct(
                F.concat(
                    # Standard identifiers
                    F.when(F.col("normalized_isbn").isNotNull() & (F.col("normalized_isbn") != ""), 
                        F.array(F.col("normalized_isbn"))).otherwise(F.array()),
                    F.when(F.col("isbn_core").isNotNull() & (F.col("isbn_core") != ""), 
                        F.array(F.col("isbn_core"))).otherwise(F.array()),
                    F.when(F.col("normalized_lccn").isNotNull() & (F.col("normalized_lccn") != ""), 
                        F.array(F.col("normalized_lccn"))).otherwise(F.array()),
                    F.when(F.col("oclc_number").isNotNull() & (F.col("oclc_number") != ""), 
                        F.array(F.col("oclc_number"))).otherwise(F.array()),
                    # Match keys
                    F.when(F.col("match_key").isNotNull() & (F.col("match_key") != ""), 
                        F.array(F.col("match_key"))).otherwise(F.array()),
                    F.when(F.col("fuzzy_match_key").isNotNull() & (F.col("fuzzy_match_key") != ""), 
                        F.array(F.col("fuzzy_match_key"))).otherwise(F.array()),
                    F.when(F.col("work_key").isNotNull() & (F.col("work_key") != ""), 
                        F.array(F.col("work_key"))).otherwise(F.array())
                )
            ),
            ""  # Remove empty strings
        )
    )

def validate_match_key_spark(df):
    """
    Validate match keys using Spark SQL functions
    """
    return df.withColumn("is_valid_match_key",
        (F.length(F.col("match_key")) >= 5) &
        (~F.col("match_key").rlike("^(book|text|edition|volume|vol|publication|report)_\\d+$"))
    ).withColumn("match_key_message",
        F.when(F.length(F.col("match_key")) < 5, "Match key too short")
         .when(F.col("match_key").rlike("^(book|text|edition|volume|vol|publication|report)_\\d+$"), "Generic match key")
         .otherwise("Valid match key")
    )

def process_institution_optimized(df, institution_name):
    """
    ENHANCED: Apply all enhanced optimizations to an institution's DataFrame
    """
    return (df
        .withColumn("source", F.lit(institution_name))
        .transform(extract_oclc_number_enhanced)  # ENHANCED OCLC
        .transform(extract_publication_year_enhanced)  # NEW: F264 support
        .transform(identify_multivolume)  # NEW: Multi-volume detection
        .transform(normalize_ids_spark)   # Enhanced with ISBN core
        .transform(create_match_key_spark_improved)  # IMPROVED match key
        .transform(create_fuzzy_match_key)  # Keep existing fuzzy
        .transform(create_work_level_key)   # Keep existing work-level
        .transform(add_id_list_spark_enhanced)  # Enhanced with ISBN core
        .transform(validate_match_key_spark)
    )

print("✅ ENHANCED Spark SQL functions loaded - VERSION 2.0")
print("✅ Major improvements:")
print("  - OCLC extraction handles all variants (ocm, ocn, on prefixes + leading zeros)")
print("  - Publication year checks both F260 and F264")
print("  - Multi-volume work detection prevents false positives")
print("  - ISBN core extraction for better work-level matching")
print("  - Smarter title normalization preserves important distinctions")
print("  - Backward compatible with existing code")
print("✅ FIXED: id_list generation now properly uses F.concat() to combine arrays")

✅ ENHANCED Spark SQL functions loaded - VERSION 2.0
✅ Major improvements:
  - OCLC extraction handles all variants (ocm, ocn, on prefixes + leading zeros)
  - Publication year checks both F260 and F264
  - Multi-volume work detection prevents false positives
  - ISBN core extraction for better work-level matching
  - Smarter title normalization preserves important distinctions
  - Backward compatible with existing code
✅ FIXED: id_list generation now properly uses F.concat() to combine arrays


# Institution-Specific MARC to Parquet Conversion Functions


In [None]:
# Institution-Specific MARC to Parquet Conversion Functions

import os
import tempfile
import glob
import logging
from typing import Optional, Dict, List, Tuple
import re
from pymarc import Record, MARCReader

# Setup logging for MARC conversion
log_dir = f'{output_dir}/logs'

os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'marc2parquet.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def extract_institution_from_filename(filename: str) -> str:
    """Extract institution name from filename patterns"""
    base = os.path.basename(filename)
    
    # For files from pod-processing-outputs/final/ like "harvard_updates-001.mrc"
    if '_' in base:
        return base.split('_')[0]
    
    # Pattern: institution-date-descriptor-format.ext
    match = re.match(r'^([a-z]+)-[\d\-]+-.*\.mrc$', base)
    if match:
        return match.group(1)
    
    # Pattern: institution-descriptor.ext
    match = re.match(r'^([a-z]+)-.*\.mrc$', base)
    if match:
        return match.group(1)
    
    # Default: use the first word
    return base.split('-')[0].split('.')[0]

def safe_read_marc_file_with_recovery(file_path: str, temp_output: str) -> Tuple[int, Dict]:
    """Read MARC file with maximum error recovery and minimal validation"""
    total_records = 0
    valid_records = 0
    report = {"total_attempted": 0, "parsed": 0, "errors": 0}
    
    try:
        with open(file_path, 'rb') as file, open(temp_output, 'wb') as outfile:
            reader = MARCReader(file, to_unicode=True, force_utf8=True, utf8_handling='replace')
            
            for record_number, record in enumerate(reader, 1):
                total_records += 1
                
                if record is None:
                    report["errors"] += 1
                    continue
                
                try:
                    outfile.write(record.as_marc())
                    valid_records += 1
                except Exception as e:
                    report["errors"] += 1
                    logger.warning(f"Error writing record {record_number}: {str(e)}")
    
    except Exception as e:
        logger.error(f"Failed to read {file_path}: {str(e)}")
        
    report["total_attempted"] = total_records
    report["parsed"] = valid_records
    
    if total_records > 0:
        report["success_rate"] = (valid_records / total_records) * 100
    else:
        report["success_rate"] = 0
        
    return valid_records, report

def get_institution_specific_marc_files() -> List[Tuple[str, str]]:
    """Get all institution-specific MARC files from processed outputs"""
    institution_file_pairs = []
    
    # Update base path for PySpark notebook environment
    base_path = "/home/jovyan/work/July-2025-PODParquet"
    
    # PRIMARY: Look for processed MARC files in the final output directory
    final_dir = os.path.join(base_path, 'pod-processing-outputs/final')
    
    if os.path.exists(final_dir):
        # Get all .mrc files from the final directory
        final_marc_files = glob.glob(os.path.join(final_dir, '*.mrc'))
        
        for file in final_marc_files:
            # Extract institution from filename (e.g., "harvard_updates-001.mrc" -> "harvard")
            institution = extract_institution_from_filename(file)
            institution_file_pairs.append((institution, file))
            
        print(f"Found {len(final_marc_files)} processed MARC files in {final_dir}")
    
    # SECONDARY: Check the export directory for the latest export package
    export_dir = os.path.join(base_path, 'pod-processing-outputs/export')
    if os.path.exists(export_dir) and not institution_file_pairs:
        # Find the most recent export package
        export_packages = glob.glob(os.path.join(export_dir, 'marc_export_*'))
        if export_packages:
            latest_export = sorted(export_packages)[-1]  # Get most recent by timestamp
            export_marc_files = glob.glob(os.path.join(latest_export, '*.mrc'))
            
            for file in export_marc_files:
                # Skip non-MARC files
                if file.endswith('.txt'):
                    continue
                institution = extract_institution_from_filename(file)
                institution_file_pairs.append((institution, file))
            
            print(f"Found {len(export_marc_files)} MARC files in latest export: {latest_export}")
    
    # FALLBACK: If no processed files found, check for raw files
    if not institution_file_pairs:
        print("No processed files found in pod-processing-outputs/final or export directories")
        print("Falling back to raw MARC files in pod_*/file directories")
        
        # Look for marc files in institution directories
        institution_dirs = glob.glob(os.path.join(base_path, "pod_*/file"))
        
        for institution_dir in institution_dirs:
            institution = os.path.basename(os.path.dirname(institution_dir)).replace('pod_', '')
            
            # Look for .mrc files only (no XML)
            mrc_files = glob.glob(f"{institution_dir}/**/*.mrc", recursive=True)
            for file in mrc_files:
                institution_file_pairs.append((institution, file))
    
    # Remove duplicates and sort
    unique_pairs = list(set(institution_file_pairs))
    unique_pairs.sort(key=lambda x: (x[0], x[1]))
    
    print(f"\nTotal institution-specific MARC files to process: {len(unique_pairs)}")
    for institution, file in unique_pairs:
        print(f"  - {institution}: {file}")
    
    return unique_pairs

def process_file_with_recovery(file: str, institution: str) -> bool:
    """Process a MARC file with maximum error recovery"""
    try:
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        
        # Create a temporary file for processing
        with tempfile.NamedTemporaryFile(delete=False) as temp:
            temp_file = temp.name
        
        # Create institution-specific output filename
        base = os.path.basename(file)
        output_file = os.path.join(output_dir, 
                           f"{institution}_{base.replace('.mrc', '-marc21.parquet')}")

       
        # Process MARC file
        written_count, report = safe_read_marc_file_with_recovery(file, temp_file)
        
        # Proceed if we have at least some records
        if written_count == 0:
            error_msg = f"No records could be processed from {file}"
            logger.error(error_msg)
            print(f"ERROR: {error_msg}")
            return False
        
        # Run marctable command - FLDR is included by default
        marctable_cmd = f'marctable parquet {temp_file} {output_file}'
        marctable_msg = f"Running marctable: {marctable_cmd}"
        logger.info(marctable_msg)
        print(marctable_msg)
        exit_status = os.system(marctable_cmd)
        
        if exit_status != 0:
            error_msg = f"marctable command failed for {institution} file {file}"
            logger.error(error_msg)
            print(f"ERROR: {error_msg}")
            return False
        else:
            success_msg = f"SUCCESS: Created {output_file} with {written_count} {institution} records ({report.get('success_rate', 0):.1f}% success rate)"
            logger.info(success_msg)
            print(success_msg)
            print(f"  Note: FLDR (leader) field is included by default in marctable output")
            return True
            
    except Exception as e:
        error_msg = f"Unexpected error processing {institution} file {file}: {str(e)}"
        logger.error(error_msg)
        print(f"ERROR: {error_msg}")
        return False
        
    finally:
        if 'temp_file' in locals() and temp_file and os.path.exists(temp_file):
            try:
                os.remove(temp_file)
            except Exception as e:
                logger.error(f"Cleanup error for {temp_file}: {str(e)}")

def marc2parquet_institution_specific(force_reprocess=False):
    """
    Convert institution-specific MARC to Parquet with maximum error recovery
    
    Args:
        force_reprocess: If True, reprocess even if parquet files already exist
    """
    # Check if previous processing has been done
    if not os.path.exists(f'{output_dir}/final'):
        print(f"WARNING: No processed files found in {output_dir}/final/")
        print("Consider running ivyplus-updated-marc-pyspark.ipynb first for better results")
    
    institution_file_pairs = get_institution_specific_marc_files()
    
    if not institution_file_pairs:
        error_msg = "No institution-specific MARC files found to process"
        logger.error(error_msg)
        print(f"ERROR: {error_msg}")
        return False
    
    results = []
    institution_summary = {}
    
    for institution, file in institution_file_pairs:
        if institution not in institution_summary:
            institution_summary[institution] = {"total": 0, "success": 0, "failed": 0}
        
        institution_summary[institution]["total"] += 1
        
        # Create institution-specific output filename
        base = os.path.basename(file)
        output_file = os.path.join(output_dir, 
                                   f"{institution}_{base.replace('.mrc', '-marc21.parquet')}")

        # Skip if already processed unless force_reprocess is True
        if not force_reprocess and os.path.exists(output_file):
            skip_msg = f"Skipping already processed {institution} file {file}"
            logger.info(skip_msg)
            print(skip_msg)
            institution_summary[institution]["success"] += 1
            results.append(True)
            continue
            
        result = process_file_with_recovery(file, institution)
        results.append(result)
        
        if result:
            institution_summary[institution]["success"] += 1
        else:
            institution_summary[institution]["failed"] += 1
    
    # Print summary by institution
    print("\n=== Institution Processing Summary ===")
    for institution, stats in institution_summary.items():
        print(f"{institution.upper()}: Processed {stats['total']} files - {stats['success']} succeeded, {stats['failed']} failed")
    
    # Overall success rate
    total_success = sum(results)
    total_files = len(results)
    if total_files > 0:
        print(f"\nOverall: Successfully processed {total_success} of {total_files} files ({total_success/total_files*100:.1f}%)")
        return total_success == total_files
    else:
        print("\nNo files were processed")
        return False

# Check if conversion is needed or if we can skip directly to processing
print("Checking for existing parquet files...")
existing_parquet = glob.glob(f"{output_dir}/*_marc21.parquet")
if existing_parquet:
    print(f"Found {len(existing_parquet)} existing parquet files")
    print("You can skip to the next cell unless you want to reprocess")
else:
    print("No parquet files found. Running conversion...")
    marc2parquet_institution_specific()

# Main Processing with Memory-Optimized Approach


In [25]:
# Run this before re-processing
spark.catalog.clearCache()

In [None]:
# Main Processing - Memory-Optimized with Batch Processing
import glob
import os
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

print("=== STARTING MAIN PROCESSING ===")
print("This will process all institution parquet files and create the exploded dataset\n")

# Get all institution parquet files
parquet_files = glob.glob(f"{output_dir}/*.parquet")
print(f"Found {len(parquet_files)} institution parquet files to process")

# Process each institution and save to temp directory
temp_output_dir = f"{output_dir}/temp_processed"
os.makedirs(temp_output_dir, exist_ok=True)

processed_institutions = []

for file_path in parquet_files:
    # Extract institution name from filename
    filename = os.path.basename(file_path)
    institution = filename.split('_')[0]
    
    print(f"\nProcessing {institution}...")
    
    try:
        # Read institution data
        df = spark.read.parquet(file_path)
        record_count = df.count()
        print(f"  - Records: {record_count:,}")
        
        # Apply all enhanced processing
        processed_df = process_institution_optimized(df, institution)
        
        # Save processed data
        temp_path = f"{temp_output_dir}/{institution}_processed.parquet"
        processed_df.write.mode("overwrite").parquet(temp_path)
        
        processed_institutions.append((institution, temp_path))
        print(f"  ✅ Saved to {temp_path}")
        
        # Clear cache to free memory
        spark.catalog.clearCache()
        
    except Exception as e:
        print(f"  ❌ Error processing {institution}: {str(e)}")
        continue

print(f"\n✅ Processed {len(processed_institutions)} institutions")

# Now create the exploded dataset by reading all processed files
print("\n=== CREATING EXPLODED DATASET ===")
print("This creates a row for each identifier/key in each record...")

# Read all processed institution files
processed_paths = [path for _, path in processed_institutions]
all_df = spark.read.parquet(*processed_paths)

# Check id_list population before exploding
print("\nChecking id_list population...")
id_list_stats = all_df.select(
    F.avg(F.size("id_list")).alias("avg_keys_per_record"),
    F.sum(F.when(F.size("id_list") == 0, 1).otherwise(0)).alias("empty_id_lists"),
    F.count("*").alias("total_records")
).collect()[0]

print(f"  - Average keys per record: {id_list_stats['avg_keys_per_record']:.2f}")
print(f"  - Records with empty id_list: {id_list_stats['empty_id_lists']:,}")
print(f"  - Total records: {id_list_stats['total_records']:,}")

if id_list_stats['avg_keys_per_record'] == 0:
    print("\n⚠️  WARNING: All id_lists are empty! Check the add_id_list_spark_enhanced function")
    print("The analysis will not work correctly with empty id_lists")

# Create exploded dataset with id_list as key_array
all_df_with_key_array = all_df.withColumn("key_array", F.col("id_list"))

# Explode the key_array to create one row per key
all_df_exploded = all_df_with_key_array.select(
    "F001", "source", "match_key", "is_valid_match_key",
    F.explode("key_array").alias("key")
).filter(F.col("key").isNotNull())

# Check exploded dataset
exploded_count = all_df_exploded.count()
print(f"\n✅ Created exploded dataset with {exploded_count:,} rows")

# Check key distribution
key_type_stats = all_df_exploded.withColumn("key_type",
    F.when(F.col("key").rlike("^[0-9X]{10,13}$"), "ISBN")
    .when(F.col("key").rlike("^[0-9]{8,}$"), "OCLC")
    .when(F.col("key").rlike("^[a-zA-Z0-9]+$") & ~F.col("key").contains("_"), "LCCN")
    .when(F.col("key").contains("_"), "MatchKey")
    .otherwise("Other")
).groupBy("key_type").count().orderBy("count", ascending=False)

print("\nKey type distribution:")
key_type_stats.show()

# Save the exploded dataset
# exploded_path = f"{output_dir}/all_records_exploded.parquet"
# all_df_exploded.write.mode("overwrite").parquet(exploded_path)

# Just cache it in memory instead
all_df_exploded.cache()
print(f"\n✅ Created exploded dataset with {exploded_count:,} rows")
print(f"📌 Dataset cached in memory for subsequent analysis")


# Clean up temporary files if desired
# import shutil
# shutil.rmtree(temp_output_dir)
# print(f"\n🧹 Cleaned up temporary files in {temp_output_dir}")

=== STARTING MAIN PROCESSING ===
This will process all institution parquet files and create the exploded dataset

Found 13 institution parquet files to process

Processing chicago...
  - Records: 12,294,163
  ✅ Saved to /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/temp_processed/chicago_processed.parquet

Processing brown...
  - Records: 737,290
  ✅ Saved to /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/temp_processed/brown_processed.parquet

Processing columbia...
  - Records: 16,836,893
  ✅ Saved to /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/temp_processed/columbia_processed.parquet

Processing cornell...
  - Records: 6,944,453
  ✅ Saved to /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/temp_processed/cornell_processed.parquet

Processing dartmouth...
  - Records: 3,855,421
  ✅ Saved to /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/temp_processed/dartmouth_processed.parquet

Processing duke...
  - Reco

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [30]:
spark.catalog.clearCache()

In [31]:
# Uniqueness Analysis and Overlap Detection
from pyspark.sql.functions import collect_set, array_contains, size, col
import pyspark.sql.functions as F  # Add this for F.count()
import glob  # Add this import for the fallback logic
import os  # Add this import for os.path.exists()

# Temporarily disable broadcast joins to prevent timeout errors
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Check if exploded dataset is already in memory from previous cell
if 'all_df_exploded' not in locals():
    # Try to recreate from temp_processed files
    temp_output_dir = f"{output_dir}/temp_processed"
    
    if os.path.exists(temp_output_dir):
        print("Recreating exploded dataset from temp_processed files...")
        
        # Get all processed institution files
        processed_paths = glob.glob(f"{temp_output_dir}/*_processed.parquet")
        
        if processed_paths:
            # Read all processed institution files
            all_df = spark.read.parquet(*processed_paths)
            
            # Recreate the exploded dataset
            all_df_with_key_array = all_df.withColumn("key_array", F.col("id_list"))
            
            all_df_exploded = all_df_with_key_array.select(
                "F001", "source", "match_key", "is_valid_match_key",
                F.explode("key_array").alias("key")
            ).filter(F.col("key").isNotNull())
            
            # Cache it for performance
            all_df_exploded.cache()
            print(f"✅ Recreated exploded dataset from {len(processed_paths)} institution files")
        else:
            raise ValueError("No processed files found in temp directory! Please run the Main Processing cell first.")
    else:
        raise ValueError("Temp processed directory not found! Please run the Main Processing cell first.")
else:
    print("Using cached exploded dataset from memory")

# Group by key and collect sources where that key appears
grouped = all_df_exploded.groupBy("key").agg(
    collect_set("source").alias("sources"),
    F.count("*").alias("record_count")
)

# IMPORTANT: Don't broadcast the grouped DataFrame - it's too large
# Instead, we'll use a regular join which Spark will optimize automatically

# Find Penn records that exist in OTHER libraries
# A Penn record is NOT unique if it exists in ANY other library
penn_keys_in_other_libs = grouped.filter(
    (array_contains(col("sources"), "penn")) & 
    (F.size(col("sources")) > 1)  # Penn + at least one other library
).select("key")

# Get Penn records that are truly unique to Penn
# Start with all Penn records
all_penn_exploded = all_df_exploded.filter(col("source") == "penn")

# Anti-join to remove Penn records found in other libraries
# Let Spark decide the join strategy based on data size
unique_penn_exploded = all_penn_exploded.join(
    penn_keys_in_other_libs,
    on="key", 
    how="left_anti"
)

# Deduplicate by Penn's F001 (not match_key) to get unique Penn records
unique_penn = unique_penn_exploded.drop("key").dropDuplicates(["F001"])

# Cache the unique Penn records for better performance
unique_penn.cache()

# Calculate statistics efficiently
print("Calculating statistics...")
unique_penn_count = unique_penn.count()  # Force cache materialization

# Get total Penn records from the deduplicated exploded DataFrame
total_penn = all_penn_exploded.select("F001").distinct().count()

print(f"\n=== Analysis Results ===")
print(f"Total Penn records: {total_penn:,}")
print(f"Unique Penn records: {unique_penn_count:,}")

# Add robust checking for division by zero
if total_penn > 0:
    print(f"Uniqueness rate: {unique_penn_count/total_penn*100:.1f}%")
    print(f"Overlap rate: {(total_penn - unique_penn_count)/total_penn*100:.1f}%")
else:
    print("Uniqueness rate: N/A (no Penn records found)")

# For analysis, let's also see overlap statistics
print("\n=== Overlap Analysis ===")

# More efficient: get Penn overlap stats without re-filtering
penn_keys = grouped.filter(array_contains(col("sources"), "penn")).cache()

penn_overlap_stats = penn_keys \
    .withColumn("num_libraries", F.size(col("sources"))) \
    .groupBy("num_libraries").count() \
    .orderBy("num_libraries")

print("Distribution of Penn records by number of libraries holding them:")
penn_overlap_stats.show()

# Save results with consistent paths using pod-processing-outputs directory
output_dir = "/home/jovyan/work/July-2025-PODParquet/pod-processing-outputs"

# Save unique Penn records
unique_penn.write.mode("overwrite").parquet(f"{output_dir}/unique_penn.parquet")

# Save detailed overlap information for analysis
# Note: Using cached penn_keys for efficiency
penn_with_overlap_info = all_penn_exploded.join(
    penn_keys.select("key", "sources", F.size("sources").alias("num_libraries")),
    on="key",
    how="left"
).drop("key")

penn_with_overlap_info.write.mode("overwrite").parquet(f"{output_dir}/penn_overlap_analysis.parquet")

# Load all_df from intermediate files for validation statistics
# Check if processed_institutions variable exists from previous cell
if 'processed_institutions' in locals():
    # Use the paths from the previous processing
    processed_paths = [path for _, path in processed_institutions]
    all_df = spark.read.parquet(*processed_paths)
else:
    # Fallback: read from temp directory
    temp_output_dir = f"{output_dir}/temp_processed"
    processed_paths = glob.glob(f"{temp_output_dir}/*_processed.parquet")
    if processed_paths:
        all_df = spark.read.parquet(*processed_paths)
    else:
        print("WARNING: Could not load all_df for validation statistics")
        print("Skipping validation statistics save")
        all_df = None

# Save validation statistics for analysis if all_df is available
if all_df is not None:
    validation_stats = all_df.select("F001", "match_key", "is_valid_match_key", "match_key_message", "id_list") \
        .filter(col("source") == "penn")
    
    validation_stats.write.mode("overwrite").parquet(f"{output_dir}/match_key_validation_stats.parquet")
else:
    print("Validation statistics not saved due to missing all_df")

# Unpersist cached DataFrames to free memory
penn_keys.unpersist()
unique_penn.unpersist()

# Re-enable broadcast joins for subsequent operations
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "30m")

Using cached exploded dataset from memory
Calculating statistics...

=== Analysis Results ===
Total Penn records: 2,443,080
Unique Penn records: 1,559,207
Uniqueness rate: 63.8%
Overlap rate: 36.2%

=== Overlap Analysis ===
Distribution of Penn records by number of libraries holding them:
+-------------+-------+
|num_libraries|  count|
+-------------+-------+
|            1|4115630|
|            2|2185102|
|            3|1253813|
|            4| 773343|
|            5| 591346|
|            6| 477994|
|            7| 395741|
|            8| 334991|
|            9| 294822|
|           10| 250606|
|           11| 163279|
|           12|  45234|
|           13|   4739|
+-------------+-------+



In [36]:
spark.catalog.clearCache()

In [37]:
# Conservative Uniqueness Filtering - Alternative Analysis
# This provides a more conservative estimate of uniqueness by applying stricter criteria

from pyspark.sql.functions import col, size, array_contains, collect_set, count, when
import pyspark.sql.functions as F
import glob
import os

print("=== CONSERVATIVE UNIQUENESS ANALYSIS ===")
print("Applying stricter criteria to identify truly unique records\n")

# Load the Penn overlap analysis data
penn_overlap = spark.read.parquet(f"{output_dir}/penn_overlap_analysis.parquet")

# Get the baseline unique Penn count if not already in memory
if 'unique_penn_count' not in locals():
    print("Loading baseline unique Penn statistics...")
    unique_penn_df = spark.read.parquet(f"{output_dir}/unique_penn.parquet")
    unique_penn_count = unique_penn_df.count()
    print(f"Baseline unique Penn records: {unique_penn_count:,}\n")


# Check if we need to reload the exploded dataset
if 'all_df_exploded' not in locals():
    # Try to recreate from temp_processed files
    temp_output_dir = f"{output_dir}/temp_processed"
    
    if os.path.exists(temp_output_dir):
        print("Recreating exploded dataset from temp_processed files...")
        
        # Get all processed institution files
        processed_paths = glob.glob(f"{temp_output_dir}/*_processed.parquet")
        
        if processed_paths:
            # Read all processed institution files
            all_df = spark.read.parquet(*processed_paths)
            
            # Recreate the exploded dataset
            all_df_with_key_array = all_df.withColumn("key_array", F.col("id_list"))
            
            all_df_exploded = all_df_with_key_array.select(
                "F001", "source", "match_key", "is_valid_match_key",
                F.explode("key_array").alias("key")
            ).filter(F.col("key").isNotNull())
            
            # Cache it for performance
            all_df_exploded.cache()
            print(f"✅ Recreated exploded dataset from {len(processed_paths)} institution files")

# Re-create grouped DataFrame with match key type information
grouped_with_type = all_df_exploded.withColumn("key_type",
    F.when(F.col("key").rlike("^[0-9X]{10,13}$"), "ISBN")
    .when(F.col("key").rlike("^[0-9]{8,}$"), "OCLC")
    .when(F.col("key").rlike("^[a-zA-Z0-9]+$") & ~F.col("key").contains("_"), "LCCN")
    .when(F.col("key").contains("_"), "MatchKey")
    .otherwise("Other")
).groupBy("key", "key_type").agg(
    collect_set("source").alias("sources"),
    F.count("*").alias("record_count")
)

# ANALYSIS 1: Records unique by multiple identifier types
print("1. Records with multiple types of unique identifiers:")

# Get Penn records that are UNIQUE to Penn first (from the overlap analysis)
unique_penn_f001s = penn_overlap.filter(col("num_libraries") == 1).select("F001")

# Get key types for unique Penn records only
penn_key_types_unique = all_df_exploded.filter(col("source") == "penn") \
    .join(unique_penn_f001s, on="F001", how="inner") \
    .withColumn("key_type",
        F.when(F.col("key").rlike("^[0-9X]{10,13}$"), "ISBN")
        .when(F.col("key").rlike("^[0-9]{8,}$"), "OCLC")
        .when(F.col("key").rlike("^[a-zA-Z0-9]+$") & ~F.col("key").contains("_"), "LCCN")
        .when(F.col("key").contains("_"), "MatchKey")
        .otherwise("Other")
    ) \
    .groupBy("F001") \
    .agg(
        collect_set("key_type").alias("identifier_types"),
        count("key").alias("total_keys")
    )

# Now filter for those with multiple identifier types
unique_by_multiple = penn_key_types_unique \
    .filter(size(col("identifier_types")) >= 2)  # Has at least 2 different types

unique_by_multiple_count = unique_by_multiple.count()
print(f"  - Penn records unique by 2+ identifier types: {unique_by_multiple_count:,}")

# ANALYSIS 2: Records with standard identifiers (more reliable)
print("\n2. Uniqueness by identifier type:")

# For each unique Penn record, check what types of identifiers it has
unique_penn_with_id_types = all_df_exploded.filter(col("source") == "penn") \
    .join(unique_penn_f001s, on="F001", how="inner") \
    .withColumn("id_type",
        F.when(F.col("key").rlike("^[0-9X]{10,13}$"), "ISBN")
        .when(F.col("key").rlike("^[0-9]{8,}$"), "OCLC")
        .when(F.col("key").rlike("^[a-zA-Z0-9]+$") & ~F.col("key").contains("_"), "LCCN")
        .when(F.col("key").contains("_"), "MatchKey")
        .otherwise("Other")
    ) \
    .groupBy("F001").agg(
        F.collect_set("id_type").alias("id_types")
    )

# Count records by identifier type presence
unique_with_isbn = unique_penn_with_id_types.filter(F.array_contains(col("id_types"), "ISBN")).count()
unique_with_oclc = unique_penn_with_id_types.filter(F.array_contains(col("id_types"), "OCLC")).count()
unique_with_lccn = unique_penn_with_id_types.filter(F.array_contains(col("id_types"), "LCCN")).count()
unique_with_matchkey = unique_penn_with_id_types.filter(F.array_contains(col("id_types"), "MatchKey")).count()

print(f"  - Unique Penn records with ISBN: {unique_with_isbn:,}")
print(f"  - Unique Penn records with OCLC: {unique_with_oclc:,}")
print(f"  - Unique Penn records with LCCN: {unique_with_lccn:,}")
print(f"  - Unique Penn records with match key: {unique_with_matchkey:,}")

# ANALYSIS 3: Conservative estimate - must have at least one standard identifier
print("\n3. Conservative uniqueness estimates:")

# Records that have at least one standard identifier (not just match keys)
conservative_unique = unique_penn_with_id_types.filter(
    F.array_contains(col("id_types"), "ISBN") |
    F.array_contains(col("id_types"), "OCLC") |
    F.array_contains(col("id_types"), "LCCN")
)

conservative_unique_count = conservative_unique.count()

print(f"\nConservative uniqueness (standard identifiers only):")
print(f"  - Total Penn records: {total_penn:,}")
print(f"  - Conservative unique count: {conservative_unique_count:,}")
print(f"  - Conservative uniqueness rate: {conservative_unique_count/total_penn*100:.1f}%")

# ANALYSIS 4: High-confidence unique records
print("\n4. High-confidence unique records:")

# Records that are unique AND have been verified by multiple match methods
# Use penn_key_types_unique from ANALYSIS 1
high_confidence = penn_key_types_unique \
    .filter(
        (size(col("identifier_types")) >= 2) &  # Multiple identifier types
        (col("total_keys") >= 3)  # At least 3 different keys
    )

high_confidence_count = high_confidence.count()
print(f"  - High-confidence unique (multiple identifiers): {high_confidence_count:,}")
print(f"  - High-confidence uniqueness rate: {high_confidence_count/total_penn*100:.1f}%")

# SUMMARY
print("\n=== UNIQUENESS SUMMARY ===")
print(f"Original unique records: {unique_penn_count:,} ({unique_penn_count/total_penn*100:.1f}%)")
print(f"Conservative unique records: {conservative_unique_count:,} ({conservative_unique_count/total_penn*100:.1f}%)")
print(f"High-confidence unique records: {high_confidence_count:,} ({high_confidence_count/total_penn*100:.1f}%)")

# Calculate the difference
difference = unique_penn_count - conservative_unique_count
print(f"\nDifference: {difference:,} records ({difference/total_penn*100:.1f}%)")
print("These are records unique only by match keys, which may be less reliable")

# Save conservative results
conservative_unique.write.mode("overwrite").parquet(f"{output_dir}/conservative_unique_penn.parquet")

print(f"\n✅ Conservative analysis complete!")
print(f"Results saved to: {output_dir}/conservative_unique_penn.parquet")

=== CONSERVATIVE UNIQUENESS ANALYSIS ===
Applying stricter criteria to identify truly unique records

1. Records with multiple types of unique identifiers:
  - Penn records unique by 2+ identifier types: 992,367

2. Uniqueness by identifier type:
  - Unique Penn records with ISBN: 594,158
  - Unique Penn records with OCLC: 883,744
  - Unique Penn records with LCCN: 189,183
  - Unique Penn records with match key: 1,559,207

3. Conservative uniqueness estimates:

Conservative uniqueness (standard identifiers only):
  - Total Penn records: 2,443,080
  - Conservative unique count: 992,230
  - Conservative uniqueness rate: 40.6%

4. High-confidence unique records:
  - High-confidence unique (multiple identifiers): 992,367
  - High-confidence uniqueness rate: 40.6%

=== UNIQUENESS SUMMARY ===
Original unique records: 1,559,207 (63.8%)
Conservative unique records: 992,230 (40.6%)
High-confidence unique records: 992,367 (40.6%)

Difference: 566,977 records (23.2%)
These are records unique only

In [40]:
# Additional Filtering - ISBN Deduplication and Reproduction Removal
from pyspark.sql.functions import col, when, size, array_contains, collect_set, min
import pyspark.sql.functions as F

print("=== ADDITIONAL FILTERING FOR ACCURATE UNIQUENESS ===")
print("Applying ISBN deduplication and reproduction filtering\n")

# Load the conservative unique records if not already loaded
if 'conservative_unique' not in locals():
    conservative_unique = spark.read.parquet(f"{output_dir}/conservative_unique_penn.parquet")

# Get the count if not already available
if 'conservative_unique_count' not in locals():
    conservative_unique_count = conservative_unique.count()
    print(f"Conservative unique records loaded: {conservative_unique_count:,}\n")

# Load the full Penn records with all fields
penn_full = spark.read.parquet(f"{input_dir}/pod-processing-outputs/penn_penn_filtered-marc21.parquet")

# IMPORTANT: Ensure we only get unique F001s from penn_full to avoid duplicates
penn_full_unique = penn_full.dropDuplicates(["F001"])

# Join to get full records for conservative unique items
conservative_unique_full = conservative_unique.join(penn_full_unique, on="F001", how="inner")

# Verify the join didn't create duplicates
joined_count = conservative_unique_full.count()
if joined_count != conservative_unique_count:
    print(f"⚠️  WARNING: Join created duplicates! Expected {conservative_unique_count:,}, got {joined_count:,}")
    print("Deduplicating by F001...")
    conservative_unique_full = conservative_unique_full.dropDuplicates(["F001"])
    joined_count = conservative_unique_full.count()
    print(f"After deduplication: {joined_count:,} records\n")

print("📋 STEP 1: ISBN DEDUPLICATION")
print("When multiple records share the same ISBN, keeping only one...")

# Extract clean ISBN from F020 field
conservative_with_isbn = conservative_unique_full.withColumn("clean_isbn",
    F.when(F.col("F020").isNotNull() & (F.size(F.col("F020")) > 0),
        F.regexp_extract(
            F.concat_ws(" ", F.col("F020")),
            "([0-9X]{10,13})",
            1
        )
    )
)

# For records with ISBNs, keep only one per ISBN (the one with lowest F001)
isbn_dedupe = conservative_with_isbn.filter(col("clean_isbn").isNotNull()) \
    .groupBy("clean_isbn") \
    .agg(F.min("F001").alias("F001_to_keep"))

# Keep records that either have no ISBN or are the chosen record for their ISBN
conservative_isbn_deduped = conservative_with_isbn.alias("a").join(
    isbn_dedupe.alias("b"),
    (col("a.clean_isbn") == col("b.clean_isbn")) & (col("a.F001") != col("b.F001_to_keep")),
    how="left_anti"
)

isbn_deduped_count = conservative_isbn_deduped.count()
removed_by_isbn = joined_count - isbn_deduped_count

print(f"  - Records before ISBN deduplication: {joined_count:,}")
print(f"  - Records after ISBN deduplication: {isbn_deduped_count:,}")
print(f"  - Removed by ISBN deduplication: {removed_by_isbn:,}")

print("\n📋 STEP 2: REMOVING REPRODUCTIONS (F533 FIELD)")
print("Filtering out records with reproduction notes...")

# Remove records with F533 (reproduction note)
if "F533" in conservative_isbn_deduped.columns:
    conservative_no_reproductions = conservative_isbn_deduped.filter(col("F533").isNull())
    no_repro_count = conservative_no_reproductions.count()
    removed_by_f533 = isbn_deduped_count - no_repro_count
    
    print(f"  - Records before F533 filter: {isbn_deduped_count:,}")
    print(f"  - Records after F533 filter: {no_repro_count:,}")
    print(f"  - Removed by F533 filter: {removed_by_f533:,}")
else:
    print("  - F533 field not found, skipping reproduction filter")
    conservative_no_reproductions = conservative_isbn_deduped
    no_repro_count = isbn_deduped_count

print("\n📋 STEP 3: REMOVING HSP (HISTORICAL SOCIETY OF PENNSYLVANIA) RECORDS")
print("Loading HSP exclusion list from file...")

# Load HSP F001 values from text file
hsp_file_path = f"{input_dir}/hsp_removed_mmsid.txt"
try:
    # Read the HSP F001 values from the text file
    with open(hsp_file_path, 'r') as f:
        hsp_f001_list = [line.strip() for line in f if line.strip()]
    
    print(f"  - Loaded {len(hsp_f001_list):,} HSP F001 values from file")
    
    # Convert to DataFrame for efficient joining
    hsp_df = spark.createDataFrame([(f001,) for f001 in hsp_f001_list], ["F001"])
    
    # Remove HSP records using anti-join
    conservative_no_hsp = conservative_no_reproductions.join(
        hsp_df,
        on="F001",
        how="left_anti"
    )
    
    no_hsp_count = conservative_no_hsp.count()
    removed_by_hsp = no_repro_count - no_hsp_count
    
    print(f"  - Records before HSP filter: {no_repro_count:,}")
    print(f"  - Records after HSP filter: {no_hsp_count:,}")
    print(f"  - Removed by HSP filter: {removed_by_hsp:,}")
    
except FileNotFoundError:
    print(f"  ⚠️  WARNING: HSP file not found at {hsp_file_path}")
    print("  Falling back to pattern-based HSP detection...")
    
    # Fallback: Use pattern matching approach
    hsp_patterns = [
        "HSP",
        "Historical Society of Pennsylvania",
        "Hist Soc Penn",
        "Hist.Soc.Penn"
    ]
    
    # Create filter conditions for HSP detection
    hsp_filter_conditions = F.lit(False)
    
    # Check F710 (corporate name added entry)
    if "F710" in conservative_no_reproductions.columns:
        for pattern in hsp_patterns:
            hsp_filter_conditions = hsp_filter_conditions | \
                F.array_contains(F.transform(F.col("F710"), lambda x: F.upper(x)), pattern.upper())
    
    # Check F590 (local note)
    if "F590" in conservative_no_reproductions.columns:
        for pattern in hsp_patterns:
            hsp_filter_conditions = hsp_filter_conditions | \
                F.array_contains(F.transform(F.col("F590"), lambda x: F.upper(x)), pattern.upper())
    
    # Check F500 (general note)
    if "F500" in conservative_no_reproductions.columns:
        for pattern in hsp_patterns:
            hsp_filter_conditions = hsp_filter_conditions | \
                F.array_contains(F.transform(F.col("F500"), lambda x: F.upper(x)), pattern.upper())
    
    # Apply HSP filter
    conservative_no_hsp = conservative_no_reproductions.filter(~hsp_filter_conditions)
    no_hsp_count = conservative_no_hsp.count()
    removed_by_hsp = no_repro_count - no_hsp_count
    
    print(f"  - Records before HSP filter: {no_repro_count:,}")
    print(f"  - Records after HSP filter: {no_hsp_count:,}")
    print(f"  - Removed by HSP filter: {removed_by_hsp:,}")

# Calculate final statistics
print("\n=== FINAL FILTERED UNIQUENESS SUMMARY ===")
print(f"Original uniqueness count: {unique_penn_count:,}")
print(f"Conservative (standard IDs only): {conservative_unique_count:,}")
print(f"After ISBN deduplication: {isbn_deduped_count:,}")
print(f"After removing reproductions: {no_repro_count:,}")
print(f"After removing HSP records: {no_hsp_count:,}")

# Add check for total_penn
if 'total_penn' in locals():
    print(f"\nFinal uniqueness rate: {no_hsp_count/total_penn*100:.1f}%")
else:
    print("\nFinal uniqueness rate: Unable to calculate (total_penn not available)")

print(f"Total records filtered out: {conservative_unique_count - no_hsp_count:,}")

# Save the final filtered dataset
conservative_no_hsp.select("F001").write.mode("overwrite").parquet(
    f"{output_dir}/conservative_unique_penn_filtered.parquet"
)

print(f"\n✅ Additional filtering complete!")
print(f"Final filtered results saved to: {output_dir}/conservative_unique_penn_filtered.parquet")

# Update the unique_penn variable for downstream processing
unique_penn = conservative_no_hsp.select("F001")
unique_penn_count = no_hsp_count

=== ADDITIONAL FILTERING FOR ACCURATE UNIQUENESS ===
Applying ISBN deduplication and reproduction filtering

📋 STEP 1: ISBN DEDUPLICATION
When multiple records share the same ISBN, keeping only one...
  - Records before ISBN deduplication: 992,230
  - Records after ISBN deduplication: 952,277
  - Removed by ISBN deduplication: 39,953

📋 STEP 2: REMOVING REPRODUCTIONS (F533 FIELD)
Filtering out records with reproduction notes...
  - Records before F533 filter: 952,277
  - Records after F533 filter: 903,858
  - Removed by F533 filter: 48,419

📋 STEP 3: REMOVING HSP (HISTORICAL SOCIETY OF PENNSYLVANIA) RECORDS
Loading HSP exclusion list from file...
  - Loaded 189,793 HSP F001 values from file
  - Records before HSP filter: 903,858
  - Records after HSP filter: 810,502
  - Removed by HSP filter: 93,356

=== FINAL FILTERED UNIQUENESS SUMMARY ===
Original uniqueness count: 1,337,666
Conservative (standard IDs only): 992,230
After ISBN deduplication: 952,277
After removing reproductions: 903

In [41]:
# Additional HSP Detection - Check F035 field for (hsp)
from pyspark.sql.functions import col, when, size, array_contains, lower
import pyspark.sql.functions as F

print("=== ADDITIONAL HSP DETECTION VIA F035 ===")
print("Checking F035 field for (hsp) identifier...\n")

# Load the filtered dataset if not in memory
if 'conservative_no_hsp' not in locals():
    print("Loading filtered dataset...")
    conservative_no_hsp = spark.read.parquet(f"{output_dir}/conservative_unique_penn_filtered.parquet")
    
    # Need to join with full records to get F035
    penn_full = spark.read.parquet(f"{input_dir}/pod-processing-outputs/penn_penn_filtered-marc21.parquet")
    penn_full_unique = penn_full.dropDuplicates(["F001"])
    conservative_no_hsp = conservative_no_hsp.join(penn_full_unique, on="F001", how="inner")

# Get current count before additional filtering
current_count = conservative_no_hsp.count()
print(f"Records before F035 HSP check: {current_count:,}")

# Check if F035 field exists
if "F035" in conservative_no_hsp.columns:
    # Filter out records where F035 contains (hsp) - case insensitive
    additional_hsp_filter = conservative_no_hsp.filter(
        F.col("F035").isNull() | 
        ~F.array_contains(
            F.transform(F.col("F035"), lambda x: F.lower(x)), 
            "(hsp)"
        )
    )
    
    # Count how many HSP records were found
    after_f035_count = additional_hsp_filter.count()
    removed_by_f035 = current_count - after_f035_count
    
    print(f"\n📋 F035 HSP Detection Results:")
    print(f"  - Records with (hsp) in F035: {removed_by_f035:,}")
    print(f"  - Records after F035 filter: {after_f035_count:,}")
    
    # If we found additional HSP records, update the saved files
    if removed_by_f035 > 0:
        print(f"\n✅ Found and removed {removed_by_f035:,} additional HSP records!")
        
        # Update the filtered dataset
        additional_hsp_filter.select("F001").write.mode("overwrite").parquet(
            f"{output_dir}/conservative_unique_penn_filtered_no_f035_hsp.parquet"
        )
        
        # Update variables for downstream processing
        conservative_no_hsp = additional_hsp_filter
        no_hsp_count = after_f035_count
        unique_penn = additional_hsp_filter.select("F001")
        unique_penn_count = no_hsp_count
        
        print(f"\n=== UPDATED FINAL STATISTICS ===")
        print(f"Final unique Penn records (all HSP removed): {no_hsp_count:,}")
        if 'total_penn' in locals():
            print(f"Final uniqueness rate: {no_hsp_count/total_penn*100:.1f}%")
        
        print(f"\nUpdated results saved to: {output_dir}/conservative_unique_penn_filtered_no_f035_hsp.parquet")
    else:
        print("\n✅ No additional HSP records found in F035")
        
else:
    print("\n⚠️  F035 field not found in dataset - cannot perform additional HSP check")

# Show a sample of F035 fields that contain (hsp) if any were found
if "F035" in conservative_no_hsp.columns and removed_by_f035 > 0:
    print("\n📋 Sample F035 fields containing (hsp):")
    hsp_sample = conservative_no_hsp.filter(
        F.array_contains(
            F.transform(F.col("F035"), lambda x: F.lower(x)), 
            "(hsp)"
        )
    ).select("F001", "F035").limit(5)
    
    hsp_sample.show(truncate=False)

=== ADDITIONAL HSP DETECTION VIA F035 ===
Checking F035 field for (hsp) identifier...

Records before F035 HSP check: 810,502

📋 F035 HSP Detection Results:
  - Records with (hsp) in F035: 0
  - Records after F035 filter: 810,502

✅ No additional HSP records found in F035


In [46]:
# Data Source Validation (Updated: July 2025)
# Validates Penn MARC data sources and ensures current data is used
# Requires explicit confirmation for legacy data usage

# Use Leader field FLDR to make a print set from unique penn and non-print
from pyspark.sql.functions import col, substring, when, concat, lit
import pyspark.sql.functions as F
import glob
import os
import re
from datetime import datetime

if 'output_dir' not in locals():
    output_dir = "/home/jovyan/work/July-2025-PODParquet/pod-processing-outputs"

# Load the unique Penn dataset if not already loaded - UPDATED TO USE FILTERED DATASET
if 'unique_penn' not in locals() or unique_penn is None:
    print("Loading conservatively filtered unique Penn dataset...")
    # Load from the most filtered dataset (after HSP and F035 filtering)
    if os.path.exists(f"{output_dir}/conservative_unique_penn_filtered_no_f035_hsp.parquet"):
        unique_penn = spark.read.parquet(f"{output_dir}/conservative_unique_penn_filtered_no_f035_hsp.parquet")
        print("✅ Loaded conservative_unique_penn_filtered_no_f035_hsp.parquet")
    elif os.path.exists(f"{output_dir}/conservative_unique_penn_filtered.parquet"):
        print("Note: Using filtered dataset without F035 HSP check")
        unique_penn = spark.read.parquet(f"{output_dir}/conservative_unique_penn_filtered.parquet")
    else:
        print("Warning: Filtered datasets not found, falling back to conservative unique dataset")
        unique_penn = spark.read.parquet(f"{output_dir}/conservative_unique_penn.parquet")
    
    # Get count for later statistics
    if 'unique_penn_count' not in locals():
        unique_penn_count = unique_penn.count()
        print(f"Loaded {unique_penn_count:,} filtered unique Penn records")
else:
    print("Using existing unique_penn DataFrame")
    # Ensure we have the count
    if 'unique_penn_count' not in locals():
        unique_penn_count = unique_penn.count()

print(f"Working with {unique_penn_count:,} filtered unique Penn records")

# CRITICAL: Verify Penn data currency before processing
def verify_penn_data_source(matching_files):
    """
    Verify the Penn data source and warn if using outdated data
    """
    if not matching_files:
        return None
        
    selected_file = matching_files[0]
    file_info = {
        'path': selected_file,
        'filename': os.path.basename(selected_file),
        'is_legacy': 'penn-2022-07-20' in selected_file,
        'is_processed': 'pod-processing-outputs' in selected_file
    }
    
    # Extract date from filename if possible
    date_pattern = r'(\d{4}-\d{2}-\d{2})'
    date_match = re.search(date_pattern, file_info['filename'])
    if date_match:
        file_info['data_date'] = date_match.group(1)
    else:
        file_info['data_date'] = 'unknown'
    
    return file_info

# Load full Penn records - prioritize most recent processed data
penn_full_paths = [
    # PRIMARY: Direct path to known Penn data
    f"{input_dir}/penn_penn_filtered-marc21.parquet",
    
    # SECONDARY: Penn parquet files from current processing pipeline
    f"{input_dir}/pod-processing-outputs/penn_*updates*marc21.parquet",
    
    # TERTIARY: Any Penn marc21 parquet files in processing outputs
    f"{input_dir}/pod-processing-outputs/penn_*marc21.parquet",
    
    # QUATERNARY: Check for raw Penn parquet files (less preferred)
    f"{input_dir}/pod_penn/file/**/*.parquet"
]

# Add data source verification
penn_full = None
selected_source = None

print("\n=== PENN DATA SOURCE VERIFICATION ===")
for path_pattern in penn_full_paths:
    try:
        matching_files = glob.glob(path_pattern, recursive=True)
        if matching_files:
            # Sort files by modification time to get most recent
            matching_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
            
            source_info = verify_penn_data_source(matching_files)
            if source_info:
                print(f"\nFound Penn records at: {source_info['path']}")
                print(f"  - Source type: {'Processed updates' if source_info['is_processed'] else 'Raw data'}")
                print(f"  - Data date: {source_info['data_date']}")
                
                # Warn if data appears old
                if source_info['data_date'] != 'unknown':
                    try:
                        data_date = datetime.strptime(source_info['data_date'], '%Y-%m-%d')
                        days_old = (datetime.now() - data_date).days
                        if days_old > 365:
                            print(f"  ⚠️  WARNING: Data is {days_old} days old!")
                            print(f"  ⚠️  Results may not reflect current Penn holdings")
                    except:
                        pass
                
                # Load the data
                penn_full = spark.read.parquet(source_info['path'])
                
                # CRITICAL: Verify this is a MARC dataset with FLDR field
                if "FLDR" not in penn_full.columns:
                    print(f"  ⚠️  WARNING: File does not contain FLDR field - not a valid MARC dataset")
                    penn_full = None
                    continue
                
                selected_source = source_info
                
                # Verify record count and sample for currency check
                record_count = penn_full.count()
                print(f"  - Total records: {record_count:,}")
                
                # Sample check for recent cataloging activity
                if 'F005' in penn_full.columns:
                    recent_updates = penn_full.filter(
                        col("F005").rlike("202[4-5]")
                    ).count()
                    recent_percentage = (recent_updates / record_count * 100) if record_count > 0 else 0
                    print(f"  - Recently updated records (2024-2025): {recent_updates:,} ({recent_percentage:.1f}%)")
                    
                    if recent_percentage < 5:
                        print(f"  ⚠️  WARNING: Only {recent_percentage:.1f}% of records updated recently")
                        print(f"  ⚠️  Data may be significantly outdated")
                
                break
    except Exception as e:
        print(f"Error checking {path_pattern}: {str(e)}")
        continue

# If no MARC file with FLDR found, search broader for MARC datasets
if penn_full is None or "FLDR" not in penn_full.columns:
    print("\n⚠️  MARC files without FLDR field detected! Searching for proper MARC datasets...")
    
    # Search for any MARC21 parquet files
    marc_paths = glob.glob(f"{input_dir}/**/*marc21*.parquet", recursive=True)
    
    if marc_paths:
        print(f"Found {len(marc_paths)} potential MARC datasets")
        for path in marc_paths:
            try:
                test_df = spark.read.parquet(path)
                if "FLDR" in test_df.columns:
                    # Verify this is a Penn dataset
                    filename = os.path.basename(path)
                    if "penn" in filename.lower():
                        print(f"✅ Found valid Penn MARC dataset with FLDR field: {path}")
                        penn_full = test_df
                        selected_source = {
                            'path': path,
                            'filename': filename,
                            'is_legacy': 'penn-2022-07-20' in path,
                            'is_processed': 'pod-processing-outputs' in path
                        }
                        break
            except Exception as e:
                print(f"Error checking {path}: {str(e)}")

# Final fallback with strong warning
if penn_full is None:
    print("\n⚠️  CRITICAL WARNING: No current Penn data found!")
    print("As a last resort, checking for legacy data...")
    
    legacy_path = "/home/jovyan/work/marc/parquet/penn-2022-07-20-full-marc21.parquet"
    if os.path.exists(legacy_path):
        response = input("\n🚨 Found 2022 Penn data. This is SEVERELY OUTDATED. Use anyway? (yes/no): ")
        if response.lower() == 'yes':
            penn_full = spark.read.parquet(legacy_path)
            selected_source = {'is_legacy': True, 'filename': 'penn-2022-07-20-full-marc21.parquet'}
            print("⚠️  Using 2022 data - results will NOT reflect current Penn holdings!")
        else:
            raise FileNotFoundError("No Penn full MARC records found and user declined legacy data")
    else:
        print("ERROR: Could not find Penn full MARC records!")
        print("Please ensure Penn MARC data has been converted to Parquet format.")
        print("Run the previous cells to process MARC files first.")
        raise FileNotFoundError("Penn full MARC records not found")

print("\n=== PROCEEDING WITH ANALYSIS ===")
if selected_source and selected_source.get('is_legacy'):
    print("⚠️  USING OUTDATED DATA - RESULTS MAY BE INACCURATE")

# CRITICAL: Verify penn_full dataset has the required MARC fields
print("\n=== Pre-Join Dataset Verification ===")
print(f"Penn full dataset columns ({len(penn_full.columns)} total):")
# Print first 10 columns as a sample
print(f"Sample columns: {', '.join(penn_full.columns[:10])}...")

if "FLDR" not in penn_full.columns:
    raise ValueError("ERROR: Penn dataset is missing the FLDR field required for analysis!")

# FIX: Check for duplicates in penn_full before joining
print("\n=== Checking for duplicate F001s in penn_full ===")
duplicate_check = penn_full.groupBy("F001").count().filter(col("count") > 1)
duplicate_count = duplicate_check.count()

if duplicate_count > 0:
    print(f"⚠️  WARNING: Found {duplicate_count:,} F001 values with duplicates in penn_full")
    print("Deduplicating penn_full by F001...")
    
    # Show sample of duplicates
    print("\nSample of duplicate F001s:")
    duplicate_check.orderBy(col("count").desc()).show(5)
    
    # Deduplicate by keeping first occurrence
    penn_full = penn_full.dropDuplicates(["F001"])
    print(f"After deduplication: {penn_full.count():,} records")

# OPTIMIZATION: Use broadcast join for better performance with small unique_penn_ids DataFrame
unique_penn_ids = unique_penn.select("F001").distinct()
print(f"\nJoining {unique_penn_ids.count():,} unique Penn F001s with full Penn records...")

unique_penn_full = penn_full.join(F.broadcast(unique_penn_ids), on="F001", how="inner")

# Verify join results
joined_count = unique_penn_full.count()
print(f"Joined result: {joined_count:,} records")

# FIX: Now the counts should match after deduplication
if joined_count != unique_penn_count:
    print(f"⚠️  WARNING: Join count mismatch! Expected {unique_penn_count:,}, got {joined_count:,}")
    
    # Investigate missing records
    missing_count = unique_penn_count - joined_count
    if missing_count > 0:
        print(f"Missing {missing_count:,} records - these F001s exist in filtered dataset but not in penn_full")
        
        # Find which F001s are missing
        missing_f001s = unique_penn_ids.join(penn_full.select("F001").distinct(), on="F001", how="left_anti")
        missing_sample = missing_f001s.limit(10).collect()
        if missing_sample:
            print("Sample of missing F001s:")
            for row in missing_sample:
                print(f"  - {row['F001']}")
else:
    print("✅ Join count matches expected - all filtered records found in penn_full")

# Verify join kept FLDR column
print("\n=== Post-Join Dataset Verification ===")
print(f"Joined dataset columns ({len(unique_penn_full.columns)} total):")
# Print first 10 columns as a sample
print(f"Sample columns: {', '.join(unique_penn_full.columns[:10])}...")

if "FLDR" not in unique_penn_full.columns:
    raise ValueError("ERROR: FLDR column was lost during join operation!")

# Check available columns before filtering
print("\n=== Checking available columns for filtering ===")
available_columns = unique_penn_full.columns
print(f"Looking for F533 column to filter reproduction notes...")

# Start with the base dataset
df_with_material_type = unique_penn_full

# Only apply F533 filter if the column exists
if "F533" in available_columns:
    print("Filtering out records with F533 (reproduction notes)")
    df_with_material_type = df_with_material_type.filter(col("F533").isNull())
else:
    print("Note: F533 column not found in dataset, skipping reproduction filter")

# Continue with the rest of the transformations
unique_penn_with_material_type = (df_with_material_type
    # Add material type columns
    .withColumn("record_type", substring(col("FLDR"), 7, 1))
    .withColumn("bib_level", substring(col("FLDR"), 8, 1))
    .withColumn("combined_type", concat(col("record_type"), col("bib_level")))
    .withColumn("material_category", 
        when((col("record_type") == "a") & (col("bib_level").isin("m")), "print_book")
        .when((col("record_type") == "a") & (col("bib_level").isin("s")), "print_serial")
        .when((col("record_type") == "c") & (col("bib_level").isin("m", "s")), "print_music")
        .when((col("record_type") == "e") & (col("bib_level").isin("m", "s")), "print_maps")
        .when(col("record_type") == "m", "electronic_resource")
        .when(col("record_type").isin("g", "k"), "visual_material")
        .when(col("record_type") == "i", "audio_material")
        .otherwise("other")
    )
    .withColumn("is_print", 
        col("material_category").isin("print_book", "print_serial", "print_music", "print_maps")
    )
)

# Cache before multiple operations
unique_penn_with_material_type.cache()

# OPTIMIZATION: Get all statistics in one pass
print("\n=== Material Type Distribution ===")
material_stats = unique_penn_with_material_type.groupBy("material_category", "is_print").count().collect()

# Process statistics
material_counts_dict = {}
print_count = 0
non_print_count = 0

for row in material_stats:
    material_counts_dict[row["material_category"]] = row["count"]
    if row["is_print"]:
        print_count += row["count"]
    else:
        non_print_count += row["count"]

total_unique = print_count + non_print_count

# Display material distribution
for category, count in sorted(material_counts_dict.items(), key=lambda x: x[1], reverse=True):
    print(f"{category}: {count:,}")

# Filter for print materials only
print_only_df = unique_penn_with_material_type.filter(col("is_print") == True)

# Add metadata if we have source information
if selected_source:
    print_only_df_with_metadata = print_only_df.withColumn(
        "processing_date", lit(datetime.now().strftime("%Y-%m-%d"))
    ).withColumn(
        "source_file", lit(selected_source.get('filename', 'unknown'))
    ).withColumn(
        "data_currency_warning", 
        lit("OUTDATED - 2022 data" if selected_source.get('is_legacy') else "Current")
    )
else:
    print_only_df_with_metadata = print_only_df

# Save datasets
unique_penn_with_material_type.write.mode("overwrite").parquet(f"{output_dir}/unique_penn_full_no_533.parquet")
print_only_df_with_metadata.write.mode("overwrite").parquet(f"{output_dir}/physical_books_no_533.parquet")

# Print final statistics
print(f"\n=== Print Material Analysis ===")
print(f"Total unique Penn records: {total_unique:,}")

if total_unique > 0:
    print(f"Print materials: {print_count:,} ({print_count/total_unique*100:.1f}%)")
    print(f"Non-print materials: {non_print_count:,} ({non_print_count/total_unique*100:.1f}%)")
    
    # Show print categories breakdown
    print("\n=== Print Material Categories ===")
    print_categories = ["print_book", "print_serial", "print_music", "print_maps"]
    for category in print_categories:
        if category in material_counts_dict:
            count = material_counts_dict[category]
            print(f"{category}: {count:,} ({count/print_count*100:.1f}% of print materials)")
else:
    print("No unique Penn records found to analyze")

# Unpersist cached DataFrame
unique_penn_with_material_type.unpersist()

# Final warning if using outdated data
if selected_source and selected_source.get('is_legacy'):
    print("\n" + "="*60)
    print("🚨 CRITICAL WARNING: Analysis completed using 2022 Penn data")
    print("🚨 Results do NOT reflect current Penn holdings")
    print("🚨 Recommended: Re-run with current Penn MARC export")
    print("="*60)

Using existing unique_penn DataFrame
Working with 810,502 filtered unique Penn records

=== PENN DATA SOURCE VERIFICATION ===

Found Penn records at: /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/penn_penn_filtered-marc21.parquet
  - Source type: Processed updates
  - Data date: unknown
  - Total records: 3,663,990
  - Recently updated records (2024-2025): 609,205 (16.6%)

=== PROCEEDING WITH ANALYSIS ===

=== Pre-Join Dataset Verification ===
Penn full dataset columns (216 total):
Sample columns: FLDR, F001, F003, F005, F006, F007, F008, F010, F013, F015...

=== Checking for duplicate F001s in penn_full ===
Deduplicating penn_full by F001...

Sample of duplicate F001s:
+----------------+-----+
|            F001|count|
+----------------+-----+
|   9955913503681|  400|
|   9973703503681|  202|
|9915957333503681|  200|
|   9972463503681|  118|
| 991853173503681|  106|
+----------------+-----+
only showing top 5 rows

After deduplication: 2,443,080 records

Joining 810,502

In [50]:
# Stratified Sampling and Final Analysis
from pyspark.sql.functions import rand, col
import pyspark.sql.functions as F
import json
from datetime import datetime 
import builtins  # Import builtins to access Python's built-in functions

# Define output directory if not already defined
if 'output_dir' not in locals():
    output_dir = "/home/jovyan/work/July-2025-PODParquet/pod-processing-outputs"

# Load print materials dataset if not already loaded
if 'print_only_df' not in locals() or print_only_df is None:
    print("Loading print materials dataset...")
    print_only_df_raw = spark.read.parquet(f"{output_dir}/physical_books_no_533.parquet")
    
    # Check if metadata columns exist and drop them for sampling
    metadata_cols = ["processing_date", "source_file", "data_currency_warning"]
    existing_metadata_cols = [col for col in metadata_cols if col in print_only_df_raw.columns]
    
    if existing_metadata_cols:
        print(f"Dropping metadata columns: {existing_metadata_cols}")
        print_only_df = print_only_df_raw.drop(*existing_metadata_cols)
    else:
        print_only_df = print_only_df_raw
else:
    print("Using existing print_only_df DataFrame")

# Load or compute necessary statistics if not available
if 'total_penn' not in locals() or 'unique_penn_count' not in locals():
    print("Loading required statistics...")
    # Load from saved parquet files
    if 'unique_penn' not in locals():
        unique_penn = spark.read.parquet(f"{output_dir}/unique_penn.parquet")
    unique_penn_count = unique_penn.count()
    
    # Load Penn overlap analysis to get total Penn records
    penn_overlap = spark.read.parquet(f"{output_dir}/penn_overlap_analysis.parquet")
    total_penn = penn_overlap.select("F001").distinct().count()

# Compute print statistics if not available
if 'print_count' not in locals() or 'material_counts_dict' not in locals():
    print("Computing material type statistics...")
    # Check for material_category column
    if 'material_category' not in print_only_df.columns:
        print("ERROR: material_category column not found in print_only_df")
        raise ValueError("Missing required column: material_category")
    
    material_stats = print_only_df.groupBy("material_category").count().collect()
    material_counts_dict = {row["material_category"]: row["count"] for row in material_stats}
    print_count = sum(material_counts_dict.values())

# Define sampling function with improved stratification
def create_stratified_sample(df, strata_column, sample_size=1000):
    """
    Create a stratified sample with improved randomization.
    Uses multiple passes to ensure representation of all strata.
    """
    print(f"Creating stratified sample based on {strata_column}...")
    
    # Verify the strata column exists
    if strata_column not in df.columns:
        print(f"ERROR: Column '{strata_column}' not found in DataFrame")
        print(f"Available columns: {df.columns}")
        raise ValueError(f"Missing required column: {strata_column}")
    
    # Get counts by strata for weighting
    strata_counts = df.groupBy(strata_column).count().collect()
    total_records = df.count()
    
    if total_records == 0:
        print("WARNING: No records to sample from!")
        return df
    
    strata_map = {row[strata_column]: row["count"] for row in strata_counts}
    print(f"Strata distribution:")
    for strata, count in sorted(strata_map.items()):
        print(f"  - {strata}: {count:,} records ({count/total_records*100:.2f}%)")
    
    # Calculate proportional sample sizes with minimum threshold
    min_per_strata = 5  # Ensure at least a few records from each stratum
    sample_fractions = {}
    
    for strata, count in strata_map.items():
        # Proportional sampling with minimum threshold
        if count > 0:
            # Calculate proportional share but ensure at least min_per_strata
            prop_size = builtins.max(
                min_per_strata,
                int((count / total_records) * sample_size)
            )
            
            # Don't sample more than we have - use Python's built-in min
            prop_size = builtins.min(prop_size, count)
            
            # Calculate fraction
            sample_fractions[strata] = prop_size / count
    
    # First pass: Stratified sampling
    sampled_df = df.sampleBy(strata_column, fractions=sample_fractions, seed=42)
    
    # Check if we need a second pass to reach target size
    current_size = sampled_df.count()
    print(f"First pass sample size: {current_size}")
    
    if current_size < sample_size and current_size < total_records:
        # Second pass: Sample from under-represented strata
        remaining = builtins.min(sample_size - current_size, total_records - current_size)
        print(f"Need {remaining} more records to reach target sample size")
        
        # Get records not in first sample
        sampled_ids = sampled_df.select("F001").distinct()
        remaining_df = df.join(sampled_ids, on="F001", how="left_anti")
        
        remaining_count = remaining_df.count()
        if remaining_count > 0:
            # Simple random sample from remaining records
            additional_sample = remaining_df.orderBy(rand(seed=43)).limit(remaining)
            
            # Union the samples
            sampled_df = sampled_df.union(additional_sample)
            print(f"Added {builtins.min(remaining, remaining_count)} additional records")
    
    final_size = sampled_df.count()
    print(f"Final sample size: {final_size}")
    
    # Check distribution in final sample
    sample_distribution = sampled_df.groupBy(strata_column).count().collect()
    print(f"\nSample distribution by {strata_column}:")
    sample_dict = {row[strata_column]: row["count"] for row in sample_distribution}
    
    for strata_val in sorted(strata_map.keys()):
        original_count = strata_map.get(strata_val, 0)
        sample_count = sample_dict.get(strata_val, 0)
        if original_count > 0 and final_size > 0:
            print(f"  - {strata_val}: {sample_count} ({sample_count/final_size*100:.2f}% of sample vs {original_count/total_records*100:.2f}% of population)")
    
    return sampled_df

# Create a stratified sample by material category
sample_df = create_stratified_sample(print_only_df, "material_category", sample_size=1000)

# Cache the sample for better performance
sample_df.cache()

# Save the sample for API validation
sample_df.write.mode("overwrite").parquet(f"{output_dir}/statistical_sample_for_api_no_hsp.parquet")

# Select key fields for the CSV, handling array fields
sample_for_csv = sample_df.select(
    "F001", 
    # F020 is an array - get first ISBN if available
    F.when(F.col("F020").isNotNull() & (F.size(F.col("F020")) > 0), 
           F.col("F020").getItem(0)).otherwise("").alias("F020"),
    "F010",  # This is already a string
    "F245",  # This is already a string
    # F250 is an array - get first edition statement if available
    F.when(F.col("F250").isNotNull() & (F.size(F.col("F250")) > 0), 
           F.col("F250").getItem(0)).otherwise("").alias("F250"),
    # F260 is an array - get first publication info if available
    F.when(F.col("F260").isNotNull() & (F.size(F.col("F260")) > 0), 
           F.col("F260").getItem(0)).otherwise("").alias("F260"),
    "material_category"
)

# Save as CSV (single file for easier review)
sample_for_csv.coalesce(1).write.mode("overwrite").option("header", "true").csv(f"{output_dir}/statistical_sample_for_api_no_hsp.csv")

# Generate final summary statistics in JSON format
summary_stats = {
    "processing_timestamp": datetime.now().isoformat(),
    "total_penn_records": int(total_penn),
    "unique_penn_records": int(unique_penn_count),
    "uniqueness_rate": float(unique_penn_count/total_penn) if total_penn > 0 else 0.0,
    "print_materials": int(print_count),
    "print_materials_percentage": float(print_count/unique_penn_count) if unique_penn_count > 0 else 0.0,
    "sample_size": int(sample_df.count()),
    "material_categories": {}
}

# Add material categories to summary
for category, count in sorted(material_counts_dict.items()):
    summary_stats["material_categories"][category] = {
        "count": int(count),
        "percentage": float(count/print_count*100) if print_count > 0 else 0.0
    }

# Write summary to JSON file
with open(f"{output_dir}/sample_summary_no_hsp.json", "w") as f:
    json.dump(summary_stats, f, indent=2)

# Unpersist the cached sample
sample_df.unpersist()

print("\n✅ Processing complete!")
print(f"Results saved to {output_dir}/")
print("\nFinal outputs:")
print(f"  - unique_penn.parquet: All unique Penn records")
print(f"  - physical_books_no_533.parquet: Unique Penn physical books")
print(f"  - statistical_sample_for_api_no_hsp.parquet: Statistical sample for validation")
print(f"  - statistical_sample_for_api_no_hsp.csv: CSV version of sample")
print(f"  - sample_summary_no_hsp.json: Summary statistics")

# Display summary statistics
print("\n📊 Summary Statistics:")
print(f"  - Total Penn records: {summary_stats['total_penn_records']:,}")
print(f"  - Unique Penn records: {summary_stats['unique_penn_records']:,}")
print(f"  - Uniqueness rate: {summary_stats['uniqueness_rate']*100:.1f}%")
print(f"  - Print materials: {summary_stats['print_materials']:,}")
print(f"  - Print materials percentage: {summary_stats['print_materials_percentage']:.1f}%")
print(f"  - Sample size: {summary_stats['sample_size']:,}")

# Display material category breakdown
if material_counts_dict:
    print("\n📚 Material Category Breakdown:")
    for category, info in sorted(summary_stats["material_categories"].items()):
        print(f"  - {category}: {info['count']:,} ({info['percentage']:.1f}%)")

Using existing print_only_df DataFrame
Creating stratified sample based on material_category...
Strata distribution:
  - print_book: 697,799 records (95.22%)
  - print_maps: 903 records (0.12%)
  - print_music: 11,382 records (1.55%)
  - print_serial: 22,719 records (3.10%)
First pass sample size: 964
Need 36 more records to reach target sample size
Added 36 additional records
Final sample size: 1041

Sample distribution by material_category:
  - print_book: 1002 (96.25% of sample vs 95.22% of population)
  - print_maps: 4 (0.38% of sample vs 0.12% of population)
  - print_music: 13 (1.25% of sample vs 1.55% of population)
  - print_serial: 22 (2.11% of sample vs 3.10% of population)

✅ Processing complete!
Results saved to /home/jovyan/work/July-2025-PODParquet/pod-processing-outputs/

Final outputs:
  - unique_penn.parquet: All unique Penn records
  - physical_books_no_533.parquet: Unique Penn physical books
  - statistical_sample_for_api_no_hsp.parquet: Statistical sample for valida

In [51]:
# Optional Cleanup - Run this to free memory after analysis
def cleanup_spark_resources():
    """Clean up all cached DataFrames and temporary views"""
    try:
        # Get all cached DataFrames
        cached_count = len(spark.sparkContext._jsc.getPersistentRDDs().items())
        
        for (id, rdd) in spark.sparkContext._jsc.getPersistentRDDs().items():
            rdd.unpersist()
        
        # Drop all temporary views
        temp_views = [view.name for view in spark.catalog.listTables() if view.isTemporary]
        for view_name in temp_views:
            spark.catalog.dropTempView(view_name)
        
        print(f"✅ Cleaned up {cached_count} cached DataFrames and {len(temp_views)} temporary views")
        print("💡 Memory freed. You can safely re-run the notebook or close it.")
    except Exception as e:
        print(f"⚠️ Cleanup warning: {e}")

# Run cleanup
cleanup_spark_resources()

# Optional: Show memory status
print("\n📊 Spark UI still available at:", spark.sparkContext.uiWebUrl)
print("Check the Storage tab to verify all caches are cleared")

✅ Cleaned up 1 cached DataFrames and 0 temporary views
💡 Memory freed. You can safely re-run the notebook or close it.

📊 Spark UI still available at: http://7d7ed4cc3e7b:4040
Check the Storage tab to verify all caches are cleared
