In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os

# Define file paths
CSV_INPUT = '../data/2019-Oct.csv'
PARQUET_OUTPUT = '../data/2019-Oct_chunk.parquet'
CHUNK_SIZE = 500000

# Optimization Schema: Define downcasted types to save memory during loading
dtype_optimization = {
    'event_type': 'category',
    'category_id': 'int64',
    'product_id': 'int32',
    'user_id': 'int32',
    'price': 'float32'
}

print('Starting CSV to Parquet conversion with memory optimization...')

writer = None
try:
    # Use chunksize to stream the large CSV file instead of loading it at once
    df_reader = pd.read_csv(CSV_INPUT, chunksize=CHUNK_SIZE, dtype=dtype_optimization)

    for i, chunk in enumerate(df_reader):
        # 1. Standardize timestamp: Convert to datetime and remove timezone info
        chunk['event_time'] = pd.to_datetime(chunk['event_time']).dt.tz_localize(None)
        
        # 2. Handle missing data: Fill NaN values and optimize memory using categories
        chunk['brand'] = chunk['brand'].fillna('unknown').astype('category')
        chunk['category_code'] = chunk['category_code'].fillna('unknown').astype('category')

        # 3. Convert current pandas DataFrame chunk to PyArrow Table
        table = pa.Table.from_pandas(chunk)

        # 4. Initialize ParquetWriter with the first chunk's schema
        if writer is None:
            writer = pq.ParquetWriter(PARQUET_OUTPUT, table.schema, compression='snappy')

        # 5. Incrementally write the chunk to the destination file
        writer.write_table(table)

        # 6. Log progress every 10 chunks (approx. 5 million rows)
        if i % 10 == 0:
            print(f"Progress: Processed {i * CHUNK_SIZE} rows...")

except Exception as e:
    print(f"Error encountered: {e}")

finally:
    # Safely close the writer to finalize the Parquet file
    if writer:
        writer.close()
        final_size = os.path.getsize(PARQUET_OUTPUT) / (1024**2)
        print(f"Final Parquet file saved. Size: {final_size:.2f} MB")

Starting CSV to Parquet conversion with memory optimization...
Progress: Processed 0 rows...
Progress: Processed 5000000 rows...
Progress: Processed 10000000 rows...
Progress: Processed 15000000 rows...
Progress: Processed 20000000 rows...
Progress: Processed 25000000 rows...
Progress: Processed 30000000 rows...
Progress: Processed 35000000 rows...
Progress: Processed 40000000 rows...
Final Parquet file saved. Size: 1295.44 MB
