In [1]:
import requests
import zipfile
import os
import pandas as pd
import dask.dataframe as dd
import warnings
import shutil

# 抑制 FutureWarning
warnings.simplefilter(action='ignore', category=FutureWarning)

# 远程文件的 URL
url = 'https://fema.gov/about/reports-and-data/openfema/nfirs_fire_hazmat_pdr_2022.zip'

# 本地保存的文件名
local_zip_file = 'nfirs_fire_hazmat_pdr_2022.zip'

# 第一级解压缩后的文件夹名
first_extracted_folder = 'nfirs_fire_hazmat_pdr_2022'

# 第二级解压缩后的文件夹名
second_extracted_folder = os.path.join(first_extracted_folder, 'NFIRS_FIRES_2022_102623')

# 下载文件
response = requests.get(url)
with open(local_zip_file, 'wb') as f:
    f.write(response.content)
print(f"Downloaded {local_zip_file}")

# 解压缩第一级文件
with zipfile.ZipFile(local_zip_file, 'r') as zip_ref:
    zip_ref.extractall(first_extracted_folder)
print(f"Extracted files to {first_extracted_folder}")

# 解压缩第二级文件
second_zip_file = os.path.join(first_extracted_folder, 'nfirs_fire_hazmat_pdr_2022', 'NFIRS_FIRES_2022_102623.zip')
with zipfile.ZipFile(second_zip_file, 'r') as zip_ref:
    zip_ref.extractall(second_extracted_folder)
print(f"Extracted files to {second_extracted_folder}")

# 列出解压后的目录结构
def list_files(startpath):
    for root, dirs, files in os.walk(startpath):
        level = root.replace(startpath, '').count(os.sep)
        indent = ' ' * 4 * (level)
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f"{subindent}{f}")

list_files(second_extracted_folder)


Downloaded nfirs_fire_hazmat_pdr_2022.zip
Extracted files to nfirs_fire_hazmat_pdr_2022
Extracted files to nfirs_fire_hazmat_pdr_2022/NFIRS_FIRES_2022_102623
NFIRS_FIRES_2022_102623/
    incidentaddress.txt
    merged_data.csv
    fireincident.txt
    merged_casualty_data.csv
    civiliancasualty.txt
    hazchem.txt
    arson.txt
    basicincident.txt
    hazmat.txt
    wildlands.txt
    arsonjuvsub.txt
    ems.txt
    hazmatequipinvolved.txt
    codelookup.txt
    ffcasualty.txt
    fdheader.txt
    hazmobprop.txt
    basicaid.txt
    ffequipfail.txt
    filtered_basicincident.csv
    count_civiliancasualty.txt
    arsonagencyreferal.txt


In [2]:
# 手动指定每一列的数据类型
dtype_civiliancasualty = {
    'INCIDENT_KEY': 'object',
    'STATE': 'object',
    'FDID': 'object',
    'INC_DATE': 'object',
    'INC_NO': 'object',
    'EXP_NO': 'float64',  # 将 int64 改为 float64 以允许 NA 值
    'SEQ_NUMBER': 'float64', 
    'VERSION': 'object',
    'GENDER': 'object',
    'AGE': 'float64',
    'RACE': 'object',
    'ETHNICITY': 'object',
    'AFFILIAT': 'object',
    'INJ_DT_TIM': 'object',
    'SEV': 'object',
    'CAUSE_INJ': 'object',
    'HUM_FACT1': 'object',
    'HUM_FACT2': 'object',
    'HUM_FACT3': 'object',
    'HUM_FACT4': 'object',
    'HUM_FACT5': 'object',
    'HUM_FACT6': 'object',
    'HUM_FACT7': 'object',
    'HUM_FACT8': 'object',
    'FACT_INJ1': 'object',
    'FACT_INJ2': 'object',
    'FACT_INJ3': 'object',
    'ACTIV_INJ': 'object',
    'LOC_INC': 'object',
    'GEN_LOC_IN': 'object',
    'STORY_INC': 'object',
    'STORY_INJ': 'object',
    'SPC_LOC_IN': 'object',
    'PRIM_SYMP': 'object',
    'BODY_PART': 'object',
    'CC_DISPOS': 'object'
}

