In [29]:
import os

print(os.getcwd())
os.chdir('../..')

from typing import Any
from scipy.interpolate import interp1d
import numpy as np
import copy
from config.default_parameters import EconomicSubModules, EmissionsAvoidedDefaults

c:\


In [30]:
class MatterUse:
    """
    This class describes the matter-use dynamics in the JUSTICE model.
    """

    def __init__(
        self,
        input_dataset,
        time_horizon,
        climate_ensembles,
        economy,
    ):

        # Load the defaults #TODO Angela - you can implement this
        matter_defaults = EconomicSubModules().get_defaults("MATTER")
        # Load the emissions avoided defaults
        emissions_defaults = EmissionsAvoidedDefaults().get_defaults()
        # Emissions defaults
        self.emissions_defaults = emissions_defaults

        # Load the instantiated economy model and set it as an attribute
        self.economy = economy

        # Parameters
        self.physical_use_ratio = matter_defaults["physical_use_ratio"]
        self.discard_rate = matter_defaults["discard_rate"]
        self.conversion_rate_material_reserves = matter_defaults[
            "conversion_rate_material_reserves"
        ]
        self.recycling_rate = matter_defaults["recycling_rate"]

        # Saving the climate ensembles ?
        self.NUM_OF_ENSEMBLES = climate_ensembles

        # Saving the scenario
        self.scenario = self.economy.scenario
        # self.scenario = get_economic_scenario(scenario)

        self.region_list = input_dataset.REGION_LIST
        self.material_intensity_array = copy.deepcopy(
            input_dataset.MATERIAL_INTENSITY_ARRAY
        )
        self.income_level = copy.deepcopy(  #TODO sis, you have to add that to data input
            input_dataset.INCOME_LEVEL_ARRAY
        )


        self.timestep = time_horizon.timestep
        self.data_timestep = time_horizon.data_timestep
        self.data_time_horizon = time_horizon.data_time_horizon
        self.model_time_horizon = time_horizon.model_time_horizon

        # Selecting only the required scenario
        self.material_intensity_array = self.material_intensity_array[
            :, :, np.newaxis
        ]

        if self.timestep != self.data_timestep:
            # Interpolate Material Intensity Dictionary
            self._interpolate_material_intensity()

        """
        Initialize matter-use variables arrays
        """

        # TODO: put the units of each variable in default_parameters.py
        # Intializing the material intensity array Unit: kg/USD per year
        self.material_intensity = self.material_intensity_array

        # Intializing the material intensity array Unit: Gt per year
        self.material_consumption = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )

       # Intializing the in-use stock array Unit: Gt per year
        self.in_use_stock = np.repeat(
            input_dataset.IN_USE_STOCK_INIT_ARRAY[:, :, np.newaxis], 
            self.NUM_OF_ENSEMBLES, axis=2
        )

        # Intializing the discarded material array Unit: Gt per year
        self.discarded_material = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )

        # Intializing the recycled material array Unit: Gt per year
        self.recycled_material = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )

        # Intializing the waste array Unit: Gt per year
        self.waste = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )

        # Intializing the extracted matter array Unit: Gt per year
        self.extracted_matter = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )

         # Intializing the material reserves array Unit: Gt per year
        self.material_reserves = np.repeat(
            input_dataset.MATERIAL_RESERVES_INIT_ARRAY[:, :, np.newaxis],
              self.NUM_OF_ENSEMBLES, axis=2
        )

        # Intializing the converted material reserves array Unit: Gt per year
        self.converted_material_reserves = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )

        # Intializing the material resources array Unit: Gt per year
        self.material_resources = np.repeat(
            input_dataset.MATERIAL_RESOURCES_INIT_ARRAY[:, :, np.newaxis],
              self.NUM_OF_ENSEMBLES, axis=2
        )

        # Intializing the depletion ratio
        self.depletion_ratio = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )
        # Initializing emissions avoided array
        self.emmissions_avoided = np.zeros(
            (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )

    #  Angela - if you are taking timestep as an argument, change the name to stepwise_run. You can create a run method that will just call the stepwise_run for the entire time horizon
    # TODO Palok - please review the new methods, is this what you meant here?
    def stepwise_run(self, timestep, output, recycling_rate):
        """
        Run the matter-use calculations for a given timestep.
        """
        if len(recycling_rate.shape) == 1:
            recycling_rate = recycling_rate.reshape(-1, 1)

        material_consumption = (
            self.material_intensity[:, timestep, :]
            * output[:, timestep, :]
            * 1000  # Output in trillions USD
        ) / 1_000_000_000  # Convert to Gt

        in_use_stock = self.get_in_use_stock(material_consumption, timestep)
        discarded_material = self.get_discarded_material(in_use_stock)
        recycled_material = self.get_recycled_material(discarded_material, recycling_rate
        )
        waste = self.get_waste(discarded_material, recycled_material)
        extracted_matter = self.get_extracted_matter(
            material_consumption, recycled_material
        )
        converted_material_reserves = self.get_converted_material_reserves(timestep)
        material_reserves = self.get_material_reserves(
            extracted_matter, converted_material_reserves, timestep
        )
        material_resources = self.get_material_resources(
            converted_material_reserves, timestep
        )
        depletion_ratio = self.get_depletion_ratio(
            extracted_matter, material_resources, timestep
        )

        # Emissions avoided by the amount of recycled material
        emissions_avoided = self.get_emissions_avoided(timestep, recycled_material)
        # Calculate recycling costs
        recycling_costs = self.recycling_cost(recycled_material)

        return depletion_ratio[:, timestep, :], emissions_avoided[:, timestep, :], recycling_costs[:, timestep, :]

    def run(self, output, recycling_rate):
        """
        Run the matter-use calculations for the entire time horizon.
        """
        depletion_ratios = np.zeros(
        (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )
        emissions_avoided = np.zeros(
        (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )
        recycling_costs = np.zeros(
        (len(self.region_list), len(self.model_time_horizon), self.NUM_OF_ENSEMBLES)
        )
        for timestep in range(len(self.model_time_horizon)):
            depletion_ratio, emissions_avoided_timestep, recycling_costs_timestep = self.stepwise_run(timestep, output, recycling_rate)
            depletion_ratios[:, timestep, :] = depletion_ratio
            emissions_avoided[:, timestep, :] = emissions_avoided_timestep
            recycling_costs[:, timestep, :] = recycling_costs_timestep
        return depletion_ratios, emissions_avoided, recycling_costs
    
    ############################################################################################################

    # Matter-use variable calculations functions

    ############################################################################################################

    # NOTE: if the following functions are only specific to this class, and not used anywhere else, you can use the decorator @classmethod to make them private
    # Palok I have never use decorators, so do you think are necessary? also I changed the methods and don't know if I should use timestep here ?
    def get_in_use_stock(self, material_consumption, timestep):
        if timestep == 0:
            return self.in_use_stock[:, timestep, :]
        else:
            return (
                self.in_use_stock[:, timestep - 1, :]
                + material_consumption * self.physical_use_ratio
                - self.discarded_material[:, timestep, :]
            )

    def get_discarded_material(self, in_use_stock):
        return self.discard_rate * in_use_stock

    def get_recycled_material(self, discarded_material, recycling_rate):
        return recycling_rate * discarded_material  


    def get_waste(self, discarded_material, recycled_material):
        return discarded_material - recycled_material

    def get_extracted_matter(self, material_consumption, recycled_material):
        return material_consumption - recycled_material

    def get_converted_material_reserves(self, timestep):
        return (
            self.conversion_rate_material_reserves
            * self.material_resources[:, timestep - 1, :]
        )

    def get_material_reserves(
        self, extracted_matter, converted_material_reserves, timestep
    ):
        if timestep == 0:
            return self.material_reserves[:, timestep, :]
        else:
            return (
                self.material_reserves[:, timestep - 1, :]
                + converted_material_reserves
                - extracted_matter
            )

    def get_material_resources(self, converted_material_reserves, timestep):
        if timestep == 0:
            return self.material_resources[:, timestep, :]
        else:
            return (
                self.material_resources[:, timestep - 1, :]
                - converted_material_reserves
            )

    def get_depletion_ratio(self, extracted_matter, material_resources, timestep):
        return extracted_matter / material_resources

    ########################################################################################
    # Emissions avoided through recycling of paper and plastics
    ########################################################################################

    def get_emissions_avoided(self, timestep, recycled_material):
        # Calculate proportions of recycled materials in gigatons (Gt)
        recycled_paper = recycled_material * self.emissions_defaults["PROPORTION_PAPER"]
        recycled_plastic = (
            recycled_material * self.emissions_defaults["PROPORTION_PLASTIC"]
        )
        # Calculate GHG emissions avoided
        em_ghg_avoided = self.calculate_ghg_avoided(recycled_paper, recycled_plastic)
        # Calculate energy savings
        e_total_saved = self.calculate_energy_saved(recycled_paper, recycled_plastic)
        # Calculate fuel saved and CO2 emissions avoided
        em_co2_avoided = self.calculate_co2_avoided(e_total_saved)
        # Total emissions avoided
        em_total = (
            (em_ghg_avoided + em_co2_avoided) * 365
        ) / 1e12  # Convert kg to Gt per year
        return em_total  # Gt per year

    def calculate_ghg_avoided(self, recycled_paper, recycled_plastic):
        # Calculate GHG emissions avoided for paper
        em_ghg_vg_paper = self.emissions_defaults["EFACTOR_VG_PAPER"] * recycled_paper
        em_ghg_rec_paper = self.emissions_defaults["EFACTOR_REC_PAPER"] * recycled_paper
        # Calculate GHG emissions avoided for plastic
        em_ghg_vg_plastic = (
            self.emissions_defaults["EFACTOR_VG_PLASTIC"] * recycled_plastic
        )
        em_ghg_rec_plastic = (
            self.emissions_defaults["EFACTOR_REC_PLASTIC"] * recycled_plastic
        )
        # Total GHG emissions avoided
        em_ghg_avoided_paper = em_ghg_vg_paper - em_ghg_rec_paper
        em_ghg_avoided_plastic = em_ghg_vg_plastic - em_ghg_rec_plastic

        return em_ghg_avoided_paper + em_ghg_avoided_plastic

    def calculate_energy_saved(self, recycled_paper, recycled_plastic):
        # Calculate energy savings for paper
        e_vg_paper = recycled_paper * self.emissions_defaults["ENERGY_FACTOR_VG_PAPER"]
        e_rec_paper = (
            recycled_paper * self.emissions_defaults["ENERGY_FACTOR_REC_PAPER"]
        )
        # Calculate energy savings for plastic
        e_vg_plastic = (
            recycled_plastic * self.emissions_defaults["ENERGY_FACTOR_VG_PLASTIC"]
        )
        e_rec_plastic = (
            recycled_plastic * self.emissions_defaults["ENERGY_FACTOR_REC_PLASTIC"]
        )
        # Total energy saved
        e_total_saved_paper = e_vg_paper - e_rec_paper
        e_total_saved_plastic = e_vg_plastic - e_rec_plastic

        return (e_total_saved_paper + e_total_saved_plastic) / self.emissions_defaults[
            "CONVERSION_FACTOR_GJ_TON"
        ]

    def calculate_co2_avoided(self, e_total_saved):
        fuel_saved = (e_total_saved) / (
            self.emissions_defaults["GENERATOR_EFFICIENCY"]
            * self.emissions_defaults["LOWER_HEATING_VALUE"]
        )
        return fuel_saved * self.emissions_defaults["EMISSION_FACTOR_DIESEL"]
    ##################################################################################
    # Recycling cost based on income level calculation
    ##################################################################################
    def linear_decrease_cost(self, min_cost, max_cost, timestep):
        """
        Calculate the cost of recycling with a linear decrease over the time horizon.
        """
        start_year = self.model_time_horizon[0]
        end_year = self.model_time_horizon[-1]
        year = self.model_time_horizon[timestep]
        slope = (min_cost - max_cost) / (end_year - start_year)
        return max_cost + slope * (year - start_year)

    def recycling_cost(self, recycled_material):
        """
        Calculate the recycling cost based on income level and recycled material.
        """
        cost_ranges = { # In trillions USD / Gt
            'Low income': (0 * 1e-3, 25 * 1e-3),
            'Lower middle income': (5 * 1e-3, 30 * 1e-3),
            'Upper middle income': (5 * 1e-3, 50 * 1e-3),
            'High income': (30 * 1e-3, 80 * 1e-3)
        }
        costs = np.zeros_like(recycled_material)
        for i in range(recycled_material.shape[0]):  # Loop over regions
            for j in range(recycled_material.shape[1]):  # Loop over timesteps
                for k in range(recycled_material.shape[2]):  # Loop over scenarios
                    income_level = self.income_level[i]
                    min_cost, max_cost = cost_ranges[income_level]
                    average_cost = self.linear_decrease_cost(min_cost, max_cost, j)
                    costs[i, j, k] = average_cost * recycled_material[i, j, k]   
        return costs

    def _interpolate_material_intensity(self):
        interp_data = np.zeros(
            (
                self.material_intensity_array.shape[0],
                len(self.model_time_horizon),
                # self.gdp_array.shape[2],
            )
        )
        for i in range(self.material_intensity_array.shape[0]):
            f = interp1d(
                self.data_time_horizon,
                self.material_intensity_array[i, :],
                kind="linear",  # , j
            )
            interp_data[i, :] = f(self.model_time_horizon)  # , j

        self.material_intensity_array = interp_data

    def __getattribute__(self, __name: str) -> Any:
        """
        This method returns the value of the attribute of the class.
        """
        return object.__getattribute__(self, __name)


In [28]:
import numpy as np
import matplotlib.pyplot as plt
from typing import Any

# Assuming you have defined MatterUse class with all its methods as provided above

# Create a mock input dataset
class MockInputDataset:
    REGION_LIST = ['Region1', 'Region2', 'Region3', 'Region4', 'Region5']
    MATERIAL_INTENSITY_ARRAY = np.random.rand(5, 1)
    IN_USE_STOCK_INIT_ARRAY = np.random.rand(5, 1)
    MATERIAL_RESERVES_INIT_ARRAY = np.random.rand(5, 1)
    MATERIAL_RESOURCES_INIT_ARRAY = np.random.rand(5, 1)
    INCOME_LEVEL_ARRAY = np.array(['Upper middle income', 'High income', 'High income', 'Lower middle income', 'Low income'])

# Mock time horizon and economy
class MockTimeHorizon:
    timestep = 1
    data_timestep = 1
    data_time_horizon = np.arange(2015, 2025, 1)
    model_time_horizon = np.arange(2015, 2025, 1)

class MockEconomy:
    scenario = 0

# Instantiate MatterUse with mock data
input_dataset = MockInputDataset()
time_horizon = MockTimeHorizon()
climate_ensembles = 3
economy = MockEconomy()

matter_use = MatterUse(input_dataset, time_horizon, climate_ensembles, economy)

# Generate sample output and recycling rate data
output = np.random.rand(5, len(time_horizon.model_time_horizon), climate_ensembles)
recycling_rate = np.random.rand(5, len(time_horizon.model_time_horizon), climate_ensembles)

# Run the model
depletion_ratios, emissions_avoided, recycling_costs = matter_use.run(output, recycling_rate)

# Plotting the results
years = time_horizon.model_time_horizon

def plot_results(data, title, ylabel):
    fig, ax = plt.subplots(figsize=(12, 6))
    for region_idx in range(data.shape[0]):
        for ensemble_idx in range(data.shape[2]):
            ax.plot(years, data[region_idx, :, ensemble_idx], label=f'Region {region_idx+1}, Ensemble {ensemble_idx+1}')
    ax.set_xlabel('Year')
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.legend()
    plt.grid(True)
    plt.show()

# Plot depletion ratios
plot_results(depletion_ratios, 'Depletion Ratios Over Time', 'Depletion Ratio')

# Plot emissions avoided
plot_results(emissions_avoided, 'Emissions Avoided Over Time', 'Emissions Avoided (Gt per year)')

# Plot recycling costs
plot_results(recycling_costs, 'Recycling Costs Over Time', 'Recycling Costs (Trillion $)')



ValueError: operands could not be broadcast together with shapes (5,10,3) (5,3) 