How to Clean and Classify Your Text Data with LLMs (Together AI Edition)
This notebook demonstrates how to:
• Standardize food descriptions (expanding abbreviations, normalizing text)
• Extract structured entities (Brand, Category, Ingredients, etc.)
• Perform advanced classification with Together AI’s LLM chat completions
• Handle missing data, deduplicate information, and validate results


1. Setup & Imports

In [None]:
# (1) Setup & Imports
import os
import re
import json
import time
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from together import Together
from ratelimit import limits, sleep_and_retry

# Constants
BATCH_SIZE = 20  # descriptions sent per chat-completion request
# NOTE(review): 8024 looks like it may be a typo for 8192 — confirm the intended
# completion budget against the model's output-token limit.
MAX_TOKENS = 8024

# Initialize Together AI client.
# The key is read from the environment instead of being hardcoded, so it never
# ends up in the saved notebook, its outputs, or version control.
_together_api_key = os.environ.get("TOGETHER_API_KEY")
if not _together_api_key:
    raise RuntimeError(
        "TOGETHER_API_KEY is not set; export it before starting the kernel "
        "(or set os.environ['TOGETHER_API_KEY'] via getpass) and re-run this cell."
    )

try:
    together_client = Together(api_key=_together_api_key)
    print("Together client initialized successfully.")
except Exception as e:
    # f-string gives a readable message (passing `e` as a second arg would make
    # the message a tuple); `from e` keeps the original traceback attached.
    raise RuntimeError(f"Failed to initialize Together client: {e}") from e

2. Text Preprocessing

In [None]:
# (2) Text Preprocessing

# Abbreviation -> expansion table for the short food descriptions.
_ABBREVIATION_MAP = {
    # Product Types
    'BEVE': 'BEVERAGE',
    'VEG': 'VEGETABLE',
    'FR': 'FRESH',
    'PREP': 'PREPARED',
    'BRST': 'BREAST',
    'WHL': 'WHOLE',
    'W/': 'WITH',
    'W/O': 'WITHOUT',

    # States & Forms
    'FRZ': 'FROZEN',
    'CKD': 'COOKED',
    'UCKD': 'UNCOOKED',
    'BKD': 'BAKED',
    # ...rest of the abbreviation map...
}


def _abbreviation_regex(abbr):
    """Build the regex for one abbreviation, guarding only edges that are word characters.

    A word-boundary check only makes sense next to a word character: a key like
    'W/' gets a leading guard but no trailing one, so both 'W/DI' and 'W/ SALT'
    match. Alphanumeric keys get guards on both sides, so 'VEG' does not match
    inside 'VEGGIE'.
    """
    head = r'(?<!\w)' if re.match(r'\w', abbr[0]) else ''
    tail = r'(?!\w)' if re.match(r'\w', abbr[-1]) else ''
    return head + re.escape(abbr) + tail


# One combined pattern, longest keys first: 'W/O' is tried before its prefix 'W/',
# and text produced by one expansion is never re-scanned by another key.
_ABBREVIATION_PATTERN = re.compile(
    '|'.join(
        _abbreviation_regex(abbr)
        for abbr in sorted(_ABBREVIATION_MAP, key=len, reverse=True)
    )
)


def _expand_abbreviation(match):
    """Replacement callback: map the matched abbreviation to its expansion."""
    abbr = match.group(0)
    full = _ABBREVIATION_MAP[abbr]
    # Keys ending in punctuation (e.g. 'W/') can abut the next word ('W/DI');
    # add a separating space, and any doubled spaces are collapsed afterwards.
    return full if re.match(r'\w', abbr[-1]) else full + ' '


def standardize_abbreviations(text):
    """Standardize abbreviations in food descriptions.

    The text is upper-cased, every abbreviation in ``_ABBREVIATION_MAP`` is
    expanded, runs of whitespace are collapsed to one space, and the result is
    stripped.

    Args:
        text: A description. ``None``/NaN (a blank CSV cell) yields ``""``.
            Other non-strings are converted with ``str``.

    Returns:
        The standardized, upper-case description.
    """
    if not isinstance(text, str):
        if text is None or pd.isna(text):
            return ''
        text = str(text)
    expanded = _ABBREVIATION_PATTERN.sub(_expand_abbreviation, text.upper())
    # Collapsing whitespace also removes embedded newlines. Otherwise one
    # description would span several lines of the LLM prompt.
    return re.sub(r'\s+', ' ', expanded).strip()

