# Normalize

## Dependencies

In [None]:
import time

import pandas as pd
import numpy as np

from datetime import datetime, timedelta

import sys
from pathlib import Path

# Automatically detect the repo root (parent of notebook folder)
repo_root = Path().resolve().parent  # if notebook is in 'notebooks/' folder
sys.path.append(str(repo_root))

from config.config import get_environment

from config.config import data_import_json, data_export_json, data_import_pandas, data_export_pandas

## ENV

In [None]:
# Load the run configuration from ../environments/env.json.
ENV = get_environment(
    env_path="../environments",
    env_name="env.json"
)

# Run identifiers passed to every import/export helper call below.
# content_date = datetime.now().date() + timedelta(days=0)
content_date = ENV['CONTENT_DATE']
version = ENV['VERSION']

# 'all' is hard-coded here; the commented alternatives select a single
# source/target name from the environment file instead.
website = 'all'
# website = ENV['SOURCE']['NAME']
# website = ENV['TARGET']["1"]['NAME']
# website = ENV['TARGET']["2"]['NAME']

# Gemini settings; range_input is the number of rows sent per LLM request
# (it is the batch size used when slicing df_process further below).
MODEL = ENV['LLM']['GEMINI']['MODEL']
API_KEY = ENV['LLM']['GEMINI']['API_KEY']
range_input = ENV['LLM']['GEMINI']['RANGE_INPUT']

# OpenAI embedding settings used by the "Embeddings Name" section.
EMB_MODEL = ENV['EMBEDDING']['OPENAI']['MODEL']
EMB_API_KEY = ENV['EMBEDDING']['OPENAI']['API_KEY']

## Functions

### LLM

In [None]:
# LLM Gemini Dependencies
from typing import List, Tuple, Optional, Any
from google.genai import types, Client

def _build_schema_from_defs(defs: Any) -> types.Schema:
    """
    Recursively translate a compact field definition into a `types.Schema`.

    Supported shapes of `defs`:
      - str: a primitive type name passed straight to `types.Schema(type=...)`;
        "array" and "array_of_strings" both produce an array of strings.
      - (name, content): the name "v" simply unwraps to the schema of
        `content`; any other name produces an object with the single
        required property `name`.
      - (name, children, "array"): an array whose items follow `children`.
      - list of (name, sub) / (name, sub, "array") tuples: an object with one
        required property per entry.

    Raises:
        ValueError: if `defs` matches none of the shapes above.
    """
    if isinstance(defs, str):
        if defs in ("array_of_strings", "array"):
            # Array of simple strings
            return types.Schema(type="array", items=types.Schema(type="string"))
        # Primitive type
        return types.Schema(type=defs)

    if isinstance(defs, tuple):
        if len(defs) == 2:
            name, content = defs
            # "v" tuple just unwraps
            if name == "v":
                return _build_schema_from_defs(content)
            # `properties` maps property names to schemas (as in the list
            # branch below), so the child schema is wrapped under its name
            # instead of being passed as the mapping itself.
            return types.Schema(
                type="object",
                properties={name: _build_schema_from_defs(content)},
                required=[name]
            )
        if len(defs) == 3 and defs[2] == "array":
            # (name, children, "array") -> array of objects
            return types.Schema(type="array", items=_build_schema_from_defs(defs[1]))
        # Any other tuple shape falls through to the ValueError below.

    if isinstance(defs, list):
        props = {}
        required = []
        for part in defs:
            if len(part) == 3 and part[2] == "array":
                # Array of objects
                name, sub, _ = part
                props[name] = types.Schema(type="array", items=_build_schema_from_defs(sub))
            else:
                # The string branch above already maps "array" and
                # "array_of_strings" to an array of strings.
                name, sub = part
                props[name] = _build_schema_from_defs(sub)
            required.append(name)
        return types.Schema(type="object", properties=props, required=required)

    raise ValueError(f"Invalid schema definition: {defs}")


