# Environmental Footprint NG supply-demand scenarios

In [None]:
import os
import copy
import pickle
import pprint
import warnings
from collections import namedtuple
from pathlib import Path
import IPython.display as ipd
from string import ascii_uppercase

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import scipy
from scipy.stats import pearsonr, norm
import brightway2 as bw

from ng_lca_toolbox import result2df, plot_proc_contrib, excel2ngScenarioLCA

warnings.filterwarnings("ignore")

LCAResult = namedtuple("LCAResult", ["demand", "method", "score"])

COLORS = ["#66c2a5", "#fc8d62", "#ffd92f", "#8da0cb", "#e78ac3"]
SCENARIOS = ["Base", "REPowerEU", "Aftermath", "Coal", "Clean"]
supply_scenario_y2021, supply_scenario_y2022, supply_scenario_alternative = (
    "y2021_mid",
    "y2022_mid",
    "alternative_mid",
)

pprinter = pprint.PrettyPrinter(indent=4).pprint

BW_PROJECT = "NG_2022crisis_LCA"
DB_NAME = "ecoinvent 3.9.1 cutoff, natural gas scenario EU27"

bw.projects.set_current(BW_PROJECT)  # Accessing the project
ei_ng_db = bw.Database(DB_NAME)

save_fig = False
save_tables = False

PATH2PICKLES = "./data/persisted_files/"
if os.path.exists(PATH2PICKLES) is False:
    os.makedirs(PATH2PICKLES)
PATH2RESULTS = "./data/results/"
if os.path.exists(PATH2RESULTS) is False:
    os.makedirs(PATH2RESULTS)

DO_LCA = False
LCA_PATH = PATH2PICKLES + "scenario_lca.pickle"

DO_PROC_CONTRIB = False
PROC_CONTRIB_PATH = PATH2PICKLES + "scenario_proc_contrib.pickle"

RUN_MC = False
SKIP_MC = []
N_mc = 1000
MC_PATH = PATH2PICKLES + f"monte_carlo_{N_mc}_ite.pickle"
MC_PATH_LCA_ONLY = PATH2PICKLES + "monte_carlo_LCA_only.pickle"

VERBOSE = False

## Setup

In [None]:
# Plotting parameters
from matplotlib.ticker import (
    ScalarFormatter,
    FuncFormatter,
    AutoMinorLocator,
    LogLocator,
)
from matplotlib import ticker
from matplotlib.colors import ListedColormap
from matplotlib.markers import MarkerStyle
import matplotlib.transforms as transforms


fig_length = {
    1: 3.50394,  # 1 column
    1.5: 5.35433,  # 1.5 columns
    2: 7.20472,
}  # 2 columns
fig_height = 9.72441  # maxium height
fontsize_title = 9 - 0
fontsize_label = 8 - 0
fontsize_legend = 8 - 0
fontsize_axs = 8 - 0

spineline_width = 0.3

# sns.set_style('ticks')  # darkgrid, white grid, dark, white and ticks
# plt.style.use('default')

plt.rcParams["axes.spines.right"] = False
plt.rcParams["axes.spines.top"] = False
plt.rcParams["axes.linewidth"] = spineline_width

plt.rcParams["font.family"] = "calibri"  # "times new roman"
plt.rcParams["legend.facecolor"] = "white"
plt.rcParams["legend.edgecolor"] = "black"
plt.rcParams["legend.shadow"] = False
plt.rcParams["legend.facecolor"] = "white"
plt.rcParams["legend.edgecolor"] = "black"
plt.rcParams["font.size"] = fontsize_axs
plt.rcParams["legend.fontsize"] = fontsize_legend
plt.rcParams["axes.labelsize"] = fontsize_axs
plt.rcParams["ytick.labelsize"] = fontsize_axs
plt.rcParams["xtick.labelsize"] = fontsize_axs
plt.rcParams["axes.labelpad"] = 0.0
plt.rcParams["axes.linewidth"] = spineline_width
plt.rcParams["axes.spines.bottom"] = True
plt.rcParams["axes.spines.left"] = True
plt.rcParams["axes.spines.right"] = True
plt.rcParams["axes.spines.top"] = True
plt.rcParams["axes.titlesize"] = fontsize_label
plt.rcParams["xtick.labelsize"] = fontsize_label
plt.rcParams["xtick.major.width"] = spineline_width
plt.rcParams["ytick.major.width"] = spineline_width
plt.rcParams["xtick.minor.width"] = spineline_width
plt.rcParams["ytick.minor.width"] = spineline_width
plt.rcParams["figure.titlesize"] = fontsize_title
plt.rcParams["grid.linewidth"] = spineline_width
plt.rcParams["axes.grid.axis"] = "y"

In [None]:
# Implementa LANCA land use method - erosion potential
metadata = (
    ("LANCA v2.5 - land use", "erosion potential"),
    "kg soil loss",
    "LANCA v2.5 - Characterization Factors for Erosion Potential due to land occupation and transformation (to/from) \
            Available at: https://www.bookshop.fraunhofer.de/buch/LANCA/244600",
    Path("./data/Soil-Erosion-Potential_CFs_LANCA_v2.5.xlsx"),
)

lanca_method = bw.ExcelLCIAImporter(
    filepath=metadata[-1], name=metadata[0], unit=metadata[1], description=metadata[2]
)
lanca_method.apply_strategies()
lanca_method.drop_unlinked()
lanca_method.write_methods(overwrite=True, verbose=True)

In [None]:
# LCA methods
lca_methods = [
    met
    for met in list(bw.methods)
    if "EF v3.1 no LT" in met[0]
    and (
        "acidification no LT" == met[1]
        or "climate change no LT" == met[1]
        or "ecotoxicity: freshwater no LT" == met[1]
        or "energy resources: non-renewable no LT" == met[1]
        or "eutrophication" in met[1]
        or "human toxicity: carcinogenic no LT" in met[1]
        or "human toxicity: non-carcinogenic no LT" in met[1]
        or "ionising radiation: human health no LT" in met[1]
        or "land use no LT" == met[1]
        or "material resources: metals/minerals no LT" in met[1]
        or "ozone depletion no LT" in met[1]
        or "particulate matter formation no LT" in met[1]
        or "photochemical" in met[1]
        or "water use no LT" in met[1]
    )
]
lca_methods[1] = [
    met
    for met in list(bw.methods)
    if "climate change: including SLCFs" in met[1]
    and met[0] == "IPCC 2021"
    and "GWP100" in met[2]
][0]

lca_methods.append([met for met in list(bw.methods) if "LANCA" in met[0]][0])
lca_methods

In [None]:
# Quality level of LCA methods
quality_level = pd.Series(
    {
        "Acidification": "II",
        "Climate change": "I",
        "Ecotoxicity freshwater": "II/III",
        "Energy resources nonrenewable": "III",
        "Eutrophication freshwater": "II",
        "Eutrophication marine": "II",
        "Eutrophication terrestrial": "II",
        "Human toxicity carcinogenic": "II/III",
        "Human toxicity noncarcinogenic": "II/III",
        "Ionising radiation": "II",
        "Land use": "III",
        "Material resources metals minerals": "III",
        "Ozone depletion": "I",
        "Particulate matter formation": "I",
        "Photochemical oxidant formation": "II",
        "Water use": "III",
    },
    name="Quality level",
)
quality_level

## Scenarios' LCIA

In [None]:
# scenarios definition and LCIA calculation
if DO_LCA or not os.path.isfile(LCA_PATH):
    print("base", "--------------------", sep="\n")
    baseScenario = excel2ngScenarioLCA("base", supply_scenario_y2021, ei_ng_db)
    assert baseScenario.no_double_counting is True
    baseScenario.doLCA(lca_methods)
    print("Repower", "--------------------", sep="\n")
    repowerScenario = excel2ngScenarioLCA(
        "repower", supply_scenario_alternative, ei_ng_db
    )
    assert repowerScenario.no_double_counting is True
    repowerScenario.doLCA(lca_methods)
    print(SCENARIOS[2], "--------------------", sep="\n")
    y2022Scenario = excel2ngScenarioLCA("y2022", supply_scenario_y2022, ei_ng_db)
    y2022Scenario.NUCLEAR_POWER = max(y2022Scenario.NUCLEAR_POWER, 0.0)
    y2022Scenario.HYDRO_POWER = max(y2022Scenario.HYDRO_POWER, 0.0)
    assert y2022Scenario.no_double_counting is True
    y2022Scenario.doLCA(lca_methods)
    print(SCENARIOS[3], "--------------------", sep="\n")
    coalScenario = excel2ngScenarioLCA("coal", supply_scenario_alternative, ei_ng_db)
    assert coalScenario.no_double_counting is True
    coalScenario.doLCA(lca_methods)
    print(SCENARIOS[4], "--------------------", sep="\n")
    cleanScenario = excel2ngScenarioLCA("clean", supply_scenario_alternative, ei_ng_db)
    assert cleanScenario.no_double_counting is True
    cleanScenario.doLCA(lca_methods)

    lca_bkp = copy.deepcopy(
        [baseScenario, repowerScenario, y2022Scenario, coalScenario, cleanScenario]
    )
    for sce in lca_bkp:
        sce.lca_results = [
            LCAResult(
                demand=l_.demand,
                method=l_.method,
                score=l_.score,
            )
            for l_ in sce.lca_results
        ]

    with open(LCA_PATH, "wb") as fp:
        pickle.dump(lca_bkp, fp)
else:
    with open(LCA_PATH, "rb") as fp:
        lca_bkp = pickle.load(fp)
    baseScenario = lca_bkp[0]
    repowerScenario = lca_bkp[1]
    y2022Scenario = lca_bkp[2]
    coalScenario = lca_bkp[3]
    cleanScenario = lca_bkp[4]

In [None]:
# generating/printing/saving dataframes
baseScenario = copy.deepcopy(lca_bkp[0])
repowerScenario = copy.deepcopy(lca_bkp[1])
y2022Scenario = copy.deepcopy(lca_bkp[2])
coalScenario = copy.deepcopy(lca_bkp[3])
cleanScenario = copy.deepcopy(lca_bkp[4])

i = [i for i, v in enumerate(baseScenario.categories) if "land use no LT" in v][0]

list_of_NgScenarioLCA = [
    baseScenario,
    repowerScenario,
    y2022Scenario,
    coalScenario,
    cleanScenario,
]

for scenario in list_of_NgScenarioLCA:
    scenario.lca_results[i] = scenario.lca_results[-1]
    scenario.lca_results = scenario.lca_results[:-1]
    scenario.lca_methods.pop(-1)
    scenario.categories.pop(-1)
# baseScenario.categories

df_results = result2df(list_of_NgScenarioLCA, verbose=False)

# dataframe manipulation
pd.options.display.float_format = "{:0.2e}".format
df_results_lca_only = [df.iloc[:, 0] for df in df_results]
df_results_lca_only = pd.concat(df_results_lca_only, axis=1)
df_results_lca_only.sort_index(inplace=True)
col = [col for col in df_results_lca_only.columns]
for ite, colu in enumerate(df_results_lca_only.columns):
    try:
        col[ite] = (
            colu.split(" no LT ")[1]
            .replace(" human health", "")
            .replace(":", "")
            .replace(" ", "_")
            .replace("-", "")
            .replace("/", "_")
            .capitalize()
        )
    except IndexError:
        if "land use" in colu:
            col[ite] = "Land use"
        else:
            col[ite] = "Climate change"

col = [f"{k.replace('_', ' ')} ({v})" for k, v in zip(col, quality_level.values)]

df_results_lca_only.columns = col
df_results_lca_only.index = SCENARIOS

if VERBOSE:
    print("LCA results")
    ipd.display(df_results_lca_only)

In [None]:
lanca_method = [met for met in list(bw.methods) if "LANCA" in met[0]][0]

if DO_LCA or not os.path.exists(PATH2PICKLES + "scenarios_lanca_only.pickle"):
    scenarios_lanca_only = [
        excel2ngScenarioLCA("base", supply_scenario_y2021, ei_ng_db),
        excel2ngScenarioLCA("repower", supply_scenario_alternative, ei_ng_db),
        excel2ngScenarioLCA("y2022", supply_scenario_y2022, ei_ng_db),
        excel2ngScenarioLCA("coal", supply_scenario_alternative, ei_ng_db),
        excel2ngScenarioLCA("clean", supply_scenario_alternative, ei_ng_db),
    ]
    scenarios_lanca_only[2].HYDRO_POWER = max(scenarios_lanca_only[2].HYDRO_POWER, 0.0)
    scenarios_lanca_only[2].NUCLEAR_POWER = max(
        scenarios_lanca_only[2].NUCLEAR_POWER, 0.0
    )

    [sce.doLCA([lanca_method]) for sce in scenarios_lanca_only]

    for sce in scenarios_lanca_only:
        sce.lca_results = [
            LCAResult(
                demand=l_.demand,
                method=l_.method,
                score=l_.score,
            )
            for l_ in sce.lca_results
        ]

    with open(PATH2PICKLES + "scenarios_lanca_only.pickle", "wb") as fp:
        pickle.dump(scenarios_lanca_only, fp)
else:
    with open(PATH2PICKLES + "scenarios_lanca_only.pickle", "rb") as fp:
        scenarios_lanca_only = pickle.load(fp)

ipcc_methods = [" ".join(met) for met in scenarios_lanca_only[0].lca_methods]
dict_of_ipccs = {}

for sce, sce_name in zip(scenarios_lanca_only, SCENARIOS):
    dict_of_ipccs[sce_name] = sce.lca_results[0].score

df_lanca_only = pd.DataFrame.from_dict(
    dict_of_ipccs, orient="index", columns=ipcc_methods
)
df_lanca_only

In [None]:
ipcc_method = [
    met
    for met in list(bw.methods)
    if met[0] == "IPCC 2021"
    and "climate change: including SLCFs" in met[1]
    and ("GWP100" in met[2] or "GWP20" in met[2])
]

IPCC_CO2_ONLY = ("IPCC 2021", "Life cycle CO2 emissions")
try:
    bw.Method(IPCC_CO2_ONLY).load()
except:
    gwp = bw.Method(ipcc_method[0]).load()
    co2_cf = [
        i
        for i in gwp
        if "Carbon dioxide" in bw.Database("biosphere3").get(i[0][1])["name"]
    ]
    metadata = {"unit": "kg CO2"}
    co2_method = bw.Method(("IPCC 2021", "Life cycle CO2 emissions"))
    co2_method.register(**metadata)
    co2_method.write(co2_cf)
    co2_method.process()

