In [None]:
import os
from pathlib import Path
import json
import boto3
from datetime import datetime
from botocore import UNSIGNED
from botocore.config import Config

# Unsigned (anonymous) S3 client: requests to the bucket are sent without credentials.
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
bucket = 'openalex'

# Local directory that mirrors the bucket's key layout (relative to the working directory).
SNAPSHOT_DIR = '../../openalex-snapshot'
# Only files whose `updated_date=` partition is strictly later than this are downloaded.
last_updated = datetime(2024, 1, 1)
# Per-entity size cap in bytes, applied while walking the manifest.
# Set to float('inf') to disable the cap.
max_size = 400_000_000  # 400 MB

def download_manifest(entity):
    """Download and parse the manifest file for an entity.

    The manifest is fetched from ``data/{entity}/manifest`` in ``bucket``,
    stored under the same relative path inside ``SNAPSHOT_DIR`` and then
    parsed as JSON.

    Args:
        entity: Name of the entity folder under ``data/`` (e.g. ``'works'``).

    Returns:
        A ``(manifest_entries, manifest_path)`` tuple: the parsed JSON content
        and the ``Path`` of the local copy.

    Raises:
        RuntimeError: If the manifest object does not exist in the bucket.
        ClientError: Any other S3 client error is re-raised unchanged.
    """
    manifest_key = f'data/{entity}/manifest'
    manifest_path = Path(SNAPSHOT_DIR) / manifest_key
    manifest_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        # Pass a plain string, consistent with the data-file downloads:
        # not every boto3 release accepts path-like objects as Filename.
        s3.download_file(bucket, manifest_key, str(manifest_path))
    except s3.exceptions.ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        # A missing object can be reported as '404' or as 'NoSuchKey'.
        if error_code in ('404', 'NoSuchKey'):
            raise RuntimeError(f"Manifest file for {entity} does not exist. OpenAlex may be updating this entity. Please try again later.") from e
        # TODO add retry logic
        raise  # Re-raise other errors

    with open(manifest_path, 'r', encoding='utf-8') as f:
        manifest_entries = json.load(f)

    return manifest_entries, manifest_path

# Entity folders are the first-level prefixes below data/ in the bucket.
response = s3.list_objects_v2(Bucket=bucket, Prefix='data/', Delimiter='/')

entities = []
for common_prefix in response.get('CommonPrefixes', []):
    # e.g. 'data/works/' -> 'works'
    folder = common_prefix.get('Prefix')
    entities.append(folder.removeprefix('data/').strip('/'))

print(f"Found {len(entities)} entities: {entities}")


Found 12 entities: ['authors', 'concepts', 'domains', 'fields', 'funders', 'institutions', 'merged_ids', 'publishers', 'sources', 'subfields', 'topics', 'works']


In [None]:
def _select_new_files(manifest, since, size_limit):
    """Pick the manifest entries that have to be downloaded.

    Args:
        manifest: Parsed manifest dict; its ``'entries'`` list is walked in order.
        since: Only entries whose ``updated_date=`` partition is strictly
            later than this ``datetime`` are selected.
        size_limit: Maximum total ``content_length`` (bytes) of the selection;
            ``float('inf')`` disables the cap.

    Returns:
        List of bucket-relative keys, in manifest order.
    """
    selected = []
    selected_size = 0

    for entry in manifest.get('entries', []):
        # URLs look like: s3://openalex/data/authors/updated_date=2024-01-01/filename.gz
        file_path = entry['url'].removeprefix('s3://openalex/')
        updated_date_str = file_path.split('updated_date=')[1].split('/')[0]
        updated_date = datetime.strptime(updated_date_str, '%Y-%m-%d')

        if updated_date <= since:
            # Not newer than the cutoff; skipped files must not use up the size budget.
            continue

        entry_size = entry['meta']['content_length']
        if selected_size + entry_size > size_limit:
            # Check before queuing so the selection never exceeds the cap.
            break

        selected.append(file_path)
        selected_size += entry_size

    return selected


# NOTE(review): the [7:] slice skips the first seven entities in the listing.
# It looks like a leftover from resuming an interrupted run - confirm, and
# iterate over `entities` to process everything.
for entity in entities[7:]:
    print(f"\nProcessing {entity}...")

    if entity == 'merged_ids':
        # TODO has a folder structure. Needs handling
        continue

    # Step 1: Download the manifest before fetching any data file
    initial_manifest, manifest_path = download_manifest(entity)

    # Step 2: Select files newer than `last_updated`, within the `max_size` budget
    files_to_download = _select_new_files(initial_manifest, last_updated, max_size)
    print(f"Found {len(files_to_download)} new files to download for {entity}")

    # Step 3: Download the selected files, mirroring the bucket layout locally
    for file_path in files_to_download:
        local_path = Path(SNAPSHOT_DIR) / file_path
        local_path.parent.mkdir(parents=True, exist_ok=True)

        print(f"Downloading {file_path}...")
        s3.download_file(bucket, file_path, str(local_path))

    # Step 4: Download manifest again and verify it hasn't changed
    final_manifest, _ = download_manifest(entity)

    if initial_manifest != final_manifest:
        print(f"WARNING: Manifest for {entity} changed during download!")
        print("You may want to re-run the download for this entity.")
