In [None]:
# 概率人体健康风险评估 (Probabilistic Human Health Risk Assessment)
import os
import pandas as pd
import numpy as np

# Average daily dose (ADD) for the oral-ingestion pathway
def cal_add_oral_ingestion(C_i, IR, EF, ED, BW, AT):
    """Return the oral-ingestion exposure dose.

    Evaluates ``(C_i / 1000 * IR * EF * ED) / (BW * AT)``. Every argument may
    be a scalar or a NumPy array; arrays are combined element-wise.

    Args:
        C_i: pollutant concentration (divided by 1000 before use; the
            notebook reads it from a 'Concentration(ug/L)' column).
        IR: water ingestion rate.
        EF: exposure frequency.
        ED: exposure duration.
        BW: body weight.
        AT: averaging time.
    """
    total_intake = C_i/1000 * IR * EF * ED
    normaliser = BW * AT
    return total_intake / normaliser

# Average daily dose (ADD) for the inhalation pathway
def cal_add_inhalation(C_i, DAIR, EF, ED, VF, BW, AT):
    """Return the inhalation exposure dose.

    Evaluates ``(C_i / 1000 * DAIR * EF * ED * VF) / (BW * AT)``. Every
    argument may be a scalar or a NumPy array; arrays are combined
    element-wise.

    Args:
        C_i: pollutant concentration (divided by 1000 before use).
        DAIR: daily air pulmonary ventilation rate.
        EF: exposure frequency.
        ED: exposure duration.
        VF: volatile factor.
        BW: body weight.
        AT: averaging time.
    """
    total_intake = C_i/1000 * DAIR * EF * ED * VF
    normaliser = BW * AT
    return total_intake / normaliser

# Average daily dose (ADD) for the dermal-contact pathway
def cal_add_dermal(C_i, SA, F, PC, ET, EF, ED, BW, AT):
    """Return the dermal-contact exposure dose.

    Evaluates ``(C_i / 1000 * SA * F * PC * ET * EF * ED) / (BW * AT) * 1e-3``.
    Every argument may be a scalar or a NumPy array; arrays are combined
    element-wise.

    Args:
        C_i: pollutant concentration (divided by 1000 before use).
        SA: exposed skin area.
        F: fraction of skin surface in contact with water.
        PC: skin permeability constant.
        ET: exposure time.
        EF: exposure frequency.
        ED: exposure duration.
        BW: body weight.
        AT: averaging time.
    """
    absorbed = C_i/1000 * SA * F * PC * ET * EF * ED
    normaliser = BW * AT
    # The trailing 1e-3 factor is part of the original formula
    # (presumably a cm^3 -> L volume conversion; TODO confirm).
    return absorbed / normaliser * 1e-3

# Cancer risk (CR)
def cal_CR(add, SF):
    """Return the cancer risk as the product of dose and slope factor.

    Args:
        add: exposure dose (scalar or array).
        SF: slope factor (scalar or array, combined element-wise with ``add``).
    """
    cancer_risk = add * SF
    return cancer_risk

# Non-cancer risk (hazard quotient, HQ)
def cal_hq(add, RfD):
    """Return the hazard quotient ``add / RfD``.

    Reference doses equal to 0 are treated as missing: the corresponding
    quotient is NaN instead of a division by zero.

    The caller's ``RfD`` is never modified (the zero replacement is done on a
    copy), and integer or scalar reference doses are accepted.

    Args:
        add: exposure dose (scalar, NumPy array or pandas object).
        RfD: reference dose (scalar, array-like or pandas object); may
            contain zeros.

    Returns:
        ``add`` divided element-wise by ``RfD``, NaN where ``RfD`` is 0.
    """
    if isinstance(RfD, (pd.Series, pd.DataFrame)):
        # mask() returns a new object, keeping the index for aligned division.
        safe_rfd = RfD.mask(RfD == 0)
    else:
        # Cast to float so NaN can be stored even when RfD holds integers.
        rfd_values = np.asarray(RfD, dtype=float)
        safe_rfd = np.where(rfd_values == 0, np.nan, rfd_values)
    return add / safe_rfd


def write_csv(file_path='./write.csv', rowdata=()):
    """Append one comma-separated row to ``file_path``.

    Floating-point values (Python ``float`` and any NumPy floating scalar)
    are written with 9 decimal places; every other value is written via
    ``str()``. Fields are not quoted or escaped.

    Args:
        file_path: destination file; it is opened in append mode, so it is
            created if missing and existing content is kept.
        rowdata: iterable of values forming the row. Defaults to an empty
            row, which writes a single blank line.
    """
    # Build the whole line first so a formatting error cannot leave a
    # partially written row behind.
    fields = [
        f"{value:.9f}" if isinstance(value, (float, np.floating)) else str(value)
        for value in rowdata
    ]
    with open(file_path, 'a') as f:
        f.write(','.join(fields) + '\n')

