# Code for calculating and storing as HDF5 Monte Carlo LCA results for an entire DB (aggregated results)

In [1]:
import brightway2 as bw
import os               # to use "operating system dependent functionality"
import numpy as np      # "the fundamental package for scientific computing with Python"
import pandas as pd     # "high-performance, easy-to-use data structures and data analysis tools" for Python
import csv
import stats_arrays
import scipy as sp
import pandas as pd
import pickle
import h5py
import time

In [None]:
from brightway2 import *
import numpy as np
import os
import multiprocessing as mp
from scipy.sparse.linalg import factorized, spsolve
from scipy import sparse
import time
import datetime
import pickle
import click
import scipy as sp
import h5py
from bw2calc.matrices import MatrixBuilder
from stats_arrays.random import MCRandomNumberGenerator

##################
# HDF5 functions #
##################

# All those functions work based on LCA objects from Brightway

# function to rebuild csr matrix from hdf5 storage

def hdf5_to_csr_matrix(hdf5_file,group_full_path):
    """Rebuild a SciPy CSR matrix from its components stored in an HDF5 group.

    The group found at ``group_full_path`` must hold the datasets ``data``,
    ``indices`` and ``indptr`` and carry a ``shape`` attribute.

    Args:
        hdf5_file: Open HDF5 file (anything indexable by group path).
        group_full_path: Full path of the group holding the matrix parts.

    Returns:
        The rebuilt ``scipy.sparse.csr_matrix``.
    """
    matrix_group = hdf5_file[group_full_path]

    # Load the three CSR component arrays completely into memory.
    values, column_indices, row_pointers = (
        matrix_group[part][:] for part in ('data', 'indices', 'indptr')
    )

    return sp.sparse.csr_matrix(
        (values, column_indices, row_pointers),
        matrix_group.attrs['shape'],
    )


#function to rebuild an LCA object _dict (biosphere_dict, activity_dict, product_dict) from its hdf5 storage
def hdf5_to_LCA_dict(hdf5_file,group_path):
    """Rebuild an LCA ``*_dict`` (biosphere/activity/product) from HDF5 storage.

    The group at ``group_path`` must carry a ``keys_0`` attribute (the first
    element shared by every key) and hold two datasets of equal length:
    ``keys_1`` (the second element of each key) and ``values``.

    Args:
        hdf5_file: Open HDF5 file (anything indexable by group path).
        group_path: Path of the group holding the stored dictionary.

    Returns:
        A dict mapping ``(keys_0, keys_1[i])`` to ``values[i]``.

    Raises:
        KeyError: If the group, the attribute or one of the datasets is missing.
    """

    def _as_text(value):
        # Stored strings may come back as bytes; decode them so the rebuilt
        # keys are made of str. Anything that is not bytes is kept as is.
        return value.decode('UTF-8') if isinstance(value, bytes) else value

    # Plain lookup: reading must never create an empty group as a side effect.
    group = hdf5_file[group_path]

    keys_0 = _as_text(group.attrs['keys_0'])

    # Decode item by item: the dataset read is an array, which has no
    # ``.decode`` method of its own.
    keys_1_list = [_as_text(key) for key in group['keys_1'][()]]
    items_list = group['values'][()]

    return {
        (keys_0, keys_1): item
        for keys_1, item in zip(keys_1_list, items_list)
    }


#Function to write csr matrix, _dict from LCA objects and any numpy.ndarray

