# HATHITRUST VOLUME DEDUPLICATION PIPELINE
# ==========================================
#
# This notebook deduplicates a HathiTrust workset of English-language volumes printed in
# England (1500-1900). For serial publications (multi-volume works), it selects the most
# complete set held by a single university, then fills any gaps with volumes from other
# universities.
#
# PIPELINE OVERVIEW:
# 1. Load and filter workset (year, language)
# 2. Separate unique vs duplicated volumes
# 3. For duplicates: separate serials vs non-serials
# 4. Standardize serial volume descriptions (v.1, vol.1, V1 → v.1)
# 5. Select most complete serial set per university
# 6. Fill missing volumes from other universities
# 7. Export final deduplicated list for download via HTRC Analytics

In [None]:
# STEP 1: Load HTRC workset
# Load the initial workset CSV file containing all volumes printed in England (1500-1900)
# as provided by the HTRC Librarian

import pandas as pd
from collections import Counter

# Location of the workset CSV; its first column is used as the DataFrame index.
workset_csv_path = './htrc_workset_final.csv'
initial_workset = pd.read_csv(workset_csv_path, index_col=0)

In [None]:
# STEP 2: Filter workset by year and language
# Keep only volumes that:
#   - Have a valid year field
#   - Were published between 1500-1900
#   - Are in English only (language='eng')

# 1) Drop rows whose 'year' is missing.
has_year = initial_workset['year'].notna()
filtered_workset = initial_workset.loc[has_year]

# 2) Keep years in the closed interval [1500, 1900].
year_in_range = filtered_workset['year'].between(1500, 1900)
filtered_workset = filtered_workset.loc[year_in_range]

# 3) Keep rows whose language code is exactly 'eng'.
is_english = filtered_workset['language'].eq('eng')
filtered_workset = filtered_workset.loc[is_english]
filtered_workset

In [None]:
# STEP 3: Separate unique vs duplicated volumes
# Split volumes into two groups based on record_id:
#   - unique_volumes: Only one copy exists across all universities (keep as-is)
#   - duplicated_volumes: Multiple copies exist (need deduplication)

# Flag every row whose record_id occurs more than once (keep=False marks all copies).
shares_record_id = filtered_workset.duplicated(subset=['record_id'], keep=False)

unique_volumes = filtered_workset[~shares_record_id]
duplicated_volumes = filtered_workset[shares_record_id]

# Index values of the duplicated rows, as a set for fast membership tests.
duplicated_htids = set(duplicated_volumes.index.tolist())

In [None]:
# STEP 4: Load HathiTrust metadata for duplicated volumes 
# Load metadata in chunks with filtering to minimize memory usage
# Filter each chunk to only keep duplicated volumes

import pandas as pd
import gzip
from tqdm import tqdm

def parse_metadata_line(line, expected_fields=26):
    """Split one tab-separated metadata line into exactly ``expected_fields`` fields.

    Args:
        line: A raw line from the metadata file, with or without its line ending.
        expected_fields: Number of fields every returned row must have.

    Returns:
        A list of ``expected_fields`` strings. Surplus fields are joined (with
        tabs) into the last field; missing fields are filled with ''.
    """
    # Strip only the line terminator: a plain strip() would also eat trailing
    # tabs and therefore silently drop empty fields at the end of the row.
    fields = line.rstrip('\r\n').split('\t')

    if len(fields) > expected_fields:
        last = expected_fields - 1
        fields = fields[:last] + ['\t'.join(fields[last:])]
    elif len(fields) < expected_fields:
        # Pad short rows so every row lines up with the column list.
        fields.extend([''] * (expected_fields - len(fields)))
    return fields

# Column names for HathiTrust metadata.
# One name per parsed field, in field order (26 entries in total).
METADATA_COLUMNS = [
    'htid',
    'access',
    'rights',
    'ht_bib_key',
    'description',
    'source',
    'source_bib_num',
    'oclc_num',
    'isbn',
    'issn',
    'lccn',
    'title',
    'imprint',
    'rights_reason_code',
    'rights_timestamp',
    'us_gov_doc_flag',
    'rights_date_used',
    'pub_place',
    'lang',
    'bib_fmt',
    'collection_code',
    'content_provider_code',
    'responsible_entity_code',
    'digitization_agent_code',
    'access_profile_code',
    'author',
]

# Read the gzipped metadata file line by line, keeping only rows whose first
# field (htid) belongs to a duplicated volume. Matching rows are buffered and
# turned into a DataFrame every `chunk_size` rows to bound the buffer size.
chunk_size = 100000
filtered_chunks = []
chunk_lines = []

