<span style="color:red">Work in progress</span>

--> This Jupyter notebook serves as the basic structure for performing life cycle assessments within our research group (used at least by Augustin & Robin).

# Introduction

The architecture of this notebook is explained in the figure below, which depicts the three different types of brightway2 (BW2) databases that are used in the notebook:
- the biosphere database from ecoinvent
- the original ecoinvent database version 3.9.1 cutoff
- an Open Source (OS) database where custom activities are created.

This jupyter notebook does the following:
1. Create the BW2 environment to compute the LCA
2. Generate the foreground activities, custom activities and modified activities within the lca_algebraic framework, using several Excel files as data input
3. Compute LCIA results with uncertainty and distribution based on parameter distributions.

The following figure illustrates the database structure of this BW2 notebook and the purpose of the various files used.

![title](image/figure_notebook_structure.png)


# Importing relevant packages

In [None]:
import brightway2 as bw
from brightway2 import *
import lca_algebraic as agb
from sympy import init_printing
import bw2io
from dotenv import load_dotenv
import pandas as pd
from sympy import symbols
import logging
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import os 
import json
from importlib import reload

In [None]:
# Custom packages
from src.ei_access import EI_Access
from src.ei_access.setup import setup_database

# Parameters

In [None]:

#### Project & ecoinvent database parameters
# Name of the brightway2 project that is activated and populated further down.
project_name= 'ECS-LCA'

# Access object handed to setup_database() together with the project name.
ei_acc = EI_Access()

#### Life cycle data parameters
# Folder containing all the Excel input workbooks (keep the trailing slash:
# file names are appended to it by plain string concatenation).
xlsx_path = "./sheets/"
# This Excel file must be formatted according to the provided template.
LC_data_Foreground_file_path = xlsx_path + "LC_data_Foreground.xlsx"
foreground_list=["foreground_ICT_2024_BE"] # Sheet names of the foreground workbook to load; each foreground can represent a different scenario, for example

# Metadata workbook describing the custom and modified activities. It is indexed
# by its 'sheet name' column and read for the columns 'file', 'type',
# 'priority', 'location', 'unit', 'original EI activity name' and
# 'original EI activity location'.
custom_meta_data= xlsx_path + "custom_meta_data.xlsx"

# Specify the foreground for which the results will be computed.
# It must be one of the entries of foreground_list.
selected_foreground = 'foreground_ICT_2024_BE'

# Specify the LCIA methods with which the results will be computed
impacts_GWP_specific=('EF v3.1', 'climate change', 'global warming potential (GWP100)')
#impacts_GWP_specific=('CML v4.8 2016 no LT', 'climate change no LT', 'global warming potential (GWP100) no LT')
impacts_ADP_specific=('EF v3.1', 'material resources: metals/minerals', 'abiotic depletion potential (ADP): elements (ultimate reserves)')
LCIA_method_list=[impacts_GWP_specific,impacts_ADP_specific]
# Column of the foreground sheet whose values are used as labels to group the
# contribution analysis:
contribution_analysis_label = "LC phase"


# Specify the output folder to which the results are exported
# (figures are saved in its "figure" sub-folder).
output_folder = r"./results"

# Initialize the project and the ecoinvent database

In [None]:
# Uncomment and run only if you think you messed up your project
#bw.projects.delete_project(name=project_name, delete_dir=True)

In [None]:
bw.projects.set_current(project_name) # Set the current project, can be any name

In [None]:
setup_database(ei_acc, project_name)

# Custom functions to parse the excel

In [None]:


