# EMISSIONS/DAMAGES VALIDATION NOTEBOOK
- This notebook performs validations on estimates from the TARE model and unit tests for the functions used to calculate emissions and damages

In [5]:
print("Defining and running unit tests for: calculate_fossil_fuel_emission_factor")
def calculate_fossil_fuel_emission_factor(
    fuel_type, so2_factor, nox_factor, pm25_factor, conversion_factor1, conversion_factor2
):
    """
    Calculate Emission Factors for Fossil Fuels.

    Parameters:
    -----------
    fuel_type : str
        Type of fuel (e.g., "naturalGas", "fuelOil", "propane").
    so2_factor : float
        SO2 emission factor in lb/Mbtu.
    nox_factor : float
        NOx emission factor in lb/Mbtu.
    pm25_factor : float
        PM2.5 emission factor in lb per volume unit (varies by fuel).
    conversion_factor1 : int
        Conversion factor for volume units to gallons/thousand gallons.
    conversion_factor2 : int
        Conversion factor for energy content (e.g., BTU per gallon/cf).
    
    Returns:
    --------
    dict
        Dictionary containing emission factors for the given fuel type in lb/kWh.
    """

    # Conversion factor from Mbtu to kWh
    mbtu_to_kwh = 3412 / 1_000_000

    # Emission factors in lb/kWh
    emission_factors = {
        f"{fuel_type}_so2": so2_factor * mbtu_to_kwh,
        f"{fuel_type}_nox": nox_factor * mbtu_to_kwh,
        f"{fuel_type}_pm25": pm25_factor * (1 / conversion_factor1) * (1 / conversion_factor2) * 3412,
    }

    # Natural gas-specific CO2e calculation (including leakage)
    natural_gas_leakage_co2e_per_kwh = 0.043 / 1000
    if fuel_type == "naturalGas":
        emission_factors[f"{fuel_type}_co2e"] = (
            228.5 / 1_000_000 + natural_gas_leakage_co2e_per_kwh
        )
    # CO2e for propane and fuel oil
    elif fuel_type == "propane":
        emission_factors[f"{fuel_type}_co2e"] = 275.8 / 1_000_000
    elif fuel_type == "fuelOil":
        emission_factors[f"{fuel_type}_co2e"] = 303.9 / 1_000_000

    return emission_factors

import pandas as pd

Defining and running unit tests for: calculate_fossil_fuel_emission_factor


In [6]:
import unittest

class TestCalculateFossilFuelEmissionFactor(unittest.TestCase):

    def test_fuelOil_emission_factors(self):
        fuel_type = "fuelOil"
        so2_factor = 0.0015
        nox_factor = 0.1300
        pm25_factor = 0.83
        conversion_factor1 = 1000
        conversion_factor2 = 138500

        # Expected calculations
        expected_so2 = so2_factor * (1 / 1_000_000) * 3412
        expected_nox = nox_factor * (1 / 1_000_000) * 3412
        expected_pm25 = pm25_factor * (1 / conversion_factor1) * (1 / conversion_factor2) * 3412
        expected_co2e = 303.9 * (1 / 1_000_000)

        result = calculate_fossil_fuel_emission_factor(
            fuel_type, so2_factor, nox_factor, pm25_factor,
            conversion_factor1, conversion_factor2
        )

        self.assertAlmostEqual(result[f"{fuel_type}_so2"], expected_so2, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_nox"], expected_nox, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_pm25"], expected_pm25, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_co2e"], expected_co2e, places=10)

    def test_naturalGas_emission_factors(self):
        fuel_type = "naturalGas"
        so2_factor = 0.0006
        nox_factor = 0.0922
        pm25_factor = 1.9
        conversion_factor1 = 1_000_000
        conversion_factor2 = 1039

        # Expected calculations
        expected_so2 = so2_factor * (1 / 1_000_000) * 3412
        expected_nox = nox_factor * (1 / 1_000_000) * 3412
        expected_pm25 = pm25_factor * (1 / conversion_factor1) * (1 / conversion_factor2) * 3412
        natural_gas_leakage_mtCO2e_per_kWh = 0.043 / 1000
        expected_co2e = (228.5 / 1_000_000) + natural_gas_leakage_mtCO2e_per_kWh

        result = calculate_fossil_fuel_emission_factor(
            fuel_type, so2_factor, nox_factor, pm25_factor,
            conversion_factor1, conversion_factor2
        )

        self.assertAlmostEqual(result[f"{fuel_type}_so2"], expected_so2, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_nox"], expected_nox, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_pm25"], expected_pm25, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_co2e"], expected_co2e, places=10)

    def test_propane_emission_factors(self):
        fuel_type = "propane"
        so2_factor = 0.0002
        nox_factor = 0.1421
        pm25_factor = 0.17
        conversion_factor1 = 1000
        conversion_factor2 = 91452

        # Expected calculations
        expected_so2 = so2_factor * (1 / 1_000_000) * 3412
        expected_nox = nox_factor * (1 / 1_000_000) * 3412
        expected_pm25 = pm25_factor * (1 / conversion_factor1) * (1 / conversion_factor2) * 3412
        expected_co2e = 275.8 / 1_000_000

        result = calculate_fossil_fuel_emission_factor(
            fuel_type, so2_factor, nox_factor, pm25_factor,
            conversion_factor1, conversion_factor2
        )

        self.assertAlmostEqual(result[f"{fuel_type}_so2"], expected_so2, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_nox"], expected_nox, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_pm25"], expected_pm25, places=10)
        self.assertAlmostEqual(result[f"{fuel_type}_co2e"], expected_co2e, places=10)