if DO_LCA or not os.path.exists(PATH2PICKLES + "scenarios_co2_only.pickle"):
    scenarios_co2_only = [
        excel2ngScenarioLCA("base", supply_scenario_y2021, ei_ng_db),
        excel2ngScenarioLCA("repower", supply_scenario_alternative, ei_ng_db),
        excel2ngScenarioLCA("y2022", supply_scenario_y2022, ei_ng_db),
        excel2ngScenarioLCA("coal", supply_scenario_alternative, ei_ng_db),
        excel2ngScenarioLCA("clean", supply_scenario_alternative, ei_ng_db),
    ]
    scenarios_co2_only[2].HYDRO_POWER = max(scenarios_co2_only[2].HYDRO_POWER, 0.0)
    scenarios_co2_only[2].NUCLEAR_POWER = max(scenarios_co2_only[2].NUCLEAR_POWER, 0.0)

    [sce.doLCA([IPCC_CO2_ONLY]) for sce in scenarios_co2_only]

    for sce in scenarios_co2_only:
        sce.lca_results = [
            LCAResult(
                demand=l_.demand,
                method=l_.method,
                score=l_.score,
            )
            for l_ in sce.lca_results
        ]

    with open(PATH2PICKLES + "scenarios_co2_only.pickle", "wb") as fp:
        pickle.dump(scenarios_co2_only, fp)
else:
    with open(PATH2PICKLES + "scenarios_co2_only.pickle", "rb") as fp:
        scenarios_co2_only = pickle.load(fp)

ipcc_methods = [" ".join(met) for met in scenarios_co2_only[0].lca_methods]
dict_of_ipccs = {}

for sce, sce_name in zip(scenarios_co2_only, SCENARIOS):
    dict_of_ipccs[sce_name] = sce.lca_results[0].score

df_co2_only = pd.DataFrame.from_dict(
    dict_of_ipccs, orient="index", columns=ipcc_methods
)
df_co2_only

In [None]:
# PB calculations
def modify_pb_df(df):
    df = df.copy()
    df.sort_index(inplace=True)
    df.index = df_results_lca_only.columns
    df = df.T
    return df


# planetary boundaries data
pb_percapta_dict = {
    "climate_change": 501,  # 66.7% likelihood https://www.ipcc.ch/report/ar6/wg1/downloads/report/IPCC_AR6_WGI_SPM_Stand_Alone.pdf
    "ozone_depletion": 0.078,
    "eutrophication_marine": 290,
    "eutrophication_freshwater": 0.84,
    "eutrophication_terrestrial": 887,
    "acidification": 145,
    "land_use": 1840,
    "water_use": 26300,
    "particulate_matter": 7.47e-5,
    "photochemical_ozone_formation": 58.8,
    "human_toxicity_cancer": 1.39e-4,
    "human_toxicity_noncancer": 5.93e-4,
    "ecotoxicity_freshwater": 1.9e4,
    "ionising_radiation": 7.62e4,
    "energy_resources_nonrenewable": 3.24e4,
    "material_resources_metals_minerals": 3.18e-2,
}  # https://doi.org/10.1016/j.jenvman.2020.110686

EU_POPULATION = 447.0
GLOBAL_POPULATION = 8000.0
CUM_GLOBAL_POPULATION = 799_098_891_918  # from 2020-2100 https://www.sciencedirect.com/science/article/abs/pii/S0959378017312153?via%3Dihub

pb_europe_dict = {}
for k, v in pb_percapta_dict.items():
    pb_europe_dict[k] = v * EU_POPULATION * 1e6

df_pb_eu = pd.DataFrame.from_dict(pb_europe_dict, orient="index", columns=["PB_EU"])
df_pb_percapita = pd.DataFrame.from_dict(
    pb_percapta_dict, orient="index", columns=["PB_percapita_yearly"]
)

df_pb_eu = modify_pb_df(df_pb_eu)
df_pb_percapita = modify_pb_df(df_pb_percapita)

df_results_lca_only_ = df_results_lca_only.copy(deep=True)
df_results_lca_only_.loc[:, "Climate change (I)"] = df_co2_only.values
df_results_lca_only_.loc[:, "Land use (III)"] = df_lanca_only.values

df_LCA_PB = pd.concat([df_results_lca_only_, df_pb_eu])

df_ef_pb = pd.DataFrame()
for col in df_LCA_PB.columns.to_list():
    df_ef_pb[[col.replace("_", " ")]] = (
        100 * df_LCA_PB[[col]].iloc[0 : len(SCENARIOS)] / df_LCA_PB[[col]].iloc[-1]
    )
df_ef_pb.set_axis(SCENARIOS)

df_pb_unit = pd.DataFrame(
    {
        col: [bw.methods[res.method]["unit"]]
        for col, res in zip(df_ef_pb.columns, baseScenario.lca_results)
    },
    index=["Unit"],
)

print("Per capita yearly budget", "--------------------", sep="\n")
ipd.display(pd.concat([df_pb_percapita, df_pb_unit]))
print("EU27 yearly budget", "--------------------", sep="\n")
ipd.display(pd.concat([df_pb_eu, df_pb_unit]))
print("EU27 yearly transgression", "--------------------", sep="\n")
(df_ef_pb / 1).map("{:,.2f}%".format)

In [None]:
stat_relevant_cols = [
    "Acidification (II)",
    "Climate change (I)",
    "Energy resources nonrenewable (III)",

    "Eutrophication freshwater (II)",
    "Eutrophication marine (II)",
    "Eutrophication terrestrial (II)",

    "Ionising radiation (II)",
    "Land use (III)",
    "Ozone depletion (I)",

    "Particulate matter formation (I)",
    "Photochemical oxidant formation (II)",
    "Water use (III)",
]

stat_relevant_index = [df_ef_pb.columns.tolist().index(col) for col in stat_relevant_cols]

## Monte Carlo simulation

#### Calculations

In [None]:
# Monte Carlo calculation
all_methods = (
    ipcc_method
    + [IPCC_CO2_ONLY]
    + [
        met
        for met in list(bw.methods)
        if "EF v3.1 no LT" in met[0]
        and (
            "acidification no LT" == met[1]
            or "climate change no LT" == met[1]
            or "ecotoxicity: freshwater no LT" == met[1]
            or "energy resources: non-renewable no LT" == met[1]
            or "eutrophication" in met[1]
            or "human toxicity: carcinogenic no LT" in met[1]
            or "human toxicity: non-carcinogenic no LT" in met[1]
            or "ionising radiation: human health no LT" in met[1]
            or "land use no LT" == met[1]
            or "material resources: metals/minerals no LT" in met[1]
            or "ozone depletion no LT" in met[1]
            or "particulate matter formation no LT" in met[1]
            or "photochemical" in met[1]
            or "water use no LT" in met[1]
        )
    ]
    + [met for met in list(bw.methods) if "LANCA" in met[0]]
)

all_methods_names = [" | ".join(met) for met in all_methods]

dict_ipcc_methods = {
    k: v
    for k, v in zip(
        all_methods_names,
        all_methods,
    )
}

if RUN_MC or not os.path.exists(MC_PATH):
    baseScenario_mc = copy.deepcopy(lca_bkp[0])
    repowerScenario_mc = copy.deepcopy(lca_bkp[1])
    y2022Scenario_mc = copy.deepcopy(lca_bkp[2])
    coalScenario_mc = copy.deepcopy(lca_bkp[3])
    cleanScenario_mc = copy.deepcopy(lca_bkp[4])

    list_of_NgScenarioLCA = [
        baseScenario_mc,
        repowerScenario_mc,
        y2022Scenario_mc,
        coalScenario_mc,
        cleanScenario_mc,
    ]

    if len(SKIP_MC) > 0:
        try:
            with open(MC_PATH, "rb") as pf:
                mc_lca = pickle.load(pf)
        except FileNotFoundError:
            raise FileNotFoundError(
                "No Monte Carlo results found. Please set RUN_MC to True."
            )
    else:
        mc_lca = dict()
    for sce_name, sce in zip(SCENARIOS, list_of_NgScenarioLCA):
        if sce_name in SKIP_MC:
            continue
        print(f"Running Monte Carlo for {sce_name}")
        mc_lca[sce_name] = sce.multi_lcia_MonteCarlo(
            N_mc,
            dict_ipcc_methods,
            seed=42,
        )
else:
    with open(MC_PATH, "rb") as pf:
        mc_lca = pickle.load(pf)

mc_lca_bkp = copy.deepcopy(mc_lca)

df_mc_lca = dict()
for j, sce_name in enumerate(mc_lca.keys()):
    df_mc_lca[sce_name] = pd.DataFrame.from_dict(mc_lca[sce_name])
    df_mc_lca[sce_name].columns = all_methods_names

for sce in SCENARIOS:
    print(sce)
    ipd.display(df_mc_lca[sce])

if RUN_MC or not os.path.exists(MC_PATH):
    with open(MC_PATH, "wb") as fp:
        pickle.dump(df_mc_lca, fp)

In [None]:
# Concatenate all scenarios in one dataframe
categories = [" " for _ in baseScenario.categories]
for ite, cate in enumerate(baseScenario.categories):
    try:
        categories[ite] = (
            cate.split(" no LT ")[1]
            .replace(" human health", "")
            .replace(":", "")
            .replace(" ", "_")
            .replace("-", "")
            .replace("/", "_")
        )
    except IndexError:
        categories[ite] = "climate change"

categories = [
    f"{nm.split(': h')[0].replace('_',' ').capitalize()} ({ql})\n[{u.replace('-Eq','-eq').replace(' eq.','-eq').replace('CO2','CO$_2$').replace('H+','H$^+$').replace('m3','m$^3$')}]"
    for nm, u, ql in zip(categories, baseScenario.categories_unit, quality_level.values)
]
xlabels = [
    " ".join(txt.split("_")).replace("nonrenewable", "non-renewable")
    for txt in categories
]

selected_categories = [
    col for col in df_mc_lca[SCENARIOS[0]].columns if col.startswith("EF v3.1")
]
selected_categories[
    selected_categories.index(
        "EF v3.1 no LT | climate change no LT | global warming potential (GWP100) no LT"
    )
] = "IPCC 2021 | climate change: including SLCFs | global warming potential (GWP100)"

selected_cat_PB = copy.deepcopy(selected_categories)
selected_cat_PB[
    selected_cat_PB.index("EF v3.1 no LT | land use no LT | soil quality index no LT")
] = "LANCA v2.5 - land use | erosion potential"
selected_cat_PB[
    selected_cat_PB.index(
        "IPCC 2021 | climate change: including SLCFs | global warming potential (GWP100)"
    )
] = "IPCC 2021 | Life cycle CO2 emissions"

lcia2category_map_pb = dict(zip(selected_cat_PB, df_results_lca_only.columns))
lcia2category_map = dict(zip(selected_categories, df_results_lca_only.columns))

df_box_plot = pd.concat({k: d for k, d in df_mc_lca.items()})

df_box_plot = df_box_plot.reset_index()
df_box_plot.drop("level_1", axis=1, inplace=True)
df_box_plot.rename(columns={"level_0": "scenario"}, inplace=True)

df_box_plot

#### MC results

In [None]:
# Violin MC results
fig, ax = plt.subplots(
    4,
    4,
    figsize=(fig_length[2], fig_length[2]),
    sharey=True,
)
ax = ax.flatten()


for i, col in enumerate(selected_categories):
    ax[i].set_title(
        xlabels[i],
    )
    q_quantile = 0.0
    q_quantile = 0.02
    q_low = df_box_plot[col].quantile(q_quantile)
    q_hi = df_box_plot[col].quantile(1 - q_quantile)
    df_box_plot_ = df_box_plot[(df_box_plot[col] < q_hi) & (df_box_plot[col] > q_low)]

    sns.violinplot(
        data=df_box_plot_,
        x=col,
        y="scenario",
        hue="scenario",
        palette=COLORS[: len(SCENARIOS)],
        edgecolor="black",
        linewidth=0.3,
        inner=None,
        ax=ax[i],
        gap=0.0,
        density_norm="count",
        saturation=0.5,
        legend=True if i == 0 else False,
    )
    sns.boxplot(
        data=df_box_plot_,
        x=col,
        y="scenario",
        hue="scenario",
        palette=COLORS[: len(SCENARIOS)],
        boxprops={"zorder": 2},
        ax=ax[i],
        width=0.3,
        linewidth=0.5,
        linecolor="black",
        flierprops={"marker": "x", "color": "red"},
        fliersize=0.8,
        legend=False,
    )

    ax[i].xaxis.set_major_formatter(ScalarFormatter())
    ax[i].xaxis.set_minor_locator(AutoMinorLocator(3))
    ax[i].tick_params(which="minor", length=2)
    ax[i].tick_params(which="major", length=4)

    ax[i].set(xlabel=None)
    ax[i].set(ylabel=None)
    ax[i].tick_params(axis="y", which="both", length=0)

    def format_xtick(x, pos):
        return f"{x:.0g}" if abs(x) >= 1_000 else f"{x:g}"

    ax[i].xaxis.set_major_formatter(ticker.FuncFormatter(format_xtick))

fig.subplots_adjust(hspace=0.2, wspace=0.2)

ax[0].get_legend().remove()

plt.tight_layout()

if save_fig:
    plt.savefig(f"./figs/mc_violin_{N_mc}_ite.svg", bbox_inches="tight")
plt.show()

In [None]:
# Basic statistical results from MC simulations
if RUN_MC or not os.path.exists(MC_PATH_LCA_ONLY):
    results_lca_dict = dict()

    for sce_name, sce in zip(SCENARIOS, list_of_NgScenarioLCA):
        sce.doLCA(list(dict_ipcc_methods.values()))
        results_lca_dict[sce_name] = {
            k: v.score / 1e12 for k, v in zip(all_methods_names, sce.lca_results)
        }

    with open(MC_PATH_LCA_ONLY, "wb") as fp:
        pickle.dump(results_lca_dict, fp)
else:
    with open(MC_PATH_LCA_ONLY, "rb") as fp:
        results_lca_dict = pickle.load(fp)

df_results_lca_all = pd.DataFrame.from_dict(results_lca_dict).T

dict_stats_mc = {}

