# Convert Kyoto Hospital Medical Data to NutriBench Format w/ Translations

Convert each xls(x) workbook to CSV (one file per sheet) for easier viewing in a text editor, e.g. when validating the script's output.

In [ ]:
# xls2csv.py
import pandas as pd
import os

# List all Excel files in the current directory. The extension match is
# case-insensitive so that names such as "REPORT.XLSX" are picked up too.
excel_files = [f for f in os.listdir('.') if f.lower().endswith(('.xlsx', '.xls'))]

for file in excel_files:
    try:
        # The context manager closes the workbook handle even when a sheet
        # fails to parse or write.
        with pd.ExcelFile(file) as xl:
            for sheet in xl.sheet_names:
                # header=None keeps the first spreadsheet row as data, and the
                # CSV is written without header/index so it mirrors the sheet.
                df = xl.parse(sheet, header=None)
                outname = f"{os.path.splitext(file)[0]}_{sheet}.csv"
                df.to_csv(outname, index=False, header=False)
                print(f"Saved: {outname}")
    except Exception as e:
        # Best-effort batch conversion: report the failure and continue with
        # the next workbook.
        print(f"Error processing {file}: {e}")

Convert the hospital data to the NutriBench format (without English translations for now).

In [ ]:
# nhk2nutribench.py
import json
import re
import unicodedata
from pathlib import Path
from typing import Any, List, Dict, Tuple

import pandas as pd

# Constants
# CSV file that parse_kyoto() reads (header-less, every cell as a string).
INPUT_CSV = Path("NHO-Kyoto.csv")
# Destination of the NutriBench-format table; the main block swaps the suffix
# to ".jsonl" when JSONL_OUTPUT is set.
OUTPUT_CSV = Path("NHO-Kyoto-NutriBench.csv")
# When True the result is written as JSON Lines instead of CSV.
JSONL_OUTPUT = False

# ---------------------------------------------------------------------------
# Mapping and column indices
# ---------------------------------------------------------------------------
# Japanese meal-time marker found in the sheet -> English meal name emitted in
# the "meal_time" output column.
MEAL_MAP = {"朝": "breakfast", "昼": "lunch", "夕": "dinner"}

# Integer column labels of the header-less input frame (i.e. zero-based
# column positions in the CSV).
COL_TIME = 1       # "朝" / "昼" / "夕" or blank
COL_DISH = 3       # Dish name (料理名)
COL_SUBTOTAL = 7   # Label like "朝小計"
COL_ENERGY = 10    # kcal
COL_PROTEIN = 11   # g
COL_FAT = 14       # g
COL_CARB = 15      # g

# ---------------------------------------------------------------------------
# Text normalisation helpers
# ---------------------------------------------------------------------------

def normalize_text(text: str) -> str:
    """Return *text* in the canonical form used for matching and output.

    Steps, in order:

    • non-string input is stringified (falsy values become "")
    • NFKC normalisation (half-width kana → full-width, full-width
      roman/ASCII variants → ASCII)
    • every "space separator" (category Zs) char → ASCII space
    • every run of whitespace is deleted, so the result contains no
      whitespace at all
    """
    raw = text if isinstance(text, str) else str(text or "")

    folded = unicodedata.normalize("NFKC", raw)

    spaced_chars = []
    for ch in folded:
        is_space_separator = unicodedata.category(ch) == "Zs"
        spaced_chars.append(" " if is_space_separator else ch)

    without_whitespace = re.sub(r"\s+", "", "".join(spaced_chars))
    return without_whitespace.strip()


# ---------------------------------------------------------------------------
# Numeric conversion helper
# ---------------------------------------------------------------------------

def _to_float(cell: str | Any):
    try:
        return float(str(cell).replace(",", "")) if str(cell).strip() else None
    except ValueError:
        return None


# ---------------------------------------------------------------------------
# Core parsing routine
# ---------------------------------------------------------------------------

