In [None]:
import os
import time
import shutil
import tempfile
from pathlib import Path

import polars as pl
import streamlit as st

In [None]:
# --- Config ---

# Explicit branch codes for specific operating units. In
# pipeline_xuatsach_HUB these replace the code sliced out of the unit name.
# Mapping: don_vi_khai_thac -> chi_nhanh_HUB
HUB_OVERRIDES = dict(
    HUBTAN="BDG",
    HUBBHD="BDH",
)

# Column dtypes enforced when a rule workbook is read (see import_rule):
# the three window/deadline columns are times of day, the day offset is a
# small integer.
RULE_SCHEMA_OVERRIDES = {
    **{
        column: pl.Time()
        for column in ("thoigian_nhapdau", "thoigian_nhapcuoi", "thoigian_xuat")
    },
    "ngay_xuat": pl.Int8(),
}

In [None]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

# Upper bound on the number of worker threads used by load_with_threads.
MAX_WORKERS = 8

def load_excel(path: str, **read_options) -> pl.DataFrame:
    """Read one Excel workbook into a DataFrame.

    Args:
        path: Location of the workbook.
        **read_options: Keyword arguments forwarded unchanged to
            ``pl.read_excel``.

    Returns:
        The frame produced by ``pl.read_excel``.
    """
    frame = pl.read_excel(path, **read_options)
    return frame

def load_with_threads(excel_files: list, **read_options) -> pl.DataFrame:
    """Read several Excel workbooks concurrently and stack them into one frame.

    Each workbook is read by ``load_excel`` on a pool of at most
    ``MAX_WORKERS`` threads. The frames are concatenated in the order the
    paths were given, so the row order of the result is reproducible from
    run to run regardless of which read finishes first.

    Args:
        excel_files: Paths of the workbooks to read.
        **read_options: Keyword arguments forwarded to ``load_excel`` for
            every workbook.

    Returns:
        The vertical concatenation of all workbooks.

    Raises:
        ValueError: If ``excel_files`` is empty.
        Exception: Whatever ``load_excel`` raises for a workbook is
            re-raised here.
    """
    excel_files = list(excel_files)
    if not excel_files:
        # pl.concat rejects an empty list anyway; fail early with a message
        # that points at the real cause.
        raise ValueError("load_with_threads: no Excel files were given")

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(load_excel, path, **read_options) for path in excel_files
        ]
        # Iterate the futures in submission order (not completion order) so
        # the concatenated frame follows the order of `excel_files`.
        dfs = [future.result() for future in futures]

    return pl.concat(dfs)

In [None]:
# --- Helper functions ---

def import_rule(file, rule_type: str) -> pl.DataFrame:
    """Load a rule workbook with the dtypes in ``RULE_SCHEMA_OVERRIDES``.

    Args:
        file: Workbook to read; passed straight to ``pl.read_excel``.
        rule_type: Rule family identifier. Currently not used by this
            function.

    Returns:
        The rule table as read from the workbook.

    Raises:
        Exception: Any error raised while reading propagates unchanged.
    """
    # Validation of the loaded table is still pending; error logging is not
    # wired up yet, so failures simply propagate to the caller.
    rule_table = pl.read_excel(file, schema_overrides=RULE_SCHEMA_OVERRIDES)
    return rule_table

def import_lookup(file) -> pl.DataFrame:
    """Load the lookup workbook.

    Args:
        file: Workbook to read; passed straight to ``pl.read_excel``.

    Returns:
        The lookup table as read from the workbook.

    Raises:
        Exception: Any error raised while reading propagates unchanged.
    """
    # Validation of the loaded table is still pending; error logging is not
    # wired up yet, so failures simply propagate to the caller.
    lookup_table = pl.read_excel(file)
    return lookup_table


