In [None]:
import pandas as pd
from pandas import DataFrame
from typing import Dict
import numpy as np
from IPython.display import display
import re # 正则表达式模块
from IPython.display import HTML
from haversine import haversine, Unit 
import matplotlib.pyplot as plt
import matplotlib
import altair as alt
import warnings
from hanlp_restful import HanLPClient
import time
from sqlalchemy import create_engine
import psycopg2 # 导入psycopg2库，用于连接PostgreSQL数据库
import jieba
from wordcloud import WordCloud
from collections import Counter
from sqlalchemy.sql import text  # 添加此行导入 text
import json
from scipy.stats import norm


# 忽略 FutureWarning
warnings.simplefilter(action='ignore', category=FutureWarning)

## 导入csv数据

In [None]:
# 指定列名和对应的数据类型
dtype_mapping = {
    '2.小红书号id': str,
    '3.B站昵称': str,
    '4.抖音号': str,
    '5.邮箱': str,
    '6.公示用昵称': str,
    '7.小黑盒id': str,
    '11.身高（单位cm）': str,  # 暂时改为字符串，后续检查异常值
    '42.TA的月均收入_填空1': str,  # 暂时改为字符串
    '42.TA的月均收入_填空2': str,   # 暂时改为字符串
    '43.TA的个人总资产_填空1': str,  # 暂时改为字符串
    '43.TA的个人总资产_填空2': str,  # 暂时改为字符串
}
# 读取 CSV 文件
file_path = r"D:\code\相亲问卷数据存储\最后一场.csv"
df = pd.read_csv(file_path, dtype=dtype_mapping)
print(df.columns)
print(df.dtypes)


In [None]:
# 基础清理：去除所有字符串数据的前后空格
df1 = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)

# 删除指定列：UA、Referrer、智能清洗数据无效概率、清洗数据结果
columns_to_drop = ["UA", "Referrer", "智能清洗数据无效概率", "清洗数据结果", "语言", "自定义字段", '地理位置国家和地区', '地理位置省']
df1 = df1.drop(columns=columns_to_drop, errors="ignore")  # 使用 errors="ignore" 避免列不存在时报错

# 删除列名开头为 "Unnamed：" 的列
df1 = df1.loc[:, ~df1.columns.str.contains(r'^Unnamed')]

# 去除列名前面的数字和点（如 "1.列名" -> "列名"）
df1.columns = df1.columns.str.replace(r'^\d+\.', '', regex=True)

# 清理地址数据：去掉常驻地址字段中的 `/` 符号
df1['常驻地址(经度，纬度)'] = df1['常驻地址(经度，纬度)'].str.replace('/', '', regex=False)
df1['家乡地址(经度，纬度)'] = df1['家乡地址(经度，纬度)'].str.replace('/', '', regex=False)

# 将类型转换为数值，遇到错误设置为 NaN
df1['身高（单位cm）'] = pd.to_numeric(df1['身高（单位cm）'], errors='coerce')
df1['TA的月均收入（单位：元）_填空1'] = pd.to_numeric(df1['TA的月均收入（单位：元）_填空1'], errors='coerce')
df1['TA的月均收入（单位：元）_填空2'] = pd.to_numeric(df1['TA的月均收入（单位：元）_填空2'], errors='coerce')
df1['TA的个人总资产（单位：万元）_填空1'] = pd.to_numeric(df1['TA的个人总资产（单位：万元）_填空1'], errors='coerce')
df1['TA的个人总资产（单位：万元）_填空2'] = pd.to_numeric(df1['TA的个人总资产（单位：万元）_填空2'], errors='coerce')


In [None]:
print(df1.info())
print(df1.head(1))

### 创建更多信息

In [None]:
df_more = df1.copy()
print(df_more.info())

#### 基础信息处理

In [None]:
# 出生年份是出生日期的最后四个字符
df_more['出生年份'] = df_more['出生日期'].str[:4].astype(int)
print(df_more['出生年份'].head(2))

# BMI 按照公式计算
df_more['BMI'] = round(df_more['体重（单位kg）'] / (df_more['身高（单位cm）'] / 100) ** 2,2)
print(df_more['BMI'].head(2))

# 确保地址列数据格式正确，分离为经度和纬度
df_more[['常驻经度', '常驻纬度']] = df_more['常驻地址(经度，纬度)'].str.split('，', expand=True).astype(float)
df_more[['家乡经度', '家乡纬度']] = df_more['家乡地址(经度，纬度)'].str.split('，', expand=True).astype(float)
# 生产字段(纬度,经度)，用于计算距离
df_more['常驻坐标'] = list(zip(df_more['常驻纬度'], df_more['常驻经度']))
df_more['家乡坐标'] = list(zip(df_more['家乡纬度'], df_more['家乡经度']))
# 身高中的异常值处理，将"-"替换为"0"，并将条件整列的数据类型转换为int
df_more['身高（单位cm）'] = df_more['身高（单位cm）'].replace('-', 0).astype(float)
df_more['TA的身高（单位cm）:底线-最低'] = df_more['TA的身高（单位cm）:底线-最低'].replace('-', 0).astype(int)
df_more['TA的身高（单位cm）:底线-最高'] = df_more['TA的身高（单位cm）:底线-最高'].replace('-', 0).astype(int)
df_more['TA的身高（单位cm）:加分项-最低'] = df_more['TA的身高（单位cm）:加分项-最低'].replace('-', 0).astype(int)
df_more['TA的身高（单位cm）:加分项-最高'] = df_more['TA的身高（单位cm）:加分项-最高'].replace('-', 0).astype(int)

In [None]:
# 删除常驻地址(经度，纬度)
df_more1 = df_more.drop(columns=[ '常驻经度', '常驻纬度', '家乡经度', '家乡纬度', '常驻地址', '家乡地址'])
# 删除体重
df_more1 = df_more1.drop(columns=['体重（单位kg）', '出生日期'])

#### 生产MBTI字段

In [None]:
# 通用函数，用于根据输入字段解析MBTI类型
def 提取MBTI字段(字段: str, 映射: dict) -> str:
    匹配结果 = re.search(r'【(\d)】', 字段)
    if 匹配结果:
        数字 = 匹配结果.group(1)
        return 映射.get(数字, 'O')  # 如果匹配到数字但是不在映射中，输出报错“MBTI生成阶段错误：未知的数字”
    return 'O'  # 如果没有匹配到数字，返回 'O'

# 主函数，整合MBTI字段
def 生产MBTI字段(生活态度: str, 感知方式: str, 判断方式: str, 生活方式: str) -> str:
    映射表 = {
        "生活态度": {"1": "I", "2": "E"},
        "感知方式": {"1": "S", "2": "N"},
        "判断方式": {"1": "T", "2": "F"},
        "生活方式": {"1": "J", "2": "P"},
    }
    # 按照映射表生成MBTI字段
    MBTI = (
        提取MBTI字段(生活态度, 映射表["生活态度"])
        + 提取MBTI字段(感知方式, 映射表["感知方式"])
        + 提取MBTI字段(判断方式, 映射表["判断方式"])
        + 提取MBTI字段(生活方式, 映射表["生活方式"])
    )
    return MBTI

# DataFrame 中应用
df_more1['MBTI'] = df_more1.apply(
    lambda x: 生产MBTI字段(x['MBTI:生活态度【1：I】or【2：E】'], 
                          x['MBTI:感知方式【1：S】or【2：N】'],
                          x['MBTI:判断方式【1：T】or【2：F】'],
                          x['MBTI:生活方式【1：J】or【2：P】']
    ),axis=1
)
df_more1['TA的MBTI'] = df_more1.apply(
    lambda x: 生产MBTI字段(x['TA的MBTI（仅作为加分项）:生活态度【1：I】or【2：E】'],
                          x['TA的MBTI（仅作为加分项）:感知方式【1：S】or【2：N】'],
                          x['TA的MBTI（仅作为加分项）:判断方式【1：T】or【2：F】'],
                          x['TA的MBTI（仅作为加分项）:生活方式【1：J】or【2：P】']
    ),axis=1
)


# 查看前5行的MBTI和生活态度、感知方式、判断方式、生活方式字段
display(df_more1[['MBTI', 'MBTI:生活态度【1：I】or【2：E】', 'MBTI:感知方式【1：S】or【2：N】', 'MBTI:判断方式【1：T】or【2：F】','MBTI:生活方式【1：J】or【2：P】']].head(5))
display(df_more1[['TA的MBTI', 'TA的MBTI（仅作为加分项）:生活态度【1：I】or【2：E】', 'TA的MBTI（仅作为加分项）:感知方式【1：S】or【2：N】', 'TA的MBTI（仅作为加分项）:判断方式【1：T】or【2：F】','TA的MBTI（仅作为加分项）:生活方式【1：J】or【2：P】']].head(5))


In [None]:
# 删除MBTI:生活态度【1：I】or【2：E】、MBTI:感知方式【1：S】or【2：N】、MBTI:判断方式【1：T】or【2：F】、MBTI:生活方式【1：J】or【2：P】
df_more2 =df_more1.drop(columns=['MBTI:生活态度【1：I】or【2：E】', 'MBTI:感知方式【1：S】or【2：N】', 'MBTI:判断方式【1：T】or【2：F】', 'MBTI:生活方式【1：J】or【2：P】'])
df_more2 =df_more2.drop(columns=['TA的MBTI（仅作为加分项）:生活态度【1：I】or【2：E】', 'TA的MBTI（仅作为加分项）:感知方式【1：S】or【2：N】', 'TA的MBTI（仅作为加分项）:判断方式【1：T】or【2：F】', 'TA的MBTI（仅作为加分项）:生活方式【1：J】or【2：P】'])

In [None]:
print(df_more2.info())
print(df_more2.head(1))

#### 生成识别用id

In [None]:
# 输入一个 DataFrame，返回一个 DataFrame
def 生成识别用id字段(df: pd.DataFrame) -> pd.DataFrame:
    # 将字段重命名
    df.rename(columns={'你是在哪个平台收到这份问卷的？': '填写平台'}, inplace=True)
    
    # 创建空的 '识别用id' 列
    df['识别用id'] = None

    # 确保涉及拼接的字段为字符串类型，并处理 NaN 值
    df['公示用昵称'] = df['公示用昵称'].fillna('').astype(str)
    df['邮箱'] = df['邮箱'].fillna('').astype(str)
    df['小红书号id'] = df['小红书号id'].fillna('').astype(str)
    df['B站昵称'] = df['B站昵称'].fillna('').astype(str)
    df['抖音号'] = df['抖音号'].fillna('').astype(str)
    df['小黑盒id'] = df['小黑盒id'].fillna('').astype(str)

    # 使用矢量化逻辑处理每个平台的情况
    df.loc[df['填写平台'] == 'A.小红书', '识别用id'] = df['小红书号id']
    df.loc[df['填写平台'] == 'B.B站', '识别用id'] = df['B站昵称']
    df.loc[df['填写平台'] == 'C.抖音', '识别用id'] = df['抖音号']
    df.loc[df['填写平台'] == 'D.西瓜视频', '识别用id'] = df['抖音号']
    df.loc[df['填写平台'] == 'E.博客（需填写邮箱）', '识别用id'] = df['公示用昵称'] + '-邮箱前四位为' + df['邮箱'].str[:4]
    df.loc[df['填写平台'] == 'F.小黑盒', '识别用id'] = df['小黑盒id']
    
    return df

# 调用函数生成识别用 id 字段
df_more3 = 生成识别用id字段(df_more2)
# 别在这删除平台字段，不然后面筛除id会很麻烦


In [None]:
print(df_more3.info())

#### 生成地级市

In [None]:
# 数据库连接信息
user = 'postgres'
password = 'root2'  # 替换为你的数据库密码
host = 'localhost'
port = 5655 # 替换为你的数据库端口号
database = 'postgres'  # 替换为你的数据库名称
schema = 'public'  # 替换为你的架构名称
table_name = 'city_20250105'  # 替换为你的表名称

# 创建 PostgreSQL 数据库连接
try:
    # 直接创建连接
    conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
    conn.autocommit = True  # 自动提交模式

    # 设置搜索路径
    with conn.cursor() as cursor:
        cursor.execute(f'SET search_path TO {schema};')
    print("连接成功，并已设置搜索路径")

except psycopg2.Error as e:
    print(f"数据库连接失败: {e}")
finally:
    if conn:
        conn.close()
        
# 使用 SQLAlchemy 创建引擎
try:
    engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}?options=-csearch_path={schema}')
    print("SQLAlchemy 引擎创建成功")
except Exception as e:
    print(f"引擎创建失败: {e}")

In [None]:
# 定义查询函数
def 根据经纬度取城市(row,经纬度字段:str = '活动地址(经度，纬度)') -> str:
    # 预处理：将中文逗号“，”替换为英文逗号“,”
    coords = row[经纬度字段].replace('，', ',').strip()
    
    # 判断是否为空或只有逗号
    if not coords or coords == ',':
        return None  # 无效数据直接返回 None

    # 构建 SQL 查询语句
    query = f"""
    SELECT
        name
    FROM
        {table_name}
    WHERE
        deep = 1
        AND ST_Intersects(
            polygon,
            ST_SetSRID(ST_Point({coords}), 0) -- 保持SRID一致
        )
    ORDER BY id
    LIMIT 1;
    """

    # 执行查询
    try:
        result = pd.read_sql(query, engine)  # 查询数据库
        if not result.empty:
            return result.iloc[0]['name']  # 返回第一行的 name 字段
        return None
    except Exception as e:
        print(f"查询失败2: {e}")
        return None

In [None]:
df_more4 = df_more3.copy()

In [None]:
# 使用 apply 方法调用函数时无需显式传递 row 参数
df_more4['常驻市'] = df_more4.apply(lambda row: 根据经纬度取城市(row, "常驻地址(经度，纬度)"), axis=1)
df_more4['家乡市'] = df_more4.apply(lambda row: 根据经纬度取城市(row, "家乡地址(经度，纬度)"), axis=1)

# 显示结果
display(df_more4[['识别用id', '常驻市', '家乡市', '地理位置市', '常驻地址(经度，纬度)']].head(10))

In [None]:
# 删除常驻地址(经度，纬度)
df_more4 = df_more4.drop(columns=[ '地理位置市', '常驻地址(经度，纬度)', '家乡地址(经度，纬度)'])

#### 简化多选题匹配

In [None]:
df_more5 = df_more4.copy()

##### 提取英文字母函数

In [None]:
# 提取符合规则的字母组合并拼接成字符串
def extract_and_concat_letters(row):
    # 提取每列中的有效标签
    valid_matches = []
    pattern = r'([A-Z]{1,2}\.)'  # 匹配规则

    for col in row.dropna():  # 遍历非空列
        col_text = str(col)  # 转为字符串
        matches = re.findall(pattern, col_text)  # 提取匹配项
        valid_matches.extend(matches)  # 累积匹配结果

    # 拼接所有提取的标签
    return ''.join(valid_matches).strip()  # 移除首尾空格


In [None]:
# 筛选出列名包含 "兴趣爱好:" 的列
包含兴趣爱好的列 = df_more5.columns[df_more5.columns.str.contains("兴趣爱好:")]
df_more5['兴趣爱好'] = df_more5[包含兴趣爱好的列].apply(extract_and_concat_letters, axis=1)

# 筛选出列名包含 "勾选与你相符的描述:" 的列
包含相符描述的列 = df_more5.columns[df_more5.columns.str.contains("勾选与你相符的描述:")]
df_more5['相符描述'] = df_more5[包含相符描述的列].apply(extract_and_concat_letters, axis=1)

# 筛选出列名包含 "勾选TA需要符合的特点（仅作为加分项）:" 的列
包含TA相符描述的列 = df_more5.columns[df_more5.columns.str.contains("勾选与TA相符的描述（仅作为加分项）")]
df_more5['TA的相符描述'] = df_more5[包含TA相符描述的列].apply(extract_and_concat_letters, axis=1)

# 筛选出列名包含 "TA的学历（底线）:" 的列
包含学历底线 = df_more5.columns[df_more5.columns.str.contains("TA的学历（底线）:")]
df_more5['TA的学历（底线）'] = df_more5[包含学历底线].apply(extract_and_concat_letters, axis=1)

# 筛选出列名包含 "TA的学历（加分项）:" 的列
包含学历加分项 = df_more5.columns[df_more5.columns.str.contains("TA的学历（加分项）:")]
df_more5['TA的学历（加分项）'] = df_more5[包含学历加分项].apply(extract_and_concat_letters, axis=1)

# 筛选出列名包含 "TA的工作性质（仅作为加分项）:" 的列
包含工作性质 = df_more5.columns[df_more5.columns.str.contains("TA的工作性质（仅作为加分项）:")]
df_more5['TA的工作性质（仅作为加分项）'] = df_more5[包含工作性质].apply(extract_and_concat_letters, axis=1)


###### 提取字母对应的含义

In [None]:
'''提取兴趣爱好
1. 查看所有包含兴趣爱好的列的列名
    由于列名按照顺序的，所以可以根据列的数量创造一个字典，并给设置字母
2. 提取每个列名中冒号后面的文本，将其和字母按顺序映射起来。这里可能需要定义一个函数，用于提取冒号后面的文本。
'''
# 查看所有包含兴趣爱好的列的列名
print(包含兴趣爱好的列)

def 数字转字母序列(n: int) -> str:
    """
    将数字转换为字母序列（A, B, ..., Z, AA, AB, ..., AZ, BA, BB, ...）
    """
    结果 = ""
    while n > 0:
        n -= 1  # 因为 A 对应 0，B 对应 1，...
        结果 = chr(65 + n % 26) + 结果  # 65 是 'A' 的 ASCII 码
        n = n // 26
    return 结果

