In [0]:
%pip install -qU databricks-langchain

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from databricks_langchain import ChatDatabricks
from pyspark.sql.functions import current_timestamp, year, month, add_months
import pandas as pd
import json
import re
import os
import sys
sys.path.append("/Workspace/Users/az.admz.yhting@hutchisonports.onmicrosoft.com")
from src.log import configure_logging
from nltk.stem import LancasterStemmer
from nltk.tokenize import word_tokenize
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [0]:
def get_previous_month():
    """
    Return the year and month of the timestamp one month before "now".

    The shift is computed by Spark SQL (``add_months(current_timestamp(), -1)``),
    so "now" is whatever ``current_timestamp()`` reports in the Spark session.

    Returns:
        tuple: ``(year, month)`` read from the single row of the query result.
    """
    shifted = spark.sql("SELECT add_months(current_timestamp(), -1) AS prev_month_time")

    # Build the two projections first, then pull the only row to the driver.
    year_col = year("prev_month_time").alias("year")
    month_col = month("prev_month_time").alias("month")
    first_row = shifted.select(year_col, month_col).collect()[0]

    return first_row["year"], first_row["month"]

In [0]:
class RawDataExtraction:
    """Load one year/month of remark-bearing rows from the COS_RMPF tables."""

    # Columns removed from every loaded frame (silently skipped when absent).
    _CLASSIFICATION_COLUMNS = ["REMARKS_CATEGORY", "REMARKS_SUBCATEGORY"]

    def __init__(self, year: int, month: int) -> None:
        """
        Initialize the RawDataExtraction class with year and month.
        
        Args:
            year (int): The year for which data is to be extracted.
            month (int): The month for which data is to be extracted.
        """
        self.year = year
        self.month = month

    def form_sql(self, table_name: str) -> str:
        """
        Generate a SQL query string based on the provided table name and class attributes.

        Args:
            table_name (str): Table name inside the ``default`` schema.

        Returns:
            str: A SELECT over ``default.<table_name>`` restricted to this
            instance's YEAR and MONTH and to rows whose REMARKS is not NULL,
            ordered by TML.
        """
        sql = f"""
        SELECT * FROM default.{table_name}
        where YEAR = {self.year}
        AND MONTH = {self.month}
        AND REMARKS IS NOT NULL
        ORDER BY TML
        """
        return sql

    def _load_table(self, table_name: str) -> 'pd.DataFrame':
        """Run the period query against ``table_name`` and return it as pandas without the classification columns."""
        df = spark.sql(self.form_sql(table_name)).toPandas()
        return df.drop(columns=self._CLASSIFICATION_COLUMNS, errors="ignore")

    def get_cost_consumption(self) -> 'pd.DataFrame':
        """
        Retrieve cost consumption data from the COS_RMPF_CC table using Spark SQL.
        """
        return self._load_table("COS_RMPF_CC")

    def get_fleet_size(self) -> 'pd.DataFrame':
        """
        Retrieve fleet size data from the COS_RMPF_FLEET_SIZE table using Spark SQL.
        """
        return self._load_table("COS_RMPF_FLEET_SIZE")

    def get_equipment_performance(self) -> 'pd.DataFrame':
        """
        Retrieve equipment performance data from the COS_RMPF_EQUIPMENT_PERF table using Spark SQL.
        """
        return self._load_table("COS_RMPF_EQUIPMENT_PERF")


