# Use examples of [edges](https://github.com/romainsacchi/edges) for Mass Flow Analysis (MFA)

Author:  
Romain Sacchi, Researcher  
Laboratory for Energy Systems Analyses  
Paul Scherrer Institut (PSI)  
GitHub: [romainsacchi](https://github.com/romainsacchi)
Email: [romain.sacchi@psi.ch](mailto:romain.sacchi@psi.ch)

This notebook shows examples on how to use `edge` to use exchange-specific
characterization factors in the characterization matrix of `bw2calc`, in order
to derive in-use stock flows, which can be used for MFA.

## Requirements

* **Pyhton 3.10 or higher is recommended**
* **bw2data**
* **bw2calc**

# Use case with [Brightway](https://brightway.dev/)

`brightway2` is an open source LCA framework for Python.
Please refer to the brightway [documentation](https://brightway.dev) if you do not know how to create a project.

We start by importing the `EdgeLCIA` class.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import logging
import re
import pandas as pd
import numpy as np


from edges import EdgeLCIA, get_available_methods, setup_package_logging
import bw2data, bw2io

# logger setup
setup_package_logging(logging.INFO)

LCIA method files for `edges` can be json files, or simply a Python dictionary.
Let's consider the following LCIA method:

In [None]:
method = {
  "name": "Copper", #metadata
    "version": "1.0",
    "description": "Example LCIA method to track copper use",
    "unit": "kg Cu",
    "strategies": [ 
        "map_exchanges" # recommended strategies, can be later called by `EdgeLCIA.apply_strategies()`
    ],
  "exchanges": [
    {
      "supplier": { # selection criteria on the supplier side
        "name": "market for copper, cathode",
        "operator": "contains", # can be `equals`, `contains` or `startswith`
        "matrix": "technosphere", # nodes that belong to the technosphere matrix
      },
      "consumer": {
        "matrix": "technosphere" # nodes that belong to the technosphere matrix
      },
      "value": 1.0 # CF value
    },
    {
    "supplier": {
        "name": "market for copper, anode",
        "operator": "contains",
        "matrix": "technosphere",
    },
    "consumer": {
        "matrix": "technosphere"
    },
    "value": 1.0
    },
  ]
}

In [None]:
# activate the bw project
bw2data.projects.set_current("ecoinvent-3.11-cutoff-bw25")
act = [a for a in bw2data.Database("ecoinvent-3.11-cutoff") if a["name"].startswith("distribution network")][0]
act

In [None]:
# EdgeLCIA class instantiation
LCA = EdgeLCIA(
    demand={act: 1}, # functional unit
    method=method, # method defined above
)
LCA.lci()

LCA.apply_strategies() # either run `.apply_strategies()`
#LCA.map_exchanges() # or run mapping strategies manually

LCA.evaluate_cfs() # fill-in the characterization matrix
LCA.lcia() # calculates score
LCA.score # print score

We can generate a pandas.Dataframe that lists characterized exchanges

In [None]:
df = LCA.generate_cf_table(include_unmatched=False)

In [None]:
df.sort_values(by="impact", ascending=False)

We've built a small mapping between CPC codes a lifetimes (in years).

In [None]:
lifetimes = pd.read_csv("cpc_with_lifetimes.csv")
lifetimes.head()

Let's see how lifetimes distribute across CPC codes.

In [None]:
lifetimes.plot(kind="hist", bins=50)

Let's try to map those with the exchanges in the dataframe, based on the consumer's CPC code.

In [None]:
# Extract numeric CPC codes
lifetimes["cpc_code"] = lifetimes["CPC classification"].str.extract(r"^\s*(\d+)")
df["consumer cpc code"] = df["consumer cpc"].str.extract(r"^\s*(\d+)")

# Create mapping dict
code_to_lifetime = lifetimes.set_index("cpc_code")["lifetime_years_refined"]

# Map lifetimes to consumer CPCs
df["lifetime"] = df["consumer cpc code"].map(code_to_lifetime)

In [None]:
df

# Use case with sales of passenger cars
We will use a scenario from an Integrated Assessment Model (IAM).  
The scenario is from the [REMIND](https://www.pik-potsdam.de/en/institute/departments/transformation-pathways/models/remind) model: SSP1 - Peak CO2 budget of 650 gigatons i.e., aligns with a global mean surface temperature increase of +1.5 C by 2100).  
The data describe the sales of new passenger cars in Europe, in millions of vehicles.

In [None]:
sales = pd.read_excel("vehicle_sales.xlsx")

In [None]:
import re
import pandas as pd
import numpy as np


df = sales.copy()

year_like = [c for c in df.columns if isinstance(c, (int, np.integer)) or (isinstance(c, str) and c.isdigit())]
rename_map = {c: int(c) for c in year_like}
df = df.rename(columns=rename_map)

year_cols = sorted([c for c in df.columns if isinstance(c, (int, np.integer))])
id_cols = [c for c in df.columns if c not in year_cols]  # <-- this was missing

df[year_cols] = df[year_cols].apply(pd.to_numeric, errors="coerce")

all_years = list(range(min(year_cols), max(year_cols) + 1))

year_block = (
    df.set_index(id_cols)[year_cols]
      .reindex(columns=all_years)                       
      .interpolate(axis=1, limit_direction="both")      
)

sales = (
    year_block
      .reset_index()
      .reindex(columns=id_cols + all_years)             
)

In [None]:
sales

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

df = sales.copy()
df = df.loc[:, :2100]

# years are the numeric columns
years = [c for c in df.columns if isinstance(c, (int, float))]

# extract powertrain from the end of the pipe-delimited Variable
df["Powertrain"] = df["Variable"].str.extract(r'([^|]+)$')

# normalize names to your legend labels
name_map = {
    "Gases": "CNG",
    "Hybrid electric": "PHEV",
    "Liquids": "ICEV",
    "BEV": "BEV",
    "FCEV": "FCEV",
}
df["Powertrain"] = df["Powertrain"].replace(name_map)

# long → wide: index=Year, columns=Powertrain
wide = (
    df.melt(id_vars="Powertrain", value_vars=years,
            var_name="Year", value_name="Sales")
      .pivot_table(index="Year", columns="Powertrain",
                   values="Sales", aggfunc="sum")
      .sort_index()
)

# optional: enforce legend/stack order
order = ["BEV", "FCEV", "CNG", "PHEV", "ICEV"]
wide = wide[[c for c in order if c in wide.columns]]

ax = wide.plot(kind="area")
ax.set_title("New vehicle sales")
ax.set_ylabel("million vehicles")
ax.legend(title="Powertrain", loc="upper left", bbox_to_anchor=(1.02, 1))
plt.tight_layout()


Now, let's import in brightway, using `bw2io`, some life cycle inventories that represent the manufacture of passenger cars.

In [None]:
i = bw2io.ExcelImporter("lci-pass_cars.xlsx")

In [None]:
i.apply_strategies()

We link the imported inventories against themselves, the ecoinvent database and the biosphere database (in that order).

In [None]:
i.match_database(fields=["name", "reference product", "location"])
i.match_database("ecoinvent-3.11-cutoff", fields=["name", "reference product", "location"])
i.match_database("ecoinvent-3.11-biosphere", fields=["name", "categories"])

Zero unlinked exchanges, we can go ahead!

In [None]:
i.statistics()

In [None]:
i.write_database()

We create shorcuts/references to the dataset objects representing the building of a car from our newly imported database

In [None]:
icev = [ds for ds in bw2data.Database("lci-pass_cars") if ds["name"] == "Passenger car, gasoline, Medium, EURO-6d"][0]
bev = [ds for ds in bw2data.Database("lci-pass_cars") if ds["name"] == "Passenger car, battery electric, Medium"][0]
cng = [ds for ds in bw2data.Database("lci-pass_cars") if ds["name"] == "Passenger car, compressed gas, Medium, EURO-6d"][0]
hybrid = [ds for ds in bw2data.Database("lci-pass_cars") if ds["name"] == "Passenger car, plugin gasoline hybrid, Medium, EURO-6d"][0]
fcev = [ds for ds in bw2data.Database("lci-pass_cars") if ds["name"] == "Passenger car, fuel cell electric, Medium"][0]

In [None]:
# convert from millions of vehicles to vehicles
wide *= 1e6

In [None]:
wide

In [None]:
# create a dictionary to store the results per vehicle type
per_vehicle = {}

for fu, tech in [
    ({icev.id: 1.0},   "ICEV"),
    ({bev.id: 1.0},    "BEV"),
    ({cng.id: 1.0},    "CNG"),
    ({hybrid.id: 1.0}, "PHEV"),
    ({fcev.id: 1.0},   "FCEV"),
]:
    if tech == "ICEV":
        LCA = EdgeLCIA(demand=fu, method=method)
        LCA.lci()
        LCA.apply_strategies()
        LCA.evaluate_cfs()
        LCA.lcia()
    else:
        LCA.redo_lcia(fu)

    df = LCA.generate_cf_table().copy()

    # Extract CPC code and map lifetime
    if "consumer cpc code" not in df.columns and "consumer cpc" in df.columns:
        df["consumer cpc code"] = df["consumer cpc"].str.extract(r"^\s*(\d+)")
    df["lifetime"] = df["consumer cpc code"].map(code_to_lifetime)

    # Optional: one dataset does not have a CPC code
    df.loc[df["consumer name"] == "Glider lightweighting", "lifetime"] = 12

    per_vehicle[tech] = df[["amount", "lifetime"]].copy()  # keep only what we need


wide = wide.sort_index()

yearly_frames = []
for tech, exch in per_vehicle.items():
    # sales series for this tech
    if tech not in wide.columns:
        continue
    sales = wide[[tech]].reset_index().rename(columns={tech: "vehicles"})  # columns: Year, vehicles

    # cross-join: add a constant key
    exch_ = exch.copy()
    exch_["key"] = 1
    sales["key"] = 1
    merged = exch_.merge(sales, on="key", how="left").drop(columns="key")

    # inflow per exchange-year
    merged["tech"] = tech
    merged["total_amount"] = merged["amount"] * merged["vehicles"]

    yearly_frames.append(merged)

df_yearly_exchanges = pd.concat(yearly_frames, ignore_index=True)

df_yearly_exchanges["Year"] = df_yearly_exchanges["Year"].astype(int)
df_yearly_exchanges["lifetime"] = df_yearly_exchanges["lifetime"].fillna(0).astype(int)

# Year when copper returns
df_yearly_exchanges["return_year"] = df_yearly_exchanges["Year"] + df_yearly_exchanges["lifetime"]

# Outflow (returns)
copper_returns = (
    df_yearly_exchanges.groupby("return_year", as_index=False)["total_amount"]
    .sum()
    .rename(columns={"return_year": "Year", "total_amount": "copper_return"})
)

# Inflow (sales)
copper_sales = (
    df_yearly_exchanges.groupby("Year", as_index=False)["total_amount"]
    .sum()
    .rename(columns={"total_amount": "copper_sales"})
)

# Merge and compute stock
copper_stock_flow = pd.merge(copper_sales, copper_returns, on="Year", how="outer").fillna(0.0)
copper_stock_flow = copper_stock_flow.sort_values("Year")
copper_stock_flow["stock_in_use"] = (
    copper_stock_flow["copper_sales"].cumsum() - copper_stock_flow["copper_return"].cumsum()
)

In [None]:
# here is our dataframe with summed (across powertrain types) in-use, incoming and outgoing copper flows per year.
copper_stock_flow

In [None]:
# convert to ktons
copper_stock_flow.loc[:, "copper_sales":] /= 1e6

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))

plt.plot(
    copper_stock_flow["Year"],
    copper_stock_flow["copper_sales"],
    label="Copper inflow (sales)",
    color="#1f77b4",
    linewidth=2,
)
plt.plot(
    copper_stock_flow["Year"],
    copper_stock_flow["copper_return"],
    label="Copper outflow (end-of-life)",
    color="#ff7f0e",
    linewidth=2,
)
plt.plot(
    copper_stock_flow["Year"],
    copper_stock_flow["stock_in_use"],
    label="Copper stock in use",
    color="black",
    linewidth=2.5,
    linestyle="--",
)

plt.xlabel("Year", fontsize=12)
plt.ylabel("Copper (ktons)", fontsize=12)
#plt.yscale('log')
plt.title("Copper inflows, outflows, and in-use stock", fontsize=13)
plt.legend(title="", loc="upper right")
plt.grid(True, linestyle=":", alpha=0.6)
plt.tight_layout()
plt.show()


By looking at the logarithmic scale, we can see copper coming back before the first discarded cars.  
Also, we can see copper coming back long after the last discarded cars.  
What are those?


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))