if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)


...
----------------------------------------------------------------------
Ran 3 tests in 0.003s

OK


# Public Perspective: Monetized Marginal Damages from Emissions

## Fossil Fuels: Climate and Health-Related Pollutants

In [None]:
# Print header
print(""" 
-------------------------------------------------------------------------------------------------------
Calculate Emissions Factors: FOSSIL FUELS
-------------------------------------------------------------------------------------------------------
Fossil Fuels (Natural Gas, Fuel Oil, Propane):
- NOx, SO2, CO2: 
    - RESNET Table 7.1.2 Emissions Factors for Household Combustion Fuels
    - Source: https://www.resnet.us/wp-content/uploads/ANSIRESNETICC301-2022_resnetpblshd.pdf
    - All factors are in units of lb/Mbtu, so energy consumption in kWh needs conversion.
- PM2.5: 
    - A National Methodology and Emission Inventory for Residential Fuel Combustion
    - Source: https://www3.epa.gov/ttnchie1/conference/ei12/area/haneke.pdf
-------------------------------------------------------------------------------------------------------
""")

# Calculate emission factors for each fuel type
fuel_oil_factors = calculate_fossil_fuel_emission_factor(
    fuel_type="fuelOil", so2_factor=0.0015, nox_factor=0.1300, pm25_factor=0.83,
    conversion_factor1=1000, conversion_factor2=138500
)
natural_gas_factors = calculate_fossil_fuel_emission_factor(
    fuel_type="naturalGas", so2_factor=0.0006, nox_factor=0.0922, pm25_factor=1.9,
    conversion_factor1=1_000_000, conversion_factor2=1039
)
propane_factors = calculate_fossil_fuel_emission_factor(
    fuel_type="propane", so2_factor=0.0002, nox_factor=0.1421, pm25_factor=0.17,
    conversion_factor1=1000, conversion_factor2=91452
)

# Combine all factors
all_factors = {**fuel_oil_factors, **natural_gas_factors, **propane_factors}

# Create DataFrame
df_marg_emis_factors = pd.DataFrame.from_dict(all_factors, orient="index", columns=["value"])
df_marg_emis_factors.reset_index(inplace=True)
df_marg_emis_factors.columns = ["pollutant", "value"]

# Split pollutant into fuel type and pollutant name
df_marg_emis_factors[["fuel_type", "pollutant"]] = df_marg_emis_factors["pollutant"].str.split("_", expand=True)

# Update the units to metric tons per kWh
df_marg_emis_factors["unit"] = "[mt/kWh]"

# Convert units from lb/kWh to metric tons/kWh
lb_to_mt = 0.00045359237
df_marg_emis_factors["value"] *= lb_to_mt

# Add 'state' column with default value
df_marg_emis_factors = df_marg_emis_factors.assign(state="National")

# Reorder columns for clarity
df_marg_emis_factors = df_marg_emis_factors[["state", "fuel_type", "pollutant", "value", "unit"]]
df_marg_emis_factors

 
-------------------------------------------------------------------------------------------------------
Calculate Emissions Factors: FOSSIL FUELS
-------------------------------------------------------------------------------------------------------
Fossil Fuels (Natural Gas, Fuel Oil, Propane):
- NOx, SO2, CO2: 
    - RESNET Table 7.1.2 Emissions Factors for Household Combustion Fuels
    - Source: https://www.resnet.us/wp-content/uploads/ANSIRESNETICC301-2022_resnetpblshd.pdf
    - All factors are in units of lb/Mbtu, so energy consumption in kWh needs conversion.
- PM2.5: 
    - A National Methodology and Emission Inventory for Residential Fuel Combustion
    - Source: https://www3.epa.gov/ttnchie1/conference/ei12/area/haneke.pdf
-------------------------------------------------------------------------------------------------------



Unnamed: 0,state,fuel_type,pollutant,value,unit
0,National,fuelOil,so2,2.321486e-09,[mt/kWh]
1,National,fuelOil,nox,2.011954e-07,[mt/kWh]
2,National,fuelOil,pm25,9.274769e-09,[mt/kWh]
3,National,fuelOil,co2e,1.378467e-07,[mt/kWh]
4,National,naturalGas,so2,9.285943e-10,[mt/kWh]
5,National,naturalGas,nox,1.42694e-07,[mt/kWh]
6,National,naturalGas,pm25,2.830172e-09,[mt/kWh]
7,National,naturalGas,co2e,1.231503e-07,[mt/kWh]
8,National,propane,so2,3.095314e-10,[mt/kWh]
9,National,propane,nox,2.199221e-07,[mt/kWh]