def find_activity(ei_activity_name, location, custom_process_name, modified_activity_name, custom_db):
    """
    Resolve the activity referenced by one spreadsheet row.

    Sources are tried in this order and the first successful lookup wins:
      1. `modified_activity_name` in `custom_db` (only when it is not NaN),
      2. `custom_process_name` in `custom_db` (only when it is not NaN),
      3. `ei_activity_name` through `agb.findTechAct`, restricted to
         `location` when one is given.
    A lookup that raises is reported with `print` and the next source is tried.

    Returns the activity that was found, or None when every applicable
    lookup failed.
    """
    # 1. Modified copy of an ecoinvent activity stored in the custom database.
    if not pd.isna(modified_activity_name):
        try:
            return agb.findActivity(modified_activity_name, db_name=custom_db)
        except Exception as err:
            print(f"Failed to find activity '{modified_activity_name}' in custom database '{custom_db}': {err}")

    # 2. Custom process stored in the custom database.
    if not pd.isna(custom_process_name):
        try:
            return agb.findActivity(custom_process_name, db_name=custom_db)
        except Exception as err:
            print(f"Failed to find custom process '{custom_process_name}' in custom database '{custom_db}': {err}")

    # 3. Native ecoinvent activity; the location filter is optional.
    try:
        loc_filter = {"loc": location} if pd.notna(location) else {}
        return agb.findTechAct(ei_activity_name, **loc_filter)
    except Exception as err:
        print(f"Failed to find activity '{ei_activity_name}' in Ecoinvent for location '{location}': {err}")
    return None



def _blank_to_none(value):
    """Map an empty spreadsheet cell (None or float NaN) to None; return anything else unchanged."""
    if value is None or (isinstance(value, float) and pd.isna(value)):
        return None
    return value


def process_parameters(params_df, parameter_registry,sheet_name):
    """
    Create one lca_algebraic parameter per row of an Excel sheet and register it.

    Rows are read top to bottom until the first row whose 'EI activity name'
    is empty, which is treated as the end of the sheet. Each parameter is
    named "<sheet_name>_<parameter number>" and stored in `parameter_registry`
    under that name. A row that cannot be turned into a parameter is reported
    with `print` and skipped; it does not abort the sheet.

    params_df: the dataframe that has been created for one excel sheet. Its
        "Type" column is normalized in place (stripped and lower-cased).
        Supported types are "float", "bool" and "enum".
    parameter_registry: dict mapping parameter name -> parameter object,
        updated in place
    sheet_name: the name of the excel sheet params_df, used as name prefix

    Returns None.
    """
    # Normalize once for the whole column so that "Float ", "FLOAT", ... match.
    params_df["Type"] = params_df["Type"].astype(str).str.strip().str.lower()

    for index, row in params_df.iterrows():
        param_name = str(sheet_name)+"_"+str(row["parameter number"])
        param_type = row["Type"]  # already normalized above

        # The first row without an activity name marks the end of the data.
        if pd.isna(row["EI activity name"]):
            print(f"Skipping row {index} as 'EI activity name' is NaN.")
            break

        try:
            if param_type == "float":
                # A blank "Distrib" cell is read as NaN (a float), so .upper()
                # may only be called on real strings. Unknown or blank names
                # resolve to None, as unknown names already did before.
                distrib_name = row.get("Distrib")
                if isinstance(distrib_name, str):
                    distrib = getattr(agb.DistributionType, distrib_name.strip().upper(), None)
                else:
                    distrib = None

                # row.get() yields None when a column is absent; blank cells
                # are mapped to the same value instead of being passed as NaN.
                param = agb.newFloatParam(
                    param_name,
                    default=row["Default"],
                    min=_blank_to_none(row.get("Min")),
                    max=_blank_to_none(row.get("Max")),
                    std=_blank_to_none(row.get("Std")),
                    distrib=distrib,
                    description=_blank_to_none(row.get("Description")),
                    label=_blank_to_none(row.get("Label"))
                )
            elif param_type == "bool":
                param = agb.newBoolParam(
                    param_name,
                    default=row["Default"]
                )
            elif param_type == "enum":
                # NOTE(review): eval() executes whatever text is in the cell.
                # Only use workbooks you trust; consider ast.literal_eval if
                # the cells only ever contain list/dict literals.
                values = eval(row["Values"]) if isinstance(row["Values"], str) else row["Values"]
                weights = eval(row["Weights"]) if isinstance(row["Weights"], str) else None

                if weights:
                    # Pair each enum value with its weight.
                    values = {k: v for k, v in zip(values, weights)}

                param = agb.newEnumParam(
                    param_name,
                    values=values,
                    default=row["Default"],
                    description=row.get("Description")
                )
            else:
                raise ValueError(f"Unsupported parameter type: {param_type}")

            # Register the parameter for later evaluation
            parameter_registry[param_name] = param
            print(f"Parameter created and registered: {param_name}") # for debugging only !
        except Exception as e:
            # Best effort: report the faulty row and carry on with the next one.
            print(f"Error creating parameter '{param_name}': {e}")

