# Split Large JSON.gz Files

This notebook splits large JSON.gz files by chunking top-level arrays.

**How it works:**
1. Gets structure info from `mrf_landing` table (or detects from file as fallback)
2. Identifies top-level scalars (repeated in every output file)
3. Identifies top-level arrays (all arrays chunked by same chunk_size)
4. Writes output files - each file gets up to chunk_size items from each array

**Example:**
```
Input file structure:
{
    "reporting_entity_name": "Blue Cross",  # scalar - repeated in all outputs
    "version": "1.0.0",                      # scalar - repeated in all outputs
    "provider_references": [...10 items...], # small array - fits in first chunk
    "in_network": [...500000 items...]       # large array - spans multiple files
}

With chunk_size=100000, outputs:
- input_part001.json.gz: scalars + provider_references[0:10] (all) + in_network[0:100000]
- input_part002.json.gz: scalars + in_network[100000:200000]  (provider_references exhausted)
- input_part003.json.gz: scalars + in_network[200000:300000]
- ...etc
```

## Configuration

In [11]:
from pathlib import Path
from urllib.parse import quote

import yaml

# ============ CONFIGURE THESE ============

# Input file to split (.json.gz, or plain .json)
INPUT_FILE = Path("D:/payer_mrf/test/2025-12_690_08D0_in-network-rates_12_of_35.json.gz")

# Output directory (where split files will be written; created if missing)
OUTPUT_DIR = Path("D:/payer_mrf/test/split")

# Maximum number of items taken from EACH top-level array per output file
# (the same limit applies to every array, not only the largest one)
CHUNK_SIZE = 1000

# Files smaller than this many MB are skipped; 0 disables the size check
MIN_FILE_SIZE_MB = 0

# =========================================

# Database settings are read from config.yaml; a relative path is resolved
# against the kernel's current working directory
CONFIG_PATH = Path("../config.yaml")

def get_connection_string_from_config(config_path: Path) -> str:
    """
    Build a PostgreSQL connection string from the `database` section of config.yaml.

    If `database.connection_string` is set it is returned unchanged. Otherwise a
    `postgresql://user:password@host:port/database` URL is assembled from the
    individual settings, with `?sslmode=...` appended when `sslmode` is set.

    Args:
        config_path: Path to the YAML configuration file

    Returns:
        Connection string usable with psycopg2.connect()
    """
    with open(config_path, "r", encoding="utf-8") as f:
        # An empty YAML file loads as None; treat it as "no settings"
        config = yaml.safe_load(f) or {}

    # `database:` present but empty also loads as None
    db_config = config.get("database") or {}

    # A full connection string takes precedence over the individual components
    conn_str = db_config.get("connection_string", "")
    if conn_str:
        return conn_str

    # Build from individual components
    host = db_config.get("host", "localhost")
    port = db_config.get("port", 5432)
    database = db_config.get("database", "postgres")
    username = db_config.get("username", "postgres")
    password = db_config.get("password") or ""
    sslmode = db_config.get("sslmode", "")

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters
    user_part = quote(str(username), safe="")
    password_part = quote(str(password), safe="")

    conn_str = f"postgresql://{user_part}:{password_part}@{host}:{port}/{database}"
    if sslmode:
        conn_str += f"?sslmode={sslmode}"

    return conn_str

DB_CONNECTION_STRING = get_connection_string_from_config(CONFIG_PATH)

# Echo only what follows the LAST '@' (host/port/database). Splitting on the first
# '@' would print part of the password whenever the password itself contains '@'.
_db_target = DB_CONNECTION_STRING.rsplit("@", 1)[1] if "@" in DB_CONNECTION_STRING else "configured"
print(f"Database connection: {_db_target}")

Database connection: localhost:5432/postgres


## Imports and Setup

In [12]:
import gzip
import json
import logging
from decimal import Decimal
from typing import Any, Dict, Iterator, List, Tuple, Optional

import ijson
import psycopg2

