In [2]:
import math
import numpy as np
import os
os.chdir('../')

# EF Calc Scientific

In [7]:
from src.emission_factor_calculator import EmissionFactorCalculator

In [23]:
class SensitivityEmissionFactor:
    def __init__(self, farm_data, operation_mode = 'farmer'):
        self.farm_data = farm_data
        self.variables = list(farm_data['climate_data'].keys()) + list(farm_data['modifiers'].keys())
        self.mode = operation_mode
        self.results = {}
        self.output = {}

    def perform_analysis(self):
        # Iterating over all variables
        for variable in self.variables:
            # Initialize dictionary for each variable to store lists of results
            self.results[variable] = {}
            # Extract array values for the current variable
            values_array = self.farm_data['climate_data'].get(variable, []) + self.farm_data['modifiers'].get(variable, [])
            
            # Initialize results storage for this variable
            init_dict = {}
            for index in range(len(values_array)):
                modified_data = self.prepare_data_for_efc(variable, values_array[index])
                efc = EmissionFactorCalculator(modified_data)
                ef_results = efc.get_ef()
                
                # Initialize dictionary to collect results by ef type
                if index == 0:  # Setup the dictionary with keys and empty lists
                    for key in ef_results.keys():
                        init_dict[key] = []
                
                # Append results to the corresponding key
                for key, value in ef_results.items():
                    init_dict[key].append(value)
            
            # Store aggregated results
            for key, value_list in init_dict.items():
                self.results[variable][key] = np.array(value_list)
        
        return self.results
    
    def get_result(self):
        output_temp = self.perform_analysis()
        if self.mode == 'farmer':
            self.output = output_temp['P']
        elif self.mode == 'scientific':
            self.output = output_temp
            
        return self.output

    def prepare_data_for_efc(self, selected_variable, value):
        data = {'climate_data': {}, 'modifiers': {}}
        # Initialize all variables with their default values
        for var in self.variables:
            if var in self.farm_data['climate_data']:
                data['climate_data'][var] = self.farm_data['climate_data'][var][0]
            if var in self.farm_data['modifiers']:
                data['modifiers'][var] = self.farm_data['modifiers'][var][0]
        
        # Replace the value of the selected variable with the current value from its array
        if selected_variable in data['climate_data']:
            data['climate_data'][selected_variable] = value
        elif selected_variable in data['modifiers']:
            data['modifiers'][selected_variable] = value
        
        return data

In [24]:
farm_data = {'climate_data': {'P': [652,653,654], 'PE': [652,653,654], 'FR_Topo': [11.71,11.71,11.71]},
             'modifiers': {'RF_AM': [1.0,1.0,1.0], 'RF_CS': [1.0,1.0,1.0], 'RF_NS': [1.0,1.0,1.0], 'RF_Till': [1.0,1.0,1.0], 'RF_TX': [1.0,1.0,1.0]}}
sa = SensitivityEmissionFactor(farm_data)
results = sa.get_result()

results

{'EF_CT_P': array([0.01721731, 0.01731365, 0.01741053]),
 'EF_CT_PE': array([0.01721731, 0.01721731, 0.01721731]),
 'EF_Topo': array([0.01721731, 0.01731365, 0.01741053]),
 'EF': array([0.0266935 , 0.02684287, 0.02699307])}

In [25]:
farm_data = {'climate_data': {'P': [652,653,654], 'PE': [652,653,654], 'FR_Topo': [11.71,11.71,11.71]},
             'modifiers': {'RF_AM': [1.0,1.0,1.0], 'RF_CS': [1.0,1.0,1.0], 'RF_NS': [1.0,1.0,1.0], 'RF_Till': [1.0,1.0,1.0], 'RF_TX': [1.0,1.0,1.0]}}
sa = SensitivityEmissionFactor(farm_data, operation_mode = 'scientific')
results = sa.perform_analysis()

results