def _build_types_schema(field_defs: List[Tuple[str, Any]]) -> types.Schema:
    """
    Assemble the response schema: an ARRAY whose items are OBJECTS.

    Every (name, sub) pair in `field_defs` becomes a required property of the
    item object:
      - a list `sub` is built with _build_schema_from_defs and wrapped in an
        array, so the property is an array of that object
      - any other `sub` uses the schema from _build_schema_from_defs as is
    """
    properties = {}
    for field_name, field_spec in field_defs:
        built = _build_schema_from_defs(field_spec)
        if isinstance(field_spec, list):
            built = types.Schema(type="array", items=built)
        properties[field_name] = built

    row_schema = types.Schema(
        type="object",
        properties=properties,
        required=[field_name for field_name, _ in field_defs]
    )
    return types.Schema(type="array", items=row_schema)

def gemini_process_response(
    model_version: str,
    api_key: str,
    prompt_template: str,
    input: str,
    column_uid: str,
    response_key_list: Optional[List[Tuple[str, str]]] = None,
    timeout: int=600
):
    """
    Request a Gemini model for a structured JSON response.

    Args:
        model_version: model name passed to `generate_content`.
        api_key: key used to create the `Client`.
        prompt_template: instruction text; the uid-preservation rule and the
            serialized input are appended to it.
        input: stringified list of input records.
        column_uid: name of the uid field, always added as the first string
            field of the response schema.
        response_key_list: extra field definitions for the response schema.
        timeout: currently unused (the request-level timeout is disabled).

    Returns:
        The response object returned by `client.models.generate_content`.

    Raises:
        ValueError: if the response has no candidates or the first candidate
            did not finish with reason STOP.
        Exception: the last request error once the retries are used up, or
            immediately for errors that are not retried.
    """
    # Ensure uid is always included, without mutating the caller's list
    response_schema = _build_types_schema(
        [(column_uid, "string")] + list(response_key_list or [])
    )

    client = Client(api_key=api_key)
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            response = client.models.generate_content(
                model=model_version,
                contents=(
                    prompt_template +
                    " Output must include uid exactly as provided in the input, without trimming, chopping, or normalization."
                    " list input: " + input
                ),
                config=types.GenerateContentConfig(
                    response_mime_type="application/json",
                    response_schema=response_schema,
                    thinking_config=types.ThinkingConfig(thinking_budget=0),
                    temperature=0,
                    top_p=1,
                    top_k=1,
                ),
                # timeout=timeout
            )
            # Stop after the first success; without this the request was
            # re-sent on every loop iteration.
            break
        except Exception as e:
            retryable = 'ClientError' in str(e) or '429' in str(e)
            if not retryable or attempt == max_attempts:
                raise
            print(e, 'retry', attempt)
            time.sleep(30)

    if not response.candidates:
        raise ValueError("response contains no candidates")

    finish_reason = response.candidates[0].finish_reason.name
    if finish_reason != 'STOP':
        raise ValueError(
            "response hit token output limit: "
            f"{response.usage_metadata.candidates_token_count} "
            f"(finish_reason={finish_reason})"
        )

    return response

### Extract

In [None]:
def llm_preprocess_input(
        df_input: pd.DataFrame,
        content_date: datetime,
        version: str
    ):
    """
    Stamp the frame in place with the bookkeeping columns used by the LLM step.

    Adds `content_date` (stringified), `uid` (index + 1 as a string) and
    `version` to `df_input`. Nothing is returned.
    """
    df_input['content_date'] = str(content_date)

    # uid is the 1-based index rendered as text
    df_input['uid'] = (df_input.index + 1).astype(str)

    df_input['version'] = version

    return None

def llm_generate_input(
        df_input: pd.DataFrame,
        process_column: list
    ):
    """
    Turn the `uid` column plus `process_column` into a list of row dicts.

    Each row becomes one dict keyed by column name, in row order.
    """
    selected = df_input[['uid'] + process_column]

    return [dict(row) for _, row in selected.iterrows()]