def write_LCA_obj_to_HDF5_file(LCA_obj,hdf5_file,group_path):
    """Write one object taken from an LCA calculation into an HDF5 file.

    Three kinds of objects are handled:

    * a SciPy CSR matrix: stored as a group holding the ``data``, ``indptr``
      and ``indices`` datasets plus a ``shape`` attribute;
    * a ``*_dict``, recognised by the last component of ``group_path``
      (``biosphere_dict``, ``activity_dict`` or ``product_dict``): delegated
      to ``LCA_dict_to_hdf5`` (not defined in this part of the file);
    * anything else exposing ``dtype``: stored as a single gzip dataset.

    Args:
        LCA_obj: The object to store.
        hdf5_file: Open, writable HDF5 file.
        group_path: Path of the group or dataset to write to.

    Returns:
        None.
    """
    dict_names_to_check = ('biosphere_dict', 'activity_dict', 'product_dict')

    # isinstance() on the public class also accepts subclasses and avoids the
    # deprecated ``scipy.sparse.csr`` module path.
    if isinstance(LCA_obj, sp.sparse.csr_matrix):
        # Retrieve or create groups and subgroups
        group = hdf5_file.require_group(group_path)

        # Matrix values are stored as float32 to save space.
        group.create_dataset('data', data=LCA_obj.data, compression="gzip", dtype=np.float32)
        group.create_dataset('indptr', data=LCA_obj.indptr, compression="gzip")
        group.create_dataset('indices', data=LCA_obj.indices, compression="gzip")

        group.attrs['shape'] = LCA_obj.shape

    # [-1] instead of [1]: a path without any '/' must not raise IndexError.
    elif group_path.rsplit('/', 1)[-1] in dict_names_to_check:
        LCA_dict_to_hdf5(LCA_obj, hdf5_file, group_path)

    else:
        dataset_options = {'data': LCA_obj, 'compression': "gzip"}

        # Store float64 data as float32 to save space; other dtypes are kept.
        if LCA_obj.dtype == np.dtype('float64'):
            dataset_options['dtype'] = np.float32

        hdf5_file.create_dataset(group_path, **dataset_options)


def h5py_dataset_iterator(g, prefix=''):
    """Recursively yield ``(path, dataset)`` for every dataset found below ``g``.

    Args:
        g: The group (or file) whose members are walked.
        prefix: Path prepended to each member name; the recursion passes the
            path of the parent group here.

    Yields:
        Tuples ``(path, dataset)`` where ``path`` is ``prefix`` followed by
        ``/`` and the member name. Members that are neither a dataset nor a
        group are skipped.
    """
    for name in g:
        node = g[name]
        node_path = '{}/{}'.format(prefix, name)

        if isinstance(node, h5py.Dataset):
            yield (node_path, node)
            continue

        if isinstance(node, h5py.Group):
            # Descend into the sub-group, carrying its path as the new prefix.
            yield from h5py_dataset_iterator(node, node_path)




#######################################
# Dependent LCI Monte Carlo functions #
#######################################



