In [1]:
from pandas import read_excel, Timestamp, lreshape
from typing import Optional

from tools import (
    trimmed_str,
    location_mapper,
    relative_mapper,
    pattern_name,
    isIDcard,
    isPhone,
    add_colon,
    form,
)
from model.models import (
    db,
    init_db,
    Person,
    Permission,
    Vaccination,
    House,
    Family,
    Location,
)

# import numpy as np
# from datetime import datetime
####################################################初始化修正数据格式###################################################################
xlsx = 'H:/xfjy/tomorrow/data/sample.xlsx'
df = read_excel(
    xlsx,
    converters={
        '区划代码': trimmed_str,
        '小区': trimmed_str,
        '门牌号': trimmed_str,
        '住房性质': trimmed_str,
        '姓名': trimmed_str,
        '与户主关系': relative_mapper,
        '身份证': trimmed_str,
        '单位': trimmed_str,
        '第一针时间': Timestamp,
        '第一针地点': location_mapper,
        '第二针时间': Timestamp,
        '第二针地点': location_mapper,
        '禁忌症': trimmed_str,
        '未接种原因': trimmed_str,
        '电话': trimmed_str,
        '备注': trimmed_str,
    },
)[
    [
        '区划代码',
        '小区',
        '门牌号',
        '住房性质',
        '姓名',
        '与户主关系',
        '身份证',
        '单位',
        '第一针时间',
        '第一针地点',
        '第二针时间',
        '第二针地点',
        '禁忌症',
        '未接种原因',
        '电话',
        '备注',
    ]
]
# 把所有备注信息都放在备注中，add_colon把np.NaN转换为''，而不是nan
df['备注'] = (
    df['备注'].map(add_colon) + df['禁忌症'].map(add_colon) + df['未接种原因'].map(add_colon)
)
del df['禁忌症']
del df['未接种原因']
df['剂型'] = df.apply(form, axis=1)
# 保留所有非空值，把空值转换为None而不是NaN
df = df.where(df.notnull(), None)

#######################################################################################################################################
# 根据规则验证3列
is_name = df['姓名'].str.match(pattern_name)
is_id = df['身份证'].map(isIDcard)
is_phone = df['电话'].map(isPhone)
# 姓名一定要为真、身份证一定要为真、电话可以空但不能假
valid_data = df[(is_name == True) & (is_id == True) & (is_phone != False)]
# 反馈给用户错误的
feedback = df[(is_name != True) | (is_id != True) | (is_phone == False)]
# 删除身份证重复项
no_repeat = valid_data.drop_duplicates(subset=['身份证'], keep=False)
# 准备导出身份证重复项
duplicated = valid_data.loc[set(valid_data.index) - set(no_repeat.index)]
# 把重复项导出为excel
# dt=datetime.now().strftime('%y%m%d-%H%M%S')
# duplicated.to_excel(f'h:/dupicated_ids{dt}.xlsx')
db.connect(reuse_if_open=True)
# 删除数据库中所有表、设置类型、重新建表
init_db()

######################################################################################################
xlsx = 'H:/xfjy/tomorrow/data/region_code_list.xlsx'
df_location = read_excel(xlsx)
df_location = df_location.where(df_location.notnull(), None)
sql_1 = Location.insert_many(
    df_location.values,
    [
        Location.id,
        Location.province,
        Location.city,
        Location.county,
        Location.country,
        Location.village,
        Location.neighborhood,
    ],
)
with db.transaction():
    sql_1.execute()