In [None]:
# Create lookup dictionary for fossil fuel emissions factors
emis_fossil_fuel_lookup = {}
fuel_types = df_marg_emis_factors["fuel_type"].unique()

for fuel in fuel_types:
    emis_fossil_fuel_lookup[fuel] = {
        pollutant: df_marg_emis_factors[
            (df_marg_emis_factors["fuel_type"] == fuel) & 
            (df_marg_emis_factors["pollutant"] == pollutant)
        ]["value"].values[0]
        for pollutant in ["co2e", "so2", "nox", "pm25"]
    }

emis_fossil_fuel_lookup


{'fuelOil': {'co2e': 1.3784672124299999e-07,
  'so2': 2.3214857496599997e-09,
  'nox': 2.0119543163720001e-07,
  'pm25': 9.274768578665706e-09},
 'naturalGas': {'co2e': 1.2315032845499999e-07,
  'so2': 9.285942998640001e-10,
  'nox': 1.42693990745768e-07,
  'pm25': 2.8301719116804617e-09},
 'propane': {'co2e': 1.25100775646e-07,
  'so2': 3.09531433288e-10,
  'nox': 2.1992208335112402e-07,
  'pm25': 2.876937828530814e-09}}

In [None]:
# LAST UPDATED/TESTED NOV 24, 2024 @ 5 PM
from scipy.interpolate import interp1d
import numpy as np
import pandas as pd

def calculate_electricity_co2e_cambium(df_cambium_import):
    """
    Interpolates Cambium electricity emission factors and converts units.

    This function takes a dataframe containing Cambium electricity emission factors and performs the following:
    - Interpolates the Long Run Marginal Emissions Rates (LRMER) and Short Run Marginal Emissions Rates (SRMER)
      values for each scenario and GEA region on an annual basis.
    - Converts the LRMER and SRMER values from kg per MWh to tons per MWh and tons per kWh.

    Parameters
    ----------
    df_cambium_import : pandas.DataFrame
        DataFrame containing Cambium electricity emission factors with the following columns:
        - 'scenario': Scenario name or identifier.
        - 'gea_region': GEA region identifier.
        - 'year': Year of the data.
        - 'lrmer_co2e_kg_per_MWh': Long Run Marginal Emissions Rate in kg CO2e per MWh.
        - 'srmer_co2e_kg_per_MWh': Short Run Marginal Emissions Rate in kg CO2e per MWh.

    Returns
    -------
    df_cambium_import_copy : pandas.DataFrame
        DataFrame with interpolated LRMER and SRMER values for each year and additional columns for emission factors
        converted to tons per MWh and tons per kWh.

    Notes
    -----
    - The interpolation is performed linearly between the available years for each unique combination of scenario and GEA region.
    - The converted emission factors are added as new columns:
        - 'lrmer_co2e_ton_per_MWh'
        - 'lrmer_co2e_ton_per_kWh'
        - 'srmer_co2e_ton_per_MWh'
        - 'srmer_co2e_ton_per_kWh'
    - The conversion from kg to tons is done by dividing by 1,000 (1 ton = 1,000 kg).
    - The conversion from MWh to kWh is done by dividing by 1,000 (1 MWh = 1,000 kWh).

    """
    # Create a copy of the dataframe
    df_cambium_import_copy = df_cambium_import.copy()

    # Create a new DataFrame to store interpolated results
    interpolated_data = []

    # Group by 'scenario', 'state', and 'gea_region'
    grouped = df_cambium_import_copy.groupby(['scenario', 'gea_region'])

    for (scenario, gea_region), group in grouped:
        years = group['year'].values

        # Interpolate for LRMER (Long Run Marginal Emissions Rates)
        lrmer_values = group['lrmer_co2e_kg_per_MWh'].values
        lrmer_interp_func = interp1d(years, lrmer_values, kind='linear')

        # Interpolate for SRMER (Short Run Marginal Emissions Rates)
        srmer_values = group['srmer_co2e_kg_per_MWh'].values
        srmer_interp_func = interp1d(years, srmer_values, kind='linear')

        # Generate new years in 1-year increments
        new_years = np.arange(years.min(), years.max() + 1)

        # Interpolate the LRMER and SRMER values for these new years
        new_lrmer_values = lrmer_interp_func(new_years)
        new_srmer_values = srmer_interp_func(new_years)

        # Store the results in a DataFrame
        interpolated_group = pd.DataFrame({
            'scenario': scenario,
            'gea_region': gea_region,
            'year': new_years,
            'lrmer_co2e_kg_per_MWh': new_lrmer_values,
            'srmer_co2e_kg_per_MWh': new_srmer_values
        })

        interpolated_data.append(interpolated_group)

    # Concatenate all the interpolated data into a single DataFrame
    df_cambium_import_copy = pd.concat(interpolated_data).reset_index(drop=True)

    # Convert both LRMER and SRMER values to tons per MWh and tons per kWh
    df_cambium_import_copy['lrmer_co2e_ton_per_MWh'] = df_cambium_import_copy['lrmer_co2e_kg_per_MWh'] / 1000
    df_cambium_import_copy['lrmer_co2e_ton_per_kWh'] = df_cambium_import_copy['lrmer_co2e_kg_per_MWh'] / 1_000_000

    df_cambium_import_copy['srmer_co2e_ton_per_MWh'] = df_cambium_import_copy['srmer_co2e_kg_per_MWh'] / 1000
    df_cambium_import_copy['srmer_co2e_ton_per_kWh'] = df_cambium_import_copy['srmer_co2e_kg_per_MWh'] / 1_000_000

    return df_cambium_import_copy