def llm_request_gemini(
        prompt: str,
        FIELD_DEFS: list,
        input_data: list,
        MODEL: str,
        API_KEY: str,
        timeout: int=600
    ):
    """
    Send `input_data` to Gemini through gemini_process_response.

    The input list is stringified, 'uid' is used as the uid column and
    FIELD_DEFS describes the remaining response fields. Returns the response
    object unchanged.
    """
    request_kwargs = {
        'model_version': MODEL,
        'api_key': API_KEY,
        'prompt_template': prompt,
        'input': str(input_data),
        'column_uid': 'uid',
        'response_key_list': FIELD_DEFS,
        'timeout': timeout,
    }

    return gemini_process_response(**request_kwargs)


def llm_dump_response(
        response: types.GenerateContentResponse,
        content_date: datetime,
        website: str,
        version: str,
        additional_info: str='response'
    ):
    """
    Persist the raw Gemini response as JSON under `llm/{website}`.

    The response is converted with `model_dump()` and handed to
    data_export_json together with the run identifiers.
    """
    export_kwargs = dict(
        data=response.model_dump(),
        website=website,
        folder_name=f'llm/{website}',
        version=version,
        content_date=content_date,
        additional_info=additional_info,
    )
    data_export_json(**export_kwargs)

def llm_merge_response(
        df_input: pd.DataFrame,
        response: types.GenerateContentResponse,
        website: str,
        content_date: datetime,
        version: str,
        folder_name: str,
        additional_info: str='cleaning'
    ):
    """
    Join the parsed Gemini response onto the input rows and export the result.

    The parsed response becomes a DataFrame prefixed with the request's token
    counts, is left-merged onto `df_input` by `uid`, exported through
    data_export_pandas and returned.
    """
    usage = response.usage_metadata
    token_input = usage.prompt_token_count
    token_output = usage.candidates_token_count
    print(f"{additional_info} | Token Input: {token_input} | Token Output: {token_output}")

    # Parsed response rows, with the token counts as the two leading columns
    df_response = pd.DataFrame(response.parsed)
    leading_columns = [("token_input", token_input), ("token_output", token_output)]
    for position, (label, count) in enumerate(leading_columns):
        df_response.insert(position, label, count)

    # Keep every input row, even when the response has no matching uid
    df_input_merged = df_input.merge(df_response, on='uid', how='left')

    data_export_pandas(
        df_output=df_input_merged,
        website=website,
        content_date=content_date,
        version=version,
        folder_name=folder_name,
        additional_info=additional_info
    )

    return df_input_merged

In [None]:
def extract(
        df_input: pd.DataFrame,
        website: str,
        content_date: datetime,
        version: str,
        prompt: str,
        FIELD_DEFS: list,
        MODEL: str,
        API_KEY: str,
        timeout: int=600,
        process_column: Optional[list]=None,
        additional_info: str='extract_dump',
    ):
    """
    Run one LLM extraction pass over `df_input` and return the merged result.

    Steps: stamp `df_input` in place with content_date/uid/version, build the
    request payload from `uid` + `process_column`, request Gemini, dump the
    raw response to JSON, then merge the parsed response onto the input and
    export it under `normalize/{website}`.

    Args:
        process_column: columns sent to the model next to `uid`; defaults to
            ['item_name'] when omitted.
        additional_info: suffix used to name the exported files.

    Returns:
        The input rows left-merged with the parsed response.

    Any exception raised by a step propagates unchanged to the caller.
    """
    # A None sentinel avoids sharing one default list object between calls
    if process_column is None:
        process_column = ['item_name']

    print("Preprocessing Input Data")
    llm_preprocess_input(
        df_input=df_input,
        content_date=content_date,
        version=version
    )

    print("Generating Input List")
    input_data = llm_generate_input(
        df_input=df_input,
        process_column=process_column
    )

    print("Request Gemini Response")
    response = llm_request_gemini(
        prompt=prompt,
        FIELD_DEFS=FIELD_DEFS,
        input_data=input_data,
        MODEL=MODEL,
        API_KEY=API_KEY,
        timeout=timeout
    )

    print("Dump Gemini Response to JSON")
    llm_dump_response(
        response=response,
        content_date=content_date,
        website=website,
        version=version,
        additional_info=f'response-{additional_info}'
    )

    print("Merge Gemini Response to Input Data and Export")
    df_input_merge = llm_merge_response(
        df_input=df_input,
        response=response,
        website=website,
        content_date=content_date,
        version=version,
        folder_name=f'normalize/{website}',
        additional_info=additional_info
    )

    return df_input_merge