In [0]:
def calculate_others_cost_ratio(df: 'pd.DataFrame') -> 'pd.DataFrame':
    """
    Flag 'Others' equipment rows whose share of their group's cost or consumption is significant.

    For every (TML, CATEGORY) group, and separately for COST and CONSUMPTION, the
    sum over rows with EQUIPMENT_TYPE == "Others" is divided by the sum over the
    whole group. When either ratio is greater than 0.1 and at most 1, the group's
    'Others' rows get REMARKS_CATEGORY = "additional_info" and
    REMARKS_SUBCATEGORY = "more_than_10_percent_cost" (the same label is used
    whichever metric triggered it). Groups whose total is 0 are never flagged.

    Parameters:
    - df (pd.DataFrame): Input data with TML, CATEGORY, EQUIPMENT_TYPE, COST and
      CONSUMPTION columns (column names may carry surrounding whitespace).
      The input frame is not modified.

    Returns:
    - df (pd.DataFrame): A copy with stripped column names, a fresh RangeIndex and
      the two REMARKS_* columns (created when missing; left null on unflagged rows).
    """
    group_keys = ["TML", "CATEGORY"]

    # rename() returns a new frame, so the caller's column labels stay as they were.
    df = df.rename(columns=lambda name: name.strip() if isinstance(name, str) else name)
    df = df.reset_index(drop=True)

    is_others = df["EQUIPMENT_TYPE"] == "Others"
    grouping = [df[key] for key in group_keys]

    flagged_groups = set()
    for metric in ("COST", "CONSUMPTION"):
        totals = df[metric].groupby(grouping).sum()
        # Zeroing non-'Others' rows before summing keeps the index identical to `totals`.
        others = df[metric].where(is_others, 0).groupby(grouping).sum()

        # Only divide where the total is non-zero; zero-total groups count as ratio 0.
        nonzero = totals != 0
        ratio = (others[nonzero] / totals[nonzero]).astype(float)
        in_band = ratio[(ratio > 0.1) & (ratio <= 1)]
        flagged_groups.update(in_band.index.tolist())

    # A row is flagged when it is an 'Others' row and its group crossed the threshold.
    group_hit = pd.Series(
        [group in flagged_groups for group in zip(df["TML"], df["CATEGORY"])],
        index=df.index,
        dtype=bool,
    )
    flagged_rows = is_others & group_hit

    # Make sure the label columns exist even when nothing is flagged.
    for column in ("REMARKS_CATEGORY", "REMARKS_SUBCATEGORY"):
        if column not in df.columns:
            df[column] = pd.Series(index=df.index, dtype="object")

    df.loc[flagged_rows, "REMARKS_CATEGORY"] = "additional_info"
    df.loc[flagged_rows, "REMARKS_SUBCATEGORY"] = "more_than_10_percent_cost"

    return df