def create_cambium_co2e_lookup(df_cambium_processed):
    """
    Creates a nested lookup dictionary for Cambium emission factors.

    This function takes a processed dataframe containing Cambium emission factors and constructs a nested dictionary
    that allows quick lookup of LRMER and SRMER emission factors based on scenario, GEA region, and year.

    Parameters
    ----------
    df_cambium_processed : pandas.DataFrame
        DataFrame containing processed Cambium emission factors with the following columns:
        - 'scenario': Scenario name or identifier.
        - 'gea_region': GEA region identifier.
        - 'year': Year of the data.
        - 'lrmer_co2e_ton_per_kWh': Long Run Marginal Emissions Rate in tons CO2e per kWh.
        - 'srmer_co2e_ton_per_kWh': Short Run Marginal Emissions Rate in tons CO2e per kWh.

    Returns
    -------
    emis_scenario_cambium_lookup : dict
        Nested dictionary structured as:
        {
            (scenario, gea_region): {
                year: {
                    'lrmer_co2e': lrmer_value,
                    'srmer_co2e': srmer_value
                },
                ...
            },
            ...
        }

    Notes
    -----
    - The outer keys of the dictionary are tuples containing (scenario, gea_region).
    - The inner dictionary maps years to a dictionary containing both LRMER and SRMER values.
    - This structure allows efficient retrieval of emission factors based on scenario, location, and year.

    """

    # Create a copy of the dataframe
    df_cambium_processed_copy = df_cambium_processed.copy()

    # Create the nested lookup dictionary for both LRMER and SRMER in tons CO2e per kWh
    emis_scenario_cambium_lookup = {}

    # Populate the dictionary
    for _, row in df_cambium_processed_copy.iterrows():
        outer_key = (row['scenario'], row['gea_region'])
        year = row['year']

        # Extract both LRMER and SRMER values in tons per kWh
        lrmer_value = row['lrmer_co2e_ton_per_kWh']
        srmer_value = row['srmer_co2e_ton_per_kWh']

        # Initialize the outer key if not already present
        if outer_key not in emis_scenario_cambium_lookup:
            emis_scenario_cambium_lookup[outer_key] = {}

        # Assign both LRMER and SRMER values in the inner dictionary for each year
        emis_scenario_cambium_lookup[outer_key][year] = {
            'lrmer_ton_per_kWh_co2e': lrmer_value,
            'srmer_ton_per_kWh_co2e': srmer_value
        }

    return emis_scenario_cambium_lookup

### Baseline Marginal Damages: WHOLE-HOME

In [None]:
import numpy as np
import pandas as pd

# Constants
TD_LOSSES = 0.06
TD_LOSSES_MULTIPLIER = 1 / (1 - TD_LOSSES)
EQUIPMENT_SPECS = {'heating': 15, 'waterHeating': 12, 'clothesDrying': 13, 'cooking': 15}

def calculate_marginal_damages(df, menu_mp, policy_scenario, df_summary):
    """
    Calculate marginal damages of pollutants based on equipment usage, emissions, and policy scenarios.

    Parameters:
        df (DataFrame): Input data with emissions and consumption data.
        menu_mp (int): Measure package identifier.
        policy_scenario (str): Specifies the policy scenario ('No Inflation Reduction Act' or 'AEO2023 Reference Case').
        df_summary (DataFrame): Summary DataFrame to store aggregated results.

    Returns:
        DataFrame: Updated DataFrame with calculated marginal emissions and damages.
    """
    df_copy = df.copy()

    # Define policy-specific settings
    scenario_prefix, cambium_scenario, emis_electricity_lookup = define_scenario_settings(menu_mp, policy_scenario)

    # Precompute HDD adjustment factors by region and year
    hdd_factors_per_year = precompute_hdd_factors(df_copy)

    # Compute marginal damages based on grid scenario
    df_new_columns = calculate_damages_grid_scenario(
        df_copy, df_summary, menu_mp, TD_LOSSES_MULTIPLIER, emis_electricity_lookup,
        cambium_scenario, scenario_prefix, hdd_factors_per_year
    )

    # Handle overlapping columns
    overlapping_columns = df_new_columns.columns.intersection(df_copy.columns)
    if not overlapping_columns.empty:
        df_copy.drop(columns=overlapping_columns, inplace=True)

    # Merge newly calculated columns
    df_copy = df_copy.join(df_new_columns, how='left')
    return df_copy