In [None]:
# Load the pollutant toxicity parameter table.
# realpath() is given the literal string '__file__' (not the module variable),
# so the path is resolved against the current working directory and
# file_root_dir ends up being that directory.
file_root_dir = os.path.dirname(os.path.realpath('__file__'))
data_dir = os.path.join(file_root_dir, 'data')
file_path = os.path.join(data_dir, 'Pollutants toxicity.xlsx')
param_data_raw = pd.read_excel(file_path, index_col=0)
# Drop the row(s) carrying the first index label
# (presumably a units / sub-header row; TODO confirm).
param_data = param_data_raw.drop(param_data_raw.index[0])
print(param_data.tail())

# Age-group labels; they are also the column prefixes of the exposure table.
labels = ['(0-5) years', '(6-10) years', '(11-18) years', '(19-70) years']

# Integer positions keyed by parameter name.
# NOTE(review): presumably column positions in param_data for the (0-5) age
# group; not used in the visible cells. TODO confirm.
iloc_col = {
    # exposure parameters
    'IR': 6, 'EF': 10, 'ED': 14, 'BW': 18, 'AT': 22,
    'DAIR': 35, 'VF': 47,
    'SA': 68, 'F': 72, 'PC': 76, 'ET': 80,
    # toxicity parameters (oral / inhalation / dermal)
    'SF_o': 26, 'RfD_o': 30,
    'SF_i': 59, 'RfD_i': 63,
    'SF_d': 100, 'RfD_d': 104,
}

# Distribution parameters of the exposure factors, one row per factor.
exposure_path = os.path.join(data_dir, 'pollutant exposure parameters.xlsx')
exposure_data = pd.read_excel(exposure_path, index_col=0)
print(exposure_data.tail())

# Pollutant concentrations; conc_name selects which workbook is read.
conc_name = 'Max_pollutant'
conc_path = os.path.join(data_dir, f'{conc_name}.xlsx')
conc_data = pd.read_excel(conc_path, index_col=0)
print(conc_data.tail())

             CAS                 Pollutants_name Abbreviation   污染物名称  Class  \
No.                                                                            
273.0   375-73-5   Perfluorobutane sulfonic acid         PFBS  全氟丁烷磺酸  PFSAs   
274.0   355-46-4   Perfluorohexane sulfonic acid        PFHxS  全氟己烷磺酸  PFSAs   
275.0   375-92-8  Perfluoroheptane sulfonic acid        PFHpS  全氟庚烷磺酸  PFSAs   
276.0  1763-23-1   Perfluorooctane sulfonic acid         PFOS  全氟辛烷磺酸  PFSAs   
277.0   335-77-3   Perfluorodecane sulfonic acid         PFDS  全氟癸烷磺酸  PFSAs   

        -    IR IR.1 IR.2 IR.3  ...  AT.10  AT.11  SFd SFd.1 SFd.2 SFd.3 RfDd  \
No.                             ...                                             
273.0 NaN  0.78    1  1.3    2  ...  25550  25550  NaN   NaN   NaN   NaN  NaN   
274.0 NaN  0.78    1  1.3    2  ...  25550  25550  NaN   NaN   NaN   NaN  NaN   
275.0 NaN  0.78    1  1.3    2  ...  25550  25550  NaN   NaN   NaN   NaN  NaN   
276.0 NaN  0.78    1  1.3    2  ..

In [None]:
from SALib.sample import sobol
from SALib.analyze import sobol as sobol_analyze

# Base sample count N handed to the Sobol' sampler. With
# calc_second_order=False the sample matrix has 9 * N rows for the 7 inputs.
# NOTE(review): SALib recommends N be a power of two (e.g. 8192 or 16384);
# confirm whether 10000 is intentional.
num_simulations = 10000

conc_len = len(conc_data.index)
print('conc_len: ', conc_len)

# Inputs varied in the sensitivity analysis, in sample-matrix column order.
sobol_names = ["C_i", "IR", "ED", "BW", "SA", "F", "ET"]
# One CSV row per pollutant: id, then first-order (S1) and total-order (ST)
# indices for every input.
csv_header = (
    ["cas"]
    + [f"{name}_S1" for name in sobol_names]
    + [f"{name}_ST" for name in sobol_names]
)
# File-name stems of the six outputs (risk type x exposure route).
result_names = [
    "CR_oral_ingestion", "CR_dermal", "CR_inhalation",
    "HQ_oral_ingestion", "HQ_dermal", "HQ_inhalation",
]