#Dependent LCA Monte Carlo for each activity and functional unit defined in functional_units = [{act.key: 1}]
def worker_process(project, job_id, worker_id, iterations,functional_units,hdf5_file_MC_LCI_results_path,hdf5_file_MC_LCA_results_path,impact_method_name_list):
    """Compute aggregated Monte Carlo impact scores from stored LCI results.

    For every iteration the stored biosphere matrix is read back, a new set
    of characterization factors is sampled for every impact method, and for
    every functional unit the stored supply array is turned into an
    aggregated inventory and then into one score per impact method.

    Args:
        project: Name of the Brightway project to activate in this process.
        job_id: Identifier of the calculation job (currently not used).
        worker_id: Index of this worker (currently not used).
        iterations: Number of stored LCI iterations to process (0..n-1).
        functional_units: List of ``{activity_key: amount}`` dicts; the
            second element of each key is used as the activity identifier.
        hdf5_file_MC_LCI_results_path: Path of the HDF5 file holding the
            stored LCI results (opened read-only).
        hdf5_file_MC_LCA_results_path: Path of the HDF5 file meant to receive
            the LCA results (opened in append mode).
        impact_method_name_list: Names of the impact methods to evaluate.

    Returns:
        None.
    """
    # A child process started by multiprocessing does not necessarily share
    # the parent's Brightway state, so the project is selected explicitly.
    projects.set_current(project)

    # Context managers guarantee both files are closed, even on error.
    with h5py.File(hdf5_file_MC_LCI_results_path,'r') as hdf5_file_MC_LCI_results, \
            h5py.File(hdf5_file_MC_LCA_results_path,'a') as hdf5_file_MC_LCA_results:

        # biosphere_dict is the same for the entire database
        biosphere_dict = hdf5_to_LCA_dict(hdf5_file_MC_LCI_results,'/biosphere_dict')

        # Build the CF parameters and their random number generator once per
        # impact method; new CF values are drawn from them at each iteration.
        impact_method_dict = {}

        for impact_method_name in impact_method_name_list:

            # MatrixBuilder.build is given a list of processed-array paths
            # (assumed API — confirm against the installed bw2calc version).
            method_filepath = [Method(impact_method_name).filepath_processed()]

            cf_params, _, _, _ = MatrixBuilder.build(
                method_filepath,
                "amount",
                "flow",
                "row",
                row_dict=biosphere_dict,
                one_d=True,
            )

            impact_method_dict[impact_method_name] = {
                'cf_params': cf_params,
                'cf_rng': MCRandomNumberGenerator(cf_params, seed=None),
            }

        #LCA iterations
        for iteration in range(iterations):

            biosphere_matrix = hdf5_to_csr_matrix(hdf5_file_MC_LCI_results,'/biosphere_matrix/'+str(iteration))

            # Draw a new CF vector for each impact method for this iteration
            cf_vector_dict = {}

            for impact_method_name, method_info in impact_method_dict.items():

                characterization_matrix = MatrixBuilder.build_diagonal_matrix(
                    method_info['cf_params'],
                    biosphere_dict,
                    "row",
                    "row",
                    new_data=method_info['cf_rng'].next(),
                )

                # The matrix is diagonal, so its row sums are the CF values;
                # flatten them to a 1-D vector (one value per biosphere row).
                cf_vector_dict[impact_method_name] = np.asarray(characterization_matrix.sum(axis=1)).ravel()

            #Iterations per activity
            for fu in functional_units:

                # Second element of the activity key identifies the activity
                actKey = next(iter(fu))[1]

                # [()] reads the stored values into memory instead of keeping
                # a dataset handle.
                supply_array = hdf5_file_MC_LCI_results['/supply_array/'+actKey+'/'+str(iteration)][()]

                # Aggregated inventory: one total amount per biosphere row
                inventory = np.ravel(biosphere_matrix * supply_array)

                for impact_method_name, cf_vector in cf_vector_dict.items():

                    # Aggregated score = sum over rows of CF * inventory. A
                    # dot product avoids broadcasting a column against a row.
                    impact_score = float(np.dot(cf_vector, inventory))

                    impact_score_path = '/Uncertain LCI/'+actKey+'/'+str(impact_method_name)

                    # NOTE(review): impact_score is computed but not yet
                    # written to hdf5_file_MC_LCA_results; the storage layout
                    # under impact_score_path still has to be decided.
                
                
        

        
        
            
        
    
    
    