# Forward-slash relative path (as used for the other files in this notebook),
# so the cell runs on Windows, macOS and Linux alike.
with gzip.open('./hathi_full_20241001.txt.gz', 'rt', encoding='utf-8') as file:
    # Progress bar total is an estimate (~17M lines)
    for line in tqdm(file, desc="Loading metadata", unit=" lines", total=17000000):
        parsed = parse_metadata_line(line)

        # Skip lines that do not describe a duplicated volume
        if parsed[0] not in duplicated_htids:
            continue
        chunk_lines.append(parsed)

        # Flush the buffer once it reaches the size limit
        if len(chunk_lines) >= chunk_size:
            filtered_chunks.append(pd.DataFrame(chunk_lines, columns=METADATA_COLUMNS))
            chunk_lines = []

    # Flush whatever is left after the last full chunk
    if chunk_lines:
        filtered_chunks.append(pd.DataFrame(chunk_lines, columns=METADATA_COLUMNS))

# Combine all filtered chunks. pd.concat() raises on an empty list, so fall
# back to an empty, correctly-labelled frame when nothing matched.
if filtered_chunks:
    hathi_metadata = pd.concat(filtered_chunks, ignore_index=True)
else:
    hathi_metadata = pd.DataFrame(columns=METADATA_COLUMNS)
del filtered_chunks, chunk_lines

print(f"Loaded {len(hathi_metadata):,} metadata records for duplicated volumes")
hathi_metadata

In [None]:
# STEP 5: Separate non-serial volumes
# Volumes with empty 'description' field are NOT part of a serial publication
# For these, any copy is equivalent - keep the first occurrence per record (ht_bib_key)

# Rows with an empty description, reduced to the first row per ht_bib_key.
has_empty_description = hathi_metadata['description'].eq('')
nonserial_volumes = (
    hathi_metadata.loc[has_empty_description]
    .drop_duplicates(subset=['ht_bib_key'], keep='first')
)
nonserial_volumes

In [None]:
# STEP 6: Identify serial volumes
# Volumes with non-empty 'description' field are part of a serial publication
# Count occurrences of each description to understand the data before standardization

from collections import Counter

# Rows with a non-empty description are treated as serial volumes.
has_description = hathi_metadata['description'].ne('')
serial_volumes = hathi_metadata.loc[has_description]

# Frequency table of raw description strings, most frequent first.
# After reset_index() the description text lives in a column named 'index'.
description_counts = Counter(serial_volumes['description'])
volume_descriptions = (
    pd.DataFrame.from_dict(description_counts, orient='index', columns=['count'])
    .reset_index()
    .sort_values('count', ascending=False)
)
volume_descriptions

In [None]:
# STEP 7: Standardize volume descriptions
# Serial volume descriptions vary widely (v.1, vol.1, V1, v. 1, v.001, etc.)
# Apply series of regex transformations to normalize all formats to "v.N"
# 
# Transformations applied in order:
#   1. vol → v  (e.g., vol1 → v1)
#   2. V → v   (e.g., V1 → v1)
#   3. v<space>N → v.N  (e.g., v 1 → v.1)
#   4. v.<space>N → v.N  (e.g., v. 1 → v.1)
#   5. Remove leading zeros  (e.g., v.001 → v.1)

import re

def vol_v(s):
    """Replace 'vol' with 'v' (lowercasing the string) or prefix a lone digit with 'v.'.

    If the text contains 'vol' in any letter case, the whole string is
    lowercased and every 'vol' becomes 'v'. Otherwise a one-character numeric
    string N becomes 'v.N'. Anything else is returned unchanged.
    """
    lowered = s.lower()
    if 'vol' in lowered:
        return lowered.replace('vol', 'v')

    is_single_digit = len(s) < 2 and s.isnumeric()
    return 'v.' + s if is_single_digit else s

def capital_v(s):
    """Lowercase a trailing 'V' designator.

    Only a 'V' that is followed solely by digits and/or non-word characters up
    to the end of the string is rewritten; any other text is returned as-is.
    """
    # re.sub leaves the string untouched when the pattern does not match,
    # so no separate search is required.
    return re.sub(r'V([0-9\W]*)$', r'v\1', s)

def process_v_number(s):
    """Collapse 'v<optional spaces><digits>' into 'v.<digits>'.

    The match is case-insensitive. When the pattern is found, the result is
    only 'v.' plus the matched digits (surrounding text is discarded); when it
    is not found, the input is returned unchanged.
    """
    found = re.search(r'(?:^|\s)([^\s]*\s)?v\s*(\d+)', s, flags=re.IGNORECASE)
    if found is None:
        return s
    return 'v.' + found.group(2)

