# Structuring COICOP 2018 information for classification

The COICOP 2018 classification contains unstructured information for its most detailed level of classification. This notebook leverages LLMs in order to structure this information in a format which can be used for similarity search.

In [35]:
import os
import time # Add delays because of free API rate limits
from datetime import datetime

import pandas as pd
from ftfy import fix_text
from unidecode import unidecode

from typing import List
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.models.mistral import MistralModel
from pydantic_ai.models.groq import GroqModel
from pydantic_ai.models.gemini import GeminiModel

from dotenv import dotenv_values

In [None]:
# Patch the event loop that the notebook already runs.
# NOTE(review): presumably required so the synchronous `agent.run_sync(...)` calls
# used later in this notebook can run inside Jupyter — confirm.
import nest_asyncio # Fix issues with Jupyter notebook event loop
nest_asyncio.apply()

Import environment variables with API keys

In [None]:
# Read the key/value pairs of the local ".env" file into `config`.
# The provider API keys are looked up later with config.get("<PROVIDER>_API_KEY").
config = dotenv_values(".env")

Define the Pydantic model for parsing the additional information in each COICOP 2018 level 4

In [None]:
# Structured output requested from every agent in this notebook (passed as `result_type`).
class CoicopDetails(BaseModel):
    # Product/service names generated for one COICOP entry
    examples: List[str]

Read COICOP 2018 definitions

In [None]:
# Columns needed downstream: the code, the title and the explanatory notes
usecols = ["code", "title", "intro", "includes", "alsoIncludes", "excludes"]

# Load only those columns from the edited English structure file
data_df = pd.read_excel(
    io="coicop_2018/COICOP_2018_English_structure_edited.xlsx",
    usecols=usecols,
)


Remove exclusion note from the English and French files, rename Spanish columns.

Remove (ND), (SD), (D), (S) markings from the class names

Filter level 4 classes only for LLM queries

In [None]:
def process_subclasses(df):
    """
    Build the subclass-level table (codes with exactly 3 dots).

    Steps:
    1. Keep only the rows whose code contains exactly 3 dots
    2. Merge the "intro", "includes" and "alsoIncludes" notes into a single
       "description" column and clean the resulting text
    3. Clean the "excludes" column (missing values become '')
    4. Strip the (ND), (SD), (S), (D) markers from the title

    Args:
        df (pd.DataFrame): Input DataFrame with the columns "code", "title",
            "intro", "includes", "alsoIncludes" and "excludes"

    Returns:
        pd.DataFrame: New DataFrame with the columns "code", "title",
            "description" and "excludes"; the input is not modified
    """
    note_columns = ("intro", "includes", "alsoIncludes")

    def merge_notes(row):
        # Keep only the notes that are present, in column order
        parts = []
        for column in note_columns:
            value = row[column]
            if pd.notna(value):
                parts.append(str(value))
        return ' \n '.join(parts)

    def clean_text(series):
        # Fix encoding issues, transliterate with unidecode, then blank out
        # the literal "_x000D_" sequences
        return series.apply(fix_text).apply(unidecode).str.replace("_x000D_", " ")

    # Work on a filtered copy so the caller's frame is left untouched
    has_three_dots = df['code'].str.count(r'\.') == 3
    subclasses = df[has_three_dots].copy()

    # Inclusion side: one combined, cleaned description per row
    subclasses['description'] = subclasses.apply(merge_notes, axis=1)
    subclasses['description'] = clean_text(subclasses['description'])

    # Exclusion side: same cleaning, with missing notes turned into ''
    subclasses['excludes'] = clean_text(subclasses['excludes'].fillna(''))

    # Titles carry classification markers such as " (ND)" that are not part of the name
    subclasses['title'] = subclasses['title'].str.replace(
        r'\s*\((ND|SD|S|D)\)', '', regex=True)

    return subclasses[["code", "title", "description", "excludes"]]

