In [1]:
import gc
import logging
import os
import re
import threading
import time
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Tuple

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Root of the input tree; option files are looked up under
# <COLD_STORAGE_DIR>/BSE/Options/<underlying>/...
COLD_STORAGE_DIR = "cold_storage"
# Directory that receives one "<underlying>_options_master.parquet" per underlying.
OUTPUT_DIR = "master_parquet_files"
# NOTE(review): not referenced anywhere in the visible code - confirm whether
# it is still needed (the parquet row-group size below is a separate literal).
BATCH_SIZE = 50000

# Configure the root logger once: INFO level, timestamped lines, written both
# to a log file in the current working directory and to a stream handler
# (StreamHandler() with no argument writes to stderr).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('options_master_parquet_generator.log'),
        logging.StreamHandler()
    ]
)
# Module-level logger used by every function below.
logger = logging.getLogger(__name__)


In [2]:
def ensure_output_directory():
    """Make sure OUTPUT_DIR exists, creating it when absent.

    Logs whether the directory was reused or freshly created.
    """
    if os.path.exists(OUTPUT_DIR):
        logger.info(f"Using existing output directory: {OUTPUT_DIR}")
        return

    os.makedirs(OUTPUT_DIR)
    logger.info(f"Created output directory: {OUTPUT_DIR}")

In [3]:
def get_file_size(file_path):
    """Return the size of ``file_path`` in bytes.

    Raises:
        OSError: If the file does not exist or cannot be accessed.
    """
    file_stats = os.stat(file_path)
    return file_stats.st_size

