# Converting the meta-file into parquet batches to avoid RAM spikes

In [4]:
import os
import io
import polars as pl
import gc

ITEM_COL = 'parent_asin'
CHUNK_SIZE = 300_000 
# ---------------------

def convert_to_parquet_batches(input_path, output_folder):
    """
    Reads the JSONL file in large chunks and saves them as Parquet.
    This solves the 'JSON Parse Spike' memory issue.
    """
    print(f"Step 1: Converting {input_path} to Parquet in batches...")
    
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
        
    chunk_buffer = []
    batch_num = 0
    
    with open(input_path, 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            chunk_buffer.append(line)
            
            # When we hit the chunk size, save and clear memory
            if len(chunk_buffer) >= CHUNK_SIZE:
                _save_batch(chunk_buffer, output_folder, batch_num)
                batch_num += 1
                chunk_buffer = [] # Reset buffer
                gc.collect()      # Force memory cleanup
                print(f"   Processed {i+1} lines...")
                
        # Save the final remainder
        if chunk_buffer:
            _save_batch(chunk_buffer, output_folder, batch_num)

    print("Conversion complete.")

def _save_batch(lines, folder, batch_num):
    """Helper to write a batch of lines to a parquet file"""
    # Convert list of strings to bytes for Polars
    f = io.BytesIO("".join(lines).encode('utf-8'))
    
    # Read using Polars (fast)
    df = pl.read_ndjson(f, infer_schema_length=None, ignore_errors=True)
    
    # Enforce ID types to string to prevent mixing int/str errors
    df = df.with_columns([
                pl.col(ITEM_COL).cast(pl.String)
    ])
    
    # Write compressed parquet
    output_path = os.path.join(folder, f"part_{batch_num}.parquet")
    df.write_parquet(output_path)
    
    # Clean up
    del df
    del f


convert_to_parquet_batches("/kaggle/input/sports-and-outdoors-raw-amazon2023/meta_Sports_and_Outdoors.jsonl/meta_Sports_and_Outdoors.jsonl","/kaggle/working/temp_parquet_parts_meta")



Step 1: Converting /kaggle/input/sports-and-outdoors-raw-amazon2023/meta_Sports_and_Outdoors.jsonl/meta_Sports_and_Outdoors.jsonl to Parquet in batches...
   Processed 300000 lines...
   Processed 600000 lines...
   Processed 900000 lines...
   Processed 1200000 lines...
   Processed 1500000 lines...
Conversion complete.


# Taking the parquet batches and the 10-core interactions file to reduce the metadata to 10-core

In [10]:
import polars as pl
import glob
import os

def process_batches_to_jsonl(interaction_path, metadata_folder, output_path):
    print("--- Starting Process ---")
    
    # 1. OPTIMIZED LOAD of Interaction Keys
    # We only need the 'asin' column. We drop everything else to save RAM.
    # We also get unique values immediately to speed up the join.
    print(f"Loading interaction keys from {interaction_path}...")
    try:
        # Assuming interaction file is Parquet. If CSV, use read_csv.
        # We use select() immediately to avoid loading other columns.
        interaction_keys = (
            pl.read_ndjson(interaction_path)
            .select("parent_asin")
            .unique()
        )
    except Exception as e:
        print(f"Error loading interactions: {e}")
        return

    print(f"loaded {interaction_keys.height} unique items to keep.")

    # 2. Identify all metadata batches
    # Finds all files ending in .parquet in the specified folder
    parquet_files = glob.glob(os.path.join(metadata_folder, "*.parquet"))
    
    if not parquet_files:
        print("No .parquet files found in the metadata folder.")
        return

    print(f"Found {len(parquet_files)} metadata batches to process.")

    # 3. Loop, Filter, and Append
    # We open the output file in 'ab' (Append Binary) mode. 
    # This allows us to write chunk by chunk into one file.
    total_rows_saved = 0
    
    with open(output_path, "wb") as f_out:
        for i, file_path in enumerate(parquet_files):
            print(f"Processing batch {i+1}/{len(parquet_files)}: {os.path.basename(file_path)}")
            
            try:
                # Load the batch
                df_batch = pl.read_parquet(file_path)
                
                # Filter (Semi Join)
                # Keeps rows in df_batch only if asin is in interaction_keys
                df_filtered = df_batch.join(interaction_keys, on="parent_asin", how="semi")
                
                row_count = df_filtered.height
                
                if row_count > 0:
                    # Write to the open file handle
                    df_filtered.write_ndjson(f_out)
                    total_rows_saved += row_count
                
                # Explicitly remove dataframe to free up RAM for the next iteration
                del df_batch
                del df_filtered
                
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")

    print("--- Processing Complete ---")
    print(f"Total rows written to {output_path}: {total_rows_saved}")

# --- Usage Instructions ---
# 1. Path to your big interaction file (10-core subset)
interaction_file = "/kaggle/working/sports_reviews_10core.jsonl" 

# 2. Folder containing your split metadata parquet files
metadata_temp_folder = "/kaggle/working/temp_parquet_parts_meta"

# 3. The final output file name
output_jsonl = "final_metadata_10_core.jsonl"

# Run the function
process_batches_to_jsonl(interaction_file, metadata_temp_folder, output_jsonl)

--- Starting Process ---
Loading interaction keys from /kaggle/working/sports_reviews_10core.jsonl...
loaded 17547 unique items to keep.
Found 6 metadata batches to process.
Processing batch 1/6: part_1.parquet
Processing batch 2/6: part_3.parquet
Processing batch 3/6: part_4.parquet
Processing batch 4/6: part_2.parquet
Processing batch 5/6: part_5.parquet
Processing batch 6/6: part_0.parquet
--- Processing Complete ---
Total rows written to final_metadata_10_core.jsonl: 17547