def define_scenario_settings(menu_mp, policy_scenario):
    """
    Define scenario-specific settings based on menu and policy inputs.

    Parameters:
        menu_mp (int): Measure package identifier.
        policy_scenario (str): Policy scenario.

    Returns:
        Tuple: (scenario_prefix, cambium_scenario, emis_electricity_lookup)
    """
    if menu_mp == 0:
        return "baseline_", "MidCase", emis_preIRA_co2e_cambium21_lookup

    if policy_scenario == 'No Inflation Reduction Act':
        return f"preIRA_mp{menu_mp}_", "MidCase", emis_preIRA_co2e_cambium21_lookup

    if policy_scenario == 'AEO2023 Reference Case':
        return f"iraRef_mp{menu_mp}_", "MidCase", emis_IRA_co2e_cambium22_lookup

    raise ValueError("Invalid Policy Scenario! Choose 'No Inflation Reduction Act' or 'AEO2023 Reference Case'.")


def precompute_hdd_factors(df):
    """
    Precompute heating degree day (HDD) factors for each region and year.

    Parameters:
        df (DataFrame): Input data.

    Returns:
        dict: HDD factors mapped by year and region.
    """
    max_lifetime = max(EQUIPMENT_SPECS.values())
    hdd_factors_per_year = {}
    for year_label in range(2024, 2024 + max_lifetime + 1):
        # Map census_division to HDD factors
        hdd_factors = df['census_division'].map(
            lambda x: hdd_factor_lookup.get(x, hdd_factor_lookup['National']).get(year_label, 1.0)
        )
        hdd_factors_per_year[year_label] = hdd_factors
    return hdd_factors_per_year


def calculate_damages_grid_scenario(df, df_summary, menu_mp, td_losses_multiplier, emis_electricity_lookup,
                                    cambium_scenario, scenario_prefix, hdd_factors):
    """
    Calculate damages for electricity grid emissions under different scenarios.

    Parameters:
        df (DataFrame): Input DataFrame.
        df_summary (DataFrame): DataFrame to store summary results.
        menu_mp (int): Measure package identifier.
        td_losses_multiplier (float): Adjusted factor for transmission/distribution losses.
        emis_electricity_lookup (dict): Lookup for emissions data.
        cambium_scenario (str): Scenario identifier for emissions data.
        scenario_prefix (str): Prefix for column naming.
        hdd_factors (dict): Precomputed HDD adjustment factors.

    Returns:
        DataFrame: New columns with calculated emissions and damages.
    """
    new_columns_data = {}
    for category, lifetime in EQUIPMENT_SPECS.items():
        for mer_type in ['lrmer', 'srmer']:
            process_emissions_for_category(
                df, df_summary, menu_mp, td_losses_multiplier, emis_electricity_lookup,
                cambium_scenario, scenario_prefix, hdd_factors, new_columns_data,
                category, lifetime, mer_type
            )

    return pd.DataFrame(new_columns_data, index=df.index)


def process_emissions_for_category(df, df_summary, menu_mp, td_losses_multiplier, emis_electricity_lookup,
                                   cambium_scenario, scenario_prefix, hdd_factors, new_columns_data,
                                   category, lifetime, mer_type):
    """
    Process emissions and damages for a specific category and MER type.

    Parameters:
        df (DataFrame): Input DataFrame.
        df_summary (DataFrame): Summary DataFrame.
        category (str): Equipment category (e.g., heating, cooking).
        lifetime (int): Equipment lifetime in years.
        mer_type (str): Marginal emissions type ('lrmer' or 'srmer').
        hdd_factors (dict): Precomputed HDD factors.
    """
    lifetime_emissions = np.zeros(len(df))
    lifetime_damages = np.zeros(len(df))

    for year in range(1, lifetime + 1):
        year_label = year + 2023
        emis_col, damage_col = generate_column_names(scenario_prefix, year_label, category, mer_type)

        # Calculate electricity emissions
        emis_electricity = calculate_electricity_emissions(
            df, category, hdd_factors[year_label], td_losses_multiplier, emis_electricity_lookup,
            cambium_scenario, year_label, mer_type
        )

        # Calculate fossil fuel emissions
        fossil_fuel_emissions = calculate_fossil_fuel_emissions(
            df, category, hdd_factors[year_label], emission_factors=emis_factors_fossil_fuels
        )

        # Total emissions and damages
        total_emissions = fossil_fuel_emissions + emis_electricity
        total_damages = total_emissions * EPA_SCC_USD2023_PER_TON

        # Store results
        new_columns_data[emis_col] = np.round(total_emissions, 2)
        new_columns_data[damage_col] = np.round(total_damages, 2)
        lifetime_emissions += total_emissions
        lifetime_damages += total_damages

    # Lifetime and avoided emissions
    store_lifetime_and_avoided_emissions(df, df_summary, scenario_prefix, category, mer_type,
                                         lifetime_emissions, lifetime_damages, new_columns_data)