for j, (key, val) in enumerate(df_mc_lca.items()):
    q_quantile = 0.01
    val_no_outlier = {}
    for col in val.columns:
        q_low = val[col].quantile(q_quantile)
        q_hi = val[col].quantile(1 - q_quantile)
        val_no_outlier[col] = val[(val[col] <= q_hi) & (val[col] >= q_low)][col]

    val = pd.DataFrame(val_no_outlier)

    df_base_mc_results = val.describe().T
    df_base_mc_results[[col.capitalize() for col in df_base_mc_results.columns]] = (
        df_base_mc_results
    )
    df_base_mc_results["Baseline"] = df_results_lca_all.loc[key] * 1e12

    df_base_mc_results["Mean-Baseline diff. [%]"] = (
        100
        * (df_base_mc_results["Baseline"] - df_base_mc_results["Mean"])
        / df_base_mc_results["Baseline"]
    )

    distance_from_mean = (df_base_mc_results["75%"] - df_base_mc_results["25%"]) / 2

    df_base_mc_results["CV [%]"] = 100 * distance_from_mean / df_base_mc_results["Mean"]
    df_base_mc_results["CV-Std [%]"] = (
        100 * df_base_mc_results["Std"] / df_base_mc_results["Mean"]
    )
    df_base_mc_results["QCD [%]"] = (
        100
        * abs(df_base_mc_results["75%"] - df_base_mc_results["25%"])
        / abs(df_base_mc_results["75%"] + df_base_mc_results["25%"])
    )

    df_base_mc_results["MAD [%]"] = (
        100 * scipy.stats.median_abs_deviation(df_mc_lca[key]) / df_mc_lca[key].mean()
    )

    df_base_mc_results = df_base_mc_results[
        [
            "Baseline",
            "Mean",
            "Std",
            # "Count",
            # "CV [%]",
            # "CV-Std [%]",
            "QCD [%]",
            # "MAD [%]",
            "Min",
            "25%",
            "50%",
            "75%",
            "Max",
        ]
    ]

    dict_stats_mc[key] = df_base_mc_results

    df_base_mc_results_fmt = df_base_mc_results.map("{:,.2e}".format)
    df_base_mc_results_fmt["QCD [%]"] = pd.to_numeric(
        df_base_mc_results_fmt["QCD [%]"]
    ).map("{:,.2f}".format)

    print(key + ":")
    ipd.display(
        df_base_mc_results_fmt.loc[df_base_mc_results.sort_values(by="QCD [%]").index]
    )

    if save_tables:
        with pd.ExcelWriter(
            "./data/results/lca_MC_statistical_results.xlsx",
            engine="openpyxl",
            mode="w" if j == 0 else "a",
        ) as writer:
            df_base_mc_results_fmt.to_excel(writer, sheet_name=f"MC-{key}")

## P(A > B) and z-test

#### Calculations

##### Paired t-test for identical means

1. Calculate the difference between the paired observations: $d = x_1 - x_2$, where $x_1$ and $x_2$ are the paired observations.
2. Calculate the mean of the differences: $\bar{d} = \frac{1}{n}\sum_{i=1}^{n} d_i$, where $n$ is the number of paired observations.
3. Calculate the standard deviation of the differences: $s_d = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}(d_i - \bar{d})^2}$.
4. Calculate the standard error of the mean difference: $SE = \frac{s_d}{\sqrt{n}}$.
5. Calculate the t-statistic: $t = \frac{\bar{d}}{SE}$.
6. Determine the degrees of freedom: $df = n - 1$.
7. Calculate the p-value using the t-distribution with the appropriate degrees of freedom.

The p-value represents the probability of observing a t-statistic as extreme as the one calculated, assuming the null hypothesis is true (i.e., the means are equal). A small p-value indicates strong evidence against the null hypothesis, suggesting that the means are significantly different.

##### Probability of superiority $P(A>B)$

$$P(A>B) = \sum_{i=1}^n\frac{\min\{0,\max\{1,a_i-b_i\}\}}{n}
$$

In [None]:
prob_dict = {}
prob_numeric_dict = {}

for col in df_mc_lca[SCENARIOS[0]].columns:
    scenario_dict = {}
    a = df_mc_lca[SCENARIOS[0]][col]

    for sce in SCENARIOS:
        if sce == SCENARIOS[0]:
            continue
        b = df_mc_lca[sce][col]
        correlation, p_value = pearsonr(a, b)
        t_test_rel = scipy.stats.ttest_rel(a, b)
        z_test = round(t_test_rel.pvalue, 4)

        scenario_dict[f"P[Base > {sce}]"] = (a > b).mean()
        scenario_dict[f"Base == {sce}?"] = z_test

    prob_numeric_dict[col] = scenario_dict

df_prob_mc_numeric = pd.DataFrame.from_dict(prob_numeric_dict).T
df_prob_mc_numeric

In [None]:
# correlation scatter plot
fig, ax = plt.subplots(4, 4, figsize=(10, 10), dpi=300 if save_fig else 100)
ax = ax.flatten()


df_mc_lca_renamed = {
    k: v.copy(deep=True)[selected_categories].rename(columns=lcia2category_map)
    for k, v in df_mc_lca.items()
}

for i, col in enumerate(df_mc_lca_renamed[SCENARIOS[0]].columns):
    # remove outliers before scatter plot
    q_quantile = 0.01
    q_low = df_mc_lca_renamed[SCENARIOS[0]][col].quantile(q_quantile)
    q_hi = df_mc_lca_renamed[SCENARIOS[0]][col].quantile(1 - q_quantile)
    df_mc_lca_tmp = df_mc_lca_renamed[SCENARIOS[0]][
        (df_mc_lca_renamed[SCENARIOS[0]][col] < q_hi)
        & (df_mc_lca_renamed[SCENARIOS[0]][col] > q_low)
    ]
    for j, sce_name in enumerate(SCENARIOS[1:]):
        df_mc_lca_tmp2 = df_mc_lca_renamed[sce_name][
            (df_mc_lca_renamed[SCENARIOS[0]][col] < q_hi)
            & (df_mc_lca_renamed[SCENARIOS[0]][col] > q_low)
        ]
        ax[i].scatter(
            df_mc_lca_tmp[col],
            df_mc_lca_tmp2[col],
            s=0.5,
            alpha=0.5,
            c=COLORS[j],
            marker="o",
        )

    t_test_rel = scipy.stats.ttest_rel(
        df_mc_lca_renamed[SCENARIOS[0]][col], df_mc_lca_renamed[SCENARIOS[1]][col]
    )
    ax[i].annotate(
        f"Paired t-test = {t_test_rel.pvalue:.2f}",
        xy=(0.05, 0.95),
        xycoords="axes fraction",
        ha="left",
        va="top",
        fontsize=8,
    )

    ax[i].set_title(col)

    ax[i].set(xlabel=SCENARIOS[0], ylabel=SCENARIOS[1])

    ax[i].tick_params(axis="y", which="both", length=0)
    ax[i].tick_params(axis="x", which="both", length=0)
    ax[i].xaxis.set_major_formatter(ScalarFormatter())
    ax[i].yaxis.set_major_formatter(ScalarFormatter())

    ax[i].xaxis.set_minor_locator(AutoMinorLocator(3))
    ax[i].tick_params(which="minor", length=2)
    ax[i].tick_params(which="major", length=4)

    ax[i].yaxis.set_minor_locator(AutoMinorLocator(3))

plt.tight_layout()
plt.show()

In [None]:
# Format statistical results
# df_prob_mc = pd.DataFrame.from_dict(prob_dict)
df_prob_mc_numeric_renamed = (
    df_prob_mc_numeric.loc[selected_categories]
    .rename(index=lcia2category_map)
    .copy(deep=True)
)

df_prob_mc_numeric_fmt = df_prob_mc_numeric_renamed.copy(deep=True)

for col in df_prob_mc_numeric_fmt.columns:
    df_prob_mc_numeric_fmt[col] = (
        df_prob_mc_numeric_fmt[col].apply(lambda x: round(x, 2)).map("{:,.2f}".format)
    )

df_prob_mc_numeric_fmt

In [None]:
# Sanity check thatn probability numeric (apos relation based on fixed seed in MC) is close to theoretical one
a = (
    df_mc_lca_renamed[SCENARIOS[0]]["Climate change (I)"]
    / df_pb_eu["Climate change (I)"].iloc[0]
)
b = (
    df_mc_lca_renamed[SCENARIOS[1]]["Climate change (I)"]
    / df_pb_eu["Climate change (I)"].iloc[0]
)
c = (
    df_mc_lca_renamed[SCENARIOS[2]]["Climate change (I)"]
    / df_pb_eu["Climate change (I)"].iloc[0]
)
d = (
    df_mc_lca_renamed[SCENARIOS[3]]["Climate change (I)"]
    / df_pb_eu["Climate change (I)"].iloc[0]
)
e = (
    df_mc_lca_renamed[SCENARIOS[4]]["Climate change (I)"]
    / df_pb_eu["Climate change (I)"].iloc[0]
)

prob_numeric = [(a > b).mean(), (a > c).mean(), (a > d).mean(), (a > e).mean()]

# Theoretical approach (considering normal distribution)
prob = [
    norm.cdf((a.mean() - b.mean()) / np.sqrt(a.std() ** 2 + b.std() ** 2)),
    norm.cdf((a.mean() - c.mean()) / np.sqrt(a.std() ** 2 + c.std() ** 2)),
    norm.cdf((a.mean() - d.mean()) / np.sqrt(a.std() ** 2 + d.std() ** 2)),
    norm.cdf((a.mean() - e.mean()) / np.sqrt(a.std() ** 2 + d.std() ** 2)),
]

print(
    "Climate change (I)\nP[base>REPower]\tP[base>y2022]\tP[base>Coal]\tP[base>Clean]",
    prob_numeric,
    prob,
    sep="\n",
)

In [None]:
# Save statistical results in Excel file
if save_tables:
    with pd.ExcelWriter(
        "./data/results/lca_MC_statistical_results.xlsx",
        engine="openpyxl",
        mode="a",
        if_sheet_exists="replace",
    ) as writer:
        df_prob_mc_numeric_fmt[
            ["P[Base > REPowerEU]", "P[Base > Coal]", "P[Base > Clean]"]
        ].to_excel(writer, sheet_name="probabilities-numeric")
        df_prob_mc_numeric_fmt[
            ["Base == REPowerEU?", "Base == Coal?", "Base == Clean?"]
        ].to_excel(writer, sheet_name="z-test")

#### P(A > B) visualizations

In [None]:
# Base - REPowerEU comparison
def create_base_minus_scenario(scenario=SCENARIOS[1]):
    base_minus_repower = (
        100
        * (df_mc_lca_renamed[scenario] - df_mc_lca_renamed[SCENARIOS[0]])
        / df_mc_lca_renamed[SCENARIOS[0]].abs()
    )
    # base_minus_repower = df_mc_lca_renamed["base"] - df_mc_lca_renamed["repower"]

    for col in base_minus_repower.columns:
        base_minus_repower[f"color_{col}"] = -1
        base_minus_repower.loc[base_minus_repower[col] > 0, f"color_{col}"] = 1

    base_minus_repower["dummy_color"] = 0

    base_minus_repower.rename(
        columns={
            col: col.replace("_", " ")
            .replace("color ", "color_")
            .replace("dummy ", "dummy_")
            for col in base_minus_repower.columns
            # if not col.startswith("color") and not col.startswith("dummy")
        },
        inplace=True,
    )
    return base_minus_repower


base_minus_repower = create_base_minus_scenario(SCENARIOS[1])
base_minus_y2022 = create_base_minus_scenario(SCENARIOS[2])
base_minus_repower