def process_v_dot(s):
    """Collapse 'v.<optional spaces><number or hyphenated range>' into 'v.<number>'.

    When the pattern is found, the result is only 'v.' plus the matched
    digits/hyphens (surrounding text is discarded); otherwise the input is
    returned unchanged.
    """
    found = re.search(r'\bv\.\s*((?:\d+-?)+\d*)', s)
    if found is None:
        return s
    return 'v.' + found.group(1)

def remove_leading_zeros(s):
    """Strip zeros that directly follow 'v.' and precede further digits.

    Every 'v.' occurrence in the string is processed; at least one digit is
    always kept (so 'v.0' stays 'v.0').
    """
    # Group 1 is the 'v.' prefix, group 2 the digits left after the zeros.
    return re.sub(r'(v\.)0*(\d+)', r'\g<1>\g<2>', s)

# Apply transformations sequentially: each step reads the column written by
# the previous one, starting from the raw description held in 'index'.
normalization_steps = [
    ('step1_vol_to_v', 'index', vol_v),
    ('step2_lowercase_v', 'step1_vol_to_v', capital_v),
    ('step3_add_dot', 'step2_lowercase_v', process_v_number),
    ('step4_remove_space', 'step3_add_dot', process_v_dot),
    ('standardized_description', 'step4_remove_space', remove_leading_zeros),
]
for output_column, input_column, transform in normalization_steps:
    volume_descriptions[output_column] = volume_descriptions[input_column].apply(transform)

# Lookup table: raw description ('index') -> standardized description.
standardized_descriptions = volume_descriptions[['index', 'standardized_description']]
standardized_descriptions

In [None]:
# STEP 8: Merge standardized descriptions with serial volumes
# Join serial volumes with their standardized descriptions
# Remove duplicates based on: record (ht_bib_key) + standardized volume + source university
# This ensures each standardized volume description appears only once per record and source

# Attach the standardized description to each serial row by matching the raw
# 'description' against the lookup table's 'index' column.
serials_with_std_desc = serial_volumes.merge(
    standardized_descriptions,
    left_on='description',
    right_on='index',
)

# Keep the first row for each (record, standardized volume, source) triple.
dedup_key = ['ht_bib_key', 'standardized_description', 'source']
serials_with_std_desc = serials_with_std_desc.drop_duplicates(subset=dedup_key, keep='first')

# All records that have at least one serial volume.
serial_record_ids = set(serials_with_std_desc['ht_bib_key'])
serials_with_std_desc

In [None]:
# STEP 9: Count volumes per source for each serial
# For each record_id, count how many volumes each university has
# This identifies which university has the most complete set

# Number of rows per (ht_bib_key, source) pair, stored in a single 'count'
# column and indexed by (ht_bib_key, source).
# groupby().size() counts rows directly: it avoids assigning a helper column
# on a slice (SettingWithCopyWarning) and avoids summing - i.e. concatenating -
# the string column 'standardized_description'.
volumes_per_source = (
    serials_with_std_desc
    .groupby(['ht_bib_key', 'source'])
    .size()
    .to_frame('count')
)
volumes_per_source

In [None]:
# STEP 10: Select university with most complete serial set
# For each record_id, find which university (source) has the highest volume count
# In case of ties, idxmax() returns the first occurrence; because groupby sorts its keys,
# that is the alphabetically first source

# For every record, pick the (ht_bib_key, source) index label with the largest
# 'count'. A single grouped idxmax replaces one .loc lookup per record and
# yields the pairs in index order instead of arbitrary set-iteration order.
# Ties resolve to the first matching row within each record's group.
best_sources_list = (
    volumes_per_source['count']
    .groupby(level='ht_bib_key')
    .idxmax()
    .tolist()
)

In [None]:
# STEP 11: Extract volumes from top sources
# Keep only volumes from the university with the most complete set for each serial

# One (ht_bib_key, source) row per record: the source chosen for that record.
best_sources = pd.DataFrame(best_sources_list, columns=['ht_bib_key', 'source'])

# Inner merge keeps only serial rows that come from the chosen source.
serials_from_best_source = serials_with_std_desc.merge(
    best_sources,
    on=['ht_bib_key', 'source'],
    how='inner',
)

kept_columns = [
    'htid', 'ht_bib_key', 'source', 'description',
    'standardized_description', 'rights_date_used',
]
deduplicated_serials = serials_from_best_source[kept_columns].reset_index(drop=True)
deduplicated_serials