def generate_column_names(scenario_prefix, year_label, category, mer_type):
    """
    Generate column names for emissions and damages.

    Parameters:
        scenario_prefix (str): Scenario prefix.
        year_label (int): Year of calculation.
        category (str): Equipment category.
        mer_type (str): Marginal emissions type.

    Returns:
        tuple: Emission and damage column names.
    """
    emis_col = f'{scenario_prefix}{year_label}_{category}_tons_co2e_{mer_type}'
    damage_col = f'{scenario_prefix}{year_label}_{category}_damages_climate_{mer_type}'
    return emis_col, damage_col


def calculate_electricity_emissions(df, category, hdd_factor, td_losses_multiplier,
                                    emis_electricity_lookup, cambium_scenario, year_label, mer_type):
    """
    Calculate electricity emissions for a category.

    Returns:
        Series: Calculated emissions.
    """
    # Precompute emission factors for each gea_region
    emis_factors = df['gea_region'].map(
        lambda gea_region: emis_electricity_lookup.get(
            (cambium_scenario, gea_region), {}
        ).get(year_label, {}).get(f'{mer_type}_co2e', 0)
    )

    return (
        df[f'base_electricity_{category}_consumption'] *
        hdd_factor * td_losses_multiplier *
        emis_factors.fillna(0)
    )


def calculate_fossil_fuel_emissions(df, category, hdd_factor, emission_factors=emis_factors_fossil_fuels):
    """
    Calculate fossil fuel emissions for a category using the provided emission factors lookup dictionary.

    Parameters:
        df (DataFrame): Input DataFrame containing fuel consumption data.
        category (str): Equipment category (e.g., 'heating', 'cooking').
        hdd_factor (float or Series): Heating Degree Day adjustment factor.
        emission_factors (dict): Lookup dictionary for emission factors.

    Returns:
        Series: Combined fossil fuel emissions (in tons CO2e).
    """
    # Access emission factors from the lookup dictionary
    natural_gas_factor = emission_factors['naturalGas']['co2e']
    propane_factor = emission_factors['propane']['co2e']
    fuel_oil_factor = emission_factors['fuelOil']['co2e']

    # Calculate emissions for each fuel type
    emis_naturalGas = (
        df[f'base_naturalGas_{category}_consumption'] * hdd_factor * natural_gas_factor
    )
    emis_propane = (
        df[f'base_propane_{category}_consumption'] * hdd_factor * propane_factor
    )

    # Fuel oil is not used for cooking or clothes drying
    if category not in ['cooking', 'clothesDrying']:
        emis_fuelOil = (
            df[f'base_fuelOil_{category}_consumption'] * hdd_factor * fuel_oil_factor
        )
    else:
        emis_fuelOil = pd.Series(0, index=df.index)

    # Sum the emissions from all applicable fuel types
    total_emissions = (
        emis_naturalGas.fillna(0) +
        emis_propane.fillna(0) +
        emis_fuelOil.fillna(0)
    )

    return total_emissions


def store_lifetime_and_avoided_emissions(df, df_summary, scenario_prefix, category, mer_type,
                                         lifetime_emissions, lifetime_damages, new_columns_data):
    """
    Store lifetime and avoided emissions in the summary DataFrame and new columns.
    """
    lifetime_emissions_col = f'{scenario_prefix}{category}_lifetime_tons_co2e_{mer_type}'
    lifetime_damages_col = f'{scenario_prefix}{category}_lifetime_damages_climate_{mer_type}'

    new_columns_data[lifetime_emissions_col] = np.round(lifetime_emissions, 2)
    new_columns_data[lifetime_damages_col] = np.round(lifetime_damages, 2)

    df_summary[lifetime_emissions_col] = np.round(lifetime_emissions, 2)
    df_summary[lifetime_damages_col] = np.round(lifetime_damages, 2)


In [None]:
import unittest
import pandas as pd
import numpy as np

# Mock data and constants for testing
EQUIPMENT_SPECS = {'heating': 15, 'waterHeating': 12, 'clothesDrying': 13, 'cooking': 15}

emis_preIRA_co2e_cambium21_lookup = {
    ('MidCase', 'Region1'): {
        2024: {'lrmer_co2e': 0.5, 'srmer_co2e': 0.6},
        2025: {'lrmer_co2e': 0.4, 'srmer_co2e': 0.5},
    },
    ('MidCase', 'Region2'): {
        2024: {'lrmer_co2e': 0.3, 'srmer_co2e': 0.35},
        2025: {'lrmer_co2e': 0.25, 'srmer_co2e': 0.3},
    },
}

emis_IRA_co2e_cambium22_lookup = emis_preIRA_co2e_cambium21_lookup  # Using the same for simplicity

EPA_SCC_USD2023_PER_TON = 51  # Example value

emis_factors_fossil_fuels = {
    'naturalGas': {'co2e': 0.2},
    'propane': {'co2e': 0.25},
    'fuelOil': {'co2e': 0.3},
}