plt.plot(
    copper_stock_flow["Year"],
    copper_stock_flow["copper_sales"],
    label="Copper inflow (sales)",
    color="#1f77b4",
    linewidth=2,
)
plt.plot(
    copper_stock_flow["Year"],
    copper_stock_flow["copper_return"],
    label="Copper outflow (end-of-life)",
    color="#ff7f0e",
    linewidth=2,
)
plt.plot(
    copper_stock_flow["Year"],
    copper_stock_flow["stock_in_use"],
    label="Copper stock in use",
    color="black",
    linewidth=2.5,
    linestyle="--",
)

plt.xlabel("Year", fontsize=12)
plt.ylabel("Copper (ktons)", fontsize=12)
plt.yscale('log')
plt.title("Copper inflows, outflows, and in-use stock", fontsize=13)
plt.legend(title="", loc="upper right")
plt.grid(True, linestyle=":", alpha=0.6)
plt.tight_layout()
plt.show()


Let's try with other metals!

In [None]:
copper_method = {
  "name": "Copper",
    "version": "1.0",
    "description": "Example LCIA method to track copper use",
    "unit": "kg Cu",
    "strategies": [
        "map_exchanges"
    ],
  "exchanges": [
    {
      "supplier": {
        "name": "market for copper, cathode",
        "operator": "contains",
        "matrix": "technosphere",
      },
      "consumer": {
        "matrix": "technosphere"
      },
      "value": 1.0
    },
    {
    "supplier": {
        "name": "market for copper, anode",
        "operator": "contains",
        "matrix": "technosphere",
    },
    "consumer": {
        "matrix": "technosphere"
    },
    "value": 1.0
    },
  ]
}