In [None]:
def process_subsubclasses(df):
    """
    Build the table of entries one level below the subclasses (codes with
    exactly 4 dots), re-labelled with the code of their parent subclass.

    Steps:
    1. Keep only the rows whose code contains exactly 4 dots
    2. Merge the "intro", "includes" and "alsoIncludes" notes into a single
       "description" column and clean the resulting text
    3. Clean the "excludes" column (missing values become '')
    4. Strip the (ND), (SD), (S), (D) markers from the title
    5. Censor the code to subclass level by dropping its last component

    Args:
        df (pd.DataFrame): Input DataFrame with the columns "code", "title",
            "intro", "includes", "alsoIncludes" and "excludes"

    Returns:
        pd.DataFrame: New DataFrame with the columns "code", "title",
            "description" and "excludes"; the input is not modified
    """
    # 1. Select rows where the code has exactly 4 dots (copy so the input stays untouched)
    result_df = df[df['code'].str.count(r'\.') == 4].copy()

    # 2. Combine description columns
    columns_to_concat = ["intro", "includes", "alsoIncludes"]

    def concatenate_non_nan_columns(row):
        # Filter out NaN values and convert to string
        non_nan_values = [str(row[col]) for col in columns_to_concat if pd.notna(row[col])]
        return ' \n '.join(non_nan_values)

    result_df['description'] = result_df.apply(concatenate_non_nan_columns, axis=1)
    # Fix encoding issues
    result_df['description'] = result_df['description'].apply(fix_text)
    result_df['description'] = result_df['description'].apply(unidecode)
    result_df['description'] = result_df['description'].str.replace("_x000D_", " ")

    # 3. Fix the exclusion column
    result_df['excludes'] = result_df['excludes'].fillna('')
    result_df['excludes'] = result_df['excludes'].apply(fix_text)
    result_df['excludes'] = result_df['excludes'].apply(unidecode)
    result_df['excludes'] = result_df['excludes'].str.replace("_x000D_", " ")

    # 4. Remove classification markers from the title
    markers_pattern = r'\s*\((ND|SD|S|D)\)'
    result_df['title'] = result_df['title'].str.replace(markers_pattern, '', regex=True)

    # 5. Censor the code to subclass level. Drop the last dot-separated
    #    component instead of cutting at a fixed width, so codes with a
    #    multi-digit component (e.g. "01.1.1.10.2") are not truncated mid-number.
    result_df['code'] = result_df['code'].str.rsplit(pat='.', n=1).str[0]

    return result_df[["code", "title", "description", "excludes"]]

In [None]:
# Both helpers read the raw frame, so they must run before `data_df` is rebound below.
sub_df = process_subclasses(data_df)
subsub_df = process_subsubclasses(data_df)

# Stack the subclass rows on top of the re-labelled rows from the level below
data_df = pd.concat([sub_df, subsub_df])

Remove divisions 14 and 15, as they do not add further information

In [None]:
# Flag every code that belongs to division 14 or 15, then keep the rest
in_division_14_or_15 = data_df["code"].str.startswith(("14", "15"))
data_df = data_df[~in_division_14_or_15]

In [None]:
# One dict per row (keys "code", "title", "description", "excludes"); this is
# the list the generation loops iterate over.
data_dict = data_df.to_dict(orient="records")

# Synthetic data generation

In [None]:
# Alternative versions
# system_prompt = """You are an expert data curator specializing in semantic distinctiveness. When provided with a subclass name and its inclusion/exclusion description:

# 1. Parse the inclusion description to identify all distinct items that belong in the subclass, at the most granular level possible.
#    - For example, if the text says “Chocolate bars,” specify types (milk chocolate, dark chocolate, etc.)
#    - Use the exclusion description only to clarify boundaries.

# 2. For each included item, generate a distinctive description that:
#    - Incorporates the subclass context while focusing on item-specific attributes
#    - Emphasizes unique characteristics that differentiate it from other items
#    - Uses precise terminology relevant to the domain of the subclass
#    - Maximizes semantic distance between item descriptions

# 3. Ensure each item description:
#    - Avoids generic placeholder terms (like "other," "miscellaneous," "various," "assorted," "etc.")
#    - Eliminates redundant phrasing and filler words
#    - Uses synonyms and alternative phrasing to minimize lexical overlap between descriptions
#    - Maintains specificity without resorting to catch-all language

# 4. Produce output in the same language as the input text, preserving technical terminology while maximizing semantic uniqueness across all descriptions.

# 5. Format each output entry to include the semantically distinctive item description for included items only
# """

# System prompt shared by every agent in this notebook.
# The first word was previously truncated to "ou".
system_prompt = """You are an expert data curator. When provided with a subclass name and its inclusion/exclusion description:

1. Identify specific products/services that belong in this subclass based on the inclusion description. Use exclusions only to understand boundaries.

2. Generate a list of specific product/service names that:
   - Belong within the defined subclass
   - Represent specific items, not categories
   - Cover the full range of inclusions

3. Make each name semantically unique by:
   - Avoiding generic terms ("other," "miscellaneous")
   - Minimizing word overlap between entries
   - Using varied terminology

4. Output in the same language as input, preserving industry terminology.

5. Format as a simple list of product/service names.
"""