def 提取标签字典(list: list[str]) -> Dict[str, str]:
    pattern = r':(.*)'  # 匹配规则
    结果字典 = {}  # 创建一个空字典
    
    # 遍历列表中每一个元素，生成字母序列作为键，并提取出冒号后面的文本加入字典
    for i, col in enumerate(list):
        match = re.search(pattern, col)
        if match:  # 如果匹配成功
            键 = 数字转字母序列(i + 1)  # 生成字母序列（从 A 开始）
            结果字典[键] = match.group(1).strip()  # 提取内容并去除前后空格
    
    return 结果字典

兴趣爱好标签 = 提取标签字典(包含兴趣爱好的列)
display(兴趣爱好标签)

In [None]:
# 删除包含兴趣爱好的列、包含相符描述的列、包含TA相符描述的列、包含学历底线、包含学历加分项、包含工作性质
df_more6 = df_more5.drop(columns=包含兴趣爱好的列)
df_more6 = df_more6.drop(columns=包含相符描述的列)
df_more6 = df_more6.drop(columns=包含TA相符描述的列)
df_more6 = df_more6.drop(columns=包含学历底线)
df_more6 = df_more6.drop(columns=包含学历加分项)
df_more7 = df_more6.drop(columns=包含工作性质)

#### 提取赋权排序

In [None]:
def 提取排序字典(row):
    # 从列名中提取冒号后的字段名，并将这些字段与对应的行值映射成字典。
    pattern = r":(.*)"
    result = {}

    for col in row.index:  # 遍历列名
        match = re.search(pattern, col)
        if match:
            key = match.group(1).strip()
            value = row[col]
            result[key] = value
    # 将字典按值升序排序
    sorted_result = dict(sorted(result.items(), key=lambda item: item[1]))
    
    return sorted_result  # 返回排序后的字典

In [None]:
df_more8 = df_more7.copy()
# 提取列名
包含看重的点进行排序的列 = df_more7.columns[df_more7.columns.str.contains("给你看重的点进行排序:")]
df_more8['赋权排序'] = df_more7[包含看重的点进行排序的列].apply(提取排序字典, axis=1)
# 把所有 numpy 类型值转换为 Python 原生类型
def safe_dict_to_json(d):
    clean_dict = {k: int(v) if isinstance(v, (np.integer, np.int64)) else v for k, v in d.items()}
    return json.dumps(clean_dict, ensure_ascii=False)

df_more8['赋权排序_json'] = df_more8['赋权排序'].apply(safe_dict_to_json)

In [None]:
df_more9 = df_more8.drop(columns=包含看重的点进行排序的列)
df_more9 = df_more9.drop(columns=['赋权排序'])

In [None]:
df_more_end = df_more9.copy()
df_more_end.info()

### 筛除无效问卷

In [None]:
df_clean = df_more_end.copy()
print(df_clean.info())

基础原则：不应当规定底线和加分项两个值之间哪个更大。

In [None]:
'''年龄大于等于 18 岁 
提取当前的年份，
与出生年份进行计算，得到年龄，
筛选出大于等于 18 岁的数据
'''
# 提取当前年份
current_year = pd.Timestamp.now().year
# 计算年龄
df_clean0 = df_clean.copy()
df_clean0['年龄'] = current_year - df_clean0['出生年份']
# 筛选年龄大于等于 18 岁的数据
df_clean0 = df_clean0[df_clean0['年龄'] >= 18]
print(f'0：{df_clean0.shape[0]}')
# 删除年龄字段
df_clean0 = df_clean0.drop(columns=['年龄'])

# 答题时长大于 2 分钟的数据
df_clean1 = df_clean0[df_clean0["答题时长"] > 120]
print(f'1：{df_clean1.shape[0]}')

# IP 重复的数据，仅保留结束答题时间最新的一条
df_clean2 = df_clean1.sort_values(by="结束答题时间")
df_clean2 = df_clean2.drop_duplicates(subset=["IP"], keep="last")
# 平台&识别用id 同时重复的数据，仅保留结束答题时间最新的一条
df_clean2 = df_clean2.sort_values(by="结束答题时间")
df_clean2 = df_clean2.drop_duplicates(subset=["填写平台", "识别用id"], keep="last")
print(f'2：{df_clean2.shape[0]}')

# BMI最低底线数字应小于等于30，最高底线数字应大于等于10
df_clean3 = df_clean2[df_clean2["TA的身体质量指数BMI:底线-最低"] <= 30]
df_clean3 = df_clean2[df_clean2["TA的身体质量指数BMI:底线-最高"] >= 10]
print(f'3：{df_clean3.shape[0]}')

""" 
个人总资产应当小于等于家庭总资产
TA的个人总资产和家庭总资产之间不需要筛选，有很多人不在意对方的家庭总资产，在家庭总资产方面要求奇低
"""
df_clean4 = df_clean3[df_clean3["个人总资产（单位：万元）"] <= df_clean3["家庭总资产（单位：万元）"]]
print(f'4：{df_clean4.shape[0]}')

# 距离的底线应当大于0km
df_clean5 = df_clean4[df_clean4["TA的地址与你的距离（单位km）:常驻地址相距-底线"] > 0]
df_clean5 = df_clean5[df_clean5["TA的地址与你的距离（单位km）:家乡地址相距-底线"] > 0]
print(f'5：{df_clean5.shape[0]}')

# 收入和资产值不为空值
df_clean6 = df_clean5.copy()
df_clean6 = df_clean5[df_clean5["TA的月均收入（单位：元）_填空1"].notnull()]
df_clean6 = df_clean6[df_clean6["TA的月均收入（单位：元）_填空2"].notnull()]
df_clean6 = df_clean6[df_clean6["TA的个人总资产（单位：万元）_填空1"].notnull()]
df_clean6 = df_clean6[df_clean6["TA的个人总资产（单位：万元）_填空2"].notnull()]
print(f'6：{df_clean6.shape[0]}')

#### 根据平台规则过滤id

In [None]:
# 示例校验规则
field_rules = {
    "小红书号id": r'^[A-Za-z0-9_]+$',  # 只允许字母、数字和下划线
    "B站昵称": r'.*',                 # 无限制
    "抖音号": r'^\d+$',               # 只允许数字
    "邮箱": r'^[^@]+@[^@]+\.[^@]+$',  # 邮箱格式校验
    "小黑盒id": r'^\d+$'              # 只允许数字
}

# 复制数据
df_clean7 = df_clean6.copy()
print(f'7: {df_clean7.shape[0]}')
df_clean8 = df_clean7.copy()

# 根据字段名和正则表达式进行校验
for field, regex in field_rules.items():
    if field in df_clean8.columns:  # 确保字段存在
        # 重新将空字符串转换为 NaN
        df_clean8[field] = df_clean8[field].replace("", pd.NA)
        # 校验字段，非空时必须匹配正则表达式
        df_clean8 = df_clean8[df_clean8[field].str.fullmatch(regex, na=True)] # na=True 表示 NaN 值会被视为匹配

print(f'校验后的数据行数: {df_clean8.shape[0]}')


In [None]:
df_clean9 = df_clean8.drop(columns=['小红书号id', 'B站昵称', '抖音号', '公示用昵称', '小黑盒id'])
df_clean9 = df_clean8.drop(columns=['开始答题时间'])
df_clean9.info()
print(df_clean7.shape[0])
print(df_clean9.shape[0])

#### 调整范围大小项顺序

In [None]:
df_clean10 = df_clean9.copy()
# 查看第一行的'TA的年龄_填空1', 'TA的年龄_填空2'
print(df_clean10[['TA的年龄_填空1', 'TA的年龄_填空2','TA的身高（单位cm）:底线-最低', 'TA的身高（单位cm）:底线-最高']].head(5))
columns_to_swap = [
        ('TA的年龄_填空1', 'TA的年龄_填空2'),  # 年龄底线
        ('TA的年龄_填空3', 'TA的年龄_填空4'),  # 年龄加分项
        ('TA的身高（单位cm）:底线-最低', 'TA的身高（单位cm）:底线-最高'),  # 身高底线
        ('TA的身高（单位cm）:加分项-最低', 'TA的身高（单位cm）:加分项-最高'),  # 身高加分项
        ('TA的身体质量指数BMI:底线-最低', 'TA的身体质量指数BMI:底线-最高'),  # BMI底线
        ('TA的身体质量指数BMI:加分项-最低', 'TA的身体质量指数BMI:加分项-最高'),  # BMI加分项
        ('TA在现实生活中成熟的恋爱次数（仅作为加分项）_填空1', 'TA在现实生活中成熟的恋爱次数（仅作为加分项）_填空2'),  # 恋爱次数
    ]
def 假设我们角色互换 (df:DataFrame, columns_to_swap: list[tuple[str, str]]) ->DataFrame:
    # 对一对列进行交换处理
    for col1, col2 in columns_to_swap:
        if col1 in df.columns and col2 in df.columns:
            # 保证 col1 低于 col2
            df[[col1, col2]] = df[[col1, col2]].apply(lambda x: pd.Series(sorted(x)), axis=1)

    return df

df_clean10 = 假设我们角色互换(df_clean10, columns_to_swap)

print(df_clean10[['TA的年龄_填空1', 'TA的年龄_填空2','TA的身高（单位cm）:底线-最低', 'TA的身高（单位cm）:底线-最高']].head(5))

In [None]:
df_clean11 = df_clean10.copy()
# 获取当前列的顺序
当前列顺序 = df_clean11.columns.tolist()

# 将'id'列移动到第三位置（索引2）
当前列顺序.insert(2, 当前列顺序.pop(当前列顺序.index('识别用id')))

# 重新排序数据框的列
df_clean11 = df_clean11[当前列顺序]

#### 缩短列名

In [None]:
def 输出列名和字节长度(df: pd.DataFrame) -> None:
    # 创建列名和字节长度的字典
    列名和字节长度 = {'列名': [], '字节长度': []}
    # 遍历列名
    for col in df.columns:
        if len(col.encode('utf-8')) >= 50:
            # 获取列名的字节长度（UTF-8 编码）
            列名和字节长度['列名'].append(col)
            列名和字节长度['字节长度'].append(len(col.encode('utf-8')))  # 计算 UTF-8 字节长度
            
    # 如果字典不是空的，展示并报错
    if 列名和字节长度 != {'列名': [], '字节长度': []}:
        with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
            display(列名和字节长度)
            raise ValueError('列名长度超过 50 字节，请修改列名！')
    else:
        print('所有列名长度均符合要求！')

In [None]:
'''
{'列名': ['底线类生活习惯（给自己打分）:不抽烟就会死',
  '底线类生活习惯（给自己打分）:一周不喝酒就会死',
  '底线类生活习惯（给自己打分）:一周不吃槟榔就会死',
  '底线类生活习惯（给自己打分）:12点前睡觉就会死',
  '底线类生活习惯（给自己打分）:10点前不回家会很疲惫',
  '底线类生活习惯（给自己打分）:喜欢养宠物',
  '底线类生活习惯（设置自己的可接受范围）:不抽烟就会死',
  '底线类生活习惯（设置自己的可接受范围）:一周不喝酒就会死',
  '底线类生活习惯（设置自己的可接受范围）:一周不吃槟榔就会死',
  '底线类生活习惯（设置自己的可接受范围）:12点前睡觉就会死',
  '底线类生活习惯（设置自己的可接受范围）:10点前不回家会很疲惫',
  '底线类生活习惯（设置自己的可接受范围）:喜欢养宠物']
'''

# 缩短列名，建立映射关系
缩短_列名映射 = {
    '经济情况 - 其他信息:是否是独生子女': '经济情况_其他信息:是否是独生子女',
    '经济情况 - 其他信息:父母是否都有养老保险': '经济情况_其他信息:父母养老保险',
    '经济情况 - 其他信息:是否是单亲家庭': '经济情况_其他信息:是否是单亲家庭',
    '生活习惯:掏出去每一分钱都是不得不花的': '生活习惯:掏的钱都是不得不花的',
    'TA的地址与你的距离（单位km）:常驻地址相距-底线': 'TA地址与你相距:常驻地址-底线',
    'TA的地址与你的距离（单位km）:常驻地址相距-加分项': 'TA地址与你相距:常驻地址-加分项',
    'TA的地址与你的距离（单位km）:家乡地址相距-底线': 'TA地址与你相距:家乡地址-底线',
    'TA的地址与你的距离（单位km）:家乡地址相距-加分项': 'TA地址与你相距:家乡地址-加分项',
    'TA的经济情况 - 其他信息:TA是独生子女': 'TA经济情况_其他信息:TA是独生子女',
    'TA的经济情况 - 其他信息:TA的父母有养老保险': 'TA经济情况_其他信息:TA父母养老保险',
    'TA的经济情况 - 其他信息:TA的情况是单亲家庭': 'TA经济情况_其他信息:TA是单亲家庭',
    'TA在现实生活中成熟的恋爱次数（仅作为加分项）_填空1': 'TA成熟的恋爱次数_填空1',
    'TA在现实生活中成熟的恋爱次数（仅作为加分项）_填空2': 'TA成熟的恋爱次数_填空2',
    '勾选与TA相符的描述（仅作为加分项）:会做家常菜': '勾选与TA相符的描述:会做家常菜',
    '勾选与TA相符的描述（仅作为加分项）:会一种乐器': '勾选与TA相符的描述:会一种乐器',
    '勾选与TA相符的描述（仅作为加分项）:唱歌在调上': '勾选与TA相符的描述:唱歌在调上',
    '勾选与TA相符的描述（仅作为加分项）:每月会有8h以上时间用于运动': '勾选与TA相符的描述:每月8h以上运动',
    '勾选与TA相符的描述（仅作为加分项）:每月会有4h以上时间用于阅读': '勾选与TA相符的描述:每月4h以上阅读',
    '勾选与TA相符的描述（仅作为加分项）:大眼睛': '勾选与TA相符的描述:大眼睛',
    '勾选与TA相符的描述（仅作为加分项）:爱笑': '勾选与TA相符的描述:爱笑',
    '勾选与TA相符的描述（仅作为加分项）:皮肤白': '勾选与TA相符的描述:皮肤白',
    '勾选与TA相符的描述（仅作为加分项）:花时间打扮自己': '勾选与TA相符的描述:花时间打扮自己',
    '勾选与TA相符的描述（仅作为加分项）:沉默寡言': '勾选与TA相符的描述:沉默寡言',
    '勾选与TA相符的描述（仅作为加分项）:乐于分享': '勾选与TA相符的描述:乐于分享',
    '勾选与TA相符的描述（仅作为加分项）:抽象乐子人': '勾选与TA相符的描述:抽象乐子人',
    '勾选与TA相符的描述（仅作为加分项）:性格内向': '勾选与TA相符的描述:性格内向',
    '底线类生活习惯（给自己打分）:不抽烟就会死': '底线类生活习惯:抽烟',
    '底线类生活习惯（给自己打分）:一周不喝酒就会死': '底线类生活习惯:喝酒',
    '底线类生活习惯（给自己打分）:一周不吃槟榔就会死': '底线类生活习惯:槟榔',
    '底线类生活习惯（给自己打分）:12点前睡觉就会死': '底线类生活习惯:熬夜',
    '底线类生活习惯（给自己打分）:10点前不回家会很疲惫': '底线类生活习惯:早回家',
    '底线类生活习惯（给自己打分）:喜欢养宠物': '底线类生活习惯:养宠物',
    '底线类生活习惯（设置自己的可接受范围）:不抽烟就会死': '底线类生活习惯范围:抽烟',
    '底线类生活习惯（设置自己的可接受范围）:一周不喝酒就会死': '底线类生活习惯范围:喝酒',
    '底线类生活习惯（设置自己的可接受范围）:一周不吃槟榔就会死': '底线类生活习惯范围:槟榔',
    '底线类生活习惯（设置自己的可接受范围）:12点前睡觉就会死': '底线类生活习惯范围:熬夜',
    '底线类生活习惯（设置自己的可接受范围）:10点前不回家会很疲惫': '底线类生活习惯范围:早回家',
    '底线类生活习惯（设置自己的可接受范围）:喜欢养宠物': '底线类生活习惯范围:养宠物'
}

# 使用 pandas 重命名列名
df_clean12 = df_clean11.rename(columns=缩短_列名映射).copy()

# 示例使用
输出列名和字节长度(df_clean12)


In [None]:
df_cleaned = df_clean12.copy()

In [None]:
print(df_cleaned.info())
display(df_cleaned.head(5))

#### 问卷无效原因

