## 根据acs变量的组成构建下载数据
- 下载所有变量需要的 code（原始 ACS 变量）
- 通过给定的公式组建变量
- 输出结果为 CSV 文件

In [4]:
# 读取定义表
import pandas as pd

excel_path = "H:/GoogleDrive/Dissertation/dissertation-local/dissertation-Paper-#3/acs_data_version#2.xlsx"
json_path = "../data/acs_variable_definitions.json"
df_def = pd.read_json(json_path)
df_def.head()

Unnamed: 0,Category,Definition,Code,Description,Table
0,Education,% less_than_9th_grade,(B15003_002E + B15003_003E + B15003_004E + B15...,Percentage of population aged 25 and over whos...,acs/acs5
1,Education,% high_school,B15003_017E / B15003_001E * 100,Percentage of the population aged 25 years and...,acs/acs5
2,Education,% bachelor,B15003_022E / B15003_001E * 100,Percentage of the population aged 25 years and...,acs/acs5
3,Education,% master,B15003_023E / B15003_001E * 100,Percentage of the population aged 25 years and...,acs/acs5
4,Education,% high_school_or_higher,(B15003_017E + B15003_018E + B15003_019E + B15...,Percentage of the population aged 25 years and...,acs/acs5


In [None]:
# 测试用例
from census import Census
import pandas as pd

# 设置你的 API key
API_KEY = "47ccf4da248759e799cdbe75823014b6d9055b29"  # 替换为你自己的 key
c = Census(API_KEY)

# 查询变量：总人口 B01001_001E（只支持全 ZIP 下载后筛选）
data = c.acs5.state_zipcode(
    fields = ['B01001_001E', 'B19301_001E'],
    state_fips='06',  # 替换为你需要的州 FIPS 码
    zcta='*',
    year=2018
)

# 转为 DataFrame
df = pd.DataFrame(data)
df['B01001_001E'] = pd.to_numeric(df['B01001_001E'], errors='coerce')
df

In [7]:
import pandas as pd
import os
import re

def load_definition_table(json_path, excel_path):
    if os.path.exists(json_path):
        print(f"🔁 读取缓存 JSON 文件: {json_path}")
        df_def = pd.read_json(json_path)
    else:
        print(f"📥 读取 Excel 文件: {excel_path}")
        df_def = pd.read_excel(excel_path)
        df_def = df_def.dropna(subset=['Code'])
        df_def.to_json(json_path, orient="records", indent=2)
        print(f"✅ 已缓存为 JSON 文件: {json_path}")
    return df_def
    
def extract_variable_codes(code_series):
    all_vars = set()
    for formula in code_series.dropna():
        found = re.findall(r'[BC]\d{5}_\d+E', formula)
        all_vars.update(found)
    return sorted(all_vars)

- **下载所有变量需要的 code（原始 ACS 变量）**
- 通过给定的公式组建变量
- 输出结果为 CSV 文件

In [8]:
# === 加载定义表（优先使用 JSON 缓存） ===
excel_path = "H:/GoogleDrive/Dissertation/dissertation-local/dissertation-Paper-#3/acs_data_version#2.xlsx"
json_path = "../data/acs_variable_definitions.json"
df_def = load_definition_table(json_path, excel_path)
definition_list = df_def['Definition'].tolist()

# === 下载原始变量 ===
var_codes = extract_variable_codes(df_def['Code'])

🔁 读取缓存 JSON 文件: ../data/acs_variable_definitions.json


In [10]:
from census import Census
import pandas as pd

# 设置你的 API key
API_KEY = "47ccf4da248759e799cdbe75823014b6d9055b29"  # 替换为你自己的 key
c = Census(API_KEY)

# 查询变量：总人口 B01001_001E（只支持全 ZIP 下载后筛选）
data = c.acs5.state_zipcode(
    fields = var_codes,
    state_fips='06',  # 替换为你需要的州 FIPS 码
    zcta='*',
    year=2018
)

# 转为 DataFrame
df = pd.DataFrame(data)
# 将所有var_codes中的变量转换为数值类型
for var in var_codes:
    df[var] = pd.to_numeric(df[var], errors='coerce')
df

Unnamed: 0,B01001_001E,B01001_002E,B01001_003E,B01001_004E,B01001_005E,B01001_006E,B01001_007E,B01001_008E,B01001_009E,B01001_010E,...,C16001_032E,C16001_034E,C16001_035E,C16001_037E,C16001_038E,C24010_001E,C24010_003E,C24010_019E,C24010_042E,C24010_058E
0,58975.0,30349.0,2630.0,2882.0,2614.0,1586.0,1169.0,570.0,412.0,1455.0,...,0.0,5.0,0.0,19.0,17.0,23818.0,1124.0,2287.0,193.0,95.0
1,53111.0,25518.0,2560.0,2450.0,2361.0,1565.0,768.0,425.0,510.0,1304.0,...,0.0,0.0,0.0,51.0,26.0,19665.0,930.0,2299.0,227.0,140.0
2,72741.0,35361.0,3242.0,3391.0,3271.0,2179.0,1049.0,619.0,428.0,1605.0,...,0.0,70.0,0.0,73.0,13.0,28690.0,1281.0,3501.0,158.0,194.0
3,61586.0,30602.0,1649.0,1344.0,1727.0,1028.0,792.0,299.0,462.0,1356.0,...,644.0,41.0,8.0,67.0,52.0,33874.0,5988.0,3964.0,602.0,71.0
4,39479.0,19602.0,1188.0,1103.0,851.0,359.0,429.0,279.0,172.0,862.0,...,210.0,28.0,113.0,141.0,88.0,21500.0,3304.0,3683.0,402.0,44.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1759,696.0,419.0,41.0,42.0,23.0,0.0,11.0,0.0,16.0,15.0,...,0.0,0.0,0.0,0.0,0.0,386.0,37.0,51.0,0.0,0.0
1760,29357.0,14913.0,792.0,634.0,716.0,433.0,363.0,117.0,153.0,677.0,...,116.0,19.0,0.0,16.0,24.0,15276.0,2218.0,2221.0,349.0,14.0
1761,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1762,18333.0,9461.0,540.0,736.0,590.0,296.0,269.0,0.0,109.0,212.0,...,17.0,23.0,0.0,0.0,0.0,10361.0,2077.0,1075.0,310.0,40.0


In [11]:
def calculate_custom_variables(df_raw, df_def):
    import re

    def get_vars_from_formula(formula):
        return re.findall(r'[BC]\d{5}_\d+E', formula)

    def safe_eval_formula(df, formula, var_name):
        needed_vars = get_vars_from_formula(formula)
        missing_vars = [v for v in needed_vars if v not in df.columns]
        if missing_vars:
            print(f"⚠️ 跳过 {var_name}，缺失变量: {missing_vars}")
            return None
        try:
            return df.eval(formula)
        except Exception as e:
            print(f"❌ 公式错误 {var_name}: {e}")
            return None

    df = df_raw.copy()
    for _, row in df_def.iterrows():
        var_name = row['Definition']
        formula = row['Code']
        result = safe_eval_formula(df, formula, var_name)
        if result is not None:
            df[var_name] = result
    return df

In [12]:
df_def = load_definition_table(json_path, excel_path)
var_codes = extract_variable_codes(df_def["Code"])
# ... 下载 df_raw 逻辑省略 ...
df_final = calculate_custom_variables(df, df_def)

# 选出你需要保留的地理列 + 构建的新变量列
geo_cols = ['state', 'zip code tabulation area']
custom_vars = df_def['Definition'].tolist()
df_output = df_final[geo_cols + custom_vars]
# 重命名 zip code 列
df_output = df_output.rename(columns={'zip code tabulation area': 'zipcode'})
df_output

🔁 读取缓存 JSON 文件: ../data/acs_variable_definitions.json


Unnamed: 0,state,zipcode,% less_than_9th_grade,% high_school,% bachelor,% master,% high_school_or_higher,% english_speaker,% spanish_speaker,% asian_speaker,...,% multi-unit structures,% without plumbing,% overcrowded housing,% renter-occupied,% households without a vehicle,% with access to a vehicle,commute time,% work at home,% professinal,% service
0,06,90001,35.924476,22.624205,4.179488,0.561647,45.409733,61.408519,85.132304,0.153906,...,22.030222,0.166486,33.391241,64.053565,11.719146,88.280854,22586.0,2.596170,5.529432,10.000840
1,06,90002,31.188102,25.114738,3.920080,0.755720,49.998275,66.213560,74.741895,1.057249,...,27.996100,0.094444,21.934519,64.268849,14.599402,85.400598,18791.0,2.150594,5.883549,12.402746
2,06,90003,29.843352,24.879174,4.252580,1.095325,50.775956,66.399709,74.028432,0.369400,...,30.120748,0.029194,26.233433,71.647107,16.523618,83.476382,27281.0,2.159022,5.015685,12.879052
3,06,90004,15.497709,19.641677,24.676894,6.288892,74.270019,59.232449,47.395932,31.107825,...,78.997709,0.031860,22.033590,83.296163,17.154431,82.845569,31213.0,6.351635,19.454449,11.911791
4,06,90005,18.905748,20.205302,23.098192,5.129091,69.256558,48.643192,46.004953,38.709417,...,93.489729,0.000000,26.888456,92.075173,30.045007,69.954993,20096.0,4.934008,17.237209,17.334884
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1759,06,96148,12.314225,19.532909,23.779193,6.369427,81.528662,79.809221,42.766296,0.000000,...,7.360000,0.000000,5.238095,39.523810,0.000000,100.000000,386.0,0.000000,9.585492,13.212435
1760,06,96150,5.071792,17.943492,20.213062,6.044465,89.703566,90.372693,17.726355,5.907584,...,23.786225,0.173370,4.126214,46.298544,7.229542,92.770458,13523.0,8.813216,16.804137,14.630793
1761,06,96155,,,,,,,,,...,0.000000,,,,,,0.0,,,
1762,06,96161,3.601874,10.874741,34.436679,10.137470,92.711773,95.177752,13.927752,0.252294,...,13.823479,0.303074,2.770963,26.598355,1.948333,98.051667,9178.0,9.674245,23.038317,10.761510


In [13]:
df_output.columns = (
    df_output.columns
    .str.strip()
    .str.lower()
    .str.replace('%', 'pct')
    .str.replace(' ', '_')
    .str.replace('-', '_')
)
df_output

Unnamed: 0,state,zipcode,pct_less_than_9th_grade,pct_high_school,pct_bachelor,pct_master,pct_high_school_or_higher,pct_english_speaker,pct_spanish_speaker,pct_asian_speaker,...,pct_multi_unit_structures,pct_without_plumbing,pct_overcrowded_housing,pct_renter_occupied,pct_households_without_a_vehicle,pct_with_access_to_a_vehicle,commute_time,pct_work_at_home,pct_professinal,pct_service
0,06,90001,35.924476,22.624205,4.179488,0.561647,45.409733,61.408519,85.132304,0.153906,...,22.030222,0.166486,33.391241,64.053565,11.719146,88.280854,22586.0,2.596170,5.529432,10.000840
1,06,90002,31.188102,25.114738,3.920080,0.755720,49.998275,66.213560,74.741895,1.057249,...,27.996100,0.094444,21.934519,64.268849,14.599402,85.400598,18791.0,2.150594,5.883549,12.402746
2,06,90003,29.843352,24.879174,4.252580,1.095325,50.775956,66.399709,74.028432,0.369400,...,30.120748,0.029194,26.233433,71.647107,16.523618,83.476382,27281.0,2.159022,5.015685,12.879052
3,06,90004,15.497709,19.641677,24.676894,6.288892,74.270019,59.232449,47.395932,31.107825,...,78.997709,0.031860,22.033590,83.296163,17.154431,82.845569,31213.0,6.351635,19.454449,11.911791
4,06,90005,18.905748,20.205302,23.098192,5.129091,69.256558,48.643192,46.004953,38.709417,...,93.489729,0.000000,26.888456,92.075173,30.045007,69.954993,20096.0,4.934008,17.237209,17.334884
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1759,06,96148,12.314225,19.532909,23.779193,6.369427,81.528662,79.809221,42.766296,0.000000,...,7.360000,0.000000,5.238095,39.523810,0.000000,100.000000,386.0,0.000000,9.585492,13.212435
1760,06,96150,5.071792,17.943492,20.213062,6.044465,89.703566,90.372693,17.726355,5.907584,...,23.786225,0.173370,4.126214,46.298544,7.229542,92.770458,13523.0,8.813216,16.804137,14.630793
1761,06,96155,,,,,,,,,...,0.000000,,,,,,0.0,,,
1762,06,96161,3.601874,10.874741,34.436679,10.137470,92.711773,95.177752,13.927752,0.252294,...,13.823479,0.303074,2.770963,26.598355,1.948333,98.051667,9178.0,9.674245,23.038317,10.761510


✅ 功能清单：
**自动重命名列（将 %、空格、连字符转换成 SQL 友好格式）；

为每个 ZIP code 获取中心点经纬度（并构造 PostGIS Point 类型）；**

将数据写入 PostgreSQL 的 acs_data 表（假设你已运行建表 SQL）；

使用 SQLAlchemy + GeoAlchemy2 写入支持空间列的表。

In [None]:
import pandas as pd
from geoalchemy2 import WKTElement

def attach_zip_latlon_geom(df, zip_latlon_csv_path):
    """
    将 ZIP 经纬度信息合并进主 DataFrame，并创建 PostGIS 的 POINT(经度 纬度) geometry 字段。

    参数：
        df: Pandas DataFrame，包含 'zipcode' 列（应为字符串）
        zip_latlon_csv_path: 包含 ZIP, lat, lng 的 CSV 文件路径

    返回：
        df: 合并了经纬度和 geom 字段的新 DataFrame
    """
    # 读取 ZIP 坐标数据
    zip_coords = pd.read_csv(zip_latlon_csv_path)
    zip_coords.columns = zip_coords.columns.str.lower()
    
    # 确保字段存在
    if not {'zip', 'lat', 'lng'}.issubset(zip_coords.columns):
        raise ValueError("CSV 文件必须包含 'zip', 'lat', 'lng' 列")

    # 转为字符串类型以便匹配
    zip_coords['zip'] = zip_coords['zip'].astype(str)
    df['zipcode'] = df['zipcode'].astype(str)

    # 合并经纬度
    df = df.merge(zip_coords[['zip', 'lat', 'lng']], left_on='zipcode', right_on='zip', how='left')
    df.drop(columns=['zip'], inplace=True)

    # 创建 geometry 列（WKT 格式，4326 投影）
    df['geom'] = df.apply(
        lambda row: WKTElement(f"POINT({row['lng']} {row['lat']})", srid=4326)
        if pd.notnull(row['lat']) and pd.notnull(row['lng']) else None,
        axis=1
    )

    return df

In [None]:
df_acs = pd.read_csv("acs_processed_output.csv")
df_acs = attach_zip_latlon_geom(df_acs, "../data/us_zip_lat_lon.csv")

In [None]:
df_acs["year"] = 2018
df_acs

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.types import Float, Integer, Text
from geoalchemy2 import Geometry

def write_df_to_postgres(df, table_name, db_url):
    """
    将 DataFrame 写入 PostgreSQL 表（支持 PostGIS geom 列）

    参数:
        df: 包含数据的 DataFrame，必须包含 'geom' 列（GeoAlchemy WKTElement 类型）
        table_name: 目标表名，例如 'acs_data'
        db_url: SQLAlchemy 格式的连接字符串，例如 'postgresql+psycopg2://user:pwd@localhost:5432/db'
    """
    engine = create_engine(db_url)

    # 字段类型映射（可以根据实际数据再细化）
    dtype = {
        'zipcode': Text(),
        'state': Text(),
        'year': Integer(),
        'population': Integer(),
        'median_income': Float(),
        'per_capita_income': Float(),
        'geom': Geometry('POINT', srid=4326)
    }

    # 自动识别其余列为 Float 类型
    for col in df.columns:
        if col not in dtype and col not in ['geom']:
            dtype[col] = Float()

    # 写入数据库
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists='append',
        index=False,
        dtype=dtype,
        method='multi'
    )

    print(f"✅ 数据已成功写入 PostgreSQL 表 `{table_name}`")

In [None]:
import pandas as pd

# 写入数据库
db_url = "postgresql+psycopg2://postgres:wym45123@localhost:5432/dashboard"
write_df_to_postgres(df_acs, table_name="acs_data", db_url=db_url)

In [71]:
import os
import re
import json
import pandas as pd
from census import Census
from geoalchemy2 import WKTElement
from sqlalchemy import create_engine
from sqlalchemy.types import Float, Integer, Text
from geoalchemy2 import Geometry
from us import states 

# ========== 参数配置 ==========
API_KEY = "47ccf4da248759e799cdbe75823014b6d9055b29"
YEAR = 2018
STATE_FIPS = '06'  # California
EXCEL_PATH = "H:/GoogleDrive/Dissertation/dissertation-local/dissertation-Paper-#3/acs_data_version#2.xlsx"
JSON_PATH = "../data/acs_variable_definitions.json"
ZIP_LATLON_PATH = "../data/us_zip_lat_lon.csv"
OUTPUT_PATH = "acs_processed_output.csv"


# ========== 函数定义 ==========

def load_definition_table(json_path: str, excel_path: str) -> pd.DataFrame:
    """加载变量定义表，优先读取 JSON 缓存，否则从 Excel 导入并保存 JSON。"""
    if os.path.exists(json_path):
        print(f"🔁 读取缓存 JSON 文件: {json_path}")
        return pd.read_json(json_path)
    else:
        print(f"📥 读取 Excel 文件: {excel_path}")
        df_def = pd.read_excel(excel_path)
        df_def = df_def.dropna(subset=['Code'])
        df_def.to_json(json_path, orient="records", indent=2)
        print(f"✅ 已缓存为 JSON 文件: {json_path}")
    return df_def


def extract_variable_codes(code_series: pd.Series) -> list:
    """提取公式或代码中的所有原始 ACS 变量名。"""
    all_vars = set()
    for formula in code_series.dropna():
        found = re.findall(r'[BC]\d{5}_\d+E', formula)
        all_vars.update(found)
    print(f"🔁 提取公式或代码中的所有原始 ACS 变量名")
    return sorted(all_vars)

def enrich_with_zip_info(df: pd.DataFrame, zip_info_path: str) -> pd.DataFrame:
    """
    清洗 df，并根据 zip_info CSV 添加 city 和 state 信息。
    步骤：
    1. 重命名 'zip code tabulation area' 为 'zipcode'
    2. 如果 'state' 列存在，则删除
    3. 根据 zipcode 合并 zip_info 中的 city 和 state
    """
    # 重命名字段
    if 'zip code tabulation area' in df.columns:
        df = df.rename(columns={'zip code tabulation area': 'zipcode'})

    # 删除已有 state 列
    if 'state' in df.columns:
        df = df.drop(columns=['state'])

    # 加载 ZIP -> city/state 映射表
    zip_info = pd.read_csv(zip_info_path)
    zip_info.columns = zip_info.columns.str.lower()
    
    if not {'zip', 'city', 'state'}.issubset(zip_info.columns):
        raise ValueError("ZIP 映射表必须包含 'zip', 'city', 'state' 字段")

    zip_info['zip'] = zip_info['zip'].astype(str)
    df['zipcode'] = df['zipcode'].astype(str)

    # 合并
    df = df.merge(zip_info[['zip', 'city', 'state']], left_on='zipcode', right_on='zip', how='left')
    df.drop(columns=['zip'], inplace=True)

    return df



def download_acs_data(api_key: str, year: int, state_fips: str, var_codes: list) -> pd.DataFrame:
    """使用 Census API 下载指定变量的 ACS 数据(ZIP 级别)。"""
    c = Census(api_key)
    data = c.acs5.state_zipcode(
        fields=var_codes,
        state_fips=state_fips,
        zcta='*',
        year=year
    )
    df = pd.DataFrame(data)
    for var in var_codes:
        df[var] = pd.to_numeric(df[var], errors='coerce')

    return df

def download_acs_data_old(api_key: str, year: int, var_codes: list) -> pd.DataFrame:
    """
    下载全美所有州的 ACS ZIP 级别数据，并合并为一个 DataFrame。

    参数：
        api_key: Census API Key
        year: 年份（例如 2018, 2019）
        var_codes: 要请求的 ACS 变量代码列表

    返回：
        包含全美 ZIP 数据的 Pandas DataFrame
    """
    c = Census(api_key)
    all_dfs = []

    # 排除阿拉斯加、夏威夷、华盛顿特区、海外领地等
    excluded = {"AK", "HI", "DC", "PR", "VI", "GU", "MP", "AS"}

    # 获取大陆州的 State 对象列表
    continental_states = [s for s in states.STATES if s.abbr not in excluded]

    for state in states.STATES:
        state_fips = state.fips
        try:
            print(f"⬇ 正在下载 {state.name} ({state_fips}) 的数据...")
            data = c.acs5.state_zipcode(
                fields=var_codes,
                state_fips=state_fips,
                zcta='*',
                year=year
            )
            df = pd.DataFrame(data)
            for var in var_codes:
                df[var] = pd.to_numeric(df[var], errors='coerce')
            all_dfs.append(df)
        except Exception as e:
            print(f"❌ 无法下载 {state.name}（{state_fips}）: {e}")
            continue

    # 合并所有州的数据
    if all_dfs:
        df_all = pd.concat(all_dfs, ignore_index=True)
        return df_all
    else:
        print("⚠️ 未能成功下载任何州的数据。")
        return pd.DataFrame()


def calculate_custom_variables(df_raw: pd.DataFrame, df_def: pd.DataFrame) -> pd.DataFrame:
    """根据自定义公式计算衍生变量。"""
    def get_vars_from_formula(formula):
        return re.findall(r'[BC]\d{5}_\d+E', formula)

    def safe_eval_formula(df, formula, var_name):
        needed_vars = get_vars_from_formula(formula)
        missing = [v for v in needed_vars if v not in df.columns]
        if missing:
            print(f"⚠️ 跳过 {var_name}，缺失变量: {missing}")
            return None
        try:
            return df.eval(formula)
        except Exception as e:
            print(f"❌ 错误公式 {var_name}: {e}")
            return None

    df = df_raw.copy()
    for _, row in df_def.iterrows():
        var_name = row['Definition']
        formula = row['Code']
        result = safe_eval_formula(df, formula, var_name)
        if result is not None:
            df[var_name] = result
    print(f"🔁 根据自定义公式计算衍生变量")
    return df


def clean_and_rename_columns(df: pd.DataFrame, df_def: pd.DataFrame) -> pd.DataFrame:
    """重命名列并标准化字段格式。"""
    geo_cols = ['state', 'city', 'zipcode']
    custom_vars = df_def['Definition'].tolist()
    df = df[geo_cols + custom_vars]
    # df = df.rename(columns={'zip code tabulation area': 'zipcode'})
    df.columns = (
        df.columns
        .str.strip()
        .str.lower()
        .str.replace('%', 'pct', regex=False)
        .str.replace(' ', '_')
        .str.replace('-', '_')
    )
    print(f"🔁 重命名列并标准化字段格式")
    return df


def attach_zip_latlon_geom(df: pd.DataFrame, zip_latlon_csv_path: str) -> pd.DataFrame:
    """合并 ZIP 经纬度坐标，并创建 PostGIS 兼容的 geom 列。"""
    zip_coords = pd.read_csv(zip_latlon_csv_path)
    zip_coords.columns = zip_coords.columns.str.lower()

    if not {'zip', 'lat', 'lng'}.issubset(zip_coords.columns):
        raise ValueError("CSV 文件必须包含 'zip', 'lat', 'lng' 列")

    zip_coords['zip'] = zip_coords['zip'].astype(str)
    df['zipcode'] = df['zipcode'].astype(str)

    df = df.merge(zip_coords[['zip', 'lat', 'lng']], left_on='zipcode', right_on='zip', how='left')
    df.drop(columns=['zip'], inplace=True)

    df['geom'] = df.apply(
        lambda row: WKTElement(f"POINT({row['lng']} {row['lat']})", srid=4326)
        if pd.notnull(row['lat']) and pd.notnull(row['lng']) else None,
        axis=1
    )
    return df

def write_df_to_postgres(df, table_name, db_url):
    """
    将 DataFrame 写入 PostgreSQL 表（支持 PostGIS geom 列）

    参数:
        df: 包含数据的 DataFrame，必须包含 'geom' 列（GeoAlchemy WKTElement 类型）
        table_name: 目标表名，例如 'acs_data'
        db_url: SQLAlchemy 格式的连接字符串，例如 'postgresql+psycopg2://user:pwd@localhost:5432/db'
    """
    engine = create_engine(db_url)

    # 字段类型映射（可以根据实际数据再细化）
    dtype = {
        'zipcode': Text(),
        'state': Text(),
        'city': Text(),
        'year': Integer(),
        'population': Integer(),
        'median_income': Float(),
        'per_capita_income': Float(),
        'geom': Geometry('POINT', srid=4326)
    }

    # 自动识别其余列为 Float 类型
    for col in df.columns:
        if col not in dtype and col not in ['geom']:
            dtype[col] = Float()

    # 写入数据库
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists='append',
        index=False,
        dtype=dtype,
        method='multi'
    )

    print(f"✅ 数据已成功写入 PostgreSQL 表 `{table_name}`")