# Load the life-cycle data (foreground systems, created and modified activities)

In [None]:
# Read each requested foreground sheet into a DataFrame, keyed by sheet name.
foregrounds_All = {
    scenario_sheet: pd.read_excel(LC_data_Foreground_file_path, sheet_name=scenario_sheet)
    for scenario_sheet in foreground_list
}

In [None]:
custom_meta_data_DF=pd.read_excel(custom_meta_data)
custom_meta_data_DF=custom_meta_data_DF.set_index('sheet name')

In [None]:
# One DataFrame per sheet listed in the metadata table (custom activity = green sheets).
# The metadata index is the sheet name; its "file" column names the workbook
# (relative to xlsx_path) that holds that sheet.
OS_database_dataframes = {
    meta_sheet_name: pd.read_excel(xlsx_path + workbook_file, sheet_name=meta_sheet_name)
    for meta_sheet_name, workbook_file in custom_meta_data_DF["file"].items()
}

In [None]:
agb.resetParams()      # Reset parameters 

In [None]:
parameter_registry = {} # Add parameters from custom and modified activity to the parameter_registry

for sheet_name, sheet_df in OS_database_dataframes.items():
    print(f"Processing parameters from sheet: {sheet_name}")
    process_parameters(sheet_df, parameter_registry,sheet_name)  # create new parameters for custom activities

# Defining the foreground

In [None]:
OS_database="OS database"

agb.resetDb(OS_database)
agb.setForeground(OS_database) #Create one database where all custom and modified activities will be added.
agb.list_databases()        # Sanity check - All activities reset to zero ?

In [None]:
process_parameters(foregrounds_All[f"{selected_foreground}"], parameter_registry,selected_foreground) # create new parameters for foreground activities and add them to the parameter_registry

# Create custom activities

In [None]:
# The aim here is to create activities that don't exist in Ecoinvent. They are
# custom or modified activities.
# Note: at the end of the day, they are still built from different activities
# that exist in the ecoinvent database.
#
# - "custom" sheets become a new activity whose exchanges are the activities
#   listed in the sheet, each weighted by its registered parameter.
# - "modified" sheets become a copy of an existing ecoinvent activity in which
#   the exchanges listed in the sheet get a new amount.
# Sheets are processed by increasing "priority" so that a sheet can reference
# an activity created by an earlier one.

sorted_sheets = (
    custom_meta_data_DF
    .sort_values(by="priority")         # smallest priority first
    .index
    .tolist()
)