In [None]:
# 创建一个空字典，应有两列，第一列是原因，第二列是值，用于存储每一次筛除的结果
筛除结果 = {'原因': [], '值': [], '占比': []}
print(df.shape[0])
"""
原因0:年龄小于18岁，值=df_clean.shape[0] - df_clean0.shape[0]，
原因1：答题时长少于2分钟，值=df_clean.shape[0] - df_clean1.shape[0]，
占比=（df_clean.shape[0] - df_clean1.shape[0]）/（df.shape[0] - df_cleaned.shape[0]）
原因2：IP&ID重复，值=df_clean1.shape[0] - df_clean2.shape[0]，
占比=（df_clean1.shape[0] - df_clean2.shape[0]）/（df.shape[0] - df_cleaned.shape[0]）
原因3：BMI底线不符合，值=df_clean2.shape[0] - df_clean3.shape[0]，
占比=（df_clean2.shape[0] - df_clean3.shape[0]）/（df.shape[0] - df_cleaned.shape[0]）
原因4：个人总资产大于家庭总资产，值=df_clean3.shape[0] - df_clean4.shape[0]，
占比=（df_clean3.shape[0] - df_clean4.shape[0]）/（df.shape[0] - df_cleaned.shape[0]）
原因5：距离底线不符合，值=df_clean4.shape[0] - df_clean5.shape[0]，
占比=（df_clean4.shape[0] - df_clean5.shape[0]）/（df.shape[0] - df_cleaned.shape[0]）
原因6：ID校验不通过，值=df_clean9.shape[0] - df_clean7.shape[0]，
占比=（df_clean8.shape[0] - df_clean6.shape[0]）/（df.shape[0] - df_cleaned.shape[0]）
"""
# 输入字典
筛除结果['原因'] = ['年龄小于18岁','答题时长少于2分钟', 'IP&ID重复', 'BMI底线不符合', '个人总资产大于家庭总资产', '距离底线不符合', '值超出范围', 'ID校验不通过']
筛除结果['值'] = [df_clean.shape[0] - df_clean0.shape[0], 
             df_clean0.shape[0] - df_clean1.shape[0], 
             df_clean1.shape[0] - df_clean2.shape[0], 
             df_clean2.shape[0] - df_clean3.shape[0], 
             df_clean3.shape[0] - df_clean4.shape[0], 
             df_clean4.shape[0] - df_clean5.shape[0], 
             df_clean5.shape[0] - df_clean6.shape[0], 
             df_clean7.shape[0] - df_clean9.shape[0]]

筛除结果['占比'] = [(df_clean.shape[0] - df_clean0.shape[0])/(df.shape[0] - df_cleaned.shape[0]), 
              (df_clean0.shape[0] - df_clean1.shape[0])/(df.shape[0] - df_cleaned.shape[0]), 
              (df_clean1.shape[0] - df_clean2.shape[0])/(df.shape[0] - df_cleaned.shape[0]), 
              (df_clean2.shape[0] - df_clean3.shape[0])/(df.shape[0] - df_cleaned.shape[0]), 
              (df_clean3.shape[0] - df_clean4.shape[0])/(df.shape[0] - df_cleaned.shape[0]), 
              (df_clean4.shape[0] - df_clean5.shape[0])/(df.shape[0] - df_cleaned.shape[0]), 
              (df_clean5.shape[0] - df_clean6.shape[0])/(df.shape[0] - df_cleaned.shape[0]), 
              (df_clean7.shape[0] - df_clean9.shape[0])/(df.shape[0] - df_cleaned.shape[0])]

display(pd.DataFrame(筛除结果))

### 导入数据库

In [None]:
df_导入数据库 = df_cleaned.copy()

In [None]:
# 数据库连接信息
user = 'postgres'
password = 'root2'
host = 'localhost'
port = 5655 # 替换为你的数据库端口号
database = 'postgres'  # 替换为你的数据库名称
schema = '赛博相亲'  # 替换为你的架构名称
table_name = 'v1_2问卷填写数据存储_20250405'  # 替换为你的表名称

# 创建 PostgreSQL 数据库连接和引擎
try:
    # 使用 SQLAlchemy 创建引擎
    engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}?options=-csearch_path={schema}')
    print("SQLAlchemy 引擎创建成功")

    # 检查表是否存在
    with engine.begin() as conn:
        check_table_query = f"""
        SELECT EXISTS (
            SELECT FROM information_schema.tables 
            WHERE table_schema = '{schema}' 
            AND table_name = '{table_name}'
        );
        """
        table_exists = conn.execute(text(check_table_query)).scalar()

        # 根据表的存在性插入数据
        if not table_exists:
            # 如果表不存在，直接创建表并插入数据
            print(f"表 {schema}.{table_name} 不存在，正在创建...")
            df_导入数据库.to_sql(table_name, conn, schema=schema, if_exists='replace', index=False)
            print(f"表 {schema}.{table_name} 创建并成功插入数据")
        else:
            # 如果表存在，避免插入完全重复的数据
            print(f"表 {schema}.{table_name} 已存在，正在检查重复记录...")
            # 将目前的数据替换至数据库中的临时表
            temp_table_name = f"{table_name}_temp"
            
            # 获取数据库表的列名
            columns_query = f"""
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_schema = '{schema}' AND table_name = '{table_name}'
            ORDER BY ordinal_position;
            """
            columns = pd.read_sql(columns_query, conn)['column_name'].tolist()
            # 给所有列名加上双引号，避免 SQL 解析错误
            columns_str = ', '.join([f'"{col}"' for col in columns])
            
            
            df_导入数据库.to_sql(temp_table_name, conn, schema=schema, if_exists='replace', index=False)
            # 直接使用sql进行操作，查询临时表与数据库表的差集
            diff_query = f"""
            SELECT {columns_str} FROM {schema}.{temp_table_name}
            EXCEPT
            SELECT {columns_str} FROM {schema}.{table_name};
            """
            diff_df = pd.read_sql(diff_query, conn)
            # 如果有差集，插入差集数据
            diff_df.to_sql(table_name, conn, schema=schema, if_exists='append', index=False)
            # 打印原本有多少行，导入了多少行
            print(f"表 {schema}.{table_name} 原有 {df_导入数据库.shape[0]} 行数据，导入了 {diff_df.shape[0]} 行数据")
            # 如果临时表存在，删除临时表
            drop_temp_table_query = f"""
            DROP TABLE IF EXISTS {schema}.{temp_table_name};
            """
            # 删除临时表
            conn.execute(text(drop_temp_table_query))
            
except Exception as e:
    print(f"发生错误: {e}")

In [None]:
# 导出库中的数据
df_cleaned = pd.read_sql(f"SELECT * FROM {schema}.{table_name}", engine)
# 去掉括号并拆分为经纬度
df_cleaned['常驻坐标'] = df_cleaned['常驻坐标'].str.strip('()')
df_cleaned['家乡坐标'] = df_cleaned['家乡坐标'].str.strip('()')

# 分离为经度和纬度
df_cleaned[['常驻纬度', '常驻经度']] = df_cleaned['常驻坐标'].str.split(',', expand=True).astype(float)
df_cleaned[['家乡纬度', '家乡经度']] = df_cleaned['家乡坐标'].str.split(',', expand=True).astype(float)

# 生成字段(纬度,经度)，用于计算距离
df_cleaned['常驻坐标'] = list(zip(df_cleaned['常驻纬度'], df_cleaned['常驻经度']))
df_cleaned['家乡坐标'] = list(zip(df_cleaned['家乡纬度'], df_cleaned['家乡经度']))

# 删除经度和纬度列
df_cleaned = df_cleaned.drop(columns=['常驻经度', '常驻纬度', '家乡经度', '家乡纬度'])
print(df_cleaned.info())

#### 仅保留最新数据

In [None]:
# IP 重复的数据，仅保留结束答题时间最新的一条
df_cleaned = df_cleaned.sort_values(by="结束答题时间")
df_cleaned = df_cleaned.drop_duplicates(subset=["IP"], keep="last")
# 平台&识别用id 同时重复的数据，仅保留结束答题时间最新的一条
df_cleaned = df_cleaned.sort_values(by="结束答题时间")
df_cleaned = df_cleaned.drop_duplicates(subset=["填写平台", "识别用id"], keep="last")
print(f'2：{df_cleaned.shape[0]}')

In [None]:
# 删除结束答题时间列和ip列
df_cleaned = df_cleaned.drop(columns=['结束答题时间', 'IP'])

### 输出无效名单

In [None]:
# df_more_end 中包含的识别用id与填写平台，但不在 df_cleaned 中。
## 创建临时用的唯一识别符
df_cleaned['唯一识别符'] = df_cleaned['识别用id'] + df_cleaned['填写平台']
df_more_end['唯一识别符'] = df_more_end['识别用id'] + df_more_end['填写平台']
## 找出 df_more_end 中包含的识别用id与填写平台，但不在 df_cleaned 中。
df_无效名单 = df_more_end[~df_more_end['唯一识别符'].isin(df_cleaned['唯一识别符'])]
## 删除临时列
df_cleaned = df_cleaned.drop(columns=['唯一识别符'])
df_more_end = df_more_end.drop(columns=['唯一识别符'])
## 创建新列，列内数值为当日日期
df_无效名单['日期'] = pd.to_datetime('today').date()

In [None]:
# 使用 pandas 重命名列名
df_无效名单 = df_无效名单.rename(columns=缩短_列名映射).copy()

# 示例使用
输出列名和字节长度(df_无效名单)

无效名单在输出成功数据处输出

## 两两匹配

In [None]:
df_未两两匹配 = df_cleaned.copy()
df_未两两匹配 = df_未两两匹配.drop(columns=['答题时长'])

In [None]:
# 生成笛卡尔积（两两匹配）
df_前置底线筛选 = df_未两两匹配.merge(df_未两两匹配, how='cross')

# 保留不相等的情况，分别比较两个字段
df_前置底线筛选 = df_前置底线筛选[
    (df_前置底线筛选['填写平台_x'] != df_前置底线筛选['填写平台_y']) | 
    (df_前置底线筛选['识别用id_x'] != df_前置底线筛选['识别用id_y'])
]

# 输出结果
print(df_前置底线筛选.info())
print(df_前置底线筛选.head())


#### 去除匹配过期的情况

In [None]:
# 数据库连接信息
user = 'postgres'
password = 'root2'
host = 'localhost'
port = 5655 # 替换为你的数据库端口
database = 'postgres'  # 替换为你的数据库名称
schema = '赛博相亲'  # 替换为你的架构名称
table_name_过期 = '匹配过期记录'  # 替换为你的表名称

# 创建 PostgreSQL 数据库连接
try:
    conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
    conn.autocommit = True  # 如果需要创建数据库，保持自动提交

    # 设置搜索路径
    with conn.cursor() as cursor:
        cursor.execute(f'SET search_path TO {schema}')
    
    print("连接成功并已设置搜索路径")
except psycopg2.Error as e:
    print(f"数据库连接失败: {e}")
finally:
    if conn:
        conn.close()

# 创建 postgresql 数据库连接
engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}?options=-csearch_path={schema}')

# 检查表是否存在
with engine.begin() as conn:
    匹配过期取数 = f"""
    select * from {schema}.{table_name_过期} where 日期 != current_date
    """

    df_去除匹配过期 = pd.read_sql(匹配过期取数, conn)
    
    print(f"表 {schema}.{table_name_过期} 已存在，已取出过期记录")

In [None]:
# 在df_前置底线筛选中，删除匹配过期的数据
df_去除匹配过期 = df_前置底线筛选.merge(df_去除匹配过期, 
                               left_on=['识别用id_x', '识别用id_y'], 
                               right_on=['id_x', 'id_y'], 
                               how='inner')

# 删除匹配过期的数据
df_前置底线筛选 = df_前置底线筛选.merge(df_去除匹配过期[['识别用id_x', '识别用id_y']], 
                                   on=['识别用id_x', '识别用id_y'], 
                                   how='left', 
                                   indicator=True).query('_merge == "left_only"').drop(columns=['_merge'])

### 前置纯底线筛选

#### 性取向（纯底线）

In [None]:
# 定义性别匹配规则（将其改为集合更易于快速查找）
性别匹配 = {
    ("A.男", "A.男"),
    ("B.其他", "A.男"),
    ("B.其他", "B.其他"),
    ("B.其他", "C.女"),
    ("C.女", "C.女"),
}

# 生成性别匹配结果
df_前置底线筛选['底线：性取向'] = df_前置底线筛选.apply(
    lambda row: (
        # 检查生理性别匹配
        (row['性别&性取向:TA的生理性别_x'], row['性别&性取向:自身生理性别_y']) in 性别匹配
        # 检查心理性别匹配
        and (row['性别&性取向:TA的心理性别_x'], row['性别&性取向:自身心理性别_y']) in 性别匹配
    ),
    axis=1
)

print(df_前置底线筛选['底线：性取向'].head(2))
# 转换布尔值为0和1
df_前置底线筛选['底线：性取向'] = df_前置底线筛选['底线：性取向'].apply(lambda x: 0 if x else 1)
print(df_前置底线筛选['底线：性取向'].head(2))


In [None]:
df_前置筛选_性取向通过 = df_前置底线筛选[df_前置底线筛选['底线：性取向'] == 0].copy()
print(df_前置筛选_性取向通过.info())

#### 是否打算在两年内结婚（纯底线）

In [None]:
结婚意愿匹配 = {
    ("A.两年内结婚", "A.两年内结婚"),
    ("A.两年内结婚", "B.看感觉，有合适的人2年内结婚"),
    ("B.看感觉，有合适的人2年内结婚", "A.两年内结婚"),
    ("B.看感觉，有合适的人2年内结婚", "B.看感觉，有合适的人2年内结婚"),
    ("C.2年内不结婚", "C.2年内不结婚"),
}
# 生成匹配结果
df_前置筛选_性取向通过['底线：是否打算在两年内结婚'] = df_前置筛选_性取向通过.apply(
    lambda row: (row['是否打算在2年内结婚？_x'], row['是否打算在2年内结婚？_y']) in 结婚意愿匹配,
    axis=1
)

print(df_前置筛选_性取向通过[['底线：是否打算在两年内结婚','是否打算在2年内结婚？_x','是否打算在2年内结婚？_y']].head(2))
# 转换布尔值为0和1
df_前置筛选_性取向通过['底线：是否打算在两年内结婚'] = df_前置筛选_性取向通过['底线：是否打算在两年内结婚'].apply(lambda x: 0 if x else 1)
print(df_前置筛选_性取向通过['底线：是否打算在两年内结婚'].head(2))

In [None]:
df_前置筛选_结婚意愿通过 = df_前置筛选_性取向通过[df_前置筛选_性取向通过['底线：是否打算在两年内结婚'] == 0].copy()
print(df_前置筛选_结婚意愿通过.info())

#### 宗教信仰（纯底线）

In [None]:
"""列名
宗教信仰_填空1
宗教信仰_填空2
"""

# 生成匹配结果
df_前置筛选_结婚意愿通过['底线：宗教信仰'] = df_前置筛选_结婚意愿通过.apply(
    lambda row: True if row['宗教信仰_填空2_x'] == "是" else row['宗教信仰_填空1_x'] == row['宗教信仰_填空1_y'],
    axis=1
)

# 转换布尔值为0和1
df_前置筛选_结婚意愿通过['底线：宗教信仰'] = df_前置筛选_结婚意愿通过['底线：宗教信仰'].apply(lambda x: 0 if x else 1)

# 看一看宗教信仰_填空2_x 为否的情况
print(df_前置筛选_结婚意愿通过[df_前置筛选_结婚意愿通过['宗教信仰_填空2_x'] == "否"][['宗教信仰_填空2_x','宗教信仰_填空1_x','宗教信仰_填空1_y','底线：宗教信仰']].head())

In [None]:
df_前置筛选_宗教信仰通过 = df_前置筛选_结婚意愿通过[df_前置筛选_结婚意愿通过['底线：宗教信仰'] == 0].copy()
print(df_前置筛选_宗教信仰通过.info())

#### 婚姻情况（纯底线）

In [None]:
"""列名
婚姻情况_填空1
婚姻情况_填空2
"""

# 生成匹配结果
df_前置筛选_宗教信仰通过['底线：婚姻情况'] = df_前置筛选_宗教信仰通过.apply(
    lambda row: True if row['婚姻情况_填空2_x'] == "是" else row['婚姻情况_填空1_y'] == "未婚",
    axis=1
)

# 转换布尔值为0和1
df_前置筛选_宗教信仰通过['底线：婚姻情况'] = df_前置筛选_宗教信仰通过['底线：婚姻情况'].apply(lambda x: 0 if x else 1)

# 看一看
print(df_前置筛选_宗教信仰通过[df_前置筛选_宗教信仰通过['婚姻情况_填空2_x'] == "否"][['婚姻情况_填空2_x','婚姻情况_填空1_x','婚姻情况_填空1_y','底线：婚姻情况']].head())

In [None]:
df_前置筛选_婚姻情况通过 = df_前置筛选_宗教信仰通过[df_前置筛选_宗教信仰通过['底线：婚姻情况'] == 0].copy()
print(df_前置筛选_婚姻情况通过.info())

In [None]:
df_前置筛选_结果 = df_前置筛选_婚姻情况通过.copy()

In [None]:
print(df_前置筛选_结果.info())

### 生成交互列（额外信息）

In [None]:
df_cartesian_more = df_前置筛选_结果.copy()

#### 计算距离

In [None]:
# 根据经纬度计算距离，计算结果单位为公里，保留两位小数
df_cartesian_more['常驻距离'] = df_前置筛选_结果.apply(
    lambda row: round(
        haversine(row['常驻坐标_x'], row['常驻坐标_y'], unit=Unit.KILOMETERS), 2
    ), axis=1
)

df_cartesian_more['家乡距离'] = df_前置筛选_结果.apply(
    lambda row: round(
        haversine(row['家乡坐标_x'], row['家乡坐标_y'], unit=Unit.KILOMETERS), 2
    ), axis=1
)

In [None]:
# 显示结果
print(df_cartesian_more[['常驻坐标_x','常驻坐标_y','家乡坐标_x','家乡坐标_y','常驻距离', '家乡距离']].head())
# 已验证，结果误差小于1km

#### 查看现有列名

In [None]:
# 显示完整列名
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
    display(df_cartesian_more.columns.tolist())

### 计算匹配结果

基本原则：只判断自己是否匹配对方，不判断对方是否匹配自己。
因为在对方的那一行，ta也会进行匹配，如果不匹配会自动筛除，没必要重复运算。

In [None]:
df_底线 = df_cartesian_more
print(df_底线.info())

#### 年龄