In [None]:
# histogram of Base - REPowerEU
def plot_hist_base_minus_sce(
    base_minus_repower, scenario=SCENARIOS[1], save_fig=save_fig
):
    fig, ax = plt.subplots(4, 4, figsize=(fig_length[2], fig_length[2]), dpi=300 if save_fig else 100)
    ax = ax.flatten()

    ax_lt0_leg_handles = []
    ax_gt0_leg_handles = []

    for i, col in enumerate(map(lcia2category_map.get, selected_categories)):
        ax[i].set_title(
            xlabels[i].split("\n[")[0],
            fontsize=7,
        )

        q_quantile = 0.02
        q_low = base_minus_repower[col].quantile(q_quantile)
        q_hi = base_minus_repower[col].quantile(1 - q_quantile)
        df_A_B_final = base_minus_repower[
            (base_minus_repower[col] < q_hi) & (base_minus_repower[col] > q_low)
        ][[col, f"color_{col}", "dummy_color"]]

        pallete = ["#fc8d62", "#66c2a5"]
        pallete = [COLORS[SCENARIOS.index(scenario)], "#66c2a5"]
        if df_A_B_final.loc[df_A_B_final[col] < 0].sort_values(
            by=f"color_{col}", ascending=False
        ).shape[0] > 0.05 * len(df_A_B_final):
            ax_lt0 = sns.histplot(
                data=df_A_B_final.loc[df_A_B_final[col] < 0].sort_values(
                    by=f"color_{col}", ascending=False
                ),
                x=col,
                stat="frequency",
                hue=f"color_{col}",
                palette=[pallete[0]],
                binwidth=abs(df_A_B_final[col].max() - df_A_B_final[col].min()) / 25,
                fill=True,
                alpha=1.0,
                ax=ax[i],
                linewidth=0.3,
                edgecolor="white",
                label="less",
                legend=True,
            )
            ax_lt0_leg_handles.append(ax_lt0.get_legend().legend_handles)
            ax_lt0.get_legend().remove()
        else:
            ax_lt0_leg_handles.append(None)

        if df_A_B_final.loc[df_A_B_final[col] >= 0].sort_values(
            by=f"color_{col}", ascending=True
        ).shape[0] > 0.05 * len(df_A_B_final):
            ax_gt0 = sns.histplot(
                data=df_A_B_final.loc[df_A_B_final[col] >= 0].sort_values(
                    by=f"color_{col}", ascending=True
                ),
                x=col,
                stat="frequency",
                hue=f"color_{col}",
                palette=reversed(pallete),
                binwidth=abs(df_A_B_final[col].max() - df_A_B_final[col].min()) / 25,
                fill=True,
                alpha=1.0,
                ax=ax[i],
                linewidth=0.3,
                edgecolor="white",
                label="more",
                legend=True,
            )
            ax_gt0_leg_handles.append(ax_gt0.get_legend().legend_handles)
            ax_gt0.get_legend().remove()
        else:
            ax_gt0_leg_handles.append(None)

        line_treshold = ax[i].axvline(
            x=0.0,
            ls="--",
            lw=0.7,
            color="black",
            label="Threshold" if j == 15 else None,
            alpha=0.75,
        )

        line_mean = ax[i].axvline(
            x=df_A_B_final[col].mean(),
            ls="-.",
            lw=0.7,
            color="magenta",
            label="Mean" if j == 15 else None,
            alpha=0.75,
        )
        # plot an x at the mean and y=0
        ax[i].plot(
            [df_A_B_final[col].mean()],
            [0],
            marker="x",
            markersize=4,
            color="magenta",
            lw=0.7,
            label=None,
        )

        p_A_minus_B = (
            f"{100*df_prob_mc_numeric_renamed.loc[col, f'P[Base > {scenario}]']:0.0f}"
        )
        p_A_minus_B = f"{100*(base_minus_repower[col]>0).sum()/base_minus_repower[col].notna().sum():0.0f}"
        p_A_equals_B = (
            f"{100*df_prob_mc_numeric_renamed.loc[col, f'Base == {scenario}?']:0.0f}"
        )

        xaxis_len = abs(ax[i].get_xlim()[1] - ax[i].get_xlim()[0])
        yaxis_len = abs(ax[i].get_ylim()[1] - ax[i].get_ylim()[0])

        # extend 20% of y-axis upwards to make room for the annotation
        ax[i].set_ylim(
            ax[i].get_ylim()[0],
            ax[i].get_ylim()[1] + 0.2 * abs(ax[i].get_ylim()[1] - ax[i].get_ylim()[0]),
        )

        # add a coloured box below x-axis above the x=0 line to indicate the probability of A>B (called burden shifting)
        right_side_annotation = True
        is_yellow = False
        if (
            df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"] <= 0.75
            and df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"] >= 0.25
        ):
            # set x-axis to be centered at 0
            ax[i].set_xlim(
                -1 * max(abs(ax[i].get_xlim()[0]), abs(ax[i].get_xlim()[1])),
                max(abs(ax[i].get_xlim()[0]), abs(ax[i].get_xlim()[1])),
            )
            is_yellow = True

        box_width = 180.0
        if df_prob_mc_numeric_renamed.loc[col, f"Base == {scenario}?"] > 0.05:
            annotation_txt = ax[i].text(
                0.0 + 0.0 * xaxis_len,  # x-coordinate of the text
                ax[i].get_ylim()[1] - 0.075 * yaxis_len,  # y-coordinate of the text
                "Inconclusive"
                + "\np-value = "
                + f"{df_prob_mc_numeric_renamed.loc[col, f'Base == {scenario}?']:0.2f}",
                ha="center",  # horizontal alignment
                va="top",  # vertical alignment
                fontfamily="calibri",
                fontsize=fontsize_axs - 2,
                # transform=ax[i].transAxes,
                wrap=True,
                bbox=dict(
                    facecolor="#ffeda0",
                    alpha=0.85,
                    edgecolor="black",
                    linewidth=0.3,
                    boxstyle="darrow,pad=0.3",
                ),
            )
            annotation_txt._get_wrap_line_width = lambda: 2 * box_width
        else:
            if df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"] < 0.75:
                annotation_txt = ax[i].text(
                    0.0 + 0.02 * xaxis_len,  # x-coordinate of the text
                    ax[i].get_ylim()[1] - 0.075 * yaxis_len,  # y-coordinate of the text
                    (
                        f"superiority\nP = {100-100*df_prob_mc_numeric_renamed.loc[col, f'P[Base > {scenario}]']:0.0f}%   "  # text to display
                        if df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"]
                        > 0.25
                        else f"Burden shifting\nP = {100-100*df_prob_mc_numeric_renamed.loc[col, f'P[Base > {scenario}]']:0.0f}%   "
                    ),  # text to display
                    ha="left",  # horizontal alignment
                    va="top",  # vertical alignment
                    fontfamily="calibri",  # font family
                    fontsize=fontsize_axs - 2,  # font size
                    wrap=True,
                    bbox=dict(
                        facecolor="red" if not is_yellow else "#ffeda0",
                        edgecolor="black",
                        alpha=0.5 if not is_yellow else 0.85,
                        linewidth=0.3,
                        boxstyle="rarrow,pad=0.3",
                    ),
                )
                annotation_txt._get_wrap_line_width = lambda: box_width
            if df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"] > 0.25:
                annotation_txt = ax[i].text(
                    0.0 - 0.02 * xaxis_len,  # x-coordinate of the text
                    ax[i].get_ylim()[1] - 0.075 * yaxis_len,  # y-coordinate of the text
                    (
                        f"Inconclusive\nP = {100*df_prob_mc_numeric_renamed.loc[col, f'P[Base > {scenario}]']:0.0f}%"  # text to display
                        if df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"]
                        < 0.75
                        else f"Reduction\nP = {100*df_prob_mc_numeric_renamed.loc[col, f'P[Base > {scenario}]']:0.0f}%"
                    ),  # text to display
                    ha="right",  # horizontal alignment
                    va="top",  # vertical alignment
                    fontfamily="calibri",  # font family
                    fontsize=fontsize_axs - 2,  # font size
                    wrap=True,
                    bbox=dict(
                        facecolor="green" if not is_yellow else "#ffeda0",
                        edgecolor="black",
                        alpha=0.5 if not is_yellow else 0.85,
                        linewidth=0.3,
                        boxstyle="larrow,pad=0.3",
                    ),
                )
                right_side_annotation = False
                annotation_txt._get_wrap_line_width = lambda: box_width

            N_stars = np.floor(np.log10(abs(df_A_B_final[col].mean()))).astype(int) + 1
            N_stars = 0
            if abs(df_A_B_final[col].mean()) > 10:
                N_stars = 1
            if abs(df_A_B_final[col].mean()) > 25:
                N_stars = 2
            if abs(df_A_B_final[col].mean()) > 100:
                N_stars = 3

            if df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"] >= 0.75:
                # draw yellow stars for ratings in terms of base_minus_repower mean values
                rating_ax = ax[i].scatter(
                    [
                        ax[i].get_xlim()[0] + (ite / 10 + 0.125) * xaxis_len
                        for ite in range(3)
                    ],
                    3 * [ax[i].get_ylim()[1] - 0.5 * yaxis_len],
                    marker=r"$\star$",
                    s=50,
                    c=["yellow"] * N_stars + ["white"] * (3 - N_stars),
                    edgecolor="black",
                    linewidth=0.3,
                    zorder=9,
                    label="Relative mean difference" if j == 15 else None,
                )
                # write 10%, 25% and 100% below each star in the plot
                [
                    ax[i].text(
                        ax[i].get_xlim()[0] + (ite / 10 + 0.125) * xaxis_len,
                        ax[i].get_ylim()[1] - 0.5 * yaxis_len - 0.075 * yaxis_len,
                        val,
                        ha="center",
                        va="center",
                        fontfamily="calibri",
                        fontsize=fontsize_axs - 3,
                    )
                    for ite, val in enumerate([" 10", " 25", "100"])
                ]
                # write relative mean difference [%] on top of the stars
                ax[i].text(
                    ax[i].get_xlim()[0] + (0.175 + 0.15 / 2) * xaxis_len,
                    ax[i].get_ylim()[1] - 0.485 * yaxis_len + 0.125 * yaxis_len,
                    "Relative mean\ndifference [%]",
                    ha="center",
                    va="center",
                    fontfamily="calibri",
                    fontsize=fontsize_axs - 2,
                )
            elif df_prob_mc_numeric_renamed.loc[col, f"P[Base > {scenario}]"] <= 0.25:
                # draw red stars for ratings in terms of base_minus_repower mean values
                rating_ax = ax[i].scatter(
                    [
                        ax[i].get_xlim()[1] - ((ite) / 10 + 0.1) * xaxis_len
                        for ite in range(3)
                    ],
                    3 * [ax[i].get_ylim()[1] - 0.5 * yaxis_len],
                    marker=r"$\star$",
                    s=50,
                    c=["white"] * (3 - N_stars) + ["yellow"] * N_stars,
                    edgecolor="black",
                    linewidth=0.3,
                    zorder=9,
                    label="Relative mean difference" if j == 15 else None,
                )

                # write 10%, 25% and 100% below each star in the plot
                [
                    ax[i].text(
                        ax[i].get_xlim()[1] - (ite / 10 + 0.1) * xaxis_len,
                        ax[i].get_ylim()[1] - 0.5 * yaxis_len - 0.075 * yaxis_len,
                        val,
                        ha="center",
                        va="center",
                        fontfamily="calibri",
                        fontsize=fontsize_axs - 3,
                    )
                    for ite, val in enumerate(reversed([" 10", " 25", "100"]))
                ]
                # write relative mean difference [%] on top of the stars
                ax[i].text(
                    ax[i].get_xlim()[1] - 0.225 * xaxis_len,
                    ax[i].get_ylim()[1] - 0.485 * yaxis_len + 0.125 * yaxis_len,
                    "Relative mean\ndifference [%]",
                    ha="center",
                    va="center",
                    fontfamily="calibri",
                    fontsize=fontsize_axs - 2,
                )

        ax[i].set(xlabel=None)
        # ax2.set_ylabel(None)
        if not i % 4 == 0:
            ax[i].set(ylabel=None)

        ax[i].xaxis.set_major_formatter("{x:.0f}%")
        ax[i].xaxis.set_minor_locator(AutoMinorLocator(4))
        ax[i].tick_params(which="minor", length=2)
        ax[i].tick_params(which="major", length=3)

        ax[i].yaxis.set_minor_locator(AutoMinorLocator(4))

        ax[i].annotate(
            ascii_uppercase[i] + ")",
            xy=(-0.2, 1.15),
            xycoords="axes fraction",
            xytext=(+0.5, -0.5),
            textcoords="offset fontsize",
            fontsize="medium",
            verticalalignment="top",
            fontfamily="calibri",
            # bbox=dict(facecolor="0.7", edgecolor="none", pad=3.0),
        )

    fig.supxlabel(f"Environmental impact change from Base to {scenario} [%]", y=0.02)

    fig.subplots_adjust(hspace=0.2, wspace=0.2)

    lt0_leg_handles = [x for x in ax_lt0_leg_handles if x is not None][0]
    gt0_leg_handles = [x for x in ax_gt0_leg_handles if x is not None][0]
    leg = fig.legend(
        # ax[1].get_legend().legend_handles
        lt0_leg_handles + gt0_leg_handles
        # + ax2_tmp.get_legend().legend_handles
        + [line_treshold, line_mean, rating_ax],
        [
            "Impact reduction",
            "Burden shifting",
            "Threshold",
            "Mean",
            "Relative mean difference [%]",
        ],
        ncol=5,
        loc="lower center",
        borderaxespad=0,
        frameon=True,
    )
    for axx in ax:
        try:
            axx.get_legend().remove()
        except AttributeError:
            pass

    leg.get_frame().set_linewidth(0.3)

    plt.tight_layout()

    if save_fig:
        plt.savefig(
            f"./figs/mc_hist_A-{''.join(scenario.split(' '))}_pct_{N_mc}_ite.svg",
            bbox_inches="tight",
        )

    return fig, ax


fig, ax = plot_hist_base_minus_sce(
    base_minus_repower, scenario=SCENARIOS[1], save_fig=save_fig
)

plt.show()

In [None]:
fig, ax = plot_hist_base_minus_sce(
    base_minus_y2022, scenario=SCENARIOS[2], save_fig=save_fig
)

plt.show()

In [None]:
# p-value heatmap
for sce in SCENARIOS[1:]:
    df_prob_mc_numeric_renamed[f"Base and {sce}"] = df_prob_mc_numeric_renamed[
        f"Base == {sce}?"
    ]

ax_heatmap = sns.heatmap(
    df_prob_mc_numeric_renamed[[f"Base and {sce}" for sce in SCENARIOS[1:]]],
    vmin=0.0,
    vmax=1.0,
    cmap="Blues",
    annot=True,
    fmt=".0%",
    linewidths=1.5,
    linecolor="white",
    annot_kws={"size": 8},
    cbar=True,
    cbar_kws=dict(
        ticks=[_ / 10 for _ in range(11)],
    ),
)
ax_heatmap.collections[0].colorbar.set_ticklabels(
    [f"{abs(t) * 100:0.0f}%" for t in [_ / 10 for _ in range(11)]]
)
ax_heatmap.xaxis.tick_top()
ax_heatmap.xaxis.set_tick_params(labeltop=True, rotation=30)
plt.setp(ax_heatmap.get_yticklabels(), ha="right", rotation_mode="anchor")
ax_heatmap.yaxis.set_tick_params(rotation=-0)
plt.setp(ax_heatmap.get_xticklabels(), ha="left", rotation_mode="anchor")

# plt.title("MC statistical results - p-value")

plt.tight_layout()

if save_fig:
    plt.savefig(
        f"./figs/mc_statsNumeric_pvalue_heatmap_{N_mc}_ite.svg", bbox_inches="tight"
    )

plt.show()

In [None]:
# Probability of burden shifting heatmap
df_prob_mc_numeric_renamed[
    ["P[REPowerEU > Base]", "P[Coal > Base]", "P[Clean > Base]"]
] = (
    1
    - df_prob_mc_numeric_renamed[
        ["P[Base > REPowerEU]", "P[Base > Coal]", "P[Base > Clean]"]
    ]
)
for sce in SCENARIOS[1:]:
    df_prob_mc_numeric_renamed[f"P[{sce} > Base]"] = (
        1 - df_prob_mc_numeric_renamed[f"P[Base > {sce}]"]
    )

ax_heatmap = sns.heatmap(
    df_prob_mc_numeric_renamed[[f"P[{sce} > Base]" for sce in SCENARIOS[1:]]],
    vmin=0.0,
    vmax=1.0,
    cmap="Greens",
    annot=True,
    fmt=".0%",
    linewidths=1.5,
    linecolor="white",
    annot_kws={"size": 8},
    cbar=True,
    cbar_kws=dict(
        ticks=[_ / 10 for _ in range(11)],
    ),
)
ax_heatmap.collections[0].colorbar.set_ticklabels(
    [f"{abs(t) * 100:0.0f}%" for t in [_ / 10 for _ in range(11)]]
)
ax_heatmap.xaxis.tick_top()
ax_heatmap.xaxis.set_tick_params(labeltop=True, rotation=30)
plt.setp(ax_heatmap.get_yticklabels(), ha="right", rotation_mode="anchor")
ax_heatmap.yaxis.set_tick_params(rotation=-0)
plt.setp(ax_heatmap.get_xticklabels(), ha="left", rotation_mode="anchor")

# plt.title("MC statistical results - Numeric")

plt.tight_layout()

if save_fig:
    plt.savefig(f"./figs/mc_statsNumeric_heatmap_{N_mc}_ite.svg", bbox_inches="tight")

plt.show()

## Statistically significant burden-shifting

In [None]:
# Chance of Base > REPowerEU for p-value<0.05
significant_categories = (
    df_prob_mc_numeric_renamed.iloc[:, 0]
    .loc[df_prob_mc_numeric_renamed["P[Base > REPowerEU]"] < 0.05]
    .index.tolist()
)

quality_level_fmt = quality_level.rename(
    index={k: f"{k} ({v})" for k, v in quality_level.items()}
)

# statistically significant results
pb_base = df_ef_pb.loc[SCENARIOS[0]][significant_categories]
pb_repower = df_ef_pb.loc[SCENARIOS[1]][significant_categories]
bs_repower = pd.Series((pb_repower - pb_base) / pb_base, name="Burden shifting")
qcd_df = (
    dict_stats_mc[SCENARIOS[0]]["QCD [%]"]
    .drop(index="LANCA v2.5 - land use | erosion potential")
    .rename(index=lcia2category_map)
)
df_significant_results = pd.concat(
    [
        pb_base,
        pb_repower,
        100 * bs_repower,
        qcd_df.loc[significant_categories],
        quality_level_fmt.loc[significant_categories],
    ],
    axis=1,
)
print("Sort by PB allocation of Base scenario")
ipd.display(df_significant_results.sort_values(by=SCENARIOS[1], ascending=False))

print("Sort by burden shifting")
df_significant_results.sort_values(by="Burden shifting", ascending=False)

In [None]:
# statistically insignificant results
insignificant_categories = [
    ind for ind in df_prob_mc_numeric_renamed.index if ind not in significant_categories
]
pb_base = df_ef_pb.loc[SCENARIOS[0]][insignificant_categories]
pb_repower = df_ef_pb.loc[SCENARIOS[1]][insignificant_categories]
bs_repower = pd.Series((pb_repower - pb_base) / pb_base, name="Burden shifting")
df_insignificant_results = pd.concat(
    [
        pb_base,
        pb_repower,
        100 * bs_repower,
        qcd_df.loc[insignificant_categories],
        quality_level_fmt.loc[insignificant_categories],
    ],
    axis=1,
)