def parse_kyoto(csv_path: Path) -> List[Dict[str, Any]]:
    """Read the header-less hospital CSV and group its rows into meals.

    A meal opens on a row whose time column holds a key of ``MEAL_MAP`` and
    whose dish column is non-empty; that row's dish is recorded without
    checking its nutrient cells. While a meal is open, a row contributes a
    dish only when its dish name is non-empty and at least one of the four
    nutrient cells parses as a number. The meal closes on a row whose
    subtotal column starts with the meal's marker and ends with "小計"; it is
    kept only when it has items and at least one numeric nutrient value.
    A meal that is still open when the file ends is not emitted.

    Args:
        csv_path: Path of the CSV to read; every cell is loaded as a string
            and missing cells become "".

    Returns:
        One dict per meal with keys ``meal_time``, ``meal_time_local``,
        ``description_local``, ``energy``, ``protein``, ``fat`` and ``carb``;
        the last five are parallel lists (one entry per dish).
    """
    table = pd.read_csv(csv_path, header=None, dtype=str).fillna("")

    meals: List[Dict[str, Any]] = []
    # The meal currently being collected, or None between meals.
    open_meal: Dict[str, Any] | None = None

    for _, line in table.iterrows():
        marker = normalize_text(line[COL_TIME])
        dish = normalize_text(line[COL_DISH])
        label = normalize_text(line[COL_SUBTOTAL])

        # --- between meals: wait for a row that opens one ------------------
        if open_meal is None:
            if marker in MEAL_MAP and dish:
                open_meal = {
                    "meal_time": MEAL_MAP[marker],
                    "meal_time_local": marker,
                    "description_local": [dish],
                    # The opening row's nutrients are stored as-is (may be None)
                    "energy": [_to_float(line[COL_ENERGY])],
                    "protein": [_to_float(line[COL_PROTEIN])],
                    "fat": [_to_float(line[COL_FAT])],
                    "carb": [_to_float(line[COL_CARB])],
                }
            continue

        # --- subtotal row of the open meal: close it -----------------------
        closes_meal = label.endswith("小計") and label.startswith(open_meal["meal_time_local"])
        if closes_meal:
            every_value = (
                open_meal["energy"] + open_meal["protein"]
                + open_meal["fat"] + open_meal["carb"]
            )
            found_number = any(value is not None for value in every_value)
            # Keep the meal only when it has both items and numeric data
            if found_number and open_meal["description_local"]:
                meals.append(open_meal)
            open_meal = None
            continue

        # --- ordinary row inside the meal ----------------------------------
        if not dish:
            continue

        kcal = _to_float(line[COL_ENERGY])
        protein_g = _to_float(line[COL_PROTEIN])
        fat_g = _to_float(line[COL_FAT])
        carb_g = _to_float(line[COL_CARB])

        # Rows without a single numeric nutrient are skipped
        if all(value is None for value in (kcal, protein_g, fat_g, carb_g)):
            continue

        open_meal["description_local"].append(dish)
        open_meal["energy"].append(kcal)
        open_meal["protein"].append(protein_g)
        open_meal["fat"].append(fat_g)
        open_meal["carb"].append(carb_g)

    return meals


# ---------------------------------------------------------------------------
# NutriBench DataFrame builder
# ---------------------------------------------------------------------------