In [None]:
# 生成年龄匹配结果
df_底线['底线：年龄'] = df_底线.apply(
    lambda row: row['TA的年龄_填空1_x'] <= row['出生年份_y'] <= row['TA的年龄_填空2_x'] ,
    axis=1
) # 如果ta的年龄在我的要求内，返回True，否则返回False
# 加分项
df_底线['加分项：年龄'] = df_底线.apply(
    lambda row: row['TA的年龄_填空3_x'] <= row['出生年份_y'] <= row['TA的年龄_填空4_x'],
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：年龄'] = df_底线['底线：年龄'].apply(lambda x: 0 if x else 1)
df_底线['加分项：年龄'] = df_底线['加分项：年龄'].apply(lambda x: 1 if x else 0)

#### 身高

In [None]:
# 生成身高匹配结果
df_底线['底线：身高'] = df_底线.apply(
    lambda row: row['TA的身高（单位cm）:底线-最低_x'] <= row['身高（单位cm）_y'] <= row['TA的身高（单位cm）:底线-最高_x'] ,
    axis=1
)
# 加分项
df_底线['加分项：身高'] = df_底线.apply(
    lambda row: row['TA的身高（单位cm）:加分项-最低_x'] <= row['身高（单位cm）_y'] <= row['TA的身高（单位cm）:加分项-最高_x'] ,
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：身高'] = df_底线['底线：身高'].apply(lambda x: 0 if x else 1)
df_底线['加分项：身高'] = df_底线['加分项：身高'].apply(lambda x: 1 if x else 0)

#### BMI

In [None]:
# 生成BMI匹配结果
df_底线['底线：BMI'] = df_底线.apply(
    lambda row: row['TA的身体质量指数BMI:底线-最低_x'] <= row['BMI_y'] <= row['TA的身体质量指数BMI:底线-最高_x'],
    axis=1
)
# 加分项
df_底线['加分项：BMI'] = df_底线.apply(
    lambda row: row['TA的身体质量指数BMI:加分项-最低_x'] <= row['BMI_y'] <= row['TA的身体质量指数BMI:加分项-最高_x'],
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：BMI'] = df_底线['底线：BMI'].apply(lambda x: 0 if x else 1)
df_底线['加分项：BMI'] = df_底线['加分项：BMI'].apply(lambda x: 1 if x else 0)

#### 学历

In [None]:
# 提取“学历”中的前缀（例如 'A'）
df_底线['学历前缀_y'] = df_底线['学历_y'].apply(lambda x: x.split('.')[0])

# 对每一行进行判断，学历是否在TA的学历底线要求之内
df_底线['底线：学历'] = df_底线.apply(
    lambda row: row['学历前缀_y'] in row['TA的学历（底线）_x'].split('.'),
    axis=1
)
# 加分项
df_底线['加分项：学历'] = df_底线.apply(
    lambda row: row['学历前缀_y'] in row['TA的学历（加分项）_x'].split('.'),
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：学历'] = df_底线['底线：学历'].apply(lambda x: 0 if x else 1)
df_底线['加分项：学历'] = df_底线['加分项：学历'].apply(lambda x: 1 if x else 0)

# 删除辅助列
df_底线.drop(columns=['学历前缀_y'], inplace=True)

#### 工作性质（纯加分项）

In [None]:
# 提取“工作性质”中的前缀（例如 'A'）
df_底线['工作性质前缀_y'] = df_底线['工作性质_y'].apply(lambda x: x.split('.')[0])
print(df_底线[['工作性质前缀_y','工作性质_y']].head(2))

# 对每一行进行判断，工作性质是否在TA的工作性质加分项要求之内
df_底线['加分项：工作性质'] = df_底线.apply(
    lambda row: row['工作性质前缀_y'] in row['TA的工作性质（仅作为加分项）_x'].split('.'),
    axis=1
)

# 转换布尔值为0和1
df_底线['加分项：工作性质'] = df_底线['加分项：工作性质'].apply(lambda x: 1 if x else 0)

# 删除辅助列
df_底线.drop(columns=['工作性质前缀_y'], inplace=True)
print(df_底线[['加分项：工作性质','工作性质_y','TA的工作性质（仅作为加分项）_x']].head(5))

#### 距离

In [None]:
df_底线['底线：常驻距离'] = df_底线.apply(
    lambda row: (
        (row['常驻距离'] <= row['TA地址与你相距:常驻地址-底线_x'] and 
        row['常驻市_x'] == row['常驻市_y'])
        if row['TA地址与你相距:常驻地址-底线_x'] <= 60 
        else row['常驻距离'] <= row['TA地址与你相距:常驻地址-底线_x']
    ),
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：常驻距离'] = df_底线['底线：常驻距离'].apply(lambda x: 0 if x else 1)

In [None]:
df_底线['底线：家乡距离'] = df_底线.apply(
    lambda row: (
        (row['家乡距离'] <= row['TA地址与你相距:家乡地址-底线_x'] and 
        row['家乡市_x'] == row['家乡市_y'])
        if row['TA地址与你相距:家乡地址-底线_x'] <= 60 
        else row['家乡距离'] <= row['TA地址与你相距:家乡地址-底线_x']
    ),
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：家乡距离'] = df_底线['底线：家乡距离'].apply(lambda x: 0 if x else 1)

In [None]:

df_底线['加分项：常驻距离'] = df_底线.apply(
    lambda row: row['常驻距离'] <= row['TA地址与你相距:常驻地址-加分项_x'],
    axis=1
)
df_底线['加分项：家乡距离'] = df_底线.apply(
    lambda row: row['家乡距离'] <= row['TA地址与你相距:家乡地址-加分项_x'],
    axis=1
)

# 转换布尔值为0和1

df_底线['加分项：常驻距离'] = df_底线['加分项：常驻距离'].apply(lambda x: 1 if x else 0)
df_底线['加分项：家乡距离'] = df_底线['加分项：家乡距离'].apply(lambda x: 1 if x else 0)


#### 月均收入

In [None]:
# 生成收入匹配结果
df_底线['底线：月均收入'] = df_底线.apply(
    lambda row: row['月均收入（单位：元）_y'] >= row['TA的月均收入（单位：元）_填空1_x'] ,
    axis=1
)
# 加分项
df_底线['加分项：月均收入'] = df_底线.apply(
    lambda row: row['月均收入（单位：元）_y'] >= row['TA的月均收入（单位：元）_填空1_x'] ,
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：月均收入'] = df_底线['底线：月均收入'].apply(lambda x: 0 if x else 1)
df_底线['加分项：月均收入'] = df_底线['加分项：月均收入'].apply(lambda x: 1 if x else 0)

#### 个人&家庭总资产

In [None]:
df_底线['底线：个人总资产'] = df_底线.apply(
    lambda row: row['个人总资产（单位：万元）_y'] >= row['TA的个人总资产（单位：万元）_填空1_x'],
    axis=1
)
df_底线['加分项：个人总资产'] = df_底线.apply(
    lambda row: row['个人总资产（单位：万元）_y'] >= row['TA的个人总资产（单位：万元）_填空2_x'],
    axis=1
)

# 家庭总资产
df_底线['底线：家庭总资产'] = df_底线.apply(
    lambda row: row['家庭总资产（单位：万元）_y'] >= row['TA的家庭总资产（单位：万元）_填空1_x'],
    axis=1
)
# 加分项
df_底线['加分项：家庭总资产'] = df_底线.apply(
    lambda row: row['家庭总资产（单位：万元）_y'] >= row['TA的家庭总资产（单位：万元）_填空2_x'],
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：个人总资产'] = df_底线['底线：个人总资产'].apply(lambda x: 0 if x else 1)
df_底线['加分项：个人总资产'] = df_底线['加分项：个人总资产'].apply(lambda x: 1 if x else 0)
df_底线['底线：家庭总资产'] = df_底线['底线：家庭总资产'].apply(lambda x: 0 if x else 1)
df_底线['加分项：家庭总资产'] = df_底线['加分项：家庭总资产'].apply(lambda x: 1 if x else 0)

#### 经济情况 - 其他信息（纯底线）

In [None]:
"""列名
经济情况 - 其他信息:是否是独生子女
经济情况 - 其他信息:父母是否有养老保险
经济情况_其他信息:是否是单亲家庭
TA的经济情况 - 其他信息:TA是独生子女
TA的经济情况 - 其他信息:TA的父母有养老保险
TA经济情况_其他信息:TA是单亲家庭
"""
# 定义匹配规则（将其改为集合更易于快速查找）
其他经济情况匹配 = {
    ("A.是", "A.是"),
    ("B.两者皆可", "A.是"),
    ("B.两者皆可", "B.否"),
    ("C.否", "B.否"),
}
# 生成匹配结果
df_底线['底线：是否是独生子女'] = df_底线.apply(
    lambda row: (row['TA经济情况_其他信息:TA是独生子女_x'], row['经济情况_其他信息:是否是独生子女_y']) in 其他经济情况匹配,
    axis=1
)
df_底线['底线：父母是否有养老保险'] = df_底线.apply(
    lambda row: (row['TA经济情况_其他信息:TA父母养老保险_x'], row['经济情况_其他信息:父母养老保险_y']) in 其他经济情况匹配,
    axis=1
)
df_底线['底线：是否是单亲家庭'] = df_底线.apply(
    lambda row: (row['TA经济情况_其他信息:TA是单亲家庭_x'], row['经济情况_其他信息:是否是单亲家庭_y']) in 其他经济情况匹配,
    axis=1
)

print(df_底线['底线：是否是独生子女'].head(2))
print(df_底线['底线：父母是否有养老保险'].head(2))
print(df_底线['底线：是否是单亲家庭'].head(2))
# 转换布尔值为0和1
df_底线['底线：是否是独生子女'] = df_底线['底线：是否是独生子女'].apply(lambda x: 0 if x else 1)
df_底线['底线：父母是否有养老保险'] = df_底线['底线：父母是否有养老保险'].apply(lambda x: 0 if x else 1)
df_底线['底线：是否是单亲家庭'] = df_底线['底线：是否是单亲家庭'].apply(lambda x: 0 if x else 1)

#### 工作强度

In [None]:
# 生成工作强度匹配结果
df_底线['底线：工作强度'] = df_底线.apply(
    lambda row: (row['工作强度_填空1_y'] <= row['TA的工作强度_填空1_x']) and
                (row['工作强度_填空2_y'] <= row['TA的工作强度_填空2_x']) ,
    axis=1
)
# 加分项
df_底线['加分项：工作强度'] = df_底线.apply(
    lambda row: (row['工作强度_填空1_y'] <= row['TA的工作强度_填空3_x']) and
                (row['工作强度_填空2_y'] <= row['TA的工作强度_填空4_x']) ,
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：工作强度'] = df_底线['底线：工作强度'].apply(lambda x: 0 if x else 1)
df_底线['加分项：工作强度'] = df_底线['加分项：工作强度'].apply(lambda x: 1 if x else 0)

#### 婚姻观&生育观（纯底线）

In [None]:
"""列名
婚姻观&生育观:丁克
婚姻观&生育观:支持婚前性生活
婚姻观&生育观:婚后和父母一起住
"""
# 定义匹配规则（将其改为集合更易于快速查找）
# 此处是神奇的双方同时进行筛选，而非单方面筛选，神奇吧！
婚姻观生育观不匹配 = {
    ("A.我希望双方都", "C.我自己不想"),
    ("C.我自己不想", "A.我希望双方都")
}
# 生成匹配结果
df_底线['底线：婚姻观&生育观'] = df_底线.apply(
    lambda row: ((row['婚姻观&生育观:丁克_x'], row['婚姻观&生育观:丁克_y']) in 婚姻观生育观不匹配) or
                ((row['婚姻观&生育观:支持婚前性生活_x'], row['婚姻观&生育观:支持婚前性生活_y']) in 婚姻观生育观不匹配) or
                ((row['婚姻观&生育观:婚后和父母一起住_x'], row['婚姻观&生育观:婚后和父母一起住_y']) in 婚姻观生育观不匹配),
    axis=1
)

# 转换布尔值为0和1，因为这一次是in不匹配，所以取反
df_底线['底线：婚姻观&生育观'] = df_底线['底线：婚姻观&生育观'].apply(lambda x: 1 if x else 0)

#### 家务参与意愿（纯加分项）

In [None]:
"""列名
家务参与意愿:参与日常烧饭
家务参与意愿:参与日常清洁
家务参与意愿:参与洗衣洗鞋
家务参与意愿:参与家具维修
"""

# 生成匹配结果
df_底线['加分项：家务参与意愿'] = df_底线.apply(
lambda row: (
                int((row['家务参与意愿:参与日常烧饭_x'] + row['家务参与意愿:参与日常烧饭_y']) >= 5) +
                int((row['家务参与意愿:参与日常清洁_x'] + row['家务参与意愿:参与日常清洁_y']) >= 5) +
                int((row['家务参与意愿:参与洗衣洗鞋_x'] + row['家务参与意愿:参与洗衣洗鞋_y']) >= 5) +
                int((row['家务参与意愿:参与家具维修_x'] + row['家务参与意愿:参与家具维修_y']) >= 5)
            ) / 4, # 标准化，使得全都符合的情况下数值为 1
    axis=1
)

display(df_底线[['家务参与意愿:参与日常烧饭_x','家务参与意愿:参与日常烧饭_y','家务参与意愿:参与日常清洁_x','家务参与意愿:参与日常清洁_y','家务参与意愿:参与洗衣洗鞋_x','家务参与意愿:参与洗衣洗鞋_y','家务参与意愿:参与家具维修_x','家务参与意愿:参与家具维修_y','加分项：家务参与意愿']].head(2))

# 展示加分项的分布（不添加列，全在display中处理
display(df_底线['加分项：家务参与意愿'].value_counts())

#### 恋爱次数（纯加分项）

In [None]:
df_底线['加分项：恋爱次数'] = df_底线.apply(
    lambda row:
        row['TA成熟的恋爱次数_填空1_x'] <= row['在现实生活中成熟的恋爱次数_y'] <= row['TA成熟的恋爱次数_填空2_x'],
    axis=1
)

# 转换布尔值为0和1
df_底线['加分项：恋爱次数'] = df_底线['加分项：恋爱次数'].apply(lambda x: 1 if x else 0)

#### 金钱观（纯底线）

In [None]:
"""列名
 '金钱观:为买房贷款',
 '金钱观:为买车贷款',
 '金钱观:为非必要消费贷款',
 '金钱观:为金融投资贷款',
 '金钱观:为实体投资贷款',
 '金钱观:为还贷款贷款',
 '金钱观:投资基金',
 '金钱观:投资股票',
 '金钱观:创业',
"""
# 定义匹配规则（将其改为集合更易于快速查找）
# 此处是神奇的双方同时进行筛选，而非单方面筛选，神奇吧！
金钱观不匹配 = {
    ("A.我接受", "C.我很讨厌这样的人"),
    ("C.我很讨厌这样的人", "A.我接受")
}
# 生成匹配结果
df_底线['底线：金钱观'] = df_底线.apply(
    lambda row: ((row['金钱观:为买房贷款_x'], row['金钱观:为买房贷款_y']) in 金钱观不匹配) or
                ((row['金钱观:为买车贷款_x'], row['金钱观:为买车贷款_y']) in 金钱观不匹配) or
                ((row['金钱观:为非必要消费贷款_x'], row['金钱观:为非必要消费贷款_y']) in 金钱观不匹配) or
                ((row['金钱观:为金融投资贷款_x'], row['金钱观:为金融投资贷款_y']) in 金钱观不匹配) or
                ((row['金钱观:为实体投资贷款_x'], row['金钱观:为实体投资贷款_y']) in 金钱观不匹配) or
                ((row['金钱观:为还贷款贷款_x'], row['金钱观:为还贷款贷款_y']) in 金钱观不匹配) or
                ((row['金钱观:投资基金_x'], row['金钱观:投资基金_y']) in 金钱观不匹配) or
                ((row['金钱观:投资股票_x'], row['金钱观:投资股票_y']) in 金钱观不匹配) or
                ((row['金钱观:创业_x'], row['金钱观:创业_y']) in 金钱观不匹配)
    ,axis=1
)

# 转换布尔值为0和1，因为这一次是in不匹配，所以取反
df_底线['底线：金钱观'] = df_底线['底线：金钱观'].apply(lambda x: 1 if x else 0)


#### 底线：生活习惯

In [None]:
"""打印列名的数据类型
'底线类生活习惯:抽烟',
'底线类生活习惯:喝酒',
'底线类生活习惯:槟榔',
'底线类生活习惯:熬夜',
'底线类生活习惯:早回家',
'底线类生活习惯:养宠物',
'底线类生活习惯范围:抽烟',
'底线类生活习惯范围:喝酒',
'底线类生活习惯范围:槟榔',
'底线类生活习惯范围:养宠物',
"""
print(df_底线[['底线类生活习惯:抽烟_x','底线类生活习惯:喝酒_x','底线类生活习惯:槟榔_x','底线类生活习惯:养宠物_x', '底线类生活习惯范围:抽烟_x','底线类生活习惯范围:喝酒_x','底线类生活习惯范围:槟榔_x', '底线类生活习惯范围:养宠物_x']].dtypes)

In [None]:
"""列名
'底线类生活习惯:抽烟',
'底线类生活习惯:喝酒',
'底线类生活习惯:槟榔',
'底线类生活习惯:养宠物',
'底线类生活习惯范围:抽烟',
'底线类生活习惯范围:喝酒',
'底线类生活习惯范围:槟榔',
'底线类生活习惯范围:养宠物',
"""

# 生成匹配结果，匹配规则为两者差值小于范围
df_底线['底线：生活习惯'] = df_底线.apply(
    lambda row: abs(row['底线类生活习惯:抽烟_x'] - row['底线类生活习惯:抽烟_y']) < row['底线类生活习惯范围:抽烟_x'] and
                abs(row['底线类生活习惯:喝酒_x'] - row['底线类生活习惯:喝酒_y']) < row['底线类生活习惯范围:喝酒_x'] and
                abs(row['底线类生活习惯:槟榔_x'] - row['底线类生活习惯:槟榔_y']) < row['底线类生活习惯范围:槟榔_x'] and
                abs(row['底线类生活习惯:养宠物_x'] - row['底线类生活习惯:养宠物_y']) < row['底线类生活习惯范围:养宠物_x'],
    axis=1
)

# 转换布尔值为0和1
df_底线['底线：生活习惯'] = df_底线['底线：生活习惯'].apply(lambda x: 0 if x else 1)

# 打印数据框的前两行，检查结果
with pd.option_context('display.max_colwidth', None):
    display(df_底线[[
        '底线：生活习惯',
        '底线类生活习惯:抽烟_x','底线类生活习惯:抽烟_y','底线类生活习惯范围:抽烟_x',
        '底线类生活习惯:喝酒_x','底线类生活习惯:喝酒_y','底线类生活习惯范围:喝酒_x',
        '底线类生活习惯:槟榔_x','底线类生活习惯:槟榔_y','底线类生活习惯范围:槟榔_x',
        '底线类生活习惯:养宠物_x','底线类生活习惯:养宠物_y','底线类生活习惯范围:养宠物_x'
    ]].head(2))


#### 加减分项目

In [None]:
# 计算分数函数
def 加减分项目算分(row: pd.Series, columns: list, 完全相反的判定规则: set) -> int:
    score = 0
    for col in columns:
        if (row[f'{col}_x'], row[f'{col}_y']) in 完全相反的判定规则:  # 完全相反，扣1分
            score -= 1
        elif row[f'{col}_x'] == row[f'{col}_y']:  # 完全相同，加1分
            score += 1
    return score

In [None]:
# 没看懂，让ai写的，爱咋咋吧。应该是对的吧？？？
def 标准化映射_负值为0(series):
    if series.std() == 0:
        return series.apply(lambda x: 0.5 if x >= 0 else 0)

    mean = series.mean()
    std = series.std()
    z_scores = (series - mean) / std
    cdf_values = norm.cdf(z_scores)

    # 把原始为负的项标记为 0，其余做归一化
    final_scores = np.array([cdf if orig >= 0 else 0 for orig, cdf in zip(series, cdf_values)])

    # 除掉原始负值后剩下的最大最小
    valid_mask = np.array(series) >= 0
    valid_scores = final_scores[valid_mask]

    if valid_scores.max() - valid_scores.min() == 0:
        return pd.Series([0.5 if orig >= 0 else 0 for orig in series], index=series.index)

    # 对非负项做归一化处理，使最大为1，最小为0
    final_scores[valid_mask] = (valid_scores - valid_scores.min()) / (valid_scores.max() - valid_scores.min())
    
    # 保留两位小数
    final_scores = np.round(final_scores, 1)

    return pd.Series(final_scores, index=series.index)

##### 饮食习惯

In [None]:
# 饮食习惯匹配规则
饮食习惯完全相反 = {
    ("A.三天不吃上房揭瓦", "C.沾上一点就想漱口"),
    ("C.沾上一点就想漱口", "A.三天不吃上房揭瓦")
}
# 定义列名
饮食习惯列名 = [
    '饮食习惯:辣',
    '饮食习惯:香菜',
    '饮食习惯:葱',
    '饮食习惯:蒜',
    '饮食习惯:姜',
    '饮食习惯:榴莲',
    '饮食习惯:螺狮粉',
    '饮食习惯:海鲜/生鲜',
]
# 计算饮食习惯分数
df_底线['加分项：饮食习惯'] = df_底线.apply(
    lambda row: 加减分项目算分(row, 饮食习惯列名, 饮食习惯完全相反),
    axis=1
)

df_底线['加分项：饮食习惯'] = 标准化映射_负值为0(df_底线['加分项：饮食习惯'])

# 打印数据框的前两行，检查结果
with pd.option_context('display.max_colwidth', None):
    display(df_底线[[
        '加分项：饮食习惯',
        *[f'{col}_x' for col in 饮食习惯列名],
        *[f'{col}_y' for col in 饮食习惯列名]
    ]].head(2))
    
# 计算饮食习惯分数的分布
display(df_底线['加分项：饮食习惯'].value_counts())

##### 生活习惯

In [None]:
# 饮食习惯匹配规则
生活习惯完全相反 = {
    ("A.这是我本人", "C.我很讨厌这样的人"),
    ("C.我很讨厌这样的人", "A.这是我本人")
}
# 定义列名
生活习惯列名 = [
    '生活习惯:非常在乎时间观念',
    '生活习惯:掏的钱都是不得不花的',
    '生活习惯:24小时离不开对方',
    '生活习惯:公交能到的地方绝对不打车',
    '生活习惯:足不出户',
    '生活习惯:二次元',
    '生活习惯:一周扔垃圾袋5次及以上',
    '生活习惯:重视所有特殊的日子',
]
# 计算生活习惯分数
df_底线['加分项：生活习惯'] = df_底线.apply(
    lambda row: 加减分项目算分(row, 生活习惯列名, 生活习惯完全相反),
    axis=1
)

df_底线['加分项：生活习惯'] = 标准化映射_负值为0(df_底线['加分项：生活习惯'])



# 打印数据框的前两行，检查结果
with pd.option_context('display.max_colwidth', None):
    display(df_底线[[
        '加分项：生活习惯',
        *[f'{col}_x' for col in 生活习惯列名],
        *[f'{col}_y' for col in 生活习惯列名]
    ]].head(3))
    
display(df_底线['加分项：生活习惯'].value_counts())

#### MBTI（纯加分项）

In [None]:
def 计算MBTI分数(mbti: str, ta_mbti: str) -> int:
    return all(mbti_char == ta_mbti_char or ta_mbti_char == 'O' 
               for mbti_char, ta_mbti_char in zip(mbti, ta_mbti))

In [None]:
df_底线['加分项：MBTI'] = df_底线.apply(
    lambda row: 计算MBTI分数(row['MBTI_y'], row['TA的MBTI_x']),
    axis=1
)

# 转换布尔值为0和1
df_底线['加分项：MBTI'] = df_底线['加分项：MBTI'].apply(lambda x: 1 if x else 0)

print(df_底线[['加分项：MBTI','MBTI_y','TA的MBTI_x']].head(2))

#### 多选匹配

##### 多选匹配函数

In [None]:
def 多选匹配(匹配条件1: str, 匹配条件2: str) -> int:
    """
    多选匹配函数，用于匹配多选题的匹配结果。
    
    参数:
    - 匹配条件1 (str): 第一个匹配条件字符串，形如 "A.B.C."。
    - 匹配条件2 (str): 第二个匹配条件字符串，形如 "B.C.D."。
    
    返回值:
    - int: 匹配的项目数。
    """
    # 提取完整的 [A-Z]. 格式的选项
    set1 = set(re.findall(r'[A-Z]\.', 匹配条件1))
    set2 = set(re.findall(r'[A-Z]\.', 匹配条件2))
    
    # 计算交集的长度
    多选匹配结果 = len(set1 & set2)
    
    return 多选匹配结果


##### 相符描述（纯加分项）

In [None]:
# 相符描述中若有相同字母，加分项
df_底线['加分项：相符描述'] = df_底线.apply(
    lambda row: 多选匹配(row['相符描述_y'], row['TA的相符描述_x']),
    axis=1
)

df_底线['加分项：相符描述'] = 标准化映射_负值为0(df_底线['加分项：相符描述'])

display(df_底线[['加分项：相符描述']].value_counts())

##### 兴趣爱好（纯加分项）

In [None]:
# 相符描述中若有相同字母，加分项
df_底线['加分项：兴趣爱好'] = df_底线.apply(
    lambda row: 多选匹配(row['兴趣爱好_y'], row['兴趣爱好_x']),
    axis=1
)

df_底线['加分项：兴趣爱好'] = 标准化映射_负值为0(df_底线['加分项：兴趣爱好'])

display(df_底线[['加分项：兴趣爱好']].value_counts())

## 打分

In [None]:
df_打分 = df_底线.copy()
df_打分.info()

### 根据底线筛除

In [None]:
放弃一个底线映射 = {
    '年龄': '底线：年龄',
    '身高': '底线：身高',
    'BMI': '底线：BMI',
    '学历': '底线：学历',
    '月均收入': '底线：月均收入',
    '个人资产': '底线：个人总资产',
    '家庭资产': '底线：家庭总资产',
    '经济情况-其他信息': ['底线：是否是独生子女', '底线：父母是否有养老保险', '底线：是否是单亲家庭'],
    '工作强度': '底线：工作强度',
    '婚姻观&生育观': '底线：婚姻观&生育观',
    '金钱观': '底线：金钱观',
    '底线类生活习惯': '底线：生活习惯',
}

In [None]:
def 提取底线字段(放弃项):
    if pd.isna(放弃项):
        return []
    # 匹配格式如 'I.工作强度' 或 'L.底线类生活习惯'
    match = re.search(r'[A-Z]\.(.+)', 放弃项)
    if match:
        项名 = match.group(1).strip()
        映射字段 = 放弃一个底线映射.get(项名, None)
        if isinstance(映射字段, list):
            return 映射字段
        elif 映射字段:
            return [映射字段]
    return []

# 匹配所有开头为"底线："的列
底线列 = df_打分.columns[df_打分.columns.str.startswith("底线：")]

def 获取重要底线列(row, 底线列):
    放弃字段列表 = 提取底线字段(row['放弃一个底线项_x'])
    return [col for col in 底线列 if col not in 放弃字段列表]

def 计算匹配系数(row):
    重要列 = 获取重要底线列(row, 底线列)
    底线乘积 = row[重要列].sum()
    return 1 if 底线乘积 == 0 else 0

df_打分['底线匹配系数'] = df_打分.apply(计算匹配系数, axis=1)

### 计算分数

In [None]:
# 设置基本分数
基本分数 = 35

In [None]:
# 展示所有加分项列
加分项列 = df_打分.columns[df_打分.columns.str.startswith("加分项：")]
display(df_打分[加分项列].head(2))

In [None]:
# 家务参与意愿是新进入加分项的，它忘了写进赋权题目里了。就默认算进生活习惯里吧
计分映射 = {
    '年龄': '加分项：年龄',
    '身材': ['加分项：身高', '加分项：BMI'] ,
    '距离': ['加分项：常驻距离', '加分项：家乡距离'] ,
    '个人能力': ['加分项：学历', '加分项：月均收入', '加分项：工作性质'],
    '资产情况': ['加分项：个人总资产', '加分项：家庭总资产'],
    '工作强度': '加分项：工作强度',
    '生活习惯': ['加分项：饮食习惯', '加分项：生活习惯', '加分项：家务参与意愿'],
    '性格爱好': ['加分项：MBTI', '加分项：相符描述', '加分项：兴趣爱好'],
    '恋爱次数': '加分项：恋爱次数',
}

'''
赋权排序_json 字段中有关于加分项的排序，1 最高，2 次之，9 最低。
为方便计算，应将其倒过来，把 1 变成 9，2 变成 8，依此类推，然后再除以整个列别内值的数量 以生成权重
再将权重乘以分数，把加分项的分数加起来。
将其分布在 0-65 之间，最后加上 底线匹配系数 * 基本分数。
'''

赋权项总数 = len(计分映射)  # 9
加分上限 = 100 - 基本分数  # 加分段总分上限

In [None]:
def 计算加分原始得分(row):
    # 1. 解析排序字典
    try:
        排序字典 = json.loads(row['赋权排序_json_x'])
    except:
        display(row['赋权排序_json_x'])
        raise ValueError(f"json 解析错误")

    分数 = 0

    for 项, 排序值 in 排序字典.items():
        # 2. 倒序权重：1 → 9，9 → 1
        倒序权重 = 赋权项总数 - 排序值 + 1
        权重 = 倒序权重 / 赋权项总数  # 归一化到 0~1

        对应列 = 计分映射.get(项)
        if 对应列 is None:
            raise ValueError(f"未知的加分项: {项}")

        # 3. 取出加分项分数
            # 如果是列表，则取平均值
        if isinstance(对应列, list):
            值列表 = [row.get(col, 0) for col in 对应列]
            分项平均 = sum(值列表) / len(值列表) if 值列表 else 0
        else:
            分项平均 = row.get(对应列, 0)

        分数 += 权重 * 分项平均

    return 分数

df_打分['加分原始得分'] = df_打分.apply(计算加分原始得分, axis=1)
min_score = df_打分['加分原始得分'].min()
max_score = df_打分['加分原始得分'].max()

def 映射到065(x):
    if pd.isna(x): return 0
    if max_score == min_score:
        return 65
    return int(round(((x - min_score) / (max_score - min_score) * 65), 0 ))

df_打分['加分得分'] = df_打分['加分原始得分'].apply(映射到065)

In [None]:
df_打分['总分'] = df_打分['加分得分'] + df_打分['底线匹配系数'] * 基本分数

In [None]:
# 设置支持中文字体（防止中文乱码）
matplotlib.rcParams['font.family'] = 'SimHei'  # 或 'Microsoft YaHei', 视你系统而定
matplotlib.rcParams['axes.unicode_minus'] = False  # 负号也能显示

# 画图区域设置
plt.figure(figsize=(12, 5))

# 子图1：加分得分
plt.subplot(1, 2, 1)
plt.hist(df_打分['加分得分'].dropna(), bins=10, edgecolor='black', color='skyblue')
plt.title('加分得分分布')
plt.xlabel('加分得分')
plt.ylabel('对数')

# 子图2：综合总分
plt.subplot(1, 2, 2)
plt.hist(df_打分['总分'].dropna(), bins=10, edgecolor='black', color='salmon')
plt.title('总分分布')
plt.xlabel('总分')
plt.ylabel('对数')

plt.tight_layout()
plt.show()

## 统计

In [None]:
df_统计 = df_打分.copy()
print(df_统计.info())

#### 匹配成功

In [None]:
# 有效问卷中，根据总分排序
df_统计 = df_统计.sort_values(by='总分', ascending=False)

##### 限制匹配成功行数函数

In [None]:
def 限制匹配成功行数(
    匹配成功数据: pd.DataFrame, 
    limit: int,
    col_id_x: str = 'id_x',
    col_id_y: str = 'id_y',
    col_score: str = 'score'
) -> pd.DataFrame:

    # 按组大小和分数同时排序
    匹配成功组大小排序 = 匹配成功数据[col_id_x].value_counts()
    # 添加辅助列：组大小
    匹配成功数据['组大小'] = 匹配成功数据[col_id_x].map(匹配成功组大小排序)

    排序后处理前数据 = 匹配成功数据.sort_values(
        by=['组大小', col_id_x, col_score],  
        ascending=[False, True, False]  
    )
    排序后处理前数据.drop(columns=['组大小'], inplace=True)
    限行后数据 = 排序后处理前数据.copy()
    
    # 初步筛选出超限的组
    超限组动态id池 = [
        group_id for group_id, group_df in 限行后数据.groupby(col_id_x, observed=True) 
        if len(group_df) > limit
    ]
    超限组原始id池 = 超限组动态id池.copy()
    # 创建一个空的数据表，用于存储删除数据
    删除数据 = pd.DataFrame(columns=限行后数据.columns)

    while 超限组动态id池:
        # 取出第一个超限组
        group_id = 超限组动态id池[0]
        group = 限行后数据[限行后数据[col_id_x] == group_id]

        # 如果长度超限，删除多余的行和对应匹配的行
        if len(group) > limit:
            # 找到当前需要删除的行和对应匹配行（排序上在limit之后的行）
            drop_rows = group.iloc[limit:]
            # 验证是否存在空的 DataFrame
            if drop_rows.empty:
                raise ValueError("drop_rows 或 match_rows 为空 DataFrame，无法继续执行删除操作")
            
            drop_set = set([tuple(x) for x in drop_rows[[col_id_y, col_id_x]].to_numpy()])
            match_rows = 限行后数据[
                限行后数据.apply(
                    lambda row: (row[col_id_x], row[col_id_y]) in drop_set 
                                or (row[col_id_y], row[col_id_x]) in drop_set,
                    axis=1
                )
            ]
            # 删除当前行和匹配行
            to_drop = set(drop_rows.index).union(set(match_rows.index))  
            限行后数据.drop(to_drop, inplace=True)
            # 将删除的数据存储到删除数据表中
            删除数据 = pd.concat(
                [删除数据, drop_rows, match_rows],
                ignore_index=True
            ).astype(删除数据.dtypes)
        else:
            raise ValueError('超限组没有正确动态更新')

        # 更新超限组列表，防止死循环
        超限组动态id池 = [
            group_id for group_id, group_df in 限行后数据.groupby(col_id_x, observed=True) 
            if len(group_df) > limit
        ]
    
    # 初步筛选出低于限制的原超限组id
    低限的原超限id_原始 = [
        group_id for group_id in 超限组原始id池  
        if len(限行后数据[限行后数据[col_id_x] == group_id]) < limit
    ]
    动态低限的原超限组id = 低限的原超限id_原始.copy()
    
    # 对数据进行排序
    限行后数据组大小排序 = 限行后数据[col_id_x].value_counts()
    限行后数据['组大小'] = 限行后数据[col_id_x].map(限行后数据组大小排序)
    排序后限行后数据 = 限行后数据.sort_values(
        by=['组大小', col_id_x, col_score],  
        ascending=[False, True, False]  
    )
    排序后限行后数据.drop(columns=['组大小'], inplace=True)
    限行后数据 = 排序后限行后数据.copy()

    删除数据 = 删除数据.sort_values(by=[col_id_x, col_score], ascending=False)
    
    完成补充id = []
    
    # 补充低于限制的组
    while 动态低限的原超限组id:
        group_id = 动态低限的原超限组id[0]
        
        group_主表 = 限行后数据[限行后数据[col_id_x] == group_id]
        待补充行数 = limit - len(group_主表)
        
        group_被删除 = 删除数据[删除数据[col_id_x] == group_id]

        # 如果待补充行数大于0，且删除数据中存在对应的行
        if 待补充行数 > 0 and len(group_被删除) > 0:
            # 对 group_被删除 中的行依次尝试补充
            for index, row in group_被删除.iterrows():
                # 若此 row 对应的 col_id_y 在限行后数据里的同 col_id_y 行数也 < limit
                if len(限行后数据[限行后数据[col_id_y] == row[col_id_y]]) < limit and 待补充行数 > 0:
                    # 验证是否存在空的 DataFrame
                    if row.empty:
                        raise ValueError("drop_rows 或 match_rows 为空 DataFrame，无法继续执行删除操作")
                    # 将删除数据中的行和对应的匹配行添加到限行后数据中
                    append_set = set([tuple(x) for x in row[[col_id_y, col_id_x]].to_numpy()])
                    match_rows = 删除数据[
                        删除数据.apply(
                            lambda row: (row[col_id_x], row[col_id_y]) in append_set 
                                        or (row[col_id_y], row[col_id_x]) in append_set,
                            axis=1
                        )
                    ]
                    限行后数据 = pd.concat([限行后数据, match_rows], ignore_index=True)
                    # 删除删除数据中的行
                    删除数据.drop(index, inplace=True)
                    待补充行数 -= 1
                elif len(限行后数据[限行后数据[col_id_y] == row[col_id_y]]) >= limit and 待补充行数 > 0:
                    # 如果同 col_id_y 已经达到了 limit，则不能补回主表，直接在删除表中移除
                    删除数据.drop(index, inplace=True)
                else:
                    # 待补充行数耗尽则跳出
                    break
        
        # 更新低限组列表
        动态低限的原超限组id = [
            gid for gid in 超限组原始id池
            if len(限行后数据[限行后数据[col_id_x] == gid]) < limit
        ]
        完成补充id.append(group_id)
        动态低限的原超限组id = [gid for gid in 动态低限的原超限组id if gid not in 完成补充id]
        
    # 最终对输出数据进行排序
    限行后数据 = 限行后数据.sort_values(
        by=[col_id_x, col_score], 
        ascending=[True, False]
    )

    return 限行后数据

##### 是否匹配成功函数

In [None]:
df_统计.columns

In [None]:
def 是否匹配成功(df: pd.DataFrame) -> pd.DataFrame:
    # 筛选总分 >= 60 的组合
    匹配成功组合 = df[df['总分'] >= 60]
    display(匹配成功组合.head())

    # 创建反向 DataFrame，仅交换键列
    df_反向 = 匹配成功组合.copy()
    df_反向 = df_反向.rename(columns={
        '识别用id_x': 'temp_id',
        '填写平台_x': 'temp_platform',
        '识别用id_y': '识别用id_x',
        '填写平台_y': '填写平台_x'
    })
    df_反向['识别用id_y'] = df_反向['temp_id']
    df_反向['填写平台_y'] = df_反向['temp_platform']
    df_反向.drop(['temp_id', 'temp_platform'], axis=1, inplace=True)

    # 设置索引进行匹配
    匹配成功组合 = 匹配成功组合.set_index(['识别用id_x', '填写平台_x', '识别用id_y', '填写平台_y'])
    df_反向 = df_反向.set_index(['识别用id_x', '填写平台_x', '识别用id_y', '填写平台_y'])

    # 内连接：取交集的索引
    共同索引 = 匹配成功组合.index.intersection(df_反向.index)
    
    display(df_反向.head())

    # 从原始 DataFrame 中提取匹配行
    df_匹配成功 = 匹配成功组合.loc[共同索引].reset_index()
    
    df_匹配成功 = 限制匹配成功行数(df_匹配成功, 5, '识别用id_x', '识别用id_y', '总分')
    
    return df_匹配成功

df_匹配成功 = 是否匹配成功(df_统计)
print(df_匹配成功.info())
df_匹配成功 = df_匹配成功.copy()
df_匹配成功['填写平台&识别用id'] = df_匹配成功['填写平台_x'] + df_匹配成功['识别用id_x']
print(df_匹配成功.info())
display(df_匹配成功.head(2))

### 统计性输出

In [None]:
# 收集到的问卷总数：df的行数
问卷总数 = df.shape[0]
# 收集到的有效问卷数：df_导入数据库的行数
有效问卷数 = df_导入数据库.shape[0]
# 无效问卷数 = 问卷总数 - 有效问卷数
无效问卷数 = 问卷总数 - 有效问卷数
# 有效问卷率 = 有效问卷数 / 问卷总数
无效问卷率 = 1 - 有效问卷数 / 问卷总数
# 匹配成功对数 = 总分大于60的行数 / 2
匹配成功行数 = int(df_匹配成功.shape[0] / 2)
# 填写平台&识别用id拼接后的字段不同的行数
    # 给df_导入数据库添加填写平台&识别用id字段
df_cleaned['填写平台&识别用id'] = df_cleaned['填写平台'] + df_cleaned['识别用id']
匹配参加人数 = df_cleaned['填写平台&识别用id'].nunique()
匹配成功人数 = df_匹配成功['填写平台&识别用id'].nunique()
# 平均答题时长
平均答题时长 = df_cleaned['答题时长'].mean()
# 本周新增问卷数
本周新增问卷数 = diff_df.shape[0]

display(f'本轮无效问卷率为{无效问卷率:.2%}')
display(f'匹配成功对数为{匹配成功行数}对，匹配参加人数为{匹配参加人数}人，匹配成功人数为{匹配成功人数}人，成功率为{匹配成功人数 / 匹配参加人数:.2%}')
# 平均答题时长为几分几秒
display(f'平均答题时长为{int(平均答题时长 // 60)}分{int(平均答题时长 % 60)}秒')
# 删除填写平台&识别用id字段
df_cleaned.drop(columns='填写平台&识别用id', inplace=True)

display(pd.DataFrame(筛除结果))

#### 设置底线项和加分项

In [None]:
def 统计影响因素(df: pd.DataFrame, 前缀: str) -> Dict[str, int]:
    影响因素: Dict[str, int] = {}
    for column in df.columns:
        if column.startswith(前缀):
            if 前缀 == '底线：':
                影响因素[column] = df[column].sum()
            elif 前缀 == '加分项：':
                影响因素[column] = (df[column] == 0).sum()
    return 影响因素

In [None]:
# 显示底线项
print(f"总共行数：{df_统计.shape[0]}（有效问卷数*有效问卷数-有效问卷数）；不匹配的行数：")

# 调用函数统计底线项
底线项 = 统计影响因素(df_统计, '底线：')
display(底线项)

# 调用函数统计加分项
加分项 = 统计影响因素(df_统计, '加分项：')
display(加分项)

##### 影响因素

In [None]:
# 统计用列，专门用于评判除性取向和距离外哪些因素造成影响最大
距离用列 = ['底线：家乡距离', '底线：常驻距离']
# 筛选出统计用列数据都等于0的行
df_影响因素 = df_打分[df_打分[距离用列].sum(axis=1) == 0]

# 显示底线项
print(f"总共行数：{df_影响因素.shape[0]}（有效问卷数*有效问卷数-有效问卷数）；不匹配的行数：")

# 调用函数统计底线项
影响因素_底线 = 统计影响因素(df_影响因素, '底线：')
display(影响因素_底线)

# 调用函数统计加分项
影响因素_加分项 = 统计影响因素(df_影响因素, '加分项：')
display(影响因素_加分项)

### 查询

#### 查询指定id的问卷是否有效

In [None]:
# 输入id，打印出该id是否存在于df_cleaned中和df1中
def check_id(id: str) -> None:
    # 判断id是否在df_cleaned中
    if id in df_cleaned['识别用id'].values:
        display(f'id为{id}的用户问卷有效')
        display(df_匹配成功[df_匹配成功['识别用id_x'] == id])
    else:
        # 判断id是否在df1中
        if id in df_cleaned['识别用id'].values:
            display(f'收集到了id为{id}的问卷，但是无效')
        else:
            display(f'压根没收集到，或者问卷无效')
            
check_id('kiFte')

#### 查询嵌套占比

In [None]:
# 数据库连接信息
user = 'postgres'
password = 'root2'
host = 'localhost'
port = 5655 # 替换为你的数据库端口号
database = 'postgres'  # 替换为你的数据库名称
schema = '赛博相亲'  # 替换为你的架构名称
table_name = '一次性相亲查询'  # 替换为你的表名称

# 创建 PostgreSQL 数据库连接
try:
    conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
    conn.autocommit = True  # 如果需要创建数据库，保持自动提交

    # 设置搜索路径
    with conn.cursor() as cursor:
        cursor.execute(f'SET search_path TO {schema}')
    
    print("连接成功并已设置搜索路径")
except psycopg2.Error as e:
    print(f"数据库连接失败: {e}")
finally:
    if conn:
        conn.close()

# 创建 postgresql 数据库连接
engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}?options=-csearch_path={schema}')

# 将数据存入数据库
try:
    df_cleaned.to_sql(table_name, engine, if_exists='replace', index=False, schema=schema)
    print("数据成功插入数据库")
except Exception as e:
    print(f"数据插入失败: {e}")

In [None]:
# 查询语句（可以考虑补个关于 订单期限的设置，排除npc订单，也就是大于100天的单子）
query = """
select
    "工作性质",
    COUNT(*) AS 数量统计,  -- 统计数量
    COUNT(*) * 1.0 / SUM(COUNT(*)) OVER () AS 占比  -- 计算占比
from
    一次性相亲查询  -- 数据来源表
group by
    "工作性质"  -- 按此字段分组
order by
    "工作性质"

"""

# 将查询结果加载到 pandas 数据框中
查询结果 = pd.read_sql(query, engine)

# 打印数据框
查询结果

#### 指定id，指定项数据

In [None]:
# 输入id，输入列名，输出该id的该列名的数据
def 输出指定id的指定数据(df: DataFrame, 填写平台: str, id: str, column: str) -> None:
    # 判断id、填写平台、column是否在df_cleaned中，如果不在则直接退出循环并输出
    if 填写平台 not in df['填写平台'].values:
        display(f"没有{填写平台}这个填写平台")
        return
    if id not in df['识别用id'].values:
        display(f"没有id为{id}的用户")
        return
    if column not in df.columns:
        display(f"没有{column}这一列")
        return
    
    display(f"{填写平台}中{id}的{column}填写为{df[df['识别用id'] == id][column].values[0]}")
    
# 填平台的适合需要加序号
输出指定id的指定数据(df_cleaned,'A.小红书','502411749', 'TA的地址与你的距离（单位km）:常驻地址相距-底线')

#### 输入id，输出底线项和筛除人数

In [None]:
# 输入id，输出底线项和筛除人数
def 输出底线项和筛除人数(df: DataFrame, 填写平台: str, id: str, 底线项: dict, 加分项: dict) -> None:
    # 筛选出所有id为指定id，填写平台为指定填写平台的数据
    df_id = df[(df['识别用id_x'] == id) & (df['填写平台_x'] == 填写平台)]
    
    # 计算总人选数
    print(f"总人选数：{df_id.shape[0]}")
    
    # 统计底线项，底线项不为0的即为筛除人数
    底线项_id = {key: (df_id[key] != 0).sum() for key in 底线项.keys()}
    
    # 统计加分项，加分项为0的即为筛除人数
    加分项_id = {key: (df_id[key] <= 0).sum() for key in 加分项.keys()}
    
    # 输出id
    print(f"填写平台：{填写平台}，识别用id: {id}")
    # 输出底线项和加分项
    print("底线项筛除人数统计：")
    for key, value in 底线项_id.items():
        print(f"{key}: {value}")
    
    print("\n加分项筛除人数统计：")
    for key, value in 加分项_id.items():
        print(f"{key}: {value}")
    

# 调用示例
"""
'A.小红书', '识别用id'] = df['小红书号id']
'B.B站', '识别用id'] = df['B站昵称']
'C.抖音', '识别用id'] = df['抖音号']
'D.西瓜视频', '识别用id'] = df['抖音号']
'E.博客（需填写邮箱）', '识别用id'] = df['公示用昵称'] + '-邮箱前四位为' + df['邮箱'].str[:4]
'F.小黑盒', '识别用id'] = df['小黑盒id']
"""

def 看看为什么没匹配上 (填写平台: str, id: str, 底线项: dict, 加分项: dict) -> None:
    """key&value
    底线：性取向: df_前置底线筛选[df_前置底线筛选['识别用id_x'] == id] - df_前置筛选_性取向通过[df_前置筛选_性取向通过['识别用id_x'] == id]
    底线：是否打算在两年内结婚：df_前置筛选_性取向通过[df_前置筛选_性取向通过['识别用id_x'] == id] - df_前置筛选_结婚意愿通过[df_前置筛选_结婚意愿通过['识别用id_x'] == id]
    底线：宗教信仰: df_前置筛选_结婚意愿通过[df_前置筛选_结婚意愿通过['识别用id_x'] == id] - df_前置筛选_宗教信仰通过[df_前置筛选_宗教信仰通过['识别用id_x'] == id]
    """
    # 依序输出前置筛选中筛除的人数
    print("（仅在前置筛选中是依序进行，第二项筛选的基数会比第一项小）\n前置筛选情况：")
    print(f"底线：性取向：{df_前置底线筛选[df_前置底线筛选['识别用id_x'] == id].shape[0] - df_前置筛选_性取向通过[df_前置筛选_性取向通过['识别用id_x'] == id].shape[0]}")
    print(f"底线：是否打算在两年内结婚：{df_前置筛选_性取向通过[df_前置筛选_性取向通过['识别用id_x'] == id].shape[0] - df_前置筛选_结婚意愿通过[df_前置筛选_结婚意愿通过['识别用id_x'] == id].shape[0]}")
    print(f"底线：宗教信仰：{df_前置筛选_结婚意愿通过[df_前置筛选_结婚意愿通过['识别用id_x'] == id].shape[0] - df_前置筛选_宗教信仰通过[df_前置筛选_宗教信仰通过['识别用id_x'] == id].shape[0]}")
    print("（后置筛选中，总人选的基数相同）\n后置筛选情况：")
    输出底线项和筛除人数(df_统计, 填写平台, id, 底线项, 加分项)
    print("\n距离也匹配的人中：")
    输出底线项和筛除人数(df_影响因素, 填写平台,  id, 底线项, 加分项)

# A.小红书
# F.小黑盒
看看为什么没匹配上('A.小红书', '1145171554', 底线项, 加分项)

#### 输入id_x和id_y，输出TA对id_x的优缺点

In [None]:
# 输入id_x和id_y，输出TA对id_x的优缺点（明文输出）
def 输出TA对id_x的优缺点 (df: DataFrame, 填写平台_x: str, id_x: str, 填写平台_y: str, id_y: str) -> None:
    # 筛选出所有id为指定id的行
    df_id = df[(df['识别用id_x'] == id_x) & (df['识别用id_y'] == id_y)]
    
    # 检查是否找到符合条件的行
    if df_id.empty:
        print(f"没有找到符合条件的行：id_x={id_x}, id_y={id_y}")
        return
    
    # 输出TA对id_x的优点，即底线项为0的数据的列名和加分项大于0的数据的列名
    print(f"TA对{id_x}的优点：")
    for column in df_id.columns:
        if column.startswith('底线：') and df_id[column].values[0] == 0:
            print(column)
        
    for column in df_id.columns:
        if column.startswith('加分项：') and df_id[column].values[0] > 0:
            print(column)
    
    # 输出TA对id_x的缺点，即底线项不为0的数据的列名和加分项等于0的数据的列名
    print(f"\nTA对{id_x}的缺点：")
    for column in df_id.columns:
        if column.startswith('底线：') and df_id[column].values[0] != 0:
            print(column)
    
    for column in df_id.columns:
        if column.startswith('加分项：') and df_id[column].values[0] == 0:
            print(column)
            
输出TA对id_x的优缺点(df_统计,'B.B站','kiFte','A.小红书','9477396476')

#### 指定id，符合特定条件的匹配数据

In [None]:
# 指定id，符合特定条件的匹配数据
df_特定条件 = df_统计[
    (df_统计['填写平台_x'] == 'B.B站') &
    (df_统计['识别用id_x'] == 'kiFte') &
    (df_统计['底线：家庭总资产'].fillna(0) != 0)
]

# 定义匹配函数
def 提取匹配数据(row):
    底线成功 = [column.replace('底线：', '') for column in row.index if column.startswith('底线：') and row[column] == 0]
    底线失败 = [column.replace('底线：', '') for column in row.index if column.startswith('底线：') and row[column] != 0]
    优点 = [column.replace('加分项：', '') for column in row.index if column.startswith('加分项：') and row[column] > 0]
    缺点 = [column.replace('加分项：', '') for column in row.index if column.startswith('加分项：') and row[column] == 0]
    
    return pd.Series({
        'id_x': row['识别用id_x'],
        'id_y': row['识别用id_y'],
        '底线成功': ', '.join(底线成功),
        '底线失败': ', '.join(底线失败),
        '优点': ', '.join(优点),
        '缺点': ', '.join(缺点)
    })

# 提取匹配数据
df_特定条件 = df_特定条件.apply(提取匹配数据, axis=1)

# 显示结果
with pd.option_context('display.max_colwidth', None):
    display(df_特定条件.head(2))


### 导出数据

#### 输出匹配成功数据

In [None]:
def 输出匹配成功数据(df: pd.DataFrame) -> pd.DataFrame:
    """
    根据输入的 DataFrame 提取匹配成功数据，返回一个新 DataFrame
    """
    # 创建一个列表用于收集行数据
    rows = []
    
    for index, row in df.iterrows():
        # 创建一个字段记录他们的总分
        总分 = row['总分']
        # 提取优点和缺点，并移除列名中的 "加分项："
        优点 = [column.replace('加分项：', '') for column in df.columns if column.startswith('加分项：') and row[column] > 0]
        缺点 = [column.replace('加分项：', '') for column in df.columns if column.startswith('加分项：') and row[column] <= 0]
        
        # 收集数据为字典
        rows.append({
            '填写平台_x': re.sub(r'^[A-Z]+\.', '', row['填写平台_x']),  # 去掉任意字母加点的前缀
            'id_x': row['识别用id_x'],
            '填写平台_y': re.sub(r'^[A-Z]+\.', '', row['填写平台_y']),  # 去掉任意字母加点的前缀
            'id_y': row['识别用id_y'],
            '总分': 总分,
            '优点': ', '.join(优点),  # 将列表转换为字符串
            '缺点': ', '.join(缺点)   # 将列表转换为字符串
        })
        
        # 字典内部，根据填写平台_x、id_x、总分降序排序
        rows = sorted(rows, key=lambda x: (x['填写平台_x'], x['id_x'], x['总分']), reverse=True)
    
    # 使用 pd.DataFrame 构造最终 DataFrame
    df = pd.DataFrame(rows)
    return df

df_输出内容 = 输出匹配成功数据(df_匹配成功)
# 使用 option_context 临时设置显示选项
with pd.option_context(
    'display.max_colwidth', None,  # 单元格内字符显示完整
    'display.max_rows', None,      # 显示所有行
    'display.max_columns', None,   # 显示所有列
    'display.width', 0             # 自动适应宽度
):
    display(df_输出内容.head(4))  # 显示前4行

#### 匹配过期

In [None]:
# 数据库连接信息
user = 'postgres'
password = 'root2'
host = 'localhost'
port = 5655
database = 'postgres'  # 替换为你的数据库名称
schema = '赛博相亲'  # 替换为你的架构名称
table_name_过期 = '匹配过期记录'  # 替换为你的表名称

# 创建 PostgreSQL 数据库连接
try:
    conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
    conn.autocommit = True  # 如果需要创建数据库，保持自动提交

    # 设置搜索路径
    with conn.cursor() as cursor:
        cursor.execute(f'SET search_path TO {schema}')
    
    print("连接成功并已设置搜索路径")
except psycopg2.Error as e:
    print(f"数据库连接失败: {e}")
finally:
    if conn:
        conn.close()

# 创建 postgresql 数据库连接
engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}?options=-csearch_path={schema}')

# 检查表是否存在
with engine.begin() as conn:
    check_table_query = f"""
    SELECT EXISTS (
        SELECT FROM information_schema.tables 
        WHERE table_schema = '{schema}' 
        AND table_name = '{table_name_过期}'
    );
    """
    table_exists = conn.execute(text(check_table_query)).scalar()
    
    extract_query = f"""
    --- 处理两周过期的情况
    with 上周计数 as (select id_x,
                        count(*) as 计数
                    from 相亲成功结果记录
                    where EXTRACT(ISODOW FROM 日期) = 6
                    and NOW()::date - 日期::date <= 7
                    and NOW()::date - 日期::date > 0
                    group by id_x
                    having count(*) >= 5)

    select
        GREATEST(id_x,id_y) as id_a, --- 限制(x,y)的顺序，方便后续处理
        LEAST(id_x,id_y) as id_b,
        count(*) 计数
    from 相亲成功结果记录
    where EXTRACT(ISODOW FROM 日期) = 6
    and NOW()::date - 日期::date <= 28
    and id_x in (select id_x from 上周计数)
    --- 确认数据并非已存在于匹配过期中
    and not exists(
            SELECT 1
            FROM 匹配过期记录
            where
                相亲成功结果记录.id_x = 匹配过期记录.id_x
                and 相亲成功结果记录.id_y = 匹配过期记录.id_y
            )
    group by id_a, id_b
    having
        count(*) >= 2

    union

    --- 处理四周过期的情况
    select
        GREATEST(id_x,id_y) as id_a,
        LEAST(id_x,id_y) as id_b,
        count(*) 计数
    from 相亲成功结果记录
    where EXTRACT(ISODOW FROM 日期) = 6
    and NOW()::date - 日期::date <= 28
    and not exists(
        SELECT 1
        FROM 匹配过期记录
        where
            相亲成功结果记录.id_x = 匹配过期记录.id_x
            and 相亲成功结果记录.id_y = 匹配过期记录.id_y
        )
    group by id_a, id_b
    having
        count(*) >= 4;
    """

    # 从数据库中提取数据
    df_匹配过期 = pd.read_sql(extract_query, conn)
    # 创建新列日期，插入当前日期
    df_匹配过期['日期'] = pd.Timestamp.now().date()

    # 根据表的存在性插入数据
    if not table_exists:
        # 如果表不存在，直接创建表并插入数据
        print(f"表 {schema}.{table_name_过期} 不存在，正在创建...")
        df_匹配过期.to_sql(table_name_过期, conn, schema=schema, if_exists='replace', index=False)
        print(f"表 {schema}.{table_name_过期} 创建并成功插入数据")
    else:
        # 将df_匹配过期的 id_a,id_b 列重置为 id_x,id_y
        df_匹配过期.rename(columns={'id_a': 'id_x', 'id_b': 'id_y'}, inplace=True)
        df_匹配过期.to_sql(table_name_过期, conn, schema=schema, if_exists='append', index=False)
        # 打印要导入的表原本有多少行，导入了多少行
        print(f"表 {schema}.{table_name_过期} 已存在，插入了 {df_匹配过期.shape[0]} 行数据")
        
    # 取出匹配过期的全部数据
    df_全匹配过期 = pd.read_sql(f"SELECT * FROM {schema}.{table_name_过期}", conn)

In [None]:
# 添加无序配对的辅助列
df_输出内容['id_a'] = df_输出内容[['id_x', 'id_y']].min(axis=1)
df_输出内容['id_b'] = df_输出内容[['id_x', 'id_y']].max(axis=1)

df_全匹配过期['id_a'] = df_全匹配过期[['id_x', 'id_y']].min(axis=1)
df_全匹配过期['id_b'] = df_全匹配过期[['id_x', 'id_y']].max(axis=1)

# 构建 set，提高效率
过期组合集合 = set(zip(df_全匹配过期['id_a'], df_全匹配过期['id_b']))

# 过滤未过期
df_未过期 = df_输出内容[
    ~df_输出内容.apply(lambda row: (row['id_a'], row['id_b']) in 过期组合集合, axis=1)
].copy()

# 删除辅助列
df_未过期.drop(columns=['id_a', 'id_b'], inplace=True)


#### 小红书截图页

In [None]:
# 输出列名
df_未过期.columns

In [None]:
print("优点代表id_y相对于id_x在这一加分项获取了分数，缺点代表在这一加分项没有获取分数")
print("图片中仅输出了'填写平台_x'为小红书的数据")
# 明确排除日期列，并筛选小红书数据
df_小红书输出 = df_未过期.loc[
    df_未过期['填写平台_x'] == '小红书',
    df_未过期.columns.difference(['日期'])  # 移除日期列
]
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
#with pd.option_context('display.max_colwidth', None):
    # 生成HTML并隐藏索引
    html_table = df_小红书输出.to_html(index=False)
    display(HTML(html_table))


#### 检查：单向匹配&重复匹配

In [None]:
def 单向匹配(df: DataFrame, id_x: str = '识别用id_x', id_y: str = '识别用id_y') -> None:
    # 1. 创建一个反向 DataFrame，交换 id_x 和 id_y
    df_反向 = df.rename(columns={id_x: id_y, id_y: id_x})

    # 2. 使用 merge 查找哪些行在原表中没有对应的反向匹配
    df_单向匹配 = df.merge(df_反向, on=[id_x, id_y], how='outer', indicator=True)

    # 3. 筛选出仅在原表中存在，但不在反向表中的行
    df_单向匹配 = df_单向匹配[df_单向匹配['_merge'] != 'both']
    
    # 同个id_x和id_y的行，出现多行，说明有重复匹配。仅保留重复次数大于1的部分
    df_重复匹配 = df.groupby([id_x, id_y]).size().reset_index(name='重复次数')
    df_重复匹配 = df_重复匹配[df_重复匹配['重复次数'] > 1]
    
    if df_单向匹配.empty and df_重复匹配.empty:
        print("没有单向匹配和重复匹配的数据")
    else:
        display(f"存在单向匹配或重复匹配的数据，其中单向匹配的数据有 {df_单向匹配.shape[0]} 行，重复匹配的数据有 {df_重复匹配.shape[0]} 行")
        # 结果即为不成对的行
        with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
            print("单向匹配：")
            display(df_单向匹配.head())
            print("重复匹配：")
            display(df_重复匹配.head())
        # 报错退出
        raise ValueError("存在单向匹配或重复匹配的数据")

单向匹配(df_匹配成功)
单向匹配(df_未过期, 'id_x', 'id_y')

#### 输出导入数据库

In [None]:
# 给df_输出内容添加日期列
df_未过期['日期'] = pd.to_datetime('today').date()

In [None]:
# 数据库连接信息
user = 'postgres'
password = 'root2'
host = 'localhost'
port = 5655
database = 'postgres'  # 替换为你的数据库名称
schema = '赛博相亲'  # 替换为你的架构名称
table_name_1 = '相亲成功结果记录'  # 替换为你的表名称
table_name_2 = '无效问卷记录v1_2'  # 替换为你的表名称

# 创建 PostgreSQL 数据库连接
try:
    conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
    conn.autocommit = True  # 如果需要创建数据库，保持自动提交

    # 设置搜索路径
    with conn.cursor() as cursor:
        cursor.execute(f'SET search_path TO {schema}')
    
    print("连接成功并已设置搜索路径")
except psycopg2.Error as e:
    print(f"数据库连接失败: {e}")
finally:
    if conn:
        conn.close()

# 创建 postgresql 数据库连接
engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}?options=-csearch_path={schema}')

# 检查表是否存在
with engine.begin() as conn:
    check_table_1_query = f"""
    SELECT EXISTS (
        SELECT FROM information_schema.tables 
        WHERE table_schema = '{schema}' 
        AND table_name = '{table_name_1}'
    );
    """
    check_table_2_query = f"""
    SELECT EXISTS (
        SELECT FROM information_schema.tables 
        WHERE table_schema = '{schema}' 
        AND table_name = '{table_name_2}'
    );
    """   
    table_1_exists = conn.execute(text(check_table_1_query)).scalar()
    table_2_exists = conn.execute(text(check_table_2_query)).scalar()

    # 根据表的存在性插入数据
    if not table_1_exists:
        # 如果表不存在，直接创建表并插入数据
        print(f"表 {schema}.{table_name_1} 不存在，正在创建...")
        df_未过期.to_sql(table_name_1, conn, schema=schema, if_exists='replace', index=False)
        print(f"表 {schema}.{table_name_1} 创建并成功插入数据")
    if not table_2_exists:
        # 如果表不存在，直接创建表并插入数据
        print(f"表 {schema}.{table_name_2} 不存在，正在创建...")
        df_无效名单.to_sql(table_name_2, conn, schema=schema, if_exists='replace', index=False)
        print(f"表 {schema}.{table_name_2} 创建并成功插入数据")

    # 因为懒得改代码，故无论有没有插入过都跑一遍，避免插入完全重复的数据
    print(f"正在检查重复记录并尝试插入...")
    
    # 获取数据库表的列名
    columns_query_1 = f"""
    SELECT column_name 
    FROM information_schema.columns 
    WHERE table_schema = '{schema}' AND table_name = '{table_name_1}'
    ORDER BY ordinal_position;
    """
    columns_query_2 = f"""
    SELECT column_name
    FROM information_schema.columns
    WHERE table_schema = '{schema}' AND table_name = '{table_name_2}'
    ORDER BY ordinal_position;
    """
    # 读取列名
    columns_1 = pd.read_sql(columns_query_1, conn)['column_name'].tolist()
    columns_2 = pd.read_sql(columns_query_2, conn)['column_name'].tolist()
    # 给所有列名加上双引号，避免 SQL 解析错误
    columns_str_1 = ', '.join([f'"{col}"' for col in columns_1])
    columns_str_2 = ', '.join([f'"{col}"' for col in columns_2])
    # 将目前的数据替换至数据库中的临时表
    temp_table_name_1 = f"{table_name_1}_temp"
    temp_table_name_2 = f"{table_name_2}_temp"
    df_未过期.to_sql(temp_table_name_1, conn, schema=schema, if_exists='replace', index=False)
    df_无效名单.to_sql(temp_table_name_2, conn, schema=schema, if_exists='replace', index=False)
    # 直接使用sql进行操作，查询临时表与数据库表的差集
    diff_query_1 = f"""
    SELECT {columns_str_1} FROM {schema}.{temp_table_name_1}
    EXCEPT
    SELECT {columns_str_1} FROM {schema}.{table_name_1};
    """
    diff_query_2 = f"""
    SELECT {columns_str_2} FROM {schema}.{temp_table_name_2}
    EXCEPT
    SELECT {columns_str_2} FROM {schema}.{table_name_2};
    """
    diff_df_1 = pd.read_sql(diff_query_1, conn)
    diff_df_2 = pd.read_sql(diff_query_2, conn)
    # 如果有差集，插入差集数据
    diff_df_1.to_sql(table_name_1, conn, schema=schema, if_exists='append', index=False)
    diff_df_2.to_sql(table_name_2, conn, schema=schema, if_exists='append', index=False)
    # 打印要导入的表原本有多少行，导入了多少行
    print(f"表 {schema}.{table_name_1} 原有 {df_未过期.shape[0]} 行数据，导入了 {diff_df_1.shape[0]} 行数据")
    print(f"表 {schema}.{table_name_2} 原有 {df_无效名单.shape[0]} 行数据，导入了 {diff_df_2.shape[0]} 行数据")
    # 如果临时表存在，删除临时表
    drop_temp_table_query = f"""
    DROP TABLE IF EXISTS {schema}.{temp_table_name_1};
    DROP TABLE IF EXISTS {schema}.{temp_table_name_2};
    """
    # 删除临时表
    conn.execute(text(drop_temp_table_query))
    # 取出输出用无效名单
    输出无效名单query = f"""
    select
        填写平台,
        识别用id
    from
        {table_name_2}
    where
        日期 = current_date
    """
    输出无效名单 = pd.read_sql(输出无效名单query, conn)

#### 导出

In [None]:
# 导出路径
导出路径1 = r"D:\code\相亲问卷数据存储\当前成功数据.xlsx"
导出路径2 = r"D:\code\相亲问卷数据存储\当前无效名单.xlsx"
# 导出匹配成功的数据，文件名重复会自动覆盖
df_未过期.to_excel(导出路径1, index=False)
输出无效名单.to_excel(导出路径2, index=False)


In [None]:
print("无效原因主要以距离底线填为0和个人资产<家庭资产为主")
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
#with pd.option_context('display.max_colwidth', None):
    # 生成HTML并隐藏索引
    html_table = 输出无效名单.to_html(index=False)
    display(HTML(html_table))

## 反馈

In [None]:
df_反馈 = df_cleaned.copy()
df_反馈.info()

#### 列名

In [None]:
# 显示完整列名
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None):
    display(df_反馈.columns.tolist())

#### 个人总资产查看

In [None]:
# 定义分级范围和标签
bins = [-float('inf'), 0, 10, 50, 100, 500, 1000, 3000, 8000, 15000, float('inf')]
labels = ['0', '1-10', '10-50', '50-100', '100-500', '500-1000', '1000-3000', '3000-8000', '8000-15000', '15000+']

# 定义需要分级的列及目标列名
个人资产分级列 = {
    '个人总资产（单位：万元）': '个人总资产分级',
    'TA的个人总资产（单位：万元）_填空1': 'TA的个人总资产_底线分级',
    'TA的个人总资产（单位：万元）_填空2': 'TA的个人总资产_加分项分级'
}

# 批量分级
for source_col, target_col in 个人资产分级列.items():
    df_反馈[target_col] = pd.cut(df_反馈[source_col], bins=bins, labels=labels, right=False)
    # 手动处理 0 的情况
    df_反馈.loc[df_反馈[source_col] == 0, target_col] = '0'

# 构建包含所有列统计结果的表
个人资产分级结果 = []
for col_name in 个人资产分级列.values():
    counts = df_反馈[col_name].value_counts()
    percentages = df_反馈[col_name].value_counts(normalize=True) * 100
    temp_df = pd.DataFrame({'分级列': col_name, '等级': counts.index, '行数': counts.values, '占比 (%)': percentages.values})
    个人资产分级结果.append(temp_df)

# 合并所有结果
个人资产分级表 = pd.concat(个人资产分级结果, ignore_index=True)

In [None]:
# 绘制折线图
个人资产分级图 = alt.Chart(个人资产分级表).mark_line(point=True).encode(
    x=alt.X('等级', sort=labels, title='资产分级'),
    y=alt.Y('行数', title='人数'),
    color=alt.Color('分级列', title='资产类别'),
    tooltip=['分级列', '等级', '行数', '占比 (%)']
).properties(
    title='个人资产分级趋势图',
    width=800,
    height=400
)

# 展示图表
个人资产分级图.display()

#### 家庭总资产查看

In [None]:
# 定义分级范围和标签
bins = [-float('inf'), 0, 10, 50, 100, 500, 1000, 3000, 8000, 15000, float('inf')]
labels = ['0', '1-10', '10-50', '50-100', '100-500', '500-1000', '1000-3000', '3000-8000', '8000-15000', '15000+']

# 定义需要分级的列及目标列名
家庭资产分级列 = {
    '家庭总资产（单位：万元）': '家庭总资产分级',
    'TA的家庭总资产（单位：万元）_填空1': 'TA的家庭总资产_底线分级',
    'TA的家庭总资产（单位：万元）_填空2': 'TA的家庭总资产_加分项分级'
}

# 批量分级
for source_col, target_col in 家庭资产分级列.items():
    df_反馈[target_col] = pd.cut(df_反馈[source_col], bins=bins, labels=labels, right=False)
    # 手动处理 0 的情况
    df_反馈.loc[df_反馈[source_col] == 0, target_col] = '0'

# 构建包含所有列统计结果的表
家庭资产分级结果 = []
for col_name in 家庭资产分级列.values():
    counts = df_反馈[col_name].value_counts()
    percentages = df_反馈[col_name].value_counts(normalize=True) * 100
    temp_df = pd.DataFrame({'分级列': col_name, '等级': counts.index, '行数': counts.values, '占比 (%)': percentages.values})
    家庭资产分级结果.append(temp_df)

# 合并所有结果
家庭资产分级表 = pd.concat(家庭资产分级结果, ignore_index=True)

In [None]:
# 绘制折线图
家庭资产分级图 = alt.Chart(家庭资产分级表).mark_line(point=True).encode(
    x=alt.X('等级', sort=labels, title='资产分级'),
    y=alt.Y('行数', title='人数'),
    color=alt.Color('分级列', title='资产类别'),
    tooltip=['分级列', '等级', '行数', '占比 (%)']
).properties(
    title='家庭资产分级图',
    width=800,
    height=400
)

# 展示图表
家庭资产分级图.display()

#### 兴趣爱好匹配情况

In [None]:
# **1. 计算匹配成功的比例**

# 筛选出性取向等于 0 的行
df_性取向匹配 = df_打分[df_打分['底线：性取向'] == 0]

# **2. 提取交集中的兴趣爱好**
交集兴趣爱好 = []
for _, row in df_性取向匹配.iterrows():
    x_set = set(filter(None, map(str.strip, row['兴趣爱好_x'].split('.'))))  # 清理空格和空值
    y_set = set(filter(None, map(str.strip, row['兴趣爱好_y'].split('.'))))  # 清理空格和空值
    intersection = x_set & y_set  # 取交集
    交集兴趣爱好.extend(intersection)  # 添加到交集列表

# **3. 统计每个兴趣爱好的行数和匹配成功比例**
交集兴趣爱好统计 = pd.Series(交集兴趣爱好).value_counts()
交集兴趣爱好统计比例 = (交集兴趣爱好统计 / len(df_性取向匹配)) * 100

# **4. 构建结果 DataFrame**
兴趣爱好匹配情况 = pd.DataFrame({
    '兴趣爱好':交集兴趣爱好统计.index,
    '匹配成功行数（参考）':交集兴趣爱好统计.values,
    '匹配成功比例 (%)':交集兴趣爱好统计比例.values
}).reset_index(drop=True)

# **2. 计算总体出现的比例**

# **1. 按行拆分每种兴趣爱好并清理空格**
兴趣爱好_x列 = df_打分['兴趣爱好_x'].str.split('.', expand=True).applymap(lambda x: x.strip() if x else None)
兴趣爱好_y列 = df_打分['兴趣爱好_y'].str.split('.', expand=True).applymap(lambda x: x.strip() if x else None)

# **2. 合并并获取独立兴趣爱好**
兴趣爱好_展开 = pd.concat([兴趣爱好_x列, 兴趣爱好_y列], axis=1).stack().reset_index(drop=True)

# **3. 过滤无效数据（空字符串或 None）**
兴趣爱好_展开 = 兴趣爱好_展开[兴趣爱好_展开.notnull() & (兴趣爱好_展开.str.strip() != '')]

# **4. 统计兴趣爱好行数（基于人数）**
兴趣爱好总统计 = 兴趣爱好_展开.value_counts()

# **5. 计算每种兴趣爱好占总人数的比例**
兴趣爱好总统计比例 = (兴趣爱好总统计 / len(df_打分)) * 100

# **6. 构建结果 DataFrame**
总计_兴趣爱好统计 = pd.DataFrame({
    '兴趣爱好': 兴趣爱好总统计.index,
    '总出现行数（参考）': 兴趣爱好总统计.values / 2,
    '出现比例 (%)': 兴趣爱好总统计比例.values / 2  # 除以 2 是因为每行都有两个人
}).reset_index(drop=True)

# **3. 合并两种比例到一个表中**

兴趣爱好合并 = pd.merge(总计_兴趣爱好统计, 兴趣爱好匹配情况, on='兴趣爱好', how='left').fillna(0)
print(兴趣爱好合并)


In [None]:
# **3. 生成依序自增的英文字母**
def 生成自增英文字母(n):
    letters = []
    for i in range(1, n + 1):
        s = ""
        while i > 0:
            i -= 1
            s = chr(65 + (i % 26)) + s  # A=65
            i //= 26
        letters.append(s)
    return letters

In [None]:
# 提取出"兴趣爱好："后的文本，去除空格，去除空值，去除重复值。
# 创建一个表，列1为兴趣爱好，列2为依序自增的英文字母（从A开始，超过24个就变成AA，AB...）
# 直接对包含兴趣爱好的列进行操作即可

# **1. 提取兴趣爱好并清理数据**
兴趣爱好列表 = [col.split(':')[1].strip() for col in 包含兴趣爱好的列]  # 提取兴趣爱好并去除空格

# **2. 去除重复值（如果有）**
兴趣爱好列表确保唯一 = pd.Series(兴趣爱好列表).drop_duplicates().reset_index(drop=True)

兴趣爱好字母 = 生成自增英文字母(len(兴趣爱好列表确保唯一))

# **3. 创建最终结果表**
兴趣爱好说明表 = pd.DataFrame({
    '说明': 兴趣爱好列表确保唯一,
    '兴趣爱好': 兴趣爱好字母
})

兴趣爱好说明表['兴趣爱好'] = 兴趣爱好说明表['兴趣爱好'].str.strip()

##### 兴趣爱好情况输出

In [None]:
兴趣爱好比例统计 = pd.merge(兴趣爱好合并, 兴趣爱好说明表, on='兴趣爱好', how='left').fillna(0)

# 筛选匹配成功比例小于 2 的行，并显式创建副本
无必要兴趣爱好 = 兴趣爱好比例统计[兴趣爱好比例统计['匹配成功比例 (%)'] < 2].copy()

# 调整总出现行数为 int，成功比例和出现比例保留 2 位小数
无必要兴趣爱好['总出现行数（参考）'] = 无必要兴趣爱好['总出现行数（参考）'].astype(int)
无必要兴趣爱好['匹配成功比例 (%)'] = 无必要兴趣爱好['匹配成功比例 (%)'].round(2)
无必要兴趣爱好['出现比例 (%)'] = 无必要兴趣爱好['出现比例 (%)'].round(2)

# 调整列顺序
无必要兴趣爱好 = 无必要兴趣爱好[['说明', '匹配成功比例 (%)', '匹配成功行数（参考）', '出现比例 (%)', '总出现行数（参考）', '兴趣爱好']]

# 排序
无必要兴趣爱好 = 无必要兴趣爱好.sort_values(by='匹配成功比例 (%)', ascending=True)
# 打印结果
display(无必要兴趣爱好)

In [None]:
# 确保说明列为字符串类型
兴趣爱好比例统计['说明'] = 兴趣爱好比例统计['说明'].astype(str)
# 根据匹配成功比例 (%) 对数据排序
兴趣爱好比例统计 = 兴趣爱好比例统计.sort_values(by='匹配成功比例 (%)', ascending=True)

# 提取排序后的说明顺序
兴趣爱好展示顺序 = list(兴趣爱好比例统计['说明'])

# 转换数据为长格式以便 Altair 处理
兴趣爱好比例统计_long = 兴趣爱好比例统计.melt(
    id_vars=['兴趣爱好', '说明'], 
    value_vars=['出现比例 (%)', '匹配成功比例 (%)'], 
    var_name='类型', 
    value_name='比例'
)

# 绘制折线图
兴趣爱好比例图 = alt.Chart(兴趣爱好比例统计_long).mark_line(point=True).encode(
    x=alt.X('说明:N', sort=兴趣爱好展示顺序, title='兴趣爱好'),
    y=alt.Y('比例:Q', title='比例 (%)'),
    color=alt.Color('类型:N', title='比例类型'),
    tooltip=['说明', '类型', '比例']
).properties(
    title='兴趣爱好比例图',
    width=800,
    height=400
)

# 展示图表
兴趣爱好比例图.display()


### NLP

In [None]:
# auth不填则匿名，zh中文，mul多语种
HanLP = HanLPClient('https://www.hanlp.com/api', auth='NzMxMkBiYnMuaGFubHAuY29tOldLSTl4SW5lQmUyS2NMaGs=', language='zh')

#### 加载停用词典

In [None]:
# 加载停用词表
def load_stopwords(filepath):
    """加载停用词表"""
    with open(filepath, 'r', encoding='utf-8') as f:
        stopwords = set([line.strip() for line in f.readlines()])
    return stopwords

#### 去除空格和符号

In [None]:
def 去除空格和符号(text):
    # 去除所有标点符号和空白空格
    text = re.sub( r'[^\w\s]' , '' , text) # 保留字母、数字和中文字符
    text = re.sub( r'\s+' , ' ' , text) # 多个空格替换为单个空格
    return text.strip() # 去掉首尾空格

#### 分词并过滤停用词

In [None]:
# 分词并过滤停用词
def 分词并过滤停用词(text, stopwords):
    """分词并过滤停用词"""
    words = jieba.lcut(text)  # 精确模式分词
    filtered_words = [word for word in words if word not in stopwords and len(word) > 1]  # 过滤停用词和长度为1的词
    return filtered_words

#### 生成词云

In [None]:
# 生成词云
def generate_wordcloud(word_list):
    """根据词频生成词云"""
    # 将词汇列表转换为字符串
    text = ' '.join(word_list)

    # 设置词云参数
    wordcloud = WordCloud(
        font_path='simhei.ttf',  # 中文字体路径 (Windows 默认 simhei.ttf 或 macOS 可用 STHeiti.ttf)
        width=800,
        height=600,
        background_color='white',
        max_words=200,
        max_font_size=100
    ).generate(text)

    # 显示词云
    plt.figure(figsize=(10, 8))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.show()

#### 提取关键短语

In [None]:
def 使用Hanlp提取关键词(text_list: list) -> dict:
    # 初始化变量
    total_length = sum(len(item) for item in text_list)
    result = {}  # 存储关键词结果
    api调用次数 = 0  # 统计API调用次数
    暂存文本长度 = 0  # 暂存文本长度
    批次文本上限 = 1000  # 每次调用API的文本上限
    暂存文本最小项 = 0  # 暂存文本最小项
    拼接符号占用长度 = 3  # 每个文本项之间的拼接符号占用长度
    api每分钟可调用次数 = 4  # 每分钟API调用次数

    if total_length <= 批次文本上限:
        # 如果总长度小于批次文本上限，直接合并字符串，调用api即可
        text = ' '.join([f"'{item}'" for item in text_list])
        result.update(HanLP.keyphrase_extraction(text, topk=100))
    else:
        for i in range(0,len(text_list)):
            if 暂存文本长度 + len(text_list[i]) + 拼接符号占用长度 <= 批次文本上限 and i != len(text_list) - 1:
                暂存文本长度 = len(text_list[i]) +拼接符号占用长度 + 暂存文本长度
            else:
                if i == len(text_list) - 1 and 暂存文本长度 + len(text_list[i]) + 拼接符号占用长度 <= 批次文本上限:
                    暂存文本最大项 = i + 1 # 如果是最后一项，且总长度<=批次文本上限，直接全部处理完毕
                else:
                    暂存文本最大项 = i
                    暂存文本长度 = len(text_list[i]) +拼接符号占用长度 # 重置暂存文本长度
                    
                text = ' '.join([f"'{item}'" for item in text_list[暂存文本最小项:暂存文本最大项]])
                失败次数 = 0
                while 失败次数 < 3:  # 允许每段数据最多重试3次
                    try:
                        result.update(HanLP.keyphrase_extraction(text, topk=100))
                        print(f"第{暂存文本最小项}-{暂存文本最大项 - 1}条数据提取成功")
                        暂存文本最小项 = i
                        break
                    except Exception as e:
                        失败次数 += 1
                        print(f"第{暂存文本最小项}-{暂存文本最大项 - 1}条数据提取失败，重试次数 {失败次数}/3，错误信息：{str(e)}")
                        # 如果失败次数达到3次，则抛出异常退出
                        if 失败次数 == 3:
                            raise RuntimeError(f"第{暂存文本最小项}-{暂存文本最大项 - 1}条数据连续3次提取失败，程序终止。")
                        # 等待60秒后重试
                        time.sleep(60)
                
                # 更新API调用次数
                api调用次数 += 1
                # 每调用2次API后等待60秒
                if api调用次数 % api每分钟可调用次数 == 0:
                    print("API调用次数已达{api每分钟可调用次数}次，等待60秒...")
                    time.sleep(60)
                
                if i == len(text_list) - 1 and 暂存文本最大项 == i + 1:
                    break # 已经处理完毕，退出循环
                elif i == len(text_list) - 1 and 暂存文本最大项 == i:
                    # i是最后一项，且暂存文本最大项等于i，说明最后一项需要单独处理
                    失败次数 = 0
                    text = f"'{text_list[i]}'"
                    while 失败次数 < 3:  # 允许每段数据最多重试3次
                        try:
                            result.update(HanLP.keyphrase_extraction(text, topk=100))
                            print(f"第{i}条数据提取成功")
                            break
                        except Exception as e:
                            失败次数 += 1
                            print(f"第{i}条数据提取失败，重试次数 {失败次数}/3，错误信息：{str(e)}")
                            # 如果失败次数达到3次，则抛出异常退出
                            if 失败次数 == 3:
                                raise RuntimeError(f"第{i}条数据连续3次提取失败，程序终止。")
                            # 等待60秒后重试
                            time.sleep(60)
                else:
                    continue
                        
    return result
        
    

#### 前置处理

In [None]:
df_newhobiess = df_cleaned[['兴趣爱好（补充说明）', '再描述一些自己的特征']]
print(df_newhobiess.shape[0])
# 去除重复行
df_newhobiess = df_cleaned[['兴趣爱好（补充说明）', '再描述一些自己的特征']].drop_duplicates()

# 去除空白项或缺失值
df_newhobiess = df_newhobiess.replace(r'^\s*$', float('NaN'), regex=True)
df_newhobiess = df_newhobiess.dropna(how='all')
print(df_newhobiess.shape[0])

###### 特征

In [None]:
# 替换空白字符串为空值，然后用空字符串替代 NaN
df_newhobiess['再描述一些自己的特征'] = df_newhobiess['再描述一些自己的特征'].fillna('').astype(str)
df_newhobiess['兴趣爱好（补充说明）'] = df_newhobiess['兴趣爱好（补充说明）'].fillna('').astype(str)

## 我希望对这些数据进行分词，然后统计词频
补充特征统计_0 = df_newhobiess['再描述一些自己的特征'].values.tolist()
兴趣爱好分词前准备_0 = df_newhobiess['兴趣爱好（补充说明）'].values.tolist()
# 去重
补充特征统计_2 = list(set(补充特征统计_0))
兴趣爱好分词前准备_2 = list(set(兴趣爱好分词前准备_0))


In [None]:
display(df_newhobiess.head(2))
display(补充特征统计_0[:5])
display(兴趣爱好分词前准备_0[:5])

#### 分词

In [None]:
# 把列表中文本合并成一个字符串
补充特征统计_2_str = ' '.join(补充特征统计_2)
兴趣爱好分词前准备_2_str = ' '.join(兴趣爱好分词前准备_2)

In [None]:
# 加载挺有词典
stopwords = load_stopwords("停用词典.txt")
# 加载自定义词典
jieba.load_userdict("行为特征词典.txt")

In [None]:
# 文本预处理
特征_关键短语文本 = 去除空格和符号(补充特征统计_2_str)
兴趣爱好_关键短语文本 = 去除空格和符号(兴趣爱好分词前准备_2_str)
display(特征_关键短语文本)
display(兴趣爱好_关键短语文本)

In [None]:
# 动态调整词频
jieba.suggest_freq(('想找'), True)  # 强制识别"想找"
jieba.suggest_freq(('北方人'), True)  # 确保"北方人"不被拆分
jieba.suggest_freq(('一个人'), True)  # 确保"北方人"不被拆分

In [None]:
特征分词结果 = 分词并过滤停用词(特征_关键短语文本, stopwords)
兴趣爱好分词结果 = 分词并过滤停用词(兴趣爱好_关键短语文本, stopwords)
# 统计词频
特征词频 = Counter(特征分词结果)
兴趣词频 = Counter(兴趣爱好分词结果)
# 生成词云
display(特征分词结果[:5])
print("特征词频前10：", 特征词频.most_common(10))
generate_wordcloud(特征分词结果)
display(兴趣爱好分词结果[:5])
print("兴趣词频前10：", 兴趣词频.most_common(10))
generate_wordcloud(兴趣爱好分词结果)