def apply_rule(lf: pl.LazyFrame, rule: pl.LazyFrame, type: str) -> pl.LazyFrame:
    """Match every order against the rule table and label the outcome.

    The rule table is left-joined onto the orders, which yields one row per
    (order, rule window) pair. Each order is then labelled in ``Kết quả``:

    - ``"Check lại"``    - no rule row shares the order's join keys.
    - ``"Thiếu config"`` - the keys exist in the rule table, but the order's
      entry time falls inside none of the configured windows.
    - ``"Đúng"``         - ``tg_laixe_nhan`` is on or before ``Deadline``.
    - ``"Sai hẹn"``      - everything else.

    ``Deadline`` is the entry date at 00:00 plus the time of day in
    ``thoigian_xuat`` plus ``ngay_xuat`` days.

    An order whose keys match but whose entry time fits no window is kept
    as a single ``"Thiếu config"`` row with the window columns and
    ``Deadline`` set to null. (Previously such orders were filtered out
    entirely, so that label could never appear.) An order that fits several
    windows still produces one row per matching window.

    Args:
        lf: Orders. Must contain ``tg_nhap_buucuc`` and ``tg_laixe_nhan`` as
            datetimes plus the left-hand join keys for ``type``.
        rule: Rule table containing ``thoigian_nhapdau``,
            ``thoigian_nhapcuoi``, ``thoigian_xuat``, ``ngay_xuat`` and the
            right-hand join keys for ``type``.
        type: ``"RD"`` or ``"KN"``; selects the join keys.

    Returns:
        The orders with the rule columns, ``Deadline`` and ``Kết quả`` added.

    Raises:
        KeyError: If ``type`` is neither ``"RD"`` nor ``"KN"``.
    """
    # Join keys per rule type (left = order columns, right = rule columns)
    join_keys_mapping = {
        "RD": {
            "left_on": ["don_vi_khaithac", "ma_buucuc_phat"],
            "right_on": ["don_vi_khai_thac", "buu_cuc_phat"],
        },
        "KN": {
            "left_on": ["don_vi_khaithac", "chi_nhanh_phat"],
            "right_on": ["don_vi_khai_thac", "chi_nhanh_phat"],
        },
    }
    keys = join_keys_mapping[type]

    # Columns that describe one rule window
    window_columns = [
        "thoigian_nhapdau",
        "thoigian_nhapcuoi",
        "thoigian_xuat",
        "ngay_xuat",
    ]

    # Tag every order before the join fans it out into one row per window,
    # so we can later tell whether *any* window matched a given order.
    lf = lf.with_row_index("_row_id")

    # Join
    lf = lf.join(rule, how="left", left_on=keys["left_on"], right_on=keys["right_on"])

    lf = lf.with_columns(
        # Key-match flag: a rule row was found for the order's join keys
        pl.col("thoigian_nhapdau").is_not_null().alias("_key_matched"),
        # Time of day at which the order entered, for the window comparison
        pl.col("tg_nhap_buucuc").dt.time().alias("_enter_time"),
    )

    # Window-match flag: the entry time lies inside [nhapdau, nhapcuoi].
    # A null comparison (e.g. missing entry time) counts as "no match".
    lf = lf.with_columns(
        pl.when(pl.col("_key_matched"))
        .then(
            (pl.col("_enter_time") >= pl.col("thoigian_nhapdau"))
            & (pl.col("_enter_time") <= pl.col("thoigian_nhapcuoi"))
        )
        .otherwise(False)
        .fill_null(False)
        .alias("_time_matched")
    )

    lf = lf.with_columns(
        # Does at least one window match this order?
        pl.col("_time_matched").any().over("_row_id").alias("_has_window"),
        # Marks one representative row per order
        pl.col("_row_id").is_first_distinct().alias("_is_first"),
    )

    # Deadline = entry date at 00:00 + deadline time of day + day offset
    lf = lf.with_columns(
        (pl.col("thoigian_xuat") - pl.time(0, 0, 0)).alias("_deadline_period")
    )

    lf = lf.with_columns(
        pl.when(pl.col("_time_matched"))
        .then(
            pl.col("tg_nhap_buucuc").dt.truncate("1d")  # entry date at 00:00
            + pl.col("_deadline_period")  # deadline time of day
            + pl.duration(days=pl.col("ngay_xuat"))  # day offset
        )
        .otherwise(None)
        .alias("Deadline")
    )

    # Keep:
    # - orders with no rule for their keys (a single row after the left join),
    # - every (order, window) row whose window matched,
    # - exactly one row for orders whose keys matched but no window did.
    lf = lf.filter(
        pl.col("_key_matched").not_()
        | pl.col("_time_matched")
        | (pl.col("_has_window").not_() & pl.col("_is_first"))
    )

    # Labels:
    # - key pair not found: the rule table is incomplete -> "Check lại"
    # - key pair found but no valid time window -> "Thiếu config"
    # - driver pickup time (warehouse exit time) <= deadline -> "Đúng"
    # - anything else missed the deadline -> "Sai hẹn"
    lf = lf.with_columns(
        pl.when(pl.col("_key_matched").not_())
        .then(pl.lit("Check lại"))
        .when(pl.col("_time_matched").not_())
        .then(pl.lit("Thiếu config"))
        .when(pl.col("tg_laixe_nhan") <= pl.col("Deadline"))
        .then(pl.lit("Đúng"))
        .otherwise(pl.lit("Sai hẹn"))
        .alias("Kết quả")
    )

    # A "Thiếu config" row carries the values of an arbitrary non-matching
    # window; blank them so the output does not suggest that window applied.
    lf = lf.with_columns(
        [
            pl.when(pl.col("_time_matched"))
            .then(pl.col(column))
            .otherwise(None)
            .alias(column)
            for column in window_columns
        ]
    )

    # Drop the helper columns
    lf = lf.drop(
        [
            "_row_id",
            "_key_matched",
            "_time_matched",
            "_has_window",
            "_is_first",
            "_deadline_period",
            "_enter_time",
        ]
    )

    return lf