for sheet_name in sorted_sheets: # iterate over the custom/modified sheets
    sheet_meta = custom_meta_data_DF.loc[sheet_name]
    sheet_type = sheet_meta["type"]
    sheet_df = OS_database_dataframes[sheet_name]

    if sheet_type == "custom":
        sheet_unit = sheet_meta["unit"]
        print(f"Processing sheet: {sheet_name}") # to keep track of which sheet is being processed
        accumulated_exchanges = {}

        for index, row in sheet_df.iterrows():         # iterate inside a given custom sheet
            ei_activity_name = row["EI activity name"]
            if pd.isna(ei_activity_name):              # a blank name marks the end of the sheet
                print(f"Skipping row {index} in sheet '{sheet_name}' as 'EI activity name' is NaN.")
                break

            # Find the activity, either custom, modified or already present in ecoinvent
            ei_activity = find_activity(
                ei_activity_name,
                row["loc"],
                custom_process_name=row["Custom process name"],
                modified_activity_name=row["Modified process name"],
                custom_db=OS_database,
            )
            if ei_activity is None:
                print(f"Skipping row {index} in sheet '{sheet_name}' due to unresolved activity issues.") # if the activity is not found, should not happen !
                continue

            # The amount of the exchange is the parameter registered for this
            # row. Its name is built exactly as in process_parameters(), so a
            # dictionary lookup is enough; it also works for names that are not
            # valid Python expressions (e.g. "sheet_1.0"), which eval() rejects.
            parameter_name = str(sheet_name)+"_"+str(row["parameter number"])
            try:
                parameter_value = parameter_registry[parameter_name]
            except KeyError:
                print(f"Error in parameter expression for activity in row {index}: no parameter named '{parameter_name}' is registered")
                continue

            # The same activity may appear on several rows: sum the amounts.
            if ei_activity in accumulated_exchanges:
                accumulated_exchanges[ei_activity] += parameter_value
            else:
                accumulated_exchanges[ei_activity] = parameter_value

        try:
            agb.newActivity(OS_database, sheet_name, sheet_unit, exchanges=accumulated_exchanges) # Create the custom process with accumulated exchanges and associated parameters
            print(f"Exchanges for {sheet_name}: {accumulated_exchanges} \n") # for debugging or for a better understanding

        except Exception as e:
            print(f"Error creating custom activity '{sheet_name}': {e} \n") # Should not happen !

    elif sheet_type == "modified":
        original_EI_activity_name = sheet_meta["original EI activity name"]
        original_EI_activity_location = sheet_meta["original EI activity location"]
        modified_activity_name = sheet_name
        try:
            original_activity = agb.findTechAct(original_EI_activity_name, loc=original_EI_activity_location)
            if original_activity is None:
                print(f"Skipping copy and update for {modified_activity_name} due to unresolved activity issues.")
                continue

            new_activity = agb.copyActivity(OS_database, original_activity, modified_activity_name)  # Create a copy of the activity
            print(f"Copied activity: {modified_activity_name}")

            exchanges_dict = {}

            # TBM = To Be Modified inside the copied activity
            for idx, sheet_row in sheet_df.iterrows():
                ei_activity_name_TBM = sheet_row["EI activity name"]
                if pd.isna(ei_activity_name_TBM):
                    # Same end-of-sheet convention as for the custom sheets;
                    # no parameter is registered from this row onwards.
                    break

                parameter_name_TBM = str(modified_activity_name)+"_"+str(sheet_row["parameter number"])
                try:
                    amount = parameter_registry[parameter_name_TBM]  # parameter registered for this row
                except KeyError:
                    print(f"Error evaluating parameter expression '{parameter_name_TBM}' for activity '{ei_activity_name_TBM}': no such parameter is registered")
                    continue  # Skip this exchange if there's an error

                print(f"Evaluated amount for '{ei_activity_name_TBM}': {amount}")
                exchanges_dict[ei_activity_name_TBM] = dict(amount=amount)  # Structure the exchange as a dictionary with amount

            try:  # Now, update exchanges for the new activity
                print(exchanges_dict)  # Optional print for debugging
                new_activity.updateExchanges(exchanges_dict)
                print(f"Updated exchanges for activity: {modified_activity_name}")
            except Exception as e:
                print(f"Error updating exchanges for activity '{modified_activity_name}': {e}")
        except Exception as e:
            print(f"Error in copying or updating activity '{modified_activity_name}': {e}")


print("Finished processing all sheets.")

In [None]:
agb.list_databases() #Should be non zero for the foreground custom DB

In [None]:
# Create one activity per row of the selected foreground sheet. Each new
# activity has a single exchange: the activity referenced by the row, with the
# row's registered parameter as amount.
for index, row in foregrounds_All[selected_foreground].iterrows():

    ei_activity_name = row["EI activity name"]       # Read values from the Excel file
    if pd.isna(ei_activity_name):  # A blank name marks the end of the sheet
        print(f"Stopping processing at index {index} as 'EI activity name' is NaN.")
        break

    new_activity_name = row["LCA algebraic name"]    # the name of the new activity is defined in "LCA algebraic name"

    # Find the activity, either custom, modified or already present in ecoinvent
    ei_activity = find_activity(
        ei_activity_name,
        row["loc"],                                         # Location (optional)
        custom_process_name=row["Custom process name"],      # name of the custom activity (if relevant)
        modified_activity_name=row["Modified process name"], # name of the modified activity (if relevant)
        custom_db=OS_database,
    )
    if ei_activity is None:
        print(f"Skipping creation of {new_activity_name} due to unresolved activity issues.")
        continue

    # The parameter name is built exactly as in process_parameters(), so a
    # dictionary lookup replaces eval(); it also works for names that are not
    # valid Python expressions (e.g. "sheet_1.0").
    parameter_name = str(selected_foreground)+"_"+str(row["parameter number"])
    try:
        exchanges = {ei_activity: parameter_registry[parameter_name]}  # Define exchanges
    except KeyError:
        print(f"Error in parameter expression for activity '{new_activity_name}': no parameter named '{parameter_name}' is registered")
        continue

    try:
        agb.newActivity(OS_database, new_activity_name, "unit", exchanges=exchanges)
        print(f"Activity created: {new_activity_name}")
    except Exception as e:
        print(f"Error creating activity '{new_activity_name}': {e}")