def format_size(size_bytes):
    """Render a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the units
    B, KB, MB and GB are exhausted; anything larger is reported in TB.
    """
    units = ('B', 'KB', 'MB', 'GB')
    value = size_bytes
    position = 0
    while position < len(units):
        if value < 1024.0:
            return f"{value:.2f} {units[position]}"
        value = value / 1024.0
        position += 1
    # Four divisions have been applied by now, so the remainder is in TB.
    return f"{value:.2f} TB"

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with canonical OHLCV column names.

    Abbreviated columns (``o``, ``h``, ``l``, ``c``, ``v``, ``oi``) are renamed
    to their long form. Any of ``open``, ``high``, ``low``, ``close``,
    ``volume`` or ``open_interest`` still missing afterwards is added and
    filled with ``None``. The input frame is not modified.

    An abbreviation is left untouched (with a warning) when its long-form
    column already exists, because renaming it would produce two columns with
    the same label.

    Args:
        df: Raw frame as read from a source file.

    Returns:
        A new DataFrame containing at least the required columns.

    Raises:
        ValueError: If there is no ``timestamp`` column.
    """
    column_mapping = {
        'o': 'open',
        'h': 'high',
        'l': 'low',
        'c': 'close',
        'v': 'volume',
        'oi': 'open_interest'
    }

    # Build the rename map defensively: a blind rename of 'o' -> 'open' on a
    # frame that already has 'open' yields duplicate labels, after which
    # df['open'] is a DataFrame rather than a Series.
    safe_mapping = {}
    for short_name, long_name in column_mapping.items():
        if short_name not in df.columns:
            continue
        if long_name in df.columns:
            logger.warning(
                f"Column '{short_name}' not renamed: '{long_name}' already exists"
            )
            continue
        safe_mapping[short_name] = long_name

    # rename() returns a new object, so the additions below never touch the
    # caller's frame.
    df = df.rename(columns=safe_mapping)

    if 'timestamp' not in df.columns:
        logger.error("Missing critical timestamp column in dataframe")
        raise ValueError("Timestamp column is required")

    for col in ('open', 'high', 'low', 'close', 'volume', 'open_interest'):
        if col not in df.columns:
            df[col] = None
            logger.warning(f"Added missing column '{col}' with NULL values")

    return df

def generate_option_symbol(underlying: str, expiry_str: str, strike: int, option_type: str) -> str:
    """Build a symbol of the form ``<UNDERLYING><YY><MON><STRIKE><CE|PE>``.

    Args:
        underlying: Underlying name, used verbatim as the symbol prefix.
        expiry_str: Expiry date as ``YYYYMMDD``.
        strike: Strike price.
        option_type: ``CE``/``call``/``C`` or ``PE``/``put``/``P``
            (case-insensitive).

    Returns:
        The symbol, or the fallback ``<underlying>_<expiry>_<strike>_<type>``
        when the expiry cannot be parsed or the option type is not
        recognised. An error is logged in the fallback case; nothing is
        raised.
    """
    try:
        exp_date = datetime.strptime(expiry_str, '%Y%m%d')

        # Map the type explicitly. Treating "anything that is not a call" as a
        # put would label unparsed types (e.g. "unknown") as PE.
        normalized_type = option_type.strip().upper()
        if normalized_type in ('CE', 'CALL', 'C'):
            symbol_suffix = 'CE'
        elif normalized_type in ('PE', 'PUT', 'P'):
            symbol_suffix = 'PE'
        else:
            raise ValueError(f"unrecognized option type {option_type!r}")

        # NOTE(review): '%y%b' keeps only year and month, so two expiries in
        # the same month produce the same symbol - confirm this is intended.
        return f"{underlying}{exp_date.strftime('%y%b').upper()}{strike}{symbol_suffix}"
    except Exception as e:
        logger.error(f"Error generating symbol for {underlying}-{expiry_str}-{strike}-{option_type}: {e}")
        return f"{underlying}_{expiry_str}_{strike}_{option_type}"

# Matches CE / PE / CALL / PUT only when not glued to other letters, e.g.
# "80000CE", "ce_80000", "sensex_put" - but not the "CE" inside "price".
_OPTION_TYPE_TOKEN = re.compile(r'(?<![A-Za-z])(CE|PE|CALL|PUT)(?![A-Za-z])', re.IGNORECASE)


def parse_option_file_info(file_path: str, underlying: str) -> Tuple[str, str, int, str]:
    """Extract expiry, strike and option type from an option file path.

    Every path component is inspected: an all-digit component of length 8 is
    taken as the expiry (``YYYYMMDD``) and an all-digit component of at most
    6 characters as the strike; later components override earlier ones. The
    option type is read from the file name.

    Args:
        file_path: Path to the option file, using ``os.sep`` as separator.
        underlying: Accepted for interface compatibility; not used here.

    Returns:
        ``(expiry, strike_as_str, strike_as_int, option_type)`` where
        ``option_type`` is ``'call'``, ``'put'`` or ``'unknown'``. Parts that
        cannot be determined fall back to ``"00000000"``, ``0`` and
        ``"unknown"`` and a warning is logged.
    """
    expiry = None
    strike = None
    option_type = None

    for part in file_path.split(os.sep):
        if len(part) == 8 and part.isdigit():
            expiry = part
        elif part.isdigit() and len(part) <= 6:
            strike = int(part)

    filename = os.path.basename(file_path)
    stem = os.path.splitext(filename)[0]

    # Prefer an unambiguous standalone marker. A bare substring test alone
    # would classify e.g. "price_put.parquet" as a call ("CE" in "PRICE").
    token_match = _OPTION_TYPE_TOKEN.search(stem)
    if token_match:
        marker = token_match.group(1).upper()
        option_type = 'call' if marker in ('CE', 'CALL') else 'put'
    else:
        # Legacy loose matching, kept so names that embed the marker inside
        # other letters are still recognised as before.
        upper_name = filename.upper()
        lower_name = filename.lower()
        if 'CE' in upper_name:
            option_type = 'call'
        elif 'PE' in upper_name:
            option_type = 'put'
        elif '_call' in lower_name:
            option_type = 'call'
        elif '_put' in lower_name:
            option_type = 'put'

    if not all([expiry, strike is not None, option_type]):
        logger.warning(f"Could not fully parse file info from {file_path}")
        if not expiry:
            expiry = "00000000"
        if strike is None:
            strike = 0
        if not option_type:
            option_type = "unknown"

    return expiry, str(strike), strike, option_type

def collect_option_files(underlying: str) -> List[Dict[str, Any]]:
    """Walk the cold-storage tree of ``underlying`` and describe each parquet file.

    Args:
        underlying: Directory name under ``<COLD_STORAGE_DIR>/BSE/Options``.

    Returns:
        One dict per ``.parquet`` file with the keys ``file_path``, ``expiry``,
        ``strike_str``, ``strike_int``, ``option_type``, ``file_size`` and
        ``relative_path`` (relative to COLD_STORAGE_DIR). Empty list when the
        underlying's directory does not exist.
    """
    logger.info(f"Collecting option files for {underlying}")

    underlying_path = os.path.join(COLD_STORAGE_DIR, "BSE", "Options", underlying)
    if not os.path.exists(underlying_path):
        logger.warning(f"Path does not exist: {underlying_path}")
        return []

    records = []
    bytes_seen = 0

    for root, _subdirs, names in os.walk(underlying_path):
        for name in names:
            if not name.endswith('.parquet'):
                continue

            full_path = os.path.join(root, name)
            size = get_file_size(full_path)
            bytes_seen += size

            expiry, strike_str, strike_int, option_type = parse_option_file_info(full_path, underlying)

            records.append({
                'file_path': full_path,
                'expiry': expiry,
                'strike_str': strike_str,
                'strike_int': strike_int,
                'option_type': option_type,
                'file_size': size,
                'relative_path': os.path.relpath(full_path, COLD_STORAGE_DIR)
            })

    logger.info(f"Found {len(records)} option files for {underlying} "
                f"(Total size: {format_size(bytes_seen)})")

    return records

def process_option_file(file_info: Dict[str, Any], underlying: str) -> pd.DataFrame:
    """Load one option parquet file and return it in the master layout.

    The frame is passed through ``standardize_columns`` and then tagged with
    ``symbol``, ``underlying``, ``expiry``, ``strike`` and ``option_type``
    taken from ``file_info``; ``timestamp`` is converted with
    ``pd.to_datetime``.

    Args:
        file_info: Entry produced by ``collect_option_files``; must provide
            ``file_path``, ``expiry``, ``strike_int`` and ``option_type``.
        underlying: Name of the underlying the file belongs to.

    Returns:
        The standardized frame, or an empty DataFrame when the file is empty
        or any error occurs while reading/processing it (the error is logged).
    """
    file_path = file_info['file_path']

    # Final column layout of every per-file frame.
    output_columns = [
        'timestamp', 'symbol', 'underlying', 'expiry', 'strike', 'option_type',
        'open', 'high', 'low', 'close', 'volume', 'open_interest'
    ]

    try:
        frame = pd.read_parquet(file_path)

        if frame.empty:
            logger.warning(f"Empty file: {file_path}")
            return pd.DataFrame()

        frame = standardize_columns(frame)

        expiry_text = file_info['expiry']
        strike_value = file_info['strike_int']
        type_label = file_info['option_type']

        frame['symbol'] = generate_option_symbol(underlying, expiry_text, strike_value, type_label)
        frame['underlying'] = underlying
        frame['expiry'] = pd.to_datetime(expiry_text, format='%Y%m%d').date()
        frame['strike'] = strike_value
        frame['option_type'] = type_label

        if 'timestamp' in frame.columns:
            frame['timestamp'] = pd.to_datetime(frame['timestamp'])

        # Keep only known columns, in the canonical order.
        frame = frame[[name for name in output_columns if name in frame.columns]]

        logger.debug(f"Processed {file_path}: {len(frame)} rows")
        return frame

    except Exception as e:
        logger.error(f"Error processing file {file_path}: {e}")
        return pd.DataFrame()

def create_master_parquet_for_underlying(underlying: str) -> bool:
    """Merge every option file of ``underlying`` into one master parquet file.

    Files are processed in (expiry, strike, option type) order, concatenated,
    sorted by ``timestamp`` and written to
    ``<OUTPUT_DIR>/<underlying lower-cased>_options_master.parquet``
    (snappy compression, dictionary encoding, 50000-row row groups).
    Summary statistics are logged on success.

    Args:
        underlying: Name of the underlying to process.

    Returns:
        True when the master file was written; False when no files or no
        usable data were found, or when any error occurred (it is logged,
        never raised).
    """
    logger.info(f"Creating master parquet file for {underlying}")
    start_time = time.time()
    temp_file = None  # set while a partially written output may exist on disk

    try:
        option_files = collect_option_files(underlying)

        if not option_files:
            logger.warning(f"No option files found for {underlying}")
            return False

        option_files.sort(key=lambda x: (x['expiry'], x['strike_int'], x['option_type']))

        all_dataframes = []
        processed_files = 0
        failed_files = 0
        total_rows = 0

        logger.info(f"Processing {len(option_files)} files for {underlying}")

        for i, file_info in enumerate(option_files):
            df = process_option_file(file_info, underlying)

            # process_option_file returns an empty frame both for empty files
            # and for errors, so both are counted as failed here.
            if not df.empty:
                all_dataframes.append(df)
                total_rows += len(df)
                processed_files += 1
            else:
                failed_files += 1

            if (i + 1) % 100 == 0:
                logger.info(f"Processed {i + 1}/{len(option_files)} files for {underlying}")

                # Periodically collapse the accumulated frames into one so the
                # list of small frames does not keep growing.
                if len(all_dataframes) >= 50:
                    combined_df = pd.concat(all_dataframes, ignore_index=True)
                    all_dataframes = [combined_df]
                    gc.collect()

        if not all_dataframes:
            logger.warning(f"No valid data found for {underlying}")
            return False

        logger.info(f"Combining {len(all_dataframes)} dataframe chunks for {underlying}")
        master_df = pd.concat(all_dataframes, ignore_index=True)

        logger.info(f"Sorting {len(master_df)} rows by timestamp for {underlying}")
        # Stable sort: rows sharing a timestamp keep the (expiry, strike,
        # option type) order established above, so the output is deterministic.
        master_df = master_df.sort_values('timestamp', kind='mergesort').reset_index(drop=True)

        output_file = os.path.join(OUTPUT_DIR, f"{underlying.lower()}_options_master.parquet")
        logger.info(f"Saving master parquet file: {output_file}")

        table = pa.Table.from_pandas(master_df)

        # Write to a sibling temp file and move it into place afterwards, so a
        # failed write never leaves a truncated master file behind.
        temp_file = f"{output_file}.tmp"
        pq.write_table(
            table,
            temp_file,
            compression='snappy',
            use_dictionary=True,
            row_group_size=50000
        )
        os.replace(temp_file, output_file)
        temp_file = None

        output_size = get_file_size(output_file)

        elapsed_time = time.time() - start_time
        # Guard against a zero interval on coarse clocks.
        rows_per_second = len(master_df) / max(elapsed_time, 1e-9)

        logger.info(f"✅ Successfully created master parquet for {underlying}:")
        logger.info(f"   - Output file: {output_file}")
        logger.info(f"   - Total rows: {len(master_df):,}")
        logger.info(f"   - Processed files: {processed_files}/{len(option_files)}")
        logger.info(f"   - Failed files: {failed_files}")
        logger.info(f"   - Output size: {format_size(output_size)}")
        logger.info(f"   - Processing time: {elapsed_time:.2f} seconds")
        logger.info(f"   - Processing rate: {rows_per_second:.0f} rows/second")

        if 'timestamp' in master_df.columns and not master_df['timestamp'].isna().all():
            min_date = master_df['timestamp'].min()
            max_date = master_df['timestamp'].max()
            logger.info(f"   - Date range: {min_date} to {max_date}")

        unique_symbols = master_df['symbol'].nunique() if 'symbol' in master_df.columns else 0
        unique_expiries = master_df['expiry'].nunique() if 'expiry' in master_df.columns else 0
        unique_strikes = master_df['strike'].nunique() if 'strike' in master_df.columns else 0

        logger.info(f"   - Unique symbols: {unique_symbols}")
        logger.info(f"   - Unique expiries: {unique_expiries}")
        logger.info(f"   - Unique strikes: {unique_strikes}")

        return True

    except Exception as e:
        elapsed_time = time.time() - start_time
        logger.error(f"❌ Failed to create master parquet for {underlying} "
                     f"after {elapsed_time:.2f} seconds: {e}")
        return False
    finally:
        # Remove a leftover temp file if the write or the move failed.
        if temp_file is not None and os.path.exists(temp_file):
            try:
                os.remove(temp_file)
            except OSError:
                logger.warning(f"Could not remove temporary file: {temp_file}")
        gc.collect()

def find_available_underlyings() -> List[str]:
    """List the underlyings present under ``<COLD_STORAGE_DIR>/BSE/Options``.

    Returns:
        Names of the immediate sub-directories, in ``os.listdir`` order, or
        an empty list when the options directory does not exist.
    """
    logger.info("Scanning for available underlyings in cold storage")

    options_path = os.path.join(COLD_STORAGE_DIR, "BSE", "Options")
    if not os.path.exists(options_path):
        logger.error(f"Options path does not exist: {options_path}")
        return []

    # Every directory directly below the options path is one underlying.
    underlyings = [
        entry
        for entry in os.listdir(options_path)
        if os.path.isdir(os.path.join(options_path, entry))
    ]

    logger.info(f"Found {len(underlyings)} underlyings: {underlyings}")
    return underlyings

def generate_final_summary():
    """Log size and row count of every master parquet file in OUTPUT_DIR.

    Only files ending in ``_options_master.parquet`` are considered. The row
    count is read from the parquet metadata; if that fails, the file is
    listed with its size only. A total size is logged at the end.
    """
    logger.info("Generating final summary of created master parquet files")

    if not os.path.exists(OUTPUT_DIR):
        logger.warning("Output directory does not exist")
        return

    suffix = '_options_master.parquet'
    master_files = sorted(name for name in os.listdir(OUTPUT_DIR) if name.endswith(suffix))

    if not master_files:
        logger.warning("No master parquet files found")
        return

    logger.info(f"Summary of {len(master_files)} created master parquet files:")

    combined_size = 0
    for name in master_files:
        path = os.path.join(OUTPUT_DIR, name)
        size = get_file_size(path)
        combined_size += size

        # The underlying name is the file name without the common suffix.
        underlying = name.replace(suffix, '').upper()

        try:
            row_count = pq.ParquetFile(path).metadata.num_rows
            logger.info(f"  - {underlying}: {row_count:,} rows ({format_size(size)})")
        except Exception:
            logger.info(f"  - {underlying}: {format_size(size)} (row count unavailable)")

    logger.info(f"Total size of all master files: {format_size(combined_size)}")

In [4]:
# Driver cell: build one master parquet file per underlying found in cold
# storage, then log a summary of the run.
script_start_time = time.time()
logger.info("=" * 70)
logger.info("Starting Options Master Parquet File Generation")
logger.info("=" * 70)

try:
    ensure_output_directory()

    underlyings = find_available_underlyings()

    successful_underlyings = []
    failed_underlyings = []

    if not underlyings:
        # A notebook cell cannot "return", so the rest of the run is skipped
        # via the else-branch instead of falling through to the summary and
        # the "Script completed" banner.
        logger.error("No underlyings found. Exiting.")
        script_elapsed_time = time.time() - script_start_time
    else:
        for underlying in underlyings:
            logger.info(f"Starting processing for underlying: {underlying}")

            if create_master_parquet_for_underlying(underlying):
                successful_underlyings.append(underlying)
                logger.info(f"✅ Successfully completed {underlying}")
            else:
                failed_underlyings.append(underlying)
                logger.error(f"❌ Failed to process {underlying}")

        generate_final_summary()

        script_elapsed_time = time.time() - script_start_time
        logger.info("=" * 70)
        logger.info(f"🎉 Script completed in {script_elapsed_time:.2f} seconds!")
        logger.info(f"✅ Successfully processed: {len(successful_underlyings)} underlyings")

        if successful_underlyings:
            logger.info(f"   Success: {', '.join(successful_underlyings)}")

        logger.info(f"❌ Failed to process: {len(failed_underlyings)} underlyings")

        if failed_underlyings:
            logger.info(f"   Failed: {', '.join(failed_underlyings)}")

        # Every underlying ends up in exactly one of the two lists, and this
        # branch only runs when there is at least one underlying.
        total_attempted = len(successful_underlyings) + len(failed_underlyings)
        success_rate = len(successful_underlyings) / total_attempted * 100
        logger.info(f"📊 Success rate: {success_rate:.1f}%")

        logger.info("=" * 70)

except Exception as e:
    # Log how long the run lasted before the failure, then let it propagate.
    script_elapsed_time = time.time() - script_start_time
    logger.error(f"Script failed after {script_elapsed_time:.2f} seconds: {e}")
    raise

2025-05-30 14:27:36,193 - INFO - Starting Options Master Parquet File Generation
2025-05-30 14:27:36,196 - INFO - Using existing output directory: master_parquet_files
2025-05-30 14:27:36,198 - INFO - Scanning for available underlyings in cold storage
2025-05-30 14:27:36,199 - INFO - Found 2 underlyings: ['BANKEX', 'SENSEX']
2025-05-30 14:27:36,200 - INFO - Starting processing for underlying: BANKEX
2025-05-30 14:27:36,201 - INFO - Creating master parquet file for BANKEX
2025-05-30 14:27:36,202 - INFO - Collecting option files for BANKEX
2025-05-30 14:27:36,265 - INFO - Found 1599 option files for BANKEX (Total size: 29.25 MB)
2025-05-30 14:27:36,266 - INFO - Processing 1599 files for BANKEX
2025-05-30 14:27:37,148 - INFO - Processed 100/1599 files for BANKEX
2025-05-30 14:27:38,275 - INFO - Processed 200/1599 files for BANKEX
2025-05-30 14:27:39,125 - INFO - Processed 300/1599 files for BANKEX
2025-05-30 14:27:40,014 - INFO - Processed 400/1599 files for BANKEX
2025-05-30 14:27:40,772