In [None]:
def pipeline_xuatsach_HUB(raw_files: list, config: dict) -> list[str]:
    """Run the HUB check: classify orders as RD/KN, apply rules, export per day.

    Steps: load the RD/KN rule tables and the lookup table, ingest the raw
    files, derive the HUB branch of every order, classify each order as
    ``"RD"`` (HUB branch equals delivery branch) or ``"KN"`` (otherwise),
    apply the matching rule table with ``apply_rule`` and write one CSV per
    classification and per ``tg_laixe_nhan`` date.

    Args:
        raw_files: Raw input files.
        config: Must provide ``raw_format``, ``output_format``,
            ``output_path``, ``rule_rd``, ``rule_kn`` and ``lookup``.

    Returns:
        The entries of ``output_path`` after the export.

    Raises:
        KeyError: If a required ``config`` key is missing.
    """
    # Scratch directory that receives the CSVs when the final output is Excel
    tmp_dir = Path(tempfile.mkdtemp())
    try:
        raw_format = config["raw_format"]
        # This lookup used to be commented out although the export section
        # below reads the value, which raised NameError.
        output_format = config["output_format"]
        output_path = config["output_path"]

        # --- Ingesting ---

        # Import lookup and rules as lazyframes
        lf_rule_rd = import_rule(config["rule_rd"], "RD").lazy()
        lf_rule_kn = import_rule(config["rule_kn"], "KN").lazy()
        lf_lookup = import_lookup(config["lookup"]).lazy()

        # Ingest the raw files
        # NOTE(review): ingest_excel is not defined in the visible part of
        # this notebook - confirm it is available before using raw_format="xlsx".
        if raw_format == "xlsx":
            ingested = ingest_excel(raw_files)
        else:
            ingested = pl.scan_csv(raw_files)

        # Continue lazily
        lf = ingested.lazy()

        # --- Transformations ---

        # Add the pickup date column (used to partition the export)
        lf = lf.with_columns(pl.col("tg_laixe_nhan").dt.date().alias("_date"))

        # Derive the current branch from the operating unit code
        lf = lf.with_columns(
            pl.col("don_vi_khaithac").str.slice(3, 3).alias("chi_nhanh_HUB")
        )

        # Apply the explicit replacements from HUB_OVERRIDES
        if HUB_OVERRIDES:
            expr = pl.col("chi_nhanh_HUB")
            for match_value, new_value in HUB_OVERRIDES.items():
                expr = (
                    pl.when(pl.col("don_vi_khaithac") == match_value)
                    .then(pl.lit(new_value))
                    .otherwise(expr)
                )
            lf = lf.with_columns(expr.alias("chi_nhanh_HUB"))

        # Resolve the delivery branch through the lookup table; the value
        # found there replaces the incoming chi_nhanh_phat column.
        lf = (
            lf.join(
                lf_lookup, how="left", left_on="ma_buucuc_phat", right_on="ma_buucuc"
            )
            .drop("chi_nhanh_phat")
            .rename({"ma_tinh": "chi_nhanh_phat"})
        )

        # Classify orders: RD when HUB branch == delivery branch, else KN
        lf = lf.with_columns(
            pl.when(pl.col("chi_nhanh_HUB") == pl.col("chi_nhanh_phat"))
            .then(pl.lit("RD"))
            .otherwise(pl.lit("KN"))
            .alias("phan_loai")
            .cast(pl.Categorical)
        )

        # Find the matching rule and deadline for every order
        rules = {"RD": lf_rule_rd, "KN": lf_rule_kn}
        outputs = {
            rule_type: apply_rule(
                lf=lf.filter(pl.col("phan_loai") == rule_type),
                rule=rule,
                type=rule_type,
            )
            for rule_type, rule in rules.items()
        }

        # --- Export ---

        fn_map = {"RD": "RaiDich", "KN": "KetNoi"}

        os.makedirs(output_path, exist_ok=True)

        # CSV goes straight to the output folder; otherwise write CSVs to the
        # scratch directory first and convert them afterwards.
        out = output_path if output_format == "csv" else tmp_dir

        for rule_type, result in outputs.items():
            label = fn_map[rule_type]
            result.sink_csv(
                pl.PartitionByKey(
                    base_path=out,
                    by="_date",
                    # Bind `label` as a default so the callback does not
                    # depend on the loop variable at call time.
                    file_path=lambda ctx, label=label: (
                        f"XuatsachHUB-{label}-{ctx.keys[0].str_value}.csv"
                    ),
                )
            )

        # Convert to Excel if required
        # NOTE(review): export_excel is not defined in the visible part of
        # this notebook - confirm it is available before using output_format="xlsx".
        if output_format == "xlsx":
            files = [f for f in os.listdir(tmp_dir) if f.endswith(".csv")]
            export_excel(tmp_dir, files, output_path)

        return os.listdir(output_path)

    finally:
        # Always remove the scratch directory, also when the pipeline fails
        shutil.rmtree(tmp_dir)