In [None]:
agb.list_databases() #Should be non zero for the foreground custom DB

# Foreground generation

In [None]:
# Gather the activities created from the foreground sheet into a single
# foreground activity, each with an amount of 1.
Exchanges_Foreground = {}

for index, row in foregrounds_All[selected_foreground].iterrows():
    # Activities were only created for the rows above the first blank
    # 'EI activity name'; stop at the same place instead of looking up
    # activities that do not exist.
    if pd.isna(row["EI activity name"]):
        break
    Exchanges_Foreground[agb.findActivity(row["LCA algebraic name"], db_name=OS_database, loc="GLO")] = 1

print(Exchanges_Foreground)
agb.newActivity(OS_database, selected_foreground, "unit", exchanges=Exchanges_Foreground) # Create the foreground

In [None]:
total_foreground_exchanges = {}
total_foreground_exchanges[agb.findActivity(selected_foreground, db_name=OS_database, loc="GLO")] = 1

# Reference Flow

In [None]:
reference_flow = agb.newActivity(OS_database, "reference flow", "unit", exchanges=total_foreground_exchanges)

# Impact Methods Choice

In [None]:
# Retrieve impact categories (here mostly from EF v3.1).
# NOTE: these lookups are for exploration; the computations further down use
# LCIA_method_list, not the variables defined here.
impacts_GWP = agb.findMethods(search="climate change", mainCat="EF v3.1") # Methods matching "climate change" within EF v3.1
impacts_GWP_other = agb.findMethods(search="climate change") # Methods matching "climate change", without a mainCat filter
impacts_ADP = agb.findMethods(search="ADP", mainCat="EF v3.1") # Methods matching "ADP" within EF v3.1
impacts_all = agb.findMethods(search="", mainCat="EF v3.1")               # All impact categories from EF 3.1

# Compute Absolute Impacts

In [None]:
absolute_impacts = agb.compute_impacts(reference_flow, LCIA_method_list, functional_unit= 1)
absolute_impacts

# Contribution Analysis

In [None]:
# Tag every foreground activity with the value of the column selected by
# contribution_analysis_label, so that impacts can be split by that label.
for index, row in foregrounds_All[selected_foreground].iterrows():
    # Activities only exist for the rows above the first blank
    # 'EI activity name' (the creation loop stops there), so stop here too.
    if pd.isna(row["EI activity name"]):
        break
    activity_name = row["LCA algebraic name"]
    sub_assembly_label = row[contribution_analysis_label]  # choosing labels for activities -> life cycle phases : results per phase
    agb.findActivity(activity_name, db_name=OS_database, loc="GLO").updateMeta(phase=sub_assembly_label, label=sub_assembly_label)
print("Finished labeling activities.")

df_impacts_axis = agb.compute_impacts(reference_flow, LCIA_method_list, functional_unit=1, axis="label")
df_impacts_axis_ = df_impacts_axis.drop(index=['*sum*'])     # Drop the '*sum*' row to keep only the per-label contributions
df_sums = df_impacts_axis_.sum(axis=0)                       # Total per impact category (column)
df_normalized = df_impacts_axis_.div(df_sums, axis=1) * 100  # Share of each label in percent of the column total

In [None]:
### ------------------------------------------------------------------
# 1.  Set-up
# ------------------------------------------------------------------
df = df_impacts_axis.drop(index="*sum*")                    # convenience alias
labels = df.index                       # contribution labels (rows)
categories = df.columns                 # one column per impact category
palette = sns.color_palette("Spectral", n_colors=len(labels))