In [0]:
def process_data(df: 'pd.DataFrame', columns_to_keep: list, category_df: 'pd.DataFrame') -> tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Normalise remarks and classify them against a lookup of known remarks.

    Matching runs in two stages on the de-duplicated remarks: first on
    CLEAN_REMARKS (letters only, lower-cased), then, for what is left, on
    REMARK_STEMMING (stemmed alphabetic tokens joined together). A remark only
    counts as classified when BOTH category and subcategory are present; a
    partial match moves on to the next stage instead of being dropped.

    Parameters:
    - df (pd.DataFrame): The input DataFrame to process; must contain REMARKS.
      It is copied, not modified.
    - columns_to_keep (list): Columns carried into the classified/unclassified frames.
    - category_df (pd.DataFrame): Lookup with CLEAN_REMARKS, REMARK_STEMMING,
      REMARKS_CATEGORY and REMARKS_SUBCATEGORY columns.

    Returns:
    - df (pd.DataFrame): Copy of the input with CLEAN_REMARKS and REMARK_STEMMING added.
    - final_classified (pd.DataFrame): Unique remarks that matched the lookup, with both category columns.
    - final_unclassified (pd.DataFrame): Unique remarks that matched neither stage, without category columns.
    """
    category_cols = ["REMARKS_CATEGORY", "REMARKS_SUBCATEGORY"]

    # One lookup row per key so the left merges below cannot multiply rows.
    clean_lookup = category_df[["CLEAN_REMARKS"] + category_cols].drop_duplicates(subset="CLEAN_REMARKS")
    stem_lookup = (
        category_df[["REMARK_STEMMING"] + category_cols]
        .dropna(subset=["REMARK_STEMMING"])
        .drop_duplicates(subset="REMARK_STEMMING")
    )

    # Work on a copy: the caller may pass a slice of another frame.
    df = df.copy()

    # Keep letters only, then lower-case.
    df["CLEAN_REMARKS"] = df["REMARKS"].astype(str).str.replace(r"[^a-zA-Z]", "", regex=True).str.strip().str.lower()

    def stem_and_clean(text):
        """Stem each purely alphabetic token and join the stems without separators."""
        words = word_tokenize(str(text))
        stemmed_words = [stemmer.stem(word) for word in words if word.isalpha()]
        return ''.join(stemmed_words).lower()

    df["REMARK_STEMMING"] = df["REMARKS"].apply(stem_and_clean)

    # Classify each distinct cleaned remark once (first occurrence wins).
    df_clean = df.drop_duplicates(subset="CLEAN_REMARKS", keep="first")
    df_clean = df_clean[columns_to_keep + ["CLEAN_REMARKS", "REMARK_STEMMING"]]

    # Stage 1: exact match on the cleaned remark.
    stage1 = df_clean.merge(clean_lookup, on="CLEAN_REMARKS", how="left")
    stage1_complete = stage1[category_cols].notna().all(axis=1)
    classified_df = stage1[stage1_complete]
    unmatched_df = stage1[~stage1_complete].drop(columns=category_cols)

    # Stage 2: match what is left on the stemmed remark.
    stage2 = unmatched_df.merge(stem_lookup, on="REMARK_STEMMING", how="left")
    stage2_complete = stage2[category_cols].notna().all(axis=1)
    newly_classified = stage2[stage2_complete]
    final_unclassified = stage2[~stage2_complete].drop(columns=category_cols)

    final_classified = pd.concat([classified_df, newly_classified], ignore_index=True)

    return df, final_classified, final_unclassified

In [0]:
def generate_prompt(data) -> str:
    """
    Build the few-shot classification prompt for a single remark record.

    The prompt lists the allowed category/subcategory pairs, the expected JSON
    output format, worked examples and keyword hints, and ends with the record
    to classify. The subcategory names in the "Output format" section must stay
    identical to the ones in the table, because the reply is validated against
    those exact names.

    Parameters:
        data: Object exposing ``to_dict()`` (a DataFrame row when called from
            classify_remark); its dict form is embedded as the new input.

    Returns:
        str: The complete prompt text.
    """
    prompt = f"""
    we are a port container resources management company.
    we have remarks columns and we want to classify the free text remarks into a category and subcategory.
    If not certain, put under others. Each remark should have only one unique category and one unique subcategory. Do not assign more than one.

    remark category and subcategory can only be one of the listed
    | Category        | Subcategory                             | Description                                                                               |
    |-----------------|--------------------------------------   |-------------------------------------------------------------------------------------------|
    | additional_info | break_down_info                         | If there are mentions of failures or damages, they should be categorized under this type. |
    |                 | cost_breakdown                          | If there are only mentions of monetary allocations or distributions of costs, they should be categorized under this type. |
    |                 | potential_change_of_number_of_equipment | Indicates that the number of equipment may increase or decrease.                          |
    |                 | usage                                   | Regarding the usage amount of the equipment, which may be measured in kilometers (km) or running hours (hr).    |
    |                 | other                                   | Other relevant information.                                                               |
    | justification   | financial_treatment                     | If there are mentions of how financial matters are handled, such as accruals, invoice reversals, release provisions, or accounting adjustments, they should be categorized under this type.|                                                 |
    |                 | negative_figures                        | Explanation of negative values.                                                           |
    |                 | high_low_cost_reason                    | Explanation for high or low costs.                                                        |
    |                 | policy_56                               | If the remarks mention anything about Policy 56 to explain zero consumption or cost, it is because the amount was already charged at the time of purchase.|
    |                 | special_cases                           | Unique or exceptional situations. Cases where consumption is zero.                        |
    |                 | no_breakdown_or_activity_or_cost        | No equipment activity, no breakdown occurred, or no cost.                                 |
    |                 | other                                   | Other relevant justifications.                                                            |
    | other           | other                                   | Any other or uncertain how to calssified remarks.                                         |

    ### Output format:
    {{
        "category": "additional_info / justification / other",
        "subcategory": "break_down_info / cost_breakdown / potential_change_of_number_of_equipment / usage / financial_treatment / negative_figures / high_low_cost_reason / policy_56 / special_cases / no_breakdown_or_activity_or_cost / other"
    }}

        
    ### Here are some examples. Please follow the output format.

    input1: 
    {{'CATEGORY': 'Cost_RMTotalCostByEquipmentType_ConsumablePurchase', 'EQUIPMENT_TYPE': 'AGV', 'COST': -107332.81, 'CONSUMPTION': nan, 'REMARKS': 'Equipment damage', 'CLEAN_REMARKS': 'equipmentdamage', 'REMARK_STEMMING': 'equipdam'}}
    output1:
    {{
        "category": "additional_info",
        "subcategory": "break_down_info"
    }}

    input2:
    {{'CATEGORY': 'Consumption_WireRope', 'EQUIPMENT_TYPE': 'ASC', 'COST': 0.0, 'CONSUMPTION': 979.2, 'REMARKS': 'Policy 56/2021 - consumables items', 'CLEAN_REMARKS': 'policy/-consumablesitems', 'REMARK_STEMMING': 'policyconsumitem'}}
    output2:
    {{
        "category": "justification",
        "subcategory": "policy_56"
    }}

    input3:
    {{'CATEGORY': 'Cost_RMTotalNonRecurrentCostByEquipmentType', 'EQUIPMENT_TYPE': 'QC', 'COST': -159166.3391, 'CONSUMPTION': nan, 'REMARKS': 'negative because of charching ECT for NRMC work during the year for the the cranes leased', 'CLEAN_REMARKS': 'negativebecauseofcharchingectfornrmcworkduringtheyearforthethecranesleased', 'REMARK_STEMMING': 'negbecausofcharchectfornrmcworkdurtheyearforthethecranleas'}}
    output3:
        {{
        "category": "justification",
        "subcategory": "negative_figures"
    }}
    
    input4:
    {{'CATEGORY': 'Cost_RMTotalNonRecurrentCostByEquipmentType', 'EQUIPMENT_TYPE': 'AGV', 'COST': 262303.0394, 'CONSUMPTION': nan, 'REMARKS': 'higher than R&M Total because of Capitalisation Third Party Services NRMC', 'CLEAN_REMARKS': 'higherthanr&mtotalbecauseofcapitalisationthirdpartyservicesnrmc', 'REMARK_STEMMING': 'highthanrmtotbecausofcapitthirdpartyservnrmc'}}
    output4:
        {{
        "category": "justification",
        "subcategory": "high_low_cost_reason"
    }}

    input5:
    {{'CATEGORY': 'Cost_RMTotalCostByEquipmentType', 'EQUIPMENT_TYPE': 'Others', 'COST': 226426.06, 'CONSUMPTION': nan, 'REMARKS': '105.8K allocated to ENG general account | 120.6K allocated to site services general account', 'CLEAN_REMARKS': 'kallocatedtoenggeneralaccount|kallocatedtositeservicesgeneralaccount', 'REMARK_STEMMING': 'alloctoenggenaccountalloctositservgenaccount'}}
    output5:
        {{
        "category": "additional_info",
        "subcategory": "cost_breakdown"
    }}

    input6:
    {{'CATEGORY': 'Cost_RMTotalCostByEquipmentType_ConsumablePurchase', 'EQUIPMENT_TYPE': 'Quay Deck & Yard', 'COST': 10, 'CONSUMPTION': nan, 'REMARKS': 'release of unrealised provisions for PO previous month', 'CLEAN_REMARKS': 'releaseofunrealisedprovisionsforpopreviousmonth', 'REMARK_STEMMING': 'releasofunrprovidforpoprevymon'}}
    output6:
        {{
        "category": "justification",
        "subcategory": "financial_treatment"
    }}

    input7:
    {{'EQUIPMENT_TYPE': 'ASC', 'NUM_OF_EQUIPMENT': 142.0, 'MEASURING_UNIT': 'Equipment Moves', 'REMARKS': '10 ASCs out of ops for cost savings', 'CLEAN_REMARKS': 'ASCsoutofopsforcostsavings', 'REMARK_STEMMING': 'ascoutofopforcostsav'}}
    output7:
    {{
        "category": "additional_info",
        "subcategory": "potential_change_of_number_of_equipment"
    }}
    
    input8:
    {{'EQUIPMENT_TYPE': 'Empty Container Handler', 'NUM_OF_EQUIPMENT': 1.0, 'MEASURING_UNIT': 'Equipment Moves', 'REMARKS': 'Not utilized', 'CLEAN_REMARKS': 'Notutilized', 'REMARK_STEMMING': 'notutil'}}
    output8:
        {{
        "category": "justification",
        "subcategory": "no_breakdown_or_activity_or_cost"
    }}

    input9:
    {{'EQUIPMENT_TYPE': 'AGV', 'NUM_OF_EQUIPMENT': 181.0, 'MEASURING_UNIT': 'Engine Running Hours', 'REMARKS': '96.022km in January', 'CLEAN_REMARKS': 'kminjanuary', 'REMARK_STEMMING': 'injanu'}}
    output9:
        {{
        "category": "additional_info",
        "subcategory": "usage"
    }}

    input10:
    {{'EQUIPMENT_TYPE': 'Conventional RTGC', 'OPERATIONS_AVAILABILITY': 0.78, 'UTILIZATION_PCT': 0.57, 'MMBF': 545.3, 'MTTR': 36.6, 'REMARKS': '57.06%\t\tCH=\t50.213%\t   OS=\t6.847%', 'CLEAN_REMARKS': 'chos', 'REMARK_STEMMING': ''}}
    output10:
        {{
        "category": "other",
        "subcategory": "other"
    }}

    input11:
    {{'CATEGORY': 'Cost_RMTotalCostByEquipmentType', 'EQUIPMENT_TYPE': 'Others', 'COST': 752.4135, 'CONSUMPTION': nan, 'REMARKS': 'AP invoices & clearance cost', 'CLEAN_REMARKS': 'apinvoicesclearancecost', 'REMARK_STEMMING': 'apinvoclearcost'}}
    output11:
        {{
        "category": "other",
        "subcategory": "other"
    }}

    input12:
    {{'CATEGORY': 'Consumption_Electricity_SelfGenRenewable', 'EQUIPMENT_TYPE': 'Others', 'COST': 0, 'CONSUMPTION': nan, 'REMARKS': 'No Partial meters available', 'CLEAN_REMARKS': 'nopartialmetersavailable', 'REMARK_STEMMING': 'nopartmetavail'}}
    output12:
        {{
        "category": "justification",
        "subcategory": "no_breakdown_or_activity_or_cost"
    }}

    input13:
    {{'CATEGORY': 'Cost_RMTotalCostByEquipmentType', 'EQUIPMENT_TYPE': 'QC', 'COST': 40191.3075, 'CONSUMPTION': nan, 'REMARKS': 'Freight charges', 'CLEAN_REMARKS': 'freightcharges', 'REMARK_STEMMING': 'freightcharg'}}
    output13:
        {{
        "category": "other",
        "subcategory": "other"
    }}

    --- Additional reference rules ---
    You may also use the following common keywords as a guide to help classification, but always consider the full semantic meaning of the remark:

    - If remark includes:
        - "None fitted in period"
        - "Equipment was not in operation"
        ➝ category: justification, subcategory: no_breakdown_or_activity_or_cost

    - If remark includes:
        - "Claim refund"
        ➝ category: justification, subcategory: financial_treatment

    - If remark includes:
        - "new in service"
        ➝ category: additional_info, subcategory: potential_change_of_number_of_equipment

    - If remark includes:
        - "Difference due to"
        ➝ category: justification, subcategory: high_low_cost_reason

    - If remark includes:
        - "TLC counter", "Hour counter"
        ➝ category: additional_info, subcategory: usage

    Please remember, these are **reference only** and final classification must be based on meaning.


    new input: 
    {data.to_dict()}  

    your output:


    """
    return prompt


In [0]:
def classify_remark(model: 'Any', row: 'pd.Series', existing_categories: dict[str, list[str]]) -> tuple[str, str]:
    """
    Classify a remark using a provided model by generating a prompt and parsing the response.

    The model is asked up to three times. A reply is accepted only when it parses
    as JSON and its category/subcategory pair exists in ``existing_categories``.
    If no attempt yields a valid pair, ("other", "other") is returned; an invalid
    pair proposed by the model is never passed on to the caller.

    Parameters:
        model (Any): Object with ``invoke(prompt)`` returning something with a ``content`` string.
        row (pd.Series): A row from a DataFrame containing the remark to classify.
        existing_categories (dict[str, list[str]]): Allowed subcategories keyed by category.

    Returns:
        tuple[str, str]: A tuple containing:
            - category (str): The classified category of the remark.
            - subcategory (str): The classified subcategory of the remark.
    """
    default_labels = ("other", "other")
    max_attempts = 3

    prompt = generate_prompt(row)

    for _ in range(max_attempts):
        try:
            json_string = model.invoke(prompt).content

            # Strip markdown code fences, escape literal backslash-n sequences and
            # drop a trailing comma before looking for the JSON object.
            cleaned_string = json_string.replace('```json', '').replace('```', '').replace('\\n', '\\\\n').strip()
            cleaned_string = cleaned_string.rstrip(',')
            # Keep only the span from the first "{" to the last "}" in case of surrounding chatter.
            match = re.search(r'\{.*\}', cleaned_string, re.DOTALL)
            if match:
                cleaned_string = match.group(0)

            result = json.loads(cleaned_string)
            category = result.get("category", "other")
            subcategory = result.get("subcategory", "other")

            # Accept only pairs that exist in the predefined taxonomy.
            if category in existing_categories and subcategory in existing_categories[category]:
                return category, subcategory

            logger.info(f"Invalid category/subcategory: {category}/{subcategory}, retrying...")
            print(f"Invalid category/subcategory: {category}/{subcategory}, retrying...")

        except Exception as e:
            # Any failure (model call, JSON parsing, unexpected shape) costs one attempt.
            logger.error(f"Error encountered: {str(e)}")
            print(f"Error encountered: {str(e)}")

            # Retrying cannot help when the request was blocked by safety filters.
            if "safety filters" in str(e).lower():
                return default_labels

    # Every attempt failed or produced an invalid pair: fall back to the defaults.
    return default_labels

In [0]:
def process_table(table: 'pd.DataFrame') -> 'pd.DataFrame':
    """
    Label every row of ``table`` with a remark category and subcategory.

    Each row is sent through classify_remark using the notebook-level LLM_MODEL
    and existing_categories. The two label columns are written onto ``table``
    itself, and that same object is returned. An empty table only receives the
    two (empty, object-typed) columns.
    """
    label_columns = ["REMARKS_CATEGORY", "REMARKS_SUBCATEGORY"]

    # Nothing to classify: just make sure the label columns exist.
    if table.empty:
        for column in label_columns:
            table[column] = pd.Series(dtype="object")
        return table

    def _label(row):
        # Wrap the (category, subcategory) tuple so apply() expands it into two columns.
        return pd.Series(classify_remark(LLM_MODEL, row, existing_categories))

    table[label_columns] = table.apply(_label, axis=1)
    return table

In [0]:
def process_and_save_file(file_type: str, df_original: 'pd.DataFrame', classified_remarks: 'pd.DataFrame', processed_unclassified_remarks: 'pd.DataFrame', year: int, month: int, bu: str, classified_by_ratio: 'pd.DataFrame' = None) -> None:
    """
    Processes and saves categorized remark data for a specific file type.

    The classified remarks are joined back onto the full data on CLEAN_REMARKS,
    missing labels become "other", the result is merged into the matching Delta
    table (only REMARKS_CATEGORY / REMARKS_SUBCATEGORY of matched rows are
    updated) and also written out as a CSV file.

    Parameters:
    - file_type (str): One of 'cost_consumption', 'fleet_size', 'equipment_performance'.
    - df_original (pd.DataFrame): Full data with CLEAN_REMARKS and REMARK_STEMMING columns.
    - classified_remarks (pd.DataFrame): DataFrame containing already classified remarks.
    - processed_unclassified_remarks (pd.DataFrame): DataFrame containing processed unclassified remarks.
    - year (int): Year of the dataset (used in log messages and the CSV name).
    - month (int): Month of the dataset (used in log messages and the CSV name).
    - bu (str): Business unit identifier (used in log messages and the CSV name).
    - classified_by_ratio (pd.DataFrame, optional): Rows already labelled by the 'Others' ratio rule;
      appended to the result for 'cost_consumption' only.

    Raises:
    - ValueError: If ``file_type`` is not one of the supported values. This is
      checked before any Spark work is started.
    """
    # Target table and the columns that identify a row in it (they form the MERGE ... ON clause).
    table_mapping = {
        "cost_consumption": {
            "table": "default.COS_RMPF_CC",
            "keys": ["TML", "YEAR", "MONTH", "CATEGORY", "EQUIPMENT_TYPE", "REMARKS", "CREATE_DATE"],
        },
        "fleet_size": {
            "table": "default.COS_RMPF_FLEET_SIZE",
            "keys": ["TML", "YEAR", "MONTH", "EQUIPMENT_TYPE", "REMARKS", "CREATE_DATE"],
        },
        "equipment_performance": {
            "table": "default.COS_RMPF_EQUIPMENT_PERF",
            "keys": ["TML", "YEAR", "MONTH", "EQUIPMENT_TYPE", "REMARKS", "CREATE_DATE"],
        },
    }

    table_info = table_mapping.get(file_type)
    if table_info is None:
        raise ValueError(
            f"Unsupported file_type '{file_type}'; expected one of {sorted(table_mapping)}"
        )
    target_table = table_info["table"]
    merge_condition = " AND ".join(
        f"target.{key} = source.{key}" for key in table_info["keys"]
    )

    # Merge classified_remarks and processed_unclassified_remarks
    merged_df = pd.concat([classified_remarks, processed_unclassified_remarks], ignore_index=True)

    # Perform a left join with the original DataFrame to include category columns
    categorized_df = df_original.merge(
        merged_df[['CLEAN_REMARKS', 'REMARKS_CATEGORY', 'REMARKS_SUBCATEGORY']], 
        on='CLEAN_REMARKS', 
        how='left'
    )

    # Drop the helper columns that were only needed for matching
    categorized_df = categorized_df.drop(columns=['CLEAN_REMARKS', 'REMARK_STEMMING'])

    # Fill NaN with "other"
    categorized_df[['REMARKS_CATEGORY', 'REMARKS_SUBCATEGORY']] = categorized_df[['REMARKS_CATEGORY', 'REMARKS_SUBCATEGORY']].fillna("other")

    # Append the rows labelled by the 'Others' ratio rule (only for cost_consumption)
    if file_type == "cost_consumption" and classified_by_ratio is not None:
        categorized_df = pd.concat([categorized_df, classified_by_ratio], ignore_index=True)

    # Convert Pandas DataFrame to Spark DataFrame
    spark_df = spark.createDataFrame(categorized_df)

    # Register the Spark DataFrame as a temporary view
    spark_df.createOrReplaceTempView("temp_updates")

    # Use MERGE INTO to update the original table
    spark.sql(f"""
        MERGE INTO {target_table} AS target
        USING temp_updates AS source
        ON {merge_condition}
        WHEN MATCHED THEN
            UPDATE SET 
                target.REMARKS_CATEGORY = source.REMARKS_CATEGORY,
                target.REMARKS_SUBCATEGORY = source.REMARKS_SUBCATEGORY
    """)

    # Log confirmation message upon successful update
    logger.info(f"{year}_{month}_{bu}_Categorized_{file_type}_data updated to {target_table}!")

    # Save to CSV (the final True argument asks dbutils to overwrite an existing file)
    notebook_path = f"file:/Workspace/Users/az.admz.yhting@hutchisonports.onmicrosoft.com/data/output/{year}_{month}_{bu}_Categorized_{file_type}_data.csv"
    dbutils.fs.put(notebook_path, spark_df.toPandas().to_csv(index=False, header=True), True)

    # Log and print confirmation message upon successful save
    logger.info(f"{year}_{month}_{bu}_Categorized_{file_type}_data.csv done!")
    print(f"✅ {year}_{month}_{bu}_Categorized_{file_type}_data.csv done!")


In [0]:
# Business unit identifier; it only appears in log messages and output file names.
BU = "0502"
# The reporting period is pinned for this run. To process the previous
# calendar month instead, replace the two constants below with:
#     YEAR, MONTH = get_previous_month()
YEAR = 2025
MONTH = 1

# Notebook-level logger; classify_remark and process_and_save_file read this global.
logger = configure_logging("text_analysis")

# Lancaster stemmer used by process_data to build the REMARK_STEMMING column.
stemmer = LancasterStemmer()

# Chat model that process_table passes to classify_remark for remarks the lookup could not label.
LLM_MODEL = ChatDatabricks(
    endpoint="databricks-meta-llama-3-3-70b-instruct"
)
# Lookup of already-labelled remarks; process_data reads its CLEAN_REMARKS,
# REMARK_STEMMING, REMARKS_CATEGORY and REMARKS_SUBCATEGORY columns.
all_clean_remarks_categories_df = pd.read_csv("file:/Workspace/Users/az.admz.yhting@hutchisonports.onmicrosoft.com/data/raw/all_clean_remarks_categories_new.csv")

# Per-table columns carried into the de-duplicated frames (and therefore into the LLM prompt).
cost_consumption_keep = ["CATEGORY", "EQUIPMENT_TYPE", "COST", "CONSUMPTION", "REMARKS"]
fleet_size_keep = ["EQUIPMENT_TYPE", "NUM_OF_EQUIPMENT", "MEASURING_UNIT", "REMARKS"]
equipment_performance_keep = ["EQUIPMENT_TYPE", "OPERATIONS_AVAILABILITY", "UTILIZATION_PCT", "MMBF", "MTTR", "REMARKS"]

# Allowed subcategories per category; classify_remark rejects any model answer outside this mapping.
existing_categories = {
    "additional_info": ["break_down_info", "cost_breakdown", "potential_change_of_number_of_equipment", "usage", "other"],
    "justification": ["financial_treatment", "negative_figures", "high_low_cost_reason", "policy_56", "special_cases", "no_breakdown_or_activity_or_cost", "other"],
    "other": ["other"]
}


def executor(year: int, month: int, bu: str) -> None:
    """
    Run the remark classification pipeline for one period and business unit.

    Stages, each completed for all three tables before the next one starts:
    load the raw tables, flag 'Others' cost rows by ratio, match remarks against
    the lookup, send the leftovers to the LLM, then merge and save the results.
    """
    label_columns = ["REMARKS_CATEGORY", "REMARKS_SUBCATEGORY"]

    # Load the three raw tables for the requested period.
    loader = RawDataExtraction(year=year, month=month)
    cost_raw = loader.get_cost_consumption()
    fleet_raw = loader.get_fleet_size()
    performance_raw = loader.get_equipment_performance()

    # Label 'Others' cost rows whose share exceeds 10%.
    cost_flagged = calculate_others_cost_ratio(cost_raw)

    # Rows carrying both labels are finished; the rest continue through the
    # pipeline without the (partly empty) label columns.
    has_both_labels = (
        pd.notnull(cost_flagged["REMARKS_CATEGORY"])
        & pd.notnull(cost_flagged["REMARKS_SUBCATEGORY"])
    )
    cost_labelled_by_ratio = cost_flagged[has_both_labels]
    cost_remaining = cost_flagged[~has_both_labels].drop(label_columns, axis=1)

    # (file type, frame to classify, columns to keep, rows labelled by ratio)
    jobs = [
        ("cost_consumption", cost_remaining, cost_consumption_keep, cost_labelled_by_ratio),
        ("fleet_size", fleet_raw, fleet_size_keep, None),
        ("equipment_performance", performance_raw, equipment_performance_keep, None),
    ]

    # Lookup-based classification: (full frame, classified, unclassified) per table.
    lookup_results = [
        process_data(frame, keep_columns, all_clean_remarks_categories_df)
        for _, frame, keep_columns, _ in jobs
    ]

    # Classify the remaining remarks with the LLM.
    llm_results = [process_table(unclassified) for _, _, unclassified in lookup_results]

    # Merge into the target tables and write the CSV exports.
    for job, lookup_result, llm_labelled in zip(jobs, lookup_results, llm_results):
        file_type, _, _, labelled_by_ratio = job
        full_frame, lookup_classified, _ = lookup_result
        process_and_save_file(
            file_type, full_frame, lookup_classified, llm_labelled,
            year, month, bu, labelled_by_ratio,
        )
    
# Run the whole pipeline once for the period and business unit configured above.
executor(YEAR, MONTH, BU)

def process_json_and_execute():
    """
    Run ``executor`` only when the update record mentions "COS_RMPF".

    The record is read as text lines through Spark, re-joined with newlines and
    parsed as a single JSON document; the pipeline starts when "COS_RMPF" is
    contained in the parsed value.
    """
    json_path = "file:/Workspace/Users/az.admz.yhting@hutchisonports.onmicrosoft.com/data/output/updated_tml_record.json"
    text_rows = spark.read.text(json_path).collect()
    record = json.loads("\n".join(entry.value for entry in text_rows))

    # Nothing to do unless the record lists the COS_RMPF tables.
    if "COS_RMPF" not in record:
        return

    executor(YEAR, MONTH, BU)

#process_json_and_execute()

  LLM_MODEL = ChatDatabricks(
2025-05-02 06:39:22,648 text_analysis INFO 2025_1_0502_Categorized_cost_consumption_data updated to default.COS_RMPF_CC!


Wrote 567733 bytes.


2025-05-02 06:39:28,506 text_analysis INFO 2025_1_0502_Categorized_cost_consumption_data.csv done!


✅ 2025_1_0502_Categorized_cost_consumption_data.csv done!


2025-05-02 06:39:36,654 text_analysis INFO 2025_1_0502_Categorized_fleet_size_data updated to default.COS_RMPF_FLEET_SIZE!


Wrote 27297 bytes.


2025-05-02 06:39:39,033 text_analysis INFO 2025_1_0502_Categorized_fleet_size_data.csv done!


✅ 2025_1_0502_Categorized_fleet_size_data.csv done!


2025-05-02 06:39:46,334 text_analysis INFO 2025_1_0502_Categorized_equipment_performance_data updated to default.COS_RMPF_EQUIPMENT_PERF!


Wrote 16466 bytes.


2025-05-02 06:39:48,702 text_analysis INFO 2025_1_0502_Categorized_equipment_performance_data.csv done!


✅ 2025_1_0502_Categorized_equipment_performance_data.csv done!