# ========== 主流程 ==========






In [76]:
# 1. 加载变量定义
df_def = load_definition_table(JSON_PATH, EXCEL_PATH)

# 2. 提取原始变量代码
var_codes = extract_variable_codes(df_def['Code'])

year = 2016
# 3. 下载原始 ACS 数据
if year >= 2020:
    df_raw = download_acs_data(API_KEY, year, STATE_FIPS, var_codes)
else:
    df_raw = download_acs_data_old(API_KEY, year, var_codes)

df_enriched = enrich_with_zip_info(df_raw, "../data/us_zip_lat_lon.csv")
# 4. 计算自定义指标
df_final = calculate_custom_variables(df_enriched, df_def)
# 5. 重命名、清洗列
df_cleaned = clean_and_rename_columns(df_final, df_def)

# 6. 附加经纬度与空间字段
df_with_geom = attach_zip_latlon_geom(df_cleaned, ZIP_LATLON_PATH)

# 7. 添加年份信息
df_with_geom['year'] = year

# 8. 保存结果
df_with_geom.head()

🔁 读取缓存 JSON 文件: ../data/acs_variable_definitions.json
🔁 提取公式或代码中的所有原始 ACS 变量名
⬇ 正在下载 Alabama (01) 的数据...
⬇ 正在下载 Alaska (02) 的数据...
⬇ 正在下载 Arizona (04) 的数据...
⬇ 正在下载 Arkansas (05) 的数据...
⬇ 正在下载 California (06) 的数据...
⬇ 正在下载 Colorado (08) 的数据...
⬇ 正在下载 Connecticut (09) 的数据...
⬇ 正在下载 Delaware (10) 的数据...
⬇ 正在下载 Florida (12) 的数据...
⬇ 正在下载 Georgia (13) 的数据...
⬇ 正在下载 Hawaii (15) 的数据...
⬇ 正在下载 Idaho (16) 的数据...
⬇ 正在下载 Illinois (17) 的数据...
⬇ 正在下载 Indiana (18) 的数据...
⬇ 正在下载 Iowa (19) 的数据...
⬇ 正在下载 Kansas (20) 的数据...
⬇ 正在下载 Kentucky (21) 的数据...
⬇ 正在下载 Louisiana (22) 的数据...
⬇ 正在下载 Maine (23) 的数据...
⬇ 正在下载 Maryland (24) 的数据...
⬇ 正在下载 Massachusetts (25) 的数据...
⬇ 正在下载 Michigan (26) 的数据...
⬇ 正在下载 Minnesota (27) 的数据...
⬇ 正在下载 Mississippi (28) 的数据...
⬇ 正在下载 Missouri (29) 的数据...
⬇ 正在下载 Montana (30) 的数据...
⬇ 正在下载 Nebraska (31) 的数据...
⬇ 正在下载 Nevada (32) 的数据...
⬇ 正在下载 New Hampshire (33) 的数据...
⬇ 正在下载 New Jersey (34) 的数据...
⬇ 正在下载 New Mexico (35) 的数据...
⬇ 正在下载 New York (36) 的数据...
⬇ 正在下载 North Carolina (37)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['zipcode'] = df['zipcode'].astype(str)