In [None]:
# STEP 12: Identify all publication years for each serial across ALL sources
# Group by record_id and collect all unique publication years (rights_date_used)
# This shows the complete year range across all universities

# One row per ht_bib_key holding the set of rights_date_used values seen
# across every source.
all_years_all_sources = (
    serials_with_std_desc[['ht_bib_key', 'rights_date_used']]
    .groupby('ht_bib_key')
    .agg(set)
)
all_years_all_sources

In [None]:
# STEP 13: Find missing volumes in top source
# Compare years from top source vs all sources to find gaps
# These missing volumes can be filled from other universities

def set_difference(row):
    """List the values in row['rights_date_used_all'] absent from row['rights_date_used_best'].

    The result is a list in no particular order; it is empty when the best
    source already covers everything.
    """
    years_everywhere = set(row['rights_date_used_all'])
    years_in_best = set(row['rights_date_used_best'])
    return list(years_everywhere.difference(years_in_best))
    
# Set of years per record, restricted to the chosen (best) source.
years_in_best_source = (
    deduplicated_serials[['ht_bib_key', 'rights_date_used']]
    .groupby('ht_bib_key')
    .agg(set)
)

# Put both year sets side by side: '_best' = chosen source, '_all' = every source.
year_comparison = years_in_best_source.merge(
    all_years_all_sources,
    on=['ht_bib_key'],
    how='inner',
    suffixes=('_best', '_all'),
)
year_comparison['set_diff'] = year_comparison.apply(set_difference, axis=1)

# Keep records with at least one missing year, then emit one row per
# (ht_bib_key, missing year).
has_gap = year_comparison['set_diff'].apply(len).gt(0)
missing_years_in_best_source = (
    year_comparison.loc[has_gap, ['set_diff']]
    .explode('set_diff')
    .reset_index()
    .rename(columns={'set_diff': 'rights_date_used'})
)
missing_years_in_best_source

In [None]:
# STEP 14: Add missing volumes from other sources
# For each missing year, find the volume from any other university
# Add these to complete the serial set

# Look up rows (from any source) matching each missing (record, year) pair and
# keep the first match per pair.
gap_fill_columns = [
    'htid', 'ht_bib_key', 'source', 'description',
    'standardized_description', 'rights_date_used',
]
gap_fill_candidates = missing_years_in_best_source.merge(
    serials_with_std_desc,
    on=['ht_bib_key', 'rights_date_used'],
    how='inner',
)
gap_fill_volumes = gap_fill_candidates[gap_fill_columns].drop_duplicates(
    subset=['ht_bib_key', 'rights_date_used']
)

# Append the gap-filling rows to the best-source selection.
deduplicated_serials = pd.concat([deduplicated_serials, gap_fill_volumes])

In [None]:
# STEP 15: Combine all volume groups and export
# Merge three groups:
#   - unique_volumes: volumes with only one copy (no deduplication needed)
#   - nonserial_volumes: duplicated non-serial volumes (one copy per record_id)
#   - deduplicated_serials: deduplicated serial volumes (most complete + filled gaps)
#
# Export final list of deduplicated volumes

unique_htids = set(unique_volumes.index.to_list())
nonserial_htids = set(nonserial_volumes.htid.to_list())
serial_htids = set(deduplicated_serials.htid.to_list())

# Union of the three groups. Sorting turns the unordered set into a list, which
# makes the exported file reproducible between runs and avoids handing a set
# to the DataFrame constructor.
all_htids = sorted(unique_htids | nonserial_htids | serial_htids)
final_deduplicated_volumes = pd.DataFrame(all_htids, columns=['htid'])

# Format HTIDs for HTRC rsync script compatibility
# (':' -> '+', '/' -> '=', '$' -> '\$'). The replacement is written as "\\$"
# because "\$" is an invalid escape sequence that newer Python versions warn about.
final_deduplicated_volumes['clean_htid'] = final_deduplicated_volumes['htid'].apply(
    lambda x: x.replace(":", "+").replace("/", "=").replace("$", "\\$")
)

# Only the original 'htid' values are written, under the header 'volume';
# 'clean_htid' stays in the DataFrame and is not exported.
final_deduplicated_volumes[['htid']].rename(columns={'htid': 'volume'}).to_csv(
    './deduplicated_volume_list.txt', index=False, header=True
)

# NEXT STEPS:
# 1. Upload 'deduplicated_volume_list.txt' to HTRC Analytics
# 2. Use 'Extracted Features Download Helper' to generate rsync script
# 3. Run rsync script locally to download .json.bz2 volume files
# 4. Proceed to preprocessing and topic modeling stages