In [None]:
import os
import pandas as pd
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import logging
from pathlib import Path

@dataclass
class ProcessingConfig:
    """Settings for the CSV cleaning pipeline.

    ``required_columns`` and ``columns_to_drop`` fall back to the built-in
    defaults when left as ``None``; lists passed explicitly to the
    constructor are kept as given.
    """

    #======================================== SETTINGS ====================================================
    # Root folder holding one sub-folder of raw CSV files per data set.
    base_ori_folder: str = r'c:\Users\Desktop\Ori Data'
    # Root folder that receives the cleaned output, mirroring the sub-folders.
    base_dest_folder: str = r'c:\Users\Desktop\Clean Data'

    # Date checking interval, given as a pandas offset alias:
    # day='D', hour='h', minute='min', month-end='ME'
    frequency: str = 'ME'
    #======================================================================================================

    time_column: str = 'start_time' # for date checking
    # Columns copied into every per-factor output file.
    required_columns: Optional[List[str]] = None
    # Columns removed from the input before factors are separated.
    columns_to_drop: Optional[List[str]] = None

    def __post_init__(self):
        """Fill in default column lists for any field left as ``None``."""
        # Only substitute defaults when the caller did not supply a value;
        # a fresh list per instance keeps configs from sharing state.
        if self.required_columns is None:
            self.required_columns = ['start_time', 'end_time', 'close', 'volume']
        if self.columns_to_drop is None:
            self.columns_to_drop = ['open', 'high', 'low', 'blockheight', 't', 'date', 'datetime']