def preprocess_descriptions(input_file, output_file):
    """Apply text standardization to each row's ``Shrt_Desc`` and save the result.

    Args:
        input_file: Path to a CSV containing a ``Shrt_Desc`` column.
        output_file: Destination CSV path (written without the index). Its parent
            directory is created if missing.

    Returns:
        The processed DataFrame. Callers that ignore the return value are unaffected.
    """
    df = pd.read_csv(input_file)
    # Blank cells are read as float NaN; fill them so the string-based
    # standardizer only ever receives strings.
    df['Shrt_Desc'] = (
        df['Shrt_Desc'].fillna('').astype(str).apply(standardize_abbreviations)
    )
    out_parent = os.path.dirname(output_file)
    if out_parent:
        os.makedirs(out_parent, exist_ok=True)
    df.to_csv(output_file, index=False)
    print(f"Processed data saved to {output_file}")
    return df

3. Prompt Definition & Rate-Limited Processing

In [None]:
# (3) Prompt Definition & Rate-Limited Processing

# System prompt shared by every batch. Section 5 pins the reply to a JSON array
# aligned with the input lines. process_batch pairs results to NDB_No purely by
# position, so one object per line, in order, is required.
system_prompt = """
You are a specialized food information extraction system. Follow these classification rules:

1. STATES vs PREPARATION:
- States: RAW, FROZEN, DRIED, POWDER, LIQUID, SOLID
- Preparation: BAKED, FRIED, COOKED, GRILLED, BROILED

2. CATEGORY:
- Use base categories: MEAT, DAIRY, BEVERAGES, DESSERTS, etc.

3. SUB-CATEGORY:
- Use simple product type (e.g., "Steak" not "Wagyu Beef Steak").

4. INGREDIENTS:
- Only list base ingredients explicitly mentioned.

5. OUTPUT FORMAT:
- Return ONLY a JSON array: no Markdown code fences and no commentary.
- The array must contain exactly one object per input line, in the same order as the input lines.
- Use null for an unknown single value and [] for an unknown list.
- Each object has this shape:
{
    "Brand": "...",
    "Category": "...",
    "Sub-Category": "...",
    "Ingredients": [...],
    "Preparation Method": [...],
    "Cultural Origin": "...",
    "State": "...",
    "Additional Features": [...]
}
"""

# Client-side call budget used by the @limits decorator on process_with_rate_limit.
# NOTE(review): confirm 6000 calls/minute against the Together account's actual limit.
CALLS_PER_MINUTE = 6000

def _prompt_line(value):
    """Render one description as exactly one prompt line ("" for missing values)."""
    if not isinstance(value, str):
        # Blank CSV cells arrive from pandas as float NaN; str.join would raise
        # TypeError on them and the whole batch would be lost.
        if value is None or pd.isna(value):
            return ""
        value = str(value)
    # Collapse all whitespace (including embedded newlines) so the one-line-per-item
    # contract with the model holds.
    return " ".join(value.split())


@sleep_and_retry
@limits(calls=CALLS_PER_MINUTE, period=60)
def process_with_rate_limit(batch_data, client, prompt):
    """Send one batch of descriptions to the chat-completions API under a rate limit.

    The ``ratelimit`` decorators cap this function at CALLS_PER_MINUTE calls per
    60 seconds. ``sleep_and_retry`` waits rather than failing when the budget is
    exhausted.

    Args:
        batch_data: ``(descriptions, batch_idx)`` tuple as built by
            ``create_fixed_batches``. Only the descriptions are used here.
        client: Client exposing ``chat.completions.create`` (the Together client).
        prompt: System prompt text.

    Returns:
        The raw chat-completion response object.
    """
    lines = [_prompt_line(d) for d in batch_data[0]]
    joined = "\n".join(lines)
    # Stating the expected count helps the model keep a 1:1, in-order mapping,
    # which the caller relies on to attach NDB_No positionally.
    user_content = (
        f"Return a JSON array with exactly {len(lines)} objects, one per input line, "
        f"in the same order. Analyze:\n{joined}"
    )
    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": user_content},
    ]
    return client.chat.completions.create(
        model="meta-llama/Llama-3.3-70B-Instruct-Turbo",
        messages=messages,
        max_tokens=MAX_TOKENS,
        temperature=0.1,
        top_p=0.9
    )