Unnamed: 0,state,city,zipcode,pct_less_than_9th_grade,pct_high_school,pct_bachelor,pct_master,pct_high_school_or_higher,pct_english_speaker,pct_spanish_speaker,...,pct_households_without_a_vehicle,pct_with_access_to_a_vehicle,commute_time,pct_work_at_home,pct_professinal,pct_service,lat,lng,geom,year
0,AL,Moody,35004,2.034276,24.104779,14.323534,5.225024,93.242302,99.348733,0.132289,...,2.769231,97.230769,5297.0,3.162706,18.773267,5.76139,33.603543,-86.466833,POINT(-86.466833 33.603543),2016
1,AL,Adamsville,35005,4.424447,30.708661,9.336333,4.16198,83.314586,99.24357,2.255536,...,3.963102,96.036898,2976.0,0.766922,9.166667,6.634615,33.578097,-86.987228,POINT(-86.987228 33.578097),2016
2,AL,Adger,35006,3.788969,35.059952,7.098321,2.35012,85.659472,99.081007,1.225323,...,4.037006,95.962994,1012.0,1.268293,11.058151,6.577693,33.437653,-87.207592,POINT(-87.207592 33.437653),2016
3,AL,Alabaster,35007,5.241912,23.448195,19.358802,8.260692,88.492365,93.595704,10.716576,...,1.671437,98.328563,11898.0,4.110251,21.00653,7.811753,33.193415,-86.794377,POINT(-86.794377 33.193415),2016
4,AL,Alexander City,35010,8.692047,26.380156,10.184481,2.998687,74.26242,98.3198,3.202084,...,8.220564,91.779436,7226.0,3.279347,11.477363,7.08234,32.930079,-85.805026,POINT(-85.805026 32.930079),2016