class DataProcessor:
    """Clean raw CSV files and split them into one CSV per factor column.

    Each CSV is date-checked against ``config.frequency``, stripped of the
    columns in ``config.columns_to_drop`` and of duplicate rows, and then
    written out as one file per non-required column. Findings are collected
    per category and appended to text log files under ``<dest>/log``.
    """

    # Characters replaced with "_" when a factor name becomes part of a file name.
    _FILENAME_SANITIZER = str.maketrans(dict.fromkeys('/\\:*?"<>|', '_'))

    # Message categories; each one is written to its own log file.
    _LOG_TYPES = ('empty_data', 'missing_dates', 'nan', 'duplicates')

    def __init__(self, config: "ProcessingConfig"):
        """Store the configuration and set up the console logger."""
        self.config = config
        self.setup_logging()

    def setup_logging(self):
        """Setup logging configuration"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def _new_logs(self) -> Dict[str, List[str]]:
        """Return an empty message list for every log category."""
        return {log_type: [] for log_type in self._LOG_TYPES}

    @staticmethod
    def get_log_file_path(dest_folder: str, log_type: str, folder_name: str) -> str:
        """Generate log file path with folder name as prefix"""
        return os.path.join(dest_folder, 'log', f'{folder_name}_{log_type}_log.txt')

    def write_log(self, log_path: str, messages: List[str]) -> None:
        """Append ``messages`` to ``log_path``, one per line.

        Nothing is created or written when ``messages`` is empty. The parent
        directory is created on demand.
        """
        if not messages:
            return
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        # Explicit UTF-8: messages embed file and factor names, which may not
        # be representable in the platform's default encoding.
        with open(log_path, 'a', encoding='utf-8') as f:
            f.write('\n'.join(messages) + '\n')

    def check_dates(self, df: pd.DataFrame) -> Tuple[bool, Optional[pd.DatetimeIndex]]:
        """Check for missing dates in the dataframe.

        The time column of ``df`` is converted to datetimes in place
        (day-first parsing); callers rely on that conversion afterwards.

        Returns:
            ``(is_continuous, missing_dates)`` where ``missing_dates`` holds
            the expected timestamps (at ``config.frequency`` between the
            column's min and max) that are absent from the column.
            Two special cases share the same tuple shape:
            ``(False, "Empty or invalid date column")`` when every value is
            missing, and ``(False, None)`` when the check itself raised
            (the error is logged).
        """
        time_column = self.config.time_column
        try:
            df[time_column] = pd.to_datetime(df[time_column], dayfirst=True)
            timestamps = df[time_column]
            if timestamps.isna().all():
                return False, "Empty or invalid date column"

            expected = pd.date_range(
                start=timestamps.min(),
                end=timestamps.max(),
                freq=self.config.frequency
            )
            missing_dates = expected.difference(timestamps)
            return missing_dates.empty, missing_dates
        except Exception as e:
            self.logger.error("Error in check_dates: %s", e)
            return False, None

    def separate_and_save_factors(self, df: pd.DataFrame, file_path: str) -> Dict[str, pd.DataFrame]:
        """Separate factors and return dictionary of factor dataframes.

        Every column not listed in ``config.required_columns`` is treated as
        a factor and paired with a copy of the required columns.
        ``file_path`` is accepted for interface compatibility and is not
        used; nothing is written to disk here.
        """
        required = self.config.required_columns
        return {
            factor: df[required + [factor]].copy()
            for factor in df.columns
            if factor not in required
        }

    def save_factor_df(self, factor_df: pd.DataFrame, base_name: str, factor: str, has_nan: bool = False, has_missing_dates: bool = False) -> None:
        """Save individual factor DataFrame with appropriate filename.

        The file is named ``"<base_name> (<factor>)"`` followed by
        ``" -fillna"`` when ``has_nan`` and ``" -md"`` when
        ``has_missing_dates``, then ``.csv``. Characters in ``factor`` that
        are not allowed in Windows file names are replaced with ``_``.
        """
        factor_name = factor.translate(self._FILENAME_SANITIZER)

        # Factor name first, then suffixes in a fixed order.
        name_parts = [f"{base_name} ({factor_name})"]
        if has_nan:
            name_parts.append("-fillna")
        if has_missing_dates:
            name_parts.append("-md")

        factor_df.to_csv(' '.join(name_parts) + ".csv", index=False)

    def process_single_file(self, file_path: str, dest_folder: str) -> Dict[str, List[str]]:
        """Process a single CSV file and return log messages.

        Returns a dict keyed by log category ('empty_data', 'missing_dates',
        'nan', 'duplicates'). Any exception is logged with its traceback and
        the messages gathered so far are returned, so one bad file does not
        stop the others.
        """
        logs = self._new_logs()

        try:
            filename = os.path.basename(file_path)
            df = pd.read_csv(file_path)
            base_output_path = os.path.join(dest_folder, os.path.splitext(filename)[0])

            # Check dates
            is_continuous, missing_dates = self.check_dates(df)

            if isinstance(missing_dates, str):
                logs['empty_data'].append(f"{filename} has an empty or invalid date column.")
                return logs

            if missing_dates is None:
                # check_dates failed and already logged why. Record the skip
                # in the log file instead of crashing on len(None) below.
                logs['empty_data'].append(
                    f"{filename} was skipped because the date check failed (see console log)."
                )
                return logs

            has_missing_dates = not is_continuous
            if has_missing_dates:
                timestamps = df[self.config.time_column]
                total_dates = pd.date_range(
                    start=timestamps.min(),
                    end=timestamps.max(),
                    freq=self.config.frequency
                )
                missing_percentage = (len(missing_dates) / len(total_dates)) * 100

                logs['missing_dates'].extend([
                    f"{filename} has missing dates: {', '.join(missing_dates.astype(str))}",
                    f"Missing {len(missing_dates)} out of {len(total_dates)} ({missing_percentage:.2f}%)\n"
                ])

            # Rename volume columns if needed
            if 'volume_x' in df.columns:
                df = df.rename(columns={'volume_x': 'volume'})

            # Drop unwanted columns
            existing_columns = [col for col in self.config.columns_to_drop if col in df.columns]
            if existing_columns:
                df = df.drop(columns=existing_columns)

            # Handle duplicates
            original_rows = len(df)
            df = df.drop_duplicates()
            if len(df) < original_rows:
                logs['duplicates'].append(
                    f"{filename} has duplicates and {original_rows - len(df)} row(s) were removed."
                )

            # Separate factors
            factor_dfs = self.separate_and_save_factors(df, base_output_path)

            # Handle NaN values
            for factor, factor_df in factor_dfs.items():
                has_nan = factor_df.isna().any().any()
                if has_nan:
                    factor_df.fillna(0, inplace=True)
                    logs['nan'].append(f"{filename} ({factor}) has NaN values and have filled with 0.")

                # Save the processed factor DataFrame
                self.save_factor_df(
                    factor_df,
                    base_output_path,
                    factor,
                    has_nan=has_nan,
                    has_missing_dates=has_missing_dates
                )

            return logs

        except Exception as e:
            # logger.exception keeps the traceback for diagnosis.
            self.logger.exception("Error processing %s: %s", file_path, e)
            return logs

    def parallel_process_files(self, folder_name: str) -> None:
        """Parallel process all csv files in a folder.

        Files in ``<base_ori_folder>/<folder_name>`` whose name ends in
        ``.csv`` (any letter case) are handled on a thread pool; results go
        to ``<base_dest_folder>/<folder_name>`` and the collected messages
        are appended to that folder's log files. Errors are logged, not
        raised.
        """
        try:
            ori_folder = os.path.join(self.config.base_ori_folder, folder_name)
            dest_folder = os.path.join(self.config.base_dest_folder, folder_name)
            os.makedirs(dest_folder, exist_ok=True)

            self.logger.info(f"Processing folder: {folder_name}")

            # Case-insensitive match so files named *.CSV are not skipped.
            csv_files = [f for f in os.listdir(ori_folder) if f.lower().endswith('.csv')]
            all_logs = self._new_logs()

            with ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(
                        self.process_single_file,
                        os.path.join(ori_folder, file),
                        dest_folder
                    )
                    for file in csv_files
                ]

                # Collect in submission order so log contents follow the
                # directory listing order rather than completion order.
                for future in futures:
                    for key, messages in future.result().items():
                        all_logs[key].extend(messages)

            # Write logs
            for log_type, messages in all_logs.items():
                self.write_log(
                    self.get_log_file_path(dest_folder, log_type, folder_name),
                    messages
                )

            self.logger.info(f"Completed processing {folder_name}")

        except Exception as e:
            self.logger.exception("Error processing folder %s: %s", folder_name, e)

    def process_all_folders(self) -> None:
        """Process all folders in the base directory.

        Only immediate sub-directories of ``config.base_ori_folder`` are
        processed. Errors from listing that directory (for example when it
        does not exist) propagate to the caller.
        """
        base = self.config.base_ori_folder
        for entry in os.listdir(base):
            if os.path.isdir(os.path.join(base, entry)):
                self.parallel_process_files(entry)

def main():
    """Run the cleaning pipeline over every folder using the default settings."""
    DataProcessor(ProcessingConfig()).process_all_folders()

# Run the pipeline only when this module is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

In [None]:
# Expected layout of base_ori_folder:
#
# Ori Data
# ├── Folder 1
# │   └── *.csv
# └── Folder 2
#     └── *.csv