In [None]:
# Per-entry user message template, filled in with
# str.format(title=..., description=..., excludes=...)
subclass_prompt = "\n".join([
    "Subclass title: {title}",
    "Inclusion note: {description}",
    "Exclusion note: {excludes}",
])

## Mistral

Initialize PydanticAI agent to structure information

In [None]:
# Model identifier: passed to MistralModel and embedded in the output file names
llm_model = "mistral-large-latest"

In [None]:
# Mistral-backed model; config.get returns None when MISTRAL_API_KEY is absent from .env
model = MistralModel(model_name=llm_model, api_key=config.get("MISTRAL_API_KEY"))
# Agent that must answer with a CoicopDetails object, using the shared system prompt.
# NOTE(review): retries=3 is presumably the retry budget for failed/invalid
# responses — confirm against the pydantic_ai Agent documentation.
agent = Agent(
    model=model,
    retries=3,
    result_type=CoicopDetails,
    system_prompt=system_prompt
)

Run calls to the Agent to extract and format information

In [None]:
# Query the agent once per COICOP entry and collect one output row per label.
# `results` receives the generated rows, `failed_calls` the items whose call raised.
results = []
failed_calls = []
for i, item in enumerate(data_dict):
    # Print every 20 items to show progress
    if i % 20 == 0:
        print(f"Processing item {i+1} out of {len(data_dict)}")
    # Skip entries without an inclusion note. Empty notes reach this loop as ''
    # (never None), so test for a blank string rather than `is None`.
    description = item.get("description")
    if not isinstance(description, str) or not description.strip():
        # No information to parse, just append existing item
        results.append(item)
        continue
    # Time delay to respect API rate limits
    time.sleep(3)
    try:
        prompt = subclass_prompt.format(
            title=item.get("title"),
            description=description,
            excludes=item.get("excludes"))
        agent_result = agent.run_sync(prompt, model_settings={'temperature': 0.0})
        examples = agent_result.data.model_dump().get("examples")
        # One row per label: the original class title first, then every generated example
        for ex in [item.get("title")] + examples:
            # "title" comes last in the dict literal, so it overwrites the original title
            results.append({**item, "title": ex})
    except Exception as e:
        # Best effort: remember the item, report the error and move on to the next one
        failed_calls.append(item)
        print(f"Error processing item {i+1} out of {len(data_dict)}")
        print(item)
        print(e)

Save results and failed calls

In [None]:
# Output file name carries the model identifier and the time of the save
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
results_path = "results/coicop2018_{}_{}.csv".format(llm_model, timestamp)

# Keep the frame around as `results_df`; the long note columns are left out of the CSV
results_df = pd.DataFrame(results)
results_df.drop(columns=["description", "excludes"]).to_csv(results_path, index=False)

In [None]:
# Items whose agent call raised are written to their own "failed_" file
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
failed_path = "results/failed_coicop2018_{}_{}.csv".format(llm_model, timestamp)

failed_df = pd.DataFrame(failed_calls)
failed_df.to_csv(failed_path, index=False)

## Llama 3 on Groq

In [None]:
# Model identifier: passed to GroqModel and embedded in the output file names
llm_model = "llama3-70b-8192"

In [None]:
# Groq-backed model; config.get returns None when GROQ_API_KEY is absent from .env
model = GroqModel(
    model_name=llm_model, 
    api_key=config.get("GROQ_API_KEY"))
# Agent that must answer with a CoicopDetails object, using the shared system prompt.
# NOTE(review): retries=3 is presumably the retry budget for failed/invalid
# responses — confirm against the pydantic_ai Agent documentation.
agent = Agent(
    model=model,
    retries=3,
    result_type=CoicopDetails,
    system_prompt=system_prompt,
)

In [None]:
# Query the agent once per COICOP entry and collect one output row per label.
# `results` receives the generated rows, `failed_calls` the items whose call raised.
results = []
failed_calls = []
for i, item in enumerate(data_dict):
    # Print every 20 items to show progress
    if i % 20 == 0:
        print(f"Processing item {i+1} out of {len(data_dict)}")
    # Skip entries without an inclusion note. Empty notes reach this loop as ''
    # (never None), so test for a blank string rather than `is None`.
    description = item.get("description")
    if not isinstance(description, str) or not description.strip():
        # No information to parse, just append existing item
        results.append(item)
        continue
    # Time delay to respect API rate limits
    time.sleep(3)
    try:
        prompt = subclass_prompt.format(
            title=item.get("title"),
            description=description,
            excludes=item.get("excludes"))
        agent_result = agent.run_sync(prompt, model_settings={'temperature': 0.0})
        examples = agent_result.data.model_dump().get("examples")
        # One row per label: the original class title first, then every generated example
        for ex in [item.get("title")] + examples:
            # "title" comes last in the dict literal, so it overwrites the original title
            results.append({**item, "title": ex})
    except Exception as e:
        # Best effort: remember the item, report the error and move on to the next one
        failed_calls.append(item)
        print(f"Error processing item {i+1} out of {len(data_dict)}")
        print(item)
        print(e)