## Normalize

### Input

In [None]:
# Load the "standardized" dataset for this website/version/content_date;
# every later cell in this notebook reads and updates this frame.
df_input = data_import_pandas(
    website=website,
    folder_name=f'standardized/{website}',
    version=version,
    content_date=content_date, # "0000-00-00"
    additional_info="standardized",
)

In [None]:
# Generate key_id as "<website>-<item_id>"; it is the join key used when LLM
# results are written back and when embeddings are exported.
# NOTE(review): assumes both columns hold strings — Series.str.join yields
# NaN for a row whose tuple contains a non-string element; TODO confirm.
df_input['key_id'] = df_input[['website', 'item_id']].apply(tuple, axis=1).str.join('-')

# Fill empty package with 0
df_input['is_package'] = df_input['is_package'].fillna(0)

### Generate Gemini Cache

In [None]:
# cache_input = df_input[
#     ['item_id', 'item_name', 'item_variant']
# ].rename(columns={
#     'item_id': 'uid',
#     'item_name': 'i',
#     'item_variant': 'o',
# }).apply(dict, axis=1).to_list()

# cache_input = [str(r) for r in cache_input]

# Generate Gemini Cache
# def gemini_create_cache(
#         model_version: str,
#         api_key: str,
#         cache_name: str,
#         system_instruction: str,
#         contents: list[str],
#         expiry: str="3600s"
#     ):

#     """
#     Generate Gemini Cache
#     """

#     client = Client(api_key=api_key)

#     cached = client.caches.create(
#         model = model_version,  # or another versioned model supporting caching
#         config = types.CreateCachedContentConfig(
#             display_name = cache_name, # a name for your cache
#             system_instruction = system_instruction, # optional
#             contents = contents,
#             # optional: specify ttl (time-to-live), e.g. "3600s" or a datetime-based expire_time
#             ttl = expiry
#         )
#     )

#     print("Created cache:", cached.name)
#     return client, cached

# client, cache_variant = gemini_create_cache(
#     model_version=MODEL,
#     api_key=API_KEY,
#     cache_name='Variant Name',
#     system_instruction='You are a data cleaning and normalization assistant. Use the context below to answer.',
#     contents=cache_input
# )

# # 2. When you make requests, reference this cache
# response = client.models.generate_content(
#     model = "gemini-2.5-flash",
#     contents = "Now fill the empty item_variant for this item_id and return as uid, i, o",
#     config = types.GenerateContentConfig(
#         cached_content = cache_variant.name
#     )
# )

# print("GPT:", response.text)
# print("Usage metadata:", response.usage_metadata)

### Prompt

In [None]:
# VARIANT
# Few-shot context for the prompt: rows that already have an item_variant,
# reduced to uid (renamed from item_id) / item_name / brand / item_variant,
# converted to dicts and capped at the first 100 entries.
context_input_variant = df_input[df_input['item_variant'].notna()][
    ['item_id', 'item_name', 'brand', 'item_variant']
].rename(columns={
    'item_id': 'uid'
}).apply(dict, axis=1).to_list()[:100]

# The examples are embedded in the prompt as their Python repr
context_input_variant = str(context_input_variant)

# Instruction text; the request code appends the actual input list after it
prompt_variant = f"""
You are a data cleaning and normalization assistant.

Your task is to take a messy text containing multiple information and convert it into a clean, structured JSON object. 
Make sure to:
- Extract the `item_variant` from the `item_name` of each product.
- Return empty result with empty string if you couldn't figure out the answer.
- Do not return irrelevant text or optional output.

Return output **strictly** in JSON with these fields:
`uid`, `v` as `item_variant`.

Example:

{context_input_variant}

Now extract variant of the following input:

"""

