<a href="https://colab.research.google.com/github/thalessac/rateio/blob/main/vr_va_mensal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import files
import io
import os
import pandas as pd
import time
import warnings

try:
    import xlsxwriter
except:
    print("Installing dependencies...")
    !pip install XlsxWriter==3.1.2
warnings.filterwarnings("ignore")
pd.set_option("display.max_rows", None)


###### Utils functions ----------------------------------------


def lower_case_file_names():
    for file in os.listdir():
        os.rename(file, file.lower())


def check_required_columns(df: pd.DataFrame, file: dict):
    file_name, required_columns, file_description = file.values()
    df.columns = df.columns.str.strip().str.lower()
    missing_columns = set(required_columns) - set(df.columns)
    if len(missing_columns):
        !rm {file_name} #Remove file with errors
        raise ValueError(
            f"Required columns missing in {file_name}: {missing_columns}. Check your input data and try again"
        )


def extract_sheets_data(file: dict):
    file_name, required_columns, file_description = file.values()
    xl = pd.ExcelFile(file_name)
    df_tae = []
    df_tre = []
    for sheet_name in xl.sheet_names:  # see all sheet names
        if "tae" in sheet_name.lower() and "resumo" not in sheet_name.lower():
            print(f"\nReading '{file_name}'/'{sheet_name}: {file_description}")
            df_tae = pd.read_excel(file_name, sheet_name=sheet_name, header=1)
            print(f"- Check a preview of {file_name} below:")
            display(df_tae.head(5))
        elif "tre" in sheet_name.lower() and "resumo" not in sheet_name.lower():
            print(f"\nReading '{file_name}'/'{sheet_name}: {file_description}")
            df_tre = pd.read_excel(file_name, sheet_name=sheet_name, header=1)
            print(f"- Check a preview of {file_name} below:")
            display(df_tre.head(5))
    if len(df_tae):
      check_required_columns(df_tae, file)
      df_tae = df_tae[required_columns]
    if len(df_tre):
      check_required_columns(df_tre, file)
      df_tre = df_tre[required_columns]

    return df_tae, df_tre


def read_ticket_data(file: dict):
    lower_case_file_names()
    file_name, required_columns, file_description = file.values()
    if file_name in os.listdir():
        df_tae, df_tre = extract_sheets_data(file)
        return df_tae, df_tre
    else:
        while True:
            try:
                print(f"- Upload '{file_name}'")
                uploaded = files.upload()
                uploaded = {k.lower(): v for k, v in uploaded.items()}
                lower_case_file_names()
                df_tae, df_tre = extract_sheets_data(file)
                return df_tae, df_tre
            except FileNotFoundError as e:
                !rm {list(uploaded.keys())[0]} #Remove file with errors
                print(f"ERROR: File name should be '{file_name}'. Try again")
            except KeyError as e:
                !rm {list(uploaded.keys())[0]} #Remove file with errors
                print(f"ERROR: File name should be '{file_name}'. Try again")


def upload_excel_files(file: dict):
    lower_case_file_names()
    file_name, required_columns, file_description = file.values()
    print(f"\nReading '{file_name}': {file_description}")
    if file_name in os.listdir():
        data = pd.read_excel(file_name)
    else:
      while True:
          try:
            print(f"- Upload '{file_name}'")
            uploaded = files.upload()
            uploaded = {k.lower(): v for k, v in uploaded.items()}
            data = pd.read_excel(io.BytesIO(uploaded[file_name]))
            lower_case_file_names()
            break
          except FileNotFoundError as e:
            !rm {list(uploaded.keys())[0]} #Remove file with errors
            print(f"ERROR: File name should be '{file_name}'. Try again")
          except KeyError as e:
            !rm {list(uploaded.keys())[0]} #Remove file with errors
            print(f"ERROR: File name should be '{file_name}'. Try again")
          except Exception:
            print("Unexpected error")
            !rm {list(uploaded.keys())[0]} #Remove file with errors
            raise
    print(f"- Check a preview of {file_name} below:")
    display(data.head(5))

    check_required_columns(data, file)

    return data[required_columns]


def validate_output(calculated_total_sum: float, invoice_total_amount: float):
    print("\n------------------------------------------")
    print("Output validation:\n")
    print(f"- Calculated sum of 'total bilhete' column: R${calculated_total_sum:.2f}")
    print(f"- Invoice total amount: R${invoice_total_amount:.2f}")
    if round(calculated_total_sum, 2) == round(invoice_total_amount, 2):
        print("- Calculated total sum is equals to Invoice total amount. Good job!")
    else:
        diff = abs(calculated_total_sum - invoice_total_amount)
        diff_percent = diff / invoice_total_amount
        print(
            f"- Calculated total sum differs in R${diff:.2f} ({100*diff_percent:.3f}%) from Invoice total amount"
        )
        if diff_percent > 0.01:
            print(
                (
                    "WARNING: Calculated total value differs in more than 1% from Invoice total amount value. You'd better check your inputs and try again."
                )
            )
            print("Resuming in 10s...")
            time.sleep(10)


###### Routines ---------------------------------------------------