dtype_fireincident = {
    'INCIDENT_KEY': 'object',
    'STATE': 'object',
    'FDID': 'object',
    'INC_DATE': 'object',
    'INC_NO': 'object',
    'EXP_NO': 'float64',
    'VERSION': 'object',
    'NUM_UNIT': 'float64',
    'NOT_RES': 'object',
    'BLDG_INVOL': 'float64',
    'ACRES_BURN': 'float64',
    'LESS_1ACRE': 'object',
    'ON_SITE_M1': 'object',
    'MAT_STOR1': 'object',
    'ON_SITE_M2': 'object',
    'MAT_STOR2': 'object',
    'ON_SITE_M3': 'object',
    'MAT_STOR3': 'object',
    'AREA_ORIG': 'object',
    'HEAT_SOURC': 'object',
    'FIRST_IGN': 'object',
    'CONF_ORIG': 'object',
    'TYPE_MAT': 'object',
    'CAUSE_IGN': 'object',
    'FACT_IGN_1': 'object',
    'FACT_IGN_2': 'object',
    'HUM_FAC_1': 'object',
    'HUM_FAC_2': 'object',
    'HUM_FAC_3': 'object',
    'HUM_FAC_4': 'object',
    'HUM_FAC_5': 'object',
    'HUM_FAC_6': 'object',
    'HUM_FAC_7': 'object',
    'HUM_FAC_8': 'object',
    'AGE': 'object',
    'SEX': 'object',
    'EQUIP_INV': 'object',
    'SUP_FAC_1': 'object',
    'SUP_FAC_2': 'object',
    'SUP_FAC_3': 'object',
    'MOB_INVOL': 'object',
    'MOB_TYPE': 'object',
    'MOB_MAKE': 'object',
    'MOB_MODEL': 'object',
    'MOB_YEAR': 'object',
    'MOB_LIC_PL': 'object',
    'MOB_STATE': 'object',
    'MOB_VIN_NO': 'object',
    'EQ_BRAND': 'object',
    'EQ_MODEL': 'object',
    'EQ_SER_NO': 'object',
    'EQ_YEAR': 'object',
    'EQ_POWER': 'object',
    'EQ_PORT': 'object',
    'FIRE_SPRD': 'object',
    'STRUC_TYPE': 'object',
    'STRUC_STAT': 'object',
    'BLDG_ABOVE': 'object',
    'BLDG_BELOW': 'object',
    'BLDG_LGTH': 'object',
    'BLDG_WIDTH': 'object',
    'TOT_SQ_FT': 'object',
    'FIRE_ORIG': 'object',
    'ST_DAM_MIN': 'object',
    'ST_DAM_SIG': 'object',
    'ST_DAM_HVY': 'object',
    'ST_DAM_XTR': 'object',
    'FLAME_SPRD': 'object',
    'ITEM_SPRD': 'object',
    'MAT_SPRD': 'object',
    'DETECTOR': 'object',
    'DET_TYPE': 'object',
    'DET_POWER': 'object',
    'DET_OPERAT': 'object',
    'DET_EFFECT': 'object',
    'DET_FAIL': 'object',
    'AES_PRES': 'object',
    'AES_TYPE': 'object',
    'AES_OPER': 'object',
    'NO_SPR_OP': 'object',
    'AES_FAIL': 'object'
}

dtype_basicincident = {
    'INCIDENT_KEY': 'object',
    'STATE': 'object',
    'FDID': 'object',
    'INC_DATE': 'object',
    'INC_NO': 'object',
    'EXP_NO': 'float64',
    'VERSION': 'object',
    'DEPT_STA': 'object',
    'INC_TYPE': 'object',
    'ADD_WILD': 'object',
    'AID': 'object',
    'ALARM': 'object',
    'ARRIVAL': 'object',
    'INC_CONT': 'object',
    'LU_CLEAR': 'object',
    'SHIFT': 'object',
    'ALARMS': 'object',
    'DISTRICT': 'object',
    'ACT_TAK1': 'object',
    'ACT_TAK2': 'object',
    'ACT_TAK3': 'object',
    'APP_MOD': 'object',
    'SUP_APP': 'object',
    'EMS_APP': 'object',
    'OTH_APP': 'object',
    'SUP_PER': 'object',
    'EMS_PER': 'object',
    'OTH_PER': 'object',
    'RESOU_AID': 'object',
    'PROP_LOSS': 'float64',
    'CONT_LOSS': 'float64',
    'PROP_VAL': 'float64',
    'CONT_VAL': 'float64',
    'FF_DEATH': 'float64', 
    'OTH_DEATH': 'float64',  # 将 int64 改为 float64 以允许 NA 值
    'FF_INJ': 'float64',  # 将 int64 改为 float64 以允许 NA 值
    'OTH_INJ': 'float64',
    'DET_ALERT': 'object',
    'HAZ_REL': 'object',
    'MIXED_USE': 'object',
    'PROP_USE': 'object',
    'CENSUS': 'object'
}


