## Imports

In [1]:
import xarray as xr
import pandas as pd
import os
import numpy as np
import sys
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from metpy.calc import advection
from metpy.units import units


In [2]:
import warnings

# Silence pandas PerformanceWarning messages for the rest of the session.
warnings.simplefilter("ignore", pd.errors.PerformanceWarning)

In [3]:
# Move from the notebook folder to the project root so the relative "data/" and
# "functions/" paths used below resolve. The guard keeps the cell idempotent:
# re-running it must not climb another directory level.
# NOTE(review): assumes the notebook folder itself has no "functions/" subfolder.
if not os.path.isdir("functions"):
    os.chdir("../")
print(os.getcwd())

# Make the project's helper modules (IndexDrivers, PredictorsDrivers) importable;
# skip the insert when the path is already present so re-runs do not pile up entries.
folder_path = os.path.abspath("functions/")
if folder_path not in sys.path:
    sys.path.insert(0, folder_path)

c:\Users\marti\Desktop\data\hw_extra


In [4]:
from IndexDrivers import (
    AnomaliesIndex,
    MaxIndex, 
    calculate_anomalies
)
from PredictorsDrivers import (
    Predictor
)

In [5]:
# Load the index metadata table.
meta_indices = pd.read_csv(filepath_or_buffer="data/my_indices/metadata.csv")

In [6]:
import os
import pandas as pd
import re
import numpy as np

def _compare_index_files(old_df_path, preserved_df_path):
    """Classify how the index file at ``old_df_path`` compares with ``preserved_df_path``.

    Returns one of the status strings used by ``deduplicate_metadata``:
    ``"MISSING_FILE: <path>"``, ``"SHAPE_MISMATCH: <a> vs <b>"``,
    ``"COLUMN_NAMES_MISMATCH"``, ``"IDENTICAL"``, ``"ALMOST_IDENTICAL"`` or
    ``"DIFFERENT_VALUES"``. Errors while reading the files propagate to the caller.
    """
    if not os.path.exists(old_df_path):
        return f"MISSING_FILE: {old_df_path}"
    if not os.path.exists(preserved_df_path):
        return f"MISSING_FILE: {preserved_df_path}"

    old_df = pd.read_parquet(old_df_path)
    preserved_df = pd.read_parquet(preserved_df_path)
    # Only cell values and column labels are compared; the row index is not.
    old_array = old_df.values
    preserved_array = preserved_df.values

    if old_array.shape != preserved_array.shape:
        return f"SHAPE_MISMATCH: {old_array.shape} vs {preserved_array.shape}"
    if not np.array_equal(old_df.columns, preserved_df.columns):
        return "COLUMN_NAMES_MISMATCH"

    both_numeric = (np.issubdtype(old_array.dtype, np.number)
                    and np.issubdtype(preserved_array.dtype, np.number))
    if not both_numeric:
        return "IDENTICAL" if np.array_equal(old_array, preserved_array) else "DIFFERENT_VALUES"
    # equal_nan=True: NaNs in the same positions count as equal, so identical
    # data containing NaNs is reported as IDENTICAL rather than ALMOST_IDENTICAL.
    if np.array_equal(old_array, preserved_array, equal_nan=True):
        return "IDENTICAL"
    if np.allclose(old_array, preserved_array, rtol=1e-5, atol=1e-8, equal_nan=True):
        return "ALMOST_IDENTICAL"
    return "DIFFERENT_VALUES"