# Root logging: timestamped records at INFO and above; LOG is this notebook's named logger
logging.basicConfig(
    format="%(asctime)s %(levelname)s - %(message)s",
    level=logging.INFO,
)
LOG = logging.getLogger("split_json_gz")


def convert_decimals(obj: Any) -> Any:
    """
    Return a copy of `obj` in which every Decimal is replaced by a float.

    Dicts and lists are walked recursively (and rebuilt); any other value is
    returned as-is. Needed because json.dump cannot serialize Decimal.
    """
    if isinstance(obj, dict):
        return {name: convert_decimals(entry) for name, entry in obj.items()}
    if isinstance(obj, list):
        return list(map(convert_decimals, obj))
    return float(obj) if isinstance(obj, Decimal) else obj

## Helper Functions

In [13]:
def open_json_file(file_path: Path):
    """
    Open a JSON file for reading as UTF-8 text.

    Paths ending in .gz are opened through gzip; anything else is opened as a
    plain text file. The caller is responsible for closing the returned handle.
    """
    is_gzip = file_path.suffix == ".gz" or str(file_path).endswith(".json.gz")
    if not is_gzip:
        return open(file_path, "r", encoding="utf-8")
    return gzip.open(file_path, "rt", encoding="utf-8")

In [14]:
def get_structure_from_mrf_landing(
    connection_string: str,
    file_name: str,
) -> Tuple[List[str], List[str]]:
    """
    Query mrf_landing to get structure info for a file.

    Each record_type stored for `file_name` is classified by its highest
    record_index.

    Note: a record_type whose max record_index is 0 is reported as a scalar,
    so an array that holds exactly one item is indistinguishable from a scalar
    here and ends up in scalar_keys.

    Args:
        connection_string: psycopg2 connection string
        file_name: value matched against mrf_landing.file_name

    Returns:
        Tuple of (scalar_keys, array_keys) where:
        - scalar_keys: list of record_types that are scalars (max record_index = 0)
        - array_keys: list of record_types that are arrays (any other max record_index)
        Both lists are empty when mrf_landing has no rows for the file.
    """
    LOG.info(f"Querying mrf_landing for structure of '{file_name}'...")

    conn = psycopg2.connect(connection_string)
    try:
        cursor = conn.cursor()
        try:
            # Get record_type and max record_index for each type
            cursor.execute("""
            SELECT record_type, MAX(record_index) as max_index
            FROM mrf_landing
            WHERE file_name = %s
            GROUP BY record_type
            ORDER BY record_type
        """, (file_name,))
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Nested try/finally: the connection is released even if creating or
        # closing the cursor fails
        conn.close()

    scalar_keys = []
    array_keys = []

    for record_type, max_index in rows:
        if max_index == 0:
            # Scalar (or array with 1 item, treated as scalar)
            scalar_keys.append(record_type)
        else:
            # Array with multiple items
            array_keys.append(record_type)

    LOG.info(f"Found {len(scalar_keys)} scalar(s): {scalar_keys}")
    LOG.info(f"Found {len(array_keys)} array(s): {array_keys}")

    return scalar_keys, array_keys