{'P': {'EF_CT_P': array([0.01721731, 0.01731365, 0.01741053]),
  'EF_CT_PE': array([0.01721731, 0.01721731, 0.01721731]),
  'EF_Topo': array([0.01721731, 0.01731365, 0.01741053]),
  'EF': array([0.0266935 , 0.02684287, 0.02699307])},
 'PE': {'EF_CT_P': array([0.01721731, 0.01721731, 0.01721731]),
  'EF_CT_PE': array([0.01721731, 0.01731365, 0.01741053]),
  'EF_Topo': array([0.01721731, 0.01722859, 0.01723994]),
  'EF': array([0.0266935 , 0.02671099, 0.02672858])},
 'FR_Topo': {'EF_CT_P': array([0.01721731, 0.01721731, 0.01721731]),
  'EF_CT_PE': array([0.01721731, 0.01721731, 0.01721731]),
  'EF_Topo': array([0.01721731, 0.01721731, 0.01721731]),
  'EF': array([0.0266935, 0.0266935, 0.0266935])},
 'RF_AM': {'EF_CT_P': array([0.01721731, 0.01721731, 0.01721731]),
  'EF_CT_PE': array([0.01721731, 0.01721731, 0.01721731]),
  'EF_Topo': array([0.01721731, 0.01721731, 0.01721731]),
  'EF': array([0.0266935, 0.0266935, 0.0266935])},
 'RF_CS': {'EF_CT_P': array([0.01721731, 0.01721731, 0.0172

# Emission Calc Scientific

In [26]:
from src.emission_calculator import EmissionCalculator

In [27]:
farm_data = {'climate_data': {'P': [652,653,654], 'PE': [652,653,654], 'FR_Topo': [11.71,11.71,11.71]},
             'modifiers': {'RF_AM': [1.0,1.0,1.0], 'RF_CS': [1.0,1.0,1.0], 'RF_NS': [1.0,1.0,1.0], 'RF_Till': [1.0,1.0,1.0], 'RF_TX': [1.0,1.0,1.0]}}
sa = SensitivityEmissionFactor(farm_data)
ef_data = sa.perform_analysis()

n_data = {'yield': {'n_crop_residue': [1,2,3]}, 'moisture': {'n_crop_residue': [1,2,3]}, 'carbon_concentration': {'n_crop_residue': [11,22,33]}}

In [28]:
class SensitivityEmission:
    def __init__(self, ef_data, n_data, operation_mode = 'farmer'):
        self.ef_data = ef_data
        self.n_data = n_data
        self.variables = list(ef_data.keys()) + list(n_data.keys())
        self.mode = operation_mode
        self.results = {}
        self.output = {}

    def perform_analysis(self):
        # Iterating over all variables
        for variable in self.variables:
            # Initialize dictionary for each variable to store lists of results
            self.results[variable] = {}
            # Extract array values for the current variable
            if variable in self.ef_data.keys():
                values_array = self.ef_data.get(variable).get('EF')
            elif variable in self.n_data.keys():
                values_array = self.n_data.get(variable).get('n_crop_residue')
            
            # Initialize results storage for this variable
            init_dict = {}
            for index in range(len(values_array)):
                modified_ef = self.prepare_ef_input_for_ec(variable, values_array[index])
                modified_n = self.prepare_n_input_for_ec(variable, values_array[index])
                ec = EmissionCalculator(modified_ef, modified_n)
                emission_results = ec.get_emission()
                
                # Initialize dictionary to collect results
                if index == 0:  # Setup the dictionary with keys and empty lists
                    for key in emission_results.keys():
                        init_dict[key] = []
                
                # Append results to the corresponding key
                for key, value in emission_results.items():
                    init_dict[key].append(value)
            
            # Store aggregated results
            for key, value_list in init_dict.items():
                self.results[variable][key] = np.array(value_list)
        
        return self.results
    
    def get_result(self):
        output_temp = self.perform_analysis()
        if self.mode == 'farmer':
            self.output = output_temp['P']
        elif self.mode == 'scientific':
            self.output = output_temp
            
        return self.output
    
    def prepare_ef_input_for_ec(self, selected_variable, value):

        ef_input = {'EF': self.ef_data.get('P').get('EF')[0]}
        if selected_variable in self.ef_data:
            ef_input['EF'] = value
        return ef_input


    def prepare_n_input_for_ec(self, selected_variable, value):
        n_input = {'n_crop_residue': self.n_data.get('moisture').get('n_crop_residue')[0]}
        
        if selected_variable in self.n_data:
            n_input['n_crop_residue'] = value
        return n_input  
    

In [30]:
sa_emission = SensitivityEmission(ef_data, n_data)
emission_data = sa_emission.get_result()

emission_data

{'n_crop_direct': array([0.0266935 , 0.02684287, 0.02699307]),
 'no2_crop_direct': array([0.04194693, 0.04218165, 0.04241768]),
 'co2_crop_direct': array([11.45151323, 11.51559129, 11.5800279 ])}

In [31]:
sa_emission = SensitivityEmission(ef_data, n_data, operation_mode = 'scientific')
emission_data = sa_emission.get_result()

emission_data

{'P': {'n_crop_direct': array([0.0266935 , 0.02684287, 0.02699307]),
  'no2_crop_direct': array([0.04194693, 0.04218165, 0.04241768]),
  'co2_crop_direct': array([11.45151323, 11.51559129, 11.5800279 ])},
 'PE': {'n_crop_direct': array([0.0266935 , 0.02671099, 0.02672858]),
  'no2_crop_direct': array([0.04194693, 0.04197442, 0.04200206]),
  'co2_crop_direct': array([11.45151323, 11.45901677, 11.4665623 ])},
 'FR_Topo': {'n_crop_direct': array([0.0266935, 0.0266935, 0.0266935]),
  'no2_crop_direct': array([0.04194693, 0.04194693, 0.04194693]),
  'co2_crop_direct': array([11.45151323, 11.45151323, 11.45151323])},
 'RF_AM': {'n_crop_direct': array([0.0266935, 0.0266935, 0.0266935]),
  'no2_crop_direct': array([0.04194693, 0.04194693, 0.04194693]),
  'co2_crop_direct': array([11.45151323, 11.45151323, 11.45151323])},
 'RF_CS': {'n_crop_direct': array([0.0266935, 0.0266935, 0.0266935]),
  'no2_crop_direct': array([0.04194693, 0.04194693, 0.04194693]),
  'co2_crop_direct': array([11.45151323

## 3. Calculate emission factor

In [6]:
class EmissionFactorCalculator:

    def __init__(self ,farm_data):
        ## validate input data
        self.validate_input(farm_data)
        ## initialize instance variables with validated data
        self.data = farm_data
        self.P = farm_data['climate_data']['P']
        self.PE = farm_data['climate_data']['PE']
        self.FR_Topo = farm_data['climate_data']['FR_Topo']
        self.RF_TX = farm_data['modifiers']['RF_TX']
        self.RF_NS = farm_data['modifiers']['RF_NS']
        self.RF_till = farm_data['modifiers']['RF_Till']
        self.RF_CS = farm_data['modifiers']['RF_CS']
        self.RF_AM = farm_data['modifiers']['RF_AM']

    def validate_input(self, farm_data):
        ## data should include all required fields and should be numbers

        required_climate_keys = ['P', 'PE', 'FR_Topo']
        for key in required_climate_keys:
            if key not in farm_data['climate_data']:
                raise ValueError(f"Missing required climate data key: {key}")
            if not isinstance(farm_data['climate_data'][key], (int, float)):
                raise TypeError(f"Value for climate_data[{key}] must be a number (int or float)")
    
        required_modifiers_keys = ['RF_TX', 'RF_NS', 'RF_Till', 'RF_CS', 'RF_AM']
        for key in required_modifiers_keys:
            if key not in farm_data['modifiers']:
                raise ValueError(f"Missing required modifiers key: {key}")
            if not isinstance(farm_data['modifiers'][key], (int, float)):
                raise TypeError(f"Value for modifiers[{key}] must be a number (int or float)")    

    def calculate_ef_ct(self):
        ## calculate EF_CT_P and EF_CT_PE based on P and PE 
        ## equation 2.5.1-1 and 2.5.1-2
        self.EF_CT_P = math.exp(0.00558 * self.P - 7.7)
        self.EF_CT_PE = math.exp(0.00558 * self.PE - 7.7)

        return self.EF_CT_P, self.EF_CT_PE
    
    def calculate_ef_topo(self):
        ## calculate EF_Topo
        ## equation 2.5.2-1, 2.5.2-2, and 2.5.2-3
        ## ensure EF_CT_P and EF_CT_PE are calculated
        if not hasattr(self, 'EF_CT_P') or not hasattr(self, 'EF_CT_PE'):
            self.calculate_ef_ct()

        intermediate_factor = self.P/self.PE

        if intermediate_factor > 1:
            self.EF_Topo = self.EF_CT_P
        elif self.P == self.PE:
            self.EF_Topo = self.EF_CT_PE
        else:
            self.EF_Topo = ((self.EF_CT_PE * self.FR_Topo/100) + (self.EF_CT_P * (1 - self.FR_Topo/100)))

        return self.EF_Topo
    
    def calculate_emission_factor(self):
        ## calculate emission factor
        ## equation 2.5.3-2 and 2.5.4-1
        ## ensure EF_CT_P and EF_CT_PE are calculated
        if not hasattr(self, 'EF_Topo'):
            self.calculate_ef_topo()
        
        EF_base = (self.EF_Topo*self.RF_TX) * (1/0.645)

        self.EF = EF_base * self.RF_NS * self.RF_till * self.RF_CS * self.RF_AM

        return self.EF
            
    def get_ef(self):
        if not hasattr(self, 'EF'):
            self.calculate_emission_factor()

        self.data_dict = {'EF_CT_P': self.EF_CT_P,
                          'EF_CT_PE': self.EF_CT_PE,
                          'EF_Topo': self.EF_Topo,
                          'EF': self.EF}
        
        return self.data_dict


In [9]:
test_data = {
            'climate_data' : {
                'P': 159,
                'PE': 678,
                'FR_Topo': 7.57
            },
            'modifiers' : {
                'RF_TX': 1,
                'RF_NS': 0.84,
                'RF_Till': 1,
                'RF_CS': 1,
                'RF_AM': 1
            }
}

ef_calculator = EmissionFactorCalculator(test_data)

print('EF_Topo for the farm: ', ef_calculator.calculate_ef_topo())
print('Emission Factor for the farm: ', ef_calculator.calculate_emission_factor())

test_farm_ef = ef_calculator.get_ef()
test_farm_ef

EF_Topo for the farm:  0.0025232347031386142
Emission Factor for the farm:  0.0032860731017619158


{'EF_CT_P': 0.001099631670775916,
 'EF_CT_PE': 0.019905484145844587,
 'EF_Topo': 0.0025232347031386142,
 'EF': 0.0032860731017619158}

## 4. Calculate CRN nitrogen emission 

In [13]:
class EmissionCalculator:
    def __init__(self, ef_data, n_data):
        ## validate input data
        self.validate_input(ef_data, n_data)
        ## initialize instance variables with validated data
        self.ef_data = ef_data
        self.n_data = n_data
        self.EF = ef_data['EF']
        self.n_crop_residue = n_data['n_crop_residue']
        
    
    def validate_input(self, ef_data, n_data):
        ## data should include all required fields and should be numbers
        required_ef_key = ['EF']
        for key in required_ef_key:
            if key not in ef_data:
                raise ValueError(f"Missing required key: {key}")
            if not isinstance(ef_data[key], (int, float)):
                raise TypeError(f"Value for {key} must be a number (int or float)")
            
        required_n_keys = ['n_crop_residue']
        for key in required_n_keys:
            if key not in n_data:
                raise ValueError(f"Missing required key: {key}")
            if not isinstance(n_data[key], (int, float)):
                raise TypeError(f"Value for {key} must be a number (int or float)")            
            
            
    def calculate_n_crn_direct(self):
        ## calculate n_crn_direct
        ## equation 2.6.5-2
        self.n_crn_direct = self.n_crop_residue * self.EF

        return self.n_crn_direct
    
    def calculate_n_other_direct(self):
        ## set values for other sources of n_direct
        ## currently set to 0, cen be modified to include other sources
        self.n_sn_direct = 0
        self.n_crnmin_direct = 0
        self.n_on_direct = 0
    
    def calculate_n_crop_direct(self):
        ## calculate n_crop_direct
        ## equation 2.6.9-1
        if not hasattr(self, 'n_sn_direct') or not hasattr(self, 'n_crnmin_direct') or not hasattr(self, 'n_on_direct'):
            self.calculate_n_other_direct()
        if not hasattr(self, 'n_crn_direct'):
            self.calculate_n_crn_direct()

        self.n_crop_direct = self.n_sn_direct + self.n_crnmin_direct + self.n_on_direct + self.n_crn_direct

        return self.n_crop_direct
    
    def convert_n_crop_direct_to_n2o(self):
        ## calculate n02_crop_direct using n_crop_direct
        ## using constant 44/28
        if not hasattr(self, 'n_crop_direct'):
            self.calculate_n_crop_direct()

        N_TO_NO2 = 44/28

        self.no2_crop_direct = self.n_crop_direct * N_TO_NO2

        return self.no2_crop_direct
    
    def calculate_n2o_crop_direct_to_co2e(self):
        ## calculate co2 equavilent for n2o_crop_direct
        ## using constant 273, the same value used by Holos
        if not hasattr(self, 'no2_crop_direct'):
            self.convert_n_crop_direct_to_n2o()

        NO2_TO_CO2 = 273

        self.co2_crop_direct = self.no2_crop_direct * NO2_TO_CO2

        return self.co2_crop_direct
    
    def get_emission(self):
        if not hasattr(self, 'co2_crop_direct'):
            self.calculate_n2o_crop_direct_to_co2e()

        self.data_dict = {
            'n_crop_direct': self.n_crop_direct,
            'no2_crop_direct': self.no2_crop_direct,
            'co2_crop_direct': self.co2_crop_direct
        }

        return self.data_dict



In [14]:
# test_data = {
#     'n_crop_residue': 560.2468642105264,
#     'EF': 0.0032860731017619158
# }

test_farm_ef
test_farm_n = {
    'n_crop_residue': 560.2468642105264
}

emission_calculator = EmissionCalculator(test_farm_ef, test_farm_n)

print('Crop N direct: ', emission_calculator.calculate_n_crop_direct())
print('Crop NO2 direct: ', emission_calculator.convert_n_crop_direct_to_n2o())
print('Crop CO2e direct: ', emission_calculator.calculate_n2o_crop_direct_to_co2e())

test_farm_emission = emission_calculator.get_emission()
test_farm_emission

Crop N direct:  1.8410121508286712
Crop NO2 direct:  2.8930190941593406
Crop CO2e direct:  789.7942127055


{'n_crop_direct': 1.8410121508286712,
 'no2_crop_direct': 2.8930190941593406,
 'co2_crop_direct': 789.7942127055}