# Response schema: each item carries "r", an array of objects with a string
# "v" (read back by response_llm_variant as r[0]['v'])
FIELD_DEFS_variant = [
    ("r", [
        ("v", "string")
    ]),
]

# Columns sent to the model (next to uid) and columns copied back to df_input
process_column_variant = ['item_name']
result_column_variant = ['item_variant']


# NAME
# Few-shot context for the prompt: the 'sociolla' rows, reduced to uid
# (renamed from item_id) / item_name / brand / item_variant, converted to
# dicts and capped at the first 100 entries.
context_input_name = df_input[df_input['website'] == 'sociolla'][
    ['item_id', 'item_name', 'brand', 'item_variant']
].rename(columns={
    'item_id': 'uid'
}).apply(dict, axis=1).to_list()[:100]

# The examples are embedded in the prompt as their Python repr
context_input_name = str(context_input_name)

# Instruction text; the request code appends the actual input list after it
prompt_name = f"""
You are a data cleaning and normalization assistant.

Your task is to take a messy text containing multiple information and convert it into a clean, structured JSON object. 
Make sure to:
- Extract end of `brand` index string from `item_name` properly even though the brand is in variation, lowercase uppercase version, or abbreviation.
- For example {{'brand': 'L'OREAL', 'item_name': 'L'oreal Ultra Care Body Lotion 621ml + Pouch'}} `brand` index string '7' since it is the end of brand index in item_name including symbols and whitespaces between the brand of item_name.
- Remember to retrieve index from the item_name, not from the separated brand.
- Return `0` if the `brand` does not exist in `item_name`.
- Do not return irrelevant text or optional output.

Return output **strictly** in JSON with these fields:
`uid`, `i` as `brand index string`.

Example context:

{context_input_name}

Now extract index of the following input:

"""

# Response schema: each item carries "r", an array of objects with a string
# "i" (read back by response_llm_name as r[0]['i'])
FIELD_DEFS_name = [
    ("r", [
        ("i", "string")
    ]),
]

# Columns sent to the model (next to uid)
process_column_name = ['item_name']

# Columns copied back to df_input by revert_value. response_llm_name stores
# its result by rewriting `item_name` (the helper column it adds is called
# 'brand_index' and exists only on the processed frame), and revert_value
# indexes both frames by these names, so 'item_name' is the column to revert.
result_column_name = ['item_name']

### Execute LLM Request

In [None]:
def response_llm_variant(
        df_process: pd.DataFrame
    ):
    """
    Write the variant returned by the LLM into `item_variant`, in place.

    The value is the 'v' entry of the first element of each row's `r` cell.
    """
    first_result = df_process['r'].str.get(0)
    df_process['item_variant'] = first_result.str.get('v')

    return None

def response_llm_name(
        df_process: pd.DataFrame
    ):
    """
    Strip the brand prefix from `item_name` using the LLM's brand index.

    For every row the 'i' entry of the first element of `r` is stored as the
    integer column `brand_index`, and `item_name` is replaced in place by
    `item_name[brand_index:]`.

    Rows without a usable index (no response for the uid, empty or
    non-numeric value) get index 0, which leaves the name untouched — the
    same outcome the prompt requests when the brand is not in the name.
    Non-string names are left as they are.
    """
    def _first_index(parsed):
        # `r` is NaN when the left merge found no response for the row
        try:
            return parsed[0]['i']
        except (TypeError, IndexError, KeyError):
            return None

    raw_index = df_process['r'].map(_first_index)
    df_process['brand_index'] = (
        pd.to_numeric(raw_index, errors='coerce').fillna(0).astype(int)
    )

    df_process['item_name'] = [
        name[index:] if isinstance(name, str) else name
        for name, index in zip(df_process['item_name'], df_process['brand_index'])
    ]

    return