def deduplicate_metadata(file_path, indices_dir="data/my_indices/"):
    """
    Map every index ID to the ID preserved for its configuration and verify the data.

    Rows of the metadata CSV that share the same values in ``method``, ``rolling``,
    ``variables``, ``boxes``, ``reference_period`` and ``target_period`` describe the
    same index configuration. For each such group the ID of the LAST row in file
    order is preserved, and every ID in the group maps to it. Missing values in
    those columns are treated as equal to each other when grouping.

    For every non-preserved ID, ``index_<id>.parquet`` is compared with
    ``index_<preserved_id>.parquet`` in ``indices_dir``, and a summary is printed.
    This function only reads files: it does not rewrite the metadata CSV or
    delete any parquet file.

    Parameters:
    file_path (str): Path to the indices metadata CSV (must contain an ``id`` column)
    indices_dir (str): Directory containing the ``index_<id>.parquet`` files

    Returns:
    tuple[dict, dict]: ``(id_mapping, verification_results)``. ``id_mapping`` maps
    every ID to its preserved ID. ``verification_results`` maps every ID to a status
    string ("SELF" for preserved IDs).
    """
    # IDs are hex-like strings; forcing str keeps an all-digit ID column from
    # being parsed as integers, which would break file names and column matching.
    df = pd.read_csv(file_path, dtype={"id": str})

    # Identify the columns to check for duplicates
    duplicate_check_columns = [
        'method', 'rolling', 'variables', 'boxes',
        'reference_period', 'target_period'
    ]

    print(f"Original number of rows: {len(df)}")

    # Group IDs by configuration, preserving row order within each group.
    all_ids_by_key = {}
    key_rows = df[duplicate_check_columns].itertuples(index=False, name=None)
    for current_id, raw_key in zip(df["id"], key_rows):
        # NaN != NaN (and NaN hashes by identity), so normalise missing values
        # to None; otherwise rows with a blank field could never be grouped.
        key = tuple(None if pd.isna(value) else value for value in raw_key)
        all_ids_by_key.setdefault(key, []).append(current_id)

    id_mapping = {}
    verification_results = {}

    for ids in all_ids_by_key.values():
        # The latest row (last in file order) is the one kept.
        preserved_id = ids[-1]

        for id_val in ids:
            id_mapping[id_val] = preserved_id

            # Nothing to verify when the id is the preserved one.
            if id_val == preserved_id:
                verification_results[id_val] = "SELF"
                continue

            old_df_path = os.path.join(indices_dir, f"index_{id_val}.parquet")
            preserved_df_path = os.path.join(indices_dir, f"index_{preserved_id}.parquet")
            try:
                verification_results[id_val] = _compare_index_files(old_df_path, preserved_df_path)
            except Exception as e:
                # Best-effort verification: record the failure and keep going.
                verification_results[id_val] = f"ERROR: {str(e)}"

    # Print verification summary
    print("\nVerification Results:")
    status_counts = {"IDENTICAL": 0, "ALMOST_IDENTICAL": 0, "DIFFERENT_VALUES": 0,
                     "SHAPE_MISMATCH": 0, "COLUMN_NAMES_MISMATCH": 0, "MISSING_FILE": 0,
                     "ERROR": 0, "SELF": 0}

    for status in verification_results.values():
        # Statuses carry details after the first ":"; count by the prefix only.
        base_status = status.partition(":")[0]
        status_counts[base_status] = status_counts.get(base_status, 0) + 1

    for status, count in status_counts.items():
        print(f"  {status}: {count}")

    ok_statuses = {"IDENTICAL", "ALMOST_IDENTICAL", "SELF"}
    problems = [id_val for id_val, status in verification_results.items()
                if status not in ok_statuses]

    if problems:
        print("\nProblematic ID comparisons:")
        for id_val in problems[:10]:  # Limit to first 10 to avoid huge output
            print(f"  {id_val} -> {id_mapping[id_val]}: {verification_results[id_val]}")
        if len(problems) > 10:
            print(f"  ... and {len(problems) - 10} more")

        print("\nWARNING: Some dataframes are not identical to their preserved versions!")
        print("This could indicate data inconsistency in duplicated IDs.")
        print("Consider reviewing these files before proceeding.")
    else:
        print("\nAll verification checks passed. Duplicated IDs reference identical data.")

    return id_mapping, verification_results

