In [4]:
import os
import re
import time
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ===================== CONFIG =====================
TEMPLATE_FILE = os.path.abspath("workday_data.xlsx")
OUTPUT_MASTER = "master_data.xlsx"
HR_DIR = "hr_files"

SHEET_MASTER = "Masterdata_PSteam"
SHEET_HIRING = "Hiring_data"
SHEET_EMP = "Employee Master"
SHEET_LINEUP = "LINEUP_PASS_IMPORT"
SHEET_PERMISSION = "Permission"

SKIP_MASTER = 11
SKIP_HIRING = 1
SKIP_EMP = 11
SKIP_LINEUP = 0
SKIP_PERMISSION = 0

# mapping cột nguồn -> cột đích (chuẩn theo template)
column_mapping = {
    'Emp id': 'EID',
    'Hire Date (DD-MMM-YYYY)': 'DOJ',
    'FULL_NAME': 'EMPLOYEE FULL NAME_EN',
    'Họ tên': 'EMPLOYEE FULL NAME_VN',
    'Gender': 'GENDER',
    'Giới tính': 'GENDER',
    'DOB (DD-MMM-YYYY)': 'DOB',
    'Dân tộc': 'ETHNIC',
    'Nơi sinh': 'BIRTH PLACE',
    'Số CMND/CCCD:': 'ID CARD',
    'Ngày cấp CMND/CCCD': 'ISSUED DATE',
    'Nơi cấp CCCD/CMND': 'ISSUED PLACE',
    'Số CMND Cũ (Nếu Có)': 'OLD ID CARD NO.',
    'Địa chỉ thường trú:': 'PERMANENT ADDRESS',
    'Địa chỉ tạm trú': 'TEMPORARY ADDRESS',
    'Highest Education': 'EDUCATION',
    'Marital Status': 'MARITAL STATUS',
    'Phone Number': 'MOBILE PHONE NO',
    'Email - Work': 'CNX EMAIL',
    'Home E-mail address': 'PERSONAL EMAIL',
    'Số điện thoại người liên hệ trong trường hợp khẩn cấp': 'EMERGENCY CONTACT',
    'Số Bảo Hiểm Xã Hội': 'SOCIAL INSURANCE NUMBER',
    'Mã số thuế': 'PERSONAL TAX CODE',
    'Số tài khoản ngân hàng nhận lương': 'BANK ACCOUNT',
    'Tên ngân hàng': 'BANK NAME',
    'Base Pay': 'BASE SALARY',
    'Complexity Allowance': 'COMPLEXCITY ALLOWANCE',
    'Position Allowance': 'POSITION ALLOWANCE',
    'Meal Allowance - Monthly': 'MEAL ALLOWANCE',
    'Business Title': 'BUSINESS_TITLE_EN',
    'Vị trí': 'BUSINESS_TITLE_VN',
    'SUPERVISOR_FULL_NAME': 'SUPERVISOR',
    'Career Level': 'CAREER LEVEL',
    'Họ và tên chủ hộ': 'Họ tên chủ hộ',
    'Giới tính chủ hộ': 'Giới tính chủ hộ',
    'Ngày sinh của chủ hộ': 'DOB chủ hộ',
    'Mối quan hệ giữa chủ hộ và bạn': 'Mối quan hệ giữa NLĐ với chủ hộ',
    "MSA Client": "MSA Client",
    "Contract ID": "PROBATION CONTRACT NO",
    "Process_x": "PROJECT",
    'ORIGINAL_DATE_OF_HIRE': 'DOJ',
    "Start Date (DD-MMM-YYYY)": "FROM",
    "End Date (DD-MMM-YYYY)": "TO",
    "Bệnh viện muốn đăng ký BHYT": "HOSPITAL NAME ",
    "Legislation Code / Country": "NATIONALITY"
}

# ===================== HELPERS =====================

def sanitize_filename(name: str) -> str:
    return re.sub(r'[<>:"/\\|?*]', '_', str(name))