In [None]:
def revert_value(
        df_input: pd.DataFrame,
        df_process: pd.DataFrame,
        revert_cols: Optional[list]=None
    ):
    """
    Copy non-empty values from `df_process` back into `df_input`, in place.

    Rows are matched on `key_id`. For each column in `revert_cols`, a row of
    `df_input` is overwritten only when `df_process` holds a value for its
    key that is neither NaN nor an empty string; otherwise the existing value
    is kept.

    Args:
        revert_cols: columns to copy back; defaults to
            ['item_name', 'item_variant'] when omitted.
    """
    # A None sentinel avoids sharing one default list object between calls
    if revert_cols is None:
        revert_cols = ['item_name', 'item_variant']

    for col in revert_cols:
        usable = df_process[
            (df_process[col] != '')
            &
            df_process[col].notna()
        ]

        # Build the lookup from usable rows only, so a blank duplicate of a
        # key can never replace a real value for that key.
        replacement = usable.set_index('key_id')[col].to_dict()

        df_input[col] = np.where(
            df_input['key_id'].isin(usable['key_id'].unique()),
            df_input['key_id'].map(replacement),
            df_input[col]
        )

    return

In [None]:
def normalize_value(
        df_input: pd.DataFrame
    ):
    """
    Add the cleaned comparison columns to `df_input`, in place.

    - clean_brand: lowercased brand
    - clean_name: lowercased item_name with every digit run set apart by
      spaces and whitespace collapsed to single spaces
    - clean_variant: lowercased item_variant, or 'no info' when it is
      missing or an empty string
    - clean_price: copy of price
    """
    df_input['clean_brand'] = df_input['brand'].str.lower()

    lowered_name = df_input['item_name'].str.lower()
    digits_spaced = lowered_name.str.replace(r'(\d+)', r' \1 ', regex=True)
    df_input['clean_name'] = digits_spaced.str.split().str.join(' ')

    variant = df_input['item_variant']
    has_variant = (variant != '') & variant.notna()
    df_input['clean_variant'] = np.where(has_variant, variant.str.lower(), 'no info')

    df_input['clean_price'] = df_input['price']

    return None

In [None]:
# if website in ['sociolla']:
#     prompt = prompt_sociolla
#     FIELD_DEFS = FIELD_DEFS_sociolla
#     process_column = process_column_sociolla
#     result_column = result_column_sociolla
# elif website in ['guardian']:
#     prompt = prompt_guardian
#     FIELD_DEFS = FIELD_DEFS_guardian
#     process_column = process_column_guardian
#     result_column = result_column_guardian
# elif website in ['watsons']:
#     prompt = prompt_watsons
#     FIELD_DEFS = FIELD_DEFS_watsons
#     process_column = process_column_watsons
#     result_column = result_column_watsons

In [None]:
from config.config import date_basic
# import sys
# sys.stdout = open(f"../data/logging/{date_basic(content_date)}/{date_basic(content_date)}-cleaning.log", "w", encoding="utf-8")

from math import ceil

# Lower border of the first batch that is sent to the LLM. 0 processes every
# batch. Raise it only to skip batches on purpose (e.g. while debugging):
# skipped batches contribute no LLM values to the write-back below.
resume_from_row = 0