In [77]:
df_with_geom.shape[0]


32936

In [78]:
db_url = "postgresql+psycopg2://postgres:wym45123@localhost:5432/dashboard"
write_df_to_postgres(df_with_geom, table_name="acs_data_all", db_url=db_url)

✅ 数据已成功写入 PostgreSQL 表 `acs_data_all`


In [35]:
df_enriched = enrich_with_zip_info(df_raw, "../data/us_zip_lat_lon.csv")
df_enriched

Unnamed: 0,B01001_001E,B01001_002E,B01001_003E,B01001_004E,B01001_005E,B01001_006E,B01001_007E,B01001_008E,B01001_009E,B01001_010E,...,C16001_035E,C16001_037E,C16001_038E,C24010_001E,C24010_003E,C24010_019E,C24010_042E,C24010_058E,city,state
0,16773.0,8318.0,500.0,431.0,587.0,316.0,236.0,108.0,29.0,425.0,...,0.0,0.0,0.0,3630.0,426.0,660.0,58.0,40.0,,
1,37083.0,18149.0,704.0,1013.0,907.0,674.0,487.0,313.0,294.0,633.0,...,0.0,0.0,0.0,10851.0,1341.0,1591.0,156.0,77.0,,
2,45652.0,22182.0,957.0,1112.0,1436.0,950.0,658.0,376.0,252.0,916.0,...,0.0,0.0,0.0,11965.0,1776.0,1647.0,228.0,101.0,,
3,6231.0,3075.0,126.0,127.0,156.0,123.0,70.0,66.0,34.0,139.0,...,0.0,0.0,0.0,1455.0,91.0,143.0,0.0,0.0,,
4,26502.0,12784.0,465.0,666.0,786.0,509.0,342.0,157.0,133.0,651.0,...,0.0,0.0,0.0,8273.0,873.0,1219.0,0.0,75.0,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
33115,12.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,12.0,0.0,0.0,0.0,0.0,Hyder,AK
33116,990.0,552.0,30.0,32.0,50.0,19.0,11.0,7.0,9.0,12.0,...,0.0,21.0,0.0,451.0,53.0,29.0,0.0,0.0,Klawock,AK
33117,1582.0,866.0,31.0,72.0,79.0,48.0,15.0,6.0,14.0,33.0,...,0.0,34.0,0.0,549.0,98.0,28.0,11.0,0.0,Metlakatla,AK
33118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Point Baker,AK


