In [14]:
# Predefined functions
from typing import List
import re
def clean_column(name):
    """Normalise a column header into a lowercase, underscore-separated name.

    The text is lower-cased, every run of non-word characters is collapsed to a
    single space, surrounding spaces are trimmed, and the remaining spaces are
    turned into underscores.
    """
    return re.sub(r'\W+', ' ', name.lower()).strip().replace(' ', '_')

In [None]:
import fastexcel as fex
import polars as pl
import xlsxwriter

# Path of the source workbook, relative to the notebook's working directory.
file = 'data/excel data/singthai ffb pur mar2025.xlsx'
# Read the workbook once into memory; the same bytes feed both readers below.
with open(file, 'rb') as f:
    bfile = f.read()
    # fastexcel handle; in the visible cells it is only used to inspect sheet names.
    excel = fex.read_excel(bfile)

# ? Uncomment to list the sheet names before choosing which one to load
# excel.sheet_names
# Load the sheet named 'data' and normalise its headers with clean_column().
df = pl.read_excel(bfile, sheet_name='data')
df = df.rename({col: clean_column(col) for col in df.columns})

# Preview the first rows of the cleaned frame.
df.head()

date_out,serial_no,supplier,net_wt_ton,price_ton,gross_amt,tpt_chrg,tpt_amt,worker_chrg,worker_amt
date,str,str,f64,i64,f64,i64,f64,i64,f64
2025-03-15,"""P0029341""","""1004""",0.2,915,183.0,,0.0,,0.0
2025-03-05,"""P0029241""","""1007""",0.2,915,183.0,,0.0,,0.0
2025-03-06,"""P0029250""","""1007""",0.2,915,183.0,,0.0,,0.0
2025-03-07,"""P0029258""","""1007""",0.2,915,183.0,,0.0,,0.0
2025-03-08,"""P0029266""","""1007""",0.2,915,183.0,,0.0,,0.0


In [None]:
# ! Process Purchase Data
def pur_process(data: pl.DataFrame, supplier_data: pl.DataFrame = None, start_index: int = 1) -> pl.DataFrame:
    """Build purchase-invoice lines from the cleaned sheet, one document per supplier.

    Args:
        data: Frame with at least the columns ``date_out``, ``supplier``,
            ``net_wt_ton``, ``price_ton``, ``gross_amt`` and ``serial_no``.
        supplier_data: Optional frame with a ``code`` column. When given and
            non-empty, only rows whose cleaned supplier code appears in it are kept.
        start_index: Number used for the first document; later documents count up.

    Returns:
        One row per input line with the columns ``Code``, ``DocDate``, ``Qty``,
        ``UnitPrice``, ``Amount``, ``Remark1``, ``Seq``, ``DocNo``, ``ItemCode``
        and ``Account``. Lines of the same supplier share one ``DocNo``
        (``PI-`` followed by the zero-padded document number) and are numbered
        from 1 in ``Seq``.
    """
    # Drop rows without a date or supplier, then strip spaces from the code
    # with a native string expression (no per-row Python callback).
    pur = data.filter(
        pl.col("date_out").is_not_null() & pl.col("supplier").is_not_null()
    ).with_columns(
        supplier=pl.col("supplier").str.replace_all(" ", "", literal=True),
    )

    if supplier_data is not None and not supplier_data.is_empty():
        # A semi join only filters: it adds no columns and cannot duplicate
        # purchase lines when a code occurs more than once in supplier_data.
        pur = pur.join(supplier_data, left_on="supplier", right_on="code", how="semi")

    return (
        pur.select(
            DocDate=pl.col("date_out"),
            Code=pl.col("supplier"),
            Qty=pl.col("net_wt_ton"),
            UnitPrice=pl.col("price_ton"),
            Amount=pl.col("gross_amt"),
            Remark1=pl.col("serial_no"),
        )
        # One row per supplier (in order of first appearance); every other
        # column becomes a list holding that supplier's lines.
        .group_by(["Code"], maintain_order=True)
        .all()
        # The row index of the grouped frame is the document number.
        .with_row_index(offset=start_index)
        .with_columns(
            # 1..n line numbers, one list per document, matching the list lengths.
            Seq=pl.int_ranges(1, pl.col("DocDate").list.len() + 1, dtype=pl.Int32),
            DocNo=pl.format("PI-{}", pl.col("index").cast(pl.String).str.pad_start(5, "0")),
            ItemCode=pl.lit("610-001"),
            Account=pl.lit("610-000"),
        )
        .drop(["index"])
        # Back to one row per line; the scalar columns are repeated per line.
        .explode("DocDate", "Seq", "Qty", "UnitPrice", "Remark1", "Amount")
    )