In [14]:
import shutil
import os
import pandas as pd

# 读取并保存 civiliancasualty.txt 为 CSV 文件
civiliancasualty_path = os.path.join(second_extracted_folder, 'civiliancasualty.txt')
intermediate_csv_path = os.path.join(second_extracted_folder, 'intermediate_civiliancasualty.csv')

if os.path.exists(civiliancasualty_path):
    try:
        # 使用 pandas 读取 civiliancasualty.txt 文件
        civiliancasualty_df = pd.read_csv(civiliancasualty_path, delimiter='^', encoding='latin1')
        
        # 保存为中间 CSV 文件
        civiliancasualty_df.to_csv(intermediate_csv_path, sep='^', encoding='latin1', index=False)
        print(f"Intermediate CSV file saved to {intermediate_csv_path}")
        
        # 确保目标目录存在
        output_dir = os.path.join(os.getcwd(), 'downloads')
        os.makedirs(output_dir, exist_ok=True)

        # 下载文件到工作目录的 downloads 目录
        local_download_path = os.path.join(output_dir, 'intermediate_civiliancasualty.csv')
        shutil.copy(intermediate_csv_path, local_download_path)
        print(f"Downloaded {intermediate_csv_path} to {local_download_path}")
        
    except UnicodeDecodeError as e:
        print(f"Error reading the file with 'latin1' encoding: {e}")
else:
    print(f"Error: The file at path {civiliancasualty_path} does not exist.")


Intermediate CSV file saved to nfirs_fire_hazmat_pdr_2022/NFIRS_FIRES_2022_102623/intermediate_civiliancasualty.csv
Downloaded nfirs_fire_hazmat_pdr_2022/NFIRS_FIRES_2022_102623/intermediate_civiliancasualty.csv to /home/jovyan/work/004DISSERTATION/downloads/intermediate_civiliancasualty.csv


In [12]:
import shutil
import os
import pandas as pd
import dask.dataframe as dd

# 读取并保存 civiliancasualty.txt 为 CSV 文件
civiliancasualty_path = os.path.join(second_extracted_folder, 'civiliancasualty.txt')
intermediate_csv_path = os.path.join(second_extracted_folder, 'intermediate_civiliancasualty.csv')

if os.path.exists(civiliancasualty_path):
    try:
        # 使用 pandas 读取 civiliancasualty.txt 文件
        civiliancasualty_df = pd.read_csv(civiliancasualty_path, delimiter='^', encoding='latin1')
        
        # 保存为中间 CSV 文件
        civiliancasualty_df.to_csv(intermediate_csv_path, sep='^', encoding='latin1', index=False)
        print(f"Intermediate CSV file saved to {intermediate_csv_path}")
        
        # 使用 dask 读取中间 CSV 文件
        dask_civiliancasualty_df = dd.read_csv(intermediate_csv_path, delimiter='^', encoding='latin1', dtype=dtype_civiliancasualty)
        
        # 计算每个事件的伤亡人数
        civiliancasualty_count = dask_civiliancasualty_df.groupby('INCIDENT_KEY').size().reset_index()
        civiliancasualty_count.columns = ['INCIDENT_KEY', 'civil_num']
        
        # 使用 dask 计算结果进行合并
        merged_civiliancasualty = dd.merge(dask_civiliancasualty_df, civiliancasualty_count, on='INCIDENT_KEY', how='left')
        
        # 保存完整的文件
        output_full_file = os.path.join(second_extracted_folder, 'full_civiliancasualty.csv')
        merged_civiliancasualty.compute().to_csv(output_full_file, sep='^', encoding='latin1', index=False)
        print(f"Generated {output_full_file}")
        
        # 确保目标目录存在
        output_dir = os.path.join(os.getcwd(), 'downloads')
        os.makedirs(output_dir, exist_ok=True)

        # 下载文件到工作目录的 downloads 目录
        local_download_path = os.path.join(output_dir, 'full_civiliancasualty.csv')
        shutil.copy(output_full_file, local_download_path)
        print(f"Downloaded {output_full_file} to {local_download_path}")
        
    except UnicodeDecodeError as e:
        print(f"Error reading the file with 'latin1' encoding: {e}")