aluminium_method = {
  "name": "Aluminium",
    "version": "1.0",
    "description": "Example LCIA method to track aluminium use",
    "unit": "kg Cu",
    "strategies": [
        "map_exchanges"
    ],
  "exchanges": [
    {
      "supplier": {
        "name": "market for aluminium alloy",
        "operator": "contains",
        "matrix": "technosphere",
      },
      "consumer": {
        "matrix": "technosphere"
      },
      "value": 1.0
    },
  ]
}

steel_method = {
  "name": "Steel",
    "version": "1.0",
    "description": "Example LCIA method to track steel use",
    "unit": "kg Cu",
    "strategies": [
        "map_exchanges"
    ],
  "exchanges": [
    {
      "supplier": {
        "name": "market for steel, ",
        "operator": "contains",
        "matrix": "technosphere",
      },
      "consumer": {
        "matrix": "technosphere"
      },
      "value": 1.0
    },
  ]
}

platinum_method = {
  "name": "Platinum",
    "version": "1.0",
    "description": "Example LCIA method to track platinum use",
    "unit": "kg Cu",
    "strategies": [
        "map_exchanges"
    ],
  "exchanges": [
    {
      "supplier": {
        "name": "market for platinum",
        "operator": "equals",
        "matrix": "technosphere",
      },
      "consumer": {
        "matrix": "technosphere"
      },
      "value": 1.0
    },
  ]
}

