In [None]:
!pip install pandas pyarrow memory-profiler ijson

Collecting ijson
  Downloading ijson-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (21 kB)
Downloading ijson-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (114 kB)
Installing collected packages: ijson
Successfully installed ijson-3.3.0


In [None]:
import json
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from typing import Generator, Dict, Any
import time
import psutil
import os

def get_memory_usage() -> float:
    """Report the resident set size (RSS) of the current process in MB.

    Returns:
        RSS as reported by psutil, divided by 1024 * 1024.
    """
    bytes_per_mb = 1024 * 1024
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes / bytes_per_mb

def stream_jsonl(file_path: str) -> Generator[Dict[Any, Any], None, None]:
    """Lazily yield one parsed JSON value per line of a JSON Lines file.

    The file is read line by line, so only the current line is held in
    memory. Blank (or whitespace-only) lines are skipped silently; lines
    that are not valid JSON are reported on stdout and skipped.

    Args:
        file_path: Path to the JSON Lines file.

    Yields:
        The value decoded from each valid line.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            text = line.strip()
            if not text:
                # Empty lines (e.g. a trailing newline) are not malformed data.
                continue
            # Keep the try body limited to the parse itself.
            try:
                record = json.loads(text)
            except json.JSONDecodeError:
                print(f"Skipping invalid JSON line: {text[:50]}...")
                continue
            yield record

def flatten_dict(d: Dict[Any, Any], parent_key: str = '', sep: str = '_') -> Dict[str, Any]:
    """Collapse a nested dictionary into a single level.

    Keys of nested dictionaries are joined to their ancestors' keys with
    ``sep``. Values that are not dictionaries (lists included) are copied
    through unchanged. An empty nested dictionary contributes no keys.

    Args:
        d: Dictionary to flatten.
        parent_key: Prefix applied to every key of ``d``; empty for no prefix.
        sep: Separator placed between a prefix and a key.

    Returns:
        A new flat dictionary.
    """
    flat = {}
    for key, value in d.items():
        path = key if not parent_key else f"{parent_key}{sep}{key}"
        if not isinstance(value, dict):
            flat[path] = value
            continue
        # Recurse with the accumulated path as the new prefix.
        flat.update(flatten_dict(value, path, sep=sep))
    return flat

def process_jsonl_to_parquet(input_jsonl_path: str, output_parquet_path: str, chunk_size: int = 10000):
    """
    Process large JSON Lines file to Parquet format using chunking

    Records are streamed from the input, flattened, buffered into chunks of
    ``chunk_size`` rows, and each chunk is appended to a single Parquet file.
    While streaming, the 'stars', 'useful', 'funny' and 'cool' fields are
    summed so the output can be sanity-checked against the source.

    The Parquet schema is inferred from the first chunk and reused for every
    later chunk.
    NOTE(review): a later chunk whose columns or types differ from the first
    one is expected to fail inside pyarrow - confirm if the input can vary.

    Args:
        input_jsonl_path: Path to input JSON Lines file
        output_parquet_path: Path to output Parquet file
        chunk_size: Number of records to process at once (must be > 0)

    Returns:
        dict: Performance metrics and data comparison, with keys
            'performance' (runtime_seconds, memory_usage_mb, peak_memory_mb),
            'parquet_details' (file_size_bytes, schema) and
            'data_comparison' (total_rows, metrics).
            'peak_memory_mb' is the highest RSS sampled after each chunk
            write, not a continuously tracked maximum. If the input yields
            no records, no file is written and 'parquet_details' reports
            file_size_bytes 0 and schema None.

    Raises:
        ValueError: If chunk_size is not positive.
    """
    if chunk_size <= 0:
        # A zero chunk size would otherwise hit a modulo-by-zero in the
        # progress check; a negative one would silently write 1-row chunks.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    start_time = time.time()
    initial_memory = get_memory_usage()
    peak_memory = initial_memory

    # Initialize metrics
    total_rows = 0
    metrics = {
        'stars_sum': 0,
        'useful_sum': 0,
        'funny_sum': 0,
        'cool_sum': 0
    }
    metric_fields = ('stars', 'useful', 'funny', 'cool')

    # Created lazily from the first chunk that is written
    schema = None
    writer = None

    def write_chunk(rows):
        """Append one chunk of flattened rows to the Parquet file."""
        nonlocal schema, writer, peak_memory
        df = pd.DataFrame(rows)
        if writer is None:
            schema = pa.Schema.from_pandas(df)
            writer = pq.ParquetWriter(output_parquet_path, schema)
        writer.write_table(pa.Table.from_pandas(df, schema=schema))
        # Sample while the DataFrame and Table are still alive, which is
        # when this function's own memory use is highest.
        peak_memory = max(peak_memory, get_memory_usage())

    chunk = []
    try:
        for item in stream_jsonl(input_jsonl_path):
            flattened_item = flatten_dict(item)
            chunk.append(flattened_item)

            # Update metrics; a missing field or JSON null counts as 0
            total_rows += 1
            for field in metric_fields:
                metrics[f'{field}_sum'] += flattened_item.get(field) or 0

            if len(chunk) >= chunk_size:
                write_chunk(chunk)
                chunk = []

                # Print progress
                if total_rows % (chunk_size * 10) == 0:
                    print(f"Processed {total_rows} rows...")

        # Write remaining records
        if chunk:
            write_chunk(chunk)

    finally:
        # Close even on failure so the rows written so far are finalized
        if writer is not None:
            writer.close()

    end_time = time.time()
    final_memory = get_memory_usage()
    peak_memory = max(peak_memory, final_memory)

    # Get Parquet file details
    if writer is None:
        # No records were read, so no file was created by this call.
        parquet_size = 0
        parquet_schema = None
    else:
        parquet_schema = str(pq.ParquetFile(output_parquet_path).schema)
        parquet_size = os.path.getsize(output_parquet_path)

    return {
        'performance': {
            'runtime_seconds': end_time - start_time,
            'memory_usage_mb': final_memory - initial_memory,
            'peak_memory_mb': peak_memory
        },
        'parquet_details': {
            'file_size_bytes': parquet_size,
            'schema': parquet_schema
        },
        'data_comparison': {
            'total_rows': total_rows,
            'metrics': metrics
        }
    }

# Example usage
if __name__ == "__main__":
    # Source JSON Lines file and destination Parquet file
    INPUT_JSONL_PATH = '/content/drive/MyDrive/source/yelp_academic_dataset_review.json'
    OUTPUT_PARQUET_PATH = '/content/drive/MyDrive/output/reviews.parquet'

    print(f"Starting processing of {INPUT_JSONL_PATH}")

    # Run the conversion; the result holds performance, file and data sections
    metrics = process_jsonl_to_parquet(INPUT_JSONL_PATH, OUTPUT_PARQUET_PATH)

    # Performance section: (label, key, unit) rows, all printed to 2 decimals
    performance = metrics['performance']
    print("\nPerformance Metrics:")
    for label, key, unit in (
        ("Runtime", 'runtime_seconds', "seconds"),
        ("Memory Usage", 'memory_usage_mb', "MB"),
        ("Peak Memory", 'peak_memory_mb', "MB"),
    ):
        print(f"{label}: {performance[key]:.2f} {unit}")

    # Output file section
    parquet_details = metrics['parquet_details']
    print("\nParquet File Details:")
    print(f"File Size: {parquet_details['file_size_bytes']} bytes")
    print(f"Schema:\n{parquet_details['schema']}")

    # Row count and column sums for comparison with the source data
    data_comparison = metrics['data_comparison']
    print("\nData Comparison:")
    print(f"Total Rows: {data_comparison['total_rows']}")
    for metric, value in data_comparison['metrics'].items():
        print(f"{metric}: {value}")

Starting processing of /content/drive/MyDrive/source/yelp_academic_dataset_review.json
Processed 100000 rows...
Processed 200000 rows...
Processed 300000 rows...
Processed 400000 rows...
Processed 500000 rows...
Processed 600000 rows...
Processed 700000 rows...
Processed 800000 rows...
Processed 900000 rows...
Processed 1000000 rows...
Processed 1100000 rows...
Processed 1200000 rows...
Processed 1300000 rows...
Processed 1400000 rows...
Processed 1500000 rows...
Processed 1600000 rows...
Processed 1700000 rows...
Processed 1800000 rows...
Processed 1900000 rows...
Processed 2000000 rows...
Processed 2100000 rows...
Processed 2200000 rows...
Processed 2300000 rows...
Processed 2400000 rows...
Processed 2500000 rows...
Processed 2600000 rows...
Processed 2700000 rows...
Processed 2800000 rows...
Processed 2900000 rows...
Processed 3000000 rows...
Processed 3100000 rows...
Processed 3200000 rows...
Processed 3300000 rows...
Processed 3400000 rows...
Processed 3500000 rows...
Processed 36