else:
    print(f"Error: The file at path {civiliancasualty_path} does not exist.")


Intermediate CSV file saved to nfirs_fire_hazmat_pdr_2022/NFIRS_FIRES_2022_102623/intermediate_civiliancasualty.csv
Generated nfirs_fire_hazmat_pdr_2022/NFIRS_FIRES_2022_102623/full_civiliancasualty.csv
Downloaded nfirs_fire_hazmat_pdr_2022/NFIRS_FIRES_2022_102623/full_civiliancasualty.csv to /home/jovyan/work/004DISSERTATION/downloads/full_civiliancasualty.csv


In [4]:
# 合并 count_civiliancasualty.txt, fireincident.txt, basicincident.txt 文件
fireincident_path = os.path.join(second_extracted_folder, 'fireincident.txt')
basicincident_path = os.path.join(second_extracted_folder, 'basicincident.txt')
output_merged_file = os.path.join(second_extracted_folder, 'merged_data.csv')

if os.path.exists(fireincident_path) and os.path.exists(basicincident_path):
    try:
        fireincident_df = dd.read_csv(fireincident_path, delimiter='^', encoding='latin1', dtype=dtype_fireincident)
        basicincident_df = dd.read_csv(basicincident_path, delimiter='^', encoding='latin1', dtype=dtype_basicincident)
        count_civiliancasualty_df = dd.read_csv(output_count_civiliancasualty, delimiter='^', encoding='latin1')

        # 修改位置：使用 how='inner' 实现交集合并
        merged_df = dd.merge(fireincident_df, basicincident_df, on='INCIDENT_KEY', how='inner')

        # 删除重复的列
        basicincident_columns_to_drop = [col for col in basicincident_df.columns if col in fireincident_df.columns and col != 'INCIDENT_KEY']
        merged_df = merged_df.drop(columns=basicincident_columns_to_drop)

        # 修改位置：使用 how='inner' 实现交集合并
        merged_df = dd.merge(merged_df, count_civiliancasualty_df, on='INCIDENT_KEY', how='inner')

        merged_df.to_csv(output_merged_file, single_file=True, sep='^', encoding='latin1', index=False)
        print(f"Merged data saved to {output_merged_file}")
    except UnicodeDecodeError as e:
        print(f"Error reading one of the files with 'latin1' encoding: {e}")
else:
    print("Error: One of the required files does not exist.")

# 显示合并后的数据
if os.path.exists(output_merged_file):
    # 使用 low_memory=False 以避免 DtypeWarning
    merged_df = pd.read_csv(output_merged_file, delimiter='^', encoding='latin1', low_memory=False)
    print(merged_df.head())
else:
    print(f"Error: The file at path {output_merged_file} does not exist.")

ValueError: Metadata inference failed in `drop_by_shallow_copy`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
KeyError("['STATE', 'FDID', 'INC_DATE', 'INC_NO', 'EXP_NO', 'VERSION'] not found in axis")

Traceback:
---------
  File "/opt/conda/lib/python3.11/site-packages/dask/dataframe/utils.py", line 193, in raise_on_meta_error
    yield
  File "/opt/conda/lib/python3.11/site-packages/dask/dataframe/core.py", line 6897, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/dask/dataframe/utils.py", line 772, in drop_by_shallow_copy
    df2.drop(columns=columns, inplace=True, errors=errors)
  File "/opt/conda/lib/python3.11/site-packages/pandas/core/frame.py", line 5347, in drop
    return super().drop(
           ^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pandas/core/generic.py", line 4711, in drop
    obj = obj._drop_axis(labels, axis, level=level, errors=errors)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pandas/core/generic.py", line 4753, in _drop_axis
    new_axis = axis.drop(labels, errors=errors)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pandas/core/indexes/base.py", line 6992, in drop
    raise KeyError(f"{labels[mask].tolist()} not found in axis")