# ------------------------------------------------------------------
# 2.  Create a 1-by-N grid of sub-plots (one per impact category)
# ------------------------------------------------------------------
fig, axes = plt.subplots(
    nrows=1,
    ncols=len(categories),
    figsize=(10, 3),
    sharey=True,                 # keep the single y tick aligned
    constrained_layout=True
)
# With a single impact category plt.subplots returns one Axes object instead
# of an array; wrap it so that the loop below works for any N >= 1.
axes = np.atleast_1d(axes)

# ------------------------------------------------------------------
# 3.  Draw a stacked horizontal bar in *each* subplot
# ------------------------------------------------------------------
for ax, cat in zip(axes, categories):
    left = 0
    for color, (label, value) in zip(palette, df[cat].items()):
        ax.barh(
            y=0,                 # only one device
            width=value,
            left=left,
            height=0.6,
            color=color,
            label=label
        )
        left += value            # accumulate for stacking

    # --- Cosmetics -------------------------------------------------
    # Shorten the subplot title to everything before the first " - "
    ax.set_title(cat.split(' - ')[0], fontsize=11)

    # Label the x-axis with the unit text (text inside the brackets)
    unit = cat.split('[')[-1].rstrip(']')
    ax.set_xlabel(unit)

    ax.set_yticks([0])

    #if ax is axes[0]:
        #ax.set_yticklabels(['Device 0'])
        #ax.set_xlim(0, 8) 
    #else:
        #ax.set_yticklabels([])
        #ax.set_xlim(0, 0.001) 


# ------------------------------------------------------------------
# 4.  One shared legend and overall title
# ------------------------------------------------------------------
fig.legend(labels, loc='upper right', ncol=1, bbox_to_anchor=(0.97, 0.9))
#fig.suptitle('Contribution analysis — single device', y=1.15, fontsize=14)

# savefig does not create missing folders, so make sure the target exists.
figure_dir = os.path.join(output_folder, "figure")
os.makedirs(figure_dir, exist_ok=True)
plt.savefig(os.path.join(figure_dir, "fig_"+contribution_analysis_label+"_"+selected_foreground+".png"))
plt.show()

In [None]:
# Stacked bar chart: one bar per impact category, split by label, with each
# bar normalized to 100 %.
fig, ax = plt.subplots(figsize=(12, 10))
dark_palette = sns.color_palette("Spectral", n_colors=len(df_normalized.index))
df_normalized.T.plot(kind='bar', stacked=True, ax=ax, width=0.8, color=dark_palette)
#dark_palette = sns.color_palette("Spectral", n_colors=len(df_impacts_axis_.index))
#df_impacts_axis_.T.plot(kind='bar', stacked=True, ax=ax, width=0.8, color=dark_palette)

# Customize plot
ax.set_title('Impact Categories with Contributions from Selected Labels (Normalized to 100%)')
ax.set_ylabel('Percentage (%)')
ax.set_xticklabels(df_impacts_axis_.columns, rotation=45, ha='right')
ax.legend(title="Labels", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()

# savefig does not create missing folders, so make sure the target exists.
figure_dir = os.path.join(output_folder, "figure")
os.makedirs(figure_dir, exist_ok=True)
plt.savefig(os.path.join(figure_dir, "fig_normalized_"+contribution_analysis_label+"_"+selected_foreground+".png"))
plt.show()

# Monte-Carlo Analysis

In [None]:
MC_runs=agb.incer_stochastic_violin(reference_flow, LCIA_method_list, functional_unit=1, figspace=(0.5,0.5), n=512, figsize=(8, 8), sharex=True,  nb_cols=2)

# Exporting results

In [None]:
# Export the per-label contribution table (including the '*sum*' row) to Excel.
# to_excel does not create missing folders, so make sure the target exists.
os.makedirs(output_folder, exist_ok=True)
output_file = os.path.join(output_folder,  selected_foreground+"_contribution_results.xlsx")
df_impacts_axis.to_excel(output_file, index=True)
print(f"Impact saved to {output_file}")