print("Sort by PB allocation of Base scenario")
ipd.display(df_insignificant_results.sort_values(by=SCENARIOS[0], ascending=False))

print("Sort by burden shifting")
df_insignificant_results.sort_values(by="Burden shifting", ascending=False)

In [None]:
# Chance of Base > REPowerEU for QCD < 20
significant_categories_qcd20 = (
    df_prob_mc_numeric_renamed["P[Base > REPowerEU]"]
    .loc[abs(qcd_df.loc[df_prob_mc_numeric_renamed.index]) < 20]
    .index.tolist()
)

# statistically significant results
pb_base = df_ef_pb.loc[SCENARIOS[0]][significant_categories_qcd20]
pb_repower = df_ef_pb.loc[SCENARIOS[1]][significant_categories_qcd20]
bs_repower = pd.Series((pb_repower - pb_base) / pb_base, name="Burden shifting")
df_significant_results = pd.concat(
    [
        pb_base,
        pb_repower,
        100 * bs_repower,
        100
        * df_prob_mc_numeric_renamed["Base == REPowerEU?"].loc[
            significant_categories_qcd20
        ],
        qcd_df.loc[significant_categories_qcd20],
        quality_level_fmt.loc[significant_categories_qcd20],
    ],
    axis=1,
)

print("Sort by QCD")
ipd.display(df_significant_results.sort_values(by="QCD [%]", ascending=True))

print("Sort by PB allocation of REPowerEU scenario")
ipd.display(df_significant_results.sort_values(by=SCENARIOS[1], ascending=False))

print("Sort by burden shifting")
df_significant_results.sort_values(by="Burden shifting", ascending=False)

In [None]:
# statistically insignificant results
insignificant_categories_qcd20 = [
    ind
    for ind in df_prob_mc_numeric_renamed.index
    if ind not in significant_categories_qcd20
]
pb_base = df_ef_pb.loc[SCENARIOS[0]][insignificant_categories_qcd20]
pb_repower = df_ef_pb.loc[SCENARIOS[1]][insignificant_categories_qcd20]
bs_repower = pd.Series((pb_repower - pb_base) / pb_base, name="Burden shifting")
df_insignificant_results = pd.concat(
    [
        pb_base,
        pb_repower,
        100 * bs_repower,
        # 100 * df_prob_mc_numeric_renamed["P[Base > REPowerEU]"].loc[insignificant_categories_qcd20],
        100
        * df_prob_mc_numeric_renamed["Base == REPowerEU?"].loc[
            insignificant_categories_qcd20
        ],
        qcd_df.loc[insignificant_categories_qcd20],
        quality_level_fmt.loc[insignificant_categories_qcd20],
    ],
    axis=1,
)

print("Sort by PB allocation of Base scenario")
ipd.display(df_insignificant_results.sort_values(by=SCENARIOS[0], ascending=False))

print("Sort by burden shifting")
df_insignificant_results.sort_values(by="Burden shifting", ascending=False)

In [None]:
# probability of transgressing 10% of SOS
fig, ax_heatmap = plt.subplots(figsize=(fig_length[1.5], fig_length[1.5]))


p_transgress_pb_10pct = {}

for sce, df in df_mc_lca_renamed.items():
    p_transgress_pb_10pct[sce] = (df >= 0.05 * df_pb_eu.iloc[0]).sum() / df.shape[0]

p_transgress_pb_10pct = pd.DataFrame(p_transgress_pb_10pct)

significant_categories_qcd20_no_quality3 = [
    cat for cat in p_transgress_pb_10pct.index.tolist()  # if not cat.endswith("III)")
]

ax_heatmap = sns.heatmap(
    p_transgress_pb_10pct.loc[significant_categories_qcd20_no_quality3],
    vmin=0.0,
    vmax=1.0,
    cmap="Greens",
    annot=True,
    fmt=".0%",
    linewidths=1.5,
    linecolor="white",
    annot_kws={"size": 8},
    cbar=True,
    cbar_kws=dict(
        ticks=[_ / 10 for _ in range(11)],
    ),
)
ax_heatmap.collections[0].colorbar.set_ticklabels(
    [f"{abs(t) * 100:0.0f}%" for t in [_ / 10 for _ in range(11)]]
)
ax_heatmap.xaxis.tick_top()
ax_heatmap.xaxis.set_tick_params(labeltop=True, rotation=30)
plt.setp(ax_heatmap.get_yticklabels(), ha="right", rotation_mode="anchor")
ax_heatmap.yaxis.set_tick_params(rotation=-0)
plt.setp(ax_heatmap.get_xticklabels(), ha="left", rotation_mode="anchor")

plt.title(
    "Probability of transgressing 10% of the SOS downscaled to energy consumption in the EU"
)

plt.tight_layout()

if save_fig:
    plt.savefig(
        f"./figs/bs_pb_transgress10_heatmap_{N_mc}_ite.svg", bbox_inches="tight"
    )

plt.show()

In [None]:
# probability of transgressing 100% of SOS
fig, ax_heatmap = plt.subplots(figsize=(fig_length[1.5], fig_length[1.5]))

p_transgress_pb_10pct = {}

for sce, df in df_mc_lca_renamed.items():
    p_transgress_pb_10pct[sce] = (df >= 1.0 * df_pb_eu.iloc[0]).sum() / df.shape[0]

p_transgress_pb_10pct = pd.DataFrame(p_transgress_pb_10pct)

significant_categories_qcd20_no_quality3 = [
    cat for cat in p_transgress_pb_10pct.index.tolist()  # if not cat.endswith("III)")
]

ax_heatmap = sns.heatmap(
    p_transgress_pb_10pct.loc[significant_categories_qcd20_no_quality3],
    vmin=0.0,
    vmax=1.0,
    cmap="Greens",
    annot=True,
    fmt=".0%",
    linewidths=1.5,
    linecolor="white",
    annot_kws={"size": 8},
    cbar=True,
    cbar_kws=dict(
        ticks=[_ / 10 for _ in range(11)],
    ),
)
ax_heatmap.collections[0].colorbar.set_ticklabels(
    [f"{abs(t) * 100:0.0f}%" for t in [_ / 10 for _ in range(11)]]
)
ax_heatmap.xaxis.tick_top()
ax_heatmap.xaxis.set_tick_params(labeltop=True, rotation=30)
plt.setp(ax_heatmap.get_yticklabels(), ha="right", rotation_mode="anchor")
ax_heatmap.yaxis.set_tick_params(rotation=-0)
plt.setp(ax_heatmap.get_xticklabels(), ha="left", rotation_mode="anchor")

plt.title(
    r"Probability of transgressing the SOS downscaled to energy consumption in the EU"
)

plt.tight_layout()

if save_fig:
    plt.savefig(
        f"./figs/bs_pb_transgress100_heatmap_{N_mc}_ite.svg", bbox_inches="tight"
    )

plt.show()

In [None]:
# SOS heatmap
fig, ax_heatmap = plt.subplots(figsize=(fig_length[1.5], fig_length[1.5]))

ax_heatmap = sns.heatmap(
    df_ef_pb.T / 100,
    vmin=0.0,
    vmax=1.0,
    cmap="Reds",
    annot=True,
    fmt=".0%",
    linewidths=1.5,
    linecolor="white",
    annot_kws={"size": 8},
    cbar=True,
    cbar_kws=dict(
        ticks=[_ / 10 for _ in range(11)],
    ),
)
ax_heatmap.collections[0].colorbar.set_ticklabels(
    [f"{abs(t) * 100:0.0f}%" for t in [_ / 10 for _ in range(11)]]
)
ax_heatmap.xaxis.tick_top()
ax_heatmap.xaxis.set_tick_params(labeltop=True, rotation=30)
plt.setp(ax_heatmap.get_yticklabels(), ha="right", rotation_mode="anchor")
ax_heatmap.yaxis.set_tick_params(rotation=-0)
plt.setp(ax_heatmap.get_xticklabels(), ha="left", rotation_mode="anchor")

plt.title(r"Share of the SOS downscaled to energy consumption in the EU")

plt.tight_layout()

if save_fig:
    plt.savefig(f"./figs/bs_pb_heatmap_{N_mc}_ite.svg", bbox_inches="tight")

plt.show()

## Process contribution

In [None]:
# Do process contribution analysis
if DO_PROC_CONTRIB or not os.path.isfile(PROC_CONTRIB_PATH):
    baseScenario_proc_contrib = copy.deepcopy(lca_bkp[0])
    repowerScenario_proc_contrib = copy.deepcopy(lca_bkp[1])
    y2022Scenario_proc_contrib = copy.deepcopy(lca_bkp[2])
    coalScenario_proc_contrib = copy.deepcopy(lca_bkp[3])
    cleanScenario_proc_contrib = copy.deepcopy(lca_bkp[4])

    lca_methods_bkp = copy.deepcopy(lca_methods)

    baseScenario_proc_contrib.do_process_contribution(lca_methods, verbose=False)
    repowerScenario_proc_contrib.do_process_contribution(lca_methods, verbose=False)
    y2022Scenario_proc_contrib.do_process_contribution(lca_methods, verbose=False)
    coalScenario_proc_contrib.do_process_contribution(lca_methods, verbose=False)
    cleanScenario_proc_contrib.do_process_contribution(lca_methods, verbose=False)

    proc_contrib_bkp = [
        copy.deepcopy(baseScenario_proc_contrib),
        copy.deepcopy(repowerScenario_proc_contrib),
        copy.deepcopy(y2022Scenario_proc_contrib),
        copy.deepcopy(coalScenario_proc_contrib),
        copy.deepcopy(cleanScenario_proc_contrib),
    ]

    for sce in proc_contrib_bkp:
        sce.lca_results = [
            LCAResult(
                demand=l_.demand,
                method=l_.method,
                score=l_.score,
            )
            for l_ in sce.lca_results
        ]

    with open(PROC_CONTRIB_PATH, "wb") as fp:
        pickle.dump(proc_contrib_bkp, fp)
else:
    with open(PROC_CONTRIB_PATH, "rb") as fp:
        proc_contrib_bkp = pickle.load(fp)
    baseScenario_proc_contrib = copy.deepcopy(proc_contrib_bkp[0])
    repowerScenario_proc_contrib = copy.deepcopy(proc_contrib_bkp[1])
    y2022Scenario_proc_contrib = copy.deepcopy(proc_contrib_bkp[2])
    coalScenario_proc_contrib = copy.deepcopy(proc_contrib_bkp[3])
    cleanScenario_proc_contrib = copy.deepcopy(proc_contrib_bkp[4])

In [None]:
# plot product/source contribution simplified
fig_proc_contrib, ax_proc_contrib = plot_proc_contrib(
    [
        baseScenario_proc_contrib,
        repowerScenario_proc_contrib,
        y2022Scenario_proc_contrib,
        coalScenario_proc_contrib,
        cleanScenario_proc_contrib,
    ],
    save_fig=save_fig,
    path2save_fig="./figs/proc_contrib_NG_scenarios.png",
    quality_level=quality_level,
)

In [None]:
# process contribution ranking
cc_pc = list(baseScenario_proc_contrib.process_contribution.keys())
cc_pc = [
    v[
        "('IPCC 2021', 'climate change: including SLCFs', 'global warming potential (GWP100)')"
    ]
    for k, v in baseScenario_proc_contrib.process_contribution.items()
]
cc_pc_pct = [c / sum(cc_pc) for c in cc_pc]
cc_pc_pct_df = pd.DataFrame(
    cc_pc_pct,
    columns=["climate change base [%]"],
    index=[
        k.split(" - ")[0] for k in baseScenario_proc_contrib.process_contribution.keys()
    ],
)
cc_pc_pct_df.sort_values(by="climate change base [%]", ascending=False).head(5)

In [None]:
# plot product/source contribution simplified
fig_proc_contrib, ax_proc_contrib = plot_proc_contrib(
    [
        baseScenario_proc_contrib,
        repowerScenario_proc_contrib,
        y2022Scenario_proc_contrib,
        # coalScenario_proc_contrib,
        # cleanScenario_proc_contrib,
    ],
    save_fig=save_fig,
    path2save_fig="./figs/proc_contrib_NG_scenarios_errorbar.png",
    quality_level=quality_level,
    dict_stats_mc=dict_stats_mc,
    stat_relevant_index=stat_relevant_index,
)

In [None]:
# plot product/source contribution simplified
fig_proc_contrib, ax_proc_contrib = plot_proc_contrib(
    [
        baseScenario_proc_contrib,
        repowerScenario_proc_contrib,
        y2022Scenario_proc_contrib,
        coalScenario_proc_contrib,
        cleanScenario_proc_contrib,
    ],
    save_fig=save_fig,
    path2save_fig="./figs/proc_contrib_NG_scenarios_errorbar_ESI.png",
    quality_level=quality_level,
    dict_stats_mc=dict_stats_mc,
)
plt.tight_layout()
plt.show()

## Planetary boundaries based on EF 3.1

In [None]:
if save_tables:
    try:
        (df_ef_pb / df_ef_pb.max()).to_excel("./data/results/ef_pb_results.xlsx")
    except PermissionError:
        print("close excel to save a new one")
print("PB results")
(df_ef_pb / 1).map("{:,.2f}%".format)

In [None]:
# categories with >1% of the SOS
for sce in SCENARIOS:
    print(f"Categories with > 10% of the SOS for {sce} scenario:")
    ipd.display(
        df_ef_pb.loc[sce][df_ef_pb.loc[SCENARIOS[0]] > 10]
        .sort_values(ascending=False)
        .map("{:,.2f}%".format)
    )

In [None]:
# Box+violing PB log-scale unshared x-axis
import math
from string import ascii_uppercase


fig, ax = plt.subplots(
    4,
    math.ceil(len(stat_relevant_cols) / 4),
    figsize=(fig_length[2], fig_length[2]),
    sharey=True,
    dpi=300 if save_fig else 100,
)
ax = ax.flatten()

df_box_plot_pb = df_box_plot.copy(deep=True)
df_box_plot_pb = df_box_plot_pb[selected_cat_PB + ["scenario"]].rename(
    columns=lcia2category_map_pb
)
df_box_plot_pb = df_box_plot_pb.loc[df_box_plot_pb["scenario"] != SCENARIOS[3]]
df_box_plot_pb = df_box_plot_pb.loc[df_box_plot_pb["scenario"] != SCENARIOS[4]]
df_box_plot_pb[df_box_plot_pb.select_dtypes(float).columns] = (
    df_box_plot_pb.select_dtypes(float).multiply(100)
)