def rename_columns_in_parquet_files(directory_path, id_mapping):
    """
    Rewrite ``{id}-{var}`` column names in predictor parquet files using ``id_mapping``.

    Every ``predictor_*.parquet`` file in ``directory_path`` is read. For each column
    whose name is an alphanumeric ID, a hyphen, then the rest, the ID is replaced by
    ``id_mapping[id]`` when that mapping points to a different ID. Modified files
    are rewritten in place.

    The DataFrame index is written back too (``to_parquet``'s default), so a
    non-default index, such as a time index, survives the rewrite. Each file is
    first written to a ``.tmp`` sibling and then moved over the original, so a
    failed write does not leave a truncated parquet file behind. A file whose
    renaming would create duplicate column names (for example, both the duplicate
    and the preserved ID are present) is skipped and reported.

    Parameters:
    directory_path (str): Directory containing the predictor parquet files
    id_mapping (dict): Dictionary mapping from original IDs to preserved IDs

    Returns:
    dict: Counts under ``total_files``, ``modified_files`` and ``renamed_columns``.
    """
    # "{id}-{var}": the ID class excludes "-", so group 1 stops at the first hyphen.
    pattern = re.compile(r"([a-zA-Z0-9]+)-(.+)")

    total_files = 0
    modified_files = 0
    renamed_columns = 0

    # Sorted for a deterministic processing/log order.
    parquet_files = sorted(f for f in os.listdir(directory_path)
                           if f.startswith("predictor_") and f.endswith(".parquet"))

    print(f"Found {len(parquet_files)} parquet files to process")

    for file_name in parquet_files:
        total_files += 1
        file_path = os.path.join(directory_path, file_name)
        tmp_path = file_path + ".tmp"

        try:
            df = pd.read_parquet(file_path)

            column_mapping = {}
            for column in df.columns:
                # Non-string labels cannot carry an "{id}-{var}" name.
                if not isinstance(column, str):
                    continue
                match = pattern.match(column)
                if match is None:
                    continue
                old_id, var_part = match.groups()
                new_id = id_mapping.get(old_id, old_id)
                if new_id != old_id:
                    column_mapping[column] = f"{new_id}-{var_part}"

            if not column_mapping:
                continue

            new_columns = [column_mapping.get(column, column) for column in df.columns]
            if len(set(new_columns)) != len(new_columns):
                print(f"Skipped {file_name}: renaming would create duplicate column names")
                continue

            df = df.rename(columns=column_mapping)
            # Keep the index (default index=None); write-then-replace for atomicity.
            df.to_parquet(tmp_path)
            os.replace(tmp_path, file_path)
        except Exception as e:
            print(f"Error processing {file_name}: {str(e)}")
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            continue

        # Count only once the file has actually been rewritten.
        modified_files += 1
        renamed_columns += len(column_mapping)
        print(f"Modified {file_name}: renamed {len(column_mapping)} columns")

    print("\nSummary:")
    print(f"Total files processed: {total_files}")
    print(f"Files modified: {modified_files}")
    print(f"Columns renamed: {renamed_columns}")

    return {
        "total_files": total_files,
        "modified_files": modified_files,
        "renamed_columns": renamed_columns,
    }

