In [1]:
from pathways import Pathways
p = Pathways(datapackage="/Users/romain/GitHub/sweet_sure-2050-switzerland/dev/image-SSP2-stem-SPS1.zip", debug=True)

Invalid datapackage: Descriptor validation error: {'path': 'mapping/mapping.yaml', 'profile': 'data-resource', 'name': 'mapping', 'format': 'yaml', 'mediatype': 'text/yaml', 'encoding': 'utf-8'} is not valid under any of the given schemas at "resources/9" in descriptor and at "properties/resources/items/oneOf" in profile
Invalid datapackage: Descriptor validation error: 'data-resource' is not one of ['tabular-data-resource'] at "resources/9/profile" in descriptor and at "properties/resources/items/properties/profile/enum" in profile


In [2]:
import numpy as np
p.calculate(
    methods=[
        #'IPCC 2021 - climate change - GWP 100a, incl. H',
        'IPCC 2021 - climate change - GWP 100a, incl. H and bio CO2',
    ], #+ [m for m in p.lcia_methods if "relics" in m.lower()][-3:],
    regions=["CH",],
    scenarios=[
        #"SSP2-Base",
        "SSP2-RCP26",
    ],
    variables=[
        "hydro_run_of_river",
        "hydro (reservoir)",
        "hydro (pumped storage)",
        "nuclear (boiling water)",
        "nuclear (pressure water)",
        "oil (peak devices)",
        "oil (DH CHP)",
        "oil (industry CHP)",
        "gas (CCGT)",
        "gas (OCGT)",
        "gas (CCGT CCS)",
        "gas (CHP DH)",
        "gas (fuel cell DH)",
        "gas (CHP industry)",
        "gas (CHP CCS industry)",
        "gas (CHP fuel cell reformer)",
        "gas (CHP single family)",
        "gas (single family CHP fuel cell)",
        "gas (multi family CHP)",
        "gas (multi family CHP fuel cell)",
        "biogas (services CHP)",
        "gas (services CHP)",
        "gas (services CHP fuel cell)",
        "waste incineration, fossil",
        "waste incineration, fossil (CHP)",
        "waste incineration, fossil (industrial)",
        "waste incineration, fossil (CCS)",
        "waste incineration, fossil (CHP, CCS)",
        "waste incineration, fossil (industrial, CCS)",
        "waste incineration, non-fossil",
        "waste incineration, non-fossil (CHP)",
        "waste incineration, non-fossil (industrial)",
        "waste incineration, non-fossil (CCS)",
        "waste incineration, non-fossil (CHP, CCS)",
        "waste incineration, non-fossil (industrial, CCS)",
        "pv (large scale)",
        "pv (industry)",
        "pv (services)",
        "pv residential (multi family)",
        "pv residential (single family)",
        "wind",
        "geothermal",
        "hydrogen fuel cell (DH CHP)",
        "hydrogen fuel cell (industrial CHP)",
        "hydrogen fuel cell (services CHP)",
        "biomass (DH)",
        "biomass (CHP)",
        "biomass (CCS, BECCS)",
        "biomass (CCS, industrial)",
        "battery (large)",
        "battery (medium)",
        "battery (industry)",
        "battery (services)",
        "battery (single family)",
        "battery (multi family)",
        "CAES",
        "import",
    ],
    years=[
        2020,
        #2022,
        #2030,
        #2040,
        2050
    ],
    multiprocessing=False,
    use_distributions=3,
    subshares=True
)

Calculating LCA results for image...
--- Calculating LCA results for SSP2-RCP26...
------ Calculating LCA results for 2020...