for idx, label in enumerate(labels):
    # Per age group, column '<label>_1' holds the mean (or the upper bound of
    # a uniform input) and '<label>_2' the std (or lower bound / fixed value).
    col_1, col_2 = f'{label}_1', f'{label}_2'

    IR_mean = exposure_data.loc['GW ingestion rate (IR)', col_1]
    IR_std = exposure_data.loc['GW ingestion rate (IR)', col_2]
    EF = exposure_data.loc['Exposure frequency (EF)', col_2]
    ED_min = exposure_data.loc['Exposure duration (ED)', col_2]
    ED_max = exposure_data.loc['Exposure duration (ED)', col_1]
    BW_mean = exposure_data.loc['Body weight (BW)', col_1]
    BW_std = exposure_data.loc['Body weight (BW)', col_2]
    AT_C = exposure_data.loc['Averaging Time –Cancer risk', col_2]
    AT_NC = exposure_data.loc['Averaging Time - non cancer risk', col_2]
    DAIR = exposure_data.loc['Daily air pulmonary ventilation rate (DAIR)', col_2]
    VF = exposure_data.loc['Volatile factor (VF)', col_2]
    SA_mean = exposure_data.loc['Skin area (SA)', col_1]
    SA_std = exposure_data.loc['Skin area (SA)', col_2]
    F_min = exposure_data.loc['Fraction of surface skin in contact with water (F)', col_2]
    F_max = exposure_data.loc['Fraction of surface skin in contact with water (F)', col_1]
    PC = exposure_data.loc['Permeability constant (PC)', col_2]
    ET_mean = exposure_data.loc['Exposure time (ET)', col_1]
    ET_std = exposure_data.loc['Exposure time (ET)', col_2]

    # Write the header line of every output file for this age group.
    # NOTE: write_csv appends, so re-running the cell adds another header and
    # another set of rows to files that already exist.
    for result_name in result_names:
        write_csv(file_path=f'./{label}_{result_name}.csv', rowdata=csv_header)

    for conc_th in range(conc_len):
        pollutant_name = conc_data.iloc[conc_th, 0]
        print(f'{label}-{conc_th}-{pollutant_name}')

        C_i = conc_data.loc[:, 'Concentration(ug/L)'].values[conc_th]
        # SALib "bounds" semantics: [mean, std] for "norm", [min, max] for "unif".
        # NOTE(review): IR, SA and ET are sampled as "norm"; if they are meant
        # to be lognormal, "lognorm" with log-space parameters would be needed.
        # TODO confirm the intended distributions.
        problem = {
            "num_vars": len(sobol_names),
            "names": list(sobol_names),
            "bounds": [
                # +/-20 % around the measured value; the 1e-6 offset keeps the
                # interval non-degenerate when the concentration is 0.
                [C_i * 0.8, C_i * 1.2 + 1e-6],
                [IR_mean, IR_std],
                [ED_min, ED_max],
                [BW_mean, BW_std],
                [SA_mean, SA_std],
                [F_min, F_max],
                [ET_mean, ET_std],
            ],
            "dists": [
                "unif",  # concentration
                "norm",  # water ingestion rate
                "unif",  # exposure duration
                "norm",  # body weight
                "norm",  # skin area
                "unif",  # fraction of skin in contact with water
                "norm",  # exposure time
            ],
        }
        print(problem)
        X = sobol.sample(problem, num_simulations, calc_second_order=False)
        # One column per input, each of shape (9 * num_simulations,).
        C_i_simulation, IR, ED, BW, SA, F, ET = X.T

        # Oral-ingestion dose (cancer / non-cancer averaging time).
        # The sampled exposure duration ED is passed here; the previous code
        # passed EF twice, so ED never entered the oral dose.
        add_oral_ingestion_C = cal_add_oral_ingestion(
            C_i=C_i_simulation, IR=IR, EF=EF, ED=ED, BW=BW, AT=AT_C)
        add_oral_ingestion_NC = cal_add_oral_ingestion(
            C_i=C_i_simulation, IR=IR, EF=EF, ED=ED, BW=BW, AT=AT_NC)

        # Dermal-contact dose.
        add_dermal_C = cal_add_dermal(
            C_i=C_i_simulation, SA=SA, F=F, PC=PC, ET=ET, EF=EF, ED=ED, BW=BW, AT=AT_C)
        add_dermal_NC = cal_add_dermal(
            C_i=C_i_simulation, SA=SA, F=F, PC=PC, ET=ET, EF=EF, ED=ED, BW=BW, AT=AT_NC)

        # Inhalation dose; the non-cancer variant uses the non-cancer
        # averaging time (it previously reused AT_C).
        add_inhalation_C = cal_add_inhalation(
            C_i=C_i_simulation, DAIR=DAIR, EF=EF, ED=ED, VF=VF, BW=BW, AT=AT_C)
        add_inhalation_NC = cal_add_inhalation(
            C_i=C_i_simulation, DAIR=DAIR, EF=EF, ED=ED, VF=VF, BW=BW, AT=AT_NC)

        # Risks. Unit SF / RfD values are used, so each risk equals its dose.
        risk_outputs = {
            "CR_oral_ingestion": cal_CR(add=add_oral_ingestion_C, SF=np.array([1.0])),
            "CR_dermal": cal_CR(add=add_dermal_C, SF=np.array([1.0])),
            "CR_inhalation": cal_CR(add=add_inhalation_C, SF=np.array([1.0])),
            "HQ_oral_ingestion": cal_hq(add=add_oral_ingestion_NC, RfD=np.array([1.0])),
            "HQ_dermal": cal_hq(add=add_dermal_NC, RfD=np.array([1.0])),
            "HQ_inhalation": cal_hq(add=add_inhalation_NC, RfD=np.array([1.0])),
        }

        # Sobol' analysis per output, then one CSV row per pollutant.
        for result_name, risk_values in risk_outputs.items():
            Si = sobol_analyze.analyze(problem,
                                       risk_values,
                                       calc_second_order=False,
                                       print_to_console=False)
            # Build a plain list instead of np.concatenate: concatenating the
            # name with the float arrays would convert every index to a string
            # and bypass write_csv's fixed 9-decimal float formatting.
            write_csv(file_path=f'./{label}_{result_name}.csv',
                      rowdata=[pollutant_name, *Si["S1"], *Si["ST"]])