# Preview only: no supplier frame is passed, so the supplier join in pur_process is skipped.
pur_process(df, start_index=1).head(10)

In [None]:
# ! Process Sales Data
def sal_process(data: pl.DataFrame, customer_data: pl.DataFrame = None, start_index: int = 1) -> pl.DataFrame:
    """Build sales-invoice lines (transport and worker charges), one document per customer.

    Args:
        data: Frame with at least the columns ``date_out``, ``supplier``,
            ``net_wt_ton``, ``tpt_chrg``, ``tpt_amt``, ``worker_chrg`` and
            ``worker_amt``.
        customer_data: Optional frame with a ``code`` column. When given and
            non-empty, only rows whose ``300-``-prefixed code appears in it are kept.
        start_index: Number used for the first document; later documents count up.

    Returns:
        One row per charge line with the columns ``Code``, ``DocDate``, ``Qty``,
        ``UnitPrice``, ``Amount``, ``ItemCode``, ``Account``, ``Seq`` and
        ``DocNo``. Lines of the same customer share one ``DocNo`` (``IV-``
        followed by the zero-padded document number) and are numbered from 1 in
        ``Seq``; within a document the transport lines precede the worker lines.
    """
    # Drop rows without a date or supplier; the customer code is the supplier
    # code with spaces removed and a "300-" prefix.
    sal = data.filter(
        pl.col("date_out").is_not_null() & pl.col("supplier").is_not_null()
    ).with_columns(
        supplier=pl.format("300-{}", pl.col("supplier").str.replace_all(" ", "", literal=True)),
    )

    if customer_data is not None and not customer_data.is_empty():
        # A semi join only filters: it adds no columns and cannot duplicate
        # lines when a code occurs more than once in customer_data.
        sal = sal.join(customer_data, left_on="supplier", right_on="code", how="semi")

    def charge_lines(unit_price_col: str, amount_col: str, item_code: str) -> pl.DataFrame:
        """Extract one charge type as invoice lines, skipping rows where it is null."""
        return (
            sal.select(
                DocDate=pl.col("date_out"),
                Code=pl.col("supplier"),
                Qty=pl.col("net_wt_ton"),
                UnitPrice=pl.col(unit_price_col),
                Amount=pl.col(amount_col),
            )
            .drop_nulls(["UnitPrice", "Amount"])
            .with_columns(
                ItemCode=pl.lit(item_code),
                Account=pl.lit("500-000"),
            )
        )

    tpt = charge_lines("tpt_chrg", "tpt_amt", "500-002")
    worker = charge_lines("worker_chrg", "worker_amt", "500-003")

    return (
        # vertical_relaxed casts to a common supertype, so the stack does not
        # fail when the two charge columns were inferred with different dtypes.
        pl.concat([tpt, worker], how="vertical_relaxed")
        # One row per customer (in order of first appearance); every other
        # column becomes a list holding that customer's lines.
        .group_by(["Code"], maintain_order=True)
        .all()
        # The row index of the grouped frame is the document number.
        .with_row_index(offset=start_index)
        .with_columns(
            # 1..n line numbers, one list per document, matching the list lengths.
            Seq=pl.int_ranges(1, pl.col("DocDate").list.len() + 1, dtype=pl.Int32),
            DocNo=pl.format("IV-{}", pl.col("index").cast(pl.String).str.pad_start(5, "0")),
        )
        .drop(["index"])
        # Back to one row per line; DocNo is repeated for each line.
        .explode("DocDate", "Seq", "Qty", "UnitPrice", "Amount", "ItemCode", "Account")
    )

# Preview only: no customer frame is passed, so the customer join in sal_process is skipped.
sal_process(df, start_index=1)

In [48]:
# ! Check Unprocessed Data

def pur_unprocess(data: pl.DataFrame, supplier_data: pl.DataFrame) -> pl.DataFrame:
    """Return the purchase rows whose supplier code is missing from ``supplier_data``.

    Args:
        data: Frame with at least the columns ``date_out`` and ``supplier``.
        supplier_data: Frame with a ``code`` column listing the known suppliers.

    Returns:
        The rows of ``data`` that have a date and a supplier but whose
        space-stripped supplier code has no match in ``supplier_data["code"]``.
        All original columns are kept; ``supplier`` holds the cleaned code.
    """
    # Same cleaning as the processing step, done with a native string expression
    # instead of a per-row Python callback.
    pur = data.filter(
        pl.col("date_out").is_not_null() & pl.col("supplier").is_not_null()
    ).with_columns(
        supplier=pl.col("supplier").str.replace_all(" ", "", literal=True),
    )
    # Anti join: keep only the rows that find no matching supplier code.
    return pur.join(supplier_data, left_on="supplier", right_on="code", how="anti")