In [37]:
if 'state' in df_enriched.columns:
    print("✅ 包含 state 字段")
else:
    print("❌ 不包含 state 字段")

✅ 包含 state 字段


In [38]:
# 4. 计算自定义指标
df_final = calculate_custom_variables(df_enriched, df_def)
df_final.head()

🔁 根据自定义公式计算衍生变量


Unnamed: 0,B01001_001E,B01001_002E,B01001_003E,B01001_004E,B01001_005E,B01001_006E,B01001_007E,B01001_008E,B01001_009E,B01001_010E,...,% multi-unit structures,% without plumbing,% overcrowded housing,% renter-occupied,% households without a vehicle,% with access to a vehicle,commute time,% work at home,% professinal,% service
0,16773.0,8318.0,500.0,431.0,587.0,316.0,236.0,108.0,29.0,425.0,...,7.731392,1.008101,0.90009,35.877588,15.859586,84.140414,3436.0,4.236343,13.333333,19.283747
1,37083.0,18149.0,704.0,1013.0,907.0,674.0,487.0,313.0,294.0,633.0,...,25.385494,0.798388,1.178203,23.517557,13.185024,86.814976,10156.0,6.058644,13.795964,15.371855
2,45652.0,22182.0,957.0,1112.0,1436.0,950.0,658.0,376.0,252.0,916.0,...,16.971333,1.152797,1.631414,42.885081,18.943956,81.056044,10941.0,2.321221,16.748851,14.609277
3,6231.0,3075.0,126.0,127.0,156.0,123.0,70.0,66.0,34.0,139.0,...,2.689136,0.453858,6.858296,24.861321,15.179022,84.820978,1428.0,1.85567,6.254296,9.828179
4,26502.0,12784.0,465.0,666.0,786.0,509.0,342.0,157.0,133.0,651.0,...,21.719929,0.722022,1.083032,25.812274,11.902076,88.097924,7964.0,2.97271,10.552399,15.641243