In [None]:
# Output file name carries the model identifier and the time of the save
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
results_path = "results/coicop2018_{}_{}.csv".format(llm_model, timestamp)

# Keep the frame around as `results_df`; the long note columns are left out of the CSV
results_df = pd.DataFrame(results)
results_df.drop(columns=["description", "excludes"]).to_csv(results_path, index=False)

In [None]:
# Items whose agent call raised are written to their own "failed_" file
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
failed_path = "results/failed_coicop2018_{}_{}.csv".format(llm_model, timestamp)

failed_df = pd.DataFrame(failed_calls)
failed_df.to_csv(failed_path, index=False)

## Mixtral on Groq

In [None]:
# Model identifier: passed to GroqModel and embedded in the output file names
llm_model = "mixtral-8x7b-32768"

In [None]:
# Groq-backed model; config.get returns None when GROQ_API_KEY is absent from .env
model = GroqModel(
    model_name=llm_model, 
    api_key=config.get("GROQ_API_KEY"))
# Agent that must answer with a CoicopDetails object, using the shared system prompt.
# NOTE(review): retries=3 is presumably the retry budget for failed/invalid
# responses — confirm against the pydantic_ai Agent documentation.
agent = Agent(
    model=model,
    retries=3,
    result_type=CoicopDetails,
    system_prompt=system_prompt,
)

In [None]:
# Query the agent once per COICOP entry and collect one output row per label.
# `results` receives the generated rows, `failed_calls` the items whose call raised.
results = []
failed_calls = []
for i, item in enumerate(data_dict):
    # Print every 20 items to show progress
    if i % 20 == 0:
        print(f"Processing item {i+1} out of {len(data_dict)}")
    # Skip entries without an inclusion note. Empty notes reach this loop as ''
    # (never None), so test for a blank string rather than `is None`.
    description = item.get("description")
    if not isinstance(description, str) or not description.strip():
        # No information to parse, just append existing item
        results.append(item)
        continue
    # Time delay to respect API rate limits
    time.sleep(3)
    try:
        prompt = subclass_prompt.format(
            title=item.get("title"),
            description=description,
            excludes=item.get("excludes"))
        agent_result = agent.run_sync(prompt, model_settings={'temperature': 0.0})
        examples = agent_result.data.model_dump().get("examples")
        # One row per label: the original class title first, then every generated example
        for ex in [item.get("title")] + examples:
            # "title" comes last in the dict literal, so it overwrites the original title
            results.append({**item, "title": ex})
    except Exception as e:
        # Best effort: remember the item, report the error and move on to the next one
        failed_calls.append(item)
        print(f"Error processing item {i+1} out of {len(data_dict)}")
        print(item)
        print(e)

In [None]:
# Output file name carries the model identifier and the time of the save
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
results_path = "results/coicop2018_{}_{}.csv".format(llm_model, timestamp)

# Keep the frame around as `results_df`; the long note columns are left out of the CSV
results_df = pd.DataFrame(results)
results_df.drop(columns=["description", "excludes"]).to_csv(results_path, index=False)

In [None]:
# Items whose agent call raised are written to their own "failed_" file
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
failed_path = "results/failed_coicop2018_{}_{}.csv".format(llm_model, timestamp)

failed_df = pd.DataFrame(failed_calls)
failed_df.to_csv(failed_path, index=False)

## Gemini 2.0 Flash Lite Preview

In [None]:
# Model identifier: passed to GeminiModel and embedded in the output file names
llm_model = "gemini-2.0-flash-lite-preview-02-05"

In [None]:
# Gemini-backed model; config.get returns None when GEMINI_API_KEY is absent from .env
model = GeminiModel(
    model_name=llm_model, 
    api_key=config.get("GEMINI_API_KEY"))