In [None]:
methods_map = {
    "copper": copper_method,
    "aluminium": aluminium_method,
    "steel": steel_method,
    "platinum": platinum_method,
}

tech_fus = [
    ({icev.id: 1.0},   "ICEV"),
    ({bev.id: 1.0},    "BEV"),
    ({cng.id: 1.0},    "CNG"),
    ({hybrid.id: 1.0}, "PHEV"),
    ({fcev.id: 1.0},   "FCEV"),
]


sales_tech = (
    wide.reset_index()
        .melt(id_vars="Year", var_name="tech_raw", value_name="vehicles")
        .assign(
            tech=lambda d: d["tech_raw"].replace({"CNG": "Gases", "PHEV": "Hybrid"})
        )[["Year", "tech", "vehicles"]]
)

per_vehicle = {metal: {} for metal in methods_map}

for metal, method in methods_map.items():
    print(f"Calculating LCA coefficients for {metal}")
    # init LCA once per metal
    first_fu, first_tech = tech_fus[0]
    LCA = EdgeLCIA(demand=first_fu, method=method)
    LCA.lci()
    LCA.apply_strategies()
    LCA.evaluate_cfs()
    LCA.lcia()

    # capture first tech
    df = LCA.generate_cf_table().copy()
    if "consumer cpc code" not in df.columns and "consumer cpc" in df.columns:
        df["consumer cpc code"] = df["consumer cpc"].str.extract(r"^\s*(\d+)")
    if "consumer cpc code" in df.columns:
        df["lifetime"] = df["consumer cpc code"].map(code_to_lifetime)
    df.loc[df["consumer name"] == "Glider lightweighting", "lifetime"] = 12
    per_vehicle[metal][first_tech] = df[["amount", "lifetime"]].copy()

    # redo for others
    for fu, tech in tech_fus[1:]:
        LCA.redo_lcia(fu)
        df = LCA.generate_cf_table().copy()
        if "consumer cpc code" not in df.columns and "consumer cpc" in df.columns:
            df["consumer cpc code"] = df["consumer cpc"].str.extract(r"^\s*(\d+)")
        if "consumer cpc code" in df.columns:
            df["lifetime"] = df["consumer cpc code"].map(code_to_lifetime)
        df.loc[df["consumer name"] == "Glider lightweighting", "lifetime"] = 12
        per_vehicle[metal][tech] = df[["amount", "lifetime"]].copy()


