# Contribution analysis

In [1]:
import brightway2 as bw
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from lci_to_bw2 import lci_to_bw2

In [2]:
bw.projects.set_current('contr-analysis-fg') # select the Brightway project used in this notebook
bw.databases  # last expression: shows the databases registered in the current project

Databases dictionary with 1 object(s):
	fg-ca-exmpl

# Create a foreground system

In the cell below, uncommenting the lines that are commented out will give a different, more complex (nested) system.

In [131]:
# Foreground system, keyed by (database, code):
#   - Activity A: produces 1 kg, consumes 0.5 kg of Activity B and 0.5 kg of Activity C
#   - Activity B: produces 10 kg, emits 12.5 kg of Carbon Dioxide
#   - Activity C: produces 15 kg, emits 15 kg of Carbon Dioxide
#   - Carbon Dioxide: the only biosphere flow
# The two commented-out exchanges add a direct emission to A and make B consume C
# (the "nested" variant of the system).
bw2_db = {('fg-ca-exmpl', 'Activity A'): {'name': 'Activity A',
  'unit': 'kilogram',
  'type': 'process',
  'exchanges': [{'input': ('fg-ca-exmpl', 'Activity A'),
    'amount': 1.0,
    'unit': 'kilogram',
    'type': 'production',
    'uncertainty type': 0},
   {'input': ('fg-ca-exmpl', 'Activity B'),
    'amount': 0.5,
    'unit': 'kilogram',
    'type': 'technosphere',
    'uncertainty type': 0},
   {'input': ('fg-ca-exmpl', 'Activity C'),
    'amount': 0.5,
    'unit': 'kilogram',
    'type': 'technosphere',
    'uncertainty type': 0},
 # optional: direct Carbon Dioxide emission from Activity A
 #  {'input': ('fg-ca-exmpl', 'Carbon Dioxide'),
 #   'amount': 1.0,
 #   'unit': 'kilogram',
 #   'type': 'biosphere',
 #   'uncertainty type': 0}
               ]},
 ('fg-ca-exmpl', 'Activity B'): {'name': 'Activity B',
  'unit': 'kilogram',
  'type': 'process',
  'exchanges': [{'input': ('fg-ca-exmpl', 'Activity B'),
    'amount': 10.0,
    'unit': 'kilogram',
    'type': 'production',
    'uncertainty type': 0},
 # optional: Activity B also consumes Activity C (nested system)
 #  {'input': ('fg-ca-exmpl', 'Activity C'),
 #   'amount': 10.0,
 #   'unit': 'kilogram',
 #   'type': 'technosphere',
 #   'uncertainty type': 0},
   {'input': ('fg-ca-exmpl', 'Carbon Dioxide'),
    'amount': 12.5,
    'unit': 'kilogram',
    'type': 'biosphere',
    'uncertainty type': 0}]},
 ('fg-ca-exmpl', 'Activity C'): {'name': 'Activity C',
  'unit': 'kilogram',
  'type': 'process',
  'exchanges': [{'input': ('fg-ca-exmpl', 'Activity C'),
    'amount': 15.0,
    'unit': 'kilogram',
    'type': 'production',
    'uncertainty type': 0},
   {'input': ('fg-ca-exmpl', 'Carbon Dioxide'),
    'amount': 15.0,
    'unit': 'kilogram',
    'type': 'biosphere',
    'uncertainty type': 0}]},
 ('fg-ca-exmpl', 'Carbon Dioxide'): {'name': 'Carbon Dioxide',
  'unit': 'kilogram',
  'type': 'biosphere'}}

In [134]:
# Start from a clean slate so this cell can be re-run after editing bw2_db.
# Only drop the database when it is actually registered: an unconditional
# `del` raises KeyError in a fresh project where it does not exist yet.
if 'fg-ca-exmpl' in bw.databases:
    del bw.databases['fg-ca-exmpl']
fgca_db = bw.Database('fg-ca-exmpl')
fgca_db.write(bw2_db)

Writing activities to SQLite3 database:
0% [####] 100% | ETA: 00:00:00
Total time elapsed: 00:00:00


Title: Writing activities to SQLite3 database:
  Started: 12/08/2023 16:12:44
  Finished: 12/08/2023 16:12:44
  Total time elapsed: 00:00:00
  CPU %: 85.10
  Memory %: 1.25


In [135]:
# Sanity check: rebuild the (database, code) key of every activity that was stored.
stored_codes = (activity['code'] for activity in bw.Database('fg-ca-exmpl'))
activities = [('fg-ca-exmpl', code) for code in stored_codes]
activities

[('fg-ca-exmpl', 'Activity B'),
 ('fg-ca-exmpl', 'Carbon Dioxide'),
 ('fg-ca-exmpl', 'Activity C'),
 ('fg-ca-exmpl', 'Activity A')]

In [136]:
# Dump every stored activity followed by its exchanges, one activity per '---' section.
for activity in bw.Database('fg-ca-exmpl'):
    print(activity.as_dict())
    for exchange in activity.exchanges():
        print(exchange)
    print('---')

{'name': 'Activity B', 'unit': 'kilogram', 'type': 'process', 'database': 'fg-ca-exmpl', 'code': 'Activity B'}
Exchange: 10.0 kilogram 'Activity B' (kilogram, None, None) to 'Activity B' (kilogram, None, None)>
Exchange: 12.5 kilogram 'Carbon Dioxide' (kilogram, None, None) to 'Activity B' (kilogram, None, None)>
---
{'name': 'Activity A', 'unit': 'kilogram', 'type': 'process', 'database': 'fg-ca-exmpl', 'code': 'Activity A'}
Exchange: 1.0 kilogram 'Activity A' (kilogram, None, None) to 'Activity A' (kilogram, None, None)>
Exchange: 0.5 kilogram 'Activity B' (kilogram, None, None) to 'Activity A' (kilogram, None, None)>
Exchange: 0.5 kilogram 'Activity C' (kilogram, None, None) to 'Activity A' (kilogram, None, None)>
---
{'name': 'Activity C', 'unit': 'kilogram', 'type': 'process', 'database': 'fg-ca-exmpl', 'code': 'Activity C'}
Exchange: 15.0 kilogram 'Activity C' (kilogram, None, None) to 'Activity C' (kilogram, None, None)>
Exchange: 15.0 kilogram 'Carbon Dioxide' (kilogram, None, 

In [137]:
# Characterisation data for a dummy impact method: a single factor of 1.0
# for the foreground 'Carbon Dioxide' flow.
myLCIAdata = [[('fg-ca-exmpl', 'Carbon Dioxide'), 1.0]]

method_key = ('dummy GWI method','none','none')
my_method = bw.Method(method_key)
my_method.validate(myLCIAdata)  # presumably checks the data layout before it is stored; confirm against the Brightway docs
my_method.register()  # register the method before writing its data
my_method.write(myLCIAdata)
my_method.load()  # last expression: displays the data that was stored

[[('fg-ca-exmpl', 'Carbon Dioxide'), 1.0]]

In [140]:
fu_amount = 100  # size of the functional unit, in kg of Activity A

In [141]:
# Alternative method key, left disabled:
#mymethod = ('IPCC 2013', 'climate change', 'global warming potential (GWP100)')
mymethod = ('dummy GWI method','none','none')
act = fgca_db.get('Activity A')
functional_unit = {act: fu_amount} # the functional unit is 100 kg of A
lca = bw.LCA(functional_unit, mymethod)
lca.lci()   # build and solve the inventory
lca.lcia()  # characterise the inventory with `mymethod`
tot_score = lca.score  # kept for the share-of-total calculation at the end
print("total score is", tot_score)

total score is 112.5


This is the total score of the system and we remember it for later

# Contribution analysis

### Super simple

Make a dictionary of activities with the right amount of each, then iterate through them.

In [158]:
# Amount of each activity needed for the functional unit, worked out by hand.
CA_dict = {
    'Activity A': 1 * fu_amount,
    'Activity B': 0.5 * fu_amount,  # the total amount of B used in the system
    'Activity C': 0.5 * fu_amount,  # the total amount of C used in the system
}

In [159]:
#Use this if you are using the nested version of the system
#{'Activity A' : 1 * fu_amount, 
# 'Activity B' : 0.5 * fu_amount , # the total amount of B used in the system
# 'Activity C' : (0.5 + 0.5 * 10/10) * fu_amount } # the total amount of C used in the system (0.5 by A and 0.5 by B)

In [160]:
# "Super simple" contribution analysis: one separate LCA per activity,
# each scaled to the hand-computed amount listed in CA_dict.
CA_results = []

for code, amount in CA_dict.items():
    act = fgca_db.get(code)
    functional_unit = {act: amount}
    lca = bw.LCA(functional_unit, mymethod)
    lca.lci()
    lca.lcia()
    CA_results.append(lca.score)

# One row per activity, with a single 'GWI' column holding its score.
CA_simple = pd.DataFrame({'GWI': CA_results}, index=list(CA_dict))
CA_simple

Unnamed: 0,GWI
Activity A,112.5
Activity B,62.5
Activity C,50.0


The problem is that calculating the right amounts might be difficult, depending on how "nested" the system is. 
In this case it is not nested. But if the commented-out exchanges are enabled, then both A and B will require C, so this gets complicated, and the approach will very easily give errors on more complex systems...

## A better approach based on LCA matrix algebra

We use the scaling vector, which tells how much of each activity is needed to perform the FU.

In [161]:
# Initiate the inventory to get some scaling factors. These will be the scaling factor for the FU here chosen
fu_ca = bw.Database('fg-ca-exmpl').get('Activity A') 
lca = bw.LCA({fu_ca: fu_amount },) # contribution analysis for this FU only (and only that!)
lca.lci()

In [162]:
# list of all foreground activities
acts = [
    activity['code']
    for activity in bw.Database('fg-ca-exmpl')
    if activity['type'] == 'process'  # leaves out the biosphere flow
]

In [163]:
# dictionary of scaling factors from the supply array
# Scaling factor of each foreground activity, read from the supply array.
# lca.activity_dict maps an activity key to its position in the technosphere matrix;
# the supply array at that position is the amount of the activity used for the FU.
scaling = {
    code: lca.supply_array[lca.activity_dict[('fg-ca-exmpl', code)]]
    for code in acts
}

scaling

{'Activity A': 100.0, 'Activity C': 3.3333333333333335, 'Activity B': 5.0}

In [164]:
# Dictionary of reference flows, because they are not always normalized!
# Reference flow of each activity, i.e. the amount of its production exchange.
# Needed because activities are not always normalised to one unit of output.
refflows = {
    code: exchange['amount']
    for code in acts
    for exchange in bw.Database('fg-ca-exmpl').get(code).exchanges()
    if exchange['type'] == 'production'
}
refflows

{'Activity A': 1.0, 'Activity C': 15.0, 'Activity B': 10.0}

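In this case the reference flows are not all unitary (Activity B produces 10 kg and Activity C produces 15 kg), so the scaling factors have to be multiplied by the reference flows to obtain the amount of each product actually used.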

In [165]:
# getting the scaling factors normalized by reference flow
# Amount of each product actually used: scaling factor times reference flow.
scaling_norm = {code: scaling[code] * refflows[code] for code in acts}
scaling_norm

{'Activity A': 100.0, 'Activity C': 50.0, 'Activity B': 50.0}

In [166]:
# again, normalised scaling factors are scaling factors times reference flow
# Summary table with one row per activity; the last column is scaling times reference flow.
factor_rows = [scaling, refflows, scaling_norm]
factor_labels = ['Scaling', 'Reference flow', 'Scaling normalized']
factors = pd.DataFrame(factor_rows, index=factor_labels).T
factors

Unnamed: 0,Scaling,Reference flow,Scaling normalized
Activity A,100.0,1.0,100.0
Activity C,3.333333,15.0,50.0
Activity B,5.0,10.0,50.0


Calculate the impact of 1 unit of each process and save it

In [167]:
def dolcacalc(myact, mydemand, mymethod):
    """Return the LCIA score of `mydemand` units of `myact` under `mymethod`.

    Parameters
    ----------
    myact : activity used as the key of the functional unit
    mydemand : amount of `myact` demanded
    mymethod : method key passed to `bw.LCA`

    Returns
    -------
    The `score` of the LCA after the inventory and impact assessment steps.
    """
    calculation = bw.LCA({myact: mydemand}, mymethod)
    calculation.lci()
    calculation.lcia()
    return calculation.score

def getLCAresults(acts, mymethod, db_name='fg-ca-exmpl'):
    """Compute the LCIA score of one unit of each activity.

    Parameters
    ----------
    acts : iterable of activity codes to look up in the database
    mymethod : method key passed on to `dolcacalc`
    db_name : name of the database holding the activities; defaults to the
        foreground database of this notebook, so existing calls are unchanged

    Returns
    -------
    dict mapping each activity's 'name' (not its code) to the score of
    one unit of that activity. If two codes share a name, the later one wins.
    """
    database = bw.Database(db_name)  # looked up once instead of once per activity

    results_dict = {}
    for code in acts:
        act = database.get(code)
        results_dict[act['name']] = dolcacalc(act, 1, mymethod)  # 1 stands for one unit of each process

    return results_dict

In [168]:
# Impact of one unit of each process, stored as a single 'GWI' row.
unitary_results_all_acts = getLCAresults(acts, mymethod)  # total impact per unitary value of each activity
unitary_results = [unitary_results_all_acts]
my_output = pd.DataFrame(unitary_results, index=['GWI'])
my_output.T.sort_index().head()

Unnamed: 0,GWI
Activity A,1.125
Activity B,1.25
Activity C,1.0


Here below we multiply the unitary impact by the normalised scaling vector, and we obtain the absolute impact of each activity considered, based on how much of this activity is required to provide the functional unit. **This is the contribution analysis** in absolute terms; further below we calculate it as a fraction of the total impact.

In [169]:
# Absolute contribution of each activity: unit impact times normalised scaling factor.
# The product is aligned on the activity label instead of on row position, so a
# label present in only one of the two tables yields NaN rather than being
# silently paired with the wrong activity.
scores = my_output.T.mul(factors['Scaling normalized'], axis=0).sort_index()
scores

Unnamed: 0,GWI
Activity A,112.5
Activity B,62.5
Activity C,50.0


The sum of B + C gives A **only in this particular case**

If A also has some impacts of its own (direct emissions and inputs from the background) and the contribution from B includes impacts that derive from C (nested system), then this will **not** be the case

Also note that this is the same as calculated before with the simple approach... but here we do not risk making mistakes

In [170]:
#need to use some rounding to check this...using round at the 10th digit after the comma
# Compare the two approaches within an absolute tolerance of 10**-sign.
# Rows are matched by activity label (reindex) rather than by position, and a
# tolerance check avoids false mismatches when values straddle a rounding boundary.
sign = 10
np.allclose(scores['GWI'], CA_simple['GWI'].reindex(scores.index), rtol=0, atol=10 ** -sign)


True

In [171]:
scores / tot_score # share of the total score, as a fraction (multiply by 100 for a percentage)

Unnamed: 0,GWI
Activity A,1.0
Activity B,0.555556
Activity C,0.444444


#### Note: this is only contribution analysis for the FU specified when initializing the LCI, to do it for a different one a different set of scaling factors needs to be calculated