for i, col in enumerate(stat_relevant_cols):
    df_box_plot_pb[col] = df_box_plot_pb[col] / df_pb_eu[col].iloc[0]

    ax[i].set_title(col)

    q_quantile = 0.02
    q_low = df_box_plot_pb[col].quantile(q_quantile)
    q_hi = df_box_plot_pb[col].quantile(1 - q_quantile)
    df_box_plot_pb = df_box_plot_pb[
        (df_box_plot_pb[col] < q_hi) & (df_box_plot_pb[col] > q_low)
    ]

    sns.violinplot(
        data=df_box_plot_pb,
        x=col,
        hue="scenario",
        y="scenario",
        palette=COLORS[: len(SCENARIOS)],
        edgecolor="black",
        linewidth=0.3,
        inner=None,
        ax=ax[i],
        gap=0.0,
        density_norm="count",
        saturation=0.5,
        legend=True if i == 0 else False,
        log_scale=True,  # if df_box_plot_pb[col].min() > 0 else False,
        cut=0,
    )
    sns.boxplot(
        data=df_box_plot_pb,
        x=col,
        hue="scenario",
        y="scenario",
        palette=COLORS[: len(SCENARIOS)],
        boxprops={"zorder": 2},
        ax=ax[i],
        width=0.3,
        linewidth=0.5,
        linecolor="black",
        flierprops={"marker": "x", "color": "red"},
        fliersize=0.0,
        legend=False,
        log_scale=True,  # if df_box_plot_pb[col].min() > 0 else False,
    )

    line = ax[i].axvline(
        x=100.0,
        ls="--",
        lw=0.75,
        color="r",
        label="Earth's carrying capacity [%]" if j == 0 else None,
    )

    ax[i].set(xlabel=None, ylabel=None)

    lb = [1, 90, 1, 10, 0.1, 0.1, 0.1, 0.1, 0.1, 0.005, 0.1, 1.0, 0.1, 10, 1, 0.1]
    lb = [lb[j] for j in stat_relevant_index]
    ub = [110]*len(stat_relevant_index)
    ub[1] = 1000
    ub[2] = 1000
    # ax[i].set_xlim([lb[i], ub[i]])
    ax[i].set_xlim([lb[i], ax[i].get_xlim()[1]])

    # ax[i].tick_params(axis="y", which="both", length=0)
    ax[i].xaxis.set_major_formatter(ScalarFormatter())
    if df_box_plot_pb[col].min() > 0:
        ax[i].xaxis.set_major_locator(LogLocator(numticks=999))
        ax[i].xaxis.set_minor_locator(LogLocator(numticks=999, subs="auto"))
        ax[i].tick_params(which="minor", labelleft=False)

    def format_xtick(x, pos):
        return f"{x:.0f}%" if x >= 1 else f"{x:g}%"

    ax[i].xaxis.set_major_formatter(ticker.FuncFormatter(format_xtick))
    ax[i].xaxis.set_minor_formatter(ticker.FuncFormatter(lambda x, y: ""))


fig.supxlabel("Share of Earth's carrying capacity [%]", y=0.02)

fig.subplots_adjust(hspace=0.4, wspace=0.4)

handles, labels = ax[0].get_legend_handles_labels()
leg = fig.legend(
    handles + [line],
    SCENARIOS[:3] + ["Earth's carrying capacity"],
    ncol=4,
    loc="lower center",
    borderaxespad=0,
    frameon=True,
)
ax[0].get_legend().remove()

leg.get_frame().set_linewidth(0.3)

plt.tick_params(axis="y", length=0)

for i, ax_ in enumerate(ax):
    ax[i].annotate(
        ascii_uppercase[i] + ")",
        xy=(-0.1, 1.15),
        xycoords="axes fraction",
        xytext=(+0.5, -0.5),
        textcoords="offset fontsize",
        fontsize="medium",
        verticalalignment="top",
        fontfamily="calibri",
        # bbox=dict(facecolor="0.7", edgecolor="none", pad=3.0),
    )

plt.tight_layout()

if save_fig:
    plt.savefig(f"./figs/mc_box_violinplot_PB_log_{N_mc}_ite.svg", bbox_inches="tight")

plt.show()

In [None]:
# Box+violing PB log-scale unshared x-axis
from string import ascii_uppercase


fig, ax = plt.subplots(
    4,
    4,
    figsize=(fig_length[2], fig_length[2]),
    sharey=True,
)
ax = ax.flatten()

df_box_plot_pb = df_box_plot.copy(deep=True)
df_box_plot_pb = df_box_plot_pb[selected_cat_PB + ["scenario"]].rename(
    columns=lcia2category_map_pb
)
df_box_plot_pb[df_box_plot_pb.select_dtypes(float).columns] = (
    df_box_plot_pb.select_dtypes(float).multiply(100)
)

for i, col in enumerate(df_pb_eu.columns):
    df_box_plot_pb[col] = df_box_plot_pb[col] / df_pb_eu[col].iloc[0]

    ax[i].set_title(
        xlabels[i].split("\n")[0],
    )

    q_quantile = 0.02
    q_low = df_box_plot_pb[col].quantile(q_quantile)
    q_hi = df_box_plot_pb[col].quantile(1 - q_quantile)
    df_box_plot_pb = df_box_plot_pb[
        (df_box_plot_pb[col] < q_hi) & (df_box_plot_pb[col] > q_low)
    ]

    sns.violinplot(
        data=df_box_plot_pb,
        x=col,
        hue="scenario",
        y="scenario",
        palette=COLORS[: len(SCENARIOS)],
        edgecolor="black",
        linewidth=0.3,
        inner=None,
        ax=ax[i],
        gap=0.0,
        density_norm="count",
        saturation=0.5,
        legend=True if i == 0 else False,
        log_scale=True,  # if df_box_plot_pb[col].min() > 0 else False,
        cut=0,
    )
    sns.boxplot(
        data=df_box_plot_pb,
        x=col,
        hue="scenario",
        y="scenario",
        palette=COLORS[: len(SCENARIOS)],
        boxprops={"zorder": 2},
        ax=ax[i],
        width=0.3,
        linewidth=0.5,
        linecolor="black",
        flierprops={"marker": "x", "color": "red"},
        fliersize=0.0,
        legend=False,
        log_scale=True,  # if df_box_plot_pb[col].min() > 0 else False,
    )

    line = ax[i].axvline(
        x=100.0,
        ls="--",
        lw=0.75,
        color="r",
        label="Earth's carrying capacity [%]" if j == 0 else None,
    )

    ax[i].set(xlabel=None, ylabel=None)

    lb = [1, 90, 1, 10, 0.1, 0.1, 0.1, 0.1, 0.1, 0.005, 0.1, 1.0, 0.1, 10, 1, 0.1]
    ax[i].set_xlim([lb[i], ax[i].get_xlim()[1]])

    # ax[i].tick_params(axis="y", which="both", length=0)
    ax[i].xaxis.set_major_formatter(ScalarFormatter())
    if df_box_plot_pb[col].min() > 0:
        ax[i].xaxis.set_major_locator(LogLocator(numticks=999))
        ax[i].xaxis.set_minor_locator(LogLocator(numticks=999, subs="auto"))
        ax[i].tick_params(which="minor", labelleft=False)

    def format_xtick(x, pos):
        return f"{x:.0f}%" if x >= 1 else f"{x:g}%"

    ax[i].xaxis.set_major_formatter(ticker.FuncFormatter(format_xtick))
    ax[i].xaxis.set_minor_formatter(ticker.FuncFormatter(lambda x, y: ""))


fig.supxlabel("Share of Earth's carrying capacity [%]", y=0.02)

fig.subplots_adjust(hspace=0.4, wspace=0.4)

handles, labels = ax[0].get_legend_handles_labels()
leg = fig.legend(
    handles + [line],
    SCENARIOS[:3] + ["Earth's carrying capacity"],
    ncol=6,
    loc="lower center",
    borderaxespad=0,
    frameon=True,
)
ax[0].get_legend().remove()

leg.get_frame().set_linewidth(0.3)

plt.tick_params(axis="y", length=0)

for i, ax_ in enumerate(ax):
    ax[i].annotate(
        ascii_uppercase[i] + ")",
        xy=(-0.2, 1.05),
        xycoords="axes fraction",
        xytext=(+0.5, -0.5),
        textcoords="offset fontsize",
        fontsize="medium",
        verticalalignment="top",
        fontfamily="calibri",
        # bbox=dict(facecolor="0.7", edgecolor="none", pad=3.0),
    )

plt.tight_layout()

if save_fig:
    plt.savefig(f"./figs/mc_box_violinplot_PB_log_ESI.svg", bbox_inches="tight")

plt.show()

## Sensitivity analysis

### Calculations with `presamples`

In [None]:
import presamples as ps


ei_ng_db = bw.Database(DB_NAME)
bio_db_bw = bw.Database("biosphere3")


def read_presamples_scenario_data(scenario_file, db_name):
    """
    This function reads the scenario data from an excel file and prepares it into a dataframe for being used with Presamples.
    Secondly, it adds the bw codes for the involved activities.
    The dictionary map_bw_keys provides these codes, it needs to be generated beforehand with the involved databases.
    """

    # Create mapping of BW codes for involved databases
    map_bw_keys = dict()
    db = bw.Database(db_name)
    for ds in db:
        map_bw_keys[(ds["reference product"], ds["name"], ds["location"])] = ds.key

    scenario_df = pd.read_excel(scenario_file)  # Import data
    scenario_df = scenario_df.dropna(how="any")  # Delete empty rows

    scenario_label = list(scenario_df.columns)[9:]  # Get label of scenarios

    # add the bw code to your scenario DF (input = process, output = to_process)
    scenario_df["input"] = [
        map_bw_keys[
            (row["from_reference_product"], row["from_process"], row["from_location"])
        ]
        for i, row in scenario_df.iterrows()
    ]
    scenario_df["output"] = [
        map_bw_keys[
            (row["to_reference product"], row["to_process"], row["to_location"])
        ]
        for i, row in scenario_df.iterrows()
    ]
    return scenario_label, scenario_df


def make_pspackage(scenario_df, scenario_label, matrixlabel, ps_pkg_name):
    """
    This function prepares a Presamples package out of the scenario data.
    """

    # select needed data
    samples = scenario_df[scenario_label].values
    indices = [
        (row["input"], row["output"], row["from_type"])
        for i, row in scenario_df.iterrows()
    ]

    # Generate PS data in PSpackage
    data = [(samples, indices, matrixlabel)]

    ps_id, ps_filepath = ps.create_presamples_package(
        matrix_data=data, name=ps_pkg_name, seed="sequential"
    )

    print("\n ps_id, filepath:", ps_id, ps_filepath)

    return samples, indices, ps_filepath

In [None]:
# LCIA methods:
LCIA_METHODS = {
    "Acidification (II)": ("EF v3.1", "acidification", "accumulated exceedance (AE)"),
    "Climate change - CO2 (I)": ("IPCC 2021", "Life cycle CO2 emissions"),
    "Climate change (I)": (
        "IPCC 2021",
        "climate change",
        "global warming potential (GWP100)",
    ),
    "Ecotoxicity freshwater (II/III)": (
        "EF v3.1",
        "ecotoxicity: freshwater",
        "comparative toxic unit for ecosystems (CTUe)",
    ),
    "Energy resources nonrenewable (III)": (
        "EF v3.1",
        "energy resources: non-renewable",
        "abiotic depletion potential (ADP): fossil fuels",
    ),
    "Eutrophication freshwater (II)": (
        "EF v3.1",
        "eutrophication: freshwater",
        "fraction of nutrients reaching freshwater end compartment (P)",
    ),
    "Eutrophication marine (II)": (
        "EF v3.1",
        "eutrophication: marine",
        "fraction of nutrients reaching marine end compartment (N)",
    ),
    "Eutrophication terrestrial (II)": (
        "EF v3.1",
        "eutrophication: terrestrial",
        "accumulated exceedance (AE)",
    ),
    "Human toxicity carcinogenic (II/III)": (
        "EF v3.1",
        "human toxicity: carcinogenic",
        "comparative toxic unit for human (CTUh)",
    ),
    "Human toxicity noncarcinogenic (II/III)": (
        "EF v3.1",
        "human toxicity: non-carcinogenic",
        "comparative toxic unit for human (CTUh)",
    ),
    "Ionising radiation (II)": (
        "EF v3.1",
        "ionising radiation: human health",
        "human exposure efficiency relative to u235",
    ),
    "Land use (III)": ("EF v3.1", "land use", "soil quality index"),
    "Land use - LANCA (III)": ("LANCA v2.5 - land use", "erosion potential"),
    "Material resources metals minerals (III)": (
        "EF v3.1",
        "material resources: metals/minerals",
        "abiotic depletion potential (ADP): elements (ultimate reserves)",
    ),
    "Ozone depletion (I)": (
        "EF v3.1",
        "ozone depletion",
        "ozone depletion potential (ODP)",
    ),
    "Particulate matter formation (I)": (
        "EF v3.1",
        "particulate matter formation",
        "impact on human health",
    ),
    "Photochemical oxidant formation (II)": (
        "EF v3.1",
        "photochemical oxidant formation: human health",
        "tropospheric ozone concentration increase",
    ),
    "Water use (III)": (
        "EF v3.1",
        "water use",
        "user deprivation potential (deprivation-weighted water consumption)",
    ),
}

if not os.path.exists(PATH2RESULTS + "sensitivity_analysis_results.csv"):
    # Read the excel file to get the scenario data
    ps_data_path = "./data/scenarios/presamples_sensitivity_analysis.xlsx"

    scenario_label, scenario_df = read_presamples_scenario_data(ps_data_path, DB_NAME)

    # Create PS package with indices + samples matrix
    samples_v1, indices_v1, ps_filepath_v1 = make_pspackage(
        scenario_df, scenario_label, "technosphere", "ps_v1"
    )

    # Filter the activity in the database:
    # activity_for_ps = [
    #     ds for ds in bw.Database(DB_NAME) if "energy from NG" in ds["name"]
    # ][0]
    activity_for_ps_lca = bw.LCA(
        # demand={activity_for_ps: 1},
        demand=baseScenario.functional_unit,
        presamples=[ps_filepath_v1],
        override_presamples_seed=True,
    )

    lcia_scenarios = dict()
    for i in range(len(scenario_label)):  # Scenarios
        if (
            i == 0
        ):  # Don't update the first time around, since indexer already at 0th column
            activity_for_ps_lca.lci()  # Builds matrices
            multi_lcia_results = dict()
            for impact in LCIA_METHODS:
                activity_for_ps_lca.switch_method(LCIA_METHODS[impact])
                activity_for_ps_lca.lcia()
                multi_lcia_results[impact] = activity_for_ps_lca.score
        else:
            activity_for_ps_lca.presamples.update_matrices()  # Move to next column and update matrices
            activity_for_ps_lca.redo_lci()
            multi_lcia_results = dict()
            for impact in LCIA_METHODS:
                activity_for_ps_lca.switch_method(LCIA_METHODS[impact])
                activity_for_ps_lca.lcia()
                multi_lcia_results[impact] = activity_for_ps_lca.score

        lcia_scenarios[scenario_label[i]] = {**multi_lcia_results}

    df_sens_analysis = pd.DataFrame(lcia_scenarios)
    df_sens_analysis.to_csv(PATH2RESULTS + "sensitivity_analysis_results.csv")