def update_experiment_metadata(experiment_metadata_path, id_mapping):
    """
    Update the indices column in the experiment metadata file by replacing IDs with their preserved versions.

    The ``indices`` column holds hyphen-separated index IDs. Every ID present in
    ``id_mapping`` is replaced by its mapped ID; other IDs are kept. The CSV is
    then rewritten in place.

    The file is read with every column as plain text and with NA detection
    disabled. Because the whole file is rewritten, this keeps the columns that
    are not touched exactly as they were. Default type inference would, for
    example, turn an integer column with blanks into floats and write ``5`` back
    as ``5.0``. It also makes a blank ``indices`` cell an empty string rather than
    NaN, which would have aborted the update.

    Parameters:
    experiment_metadata_path (str): Path to the experiment metadata CSV file
    id_mapping (dict): Dictionary mapping from original IDs to preserved IDs

    Returns:
    int: Number of rows whose ``indices`` value changed; 0 if the update failed.
    """
    try:
        exp_df = pd.read_csv(experiment_metadata_path, dtype=str, keep_default_na=False)

        print(f"\nUpdating experiment metadata file: {experiment_metadata_path}")
        print(f"Original rows: {len(exp_df)}")

        def replace_indices(indices_str):
            # Map each hyphen-separated ID; IDs absent from the mapping are kept.
            return '-'.join(id_mapping.get(index, index) for index in indices_str.split('-'))

        original_indices = exp_df['indices'].copy()
        exp_df['indices'] = exp_df['indices'].apply(replace_indices)

        modified_rows = int((original_indices != exp_df['indices']).sum())

        exp_df.to_csv(experiment_metadata_path, index=False)

        print("Updated experiment metadata:")
        print(f"  Modified rows: {modified_rows} out of {len(exp_df)}")

        # Show a sample of modifications if any were made
        if modified_rows > 0:
            print("\nSample of changes (first 3):")
            changes = 0
            for i, (orig, new) in enumerate(zip(original_indices, exp_df['indices'])):
                if orig != new:
                    print(f"  Row {i}:")
                    print(f"    Original: {orig}")
                    print(f"    Updated:  {new}")
                    changes += 1
                    if changes >= 3:
                        break

        return modified_rows

    except Exception as e:
        # Best-effort step of the driver script: report and signal "nothing updated".
        print(f"Error updating experiment metadata: {str(e)}")
        return 0

# Locations used by this deduplication run.
indices_metadata_file = "data/my_indices/metadata.csv"  # indices metadata CSV
indices_directory = "data/my_indices/"  # holds the index_<id>.parquet files
parquet_directory = "data/climate_features/chile"  # holds the predictor_*.parquet files
experiment_metadata_file = "data/climate_features/chile/metadata.csv"  # experiment metadata CSV

# Build the old-id -> preserved-id mapping and check duplicated ids hold the same data.
id_mapping, verification_results = deduplicate_metadata(indices_metadata_file, indices_directory)

accepted_statuses = ("IDENTICAL", "ALMOST_IDENTICAL", "SELF")
verification_passed = all(status in accepted_statuses
                          for status in verification_results.values())

# Only ask for confirmation when verification reported problems.
proceed = verification_passed
if not proceed:
    proceed = input("\nProceed despite verification issues? (y/n): ").lower() == 'y'

if proceed:
    # Propagate the mapping into the predictor parquet column names...
    rename_columns_in_parquet_files(parquet_directory, id_mapping)

    # ...and into the experiment metadata's indices column.
    if not os.path.exists(experiment_metadata_file):
        print(f"\nExperiment metadata file not found: {experiment_metadata_file}")
    else:
        update_experiment_metadata(experiment_metadata_file, id_mapping)

Original number of rows: 177

Verification Results:
  IDENTICAL: 124
  ALMOST_IDENTICAL: 0
  DIFFERENT_VALUES: 4
  SHAPE_MISMATCH: 0
  COLUMN_NAMES_MISMATCH: 0
  MISSING_FILE: 0
  ERROR: 0
  SELF: 49

Problematic ID comparisons:
  8334b687 -> fde0e327: DIFFERENT_VALUES
  e19aa330 -> fde0e327: DIFFERENT_VALUES
  f25567c1 -> 340e2882: DIFFERENT_VALUES
  19496680 -> 340e2882: DIFFERENT_VALUES

This could indicate data inconsistency in duplicated IDs.
Consider reviewing these files before proceeding.
Found 216 parquet files to process
Modified predictor_1b939ac5_1.parquet: renamed 24 columns
Modified predictor_1b939ac5_10.parquet: renamed 24 columns
Modified predictor_1b939ac5_11.parquet: renamed 24 columns
Modified predictor_1b939ac5_12.parquet: renamed 24 columns
Modified predictor_1b939ac5_2.parquet: renamed 24 columns
Modified predictor_1b939ac5_3.parquet: renamed 24 columns
Modified predictor_1b939ac5_4.parquet: renamed 24 columns
Modified predictor_1b939ac5_5.parquet: renamed 24 colu