def detect_structure(file_path: Path) -> Tuple[Dict[str, Any], List[str]]:
    """
    Detect top-level structure by parsing the file: identify scalars/objects vs arrays.
    (Fallback when mrf_landing data is not available)

    The whole file is streamed once. Top-level scalar values are recorded
    directly; top-level object values are rebuilt in memory so they can be
    repeated alongside the scalars; top-level arrays are only recorded by key
    (their items are not read here).

    Args:
        file_path: Path to the JSON or JSON.gz file

    Returns:
        Tuple of (scalars_dict, array_keys) where:
        - scalars_dict: dict of scalar/object values to repeat in every output
        - array_keys: list of keys that are arrays (to be chunked)
    """
    scalars: Dict[str, Any] = {}
    array_keys: List[str] = []
    # "integer"/"double" are accepted defensively next to "number"
    # (presumably only some ijson configurations emit them)
    scalar_events = ("string", "number", "integer", "double", "boolean", "null")

    LOG.info("Detecting top-level structure from file...")

    with open_json_file(file_path) as fh:
        current_key = None  # top-level key whose value has not been seen yet
        depth = 0           # number of maps currently open
        builder = None      # set while a top-level object value is being rebuilt

        for _prefix, event, value in ijson.parse(fh):
            if event == "start_map":
                depth += 1
                if depth == 2 and current_key is not None:
                    # The value of a top-level key is an object: capture it whole
                    builder = ijson.ObjectBuilder()

            if builder is not None:
                builder.event(event, value)
                if event == "end_map":
                    depth -= 1
                    if depth == 1:
                        # Back at the top level: the object is complete
                        scalars[current_key] = builder.value
                        builder = None
                        current_key = None
                continue

            if event == "end_map":
                depth -= 1
                if depth == 0:
                    break
            elif depth != 1:
                # Inside an array item or nested structure: not top-level
                continue
            elif event == "map_key":
                current_key = value
            elif current_key is None:
                # Array items / array end at the top level carry no pending key
                continue
            elif event == "start_array":
                array_keys.append(current_key)
                current_key = None
            elif event in scalar_events:
                scalars[current_key] = value
                current_key = None

    LOG.info(f"Found {len(scalars)} scalar(s): {list(scalars.keys())}")
    LOG.info(f"Found {len(array_keys)} array(s): {array_keys}")

    return scalars, array_keys


def extract_scalars(file_path: Path, scalar_keys: List[str]) -> Dict[str, Any]:
    """
    Extract top-level scalar values from file for the given keys.

    Parsing stops as soon as every requested key has been found, so the rest of
    the file (typically the large arrays) is not read when the scalars come
    first. Requested keys whose value is not a scalar (array or object) are
    simply absent from the result.

    Args:
        file_path: Path to the JSON or JSON.gz file
        scalar_keys: Top-level keys to look for

    Returns:
        Dict of {key: value} for the requested keys found as top-level scalars
    """
    wanted = set(scalar_keys)
    scalars: Dict[str, Any] = {}
    # "integer"/"double" are accepted defensively next to "number"
    # (presumably only some ijson configurations emit them)
    scalar_events = ("string", "number", "integer", "double", "boolean", "null")

    LOG.info(f"Extracting {len(scalar_keys)} scalar values from file...")

    if not wanted:
        # Nothing to look for: avoid streaming the whole file for no result
        return scalars

    with open_json_file(file_path) as fh:
        current_key = None  # top-level key whose value has not been seen yet
        depth = 0           # number of maps currently open

        for _prefix, event, value in ijson.parse(fh):
            if event == "start_map":
                depth += 1
            elif event == "end_map":
                depth -= 1
                if depth == 0:
                    break
            elif depth != 1:
                continue
            elif event == "map_key":
                current_key = value
            elif event == "start_array":
                # The pending key holds an array; forget it so the array's own
                # scalar items are not mistaken for the key's value
                current_key = None
            elif event in scalar_events and current_key is not None:
                if current_key in wanted:
                    scalars[current_key] = value
                    if len(scalars) == len(wanted):
                        # Everything requested has been found
                        break
                current_key = None

    return scalars