def Dependant_LCA_Monte_Carlo_aggregated_results(project, database, iterations, cpus, hdf5_file_MC_LCI_results_path, path_for_saving,impact_method_name_list):
    """Start one worker process per CPU to compute aggregated LCA results.

    For each worker an HDF5 result file is created (or opened) in
    ``path_for_saving`` and tagged with metadata, then a child process
    running ``worker_process`` is started. The function returns right after
    starting the children; it does not wait for them to finish.

    Args:
        project: Name of the Brightway project to use.
        database: Name of the database whose activities are analysed.
        iterations: Number of iterations passed to every worker.
        cpus: Number of worker processes to start.
        hdf5_file_MC_LCI_results_path: Path of the HDF5 file holding the
            stored LCI Monte Carlo results.
        path_for_saving: Directory receiving one result file per worker.
        impact_method_name_list: Names of the impact methods to evaluate.

    Returns:
        None.
    """
    import platform  # local import: only needed for the host name fallback

    projects.set_current(project)
    bw2setup()

    # ID to identify on which machine and when the calculation was made.
    # COMPUTERNAME only exists on Windows; fall back to the network name.
    now = datetime.datetime.now()
    host_name = os.environ.get('COMPUTERNAME') or platform.node()
    job_id = "{}_{}-{}-{}_{}h{}".format(host_name, now.year, now.month, now.day, now.hour, now.minute)

    # Selection of activities for the MC analysis.
    # NOTE(review): only three hard-coded activities are used; a
    # commented-out line in the original selected every activity of the
    # database instead — confirm which selection is intended.
    db = Database(database)
    activity_codes = (
        'e929619f245df590fee5d72dc979cdd4',
        'bdf7116059abfcc6b8b9ade1a641e578',
        'c8c815c68836adaf964daaa001a638a3',
    )
    activities = [db.get(code) for code in activity_codes]

    # One functional unit of amount 1 per activity
    functional_units = [{act.key: 1} for act in activities]

    # Start one child process (worker) per CPU
    workers = []

    for worker_id in range(cpus):
        hdf5_file_name = "LCA_Dependant_Monte_Carlo_aggregated_results_worker"+str(worker_id)+".hdf5"
        # os.path.join uses the separator of the running platform
        hdf5_file_MC_results_path = os.path.join(path_for_saving, hdf5_file_name)

        # Create or open the worker's file, write its metadata, and close it
        # again before the child process opens it.
        with h5py.File(hdf5_file_MC_results_path,'a') as hdf5_file_MC_results:
            hdf5_file_MC_results.attrs['Database name'] = db.name
            hdf5_file_MC_results.attrs['Worker ID'] = worker_id
            hdf5_file_MC_results.attrs['Description'] = 'HDF5 file containing all LCA dependant Monte Carlo results per activity/iteration'

        # Create child processes that can work apart from parent process
        child = mp.Process(
            target=worker_process,
            args=(
                projects.current,
                job_id,
                worker_id,
                iterations,
                functional_units,
                hdf5_file_MC_LCI_results_path,
                hdf5_file_MC_results_path,
                impact_method_name_list,
            ),
        )
        workers.append(child)
        child.start()
      
        
        
#Useful when the code is run from the console to execute the main function
#if __name__ == '__main__':
#    Dependant_LCA_Monte_Carlo_aggregated_results()


  




In [2]:
bw.Method.filepath_processed

<function bw2data.data_store.ProcessedDataStore.filepath_processed>

In [3]:
ic_name=('IMPACTWorld+ - Endpoint - only with spatial variability - Four param beta integration - update august 15th 2017','Ecosystem Quality','Land transformation, biodiversity, GLO, with uncert')


In [4]:
dico={}
dico[ic_name]=0

In [5]:
dico

{('IMPACTWorld+ - Endpoint - only with spatial variability - Four param beta integration - update august 15th 2017',
  'Ecosystem Quality',
  'Land transformation, biodiversity, GLO, with uncert'): 0}

In [6]:
type(ic_name)

tuple

In [7]:
str(ic_name)

"('IMPACTWorld+ - Endpoint - only with spatial variability - Four param beta integration - update august 15th 2017', 'Ecosystem Quality', 'Land transformation, biodiversity, GLO, with uncert')"

In [17]:
from bw2calc.utils import (
    global_index,
    clean_databases,
    get_filepaths,
    load_arrays,
    mapping,
)

get_filepaths(bw.Method(ic_name),"method")

AssertionError: Can't find method object Brightway2 Method: IMPACTWorld+ - Endpoint - only with spatial variability - Four param beta integration - update august 15th 2017: Ecosystem Quality: Land transformation, biodiversity, GLO, with uncert

In [13]:
bw.Method(ic_name).filepath_processed()

'C:\\Users\\Laure\\AppData\\Local\\pylca\\Brightway3\\default.c21f969b5f03d33d43e04f8f136e7682\\processed\\impactworld-endpoint-only-with-spatial-variability-four-param-beta-integration-update-august-15th-2017el.f206e5ea37ed25b38d29df4d1e9d52ee.npy'