# Agent that must answer with a CoicopDetails object, using the shared system prompt.
# NOTE(review): retries=3 is presumably the retry budget for failed/invalid
# responses — confirm against the pydantic_ai Agent documentation.
agent = Agent(
    model=model,
    retries=3,
    result_type=CoicopDetails,
    system_prompt=system_prompt,
)

In [None]:
# Query the agent once per COICOP entry and collect one output row per label.
# `results` receives the generated rows, `failed_calls` the items whose call raised.
results = []
failed_calls = []
for i, item in enumerate(data_dict):
    # Print every 20 items to show progress
    if i % 20 == 0:
        print(f"Processing item {i+1} out of {len(data_dict)}")
    # Skip entries without an inclusion note. Empty notes reach this loop as ''
    # (never None), so test for a blank string rather than `is None`.
    description = item.get("description")
    if not isinstance(description, str) or not description.strip():
        # No information to parse, just append existing item
        results.append(item)
        continue
    # Time delay to respect API rate limits
    time.sleep(4)
    try:
        prompt = subclass_prompt.format(
            title=item.get("title"),
            description=description,
            excludes=item.get("excludes"))
        agent_result = agent.run_sync(prompt, model_settings={'temperature': 0.0})
        examples = agent_result.data.model_dump().get("examples")
        # One row per label: the original class title first, then every generated example
        for ex in [item.get("title")] + examples:
            # "title" comes last in the dict literal, so it overwrites the original title
            results.append({**item, "title": ex})
    except Exception as e:
        # Best effort: remember the item, report the error and move on to the next one
        failed_calls.append(item)
        print(f"Error processing item {i+1} out of {len(data_dict)}")
        print(item)
        print(e)

In [None]:
# Output file name carries the model identifier and the time of the save
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
results_path = "results/coicop2018_{}_{}.csv".format(llm_model, timestamp)

# Keep the frame around as `results_df`; the long note columns are left out of the CSV
results_df = pd.DataFrame(results)
results_df.drop(columns=["description", "excludes"]).to_csv(results_path, index=False)

In [None]:
# Items whose agent call raised are written to their own "failed_" file
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
failed_path = "results/failed_coicop2018_{}_{}.csv".format(llm_model, timestamp)

failed_df = pd.DataFrame(failed_calls)
failed_df.to_csv(failed_path, index=False)

# Combine results

In [36]:
# Per-model result files: names starting with "coicop2018" and ending in ".csv".
# os.listdir returns entries in arbitrary order, so sort them: the later
# de-duplication on "title" keeps the first occurrence, and that choice would
# otherwise depend on the order in which the files happen to be listed.
result_files = sorted(
    f for f in os.listdir("results/")
    if f.startswith("coicop2018") and f.endswith(".csv")
)

In [37]:
# Read every result file, in the order given by `result_files`
results_list = [
    pd.read_csv(os.path.join("results", file_name))
    for file_name in result_files
]

# Stack all per-model frames into a single one
results_df = pd.concat(results_list)

In [38]:
# Sanity check: number of distinct COICOP codes present in the combined results
results_df["code"].nunique()

338

Normalize to lowercase and remove all "other" labels

In [39]:
# Lowercase the labels, then drop exact duplicate rows and rows without a label
results_df = (
    results_df
    .assign(title=results_df["title"].str.lower())
    .drop_duplicates(ignore_index=True)
    .dropna(subset=["title"])
)

In [40]:
# Remove labels that use the generic words "other"/"others" or "miscellaneous".
# The word boundaries keep legitimate labels that merely contain those letters
# (e.g. "motherboard", "another"), which a plain substring match would drop.
generic_label_pattern = r"\b(?:others?|miscellaneous)\b"
results_df = results_df[~(results_df["title"].str.contains(pat=generic_label_pattern))]

Models may have generated the same label with reference to different COICOP subclasses. Temporarily, we will simply drop the duplicates, keeping the first occurrence. Possibly, we could process those occurrences with an LLM in order to keep the most suitable one according to the COICOP definitions of each subclass.

In [43]:
# Keep a single row per label: the first occurrence in the current row order wins,
# and the index is renumbered from 0.
results_df = results_df.drop_duplicates(subset=["title"], keep="first", ignore_index=True)

In [41]:
# Sanity check: number of distinct COICOP codes still present after the cleaning steps
results_df["code"].nunique()

338

In [45]:
# The consolidated file is stamped with the date of the save
consolidated_path = "results/consolidated_coicop2018_{}.csv".format(
    datetime.now().strftime("%Y-%m-%d"))
results_df.to_csv(consolidated_path, index=False)