In [15]:
def stream_all_arrays(file_path: Path, array_keys: List[str], chunk_size: int) -> Iterator[Dict[str, List[Any]]]:
    """
    Stream all arrays from file, yielding chunks of up to chunk_size items per array.

    Each yield is a dict of {array_key: [items...]} for that chunk.
    Arrays that run out of items simply won't appear in subsequent chunks.
    A key that is not a top-level array contributes no items.

    The file is opened once per distinct array key, and each key is read
    through its own ijson item stream. All handles are closed when the
    generator finishes, is closed, or fails.

    Args:
        file_path: Path to JSON file
        array_keys: List of array keys to stream
        chunk_size: Max items per array per chunk

    Yields:
        Dict of {array_key: [items]} for each chunk
    """
    iterators: Dict[str, Iterator[Any]] = {}
    file_handles = []
    exhausted = object()  # sentinel marking the end of an item stream

    try:
        # Open inside the try so a failure part-way through still closes the
        # handles that were already opened
        for key in array_keys:
            if key in iterators:
                # Duplicate key: one stream per array is enough
                continue
            fh = open_json_file(file_path)
            file_handles.append(fh)
            iterators[key] = ijson.items(fh, f"{key}.item")

        while True:
            chunk = {}

            # Collect up to chunk_size items from each array
            for key, iterator in iterators.items():
                items = []
                for _ in range(chunk_size):
                    item = next(iterator, exhausted)
                    if item is exhausted:
                        break
                    items.append(item)

                if items:
                    chunk[key] = items

            if not chunk:
                # Every array is exhausted
                break

            yield chunk

    finally:
        for fh in file_handles:
            fh.close()

In [16]:
# (stream_array_items removed - using stream_all_arrays instead)

In [17]:
# (extract_full_array removed - using stream_all_arrays instead)

## Main Split Function

In [18]:
def _resolve_structure(
    input_path: Path,
    connection_string: Optional[str],
) -> Tuple[Dict[str, Any], List[str]]:
    """Return (scalars, array_keys) from mrf_landing when it has data, else by parsing the file."""
    if connection_string:
        try:
            scalar_keys, array_keys = get_structure_from_mrf_landing(connection_string, input_path.name)
        except Exception as e:
            LOG.warning(f"Could not query mrf_landing: {e}. Falling back to file parsing.")
        else:
            if scalar_keys or array_keys:
                scalars = extract_scalars(input_path, scalar_keys)
                array_keys = list(array_keys)

                # mrf_landing reports a single-item array the same way as a scalar.
                # Keys that turned out not to be scalars in the file are therefore
                # streamed as arrays, so their item is not lost from the output.
                unresolved = [
                    key for key in scalar_keys
                    if key not in scalars and key not in array_keys
                ]
                if unresolved:
                    LOG.info(f"Not found as scalars, treating as arrays: {unresolved}")
                    array_keys.extend(unresolved)

                return scalars, array_keys

            # The query worked but knows nothing about this file
            LOG.warning(f"No mrf_landing rows for '{input_path.name}'. Falling back to file parsing.")

    return detect_structure(input_path)


def split_json_gz(
    input_path: Path,
    output_dir: Path,
    chunk_size: int = 100000,
    min_file_size_mb: float = 0,
    connection_string: Optional[str] = None,
) -> int:
    """
    Split a JSON.gz file by chunking all top-level arrays.

    All arrays are chunked by the same chunk_size. When an array runs out of items,
    it simply doesn't appear in subsequent output files. Top-level scalars are
    repeated in every output file. Output files are named
    `<input stem>_partNNN.json.gz` and written to output_dir.

    Structure info comes from mrf_landing when a connection string is given and
    the table has rows for this file; otherwise it is detected from the file.

    Args:
        input_path: Path to input JSON.gz file
        output_dir: Directory to write output files
        chunk_size: Number of items per chunk for each array (must be at least 1)
        min_file_size_mb: Minimum file size (MB) to trigger splitting (0 to disable)
        connection_string: Database connection string to query mrf_landing (optional)

    Returns:
        Number of output files created (0 when the input is missing, below the
        size threshold, has no arrays, or chunk_size is invalid)
    """
    if not input_path.exists():
        LOG.error(f"Input file not found: {input_path}")
        return 0

    if chunk_size < 1:
        # A non-positive chunk size would otherwise silently produce no output
        LOG.error(f"chunk_size must be at least 1, got {chunk_size}")
        return 0

    # Check file size
    file_size_mb = input_path.stat().st_size / (1024 * 1024)
    LOG.info(f"Input file: {input_path.name} ({file_size_mb:.1f} MB)")

    if min_file_size_mb > 0 and file_size_mb < min_file_size_mb:
        LOG.info(f"File size ({file_size_mb:.1f} MB) is below threshold ({min_file_size_mb} MB), skipping split")
        return 0

    scalars, array_keys = _resolve_structure(input_path, connection_string)

    if not array_keys:
        LOG.warning("No arrays found in file, nothing to split")
        return 0

    LOG.info(f"Will chunk {len(array_keys)} array(s): {array_keys}")
    LOG.info(f"Chunk size: {chunk_size:,} items per array per file")

    # Create output directory
    output_dir.mkdir(parents=True, exist_ok=True)

    # Output names reuse the input name without its .json.gz / .json extension
    stem = input_path.name
    if stem.endswith(".json.gz"):
        stem = stem[:-8]
    elif stem.endswith(".json"):
        stem = stem[:-5]

    # Stream all arrays and write chunks
    LOG.info("Streaming arrays and writing chunks...")

    files_created = 0

    for part_number, chunk_data in enumerate(stream_all_arrays(input_path, array_keys, chunk_size), start=1):
        output_path = output_dir / f"{stem}_part{part_number:03d}.json.gz"

        # Build output JSON: scalars (always included) + this part's array chunks
        output_data = dict(scalars)
        output_data.update(chunk_data)

        # Convert Decimals to floats for JSON serialization
        output_data = convert_decimals(output_data)

        # Log what's in this chunk
        chunk_info = ", ".join(f"{k}: {len(v):,}" for k, v in chunk_data.items())
        LOG.info(f"Writing {output_path.name} ({chunk_info})...")

        with gzip.open(output_path, "wt", encoding="utf-8") as f:
            json.dump(output_data, f)

        files_created = part_number

    LOG.info(f"Split complete: created {files_created} files in {output_dir}")
    return files_created