def sal_unprocess(data: pl.DataFrame, customer_data: pl.DataFrame) -> pl.DataFrame:
    """Return the sales rows whose customer code is missing from ``customer_data``.

    Args:
        data: Frame with at least the columns ``date_out`` and ``supplier``.
        customer_data: Frame with a ``code`` column listing the known customers.

    Returns:
        The rows of ``data`` that have a date and a supplier but whose customer
        code (``300-`` plus the space-stripped supplier code) has no match in
        ``customer_data["code"]``. All original columns are kept; ``supplier``
        holds the prefixed code.
    """
    # Same cleaning as the processing step, done with native expressions
    # instead of a per-row Python callback.
    sal = data.filter(
        pl.col("date_out").is_not_null() & pl.col("supplier").is_not_null()
    ).with_columns(
        supplier=pl.format("300-{}", pl.col("supplier").str.replace_all(" ", "", literal=True)),
    )
    # Anti join: keep only the rows that find no matching customer code.
    return sal.join(customer_data, left_on="supplier", right_on="code", how="anti")

In [None]:
import polars as pl
import fastexcel as fex
from typing import List
from database import get_db_session
from models import Supplier, Customer
from models.settings import Setting

# Take the first session produced by get_db_session().
# NOTE(review): the session is never closed in the visible cells — confirm
# whether get_db_session() cleans up on its own or an explicit close is needed.
session = next(get_db_session())
# Fetch every customer / supplier row and convert each ORM object to a plain
# dict through its instance __dict__.
customers = session.query(Customer).all()
customers = [row.__dict__ for row in customers]
suppliers = session.query(Supplier).all()
suppliers = [row.__dict__ for row in suppliers]
# First Setting row; not referenced again in the visible cells.
setting: Setting = session.query(Setting).first()

# '_sa_instance_state' comes along with __dict__ (presumably the ORM's
# bookkeeping attribute — confirm); it is not data, so drop it from both frames.
df_customer = pl.DataFrame(customers).drop(['_sa_instance_state'])
df_supplier = pl.DataFrame(suppliers).drop(['_sa_instance_state'])

# Preview the supplier master data.
df_supplier.head()

In [None]:
from typing import List
import polars as pl

def fraction_df(data: pl.DataFrame, key: str = "Code", max_rows: int = 100) -> List[pl.DataFrame]:
    """Split ``data`` into consecutive chunks without ever splitting a ``key`` group.

    Groups are taken in order of first appearance and packed greedily: a new
    chunk is started whenever adding the next group would push the current
    chunk past ``max_rows`` rows. A single group larger than ``max_rows`` is
    kept whole, so that chunk exceeds the limit.

    Args:
        data: Frame to split. It must not already contain a column named
            ``row_idx``, which is used temporarily to track row positions.
        key: Column whose equal values must stay in the same chunk.
        max_rows: Target maximum number of rows per chunk (default 100).

    Returns:
        The chunks in order, each with the original columns and the original
        relative row order; an empty list when ``data`` is empty.

    Raises:
        ValueError: If ``max_rows`` is smaller than 1.
    """
    if max_rows < 1:
        raise ValueError("max_rows must be at least 1")
    if data.is_empty():
        return []

    # Temporary row positions let each chunk be selected from the original
    # frame, preserving row order even when a group's rows are not adjacent.
    indexed = data.with_row_index(name="row_idx")
    groups = indexed.group_by(key, maintain_order=True).agg(pl.col("row_idx"))

    chunk_rows = []  # one list of row positions per finished chunk
    current = []
    for group_rows in groups["row_idx"].to_list():
        # Flush before adding a group that would overflow a non-empty chunk.
        if current and len(current) + len(group_rows) > max_rows:
            chunk_rows.append(current)
            current = []
        current.extend(group_rows)
    if current:
        chunk_rows.append(current)

    return [
        indexed.filter(pl.col("row_idx").is_in(rows)).drop("row_idx")
        for rows in chunk_rows
    ]

# Build the validated purchase and sales lines, both numbered from document 1.
pur = pur_process(df, df_supplier, start_index=1)
sal = sal_process(df, df_customer, start_index=1)

# Split each result into chunks that keep every "Code" group together.
pur_frac = fraction_df(pur, "Code")
sal_frac = fraction_df(sal, "Code")

# Write one worksheet per chunk into a single workbook.
with xlsxwriter.Workbook("output3.xlsx") as workbook:
    for i, chunk in enumerate(pur_frac):
        chunk.write_excel(workbook=workbook, worksheet=f"purchases_{i}")
    for i, chunk in enumerate(sal_frac):
        chunk.write_excel(workbook=workbook, worksheet=f"sales_{i}")

len pur_frac 3
len sal_frac 3