hdd_factor_lookup = {
    'Division1': {2024: 1.1, 2025: 1.0},
    'Division2': {2024: 0.9, 2025: 0.95},
    'National': {2024: 1.0, 2025: 1.0},
}

class TestMarginalDamagesCalculations(unittest.TestCase):
    def setUp(self):
        # Create a sample DataFrame for testing
        data = {
            'census_division': ['Division1', 'Division2'],
            'gea_region': ['Region1', 'Region2'],
        }
        
        # Add consumption data for all equipment categories
        for category in EQUIPMENT_SPECS.keys():
            data[f'base_electricity_{category}_consumption'] = [1000, 2000]
            data[f'base_naturalGas_{category}_consumption'] = [500, 600]
            data[f'base_propane_{category}_consumption'] = [300, 400]
            # Fuel oil is not used for 'cooking' and 'clothesDrying'
            if category not in ['cooking', 'clothesDrying']:
                data[f'base_fuelOil_{category}_consumption'] = [200, 300]
        
        self.df = pd.DataFrame(data)
        # Sample df_summary DataFrame
        self.df_summary = pd.DataFrame(index=self.df.index)

    # (Your test methods remain the same.)

    def test_define_scenario_settings(self):
        # Test with menu_mp = 0
        scenario_prefix, cambium_scenario, emis_electricity_lookup = define_scenario_settings(0, 'AEO2023 Reference Case')
        self.assertEqual(scenario_prefix, 'baseline_')
        self.assertEqual(cambium_scenario, 'MidCase')
        self.assertEqual(emis_electricity_lookup, emis_preIRA_co2e_cambium21_lookup)

        # Test with 'No Inflation Reduction Act' policy
        scenario_prefix, cambium_scenario, emis_electricity_lookup = define_scenario_settings(1, 'No Inflation Reduction Act')
        self.assertEqual(scenario_prefix, 'preIRA_mp1_')
        self.assertEqual(emis_electricity_lookup, emis_preIRA_co2e_cambium21_lookup)

        # Test with 'AEO2023 Reference Case' policy
        scenario_prefix, cambium_scenario, emis_electricity_lookup = define_scenario_settings(2, 'AEO2023 Reference Case')
        self.assertEqual(scenario_prefix, 'iraRef_mp2_')
        self.assertEqual(emis_electricity_lookup, emis_IRA_co2e_cambium22_lookup)

        # Test with an invalid policy scenario
        with self.assertRaises(ValueError):
            define_scenario_settings(1, 'Invalid Policy')

    def test_precompute_hdd_factors(self):
        hdd_factors = precompute_hdd_factors(self.df)
        # Check that the HDD factors are correctly computed
        self.assertIn(2024, hdd_factors)
        self.assertIn(2025, hdd_factors)

        np.testing.assert_array_almost_equal(
            hdd_factors[2024],
            pd.Series([1.1, 0.9], index=self.df.index)
        )
        np.testing.assert_array_almost_equal(
            hdd_factors[2025],
            pd.Series([1.0, 0.95], index=self.df.index)
        )

    def test_calculate_electricity_emissions(self):
        hdd_factor = pd.Series([1.1, 0.9], index=self.df.index)
        td_losses_multiplier = 1 / (1 - 0.06)
        year_label = 2024
        mer_type = 'lrmer'
        cambium_scenario = 'MidCase'

        emis_electricity = calculate_electricity_emissions(
            self.df, 'heating', hdd_factor, td_losses_multiplier,
            emis_preIRA_co2e_cambium21_lookup, cambium_scenario, year_label, mer_type
        )

        expected_emissions = pd.Series([
            1000 * 1.1 * td_losses_multiplier * 0.5,
            2000 * 0.9 * td_losses_multiplier * 0.3
        ], index=self.df.index)

        np.testing.assert_array_almost_equal(emis_electricity, expected_emissions)

    def test_calculate_fossil_fuel_emissions(self):
        hdd_factor = pd.Series([1.1, 0.9], index=self.df.index)

        emissions = calculate_fossil_fuel_emissions(
            self.df, 'heating', hdd_factor, emission_factors=emis_factors_fossil_fuels
        )

        expected_emissions = pd.Series([
            500 * 1.1 * 0.2 + 300 * 1.1 * 0.25 + 200 * 1.1 * 0.3,
            600 * 0.9 * 0.2 + 400 * 0.9 * 0.25 + 300 * 0.9 * 0.3
        ], index=self.df.index)

        np.testing.assert_array_almost_equal(emissions, expected_emissions)

    def test_process_emissions_for_category(self):
        new_columns_data = {}
        category = 'heating'
        lifetime = EQUIPMENT_SPECS[category]
        mer_type = 'lrmer'
        scenario_prefix = 'test_'
        cambium_scenario = 'MidCase'
        td_losses_multiplier = TD_LOSSES_MULTIPLIER

        hdd_factors = precompute_hdd_factors(self.df)

        process_emissions_for_category(
            self.df, self.df_summary, 1, td_losses_multiplier,
            emis_preIRA_co2e_cambium21_lookup, cambium_scenario,
            scenario_prefix, hdd_factors, new_columns_data,
            category, lifetime, mer_type
        )

        # Check that new_columns_data has expected keys
        expected_columns = []
        for year in range(1, lifetime + 1):
            year_label = year + 2023
            emis_col, damage_col = generate_column_names(scenario_prefix, year_label, category, mer_type)
            expected_columns.extend([emis_col, damage_col])

        # Add lifetime columns
        lifetime_emissions_col = f'{scenario_prefix}{category}_lifetime_tons_co2e_{mer_type}'
        lifetime_damages_col = f'{scenario_prefix}{category}_lifetime_damages_climate_{mer_type}'
        expected_columns.extend([lifetime_emissions_col, lifetime_damages_col])

        self.assertEqual(set(new_columns_data.keys()), set(expected_columns))

    def test_calculate_damages_grid_scenario(self):
        scenario_prefix = 'test_'
        cambium_scenario = 'MidCase'
        td_losses_multiplier = TD_LOSSES_MULTIPLIER
        hdd_factors = precompute_hdd_factors(self.df)

        df_new_columns = calculate_damages_grid_scenario(
            self.df, self.df_summary, 1, td_losses_multiplier,
            emis_preIRA_co2e_cambium21_lookup, cambium_scenario,
            scenario_prefix, hdd_factors
        )

        # Check that df_new_columns has expected columns
        expected_columns = []
        for category, lifetime in EQUIPMENT_SPECS.items():
            for mer_type in ['lrmer', 'srmer']:
                for year in range(1, lifetime + 1):
                    year_label = year + 2023
                    emis_col, damage_col = generate_column_names(scenario_prefix, year_label, category, mer_type)
                    expected_columns.extend([emis_col, damage_col])

                # Lifetime columns
                lifetime_emissions_col = f'{scenario_prefix}{category}_lifetime_tons_co2e_{mer_type}'
                lifetime_damages_col = f'{scenario_prefix}{category}_lifetime_damages_climate_{mer_type}'
                expected_columns.extend([lifetime_emissions_col, lifetime_damages_col])

        self.assertEqual(set(df_new_columns.columns), set(expected_columns))

    def test_calculate_marginal_damages(self):
        # Run the full calculation and check outputs
        menu_mp = 1
        policy_scenario = 'No Inflation Reduction Act'

        df_result = calculate_marginal_damages(self.df, menu_mp, policy_scenario, self.df_summary)

        # Expected columns
        expected_columns = set(self.df.columns)
        scenario_prefix = 'preIRA_mp1_'
        for category, lifetime in EQUIPMENT_SPECS.items():
            for mer_type in ['lrmer', 'srmer']:
                for year in range(1, lifetime + 1):
                    year_label = year + 2023
                    emis_col, damage_col = generate_column_names(scenario_prefix, year_label, category, mer_type)
                    expected_columns.update([emis_col, damage_col])

                # Lifetime columns
                lifetime_emissions_col = f'{scenario_prefix}{category}_lifetime_tons_co2e_{mer_type}'
                lifetime_damages_col = f'{scenario_prefix}{category}_lifetime_damages_climate_{mer_type}'
                expected_columns.update([lifetime_emissions_col, lifetime_damages_col])

        self.assertEqual(set(df_result.columns), expected_columns)

        # Check that df_summary has the lifetime columns
        expected_summary_columns = []
        for category, lifetime in EQUIPMENT_SPECS.items():
            for mer_type in ['lrmer', 'srmer']:
                lifetime_emissions_col = f'{scenario_prefix}{category}_lifetime_tons_co2e_{mer_type}'
                lifetime_damages_col = f'{scenario_prefix}{category}_lifetime_damages_climate_{mer_type}'
                expected_summary_columns.extend([lifetime_emissions_col, lifetime_damages_col])

        self.assertEqual(set(self.df_summary.columns), set(expected_summary_columns))

# if __name__ == '__main__':
#     unittest.main(argv=['first-arg-is-ignored'], exit=False)
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)



test_calculate_damages_grid_scenario (__main__.TestMarginalDamagesCalculations.test_calculate_damages_grid_scenario) ... ok
test_calculate_electricity_emissions (__main__.TestMarginalDamagesCalculations.test_calculate_electricity_emissions) ... ok
test_calculate_fossil_fuel_emissions (__main__.TestMarginalDamagesCalculations.test_calculate_fossil_fuel_emissions) ... ok
test_calculate_marginal_damages (__main__.TestMarginalDamagesCalculations.test_calculate_marginal_damages) ... ok
test_define_scenario_settings (__main__.TestMarginalDamagesCalculations.test_define_scenario_settings) ... ok
test_precompute_hdd_factors (__main__.TestMarginalDamagesCalculations.test_precompute_hdd_factors) ... ok
test_process_emissions_for_category (__main__.TestMarginalDamagesCalculations.test_process_emissions_for_category) ... ok

----------------------------------------------------------------------
Ran 7 tests in 0.231s

OK