## Run Split

In [None]:
# Split the configured input file. Passing the connection string lets
# split_json_gz look up the file's structure in mrf_landing first.
files_created = split_json_gz(
    INPUT_FILE,
    OUTPUT_DIR,
    chunk_size=CHUNK_SIZE,
    min_file_size_mb=MIN_FILE_SIZE_MB,
    connection_string=DB_CONNECTION_STRING,
)

print(f"\nFiles created: {files_created}")

2026-01-21 22:34:03,551 INFO - Input file: 2025-12_690_08D0_in-network-rates_12_of_35.json.gz (14.4 MB)
2026-01-21 22:34:03,552 INFO - Querying mrf_landing for structure of '2025-12_690_08D0_in-network-rates_12_of_35.json.gz'...
2026-01-21 22:34:04,418 INFO - Found 4 scalar(s): ['last_updated_on', 'reporting_entity_name', 'reporting_entity_type', 'version']
2026-01-21 22:34:04,419 INFO - Found 2 array(s): ['in_network', 'provider_references']
2026-01-21 22:34:04,420 INFO - Extracting 4 scalar values from file...
2026-01-21 22:35:51,214 INFO - Will chunk 2 array(s): ['in_network', 'provider_references']
2026-01-21 22:35:51,232 INFO - Chunk size: 1,000 items per array per file
2026-01-21 22:35:51,237 INFO - Streaming arrays and writing chunks...


## (Optional) Process Multiple Files

In [None]:
# Uncomment and modify to process multiple files in a directory

# INPUT_DIR = Path("D:/payer_mrf/raw/test/2025-11-25")
# OUTPUT_DIR = Path("D:/payer_mrf/raw/test/2025-11-25/split")
# 
# json_gz_files = list(INPUT_DIR.glob("*.json.gz"))
# print(f"Found {len(json_gz_files)} JSON.gz files")
# 
# total_files_created = 0
# for input_file in json_gz_files:
#     print(f"\nProcessing: {input_file.name}")
#     files_created = split_json_gz(
#         input_path=input_file,
#         output_dir=OUTPUT_DIR,
#         chunk_size=CHUNK_SIZE,
#         min_file_size_mb=MIN_FILE_SIZE_MB,
#     )
#     total_files_created += files_created
# 
# print(f"\nTotal files created: {total_files_created}")