In [1]:
import polars as pl
import os
import time
import gc

# Step 1: Generator function to yield chunks of data from all parquet files in the folder
def yield_parquet_chunks(extracted_folder, chunk_size=500000):
    """Yield row slices from every ``.parquet`` file under a folder.

    The folder is walked recursively with ``os.walk``. Each matching file is
    read eagerly with ``pl.read_parquet(..., use_pyarrow=True)`` and then
    handed out via ``iter_slices(chunk_size)``. Because the whole file is read
    before slicing, ``chunk_size`` limits the size of each yielded piece, not
    the memory needed to load a file.

    Args:
        extracted_folder: Root directory to search for parquet files.
        chunk_size: Row count passed to ``iter_slices`` for each slice.

    Yields:
        The slices produced by ``iter_slices``, one at a time, file by file.
    """
    # Running count of yielded chunks across all files (progress output only).
    chunks_yielded = 0
    print("Starting yield")
    for root, _, files in os.walk(extracted_folder):
        for file_name in files:
            if not file_name.endswith('.parquet'):
                continue
            file_path = os.path.join(root, file_name)
            print(f"file path is: {file_path}")
            # Keep the read chained inside the ``for`` so the frame is only
            # referenced by the slice iterator and can be released as soon as
            # this file is exhausted, before the next file is loaded.
            for df_chunk in pl.read_parquet(file_path, use_pyarrow=True).iter_slices(chunk_size):
                chunks_yielded += 1
                if chunks_yielded % 500 == 0:
                    # The original interpolated an undefined name here, which
                    # raised NameError on the 500th chunk.
                    print(f"processed {chunks_yielded} chunks.")
                yield df_chunk

# Step 2: Separate function for missing values check
def check_missing_values(df_chunk, total_missing_values=None):
    """Fold one chunk's null counts into a running total.

    Args:
        df_chunk: Object exposing ``null_count()`` (presumably a polars
            DataFrame, for which this is a one-row frame of per-column counts).
        total_missing_values: Total accumulated so far, or ``None`` when no
            chunk has been processed yet.

    Returns:
        The chunk's own null counts when there is no prior total, otherwise
        the prior total with this chunk's counts added via ``+=``.
    """
    chunk_nulls = df_chunk.null_count()

    # First chunk: nothing to add to yet, so its counts become the total.
    if total_missing_values is None:
        return chunk_nulls

    total_missing_values += chunk_nulls
    return total_missing_values

# Step 3: Separate function for duplicates check
def check_duplicates(df_chunk, total_duplicates=0):
    """Add this chunk's duplicated-row count to a running total.

    The duplicate mask is computed from ``df_chunk`` alone, so only rows that
    repeat within the same chunk are counted; a row whose twin lives in a
    different chunk is not detected here.

    Args:
        df_chunk: Object exposing ``is_duplicated()`` and ``filter(mask)``
            (presumably a polars DataFrame).
        total_duplicates: Count accumulated from earlier chunks.

    Returns:
        ``total_duplicates`` plus the height of the chunk filtered down to the
        rows flagged by ``is_duplicated()``.
    """
    duplicate_mask = df_chunk.is_duplicated()
    flagged_rows = df_chunk.filter(duplicate_mask)
    return total_duplicates + flagged_rows.height

# Step 4: Collect density values for distribution plots
def collect_density_values(df_chunk, density_values=None):
    """Append a chunk's ``density`` column values to an accumulator list.

    Args:
        df_chunk: Object supporting ``df_chunk['density'].to_list()``
            (presumably a polars DataFrame with a ``density`` column).
        density_values: List to extend in place. When omitted or ``None``, a
            fresh list is created for this call. The original ``[]`` default
            was a single list shared by every call that omitted the argument.

    Returns:
        The accumulator list: the same object that was passed in, or the newly
        created one.
    """
    if density_values is None:
        density_values = []
    density_values.extend(df_chunk['density'].to_list())
    return density_values

In [None]:
# Driver cell: stream every parquet chunk under ``extracted_folder`` through
# the three analysis helpers, accumulating results in module-level variables.
extracted_folder = '/Users/karim/Downloads/density-2000/date=2000-01-01/'
chunk_size = 5000  # Adjust the chunk size based on your memory capacity
# A full gc.collect() is comparatively expensive, so run it periodically
# instead of after every single chunk.
gc_every_n_chunks = 100

# Initialize accumulators
total_missing_values = None  # replaced by the first chunk's null counts
total_duplicates = 0
density_values = []

# Measure time for processing (start_time is not consumed in this span;
# presumably the elapsed time is reported further down — verify).
start_time = time.time()

# Process each chunk yielded by the generator
for chunk_number, df_chunk in enumerate(
    yield_parquet_chunks(extracted_folder, chunk_size), start=1
):
    # Apply different analysis functions on each chunk
    total_missing_values = check_missing_values(df_chunk, total_missing_values)
    total_duplicates = check_duplicates(df_chunk, total_duplicates)
    density_values = collect_density_values(df_chunk, density_values)

    # Drop this cell's reference to the chunk. ``del`` alone is enough; the
    # original's preceding rebind to None was redundant.
    del df_chunk
    if chunk_number % gc_every_n_chunks == 0:
        gc.collect()