4. Batch Processing & Flattening

In [None]:
# (4) Batch Processing & Flattening

def _parse_llm_json_array(content):
    """Parse a chat-completion reply into a list of JSON values.

    Tolerates common deviations from pure JSON:
    - surrounding Markdown code fences;
    - prose around the array (falls back to the outermost '[' ... ']');
    - a single bare object (wrapped in a list);
    - an object whose only value is the array.

    Raises:
        ValueError: (json.JSONDecodeError is a subclass) if nothing usable is found.
    """
    text = (content or '').strip()
    fenced = re.match(r'^```[A-Za-z]*\s*(.*?)\s*```$', text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        start, end = text.find('['), text.rfind(']')
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])
    if isinstance(parsed, dict):
        values = list(parsed.values())
        # A food object has many keys. A one-key object holding a list is a wrapper.
        parsed = values[0] if len(values) == 1 and isinstance(values[0], list) else [parsed]
    if not isinstance(parsed, list):
        raise ValueError(f"Expected a JSON array, got {type(parsed).__name__}")
    return parsed


def process_batch(batch_data, ndb_nos):
    """Send a batch to the LLM and return parsed JSON results tagged with NDB_No.

    Args:
        batch_data: ``(descriptions, batch_idx)`` tuple from ``create_fixed_batches``.
        ndb_nos: NDB numbers for those descriptions, in the same order.

    Returns:
        A list of result dicts, each with ``NDB_No`` assigned by position.
        - Results past ``len(ndb_nos)`` are dropped, since they cannot be attributed.
        - Results that are not JSON objects are dropped.
        - Any API or parsing error returns ``[]``. This is best-effort: one bad
          batch must not stop a long run.
    """
    try:
        response = process_with_rate_limit(batch_data, together_client, system_prompt)
        choices = getattr(response, 'choices', None) if response else None
        if not choices:
            print("Batch error: response contained no choices")
            return []
        results = _parse_llm_json_array(choices[0].message.content)
    except Exception as e:
        print(f"Batch error: {str(e)}")
        return []

    if len(results) != len(ndb_nos):
        # Positional pairing is only trustworthy when the counts agree.
        print(f"Batch warning: got {len(results)} results for {len(ndb_nos)} inputs; "
              "NDB_No alignment may be off.")

    tagged = []
    for item, ndb in zip(results, ndb_nos):
        if isinstance(item, dict):
            item['NDB_No'] = ndb
            tagged.append(item)
    return tagged

def _join_list_field(value):
    """Render an LLM list field as a comma-separated string.

    - ``None`` or a missing key gives "".
    - A bare string is kept as-is; ``','.join`` would put commas between its characters.
    - Lists/tuples drop ``None`` entries and ``str()`` the rest.
    - Any other scalar is ``str()``-ed.
    """
    if value is None:
        return ''
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple)):
        return ','.join(str(v) for v in value if v is not None)
    return str(value)


def flatten_results(results):
    """Convert LLM JSON output to a row-based format.

    Args:
        results: Iterable of result dicts as returned by ``process_batch``.
            ``None`` and non-dict entries are skipped.

    Returns:
        A list of flat row dicts with underscore column names. List-valued fields
        are joined with ','.
    """
    flattened = []
    for r in results or []:
        if not isinstance(r, dict):
            continue
        flattened.append({
            'NDB_No': r.get('NDB_No'),
            'Brand': r.get('Brand'),
            'Category': r.get('Category'),
            'Sub_Category': r.get('Sub-Category'),
            'Ingredients': _join_list_field(r.get('Ingredients')),
            'Preparation_Method': _join_list_field(r.get('Preparation Method')),
            'Cultural_Origin': r.get('Cultural Origin'),
            'State': r.get('State'),
            'Additional_Features': _join_list_field(r.get('Additional Features'))
        })
    return flattened

