In [2]:
!pip install dask




[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import os
import pandas as pd
import csv
from glob import glob
from pathlib import Path
from multiprocessing import Pool, cpu_count
from functools import partial

# Base directory (input) and output directory.
# The defaults are the original author's paths; set the ACE_BASE_DIR and/or
# ACE_OUTPUT_DIR environment variables to run elsewhere without editing code.
base_dir = os.environ.get(
    'ACE_BASE_DIR', 'C:/Users/harsh/Downloads/aditya-l1-isro-main/ace_daily'
)
output_dir = os.environ.get(
    'ACE_OUTPUT_DIR', 'C:/Users/harsh/Downloads/aditya-l1-isro-main/output_data'
)
# Create the output directory up front so later to_parquet() calls can write.
os.makedirs(output_dir, exist_ok=True)

# File type configurations.
# Every ACE product starts with the same six time columns; each entry below
# appends its product-specific columns and dtypes to these shared definitions.
# 'na_values' lists tokens that load_ace_file treats as missing data, and
# 'skiprows' is the number of leading header lines dropped before parsing.
_TIME_COLUMNS = ['YR', 'MO', 'DA', 'HHMM', 'Julian Day', 'Seconds of the Day']
_TIME_DTYPES = {
    'YR': 'int32', 'MO': 'int32', 'DA': 'int32', 'HHMM': 'str',
    'Julian Day': 'int32', 'Seconds of the Day': 'int32',
}

file_types = {
    '_ace_epam_5m': {
        'suffix': '*_ace_epam_5m.txt',
        'skiprows': 14,
        'columns': _TIME_COLUMNS + [
            'Electron S', '38-53', '175-315', 'Proton S', '47-68', '115-195',
            '310-580', '795-1193', '1060-1900', 'Anis. Index',
        ],
        'na_values': ['-1.00e+05', '-1.00'],
        'dtypes': {
            **_TIME_DTYPES,
            'Electron S': 'int32', '38-53': 'float32', '175-315': 'float32',
            'Proton S': 'int32', '47-68': 'float32', '115-195': 'float32',
            '310-580': 'float32', '795-1193': 'float32', '1060-1900': 'float32',
            'Anis. Index': 'float32',
        },
    },
    '_ace_mag_1m': {
        'suffix': '*_ace_mag_1m.txt',
        'skiprows': 12,
        'columns': _TIME_COLUMNS + ['S', 'Bx', 'By', 'Bz', 'Bt', 'Lat.', 'Long.'],
        'na_values': ['-999.9'],
        'dtypes': {
            **_TIME_DTYPES,
            'S': 'int32', 'Bx': 'float32', 'By': 'float32', 'Bz': 'float32',
            'Bt': 'float32', 'Lat.': 'float32', 'Long.': 'float32',
        },
    },
    '_ace_sis_5m': {
        'suffix': '*_ace_sis_5m.txt',
        'skiprows': 12,
        'columns': _TIME_COLUMNS + [
            'S (>10 MeV)', '>10 MeV', 'S (>30 MeV)', '>30 MeV',
        ],
        'na_values': ['-1.00e+05'],
        'dtypes': {
            **_TIME_DTYPES,
            'S (>10 MeV)': 'int32', '>10 MeV': 'float32',
            'S (>30 MeV)': 'int32', '>30 MeV': 'float32',
        },
    },
    '_ace_swepam_1m': {
        'suffix': '*_ace_swepam_1m.txt',
        'skiprows': 12,
        'columns': _TIME_COLUMNS + [
            'S', 'Proton Density', 'Bulk Speed', 'Ion Temperature',
        ],
        'na_values': ['-9999.9', '-1.00e+05'],
        'dtypes': {
            **_TIME_DTYPES,
            'S': 'int32', 'Proton Density': 'float32', 'Bulk Speed': 'float32',
            'Ion Temperature': 'float32',
        },
    },
}

def _nullable_int_name(dtype):
    """Return the pandas nullable-integer dtype name for *dtype* ('int32' -> 'Int32')."""
    name = pd.api.types.pandas_dtype(dtype).name
    return 'UInt' + name[4:] if name.startswith('uint') else 'Int' + name[3:]


def _coerce_column(raw, dtype, na_text, na_numbers):
    """Convert one column of raw text tokens to *dtype*, turning fill values into missing.

    Text columns (dtype ``'str'``) are returned as-is so leading zeros survive.
    Numeric columns drop tokens equal to any entry of *na_text*, are parsed
    with ``pd.to_numeric(errors='coerce')``, and then drop values numerically
    equal to any entry of *na_numbers*.
    """
    if dtype == 'str':
        return raw.astype(str)
    numeric = pd.to_numeric(raw.mask(raw.isin(na_text)), errors='coerce')
    if na_numbers:
        # Also match sentinels by value, so '-1.0e+05' and '-1.00e+05' both count.
        numeric = numeric.mask(numeric.isin(na_numbers))
    if pd.api.types.is_integer_dtype(dtype) and numeric.isna().any():
        # NumPy integer dtypes cannot hold NaN and astype() would raise,
        # discarding the whole file; use the nullable equivalent instead.
        dtype = _nullable_int_name(dtype)
    return numeric.astype(dtype)


def load_ace_file(file_path, config):
    """
    Load one whitespace-delimited ACE text file into a typed DataFrame.

    Parameters
    ----------
    file_path : str
        Path to the ACE ``.txt`` file.
    config : dict
        One entry of ``file_types``: ``skiprows`` (leading header lines to
        drop), ``columns`` (names, in file order), ``na_values`` (tokens
        treated as missing) and ``dtypes`` (target dtype per column).

    Returns
    -------
    pandas.DataFrame
        ``datetime`` (built from YR/MO/DA/HHMM), the configured columns, then
        ``source_file`` (base name of *file_path*).  ``na_values`` tokens are
        matched as text and, when numeric, by value, and become missing.
        Integer columns that still contain missing values use the pandas
        nullable dtype (e.g. ``Int32``).  HHMM is kept as its original text.
        Rows whose token count differs from ``len(columns)`` are skipped and
        counted in a printed message.  On any other error the problem is
        printed and an empty DataFrame is returned, so one bad file does not
        abort a batch.
    """
    from itertools import islice

    try:
        columns = config['columns']
        ordered = (
            ['datetime']
            + [col for col in columns if col not in ['datetime', 'source_file']]
            + ['source_file']
        )
        na_text = [str(v) for v in config['na_values']]
        na_numbers = []
        for token in na_text:
            try:
                na_numbers.append(float(token))
            except ValueError:
                pass  # non-numeric sentinel: matched as text only

        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            tokenized = [line.split() for line in islice(f, config['skiprows'], None)]
        rows = [tokens for tokens in tokenized if len(tokens) == len(columns)]
        malformed = sum(1 for tokens in tokenized if tokens and len(tokens) != len(columns))
        if malformed:
            print(f"Skipped {malformed} malformed row(s) in {file_path}")
        if not rows:
            return pd.DataFrame(columns=ordered)

        df = pd.DataFrame(rows, columns=columns)
        for col, dtype in config['dtypes'].items():
            if col in df.columns:
                df[col] = _coerce_column(df[col], dtype, na_text, na_numbers)

        # Build a timestamp from the zero-padded date/time parts; unparsable
        # combinations become NaT rather than failing the file.
        df['datetime'] = pd.to_datetime(
            df['YR'].astype(str) + df['MO'].astype(str).str.zfill(2) +
            df['DA'].astype(str).str.zfill(2) + df['HHMM'].astype(str).str.zfill(4),
            format='%Y%m%d%H%M',
            errors='coerce'
        )
        df['source_file'] = os.path.basename(file_path)
        return df[ordered]
    except Exception as e:
        # Best-effort per file: report and return an empty frame so the
        # caller can skip it.
        print(f"Error reading {file_path}: {e}")
        return pd.DataFrame()

def process_file_type(file_type, config, files):
    """
    Load all files of one ACE product in parallel and write one Parquet file.

    Parameters
    ----------
    file_type : str
        Key from ``file_types`` (e.g. ``'_ace_mag_1m'``).  Its first character
        is dropped to name the output ``<file_type[1:]>_data.parquet``.
    config : dict
        Parsing configuration passed through to ``load_ace_file``.
    files : list of str
        Paths to load.  They are sorted first so the combined rows come out
        in a deterministic order.

    The output is written into the module-level ``output_dir``.  Problems are
    reported with ``print`` rather than raised, matching ``load_ace_file``.
    """
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures.process import BrokenProcessPool

    try:
        if not files:
            print(f"No files found for {file_type}")
            return
        files = sorted(files)
        loader = partial(load_ace_file, config=config)
        # No point starting more workers than there are files.
        workers = min(cpu_count(), len(files))
        if os.name == 'nt':
            # ProcessPoolExecutor rejects max_workers > 61 on Windows.
            workers = min(workers, 61)
        # Batch files per task to cut inter-process overhead for many small files.
        chunksize = max(1, len(files) // (workers * 4))
        try:
            with ProcessPoolExecutor(max_workers=workers) as executor:
                results = list(executor.map(loader, files, chunksize=chunksize))
        except BrokenProcessPool as e:
            # Workers died, e.g. under the spawn start method they cannot
            # import functions defined interactively in a notebook.
            # multiprocessing.Pool can block forever in that situation; the
            # executor raises instead, so fall back to loading in-process.
            print(f"Process pool unavailable for {file_type} ({e}); loading serially")
            results = [loader(path) for path in files]

        # Drop files that failed or held no rows before concatenating.
        dfs = [df for df in results if not df.empty]
        if not dfs:
            print(f"No data found for {file_type}")
            return
        combined_df = pd.concat(dfs, ignore_index=True)
        output_file = os.path.join(output_dir, f"{file_type[1:]}_data.parquet")
        combined_df.to_parquet(output_file, engine='pyarrow', index=False)
        print(f"Saved {output_file} with {len(combined_df)} rows")
    except Exception as e:
        print(f"Error processing {file_type}: {e}")

# Process each file type.
# The __main__ guard keeps spawn-started worker processes (the default on
# Windows) from re-running this loop when they re-import the main module.
# In an interactive notebook __name__ is '__main__', so it still runs there.
if __name__ == '__main__':
    for file_type, config in file_types.items():
        # Each immediate subdirectory of base_dir is searched for this product.
        files = glob(os.path.join(base_dir, '*', config['suffix']))
        process_file_type(file_type, config, files)