conc_len:  277
(0-5) years-0-7664-41-7
{'num_vars': 7, 'names': ['C_i', 'IR', 'ED', 'BW', 'SA', 'F', 'ET'], 'bounds': [[np.float64(349600.0), np.float64(524400.000001)], [np.float64(0.65), np.float64(0.39)], [np.float64(0.0), np.float64(5.0)], [np.float64(16.68), np.float64(1.48)], [np.float64(5838.0), np.float64(920.0)], [np.float64(0.4), np.float64(0.9)], [np.float64(0.77), np.float64(0.79)]], 'dists': ['unif', 'norm', 'unif', 'norm', 'norm', 'unif', 'norm']}


  sample = self._random(n, workers=workers)
  names = list(pd.unique(groups))


(0-5) years-1-14797-55-8
{'num_vars': 7, 'names': ['C_i', 'IR', 'ED', 'BW', 'SA', 'F', 'ET'], 'bounds': [[np.float64(568800.0), np.float64(853200.000001)], [np.float64(0.65), np.float64(0.39)], [np.float64(0.0), np.float64(5.0)], [np.float64(16.68), np.float64(1.48)], [np.float64(5838.0), np.float64(920.0)], [np.float64(0.4), np.float64(0.9)], [np.float64(0.77), np.float64(0.79)]], 'dists': ['unif', 'norm', 'unif', 'norm', 'norm', 'unif', 'norm']}
(0-5) years-2-14797-65-0
{'num_vars': 7, 'names': ['C_i', 'IR', 'ED', 'BW', 'SA', 'F', 'ET'], 'bounds': [[np.float64(40480.0), np.float64(60720.000001)], [np.float64(0.65), np.float64(0.39)], [np.float64(0.0), np.float64(5.0)], [np.float64(16.68), np.float64(1.48)], [np.float64(5838.0), np.float64(920.0)], [np.float64(0.4), np.float64(0.9)], [np.float64(0.77), np.float64(0.79)]], 'dists': ['unif', 'norm', 'unif', 'norm', 'norm', 'unif', 'norm']}
(0-5) years-3-14808-79-8
{'num_vars': 7, 'names': ['C_i', 'IR', 'ED', 'BW', 'SA', 'F', 'ET'], 'bou

PermissionError: [Errno 13] Permission denied: './(0-5) years_CR_oral_ingestion.xlsx'