for info in ['variant']:

    if info == 'variant':
        prompt = prompt_variant
        FIELD_DEFS = FIELD_DEFS_variant
        process_column = process_column_variant
        result_column = result_column_variant

        # Create temp df for llm process: rows that still lack a variant
        df_process = df_input[df_input['item_variant'].isna()].copy(deep=True).reset_index(drop=True)

    elif info == 'name':
        prompt = prompt_name
        FIELD_DEFS = FIELD_DEFS_name
        process_column = process_column_name
        result_column = result_column_name

        # Create temp df for llm process: every website except 'sociolla'
        df_process = df_input[df_input['website'] != 'sociolla'].copy(deep=True).reset_index(drop=True)

    else:
        raise ValueError(f"Unsupported info type: {info}")

    len_input = len(df_process)

    # Half-open [lower, upper) row windows of at most range_input rows.
    # An empty df_process yields no window, so no request is sent for it.
    borders = [
        (lower, min(lower + range_input, len_input))
        for lower in range(0, len_input, range_input)
    ]

    # Collected per run, so the result never depends on a variable left over
    # from an earlier execution of this cell.
    batch_results = []

    for lower_border, upper_border in borders:

        if lower_border < resume_from_row:
            continue

        print(f"Processing: {lower_border} - {upper_border}")
        df_process_temp = df_process.iloc[lower_border:upper_border].copy(deep=True)

        df_process_temp = extract(
            df_input=df_process_temp,
            process_column=process_column,
            website=website,
            content_date=content_date,
            version=version,
            prompt=prompt,
            FIELD_DEFS=FIELD_DEFS,
            MODEL=MODEL,
            API_KEY=API_KEY,
            timeout=600, # in s
            additional_info=f'extract_dump_{info}_{lower_border}_{upper_border}'
        )
        print(f"Completed: {lower_border} - {upper_border}")

        batch_results.append(df_process_temp)

    if not batch_results:
        # Nothing was sent to the LLM, so there is nothing to write back
        print(f"No rows processed for: {info}")
        continue

    df_process_result = pd.concat(batch_results).reset_index(drop=True)

    # Fill column value with llm response value
    if info == 'variant':
        response_llm_variant(
            df_process=df_process_result
        )

    elif info == 'name':
        response_llm_name(
            df_process=df_process_result
        )

    # Revert llm response to the df_input
    revert_value(
        df_input=df_input,
        df_process=df_process_result,
        revert_cols=result_column
    )

# Lowercase and fill empty value with no info and split numeric name
normalize_value(
    df_input=df_input
)

data_export_pandas(
    df_output=df_input,
    website=website,
    content_date=content_date,
    version=version,
    folder_name=f'normalize/{website}',
    additional_info='normalize',
    incl_excel=True
)

# sys.stdout.close()

## Embeddings Name

In [None]:
import time

from tqdm import tqdm
from openai import OpenAI

# Batched embedding helper (used below to embed `clean_name`)

def get_embeddings_batch(
        input_list: list,
        api_key: str,
        model: str="text-embedding-3-small",
        range_input: int=50,
        sleep_sec: float=0.5
    ):
    """
    Embed `input_list` in chunks of `range_input` texts.

    Each chunk is sent to the embeddings endpoint; a failing chunk is printed
    and retried after `sleep_sec` seconds, and the fourth failure of the same
    chunk is re-raised. Returns one embedding per input text, in input order.
    """
    client = OpenAI(api_key=api_key)

    vectors = []
    chunk_starts = range(0, len(input_list), range_input)
    for offset in tqdm(chunk_starts, desc="Embedding"):
        chunk = input_list[offset:offset + range_input]

        failures = 0
        done = False
        while not done:
            try:
                result = client.embeddings.create(
                    model=model,
                    input=chunk
                )
                vectors.extend([row.embedding for row in result.data])
                done = True

            except Exception as e:
                failures += 1
                if failures > 3:
                    raise e
                print(e)
                # Pause only between retries of a failed chunk
                time.sleep(sleep_sec)

    return vectors

# Embed every row's clean_name (the column added by normalize_value above).
# NOTE(review): presumably clean_name has no missing values here — confirm,
# since the list is sent to the embeddings API as is.
X_embed = get_embeddings_batch(
    input_list=df_input['clean_name'].tolist(),
    api_key=EMB_API_KEY,
    model=EMB_MODEL
)

# Store one embedding per row, assigned positionally in row order
df_input['embeddings_name'] = [list(vec) for vec in X_embed]

# Export key_id -> embedding records as JSON; the metadata records which
# column the embeddings were computed from.
data_export_json(
    data=df_input[['key_id', 'embeddings_name']].apply(dict, axis=1).to_list(),
    website=website,
    content_date=content_date,
    version=version,
    folder_name=f'normalize/{website}',
    additional_info='embeddings_name',
    metadata={
        'source_embeddings': 'clean_name'
    }
)