def coalesce_duplicate_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Khi rename theo column_mapping có thể xuất hiện các cột trùng tên (VD: DOJ đến từ 2 nguồn).
    Hàm này giữ lại 1 cột cho mỗi tên, ưu tiên giá trị non-null đầu tiên theo thứ tự cột hiện tại.
    """
    # group theo tên cột, lấy first non-null theo hàng
    if df.columns.has_duplicates:
        # tốc độ tốt và không copy nhiều
        df = df.T.groupby(level=0, sort=False).first().T
    return df

def fast_to_excel(path: str, df: pd.DataFrame, sheet_name: str = "master_data") -> None:
    # xlsxwriter nhanh hơn openpyxl với write-only
    with pd.ExcelWriter(path, engine="xlsxwriter", engine_kwargs={'options': {"strings_to_formulas": False}}) as writer:
        df.to_excel(writer, index=False, sheet_name=sheet_name)

def write_hr_file(sso, hr_df: pd.DataFrame, base_dir: str) -> str:
    safe = sanitize_filename(sso)
    out = os.path.join(base_dir, f"HR_{safe}.xlsx")
    fast_to_excel(out, hr_df)
    return out

def _clean_id_value(x):
    """
    Chuyển 1 ô (có thể int/float/str) về chuỗi chỉ chứa chữ số, xử lý .0 và scientific.
    Trả về pd.NA nếu rỗng.
    """
    if pd.isna(x):
        return pd.NA
    s = str(x).strip()

    # float hiển thị như '75203213222.0' hoặc scientific '7.52032e+11'
    # nếu có 'e' hoặc 'E', format lại bằng float -> không mất precision cho 22 chữ số
    if 'e' in s or 'E' in s:
        try:
            s = '{:.0f}'.format(float(x))
        except Exception:
            pass

    # loại .0 cuối cùng (ví dụ '75203213222.0' -> '75203213222')
    if s.endswith('.0'):
        s = s[:-2]

    # giữ lại chữ số
    s = re.sub(r'\D+', '', s)

    return s if s != '' else pd.NA

def normalize_id_series(ser: pd.Series, expected_len: int | None = None) -> pd.Series:
    """
    Chuyển series về chuỗi chữ số (string), remove non-digits, optional zfill tới expected_len.
    Nếu expected_len là None -> sẽ không zfill.
    """
    cleaned = ser.map(_clean_id_value).astype("string")
    if expected_len is not None:
        cleaned = cleaned.map(lambda v: v.zfill(expected_len) if pd.notna(v) else v)
    return cleaned

def normalize_id_series(ser: pd.Series, expected_len: int | None = None) -> pd.Series:
    """
    Chuẩn hóa series ID thành chuỗi, bảo toàn leading zero, xử lý NA và định dạng không hợp lệ.
    """
    def clean_id(x):
        if pd.isna(x):
            return pd.NA
        s = str(x).strip()
        s = re.sub(r'[^\d]', '', s)
        return s if s else pd.NA

    cleaned = ser.map(clean_id).astype("string")
    if expected_len is not None:
        cleaned = cleaned.map(lambda v: v.zfill(expected_len) if pd.notna(v) else v)
    return cleaned

def validate_and_rename(df: pd.DataFrame, src_col: str, dst_col: str, logger) -> pd.DataFrame:
    if src_col in df.columns and dst_col not in df.columns:
        df = df.rename(columns={src_col: dst_col})
        logger.info(f"Renamed column {src_col} to {dst_col}")
    elif dst_col not in df.columns:
        logger.warning(f"Missing column {dst_col} in dataframe")
    return df

def check_merge_quality(df: pd.DataFrame, key_col: str, logger):
    total_rows = len(df)
    matched_rows = df[key_col].notna().sum()
    logger.info(f"Merge quality: {matched_rows}/{total_rows} rows matched ({matched_rows/total_rows*100:.2f}%)")
    if matched_rows / total_rows < 0.9:
        logger.warning(f"Low merge match rate for {key_col}. Possible data format issue.")

def clean_lineup(lineup, nat_col: str = "National ID (SSN/SIN) (National Identifiers)"):
    # Lọc các giá trị chỉ chứa số
    def is_valid_id(x):
        if pd.isna(x):
            return False
        return bool(re.match(r'^\d+$', str(x)))

    # Đếm số bản ghi lỗi trước khi làm sạch
    invalid_count = lineup[~lineup[nat_col].apply(is_valid_id)].shape[0]
    if invalid_count > 0:
        logger.warning(f"Found {invalid_count} invalid National IDs (non-numeric) in lineup. Removing them.")

    # Giữ lại chỉ các bản ghi có National ID hợp lệ
    lineup = lineup[lineup[nat_col].apply(is_valid_id)]

    # Loại bỏ trùng lặp
    if lineup[nat_col].duplicated().any():
        logger.warning(f"Found duplicates in {nat_col} in lineup. Keeping first occurrence.")
        lineup = lineup.drop_duplicates(subset=[nat_col], keep='first')

    return lineup

def combine_full_name(df, name_cols=None, target_col="EMPLOYEE FULL NAME_EN"):
    """
    Gộp các cột tên (Last, Middle, First) thành EMPLOYEE FULL NAME_EN một cách linh hoạt.
    Giữ nguyên dòng nếu thiếu dữ liệu, chỉ gộp khi có ít nhất một cột hợp lệ.
    """
    if name_cols is None:
        name_cols = ["Last Name (Family Name)", "Middle Name", "First Name"]

    available_cols = [col for col in name_cols if col in df.columns]
    if not available_cols:
        logger.warning("No name columns found in dataframe. Skipping name combination.")
        return df

    def combine_names(row):
        parts = [str(row[col]).strip() if pd.notna(row[col]) else "" for col in available_cols]
        full_name = " ".join(filter(None, parts))
        return full_name if full_name else pd.NA

    df[target_col] = df.apply(combine_names, axis=1)
    logger.info(f"Combined {target_col} from columns: {available_cols}")
    return df

# Cải tiến đoạn code merge
def merge_data(hiring, emp, lineup):
    # Chỉ định dtype cho ID
    id_columns = ['Emp id', 'EMPLOYEE_NUMBER', 'National ID (SSN/SIN) (National Identifiers)', 'Số CMND/CCCD:']
    dtypes = {col: 'string' for col in id_columns}

    # Đọc lại dữ liệu nếu cần (giả sử xls đã được khởi tạo)
    hiring = hiring.astype({k: v for k, v in dtypes.items() if k in hiring.columns})
    emp = emp.astype({k: v for k, v in dtypes.items() if k in emp.columns})
    lineup = lineup.astype({k: v for k, v in dtypes.items() if k in lineup.columns})

    # Rename cột
    emp = validate_and_rename(emp, "EMPLOYEE_NUMBER", "Emp id", logger)
    lineup = validate_and_rename(lineup, "Số CMND/CCCD:", "National ID (SSN/SIN) (National Identifiers)", logger)

    hiring = combine_full_name(hiring)

    # Chuẩn hóa ID
    nat_col = "National ID (SSN/SIN) (National Identifiers)"
    logger.info(f"Before normalize: hiring[{nat_col}] sample: {hiring[nat_col].head(5).tolist()}")
    hiring[nat_col] = normalize_id_series(hiring.get(nat_col, pd.Series(dtype="string")), expected_len=12)
    lineup[nat_col] = normalize_id_series(lineup.get(nat_col, pd.Series(dtype="string")), expected_len=12)
    logger.info(f"After normalize: hiring[{nat_col}] sample: {hiring[nat_col].head(5).tolist()}")


    lineup = clean_lineup(lineup)
    # Merge
    merged = hiring.merge(emp, on="Emp id", how="left")
    merged = merged.merge(lineup, on=nat_col, how="left")

    return merged

# ===================== MAIN WORKFLOW =====================

def main():
    # t0 = time.perf_counter()

    # Đọc workbook 1 lần
    xls = pd.ExcelFile(TEMPLATE_FILE, engine="openpyxl")
    t_read0 = time.perf_counter()

    # Đọc các sheet (dtype=object là chậm và dễ làm rối type; bỏ để pandas tự suy)
    master_data = xls.parse(SHEET_MASTER, skiprows=SKIP_MASTER)
    hiring = xls.parse(SHEET_HIRING, skiprows=SKIP_HIRING, dtype={'National ID (SSN/SIN) (National Identifiers)': 'string'})
    emp = xls.parse(SHEET_EMP, skiprows=SKIP_EMP, dtype={'EMPLOYEE_NUMBER': 'string'})
    lineup = xls.parse(SHEET_LINEUP, skiprows=SKIP_LINEUP, dtype={'Số CMND/CCCD:': 'string'})
    permission = xls.parse(SHEET_PERMISSION, skiprows=SKIP_PERMISSION)

    # Lấy danh sách cột chuẩn từ template, bỏ cột Unnamed (header layout)
    master_cols = [c for c in master_data.columns if not str(c).startswith("Unnamed")]
    # Giữ lại chỉ các cột dữ liệu hợp lệ của master_data
    master_data = master_data.loc[:, master_cols]

    merged = merge_data(hiring, emp, lineup)

    # t_merge = time.perf_counter()

    # Đổi tên theo mapping 1 lần
    merged = merged.rename(columns=column_mapping)

    # Coalesce cột trùng (ví dụ DOJ từ 2 nguồn)
    merged = coalesce_duplicate_columns(merged)

    # Chỉ giữ các cột trong template, thiếu thì điền NA
    mapped_df = merged.reindex(columns=master_cols)

    # Gộp với master_data (nếu master_data là phần dữ liệu lịch sử cần giữ)
    # Nếu master_data chỉ là template rỗng, có thể bỏ concat để tiết kiệm ~vài ms
    master_df = pd.concat([master_data, mapped_df], ignore_index=True)

    # Merge với Permission theo "MSA Client"
    if "MSA Client" in master_df.columns and "MSA Client" in permission.columns:
        final_df = master_df.merge(permission, on="MSA Client", how="left", copy=False)
    else:
        # Không có MSA Client để merge – vẫn tiếp tục
        final_df = master_df

    # t_map = time.perf_counter()

    # Ghi file master 1 lần
    fast_to_excel(OUTPUT_MASTER, final_df)

    # Tách theo SSO (nếu không có SSO thì báo sớm)
    if "SSO" not in final_df.columns:
        raise ValueError("Thiếu cột 'SSO' sau khi xử lý. Kiểm tra nguồn Permission/mapping.")

    os.makedirs(HR_DIR, exist_ok=True)

    # Ghi theo nhóm song song – I/O bound nên ThreadPool cho hiệu quả tốt
    futures = []
    with ThreadPoolExecutor(max_workers=min(8, os.cpu_count() or 4)) as ex:
        for sso, grp in final_df.groupby("SSO", sort=False, dropna=False):
            futures.append(ex.submit(write_hr_file, sso if pd.notna(sso) else "NA", grp, HR_DIR))
        _ = [f.result() for f in as_completed(futures)]

    # t_write = time.perf_counter()

    # print(f"Map+reindex  : {(t_map - t_merge):.3f}s")
    # print(f"Write output : {(t_write - t_map):.3f}s")
    # print(f"TOTAL        : {(t_write - t0):.3f}s")

if __name__ == "__main__":
    main()

2025-08-25 15:12:34,351 - INFO - Renamed column EMPLOYEE_NUMBER to Emp id
2025-08-25 15:12:34,361 - INFO - Renamed column Số CMND/CCCD: to National ID (SSN/SIN) (National Identifiers)
2025-08-25 15:12:34,368 - INFO - Combined EMPLOYEE FULL NAME_EN from columns: ['Last Name (Family Name)', 'First Name']
2025-08-25 15:12:34,370 - INFO - Before normalize: hiring[National ID (SSN/SIN) (National Identifiers)] sample: ['080091001168', '079095021069', '079095041013', <NA>, '049196016754']
2025-08-25 15:12:34,376 - INFO - After normalize: hiring[National ID (SSN/SIN) (National Identifiers)] sample: ['080091001168', '079095021069', '079095041013', <NA>, '049196016754']