flows_by_metal = {}

for metal, perveh_by_tech in per_vehicle.items():
    out_frames = []
    for tech, exch in perveh_by_tech.items():
        if exch.empty:
            continue
        s = sales_tech[sales_tech["tech"] == tech]
        if s.empty:
            continue

        # cross-join exchanges with yearly sales for this tech
        tmp = exch.copy()
        tmp["key"] = 1
        s_ = s.copy()
        s_["key"] = 1
        merged = tmp.merge(s_, on="key", how="left").drop(columns="key")

        merged["total_amount"] = merged["amount"] * merged["vehicles"]
        out_frames.append(merged)

    if not out_frames:
        flows_by_metal[metal] = pd.DataFrame(columns=["Year", "sales", "return", "stock_in_use"])
        continue

    df_yearly = pd.concat(out_frames, ignore_index=True)

    # ensure proper types
    df_yearly["Year"] = df_yearly["Year"].astype(int)
    df_yearly["lifetime"] = df_yearly["lifetime"].fillna(0).astype(int)

    # inflow/outflow/stock
    df_yearly["return_year"] = df_yearly["Year"] + df_yearly["lifetime"]

    returns = (
        df_yearly.groupby("return_year", as_index=False)["total_amount"]
        .sum().rename(columns={"return_year": "Year", "total_amount": "return"})
    )
    sales = (
        df_yearly.groupby("Year", as_index=False)["total_amount"]
        .sum().rename(columns={"total_amount": "sales"})
    )

    stock_flow = pd.merge(sales, returns, on="Year", how="outer").fillna(0.0)
    stock_flow = stock_flow.sort_values("Year")
    stock_flow["stock_in_use"] = stock_flow["sales"].cumsum() - stock_flow["return"].cumsum()

    # scale (assumes amounts are kg → ktons)
    stock_flow.loc[:, "sales":] = stock_flow.loc[:, "sales":] / 1e6

    flows_by_metal[metal] = stock_flow

n = max(1, len(methods_map))
rows, cols = 2, 2
fig, axes = plt.subplots(rows, cols, figsize=(12, 9), sharex=True)
axes = np.array(axes).reshape(rows, cols)

metal_list = list(methods_map.keys())
for idx in range(rows * cols):
    r, c = divmod(idx, cols)
    ax = axes[r, c]
    if idx >= n:
        ax.axis("off")
        continue

    metal = metal_list[idx]
    sf = flows_by_metal[metal]

    if sf.empty:
        ax.text(0.5, 0.5, f"No data for {metal}", ha="center", va="center")
        ax.set_axis_off()
        continue

    ax.plot(sf["Year"], sf["sales"],        label=f"{metal} inflow",  linewidth=2)
    ax.plot(sf["Year"], sf["return"],       label=f"{metal} outflow", linewidth=2)
    ax.plot(sf["Year"], sf["stock_in_use"], label=f"{metal} stock",   linewidth=2.5, linestyle="--", color="black")

    ax.set_title(f"{metal}: inflow / outflow / stock")
    ax.set_ylabel("ktons")
    ax.grid(True, linestyle=":", alpha=0.6)
    ax.legend(fontsize=9, loc="upper right")

for ax in axes[-1]:
    ax.set_xlabel("Year")

fig.tight_layout()
plt.show()


In [None]:
# check values for platinum
df.sort_values(by="impact", ascending=False)