else:
    df_sens_analysis = pd.read_csv(
        PATH2RESULTS + "sensitivity_analysis_results.csv", index_col=0
    )

df_sens_analysis_pb = df_sens_analysis.copy(deep=True)
df_sens_analysis_pb.drop(["Climate change (I)", "Land use (III)"], axis=0, inplace=True)
df_sens_analysis_pb.rename(
    index={
        "Climate change - CO2 (I)": "Climate change (I)",
        "Land use - LANCA (III)": "Land use (III)",
    },
    inplace=True,
)
df_sens_analysis.drop(
    ["Climate change - CO2 (I)", "Land use - LANCA (III)"], axis=0, inplace=True
)

df_sens_analysis

### Heatmap visualization

In [None]:
import matplotlib


def heatmap_ng_lca(df_lca, save_fig=False, path2save="./figs/heatmap_ef"):
    plt.rcParams["font.size"] = 6
    plt.rcParams["axes.linewidth"] = 0.3

    fig = plt.figure(
        constrained_layout=True,
        figsize=(9.72441, 7.20472),
        dpi=300 if save_fig else 120,
    )
    axm = fig.subplot_mosaic(
        """
            dd
            ss
            qw
        """,
        gridspec_kw={
            "height_ratios": [16, 2.0, 0.8],
            "wspace": 0.02,
            "hspace": 0.02,
        },
    )
    axs = [axm["d"], axm["s"], axm["q"], axm["w"]]

    valid_index = df_lca.index.to_list()
    [valid_index.remove(key) for key in df_lca.index if key.startswith("$\Delta")]

    sns.heatmap(
        df_lca.loc[valid_index],
        linewidth=1.5,
        cmap="coolwarm",
        # center cmap in 0
        center=0,
        vmin=-0.005,  # max(df_lca.loc[valid_index].min().min(), -0.005),
        vmax=0.005,  # min(df_lca.loc[valid_index].max().max() / 15, 0.005),
        # vmax=0.05,
        # vmin=min(df_lca.loc[valid_index].min().min(),-df_lca.loc[valid_index].max().max()),
        # vmax=max(df_lca.loc[valid_index].max().max(), -df_lca.loc[valid_index].min().min()),
        annot=True,
        fmt="0.1%",
        annot_kws={"size": 7},
        linewidths=1.5,
        linecolor="white",
        ax=axs[0],
        cbar=True,
        cbar_ax=axs[2],  # orientation = 'horizontal'
        cbar_kws=dict(orientation="horizontal", shrink=1.0),
    )

    axs[0].set(
        xlabel="",
        ylabel="",
        yticks=np.arange(0.5, len(valid_index) + 0.5, 1),
        yticklabels=valid_index,
    )
    axs[0].set(xlabel="", ylabel="", xticklabels=df_lca.columns)
    fmted_labels = [
        "\n".join(x.get_text().split(r"\n")) for x in axs[0].get_yticklabels()
    ]
    axs[0].set_yticklabels(fmted_labels)
    axs[0].xaxis.set_tick_params(labeltop=True, rotation=30)
    axs[0].xaxis.tick_top()
    plt.setp(axs[0].get_yticklabels(), ha="right", rotation_mode="anchor")
    axs[0].yaxis.set_tick_params(rotation=-0)
    plt.setp(axs[0].get_xticklabels(), ha="left", rotation_mode="anchor")

    # highlight categories in twin y axis [power plants, CHP, Industry, Househols]
    # for i, cat in enumerate(["Power plants", "CHP", "Industry", "Households"]):
    #     contains_cat = [cat in x for x in valid_index]
    for ite, cat in zip(
        [7, 10, 13, 15], ["Power plants", "CHP", "Industry", "Households"]
    ):
        axs[0].axhline(
            y=ite - 0.05,
            color="black",
            linewidth=1.0,
            linestyle="--",
        )
        axs[0].annotate(
            cat,
            xy=(axs[0].get_xlim()[-1], ite - 0.05),
            xytext=(axs[0].get_xlim()[-1], ite - 0.05),
            textcoords="data",
            ha="left",
            va="bottom",
            fontsize=7,
        )

    # New supply chain heatmap
    delta_index = df_lca.index.str.startswith(r"$\Delta")
    df_delta_ng_comp = df_lca.loc[delta_index]

    sns.heatmap(
        df_delta_ng_comp,
        linewidth=1.5,
        # cmap="Greens",  #'vlag',
        cmap=sns.diverging_palette(145, 300, s=60, as_cmap=True),
        # cmap=sns.diverging_palette(300, 145, s=60, as_cmap=True),
        center=0,
        vmin=-0.05,  # max(df_delta_ng_comp.min().min(), -0.05),
        vmax=0.05,  # min(df_delta_ng_comp.max().max(), 0.05),
        # vmin=min(df_delta_ng_comp.min().min(),-df_delta_ng_comp.max().max()),
        # vmax=max(df_delta_ng_comp.max().max(), -df_delta_ng_comp.min().min()),
        annot=True,
        fmt="0.1%",
        annot_kws={"size": 7},
        linewidths=1.5,
        linecolor="white",
        ax=axs[1],
        square=False,
        # cbar=False,
        cbar=True,
        cbar_ax=axs[3],  # orientation = 'horizontal'
        cbar_kws=dict(orientation="horizontal", shrink=1.0),
    )

    axs[1].set(xlabel="", ylabel="", yticklabels=df_delta_ng_comp.index)
    axs[1].yaxis.set_tick_params(labeltop=False, rotation=-0)
    axs[1].xaxis.set_tick_params(
        labeltop=False, labelbottom=False, bottom=False, rotation=-0
    )

    for spine in axs[2].spines.values():
        spine.set(visible=True, lw=0.6, edgecolor="black")
    for spine in axs[3].spines.values():
        spine.set(visible=True, lw=0.6, edgecolor="black")

    axs[2].set(
        xlabel="Relative impact of each bcm switched",
        ylabel="",
    )
    axs[3].set(xlabel="Relative impact of new NG supply chain", ylabel="")

    for i in range(2, 4):
        axs[i].tick_params(axis="x", which="minor", bottom=False)
        axs[i].xaxis.set_major_formatter(
            matplotlib.ticker.StrMethodFormatter("{x:.1%}")
        )

    plt.rcParams.update(
        {
            "figure.facecolor": "white",  # (1.0, 1.0, 1.0, 1.0),  # red   with alpha = 30%
            "axes.facecolor": "white",  # (1.0, 1.0, 1.0, 1.0),  # green with alpha = 50%
            "savefig.facecolor": "white",  # (1.0, 1.0, 1.0, 1.0),  # blue  with alpha = 20%
        }
    )

    for ax in axs:
        ax.set_facecolor((1.0, 1.0, 1.0, 1.0))

    if save_fig:
        plt.savefig(path2save + ".png")
        plt.savefig(path2save + ".svg")

    return fig, axs


df_sens_analysis_pct = df_sens_analysis.drop(SCENARIOS[0], axis=1).T
df_sens_analysis_base = df_sens_analysis[SCENARIOS[0]]
df_sens_analysis_pct = (
    df_sens_analysis_pct.sub(df_sens_analysis_base) / df_sens_analysis_base
)

fig_sensitivity, axs_sensitivity = heatmap_ng_lca(
    df_sens_analysis_pct,
    save_fig=save_fig,
)

if save_fig:
    plt.savefig(f"./figs/heatmap_ef.svg")
    plt.savefig(f"./figs/heatmap_ef.png")

plt.show()

In [None]:
def heatmap_ng_lca_pb(df_lca, df_pb, save_fig=False, path2save="./figs/heatmap_ef_pb"):
    plt.rcParams["font.size"] = 6
    plt.rcParams["axes.linewidth"] = 0.3

    fig = plt.figure(
        constrained_layout=True,
        figsize=(9.72441, 7.20472),
        dpi=300 if save_fig else 120,
    )
    axm = fig.subplot_mosaic(
        """
            ddd
            sss
            ppp
            qwz
        """,
        gridspec_kw={
            "height_ratios": [16, 2.0, 1.0, 0.8],
            "wspace": 0.02,
            "hspace": 0.02,
        },
    )
    # axs = [axm["d"], axm["s"], axm["q"], axm["w"]]
    axs = [ax for ax in axm.values()]

    valid_index = df_lca.index.to_list()
    [valid_index.remove(key) for key in df_lca.index if key.startswith("$\Delta")]

    sns.heatmap(
        df_lca.loc[valid_index],
        linewidth=1.5,
        cmap="coolwarm",
        # center cmap in 0
        center=0,
        vmin=-0.005,  # max(df_lca.loc[valid_index].min().min(), -0.005),
        vmax=0.005,  # min(df_lca.loc[valid_index].max().max() / 15, 0.005),
        # vmax=0.05,
        # vmin=min(df_lca.loc[valid_index].min().min(),-df_lca.loc[valid_index].max().max()),
        # vmax=max(df_lca.loc[valid_index].max().max(), -df_lca.loc[valid_index].min().min()),
        annot=True,
        fmt="0.1%",
        annot_kws={"size": 7},
        linewidths=1.5,
        linecolor="white",
        ax=axs[0],
        cbar=True,
        cbar_ax=axs[3],  # orientation = 'horizontal'
        cbar_kws=dict(orientation="horizontal", shrink=1.0),
    )

    axs[0].set(
        xlabel="",
        ylabel="",
        yticks=np.arange(0.5, len(valid_index) + 0.5, 1),
        yticklabels=valid_index,
    )
    axs[0].set(xlabel="", ylabel="", xticklabels=df_lca.columns)
    fmted_labels = [
        "\n".join(x.get_text().split(r"\n")) for x in axs[0].get_yticklabels()
    ]
    axs[0].set_yticklabels(fmted_labels)
    axs[0].xaxis.set_tick_params(labeltop=True, rotation=30)
    axs[0].xaxis.tick_top()
    plt.setp(axs[0].get_yticklabels(), ha="right", rotation_mode="anchor")
    axs[0].yaxis.set_tick_params(rotation=-0)
    plt.setp(axs[0].get_xticklabels(), ha="left", rotation_mode="anchor")

    # highlight categories in twin y axis [power plants, CHP, Industry, Househols]
    # for i, cat in enumerate(["Power plants", "CHP", "Industry", "Households"]):
    #     contains_cat = [cat in x for x in valid_index]
    for ite, cat in zip(
        [7, 10, 13, 15], ["Power plants", "CHP", "Industry", "Households"]
    ):
        axs[0].axhline(
            y=ite - 0.05,
            color="black",
            linewidth=1.0,
            linestyle="--",
        )
        axs[0].annotate(
            cat,
            xy=(axs[0].get_xlim()[-1], ite - 0.05),
            xytext=(axs[0].get_xlim()[-1], ite - 0.05),
            textcoords="data",
            ha="left",
            va="bottom",
            fontsize=7,
        )

    # New supply chain heatmap
    delta_index = df_lca.index.str.startswith(r"$\Delta")
    df_delta_ng_comp = df_lca.loc[delta_index]

    sns.heatmap(
        df_delta_ng_comp,
        linewidth=1.5,
        # cmap="Greens",  #'vlag',
        cmap=sns.diverging_palette(145, 300, s=60, as_cmap=True),
        # cmap=sns.diverging_palette(300, 145, s=60, as_cmap=True),
        center=0,
        vmin=-0.01,  # max(df_delta_ng_comp.min().min(), -0.05),
        vmax=0.01,  # min(df_delta_ng_comp.max().max(), 0.05),
        # vmin=min(df_delta_ng_comp.min().min(),-df_delta_ng_comp.max().max()),
        # vmax=max(df_delta_ng_comp.max().max(), -df_delta_ng_comp.min().min()),
        annot=True,
        fmt="0.1%",
        annot_kws={"size": 7},
        linewidths=1.5,
        linecolor="white",
        ax=axs[1],
        square=False,
        # cbar=False,
        cbar=True,
        cbar_ax=axs[4],  # orientation = 'horizontal'
        cbar_kws=dict(orientation="horizontal", shrink=1.0),
    )

    axs[1].set(xlabel="", ylabel="", yticklabels=df_delta_ng_comp.index)
    axs[1].yaxis.set_tick_params(labeltop=False, rotation=-0)
    axs[1].xaxis.set_tick_params(
        labeltop=False, labelbottom=False, bottom=False, rotation=-0
    )

    sns.heatmap(
        df_pb,
        linewidth=1.5,
        cmap="Blues",  #'vlag',
        # cmap=sns.diverging_palette(300, 145, s=60, as_cmap=True),
        # center=0,
        vmin=0.0,  # max(df_delta_ng_comp.min().min(), -0.05),
        vmax=1.0,  # min(df_delta_ng_comp.max().max(), 0.05),
        # vmin=min(df_delta_ng_comp.min().min(),-df_delta_ng_comp.max().max()),
        # vmax=max(df_delta_ng_comp.max().max(), -df_delta_ng_comp.min().min()),
        annot=True,
        fmt="0.1%",
        annot_kws={"size": 7},
        linewidths=1.5,
        linecolor="white",
        ax=axs[2],
        square=False,
        cbar=True,
        cbar_ax=axs[5],
        cbar_kws=dict(orientation="horizontal", shrink=1.0),
    )

    axs[2].set(xlabel="", ylabel="", yticklabels=[SCENARIOS[0]])
    axs[2].yaxis.set_tick_params(labeltop=False, rotation=-0)
    axs[2].xaxis.set_tick_params(
        labeltop=False, labelbottom=False, bottom=False, rotation=-0
    )

    for i in range(3,6):
        for spine in axs[i].spines.values():
            spine.set(visible=True, lw=0.6, edgecolor="black")

    axs[3].set(xlabel="Relative impact of each bcm switched", ylabel="")
    axs[4].set(xlabel="Relative impact of new NG supply chain", ylabel="")
    axs[5].set(xlabel=f"Transgression level of {SCENARIOS[0]}", ylabel="")

    for i in range(2, 6):
        axs[i].tick_params(axis="x", which="minor", bottom=False)
        axs[i].xaxis.set_major_formatter(
            matplotlib.ticker.StrMethodFormatter("{x:.1%}")
        )
    axs[5].xaxis.set_major_formatter(
        matplotlib.ticker.StrMethodFormatter("{x:.0%}")
    )

    plt.rcParams.update(
        {
            "figure.facecolor": "white",  # (1.0, 1.0, 1.0, 1.0),  # red   with alpha = 30%
            "axes.facecolor": "white",  # (1.0, 1.0, 1.0, 1.0),  # green with alpha = 50%
            "savefig.facecolor": "white",  # (1.0, 1.0, 1.0, 1.0),  # blue  with alpha = 20%
        }
    )

    for ax in axs:
        ax.set_facecolor((1.0, 1.0, 1.0, 1.0))

    if save_fig:
        plt.savefig(path2save + ".png")
        plt.savefig(path2save + ".svg")

    return fig, axs


df_sens_analysis_pct_pb = df_sens_analysis_pb.drop(SCENARIOS[0], axis=1).T
df_sens_analysis_base = df_sens_analysis_pb[SCENARIOS[0]]
df_sens_analysis_pct_pb = df_sens_analysis_pct_pb.sub(df_sens_analysis_base)