def build_nutribench_df(records: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build the NutriBench-format table from parsed meal records.

    The list-valued fields of each record are serialised to JSON strings
    (non-ASCII text is kept as-is), placeholder columns are added for data
    the source does not provide, and the columns are put in NutriBench order.

    The input records are left untouched, so the function can safely be
    called more than once on the same list.

    Args:
        records: Meal dicts carrying ``description_local``, ``carb``,
            ``energy``, ``protein``, ``fat``, ``meal_time`` and
            ``meal_time_local``.

    Returns:
        A DataFrame with one row per record; empty (but with all columns)
        when ``records`` is empty.

    Raises:
        KeyError: if a record lacks one of the list-valued fields.
    """
    json_fields = ("description_local", "carb", "energy", "protein", "fat")
    passthrough_fields = ("meal_time", "meal_time_local")

    # Work on shallow copies so the caller's dicts keep their real lists.
    rows = [
        {
            **record,
            **{name: json.dumps(record[name], ensure_ascii=False) for name in json_fields},
        }
        for record in records
    ]

    if rows:
        df = pd.DataFrame(rows)
    else:
        # Without explicit columns an empty frame would fail the final
        # column selection below.
        df = pd.DataFrame(columns=[*json_fields, *passthrough_fields])

    # Add empty arrays for fields not in the original data
    df["description"] = "[]"
    df["unit"] = "[]"
    df["unit_local"] = "[]"
    df["local_language"] = "Japanese"

    order = [
        "description", "description_local", "unit", "unit_local",
        "local_language", "carb", "energy", "protein", "fat", "meal_time", "meal_time_local"
    ]
    return df[order]


# ---------------------------------------------------------------------------
# Main execution
# ---------------------------------------------------------------------------

records = parse_kyoto(INPUT_CSV)
if not records:
    raise SystemExit("No meals detected.")

df = build_nutribench_df(records)

# Resolve the real destination once so that the file written and the path
# reported below always agree (JSONL mode writes next to OUTPUT_CSV with a
# ".jsonl" suffix).
output_path = OUTPUT_CSV.with_suffix('.jsonl') if JSONL_OUTPUT else OUTPUT_CSV

if JSONL_OUTPUT:
    df.to_json(output_path, orient="records", lines=True, force_ascii=False)
else:
    df.to_csv(output_path, index=False)

fmt = "JSONL" if JSONL_OUTPUT else "CSV"
print(f"✅ Converted {len(df)} meals → {output_path} ({fmt})")

From the NutriBench CSV, extract a deduplicated list of foods to send to an LLM for translation.

In [ ]:
# ls_foods.py
import csv
import io
import re
import ast

# Constants
# CSV whose 'description_local' column holds the per-meal food lists.
INPUT_FILENAME = "NHO-Kyoto-NutriBench.csv"
# Text file that receives the unique food names, one per line.
OUTPUT_FILENAME = "kyoto-foods.txt"

def extract_unique_foods(csv_data):
    """
    Parses CSV data to extract and deduplicate food items from the
    'description_local' column.

    Each cell is expected to hold a JSON array of strings. For backward
    compatibility a Python list literal (e.g. "['a', 'b']") is accepted too.
    Rows whose cell is missing or unparseable are reported and skipped; rows
    whose cell is not a list are skipped silently. Empty and non-string
    items are ignored.

    Args:
        csv_data (str): A string containing the CSV data (with header row).

    Returns:
        list: A sorted list of unique food item names.
    """
    # Local import: this script's own import block does not include json.
    import json

    # A set handles deduplication automatically.
    unique_foods = set()

    # io.StringIO lets the csv reader consume the string like a file;
    # DictReader gives access to columns by name.
    reader = csv.DictReader(io.StringIO(csv_data))

    for row in reader:
        try:
            food_list_str = row['description_local']
            try:
                # JSON first: this is the format the column is written in, and
                # unlike a Python literal it may contain null/true/false.
                food_items = json.loads(food_list_str)
            except (TypeError, ValueError):
                # Fallback for files that stored a Python list repr instead.
                food_items = ast.literal_eval(food_list_str)
        except (ValueError, SyntaxError, KeyError, TypeError):
            # Skip rows where the column is missing or the format is incorrect
            print(f"Warning: Could not parse row: {row}")
            continue

        if not isinstance(food_items, list):
            continue

        # Keep only non-empty strings: anything else cannot be a food name
        # and mixed types would make the final sort fail.
        unique_foods.update(
            item for item in food_items if isinstance(item, str) and item
        )

    # Sorted for a consistent output order.
    return sorted(unique_foods)

# --- Main execution block ---
# Reads the NutriBench CSV, extracts the unique food names and writes them to
# the output file, one name per line. Failures are reported on the console
# rather than raised.

try:
    # Load the whole CSV as text.
    with open(INPUT_FILENAME, 'r', encoding='utf-8') as csv_in:
        csv_content = csv_in.read()

    # Deduplicate the food names found in it.
    deduplicated_foods = extract_unique_foods(csv_content)

    # One item per line.
    with open(OUTPUT_FILENAME, 'w', encoding='utf-8') as txt_out:
        txt_out.writelines(f"{name}\n" for name in deduplicated_foods)

    # Console summary.
    print(f"Deduplicated list of food items from '{INPUT_FILENAME}' written to '{OUTPUT_FILENAME}'.")
    print(f"Total unique food items: {len(deduplicated_foods)}")
    if len(deduplicated_foods) == 0:
        print("No food items were found or extracted.")

except FileNotFoundError:
    print(f"Error: The file '{INPUT_FILENAME}' was not found. Please make sure it's in the same directory as the script.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

Translate the list of food items from the source language to English. Instead of translating in a single shot, we chunk the file and ask the LLM to translate it 5 lines at a time.

We do it this way because LLM output quality degrades as the input context grows (https://fiction.live/stories/Fiction-liveBench-Mar-25-2025/oQdzQvKHw8JyXbN87 and other benchmarks) and as the completion length grows (https://eqbench.com/creative_writing_longform.html, the "degradation" column). While these benchmarks focus on creative writing, I find it reasonable to extrapolate to tasks like translation as well.

We translate the deduplicated list rather than iterating over each row of the NutriBench CSV because the same food item can appear in many rows; translating each food item only once saves on API costs.

In [ ]:
# translate_ls.py
import os
from openai import OpenAI

# Constants
# Food names to translate, one per line.
INPUT_FILE = "kyoto-foods.txt"
# Receives the English translations, one per line.
OUTPUT_FILE = "kyoto-foods-en.txt"
# Source language named in the translation prompt.
SRC_LANG = "Japanese"
API_BASE_URL = "https://openrouter.ai/api/v1"
# API_KEY takes precedence; `or` (rather than a .get() default) makes an
# empty API_KEY variable fall through to OPENROUTER_API_KEY as well.
API_KEY = os.environ.get("API_KEY") or os.environ.get("OPENROUTER_API_KEY")
MODEL = "google/gemini-2.5-pro"

def process_chunk(client, model, chunk_lines):
    """Translate one chunk of food names and return the translated text.

    Args:
        client: Chat-completions client exposing
            ``client.chat.completions.create``.
        model: Model identifier passed through to the API.
        chunk_lines: Food names to translate; they are sent one per line.

    Returns:
        The model's reply with surrounding whitespace removed.

    Raises:
        ValueError: if the reply contains no choices or no text, so the
            caller's error handling can fall back instead of writing an
            empty translation.
    """
    # Join the lines into a single string
    foods = "\n".join(chunk_lines)

    # Create the prompt with the configured language. The output-format
    # sentence matters: the translated lines are later paired with the
    # source lines purely by position.
    prompt = (
        f"You are an expert translator who specializes in nutrition and food items. "
        f"Please translate the following list of foods from {SRC_LANG} to English. "
        f"Only provide direct romanizations/transliterations when an average English speaker "
        f"would be familiar with the romanization; otherwise, provide an English localization. "
        f"Provide only the translated list. "
        f"Return exactly one translation per line, in the same order as the input, "
        f"with no numbering, bullets, blank lines, or commentary.\n\n"
        f"{foods}"
    )

    # Generate the response
    completion = client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "user",
                "content": prompt
            }
        ],
        temperature=0.7, # slightly lower temp for (hopefully) more sane translations
    )

    if not completion.choices:
        raise ValueError("Translation API returned no choices")

    # content can be None (e.g. a refusal or filtered reply); guard before strip()
    content = completion.choices[0].message.content
    if not content or not content.strip():
        raise ValueError("Translation API returned an empty response")

    return content.strip()

# Check if API key is available
if not API_KEY:
    print("Error: API_KEY or OPENROUTER_API_KEY environment variable is required")
    # `exit` is a convenience injected by the site module for interactive
    # sessions; raising SystemExit directly works everywhere.
    raise SystemExit(1)

client = OpenAI(
    base_url=API_BASE_URL,
    api_key=API_KEY,
)

print(f"Using API: {API_BASE_URL}")
print(f"Using model: {MODEL}")

# Number of input lines sent to the model per request.
CHUNK_SIZE = 5


def _translate_chunk(chunk_lines, chunk_number):
    """Return ``(output_lines, ok)`` with exactly one output line per input line.

    The output file is later paired with the input file line by line, so a
    chunk must never add or drop lines. If the request fails, or the reply
    does not contain exactly ``len(chunk_lines)`` non-empty lines, the
    original lines are returned unchanged and ``ok`` is False.
    """
    try:
        translation = process_chunk(client, MODEL, chunk_lines)
        translated = [ln.strip() for ln in translation.splitlines() if ln.strip()]
        if len(translated) != len(chunk_lines):
            raise ValueError(
                f"expected {len(chunk_lines)} translated lines, got {len(translated)}"
            )
        return translated, True
    except Exception as e:
        print(f"Error processing chunk {chunk_number}: {e}")
        # Fall back to the original lines only. A separate marker line would
        # shift every following line by one.
        return list(chunk_lines), False


try:
    with open(INPUT_FILE, 'r', encoding='utf-8') as f_in, \
         open(OUTPUT_FILE, 'w', encoding='utf-8') as f_out:

        chunk = []
        line_count = 0
        total_chunks = 0
        failed_chunks = 0

        for line in f_in:
            line = line.strip()
            if not line:  # Skip empty lines
                continue
            chunk.append(line)
            line_count += 1

            # Process when we have a full chunk
            if len(chunk) < CHUNK_SIZE:
                continue
            total_chunks += 1
            print(f"Processing chunk {total_chunks} (lines {line_count - CHUNK_SIZE + 1} to {line_count})...")
            out_lines, ok = _translate_chunk(chunk, total_chunks)
            if not ok:
                failed_chunks += 1
            f_out.write("\n".join(out_lines) + "\n")
            f_out.flush()  # Ensure output is written immediately
            chunk = []

        # Process any remaining lines
        if chunk:
            total_chunks += 1
            print(f"Processing final chunk {total_chunks} ({len(chunk)} lines)...")
            out_lines, ok = _translate_chunk(chunk, total_chunks)
            if not ok:
                failed_chunks += 1
            f_out.write("\n".join(out_lines) + "\n")

    print(f"Translation complete! Processed {total_chunks} chunks.")
    if failed_chunks:
        print(f"Warning: {failed_chunks} chunk(s) could not be translated; "
              f"their original lines were written unchanged.")
    print(f"Output saved to {OUTPUT_FILE}")

except FileNotFoundError:
    print(f"Error: Could not find input file '{INPUT_FILE}'")
except Exception as e:
    print(f"Error: {e}")

Zip the source-language and target-language lists, line by line, into a single CSV translation dictionary.

In [ ]:
# mk_dict.py
import csv

# Constants
# Japanese food names, one per line.
JP_FILE = "kyoto-foods.txt"
# English food names, one per line; paired with JP_FILE by line position.
EN_FILE = "kyoto-foods-en.txt"
# CSV dictionary written with the columns jp_food, en_food.
OUTPUT_FILE = "kyoto-foods-dict.csv"

def create_food_csv(jp_file, en_file, output_file):
    """
    Reads Japanese and English food names from separate files and combines them into a CSV.

    Names are paired by position after blank lines are dropped. Bare
    "# Translation failed:" marker lines in the English file are dropped as
    well: the translation step writes one ahead of a chunk it could not
    translate, and keeping it would shift every following pair by one.
    If the two lists still differ in length a warning is printed and only
    as many rows as the shorter list are written.

    Errors are reported on the console; nothing is raised.

    Args:
        jp_file: Path to the Japanese food names file (required)
        en_file: Path to the English food names file (required)
        output_file: Path to the output CSV file (required)
    """
    failure_marker = "# Translation failed:"

    try:
        # Read Japanese food names
        with open(jp_file, 'r', encoding='utf-8') as f:
            jp_foods = [line.strip() for line in f if line.strip()]

        # Read English food names
        with open(en_file, 'r', encoding='utf-8') as f:
            en_foods = [line.strip() for line in f if line.strip()]

        # Remove failure markers so the remaining lines line up again
        marker_count = en_foods.count(failure_marker)
        if marker_count:
            en_foods = [line for line in en_foods if line != failure_marker]
            print(f"Note: ignored {marker_count} '{failure_marker}' marker line(s); "
                  f"the untranslated names that follow them are kept as-is.")

        # Check if both files have the same number of items
        if len(jp_foods) != len(en_foods):
            print(f"Warning: Number of items don't match! Japanese: {len(jp_foods)}, English: {len(en_foods)}")
            print("The CSV will contain as many rows as the shorter list.")

        pairs = list(zip(jp_foods, en_foods))

        # Create CSV file
        with open(output_file, 'w', encoding='utf-8', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['jp_food', 'en_food'])
            writer.writerows(pairs)

        print(f"Successfully created {output_file}")
        print(f"Total rows written: {len(pairs)}")

        # Show first few rows as preview (header + 5 data rows)
        print("\nFirst 5 rows of the CSV:")
        for jp, en in [('jp_food', 'en_food'), *pairs[:5]]:
            print(f"{jp:<30} | {en}")

    except FileNotFoundError as e:
        print(f"Error: Could not find file - {e}")
    except Exception as e:
        print(f"An error occurred: {e}")

# Build the dictionary CSV from the module-level file constants
create_food_csv(jp_file=JP_FILE, en_file=EN_FILE, output_file=OUTPUT_FILE)

Finally, use the dictionary to translate each meal's food items to English.

In [ ]:
# translate_foods.py
import pandas as pd
import json

# Constants
# Meals table whose 'description_local' column holds JSON arrays of food names.
INPUT_FILE = "NHO-Kyoto-NutriBench.csv"
# Copy of the meals table with the 'description' column filled in.
OUTPUT_FILE = "NutriBench Multilingual Data Format.csv"
# Translation dictionary CSV with the columns jp_food, en_food.
DICT_FILE = "kyoto-foods-dict.csv"

def load_translation_dictionary(dict_file):
    """Load the Japanese-English food dictionary into a dict.

    Args:
        dict_file: Path of a CSV with the columns ``jp_food`` and ``en_food``.

    Returns:
        dict mapping each ``jp_food`` string to its ``en_food`` string. When
        a Japanese name occurs more than once, the last row wins.
    """
    # Read every cell as literal text. pandas' defaults would turn names such
    # as "nan", "NA" or "null" into NaN and numeric-looking names into
    # numbers, which breaks the string lookup and the JSON output.
    df = pd.read_csv(dict_file, dtype=str, keep_default_na=False)
    # Create a dictionary for easy lookup
    translation_dict = dict(zip(df['jp_food'], df['en_food']))
    return translation_dict

def translate_meal(meal_list, translation_dict):
    """Map each Japanese food item in *meal_list* to English.

    Items missing from *translation_dict* are kept in place, wrapped in
    square brackets, and also collected separately.

    Returns:
        (translated, missing): the translated list, in input order, and the
        list of items that had no dictionary entry.
    """
    translated = []
    missing = []

    for name in meal_list:
        known = name in translation_dict
        if not known:
            # No dictionary entry: remember it for the report
            missing.append(name)
        translated.append(translation_dict[name] if known else f"[{name}]")

    return translated, missing

# Load translation dictionary
print(f"Loading translation dictionary from {DICT_FILE}...")
translation_dict = load_translation_dictionary(DICT_FILE)
print(f"Loaded {len(translation_dict)} translations")

# Load meals data
print(f"\nLoading meals data from {INPUT_FILE}...")
meals_df = pd.read_csv(INPUT_FILE)
print(f"Loaded {len(meals_df)} meals")

# Translate meals
print("\nTranslating meals...")
translated_meals = []
original_meals = []  # parsed source lists, reused by the example printout below
all_untranslated = set()

for idx, row in meals_df.iterrows():
    # Parse the JSON array from description_local
    try:
        meal_items = json.loads(row['description_local'])
        if not isinstance(meal_items, list):
            raise ValueError("description_local is not a JSON array")
    except (TypeError, ValueError) as e:
        # TypeError: the cell is not text (e.g. NaN for an empty cell);
        # ValueError: malformed JSON or not an array. Nothing else is
        # swallowed, so Ctrl-C and genuine bugs still surface.
        print(f"Error parsing meal at row {idx}: {e}")
        translated_meals.append([])
        original_meals.append([])
        continue

    # Translate the meal
    translated_meal, untranslated = translate_meal(meal_items, translation_dict)
    translated_meals.append(translated_meal)
    original_meals.append(meal_items)
    all_untranslated.update(untranslated)

# Add translated meals to dataframe
meals_df['description'] = [json.dumps(meal, ensure_ascii=False) for meal in translated_meals]

# Save results
print(f"\nSaving translated meals to {OUTPUT_FILE}...")
meals_df.to_csv(OUTPUT_FILE, index=False)

# Print summary
print(f"\nTranslation complete!")
print(f"Total meals translated: {len(translated_meals)}")

if all_untranslated:
    print(f"\nItems not found in dictionary ({len(all_untranslated)}):")
    for item in sorted(all_untranslated):
        print(f"  - {item}")

# Show a few examples. The already-parsed lists are reused so a row that
# failed to parse above cannot crash the printout.
print("\n--- Example Translations ---")
for i in range(min(3, len(meals_df))):
    print(f"\nMeal {i+1} ({meals_df.iloc[i]['meal_time']}):")
    original = original_meals[i]
    translated = translated_meals[i]
    print("  Original:", original)
    print("  Translated:", translated)