# In [None]:
import pandas as pd
import numpy as np
from pathlib import Path

def process_parquet_files(data_folder):
    """Run ``preprocess_data`` over every ``*.parquet`` file in a folder.

    Each file is read, preprocessed, and written back to the SAME path,
    overwriting it (the DataFrame index is not written). Files are handled in
    sorted path order so repeated runs behave the same way.

    Args:
        data_folder: Directory (str or path-like) to scan. Only its direct
            children matching ``*.parquet`` are processed; subdirectories are
            not searched.

    Returns:
        All processed frames concatenated into one DataFrame. Its outer index
        level ``'source'`` holds each file's name without the ``.parquet``
        suffix. Returns an empty DataFrame when no parquet files are found.
    """
    # Path.glob yields entries in no guaranteed order; sort for determinism.
    parquet_files = sorted(Path(data_folder).glob("*.parquet"))

    dataframes = []
    for file_path in parquet_files:
        print(f"Processing: {file_path}")
        df_processed = preprocess_data(pd.read_parquet(file_path))
        # Destructive: the original file is replaced by the processed frame.
        df_processed.to_parquet(file_path, index=False)
        dataframes.append(df_processed)

    if not dataframes:
        return pd.DataFrame()
    # Key each frame by its file stem so rows from different assets stay
    # distinguishable after concatenation.
    return pd.concat(
        dataframes,
        keys=[path.stem for path in parquet_files],
        names=['source', None],
    )
def preprocess_data(df, freq='D'):
    """Fill date gaps in a price table and append return/volume/range features.

    Steps:
      1. Parse ``Date`` and sort by it. If the ``freq`` calendar between the
         first and last date has gaps, insert the missing dates and
         forward-fill every column. Inserted rows repeat the previous row.
      2. For each of open/high/low/close that is present, add simple, log and
         percentage returns. Also add a 7-row rolling mean and a 30-row
         rolling std of the simple return.
      3. When the needed columns exist, add:
         - close-based daily and cumulative returns;
         - volume changes and moving averages;
         - the high-low range;
         - open/close gap metrics.

    Price/volume columns are matched case-insensitively (``Close`` or
    ``close``). Derived column names are always lower-case (``close_return``,
    ``daily_return``, ...).

    Args:
        df: Input frame with a ``Date`` column parseable by
            ``pd.to_datetime``. It is not modified.
        freq: pandas offset alias for the expected spacing of dates. Default
            ``'D'`` (calendar days). Pass ``'B'`` for business days so that
            weekends are not reported and filled as missing.

    Returns:
        A new DataFrame with ``Date`` as its first, ordinary column (so it
        survives ``to_parquet(index=False)``), followed by the original
        columns and the derived ones.

    Raises:
        KeyError: if ``df`` has no ``Date`` column.
        ValueError: if gaps exist and ``Date`` contains duplicate values
            (pandas cannot reindex an axis with duplicate labels).
    """
    # Work on a copy: assigning the parsed dates back would otherwise mutate
    # the caller's frame.
    df = df.copy()
    df['Date'] = pd.to_datetime(df['Date'])
    df = df.set_index('Date').sort_index()

    # 1. Handle missing dates. An empty frame is skipped because its min/max
    #    are NaT, which pd.date_range rejects.
    if not df.empty:
        expected = pd.date_range(start=df.index.min(), end=df.index.max(), freq=freq)
        missing_dates = expected.difference(df.index)
        if len(missing_dates) > 0:
            print("Missing dates:", missing_dates.tolist())
            # Reindex onto the union (not the bare range) so that existing rows
            # lying off the ``freq`` grid are kept rather than dropped.
            df = df.reindex(df.index.union(missing_dates)).ffill()
            # NOTE(review): forward fill repeats prices AND volume on inserted
            # dates, so their returns come out as 0 - confirm that is intended.

    # Always restore 'Date' as a regular column so callers that write with
    # index=False do not lose it.
    df = df.rename_axis('Date').reset_index()

    cols = _column_lookup(df)

    # 2. Per-series returns and rolling statistics.
    for name in ('open', 'high', 'low', 'close'):
        if name not in cols:
            continue
        price = df[cols[name]]
        simple = price.pct_change()
        df[f'{name}_return'] = simple
        df[f'{name}_log_return'] = np.log(price / price.shift(1))
        df[f'{name}_return_pct'] = simple * 100
        df[f'{name}_return_rolling_7d'] = simple.rolling(window=7).mean()
        df[f'{name}_return_volatility_30d'] = simple.rolling(window=30).std()

    # 3a. Close-based daily and cumulative returns (primary returns columns).
    if 'close' in cols:
        close = df[cols['close']]
        df['daily_return'] = close.pct_change()
        df['log_daily_return'] = np.log(close / close.shift(1))
        df['daily_return_pct'] = df['daily_return'] * 100
        df['cumulative_return'] = (1 + df['daily_return']).cumprod() - 1
        df['log_cumulative_return'] = df['log_daily_return'].cumsum()

    # 3b. Volume analysis.
    if 'volume' in cols:
        volume = df[cols['volume']]
        df['volume_pct_change'] = volume.pct_change()
        # NOTE(review): a zero volume makes this +/-inf or NaN (with a numpy
        # RuntimeWarning).
        df['volume_log_change'] = np.log(volume / volume.shift(1))
        df['volume_ma_7'] = volume.rolling(window=7).mean()
        df['volume_ma_30'] = volume.rolling(window=30).mean()

    # 3c. Range and gap metrics; each runs only when every column it reads exists.
    if 'high' in cols and 'low' in cols:
        df['daily_range'] = df[cols['high']] - df[cols['low']]
        if 'close' in cols:
            df['daily_range_pct'] = (df['daily_range'] / df[cols['close']].shift(1)) * 100

    if 'open' in cols and 'close' in cols:
        open_price = df[cols['open']]
        prev_close = df[cols['close']].shift(1)
        df['overnight_gap'] = (open_price - prev_close) / prev_close
        df['intraday_move'] = (df[cols['close']] - open_price) / open_price

    return df


def _column_lookup(df):
    """Map lower-cased column names to the frame's actual column labels.

    An exactly lower-case label wins over a differently-cased one, so a frame
    holding both ``close`` and ``Close`` resolves ``'close'`` to ``close``.
    """
    lookup = {}
    for label in df.columns:
        key = str(label).lower()
        if key not in lookup or label == key:
            lookup[key] = label
    return lookup

# Usage: preprocess every asset file under data_folder. The files are
# overwritten in place, so run only when executed directly (script or notebook,
# where __name__ == "__main__"), never as a side effect of importing this module.
data_folder = "/Users/vanshaj/Work/GitHub/Quant_Labs/Application/Data/Assets Data/EQUITY/INDIA"
if __name__ == "__main__":
    combined_data = process_parquet_files(data_folder)