mult_val = 10
multiplier_bcm = pd.Series(
    [mult_val] * len(df_sens_analysis_pct_pb.index), index=df_sens_analysis_pct_pb.index
)
multiplier_bcm.loc["$\Delta$NG REPowerEU"] = 1
multiplier_bcm.loc["$\Delta$NG Year 2022"] = 1

# df_sens_analysis_pct_pb = df_sens_analysis_pct_pb * df_ef_pb.loc[].values / 100
df_sens_analysis_pct_pb = (
    df_sens_analysis_pct_pb / df_pb_eu.iloc[0, :]
)  # * df_ef_pb.loc[].values / 100



fig_sensitivity, axs_sensitivity = heatmap_ng_lca_pb(
    # df_sens_analysis_pct_pb.multiply(multiplier_bcm, axis=0).loc[:, stat_relevant_cols],
    # df_ef_pb.loc[[SCENARIOS[0]], stat_relevant_cols] / 100,
    df_sens_analysis_pct_pb.multiply(multiplier_bcm, axis=0).loc[:, stat_relevant_cols],
    df_ef_pb.loc[[SCENARIOS[0]], stat_relevant_cols] / 100,
    save_fig=save_fig,
    path2save="./figs/heatmap_ef_pb",
)

axs_sensitivity[3].set(
    xlabel=f"EU's ecological limit impact per {mult_val} bcm switched",
    ylabel=""
)

axs_sensitivity[4].set(
    xlabel="EU's ecological limit impact of new NG supply chain", ylabel=""
)

if save_fig:
    plt.savefig("./figs/heatmap_ef_pb.png")
    plt.savefig("./figs/heatmap_ef_pb.svg")

plt.show()

### Quick NG scenario LCA

In [None]:
# PCT change in impact
alt_index = df_sens_analysis_pct.index.to_list()

df_alternative = pd.Series([0.0] * len(alt_index), index=alt_index)

df_alternative.loc["Nuclear power"] = 7.0
df_alternative.loc[
    r"Heat savings (mild winter,\nefficiency improvements,\nbehavior change)"
] = 10.0
df_alternative.loc["Heat pumps (household)"] = 9.0

df_alternative_rep = df_alternative.copy(deep=True)

df_alternative_rep.loc["Coal-fired power plants"] = 24.0
df_alternative.loc["Wind power"] = 24.0

assert df_alternative.sum() == 50.0
assert df_alternative_rep.sum() == 50.0

df_alternative = df_alternative * df_sens_analysis_pct.T
df_alternative_rep = df_alternative_rep * df_sens_analysis_pct.T
assert df_alternative.isna().sum().sum() == 0
assert df_alternative_rep.isna().sum().sum() == 0

solution = (
    df_alternative.sum(axis=1) + df_sens_analysis_pct.loc[r"$\Delta$NG REPowerEU"]
) * 100
solution_rep = (
    df_alternative_rep.sum(axis=1) + df_sens_analysis_pct.loc[r"$\Delta$NG REPowerEU"]
) * 100

final_alternative = pd.DataFrame(
    [solution, solution_rep], index=["Alternative", SCENARIOS[1]]
).T
final_alternative["Change"] = (
    final_alternative["Alternative"] - final_alternative[SCENARIOS[1]]
)
final_alternative.sort_values(by="Change", ascending=True).applymap(
    lambda x: f"{x:.1f}%"
)
final_alternative.sort_values(by="Alternative", ascending=True).applymap(
    lambda x: f"{x:.1f}%"
)

In [None]:
alt_index = df_sens_analysis_pct_pb.index.to_list()

df_alternative = pd.Series([0.0] * len(alt_index), index=alt_index)

df_alternative.loc["Nuclear power"] = 7.0
df_alternative.loc["Coal-fired power plants"] = 24.0
df_alternative.loc[
    r"Heat savings (mild winter,\nefficiency improvements,\nbehavior change)"
] = 10.0
df_alternative.loc["Heat pumps (household)"] = 9.0

assert df_alternative.sum() == 50.0

df_alternative = df_alternative * df_sens_analysis_pct_pb.T
assert df_alternative.isna().sum().sum() == 0

solution = (
    df_alternative.sum(axis=1) + df_sens_analysis_pct_pb.loc[r"$\Delta$NG REPowerEU"]
)
solution = solution * 100 + df_ef_pb.loc[SCENARIOS[0]]

solution.clip(lower=0).mean(), solution

In [None]:
alt_index = df_sens_analysis_pct_pb.index.to_list()

df_alternative = pd.Series([0.0] * len(alt_index), index=alt_index)

df_alternative.loc["Nuclear power"] = 0.0
df_alternative.loc["Wind power"] = 40.0
df_alternative.loc[
    r"Heat savings (mild winter,\nefficiency improvements,\nbehavior change)"
] = 10.0
df_alternative.loc["Heat pumps (household)"] = 0.0

assert df_alternative.sum() == 50.0

df_alternative = df_alternative * df_sens_analysis_pct_pb.T
assert df_alternative.isna().sum().sum() == 0

solution = (
    df_alternative.sum(axis=1) + df_sens_analysis_pct_pb.loc[r"$\Delta$NG REPowerEU"]
)
solution = solution * 100 + df_ef_pb.loc[SCENARIOS[0]]

solution.clip(lower=0).mean(), solution

### Optimized array of measures

#### Optimization model

\begin{split}
    \min        & \ \ \ \sum_\ell {\mathbf{e}_\ell}\\
    \text{s.t.} & \ \ \ \mathbf{t}_\ell=\sum_{r\in R} \left( \mathbf{d}_r\cdot\mathbf{\rho}_{r,\ell} \right) +\mathbf{\Delta}_\ell ,   \forall \ell\in L\\
                & \ \ \ \mathbf{e}_\ell\ge \mathbf{t}_\ell,   \forall \ell\in L\\
                & \ \ \ \mathbf{e}_\ell\ge 0,   \forall \ell\in L\\
                & \ \ \ \sum_{r\in R}{\mathbf{d}_r} = \bar{d}
\end{split}

In [None]:
from pyomo.environ import *


def instantiating_model(
    df_pct,
    demand_reduction=50,
    constrain_savings=True,
    obj_aggregator=sum,
    pb_base=None,
    limit_each_to=None,
    demand_reduction_equals=True,
):
    print(f"Demand reduction {demand_reduction}")
    model = ConcreteModel()

    # Define the set of activities
    valid_measures = [ind for ind in alt_index if not ind.startswith(r"$\Delta")]
    model.activities = Set(initialize=valid_measures)

    # # Define the set of scenarios
    lca_index_map = {col: i for i, col in enumerate(df_pct.columns)}
    model.lca = Set(initialize=df_pct.columns)
    M = 10_000

    # Define the decision variables
    model.decision = Var(model.activities, domain=NonNegativeReals)
    model.impact = Var(model.lca, domain=NonNegativeReals)
    model.slack_impact = Var(model.lca, domain=Reals)
    # model.y_impact = Var(model.lca, domain=Binary)

    # Define the objective function
    def obj(m):
        return summation(m.impact) / len(df_pct.columns)

    # model.objective = Objective(expr=obj_aggregator(model.impact), sense=minimize)
    model.objective = Objective(rule=obj, sense=minimize)

    def transgression_calculation(m, l):
        aux = (
            sum([m.decision[act] * df_pct.loc[act, l] for act in m.activities])
            + df_pct.loc[r"$\Delta$NG Year 2022"].values.tolist()[lca_index_map[l]]
        )
        if pb_base is not None:
            aux = 100 * (aux + pb_base[lca_index_map[l]])

        return aux

    model.slack_impact_def = Constraint(
        model.lca,
        expr=lambda m, l: m.slack_impact[l] == transgression_calculation(m, l),
    )
    model.slack_impact_constraint = Constraint(
        model.lca, expr=lambda m, l: m.impact[l] >= m.slack_impact[l]
    )

    # Define the constraints
    if demand_reduction_equals:
        model.demand_reduction = Constraint(
            expr=sum(model.decision[act] for act in model.activities)
            == demand_reduction
        )
    else:
        model.demand_reduction = Constraint(
            expr=sum(model.decision[act] for act in model.activities)
            <= demand_reduction
        )

    model.power_plants = Constraint(
        expr=model.decision["Coal-fired power plants"]
        + model.decision["Oil-fired power plants"]
        + model.decision["Nuclear power"]
        + model.decision["Solar power"]
        + model.decision["Wind power"]
        + model.decision["Hydro power"]
        + model.decision["Electricity savings"]
        <= 50
    )
    model.chp = Constraint(
        expr=model.decision["Coal-fired CHP"]
        + model.decision["Oil-fired CHP"]
        + model.decision["Biomass-fired CHP"]
        <= 62.1 + 8.1
    )
    model.industry = Constraint(
        expr=model.decision["Coal-based industrial heating"]
        + model.decision["Oil-based industrial heating"]
        + model.decision[
            "Industrial heating savings\\n(improved efficiency,\\nproduction curtailment)"
        ]
        <= 110.4
    )
    model.household = Constraint(
        expr=model.decision["Heat pumps (household)"]
        + model.decision[
            "Heat savings (mild winter,\\nefficiency improvements,\\nbehavior change)"
        ]
        <= 153.9
    )

    if constrain_savings:
        savings = [
            r"Electricity savings",
            r"Industrial heating savings\n(improved efficiency,\nproduction curtailment)",
            r"Heat savings (mild winter,\nefficiency improvements,\nbehavior change)",
        ]
        label_savings = [
            r"electricity_savings",
            r"curtailment",
            r"heat_savings",
        ]
        for l, s in zip(label_savings, savings):
            model.__setattr__(l, Constraint(expr=model.decision[s] == 0))

    if limit_each_to is not None:
        for act in model.activities:
            model.__setattr__(
                act, Constraint(expr=model.decision[act] <= limit_each_to)
            )

    return model


def check_print_opt_results(res, model):
    assert res["Solver"][0]["Status"] == SolverStatus.ok

    ovr = res["Problem"][0]
    assert (
        abs(ovr["Lower bound"] - ovr["Upper bound"]) / (ovr["Lower bound"] + 1e-12)
        < 1e-3
    )

    print("Objective value:", model.objective())
    model.objective.pprint()
    model.decision.pprint()


stat_relevant_cols_opt = copy.copy(stat_relevant_cols)
stat_relevant_cols_opt = copy.copy(df_sens_analysis_pct_pb.columns.tolist())


def process_opt_lca_result(model, df_pct, pb_base=None):
    decision_results = {k: v.value for k, v in model.decision.items()}
    df_decisions = pd.Series(decision_results)

    df_alternative = df_decisions * df_pct[stat_relevant_cols_opt].T
    df_alternative.dropna(how="all", axis=1, inplace=True)
    assert df_alternative.isna().sum().sum() == 0

    lca_soln = (
        df_alternative.sum(axis=1)
        + df_pct.loc[r"$\Delta$NG Year 2022", stat_relevant_cols_opt]
    )

    lca_soln = lca_soln if pb_base is None else (lca_soln + pb_base) * 100

    obj_calcd = lca_soln.map(lambda x: max(x, 0)).sum() / len(df_pct.columns)

    print("Objective value calculated:", obj_calcd)
    print("Objective value from model:", model.objective(), "\n")
    # assert abs(obj_calcd - model.objective()) / obj_calcd < 1e-3

    print(lca_soln.sort_values(ascending=False))

    return lca_soln


df_sens_analysis_pct_ = df_sens_analysis_pct[stat_relevant_cols_opt].copy()
# df_sens_analysis_pct_ = df_sens_analysis_pct_

df_sens_analysis_pct_pb_ = df_sens_analysis_pct_pb[stat_relevant_cols_opt].copy()
# df_sens_analysis_pct_pb_[df_sens_analysis_pct_pb.columns.str.replace(",", "")] = (
#     df_sens_analysis_pct_pb.values
# )
# df_sens_analysis_pct_pb_ = df_sens_analysis_pct_pb_

pb_base = df_ef_pb.loc[SCENARIOS[0], :].values / 100


def run_all_opt_analysis(
    df_pct,
    demand_reduction=50,
    constrain_savings=True,
    pb_base=None,
    limit_each_to=None,
    demand_reduction_equals=True,
    ax=None,
    fig=None,
):
    # Solve the model 50bcm & constrain savings
    model = instantiating_model(
        df_pct,
        demand_reduction=demand_reduction,
        constrain_savings=constrain_savings,
        pb_base=pb_base,
        limit_each_to=limit_each_to,
        demand_reduction_equals=demand_reduction_equals,
    )
    solver = SolverFactory("cplex")

    opt_results = solver.solve(model, tee=False, report_timing=False)
    check_print_opt_results(opt_results, model)
    lca_soln = process_opt_lca_result(model, df_pct, pb_base=pb_base)

    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 5))
    pd.DataFrame(lca_soln, columns=["Optimum"]).sort_values(by="Optimum").plot(
        kind="barh", ax=ax
    )

    return model, opt_results, lca_soln, fig, ax

In [None]:
# BASELINE
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_,
    demand_reduction=0,
    constrain_savings=True,
    pb_base=pb_base,
    limit_each_to=None,
)

#### 50 bcm target

In [None]:
# Solve the model 50bcm & constrain savings
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_,
    demand_reduction=50,
    constrain_savings=True,
    pb_base=None,
    limit_each_to=None,
)

In [None]:
# Solve the model 50bcm & constrain savings
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_,
    demand_reduction=50,
    constrain_savings=True,
    pb_base=None,
    limit_each_to=10,
)

In [None]:
# Solve the model 50bcm & not constraint savings & PB
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_pb_,
    demand_reduction=50,
    pb_base=pb_base,
    constrain_savings=False,
    limit_each_to=15,
)

In [None]:
# Solve the model 50bcm & constrain savings & PB
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_pb_,
    demand_reduction=50,
    constrain_savings=True,
    pb_base=pb_base,
    limit_each_to=None,
)

#### >100 bcm targets

In [None]:
# Solve the model 150bcm & constrain savings
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_,
    demand_reduction=150,
    constrain_savings=True,
    pb_base=None,
    limit_each_to=None,
)

In [None]:
# Solve the model 150bcm & constrain savings & PB
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_pb_,
    demand_reduction=150,
    constrain_savings=True,
    pb_base=pb_base,
    limit_each_to=None,
)

In [None]:
# Solve the model 400bcm & constrain savings & PB
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_pb_,
    demand_reduction=384,
    constrain_savings=True,
    pb_base=pb_base,
    limit_each_to=None,
    demand_reduction_equals=True,
)

In [None]:
# Solve the model 400bcm & constrain savings & PB
model, opt_results, lca_soln, fig, ax = run_all_opt_analysis(
    df_sens_analysis_pct_pb_,
    demand_reduction=400,
    constrain_savings=True,
    pb_base=pb_base,
    limit_each_to=None,
    demand_reduction_equals=False,
)