def pipeline_xuatsach_TTKT(raw_files: list, config: dict) -> list[str]:
    """Run the TTKT check: apply the TTKT rule table and export one CSV.

    Steps: load the rule table, ingest the raw files, parse the two
    timestamp columns, label every order with ``apply_rule`` (using the
    ``"RD"`` join keys) and write the result to
    ``<output_path>/XuatsachTTKT@<HHMMSS_ddmmYYYY>.csv``.

    Args:
        raw_files: Raw input files (Excel workbooks when
            ``config["raw_format"] == "xlsx"``, otherwise CSV files).
        config: Must provide ``raw_format``, ``output_format``,
            ``output_path`` and ``rule_ttkt``.

    Returns:
        The entries of ``output_path`` after the export, matching what
        ``pipeline_xuatsach_HUB`` returns. (The function previously returned
        ``None`` despite its ``list[str]`` annotation.)

    Raises:
        KeyError: If a required ``config`` key is missing.
    """
    # --- Load configuration ---
    print("Starting TTKT pipeline...")
    raw_format = config["raw_format"]
    # Not used yet: this pipeline always writes CSV. The key is still read so
    # that an incomplete config fails the same way as before.
    output_format = config["output_format"]
    output_path = config["output_path"]

    # --- Ingesting ---

    # Import rules as lazyframe
    print("Importing rules...")
    lf_rule = import_rule(config["rule_ttkt"], "RD").lazy()

    # Ingest the raw files
    print("Importing raw files...")
    if raw_format == "xlsx":
        ingested = load_with_threads(raw_files, read_options={"skip_rows": 1})
    else:
        ingested = pl.scan_csv(raw_files)

    # Continue lazily
    lf = ingested.lazy()

    # Both timestamp columns arrive as text; parse them into datetimes
    lf = lf.with_columns(
        [
            pl.col("tg_nhap_buucuc").str.to_datetime("%Y-%m-%d %H:%M:%S"),
            pl.col("tg_laixe_nhan").str.to_datetime("%Y-%m-%d %H:%M:%S"),
        ]
    )

    # --- Transformations ---
    print("Transforming data...")

    # Find the matching rule and deadline for every order (same keys as RD)
    lf = apply_rule(lf, rule=lf_rule, type="RD")

    # --- Export ---
    print("Exporting results...")

    # Make sure the destination exists before writing into it
    os.makedirs(output_path, exist_ok=True)

    timestamp = time.strftime("%H%M%S_%d%m%Y")

    lf.sink_csv(
        os.path.join(output_path, f"XuatsachTTKT@{timestamp}.csv"),
        datetime_format="%Y-%m-%d %H:%M:%S",
        date_format="%Y-%m-%d",
        time_format="%H:%M:%S",
    )

    return os.listdir(output_path)


In [None]:
# Run configuration for the TTKT pipeline.
config = {
    'raw_format': 'xlsx',
    'output_format': 'csv',
    'output_path': './output',
    # Inputs read by pipeline_xuatsach_HUB (currently disabled):
    # 'lookup': './data/lookup.xlsx',
    # 'rule_rd': './data/RD.xlsx',
    # 'rule_kn': './data/KN.xlsx',
    'rule_ttkt': './rule_xs_log_1812.xlsx',
}

# os.listdir returns entries in arbitrary order; sort them so the input
# order (and therefore the row order of the result) is reproducible.
raw_files = [os.path.join('./xslog1712', f) for f in sorted(os.listdir('./xslog1712'))]
# raw_files = ['./test.xlsx']

In [None]:
# Run the TTKT pipeline on the files and configuration prepared above.
pipeline_xuatsach_TTKT(raw_files=raw_files, config=config)

Starting TTKT pipeline...
Importing rules...
Importing raw files...


Could not determine dtype for column 18, falling back to string
Could not determine dtype for column 19, falling back to string
Could not determine dtype for column 20, falling back to string


Transforming data...
Exporting results...