def read_user_inputs():
    # 1. Values from Invoice
    # @markdown ### Insert data from the invoice in the fields below:
    invoice_total_amount_tae = 0  # @param {type:"number"}
    desconto_tae = 0  # @param {type:"number"}
    invoice_total_amount_tre = 0  # @param {type:"number"}
    desconto_tre = 0  # @param {type:"number"}

    desconto_tae = abs(desconto_tae)
    desconto_tre = abs(desconto_tre)

    tables = []
    # 2. Upload Excel files
    files = [
        {
            "file_name": "tae-tre.xlsx",
            "required_columns": [
                "beneficiário",
                "matrícula",
                "cpf",
                "valor do benefício",
            ],
            "description": "Data from Ticket website",
        },
        {
            "file_name": "cadastrão.xlsx",
            "required_columns": ["cpf", "centro de resultado - código"],
            "description": "File with cost center of each employer",
        },
    ]

    for file in files:
        if file.get("file_name") == "tae-tre.xlsx":
            df_tae, df_tre = read_ticket_data(file)
        else:
            data = upload_excel_files(file)
            tables.append(data)
    return (
        desconto_tae,
        desconto_tre,
        invoice_total_amount_tae,
        invoice_total_amount_tre,
        df_tae,
        df_tre,
        *tables,
    )


def create_rateio_df(df: pd.DataFrame, cadastrao: pd.DataFrame, df_type: str):
    cadastrao = cadastrao.rename(columns={"centro de resultado - código": "cc"})

    # Make sure 'matrícula' and 'cpf' have only numeric characters
    df["matrícula"] = (
        df["matrícula"].astype(str).replace(r"\D+", "", regex=True).astype(int)
    )
    cadastrao["cpf"] = (
        cadastrao["cpf"].astype(str).replace(r"\D+", "", regex=True).astype(int)
    )
    df = df.merge(cadastrao, how="left", left_on="matrícula", right_on="cpf")

    df["file"] = df_type

    nulls = (
        df.loc[pd.isnull(df["cc"]), ["beneficiário", "matrícula", "cc", "file"]]
        .drop_duplicates()
        .reset_index(drop=True)
    )
    if len(nulls):
        print("\nWARNING: CCs not found in 'Cadastrão'.")
        print("\nCheck missing CCs below:")
        display(nulls)
        print()
        for index, row in nulls.iterrows():
            cc_input = str(
                input(
                    f"Insert 'CC' value for {row['beneficiário']} ({index+1}/{len(nulls)}):\n"
                )
            )
            df.loc[
                (df["beneficiário"] == row["beneficiário"])
                & (df["matrícula"] == row["matrícula"]),
                "cc",
            ] = cc_input
    return df


def calculate_rateio(df: pd.DataFrame, desconto: float, invoice_total_amount):
    n_ccs = df["cc"].count()
    ccs_list = list(df["cc"])

    df_aux = []
    for cc in ccs_list:
        calc_dict = {
            "beneficiário": "Desconto",
            "valor do benefício": -abs(desconto / n_ccs),
            "cc": cc,
        }
        df_aux.append(calc_dict)
    df_aux = pd.DataFrame.from_records(df_aux)
    df = pd.concat([df, df_aux])

    df_final = df[["beneficiário", "matrícula", "valor do benefício", "cc"]]

    df_final = df_final.rename(
        columns={
            "beneficiário": "Beneficiário",
            "matrícula": "CPF",
            "valor do benefício": "Valor do benefício",
            "cc": "CC",
        }
    )
    pivot_table_final = pd.pivot_table(
        df_final,
        values="Valor do benefício",
        index="CC",
        aggfunc="sum",
        margins=True,
        margins_name="Grand Total",
        sort=True,
    ).reset_index()

    validate_output(
        pivot_table_final.loc[
            pivot_table_final["CC"] == "Grand Total", "Valor do benefício"
        ].values[0],
        invoice_total_amount,
    )
    return df_final, pivot_table_final


def save_to_excel(
    file_name: str, df_final: pd.DataFrame, pivot_table_final: pd.DataFrame
):
    print("\n------------------------------------------")
    if not file_name.endswith(".xlsx"):
        file_name += ".xlsx"
    writer = pd.ExcelWriter(file_name, engine="xlsxwriter")
    workbook = writer.book
    df_final.to_excel(
        writer, sheet_name="Sheet1", startrow=0, startcol=0, header=True, index=False
    )
    pivot_table_final.to_excel(
        writer, sheet_name="Sheet1", startrow=0, startcol=6, header=True, index=False
    )

    # Currency formatting
    worksheet = writer.sheets["Sheet1"]
    currency_format = workbook.add_format()
    currency_format.set_num_format("R$#,##0.00")
    worksheet.set_column(2, 2, None, currency_format)
    worksheet.set_column(7, 7, None, currency_format)
    worksheet.autofit()
    writer.save()
    print(f"Excel file was successfully saved as {file_name}. You can download it now.")


###### Main ---------------------------------------------------


def main():
    (
        desconto_tae,
        desconto_tre,
        invoice_total_amount_tae,
        invoice_total_amount_tre,
        df_tae,
        df_tre,
        cadastrao,
    ) = read_user_inputs()
    file_types = ["tae", "tre"]
    for ft in file_types:
        if ft == "tae":
            print("\n====================================================")
            print("Processing TAE data")
            df = df_tae
            desconto = desconto_tae
            invoice_total_amount = invoice_total_amount_tae
        elif ft == "tre":
            print("\n====================================================")
            print("Processing TRE data")
            df = df_tre
            desconto = desconto_tre
            invoice_total_amount = invoice_total_amount_tre
        if len(df):
          df = create_rateio_df(df, cadastrao, df_type=ft)
          df, pivot_table = calculate_rateio(df, desconto, invoice_total_amount)
          save_to_excel(f"output_{ft}.xlsx", df, pivot_table)
        else:
          print(f"No data found for {ft}")

main()