# 批量插入人员信息
# sql_1=Person.insert_many(
#     no_repeat[['姓名', '身份证', '单位','电话', '备注']].values,
#     [Person.name, Person.id, Person.workplace, Person.phone, Person.remarks],
# ).execute()
# house_people = no_repeat[['区划代码', '门牌号',  '住房性质', '与户主关系', '身份证']]
# house = house_people.drop_duplicates(subset=['区划代码','门牌号'])
# sql_5=House.insert_many(
#     house.values,
#     [House.location, House.name]
# ).execute()
# 获取house的每一行，并且apply或者迭代
# 使用update时有个issue，导致会提示类型是TEXT
# 使用了区划代码、门牌号、住房性质、身份证、与户主关系
# https://github.com/coleifer/peewee/issues/2308
def add_house_family_people():
    last_code: Optional[str] = None
    last_house_number: Optional[str] = None
    last_family: Optional[Family] = None
    last_house: Optional[House] = None
    people = []

    def gen_person_information(row):
        nonlocal last_code
        nonlocal last_house_number
        nonlocal last_family
        nonlocal last_house
        nonlocal people
        # 先生成这行人员的实例
        person = [row['身份证'], row['姓名'], row['电话'], row['单位'], row['备注'], row['与户主关系']]
        # 如果这是列表中的第一个，则新建一个home
        if last_family is None:
            family = Family.create()
            person.append(family)
            house = House.create(location=row['区划代码'], name=row['门牌号'])
            person.append(house)
            if row['住房性质'] == '自有' and row['与户主关系'] == '户主':
                person.append(house)
            last_family = family
            last_house = house
        else:
            # 如果和上个人是一个房子，则加入上个家庭
            if row['区划代码'] == last_code and row['门牌号'] == last_house_number:
                person.append(last_family)
                person.append(last_house)
                if row['住房性质'] == '自有' and row['与户主关系'] == '户主':
                    person.append(last_house)
            else:
                # 如果和上个人不是一个房子，则新建家庭
                family = Family.create()
                person.append(family)
                house = House.create(location=row['区划代码'], name=row['门牌号'])
                person.append(house)
                if row['住房性质'] == '自有' and row['与户主关系'] == '户主':
                    person.append(house)
                last_family = family
                last_house = house
        # 这里不能使用last_series = series，否则会形成可变量持续变化的BUG
        last_code = row['区划代码']
        last_house_number = row['门牌号']
        people.append(person)

    no_repeat.apply(gen_person_information, axis=1)
    sql_2 = Person.insert_many(
        people,
        [
            Person.id,
            Person.name,
            Person.phone,
            Person.workplace,
            Person.remarks,
            Person.relative,
            Person.family,
            Person.house,
            Person.house_property,
        ],
    )
    with db.transaction():
        sql_2.execute()


add_house_family_people()
del no_repeat['姓名']
del no_repeat['单位']
del no_repeat['备注']
###########################################反馈#################################################################
# # 反馈给用户
# fd = feedback[['isName','姓名',  'isID','身份证',  'isPhone', '电话', '单位','备注', ]]
# fd['isName'] = fd['isName'].map({None: '姓名不能为空', False: '姓名错误', True:None})
# fd['isID'] = fd['isID'].map({None: '身份证不能为空', False: '身份证错误', True:None})
# fd['isPhone'] = fd['isPhone'].map({None: None, False: '电话错误（只能填一个号码，多余的放到备注）', True:None})
# fd.to_excel(f'h:/wrong{dt}.xlsx')
#######################################################################################################
# lreshape作用见下面的网页
# https://www.cnblogs.com/traditional/p/11967360.html
vaccination = lreshape(
    no_repeat[['身份证', '第一针时间', '第一针地点', '第二针时间', '第二针地点', '剂型']],
    {'date': ['第一针时间', '第二针时间'], 'location': ['第一针地点', '第二针地点']},
)
sql_3 = Vaccination.insert_many(
    vaccination.values,
    [Vaccination.type_, Vaccination.person, Vaccination.date, Vaccination.location],
)
with db.transaction():
    sql_3.execute()

del no_repeat['第一针时间']
del no_repeat['第一针地点']
del no_repeat['第二针时间']
del no_repeat['第二针地点']
del no_repeat['剂型']
# TODO缺少反馈地点错误的信息feedback

#######################################################################################################
# 生成有电话的df
has_phone = no_repeat.dropna(axis=0, subset=['电话'], inplace=False)[['身份证', '电话']]
# 只需要电话后6位当作初始密码
has_phone['电话'] = has_phone['电话'].str[5:]
sql_4 = Permission.insert_many(
    has_phone.values, [Permission.person, Permission.password]
)
with db.transaction():
    sql_4.execute()
del no_repeat['电话']