In [41]:
# 5. 重命名、清洗列
df_cleaned = clean_and_rename_columns(df_final, df_def)
df_cleaned.head()

🔁 重命名列并标准化字段格式


Unnamed: 0,state,city,zipcode,pct_less_than_9th_grade,pct_high_school,pct_bachelor,pct_master,pct_high_school_or_higher,pct_english_speaker,pct_spanish_speaker,...,pct_multi_unit_structures,pct_without_plumbing,pct_overcrowded_housing,pct_renter_occupied,pct_households_without_a_vehicle,pct_with_access_to_a_vehicle,commute_time,pct_work_at_home,pct_professinal,pct_service
0,,,601,23.498233,26.527007,13.831398,1.304055,64.941949,13.123131,95.881107,...,7.731392,1.008101,0.90009,35.877588,15.859586,84.140414,3436.0,4.236343,13.333333,19.283747
1,,,602,24.110032,23.341424,15.482495,3.916593,65.839217,24.298961,96.030479,...,25.385494,0.798388,1.178203,23.517557,13.185024,86.814976,10156.0,6.058644,13.795964,15.371855
2,,,603,17.130178,26.62837,16.312574,5.124464,73.009939,27.886526,95.536877,...,16.971333,1.152797,1.631414,42.885081,18.943956,81.056044,10941.0,2.321221,16.748851,14.609277
3,,,606,31.40731,27.70847,6.981834,3.56752,54.891661,5.996325,96.826457,...,2.689136,0.453858,6.858296,24.861321,15.179022,84.820978,1428.0,1.85567,6.254296,9.828179
4,,,610,22.797847,27.900838,13.507918,3.027637,68.258979,21.024438,96.308895,...,21.719929,0.722022,1.083032,25.812274,11.902076,88.097924,7964.0,2.97271,10.552399,15.641243