def create_fixed_batches(descriptions, batch_size=BATCH_SIZE):
    """Split ``descriptions`` into consecutive slices of at most ``batch_size`` items.

    Returns a list of ``(slice, batch_number)`` tuples numbered from 0. Only the
    final slice may be shorter than ``batch_size``.
    """
    batch_starts = range(0, len(descriptions), batch_size)
    return [
        (descriptions[offset:offset + batch_size], number)
        for number, offset in enumerate(batch_starts)
    ]

5. End-to-End Chunked Processing


In [None]:
# (5) End-to-End Chunked Processing

def process_bulk_in_chunks(input_file, output_dir, rows_per_chunk=500, skip_existing=False):
    """Run LLM extraction over a CSV in row chunks, writing one CSV per chunk.

    Each chunk of ``rows_per_chunk`` rows is split into batches of ``BATCH_SIZE``
    descriptions. Every batch goes through ``process_batch``, and the flattened
    rows are written to ``<output_dir>/output_chunk_<k>.csv``.

    Args:
        input_file: CSV with ``Shrt_Desc`` and ``NDB_No`` columns.
        output_dir: Directory for the chunk files (created if missing).
        rows_per_chunk: Number of input rows per output file.
        skip_existing: If True, chunks whose output file already exists are not
            re-sent to the LLM, so an interrupted run can be resumed cheaply.
            The default (False) reprocesses and overwrites every chunk.

    Raises:
        ValueError: If ``rows_per_chunk`` is not positive.
    """
    if rows_per_chunk <= 0:
        raise ValueError(f"rows_per_chunk must be positive, got {rows_per_chunk}")

    os.makedirs(output_dir, exist_ok=True)
    df = pd.read_csv(input_file)
    # pandas writes "" as an empty field and reads it back as NaN. Normalize to ""
    # so every description can be joined into the prompt text.
    descriptions = df["Shrt_Desc"].fillna("").astype(str).tolist()
    ndb_nos = df["NDB_No"].tolist()

    for chunk_index, start_idx in enumerate(range(0, len(df), rows_per_chunk)):
        chunk_file = os.path.join(output_dir, f"output_chunk_{chunk_index}.csv")
        if skip_existing and os.path.exists(chunk_file):
            print(f"[Chunk {chunk_index}] Skipping; {chunk_file} already exists.")
            continue

        end_idx = start_idx + rows_per_chunk
        chunk_desc = descriptions[start_idx:end_idx]
        chunk_ndb = ndb_nos[start_idx:end_idx]

        batches = create_fixed_batches(chunk_desc, BATCH_SIZE)
        all_results = []
        for batch_data, batch_idx in tqdm(batches, desc=f"Processing chunk {chunk_index}"):
            # Batch numbers restart at 0 within each chunk, so offsets are chunk-relative.
            batch_start = batch_idx * BATCH_SIZE
            ndb_subset = chunk_ndb[batch_start:batch_start + BATCH_SIZE]
            batch_results = process_batch((batch_data, batch_idx), ndb_subset)
            all_results.extend(flatten_results(batch_results))

        if all_results:
            pd.DataFrame(all_results).to_csv(chunk_file, index=False)
            print(f"[Chunk {chunk_index}] Saved {len(all_results)} rows to {chunk_file}")
        else:
            print(f"[Chunk {chunk_index}] No results.")

    print("All chunks processed.")

6. Main Execution

In [None]:
# (6) Main Execution

def main(data_dir="mnt/data", rows_per_chunk=500):
    """Run the full pipeline: abbreviation expansion, then chunked LLM extraction.

    Args:
        data_dir: Directory holding ``replaced_v3.csv``. All outputs are written
            under it. NOTE(review): the default is a *relative* path, resolved
            against the current working directory. If the data lives at
            "/mnt/data", pass that explicitly.
        rows_per_chunk: Rows per output chunk file.
    """
    # 1. Preprocess descriptions (Abbreviation Expansion)
    input_raw = os.path.join(data_dir, "replaced_v3.csv")
    output_preprocessed = os.path.join(data_dir, "standardized_descriptions.csv")
    preprocess_descriptions(input_raw, output_preprocessed)

    # 2. Perform chunked extraction & classification
    output_dir = os.path.join(data_dir, "output_chunks")
    process_bulk_in_chunks(output_preprocessed, output_dir, rows_per_chunk=rows_per_chunk)

if __name__ == "__main__":
    main()