0% [#] 100% | ETA: 00:00:01
Total time elapsed: 00:00:00


------ Calculating LCA results for 2050...


0% [#] 100% | ETA: 00:00:01
Total time elapsed: 00:00:00
0% [##] 100% | ETA: 00:00:00
Total time elapsed: 00:00:03


In [3]:
p.lca_results = p.lca_results.sum(dim="act_category").interp(
    year=range(2020, 2051)
)

In [4]:
df = p.lca_results.to_dataframe("value")
df = df[df["value"]!=0.0]
df = df[~df["value"].isnull()]
df=df.reset_index()

In [5]:
from pivottablejs import pivot_ui
from IPython.display import HTML
pivot_ui(df, outfile_path='test.html')

In [9]:
import yaml
import numpy as np
SUBSHARES = "../pathways/data/technologies_shares.yaml"


def load_subshares() -> dict:
    """
    Load a YAML file and return its content as a Python dictionary.
    :return: A dictionary with the categories, technologies and market shares data.
    """
    with open(SUBSHARES) as stream:
        data = yaml.safe_load(stream)

    return data


def adjust_subshares(data) -> dict:
    """
    Adjust the subshares data to ensure that the sum of the 2020 values is equal to 1,
    after neglecting the technologies with no name.
    :param data: Dictionary with the categories, technologies and market shares data.
    :return: Adjusted dictionary.
    """
    for category, technologies in data.items():

        # Initialize totals
        total_2020_value = 0
        total_adjustable_value = 0

        # First pass to calculate totals
        for subcategory, tech_list in technologies.items():
            for tech in tech_list:
                if 2020 in tech:
                    value = tech[2020].get("value", 0)
                    total_2020_value += value
                    if tech.get("name") is not None:
                        total_adjustable_value += value

        # Skip adjustment if no values or all values are named
        if total_2020_value == 0 or total_adjustable_value == 0:
            continue

        adjustment_factor = total_2020_value / total_adjustable_value

        # Second pass to adjust values
        adjusted_total = 0
        for subcategory, tech_list in technologies.items():
            for tech in tech_list:
                if 2020 in tech and tech.get("name") is not None:
                    tech[2020]["value"] = tech[2020]["value"] * adjustment_factor
                    adjusted_total += tech[2020]["value"]

        # Check if the adjusted total is close to 1.00, allowing a small margin for floating-point arithmetic
        if not np.isclose(adjusted_total, 1.00, rtol=1e-9):
            print(
                f"Warning: Total of adjusted '2020' values in category '{category}' does not add up to 1.00 (Total: {adjusted_total})"
            )

    return data

In [11]:
load_subshares()

{'Wind': {'onshore': [{'name': 'electricity production, wind, <1MW turbine, onshore',
    'reference product': 'electricity, high voltage',
    'unit': 'kilowatt hour',
    2020: {'value': 0.96},
    2050: {'min': 0.83, 'max': 0.96, 'distribution': 'uniform'},
    'create copy': False}],
  'offshore': [{'name': 'electricity production, wind, 1-3MW turbine, offshore',
    'reference product': 'electricity, high voltage',
    'unit': 'kilowatt hour',
    2020: {'value': 0.04},
    2050: {'min': 0.04, 'max': 0.17, 'distribution': 'uniform'},
    'create copy': False}]},
 'PV': {'c-Si': [{'name': 'electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminated, integrated',
    'reference product': 'electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminated, integrated',
    'unit': 'kilowatt hour',
    2020: {'value': 0.944},
    2050: {'min': 0.625, 'max': 0.944, 'distribution': 'uniform'},
    'create copy': False}],
  'CdTe': [{'name': 'electric

In [13]:
adjust_subshares(load_subshares())

{'Wind': {'onshore': [{'name': 'electricity production, wind, <1MW turbine, onshore',
    'reference product': 'electricity, high voltage',
    'unit': 'kilowatt hour',
    2020: {'value': 0.96},
    2050: {'min': 0.83, 'max': 0.96, 'distribution': 'uniform'},
    'create copy': False}],
  'offshore': [{'name': 'electricity production, wind, 1-3MW turbine, offshore',
    'reference product': 'electricity, high voltage',
    'unit': 'kilowatt hour',
    2020: {'value': 0.04},
    2050: {'min': 0.04, 'max': 0.17, 'distribution': 'uniform'},
    'create copy': False}]},
 'PV': {'c-Si': [{'name': 'electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminated, integrated',
    'reference product': 'electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminated, integrated',
    'unit': 'kilowatt hour',
    2020: {'value': 0.944},
    2050: {'min': 0.625, 'max': 0.944, 'distribution': 'uniform'},
    'create copy': False}],
  'CdTe': [{'name': 'electric

In [16]:
def adjust_subshares2(data):
    """
    Adjusts the values in 'data' for each year ensuring the sum equals 1 for each category, excluding technologies without a name.
    It dynamically identifies years (integer keys) that contain a 'value' subkey and adjusts them.

    :param data: A dictionary with categories as keys, each category is a dictionary of subcategories containing a list of technology dictionaries.
    :return: A dictionary with the adjusted values.
    """
    for category, technologies in data.items():
        years = identify_years(technologies)
        for year in years:
            total_value, total_adjustable_value = compute_totals(technologies, year)
            if total_value == 0 or total_adjustable_value == 0:
                continue
            adjust_values(technologies, total_value, total_adjustable_value, category, year)

    return data

def identify_years(technologies):
    years = set()
    for subcategory, tech_list in technologies.items():
        for tech in tech_list:
            year_keys = [key for key in tech.keys() if isinstance(key, int) and 'value' in tech[key]]
            years.update(year_keys)
    return years

def compute_totals(technologies, year):
    total_value = 0
    total_adjustable_value = 0
    for subcategory, tech_list in technologies.items():
        for tech in tech_list:
            if year in tech:
                value = tech[year].get('value', 0)
                total_value += value
                if tech.get("name") is not None:
                    total_adjustable_value += value
    return total_value, total_adjustable_value

def adjust_values(technologies, total_value, total_adjustable_value, category, year):
    adjustment_factor = total_value / total_adjustable_value
    adjusted_total = 0
    for subcategory, tech_list in technologies.items():
        for tech in tech_list:
            if year in tech and tech.get("name") is not None:
                old_value = tech[year]["value"]
                tech[year]["value"] = old_value * adjustment_factor
                adjusted_total += tech[year]["value"]

    if not np.isclose(adjusted_total, 1.00, rtol=1e-9):
        print(f"Warning: Total of adjusted '{year}' values in category '{category}' does not add up to 1.00 (Total: {adjusted_total})")



In [18]:
adjust_subshares(load_subshares()) == adjust_subshares2(load_subshares())

True

In [34]:
def correlated_samples(ranges, defaults, iterations=1000):
    """
    Adjusts randomly selected shares for parameters to sum to 1
    while respecting their specified ranges.

    :param ranges: Dict with parameter names as keys and (min, max) tuples as values.
    :param defaults: Dict with default values for each parameter.
    :param iterations: Number of iterations to attempt to find a valid distribution.
    :return: A dict with the adjusted shares for each parameter.
    """
    for _ in range(iterations):
        shares = {
            param: np.random.uniform(low, high)
            for param, (low, high) in ranges.items()
        }
        total_share = sum(shares.values())
        shares = {param: share / total_share for param, share in shares.items()}
        if all(
                ranges[param][0] <= share <= ranges[param][1]
                for param, share in shares.items()
        ):
            return shares

    print(f"Failed to find a valid distribution after {iterations} iterations")
    return defaults

In [40]:
ranges = {
    "A": (0, 0.9),
    "B": (0, 0.5),
}
defaults = {
    "A": 0.8,
    "B": 0.2
}
correlated_samples(ranges, defaults)
print(sum(correlated_samples(ranges, defaults).values()))

1.0


In [65]:
import numpy as np

def correlated_samples2(ranges, defaults, iterations=1000):
    """
    Generates random samples for parameters such that they sum to 1 and stay within specified ranges.
    It tries to generate samples uniformly within the range, then scales them to sum to 1.

    :param ranges: Dict with parameter names as keys and (min, max) tuples as values.
    :param defaults: Dict with default values for each parameter.
    :param iterations: Number of iterations to attempt to find a valid distribution.
    :return: A dict with the adjusted shares for each parameter.
    """
    param_keys = list(ranges.keys())
    num_params = len(param_keys)

    for _ in range(iterations):
        raw_samples = np.random.rand(num_params)
        min_vals = np.array([ranges[param][0] for param in param_keys])
        max_vals = np.array([ranges[param][1] for param in param_keys])

        # Scale samples to their respective ranges
        scaled_samples = min_vals + raw_samples * (max_vals - min_vals)
        scaled_sum = sum(scaled_samples)
        
        # Normalize to make sure they sum to 1
        normalized_samples = scaled_samples / scaled_sum
        
        if all(ranges[param][0] <= sample <= ranges[param][1] for param, sample in zip(param_keys, normalized_samples)):
            return {param: sample for param, sample in zip(param_keys, normalized_samples)}

    print(f"Failed to find a valid distribution after {iterations} iterations")
    return defaults


In [66]:
ranges = {
    "A": (0, 0.9),
    "B": (0.3, 0.5),
}
defaults = {
    "A": 0.8,
    "B": 0.2
}



{'A': 0.6353254283714487, 'B': 0.3646745716285514}

In [69]:
%%timeit
correlated_samples(ranges, defaults)

7.63 µs ± 22 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


In [70]:
%%timeit
correlated_samples2(ranges, defaults)

7.61 µs ± 38.1 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


In [3]:
import bw_processing as bwp
import numpy as np
t_data = np.array([
    1,   # production of natural gas
    1,   # production of carbon fibre
    1,   # production of bike
    237, # input of natural gas
    2.5, # input of carbon fibre
])
t_indices = np.array([
    (101, 101), # production of natural gas
    (102, 102), # production of carbon fibre
    (103, 103), # production of bike
    (101, 102), # input of natural gas
    (102, 103), # input of carbon fibre
    ], 
    dtype=bwp.INDICES_DTYPE
)
t_flip = np.array([False, False, False, True, True])

b_data = np.array([26.6])
b_indices = np.array([
    (201, 102), # emission of CO2
    ], 
    dtype=bwp.INDICES_DTYPE
)

c_data = np.array([1])
c_indices = np.array([
    (201, 201), # CF of CO2
    ], 
    dtype=bwp.INDICES_DTYPE
)

dp_static = bwp.create_datapackage()

dp_static.add_persistent_vector(
    matrix='technosphere_matrix',
    indices_array=t_indices,
    data_array=t_data,
    flip_array=t_flip,
)
dp_static.add_persistent_vector(
    matrix='biosphere_matrix',
    indices_array=b_indices,
    data_array=b_data,
)
dp_static.add_persistent_vector(
    matrix='characterization_matrix',
    indices_array=c_indices,
    data_array=c_data,
)

In [4]:
import bw2calc as bc
lca = bc.LCA(
    demand={101: 1},
    data_objs=[dp_static],
)

In [5]:
lca.packages

[<bw_processing.datapackage.Datapackage at 0x1773dd310>]

In [87]:
lca.lci()

In [88]:
lca.technosphere_matrix

<3x3 sparse matrix of type '<class 'numpy.float64'>'
	with 5 stored elements in Compressed Sparse Row format>

In [80]:
tech_matrix = [resource["name"] for resource in dp_static.resources if "technosphere_matrix" in resource["matrix"]][0]

In [96]:
for key, val in lca.dicts.biosphere.items():
    print(key, val)

201 0


In [108]:
lca.technosphere_matrix.nonzero()[0]

array([0, 0, 1, 1, 2], dtype=int32)

In [107]:
lca.technosphere_matrix[0, 1]

-237.0