In [42]:
# 6. 附加经纬度与空间字段
df_with_geom = attach_zip_latlon_geom(df_cleaned, ZIP_LATLON_PATH)

# 7. 添加年份信息
df_with_geom['year'] = 2020
df_with_geom.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['zipcode'] = df['zipcode'].astype(str)


Unnamed: 0,state,city,zipcode,pct_less_than_9th_grade,pct_high_school,pct_bachelor,pct_master,pct_high_school_or_higher,pct_english_speaker,pct_spanish_speaker,...,pct_households_without_a_vehicle,pct_with_access_to_a_vehicle,commute_time,pct_work_at_home,pct_professinal,pct_service,lat,lng,geom,year
0,,,601,23.498233,26.527007,13.831398,1.304055,64.941949,13.123131,95.881107,...,15.859586,84.140414,3436.0,4.236343,13.333333,19.283747,,,,2020
1,,,602,24.110032,23.341424,15.482495,3.916593,65.839217,24.298961,96.030479,...,13.185024,86.814976,10156.0,6.058644,13.795964,15.371855,,,,2020
2,,,603,17.130178,26.62837,16.312574,5.124464,73.009939,27.886526,95.536877,...,18.943956,81.056044,10941.0,2.321221,16.748851,14.609277,,,,2020
3,,,606,31.40731,27.70847,6.981834,3.56752,54.891661,5.996325,96.826457,...,15.179022,84.820978,1428.0,1.85567,6.254296,9.828179,,,,2020
4,,,610,22.797847,27.900838,13.507918,3.027637,68.258979,21.024438,96.308895,...,11.902076,88.097924,7964.0,2.97271,10.552399,15.641243,,,,2020


In [44]:
db_url = "postgresql+psycopg2://postgres:wym45123@localhost:5432/dashboard"
write_df_to_postgres(df_with_geom, table_name="acs_data_all", db_url=db_url)

✅ 数据已成功写入 PostgreSQL 表 `acs_data_all`
