In [0]:
import json
import glob
import pandas as pd
import re
from collections import defaultdict
# from unidecode import unidecode  # 可选，如果不想转拼音可以不使用

# Define parameters for the notebook
dbutils.widgets.text('environment',"")

var_environment = dbutils.widgets.get('environment')

OUTPUT_PARSED = f"/Volumes/land_market_{var_environment}/10_rawdata/transaction_info_parsed/"

dfs = []

for file in glob.glob(OUTPUT_PARSED + "*.json"):
    data = json.load(open(file, encoding="utf-8"))

    rows = data["rows"]
    header = rows[0]          # 第一行是列名
    body = rows[1:]           # 其他是数据
    
    df = pd.DataFrame(body, columns=header)
    # 将 metadata 插入到最前面
    df.insert(0, "url", data["url"])
    df.insert(0, "publish_time", data["publish_time"])
    df.insert(0, "title", data["title"])
    df.insert(0, "transaction_id", data["transaction_id"])
    dfs.append(df)

final_df = pd.concat(dfs, ignore_index=True)
spark_df = spark.createDataFrame(final_df)
# spark_df.createOrReplaceTempView("v_transaction_raw")
# display(spark_df)

In [0]:
def normalize_column(col: str, use_unidecode: bool = False) -> str:
    # 1. 去两端空白，替换全角空格
    col = col.strip().replace('\u3000', ' ')
    # 2. 全角转半角
    col = ''.join(chr(ord(c) - 65248) if 65281 <= ord(c) <= 65374 else c for c in col)
    # 3. 可选：中文转拼音/拉丁化（unidecode 会把中文变成近似拼音或字母）
    if use_unidecode:
        col = unidecode(col)

    # 2. 单位统一替换（㎡ → sqm）
    col = col.replace("㎡", "sqm").replace("m²", "sqm").replace("m2", "sqm")

    # 4. 删除括号、单位符号等常见特殊字符
    col = re.sub(r"[()（）\[\]{}<>%¥￥㎡㎡²³°′″·–—·/\\,:;\"'。、••·…\-]", " ", col)
    # 5. 删除其它非字母数字下划线空白的字符
    col = re.sub(r"[^\w\s]", " ", col)
    # 6. 空白转下划线，合并多下划线
    col = re.sub(r"\s+", "_", col)
    col = re.sub(r"_+", "_", col)
    # 7. 去掉首尾下划线
    col = col.strip("_")
    # 8. 小写
    col = col.lower()
    # 9. 如果为空则填一个占位
    if not col:
        col = "col"
    return col

In [0]:
def make_unique_column_names(cols, use_unidecode=False):
    """
    输入：原始列名列表
    输出：unique_names 列表（与 cols 一一对应），以及 rename_map 原->新
    去重策略：第一次出现保持原规范名，后续冲突追加 _1,_2...
    也会处理以数字开头的列名（加 col_ 前缀）。
    """
    normalized = [normalize_column(c, use_unidecode) for c in cols]

    counts = defaultdict(int)
    unique_names = []
    rename_map = {}

    for idx, (orig, norm) in enumerate(zip(cols, normalized)):
        base = norm
        # 如果以数字开头，添加前缀避免 Spark/SQL 不兼容
        if re.match(r"^[0-9]", base):
            base = "col_" + base

        if counts[base] == 0:
            final = base
        else:
            # append suffix _1, _2 ...
            final = f"{base}_{counts[base]}"
        counts[base] += 1

        # 再检查 final 是否与已有任意 final 冲突（极少数情况）
        # 如果冲突，使用索引保证唯一
        if final in unique_names:
            final = f"{base}_{idx}"
        unique_names.append(final)
        rename_map[orig] = final

    return unique_names, rename_map

In [0]:
# 示例：在 Spark DataFrame 上重命名
# spark_df: 你的 DataFrame
orig_cols = spark_df.columns
unique_names, rename_map = make_unique_column_names(orig_cols, use_unidecode=False)  # or True

# 批量重命名（顺序一一对应）
for old, new in zip(orig_cols, unique_names):
    if old != new:
        spark_df = spark_df.withColumnRenamed(old, new)

# 打印映射以备审计
print("列名映射（原 -> 新）:")
for k, v in rename_map.items():
    print(f"{k!r} -> {v!r}")


spark_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("land_market_dev.20_datastore.transaction_detail")