# Climate Data Processing

This notebook processes the ingested climate data, performing cleaning, transformation, and feature engineering to prepare it for analytics and visualization in Databricks.

## Setup and Imports

In [None]:
# Import required libraries
import os
import pandas as pd
import numpy as np
from datetime import datetime

# Spark session used by the Delta Lake reads and writes below.
# getOrCreate() hands back an already-active session instead of starting a new one.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("ClimateDataProcessing")
    .getOrCreate()
)

# DBFS locations accessed through the /dbfs local mount (used with os/pandas).
DATA_DIR = "/dbfs/FileStore/climate_resilience/datasets"
PROCESSED_DIR = "/dbfs/FileStore/climate_resilience/processed"

# Make sure the output directory exists before any cell writes into it.
os.makedirs(PROCESSED_DIR, exist_ok=True)

print("Climate Data Processing environment initialized.")

## Load Data

This function loads the ingested climate data from Delta Lake, falling back to local Parquet or CSV files if the Delta table cannot be read.

In [None]:
def _load_local_climate_files():
    """
    Load climate data from local files under DATA_DIR.

    Prefers ``climate_data.parquet``. Otherwise it reads the main Singapore
    indicators CSV, or the bundled sample CSV when the main one is missing.
    It then appends the additional-indicators CSV if that file exists.
    Any read error propagates to the caller.
    """
    parquet_path = os.path.join(DATA_DIR, "climate_data.parquet")
    if os.path.exists(parquet_path):
        frame = pd.read_parquet(parquet_path)
        print(f"Loaded data from {parquet_path}")
        return frame

    climate_csv = os.path.join(DATA_DIR, "climate_change_indicators_singapore.csv")
    additional_csv = os.path.join(DATA_DIR, "additional_climate_indicators_singapore.csv")

    if os.path.exists(climate_csv):
        base = pd.read_csv(climate_csv)
        print(f"Loaded data from {climate_csv}")
    else:
        base = pd.read_csv(os.path.join(DATA_DIR, "climate_change_indicators_singapore_sample.csv"))
        print("Loaded sample climate data")

    if not os.path.exists(additional_csv):
        return base

    extra = pd.read_csv(additional_csv)
    print(f"Loaded data from {additional_csv}")
    return pd.concat([base, extra], ignore_index=True)


def load_data():
    """
    Load the ingested climate data as a pandas DataFrame.

    First tries the Delta table via Spark and collects it to the driver with
    ``toPandas()``. If that raises, it falls back to local Parquet/CSV files
    (see ``_load_local_climate_files``).

    Returns:
        pandas.DataFrame, or None if the local fallback also fails.
    """
    print("Loading climate data for processing...")

    try:
        delta_frame = (
            spark.read.format("delta")
            .load("/dbfs/FileStore/climate_resilience/delta/climate_data")
            .toPandas()
        )
        print("Loaded data from Delta Lake")
        return delta_frame
    except Exception as delta_err:
        print(f"Error loading data from Delta Lake: {delta_err}")
        print("Falling back to parquet/CSV files...")

    try:
        return _load_local_climate_files()
    except Exception as local_err:
        print(f"Error loading data: {local_err}")
        return None

## Clean Data

This function cleans the data by imputing missing values, removing duplicate rows, and capping outliers with the IQR method.

In [None]:
def clean_data(df):
    """
    Cleans the data by handling missing values, duplicates, and outliers.

    Steps:
      1. NaNs in numeric columns are filled with the median of the row's
         ``Indicator`` group, or with the column median when there is no
         ``Indicator`` column.
      2. NaNs in object columns are filled with the column's most frequent
         value. A column with no non-null values has no mode and is left as-is.
      3. Exact duplicate rows are dropped.
      4. Numeric values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are capped to
         those bounds. Bounds are computed per ``Indicator`` group when that
         column exists, otherwise per column. ``Year`` is never capped.

    Rows whose ``Indicator`` is itself missing are skipped by the grouped
    steps (pandas groupby drops NaN keys).

    Args:
        df: pandas DataFrame to clean. It is not modified.

    Returns:
        A cleaned copy of ``df``, or None if ``df`` is None or empty.
    """
    print("Cleaning climate data...")

    if df is None or df.empty:
        print("No data to clean")
        return None

    # Work on a copy so the caller's frame is untouched.
    df_clean = df.copy()

    print(f"Missing values before cleaning: {df_clean.isna().sum().sum()}")

    has_indicator = 'Indicator' in df_clean.columns
    numeric_cols = df_clean.select_dtypes(include=['number']).columns

    # 1. Numeric imputation.
    for col in numeric_cols:
        if not df_clean[col].isna().any():
            continue
        if has_indicator:
            # Per-row median of that row's indicator group.
            fill_values = df_clean.groupby('Indicator')[col].transform('median')
        else:
            fill_values = df_clean[col].median()
        df_clean[col] = df_clean[col].fillna(fill_values)

    # 2. Categorical imputation.
    for col in df_clean.select_dtypes(include=['object']).columns:
        if not df_clean[col].isna().any():
            continue
        modes = df_clean[col].mode()
        # An all-NaN column has an empty mode; indexing it would raise KeyError.
        if not modes.empty:
            df_clean[col] = df_clean[col].fillna(modes.iloc[0])

    print(f"Missing values after cleaning: {df_clean.isna().sum().sum()}")

    # 3. Duplicates.
    initial_rows = len(df_clean)
    df_clean = df_clean.drop_duplicates()
    print(f"Removed {initial_rows - len(df_clean)} duplicate rows")

    # 4. IQR outlier capping.
    for col in numeric_cols:
        if col == 'Year':  # years are identifiers, not measurements
            continue

        values = df_clean[col]
        if has_indicator:
            groups = values.groupby(df_clean['Indicator'])
            q1 = groups.transform(lambda s: s.quantile(0.25))
            q3 = groups.transform(lambda s: s.quantile(0.75))
        else:
            # Mirrors the imputation fallback: treat the whole column as one group.
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)

        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # Cap rather than drop. Comparisons against NaN are False, so missing
        # values and rows without a group bound are left untouched.
        df_clean[col] = (
            values.mask(values < lower_bound, lower_bound)
            .mask(values > upper_bound, upper_bound)
        )

    return df_clean

## Transform Data

This function converts `Year` to a datetime and adds a `Decade` column. For long-format data, it also pivots the data into a wide table with one column per indicator.

In [None]:
def _years_to_datetime(years):
    """Parse a Series of calendar years (ints, whole-number floats or strings) into datetimes."""
    if pd.api.types.is_numeric_dtype(years):
        # str(2000.0) == "2000.0", which format='%Y' rejects. Format numbers as
        # plain integers and map missing years to None (-> NaT).
        as_text = years.map(lambda y: None if pd.isna(y) else str(int(y)))
    else:
        as_text = years.astype(str)
    return pd.to_datetime(as_text, format='%Y')


def transform_data(df):
    """
    Transforms the data by creating new features and restructuring for analysis.

    - ``Year`` is converted to a datetime (first day of the year) unless it is
      already a datetime column of any resolution.
    - A ``Decade`` column (e.g. 2010 for 2013) is added when ``Year`` exists.
    - Long-format data (``Indicator`` + ``Value`` columns) is also pivoted
      into a wide table with one column per indicator. The table is indexed by
      whichever of ``Year``, ``Country`` and ``CountryCode`` are present, and
      ``Decade`` is merged back in.

    Args:
        df: pandas DataFrame, typically the output of ``clean_data``.

    Returns:
        ``{'long': df, 'wide': df_wide}`` for long-format input,
        ``{'long': df}`` otherwise, or None if ``df`` is None or empty.
    """
    print("Transforming climate data...")

    if df is None or df.empty:
        print("No data to transform")
        return None

    # Work on a copy so the caller's frame is untouched.
    df_transform = df.copy()

    if 'Year' in df_transform.columns:
        # is_datetime64_any_dtype also accepts non-ns units and tz-aware columns.
        if not pd.api.types.is_datetime64_any_dtype(df_transform['Year']):
            df_transform['Year'] = _years_to_datetime(df_transform['Year'])
        df_transform['Decade'] = (df_transform['Year'].dt.year // 10) * 10

    if 'Indicator' in df_transform.columns and 'Value' in df_transform.columns:
        index_cols = [c for c in ('Year', 'Country', 'CountryCode') if c in df_transform.columns]
        if index_cols:
            df_wide = df_transform.pivot_table(
                index=index_cols,
                columns='Indicator',
                values='Value',
                aggfunc='first',
            ).reset_index()

            # Drop the 'Indicator' label that pivot_table leaves on the column index.
            df_wide.columns.name = None

            if 'Decade' in df_transform.columns:
                # Each Year maps to exactly one Decade, so this merge adds no rows.
                df_wide = df_wide.merge(
                    df_transform[['Year', 'Decade']].drop_duplicates(),
                    on='Year',
                    how='left',
                )

            return {'long': df_transform, 'wide': df_wide}

    return {'long': df_transform}

## Engineer Features

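This function adds per-indicator year-over-year changes, 5-year moving averages, and cumulative change from the first year. It also saves a correlation matrix of the indicators and builds a composite climate vulnerability index when temperature and rainfall indicators are present.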

In [None]:
def _min_max_normalize(series):
    """Scale ``series`` linearly to [0, 1]; a constant series maps to 0.0 instead of NaN."""
    lo = series.min()
    span = series.max() - lo
    if span == 0:
        # (x - min) / 0 would make every row NaN; a constant indicator carries no signal.
        return (series - lo) * 0.0
    return (series - lo) / span


def engineer_features(data_dict):
    """
    Engineers new features to enhance analysis capabilities.

    Long format (when ``Indicator`` and ``Value`` columns exist), computed per
    indicator after sorting by indicator and year:
      - ``YoY_Change`` / ``YoY_Pct_Change``: difference / percent change from the previous row
      - ``MA_5Year``: rolling mean over up to 5 rows
      - ``Cumulative_Change``: value minus the indicator's first value
    Long data without those columns is passed through unchanged.

    Wide format (if present):
      - When there are 2+ numeric indicator columns, their correlation matrix
        is written to ``PROCESSED_DIR/indicator_correlations.csv``.
      - When columns containing "Temperature" and "Rainfall" exist, a
        ``Climate_Vulnerability_Index`` is built from min-max normalized values
        (0.6*temp + 0.4*(1 - rainfall)). If a "CO2" column also exists, it is
        averaged 50/50 with the normalized CO2.

    Args:
        data_dict: dict with a ``'long'`` DataFrame and optionally ``'wide'``.

    Returns:
        dict with the processed ``'long'`` (and ``'wide'`` if given) frames,
        or None when there is no long data.
    """
    print("Engineering features for climate data...")

    if data_dict is None or 'long' not in data_dict or data_dict['long'] is None:
        print("No data for feature engineering")
        return None

    result_dict = {}

    df_long = data_dict['long'].copy()

    if 'Indicator' in df_long.columns and 'Value' in df_long.columns:
        # Row order inside each group drives diff/rolling/first-value logic.
        sort_keys = ['Indicator', 'Year'] if 'Year' in df_long.columns else ['Indicator']
        df_long = df_long.sort_values(sort_keys)

        by_indicator = df_long.groupby('Indicator')['Value']
        df_long['YoY_Change'] = by_indicator.diff()
        df_long['YoY_Pct_Change'] = by_indicator.pct_change() * 100
        df_long['MA_5Year'] = by_indicator.transform(
            lambda x: x.rolling(window=5, min_periods=1).mean()
        )
        # Groups from groupby are never empty, so iloc[0] is always defined.
        df_long['Cumulative_Change'] = by_indicator.transform(lambda x: x - x.iloc[0])

    # Always return the long frame so downstream saving never silently drops it.
    result_dict['long'] = df_long

    if data_dict.get('wide') is not None:
        df_wide = data_dict['wide'].copy()

        # Numeric columns are treated as indicators; Decade is a grouping key.
        indicator_cols = [
            col for col in df_wide.select_dtypes(include=['number']).columns
            if col != 'Decade'
        ]

        if len(indicator_cols) > 1:
            corr_matrix = df_wide[indicator_cols].corr()
            corr_path = os.path.join(PROCESSED_DIR, "indicator_correlations.csv")
            corr_matrix.to_csv(corr_path)
            print(f"Saved correlation matrix to {corr_path}")

        def _first_col_containing(keyword):
            # str() guards against non-string column labels.
            return next((col for col in indicator_cols if keyword in str(col)), None)

        temp_col = _first_col_containing('Temperature')
        rainfall_col = _first_col_containing('Rainfall')
        co2_col = _first_col_containing('CO2')

        if temp_col and rainfall_col:
            df_wide['Temp_Norm'] = _min_max_normalize(df_wide[temp_col])
            df_wide['Rainfall_Norm'] = _min_max_normalize(df_wide[rainfall_col])

            # Example composite: hotter and drier => more vulnerable.
            df_wide['Climate_Vulnerability_Index'] = (
                0.6 * df_wide['Temp_Norm'] + 0.4 * (1 - df_wide['Rainfall_Norm'])
            )

            if co2_col:
                df_wide['CO2_Norm'] = _min_max_normalize(df_wide[co2_col])
                df_wide['Climate_Vulnerability_Index'] = (
                    0.5 * df_wide['Climate_Vulnerability_Index'] + 0.5 * df_wide['CO2_Norm']
                )

        result_dict['wide'] = df_wide

    return result_dict

## Save Processed Data

This function saves the processed data to Delta Lake format and also writes CSV copies for easier inspection.

In [None]:
# Characters Delta Lake rejects in column names (unless column mapping is enabled).
_DELTA_INVALID_COLUMN_CHARS = str.maketrans({ch: "_" for ch in " ,;{}()\n\t="})


def _delta_safe_columns(frame):
    """Return a copy of ``frame`` with Delta-invalid characters in column names replaced by '_'."""
    # NOTE(review): distinct names could collide after replacement (e.g. "A B" and "A_B").
    return frame.rename(columns=lambda col: str(col).translate(_DELTA_INVALID_COLUMN_CHARS))


def _save_format(frame, label):
    """Write one processed frame to its Delta table and to a CSV copy in PROCESSED_DIR."""
    delta_path = f"/dbfs/FileStore/climate_resilience/delta/climate_data_{label}"
    # Wide-format columns are raw indicator names, which may contain spaces or
    # parentheses that Delta refuses. Only the Delta copy is renamed; the CSV
    # keeps the original names.
    spark_df = spark.createDataFrame(_delta_safe_columns(frame))
    (
        spark_df.write.format("delta")
        .mode("overwrite")
        # Added/removed indicators change the schema between runs; a plain
        # overwrite would fail with a schema mismatch.
        .option("overwriteSchema", "true")
        .save(delta_path)
    )
    print(f"Saved {label} format data to Delta Lake at {delta_path}")

    # Also save as CSV for easier inspection.
    csv_path = os.path.join(PROCESSED_DIR, f"climate_data_{label}.csv")
    frame.to_csv(csv_path, index=False)
    print(f"Saved {label} format data to {csv_path}")


def save_processed_data(data_dict):
    """
    Saves the processed data to Delta Lake format, plus CSV copies.

    Each of ``data_dict['long']`` and ``data_dict['wide']`` that is present and
    not None is written (overwriting) to
    ``/dbfs/FileStore/climate_resilience/delta/climate_data_<label>`` and to
    ``PROCESSED_DIR/climate_data_<label>.csv``.

    NOTE(review): Spark resolves scheme-less paths against the DBFS root, so
    the Delta path above is presumably ``dbfs:/dbfs/FileStore/...``. That is
    not the ``/dbfs/FileStore`` FUSE location used for the CSVs. The path is
    kept as-is to stay consistent with load_data and other notebooks; confirm
    before switching to ``dbfs:/FileStore/...``.

    Args:
        data_dict: dict of DataFrames keyed by ``'long'`` / ``'wide'``, or None.
    """
    print("Saving processed climate data...")

    if data_dict is None:
        print("No data to save")
        return

    for label in ('long', 'wide'):
        frame = data_dict.get(label)
        if frame is not None:
            _save_format(frame, label)

    print("All processed data saved successfully")

## Main Function

This function orchestrates the data processing pipeline.

In [None]:
def main():
    """
    Run the pipeline end to end.

    The stages are load_data -> clean_data -> transform_data ->
    engineer_features -> save_processed_data.

    Returns:
        The dict produced by engineer_features, or None when load_data
        returns None or an empty DataFrame.
    """
    timestamp_fmt = '%Y-%m-%d %H:%M:%S'
    print(f"Starting data processing at {datetime.now().strftime(timestamp_fmt)}")

    raw = load_data()

    # Guard clause: nothing to process.
    if raw is None or raw.empty:
        print("Data processing failed: No data available")
        return None

    processed = engineer_features(transform_data(clean_data(raw)))
    save_processed_data(processed)

    print(f"Data processing completed at {datetime.now().strftime(timestamp_fmt)}")
    print("Data is now ready for analytics and visualization")

    return processed

## Execute Data Processing

In [None]:
# Run the data processing pipeline
processed_data = main()

# Display a sample of the processed data. The notebook only echoes a cell's
# final top-level expression, so a bare `.head()` inside an `if` shows nothing;
# print it explicitly.
if processed_data is not None and 'long' in processed_data:
    print(processed_data['long'].head())