In [1]:
import numpy as np
import pandas as pd
import re

In [None]:
def map_img_src_to_medal(img_src: pd.Series) -> pd.Series:
    """Translate badge image URLs into medal / flag labels.

    Every value of ``img_src`` is looked up in a fixed table of badge URLs;
    the placeholder ``'Not Found'`` becomes ``'No Medal'``. Values that are
    not in the table come back as NaN, as ``Series.map`` does for a dict.

    Args:
        img_src: Series of badge image URLs (or ``'Not Found'``).

    Returns:
        Series of labels with the same index as ``img_src``.
    """
    label_by_src = {
        'Not Found': 'No Medal',
        'https://cdn.homebrewersassociation.org/wp-content/uploads/2021/02/04134237/medal-0-1.svg': 'NORMAL MEDAL',
        'https://cdn.homebrewersassociation.org/wp-content/uploads/2021/02/04134634/medal-1-1.svg': 'NHC GOLD',
        'https://cdn.homebrewersassociation.org/wp-content/uploads/2021/02/04134633/medal-2-1.svg': 'NHC SILVER',
        'https://cdn.homebrewersassociation.org/wp-content/uploads/2021/02/04134635/medal-3-1.svg': 'NHC COPPER',
        'https://cdn.homebrewersassociation.org/wp-content/uploads/2021/02/04134238/clone-flag.svg': 'CLONE',
        'https://cdn.homebrewersassociation.org/wp-content/uploads/2021/02/04134239/pro-am-flag.svg': 'PRO AM',
    }
    return img_src.map(label_by_src)

def find_index_topics(ingredient: list) -> dict:
    """Locate the heading lines inside a recipe's list of ingredient lines.

    A line counts as a heading when, ignoring case, it starts with one of the
    known topic words. If several lines start with the same topic word, the
    last one wins.

    Args:
        ingredient: The ingredient text split into lines.

    Returns:
        Dict mapping every topic word to the index of its heading line, or to
        NaN when no line starts with that word.
    """
    headings = (
        'MALTS', 'Fermentable', 'EXTRACT', 'HOPS', 'WATER', 'YEAST',
        'ADDITIONAL', 'Specifications', 'Yield', 'Original Gravity',
        'Final Gravity', 'ABV', 'IBU', 'SRM', 'Efficiency',
    )
    idx_topics = dict.fromkeys(headings, np.nan)

    for position, line in enumerate(ingredient):
        lowered = line.lower()
        for heading in headings:
            # Only a match at the very start of the line marks a heading.
            if lowered.startswith(heading.lower()):
                idx_topics[heading] = position

    return idx_topics

def find_index_not_nan(keys: list, idx_topics):
    """Return the first non-NaN value among ``idx_topics[key]`` for ``keys``.

    Keys are tried in the given order; NaN is returned when every one of them
    maps to NaN (or when ``keys`` is empty).
    """
    # The generator is lazy, so keys after the first hit are never looked up.
    hits = (idx_topics[key] for key in keys if not np.isnan(idx_topics[key]))
    return next(hits, np.nan)

def extract_ingredients(start_keys, end_keys, idx_topics, ingredient):
    """Return the lines of one ingredient section, joined with newlines.

    The section starts on the line after the first heading in ``start_keys``
    that was found, and stops just before the nearest heading (of any topic in
    ``idx_topics``) that comes later. With no later heading, it runs to the
    end of ``ingredient``.

    Args:
        start_keys: Topic words that may open the section, in priority order.
        end_keys: Accepted for the caller's benefit but not consulted here;
            the end is always the nearest following heading.
        idx_topics: Mapping of topic word -> heading line index (or NaN).
        ingredient: The ingredient text split into lines.

    Returns:
        The section text, or ``''`` when none of ``start_keys`` was found.
    """
    start = find_index_not_nan(keys=start_keys, idx_topics=idx_topics)
    if np.isnan(start):
        return ''

    first_line = int(start) + 1
    later_headings = [
        pos for pos in idx_topics.values()
        if not np.isnan(pos) and pos > start
    ]

    if later_headings:
        # The smallest later index is the closest heading after the start.
        return '\n'.join(ingredient[first_line:int(min(later_headings))])
    return '\n'.join(ingredient[first_line:])

In [None]:
# Load the two recipe CSVs and stack them into a single frame
df1, df2 = (
    pd.read_csv(csv_path)
    for csv_path in ('../dataset/all_recipes_new.csv', '../dataset/all_recipes_new1.csv')
)
df = pd.concat([df1, df2])

# Load the URL table (style, recipe name, category: beer / cider / mead) and
# collapse it to one row per url, keeping every category listed for that url
df_url = (
    pd.read_csv('../dataset/all_urls.csv')
    .drop(columns=['Unnamed: 0'])
    .groupby('url')
    .agg({
        'style': 'first',
        'beer_name': 'first',
        'final_results': 'first',
        'category': list,
    })
    .reset_index()
)

# Left join: recipes whose url is not in the URL table keep NaN in the new columns
df = df.merge(df_url[['beer_name', 'style', 'url', 'category']], how='left', on='url')

# Recheck category (beer, cider, mead): the source site does not always
# categorise correctly, so look for tell-tale words in the ingredient text.
type_ = []
for desc in df['ingredients']:
    # Missing ingredient text (NaN) is treated as containing no keywords.
    desc_lower = desc.lower() if isinstance(desc, str) else ''
    types = [t for t in ['hops', 'malt', 'honey', 'cider', 'mead', 'apple'] if t in desc_lower]
    type_.append(types)
df['recheck_category'] = type_

category_final = []
for cat, recheck in df[['category', 'recheck_category']].values:
    # After the left merge, recipes without a URL match have NaN here.
    if not isinstance(cat, (list, tuple, set, str)):
        cat = []

    looks_like_beer = ('malt' in recheck) or ('hops' in recheck)

    # A "Mead"/"Cider" listing whose ingredients mention malt or hops is
    # reclassified as Beer; otherwise the listed category is kept. Before this
    # fix the 'Beer' assignment was always overwritten on the next line.
    if 'Mead' in cat:
        cat_final = 'Beer' if looks_like_beer else 'Mead'
    elif 'Cider' in cat:
        cat_final = 'Beer' if looks_like_beer else 'Cider'
    else:
        cat_final = 'Beer'

    category_final.append(cat_final)

# Get final category
df['final category'] = category_final


# Extract winning medal, then drop the raw columns and blank out missing values
df['medal'] = map_img_src_to_medal(df['src'])
df = df.drop(columns=['url', 'src']).fillna('N/A')

# Extract year: collect every year string from 1900 to 2039 that occurs in the
# description, then keep the largest one (NaN when the text mentions none)
candidate_years = [str(y) for y in range(1900, 2040)]
year_in_text = [
    [y for y in candidate_years if y in descrip]
    for descrip in df['description'].values
]
df['year'] = [max(found) if len(found) > 0 else np.nan for found in year_in_text]

# df = df[(df['final category'] == 'Beer') & (df['medal'].isin(['NHC GOLD', 'NHC SILVER', 'NHC COPPER']))].copy()
# df = df[df['category'].apply(lambda x: 'Extract' not in x)]
df.head()

In [12]:
# Extract Ingredients by Topics

# Specification headings whose first number is pulled out of the heading line
topics = ['Yield', 'Original Gravity', 'Final Gravity', 'ABV', 'IBU', 'SRM', 'Efficiency']

# One combined record (original columns + sections + specs) per recipe
rows_list = []

for index, row in df.iterrows():
    ingredient = row['ingredients'].split('\n')
    idx_topics = find_index_topics(ingredient)

    # Text of each ingredient section, located via its heading line
    all_ingredient = {
        'fermentable': extract_ingredients(['MALTS', 'Fermentable', 'EXTRACT'], ['HOPS', 'WATER', 'YEAST'], idx_topics, ingredient),
        'hops': extract_ingredients(['HOPS'], ['WATER', 'YEAST'], idx_topics, ingredient),
        'water': extract_ingredients(['WATER'], ['YEAST'], idx_topics, ingredient),
        'yeast': extract_ingredients(['YEAST'], ['ADDITIONAL ITEMS', 'Specifications'], idx_topics, ingredient),
        'additional': extract_ingredients(['ADDITIONAL'], ['Specifications'], idx_topics, ingredient),
    }

    # First number on each specification line, stored as a string
    # ('nan' when the heading is absent or its line carries no number)
    specifications = {}
    for topic in topics:
        idx = idx_topics.get(topic, np.nan)
        value = np.nan
        if not np.isnan(idx):
            found = re.findall(r'\d+\.\d+|\d+', ingredient[int(idx)])
            if found:
                value = float(found[0])
        specifications[topic] = str(value)

    rows_list.append(pd.Series({**row.to_dict(), **all_ingredient, **specifications}))

df_new = pd.DataFrame(rows_list).reset_index(drop=True)

In [24]:
# Keyword-based ingredient extraction: every ingredient line is assigned to a
# category when it contains one of that category's keywords.
# The keyword lists are constant, so they are built once rather than per row.

FERMENTABLE_KEYWORDS = [
    'malt', 'barley', 'sugar', 'corn', 'dextrose', 'LME', 'rice', 'flake',
    'oat', 'extract', 'roast', 'torrified', 'crystal', 'powder', 'cara', '2-row', 'otter', 'wheat',
    'Dextrose', 'Sugar', 'Honey', 'Lactose', 'Maltodextrin', 'Maple Syrup', 'Molasses', 'Sucrose', 'Turbinado',
    '2-Row', '6-Row', 'Acidulated', 'Golden Promise', 'Maris Otter', 'Mild', 'Munich', 'Peat Smoked Malt', 'Pilsner',
    'Rye', 'Smoked', 'Vienna', 'Wheat', 'Caramel', 'Amber', 'Cara Ruby', 'Carafoam', 'Caramel/Crystal', 'Carapils',
    'Honey', 'Extract', 'DME', 'LME', 'Chit Malt', 'Malted', 'Roasted', 'Flaked', 'Barley', 'Corn', 'Oats', 'Rice', 'Rye',
    'Spelt', 'Wheat', 'Grits', 'Rice Hulls', 'Torrified Wheat', 'Aromatic', 'Biscuit', 'Black', 'Brown', 'Chocolate', 'Special Roast',
]

HOP_KEYWORDS = [
    'hop', 'pallet', 'a.a.', 'AA', 'Astra', 'Eclipse', 'Ella', 'Enigma', 'Feux-Coeur Francais', 'Galaxy', 'Helga',
    'HPA-016', 'Melba', 'Pride of Ringwood', 'Summer', 'Super Pride', 'Sylva', 'Topaz',
    'Vic Secret', 'Vienna Gold', 'Canadian Redvine', 'Lumberjack', 'Sasquatch',
    'Wild Loyalist', 'Marco Polo', 'Tsingdao Flower', 'Agnus', 'Amethyst', 'Bohemie',
    'Boomerang', 'Bor', 'Gaia', 'Harmonie', 'Kazbek', 'Mimosa', 'Premiant', 'Saaz CZ',
    'Saaz Late', 'Sládek', 'Vital', 'Aramis', 'Barbe Rouge', 'Bouclier', 'Elixir',
    'Mistral', 'Petit Blanc', 'Strisselspalt', 'Tardif de Bourgogne', 'Triskel',
    'Akoya', 'Ariana', 'Aurum', 'Brewer\'s Gold GR', 'Callista', 'Diamant',
    'Hallertau Blanc', 'Hallertau Gold', 'Hallertau Mittelfrüh', 'Hallertau Tradition',
    'Herkules', 'Hersbrucker', 'Huell Melon', 'Hüller Bitterer', 'Magnum GR',
    'Mandarina Bavaria', 'Merkur', 'Monroe', 'Northern Brewer GR', 'Opal', 'Orion',
    'Perle GR', 'Polaris', 'Relax', 'Saphir', 'Smaragd', 'Solero', 'Spalt',
    'Spalter Select', 'Tango', 'Tettnanger', 'Wurttemberg', 'Yellow Sub', 'Golden Star',
    'Sorachi Ace', 'Toyomidori', 'Brooklyn', 'Dr. Rudi', 'Green Bullet', 'HORT9909',
    'Kohatu', 'Motueka', 'Moutere', 'Nectaron', 'Nelson Sauvin', 'NZH-107',
    'Pacific Gem', 'Pacific Jade', 'Pacific Sunrise', 'Pacifica', 'Rakau', 'Riwaka',
    'Smooth Cone', 'Southern Cross', 'Sticklebract', 'Taiheke', 'Wai-iti', 'Waimea',
    'Wakatu', 'Junga', 'Limbus', 'Lubelska', 'Magnat', 'Marynka', 'Pulawski',
    'Sybilla', 'Zula', 'Apolon', 'Atlas', 'Aurora', 'Bobek', 'Cekin', 'Celeia',
    'Dana', 'Styrian Cardinal', 'Styrian Dragon', 'Styrian Eagle', 'Styrian Fox',
    'Styrian Golding', 'Styrian Kolibri', 'Styrian Wolf', 'African Queen',
    'Southern Aroma', 'Southern Brewer', 'Southern Dawn', 'Southern Passion',
    'Southern Promise', 'Southern Star', 'Southern Sublime', 'Southern Tropic',
    'XJA2/436', 'Zagrava', 'Admiral', 'Archer', 'Beata', 'Boadicea', 'Bramling Cross',
    'Challenger', 'Defender', 'East Kent Goldings', 'Endeavour', 'Epic', 'Ernest',
    'First Gold', 'Flyer', 'Fuggle', 'Godiva', 'Hallertau Taurus', 'Harlequin',
    'Herald', 'Jester', 'Minstrel', 'Mystic', 'Olicana', 'Omega', 'Phoenix', 'Pilgrim',
    'Pilot', 'Pioneer', 'Progress', 'Sovereign', 'Sussex', 'Target',
    'Whitbread Golding Variety WGV', 'Yeoman', 'Adeena', 'Ahhhroma', 'Ahtanum',
    'Aloha Blend', 'Altus', 'Amarillo', 'Apollo', 'Aquila', 'Arcadian', 'Azacca',
    'Banner', 'Belma', 'Bergamot', 'Bianca', 'Bitter Gold', 'Bravo', 'Brewer\'s Gold US',
    'BRU-1', 'Bullion', 'Buzz Bullets', 'Caliente', 'Calypso', 'Cascade', 'Cashmere',
    'Centennial', 'Chelan', 'Chinook', 'Citra', 'Cluster', 'Cluster Fugget Blend',
    'Columbia', 'Columbus', 'Comet', 'Contessa', 'Crystal', 'CTZ', 'Delta',
    'Diamond Springs', 'Ekuanot', 'El Dorado', 'Emerald Spire', 'Equinox', 'Eroica',
    'Eureka', 'Evergreen Blend', 'Falconer\'s Flight', 'Falconer\'s Flight 7CS',
    'Fantasia Blend', 'Galena', 'Gemini', 'Glacier', 'Golding', 'Grungeist',
    'Hallertau US', 'Hartwick', 'HBC 472', 'HBC 586', 'HBC 630', 'HBC 692',
    'Helios', 'Horizon', 'Hydra', 'Idaho 7', 'Idaho Gem', 'Independence', 'Jarrylo',
    'Lambic', 'Lawton', 'Lemondrop', 'Liberty', 'Loral', 'Lotus', 'Mackinac',
    'Magnum US', 'McKenzie', 'Medusa', 'Meridian', 'Michigan Copper', 'Millennium',
    'Mosaic', 'Mount Hood', 'Mount Rainier', 'Multihead', 'Newport', 'Nobility Blend',
    'Northdown', 'Northern Brewer US', 'Nugget', 'Old Mission', 'Olympic',
    'Pacific Crest', 'Pahto', 'Palisade', 'Paradigm', 'Pekko', 'Perle US',
    'Petoskey', 'Prussian', 'Saaz US', 'Sabro', 'Samba', 'Santiam', 'Satus',
    'Saugatuck', 'Sequoia Blend', 'Shaddock', 'Simcoe', 'Sitiva Blend', 'Sonnet',
    'Sterling', 'Sticky Fingers Blend', 'Stirling', 'Strata', 'Sultana Denali',
    'Summit', 'Sun', 'Sunbeam', 'Super Galena', 'Tahoma', 'Talisman', 'Talus',
    'Teamaker', 'Tettnang US', 'Tillicum', 'TNT Blend', 'Tomahawk', 'Trident',
    'TriplePearl', 'Triumph', 'Tropica', 'Ultra', 'USDA 008', 'Vanguard', 'Vista',
    'Walhalla', 'Warrior', 'Wild Wolf', 'Willamette', 'X13459', 'Yakima Cluster',
    'Yakima Gold', 'YQH-1380', 'Zamba Blend', 'Zappa', 'Zenia', 'Zenith', 'Zeus',
    'Zythos',
]

# Yeast keywords are matched case-sensitively (hence both 'wlp' and 'WLP').
YEAST_KEYWORDS = ['yeast', 'white labs', 'Wyeast', 'Omega', 'Lallemand', 'Fermentis', 'Safale', 'wy', 'wlp', 'Imperial', 'White Labs', 'WLP']

WATER_KEYWORDS = ['ppm']

# 'Whirfloc' is the spelling this notebook originally searched for; the
# correctly spelled 'Whirlfloc' is added so those lines are matched as well.
ADDITION_KEYWORDS = ['nutrient', 'moss', 'Whirfloc', 'Whirlfloc']


def _hit_not_leading(line, keyword):
    """True when `keyword` occurs in `line` (ignoring case) but not at its very start."""
    low, kw = line.lower(), keyword.lower()
    return kw in low and not low.startswith(kw)


def _hit_not_whole_line(line, keyword):
    """True when `keyword` occurs in `line` (ignoring case) and is not the entire line."""
    low, kw = line.lower(), keyword.lower()
    return kw in low and low != kw


def _hit_exact_case(line, keyword):
    """True when `keyword` occurs in `line`, matching case exactly."""
    return keyword in line


def _collect_lines(lines, keywords, hit, header=None):
    """Return, in order, the lines for which `hit(line, keyword)` holds for some keyword.

    A line whose lower-cased text equals `header` is a bare section heading
    and is never collected.
    """
    return [
        line for line in lines
        if (header is None or line.lower() != header)
        and any(hit(line, kw) for kw in keywords)
    ]


ingredient_extracted = []
for ingredients_text in df['ingredients']:
    ingredient = ingredients_text.split('\n')

    ingre_dict = {}

    # Fermentables: the keyword must not open the line, which keeps headings
    # such as "MALTS" out.
    ingre_dict['fermentable'] = _collect_lines(ingredient, FERMENTABLE_KEYWORDS, _hit_not_leading)

    # NOTE(review): hops / water / additions originally tested
    # `keyword.find(line) != 0` (operands swapped relative to the fermentable
    # test), which only rejects a line that *is* the keyword. That effective
    # behaviour is kept here; confirm whether "keyword not at start of line"
    # was intended instead.
    ingre_dict['hops'] = _collect_lines(ingredient, HOP_KEYWORDS, _hit_not_whole_line, header='hops')

    # The bare "yeast" heading is skipped. Previously it was appended to the
    # yeast list and then `hops.remove(...)` was called, which removed from
    # the wrong list and raised ValueError when the line was not in `hops`.
    ingre_dict['yeast'] = _collect_lines(ingredient, YEAST_KEYWORDS, _hit_exact_case, header='yeast')

    ingre_dict['water'] = _collect_lines(ingredient, WATER_KEYWORDS, _hit_not_whole_line)
    ingre_dict['addition'] = _collect_lines(ingredient, ADDITION_KEYWORDS, _hit_not_whole_line)

    ingredient_extracted.append(ingre_dict)

In [25]:
# Attach the keyword-based extraction results to df_new, one column per
# category: (new column name, key inside each extraction dict)
for column, key in (
    ('fermenatble2', 'fermentable'),
    ('hops2', 'hops'),
    ('yeast2', 'yeast'),
    ('water2', 'water'),
    ('additional2', 'addition'),
):
    df_new[column] = [entry[key] for entry in ingredient_extracted]

In [None]:
# Write the cleaned frame to an Excel workbook; index=False leaves the row index out of the file.
df_new.to_excel('../dataset/clean3.xlsx', index=False)

In [16]:
# df1 = pd.read_csv('/Users/famepatcharapol/Desktop/Learning/craft_beer/all_recipes_new.csv')
# df2 = pd.read_csv('/Users/famepatcharapol/Desktop/Learning/craft_beer/all_recipes_new1.csv')
# df = pd.concat([df1, df2])
# df.to_csv('/Users/famepatcharapol/Desktop/Learning/craft_beer/all_recipes.csv')

In [None]:
# `json` is otherwise only imported in a later cell, so import it here to keep
# this cell runnable on a fresh kernel (Restart & Run All).
import json

# Hand-written example of the target structured-recipe format.
raw_json = """
{
  "title": "Lines on the Map - English Strong Ale",
  "url": "https://www.homebrewersassociation.org/homebrew-recipe/lines-on-the-map-english-strong-ale/",
  "style": "English Strong Ale",
  "brewer": "Chris Colby",
  "source": "Zymurgy Magazine (July/August 2023)",
  "competition": {
    "name": "National Homebrew Competition",
    "year": 2023,
    "award": "Gold Medal",
    "category": "Strong American Ale"
  },
  "description": "Strong dark ale with a moderately roasty character. Reasonably dry considering its high gravity. Balanced malt, hops, and roast character.",
  "ingredients": {
    "malts": [
      {"name": "English pale ale malt", "amount_lb": 9.0, "color_L": 3},
      {"name": "Crystal malt", "amount_oz": 6, "color_L": "60–80"},
      {"name": "Chocolate malt", "amount_oz": 5, "color_L": "350–400"},
      {"name": "Black malt", "amount_oz": 3, "color_L": 500},
      {"name": "Light LME", "amount_lb": 4},
      {"name": "Sucrose", "amount_oz": 8}
    ],
    "hops": [
      {"name": "Goldings", "amount_oz": 3, "alpha_acid_pct": 5.0, "time_min": 60, "usage": "boil"},
      {"name": "Goldings", "amount_oz": 1, "alpha_acid_pct": 5.0, "time_min": 0, "usage": "flameout"},
      {"name": "Goldings", "amount_oz": 1, "alpha_acid_pct": 5.0, "usage": "dry hop", "duration_days": 5}
    ],
    "yeast": {
      "name": "Attenuative English Ale Yeast",
      "brand": "Generic",
      "starter_volume_L": 3.0,
      "fermentation_temp_F": 70
    },
    "adjuncts": [
      {"item": "Irish moss", "amount_tsp": 0.5, "time_min": 20, "purpose": "clarity"},
      {"item": "Corn sugar", "amount_oz": 4.5, "purpose": "priming"}
    ]
  },
  "water_profile": {
    "source_type": "Carbon-filtered city water (Akron, OH)",
    "treatment": [
      {"additive": "Calcium chloride (CaCl₂)", "amount_tsp": 0.5, "purpose": "malt balance"},
      {"additive": "Calcium sulfate (CaSO₄ / gypsum)", "amount_tsp": 0.5, "purpose": "hop accentuation"}
    ],
    "target_minerals_ppm": {
      "Ca": 150,
      "Mg": 10,
      "Na": 25,
      "Cl": 70,
      "SO4": 150,
      "HCO3": 70
    },
    "pH_target": 5.3,
    "notes": "Adjust with distilled water if needed; designed for balanced English ale profile."
  },
  "specs": {
    "batch_size_gal": 5.0,
    "boil_time_min": 90,
    "efficiency_pct": 70,
    "og": 1.090,
    "fg": 1.022,
    "abv_pct": 8.7,
    "ibu": 56,
    "srm": 35
  },
  "directions": {
    "mash": {
      "temp_F": 150,
      "time_min": 60,
      "mash_out_temp_F": 170,
      "notes": "Stir every 10 minutes; maintain temp by adding boiling water if needed."
    },
    "boil": {
      "time_min": 90,
      "hop_schedule": ["60 min", "0 min"],
      "additions": ["Irish moss @20 min", "cane sugar @5 min"]
    },
    "fermentation": {
      "primary_temp_F": 70,
      "duration_days": 14,
      "dry_hop_days": 5
    },
    "conditioning": {
      "method": "bottle",
      "priming_sugar_g": 130,
      "target_CO2_vol": 2.3
    }
  },
  "extract_version": {
    "instructions": "Substitute pale malt with 13.75 lb Maris Otter LME. Steep remaining grains at 155°F for 30 min, dissolve extract, and proceed as above."
  }
}
"""

# Parse the example into a plain dict
recipe = json.loads(raw_json)

# If you have many recipes, put them in a list:
data = [recipe]   # later this becomes a list of many recipe dicts


In [None]:
# Install required packages (run once)
# !pip install openai pydantic python-dotenv tqdm

In [None]:
import os
import json
from typing import Optional
from pydantic import BaseModel, Field
from openai import OpenAI
from dotenv import load_dotenv
from tqdm import tqdm
import time

# Load environment variables from the .env file one directory above the notebook
load_dotenv('../.env')

# Initialize the OpenAI client with the key taken from the environment.
# os.getenv returns None when OPENAI_API_KEY is not set.
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

In [None]:
# Define Pydantic models for structured output

# One malt / fermentable entry. `name`, `amount` and `unit` have no default
# and are therefore required; `color_L` defaults to None.
class Malt(BaseModel):
    name: str = Field(description="Name of the malt/grain")
    amount: float = Field(description="Amount of malt")
    unit: str = Field(description="Unit: lb, oz, kg, or g")
    color_L: Optional[float] = Field(default=None, description="Color in Lovibond")

# One hop addition. `name`, `amount`, `unit` and `usage` are required;
# `alpha_acid_pct` and `time_min` default to None.
class Hop(BaseModel):
    name: str = Field(description="Name of the hop variety")
    amount: float = Field(description="Amount of hops")
    unit: str = Field(description="Unit: oz or g")
    alpha_acid_pct: Optional[float] = Field(default=None, description="Alpha acid percentage")
    time_min: Optional[int] = Field(default=None, description="Boil time in minutes, 0 for flameout")
    usage: str = Field(description="Usage: boil, flameout, dry hop, FWH (first wort hop), whirlpool")

# The yeast used by a recipe. Only `name` is required; every other field
# defaults to None.
class Yeast(BaseModel):
    name: str = Field(description="Name of the yeast strain")
    brand: Optional[str] = Field(default=None, description="Brand: Wyeast, White Labs, Fermentis, Lallemand, etc.")
    product_code: Optional[str] = Field(default=None, description="Product code: 1056, WLP001, US-05, etc.")
    starter_volume_L: Optional[float] = Field(default=None, description="Yeast starter volume in liters")

class MineralProfile(BaseModel):
    """Mineral profile with specific fields for common brewing minerals."""
    # "extra": "forbid" rejects any key other than the six minerals declared below.
    model_config = {"extra": "forbid"}
    
    # Every mineral is optional and defaults to None.
    Ca: Optional[float] = Field(default=None, description="Calcium in ppm")
    Mg: Optional[float] = Field(default=None, description="Magnesium in ppm")
    Na: Optional[float] = Field(default=None, description="Sodium in ppm")
    Cl: Optional[float] = Field(default=None, description="Chloride in ppm")
    SO4: Optional[float] = Field(default=None, description="Sulfate in ppm")
    HCO3: Optional[float] = Field(default=None, description="Bicarbonate in ppm")

# Water profile: a free-text description plus an optional MineralProfile.
# Both fields default to None.
class Water(BaseModel):
    description: Optional[str] = Field(default=None, description="Water source and treatment description")
    minerals_ppm: Optional[MineralProfile] = Field(default=None, description="Target mineral levels in ppm")

# One additional ingredient. Only `item` is required; `amount`, `unit` and
# `purpose` default to None.
class Adjunct(BaseModel):
    item: str = Field(description="Name of the adjunct/addition")
    amount: Optional[float] = Field(default=None, description="Amount")
    unit: Optional[str] = Field(default=None, description="Unit: oz, g, tsp, etc.")
    purpose: Optional[str] = Field(default=None, description="Purpose: clarity, priming, flavor, etc.")

# All ingredients of a recipe. The list fields default to a fresh empty list
# (default_factory=list); `yeast` and `water` default to None.
class Ingredients(BaseModel):
    malts: list[Malt] = Field(default_factory=list, description="List of malts and fermentables")
    hops: list[Hop] = Field(default_factory=list, description="List of hops")
    yeast: Optional[Yeast] = Field(default=None, description="Yeast information")
    water: Optional[Water] = Field(default=None, description="Water profile")
    adjuncts: list[Adjunct] = Field(default_factory=list, description="Additional ingredients")

# Numeric specifications of the beer. Every field is optional and defaults to None.
class Specs(BaseModel):
    batch_size_gal: Optional[float] = Field(default=None, description="Batch size in gallons")
    boil_time_min: Optional[int] = Field(default=None, description="Boil time in minutes")
    og: Optional[float] = Field(default=None, description="Original gravity (e.g., 1.050)")
    fg: Optional[float] = Field(default=None, description="Final gravity (e.g., 1.010)")
    abv_pct: Optional[float] = Field(default=None, description="Alcohol by volume percentage")
    ibu: Optional[float] = Field(default=None, description="International Bitterness Units")
    srm: Optional[float] = Field(default=None, description="Standard Reference Method (color)")
    efficiency_pct: Optional[float] = Field(default=None, description="Mash efficiency percentage")

# Mash step: temperature, duration and free-text notes; all default to None.
class MashDirections(BaseModel):
    temp_F: Optional[float] = Field(default=None, description="Mash temperature in Fahrenheit")
    time_min: Optional[int] = Field(default=None, description="Mash time in minutes")
    notes: Optional[str] = Field(default=None, description="Additional mash notes")

# Boil step: duration and free-text notes; both default to None.
class BoilDirections(BaseModel):
    time_min: Optional[int] = Field(default=None, description="Boil time in minutes")
    notes: Optional[str] = Field(default=None, description="Additional boil notes")

# Fermentation step: temperature, duration and free-text notes; all default to None.
class FermentationDirections(BaseModel):
    temp_F: Optional[float] = Field(default=None, description="Fermentation temperature in Fahrenheit")
    duration_days: Optional[int] = Field(default=None, description="Fermentation duration in days")
    notes: Optional[str] = Field(default=None, description="Additional fermentation notes")

# Brewing directions grouped by stage (mash, boil, fermentation). Each stage
# is optional and defaults to None.
class Directions(BaseModel):
    mash: Optional[MashDirections] = Field(default=None, description="Mash directions")
    boil: Optional[BoilDirections] = Field(default=None, description="Boil directions")
    fermentation: Optional[FermentationDirections] = Field(default=None, description="Fermentation directions")

# Competition / award details attached to a recipe; every field defaults to None.
class Competition(BaseModel):
    name: Optional[str] = Field(default=None, description="Competition name")
    year: Optional[int] = Field(default=None, description="Competition year")
    award: Optional[str] = Field(default=None, description="Award: Gold Medal, Silver Medal, Bronze Medal")
    category: Optional[str] = Field(default=None, description="Competition category")

# Top-level schema for one recipe; extract_recipe passes it as the
# `response_format` of the OpenAI call. `title`, `ingredients`, `specs` and
# `directions` have no default and are required; the rest default to None.
class Recipe(BaseModel):
    title: str = Field(description="Recipe title/name")
    style: Optional[str] = Field(default=None, description="Beer style")
    brewer: Optional[str] = Field(default=None, description="Brewer name")
    source: Optional[str] = Field(default=None, description="Source publication and date")
    year: Optional[int] = Field(default=None, description="Recipe year")
    competition: Optional[Competition] = Field(default=None, description="Competition information if any")
    description: Optional[str] = Field(default=None, description="Short description of the beer")
    ingredients: Ingredients = Field(description="Recipe ingredients")
    specs: Specs = Field(description="Beer specifications")
    directions: Directions = Field(description="Brewing directions")
    extract_version: Optional[str] = Field(default=None, description="Extract brewing version instructions if available")

# Confirmation output for this cell; `Recipe.model_fields` holds the fields declared on Recipe.
print("Pydantic models defined successfully!")
print(f"Recipe schema has {len(Recipe.model_fields)} top-level fields")

In [None]:
# OpenAI extraction function using structured outputs

# System prompt sent with every extraction request (used as the "system"
# message in extract_recipe).
EXTRACTION_PROMPT = """You are an expert at extracting structured data from craft beer recipes.

Extract all information from the provided beer recipe text into the specified JSON schema.

Guidelines:
- Extract amounts with their units (lb, oz, kg, g for malts; oz, g for hops)
- For hops, identify usage: "boil" (with time), "flameout" (0 min), "dry hop", "FWH" (first wort hop), "whirlpool"
- Extract alpha acid percentages from hops (look for "a.a." or "AA" or "%")
- Identify yeast brand (Wyeast, White Labs, Fermentis, Lallemand, etc.) and product codes
- Extract specifications: OG, FG, ABV, IBU, SRM, efficiency, batch size
- Look for competition/award information in the description
- Extract brewer name if mentioned
- If a field is not present in the text, leave it as null
- For gravity values, use decimal format (e.g., 1.050 not 50)
"""

def extract_recipe(
    description: str,
    ingredients: str,
    directions: str,
    addition: str = "",
    title: str = "",
    style: str = "",
    max_retries: int = 3
) -> dict:
    """Extract structured recipe data using OpenAI GPT-4o.

    The recipe fields are combined into one text block and sent together with
    EXTRACTION_PROMPT; the reply is parsed into the `Recipe` model.

    Args:
        description: Recipe description text.
        ingredients: Raw ingredient text.
        directions: Raw brewing directions.
        addition: Extract-version text; empty or 'N/A' is sent as 'Not provided'.
        title: Recipe title.
        style: Beer style.
        max_retries: Maximum number of attempts.

    Returns:
        The parsed recipe as a dict (``Recipe.model_dump()``), or
        ``{"error": ..., "title": title}`` once every attempt has failed
        (also when ``max_retries`` is 0 or less). Errors are reported in the
        return value rather than raised.
    """
    
    # Combine all text for extraction
    full_text = f"""
TITLE: {title}
STYLE: {style}

DESCRIPTION:
{description}

INGREDIENTS:
{ingredients}

DIRECTIONS:
{directions}

EXTRACT VERSION:
{addition if addition and addition != 'N/A' else 'Not provided'}
"""
    
    for attempt in range(max_retries):
        try:
            response = client.beta.chat.completions.parse(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": EXTRACTION_PROMPT},
                    {"role": "user", "content": full_text}
                ],
                response_format=Recipe,
            )
            
            # `parsed` is None when no structured output came back (e.g. a
            # refusal); raise a descriptive error so the retry log is useful.
            message = response.choices[0].message
            if message.parsed is None:
                raise ValueError(
                    f"No structured output returned (refusal: {getattr(message, 'refusal', None)})"
                )
            return message.parsed.model_dump()
            
        except Exception as e:
            if attempt < max_retries - 1:
                # Exponential backoff: 2s, 4s, 8s, ... (the first two waits
                # equal the previous linear schedule)
                wait_time = 2 ** (attempt + 1)
                print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"Failed after {max_retries} attempts: {e}")
                return {"error": str(e), "title": title}
    
    # Only reached when max_retries <= 0 (the loop body never ran)
    return {"error": "Unknown error", "title": title}

# Confirmation output so running this cell shows that extract_recipe now exists.
print("Extraction function defined successfully!")

In [None]:
# Load the two recipe CSVs and stack them with a fresh 0..n-1 index
df1, df2 = (
    pd.read_csv(csv_path)
    for csv_path in ('../dataset/all_recipes_new.csv', '../dataset/all_recipes_new1.csv')
)
df_recipes = pd.concat([df1, df2], ignore_index=True)

# URL table: one row per url, keeping the first style / beer_name /
# final_results and the list of every category listed for that url
df_url = (
    pd.read_csv('../dataset/all_urls.csv')
    .drop(columns=['Unnamed: 0'])
    .groupby('url')
    .agg({
        'style': 'first',
        'beer_name': 'first',
        'final_results': 'first',
        'category': list,
    })
    .reset_index()
)

# Attach style / beer_name / category by url; missing values become ''
df_recipes = (
    df_recipes
    .merge(df_url[['beer_name', 'style', 'url', 'category']], how='left', on='url')
    .fillna('')
)

print(f"Total recipes to process: {len(df_recipes)}")
df_recipes.head()

In [None]:
# Trial run: extract only the first few recipes before committing to the full set
sample_size = 5
sample_df = df_recipes.head(sample_size)
sample_results = []

print(f"Testing extraction on {sample_size} recipes...\n")

for idx, row in sample_df.iterrows():
    print(f"Processing: {row['beer_name'][:50]}...")

    result = extract_recipe(
        description=row['description'],
        ingredients=row['ingredients'],
        directions=row['directions'],
        addition=row.get('addition', ''),
        title=row['beer_name'],
        style=row['style'],
    )

    # Keep the URL and original category next to the extraction for reference
    result.update(url=row['url'], original_category=row.get('category', []))
    sample_results.append(result)

    # An error result has no 'ingredients' key, so both counts fall back to 0
    extracted = result.get('ingredients', {})
    malt_count = len(extracted.get('malts', []))
    hop_count = len(extracted.get('hops', []))
    print(f"  -> Extracted {malt_count} malts, {hop_count} hops")

print(f"\nCompleted {len(sample_results)} extractions")

In [None]:
# View sample extraction result
# Pretty-print the first sample; default=str stringifies values json cannot encode itself.
print(json.dumps(sample_results[0], indent=2, default=str))

In [None]:
# Process all recipes with progress bar and incremental saving
# Final results written once process_all_recipes has gone through every row
OUTPUT_FILE = '../dataset/recipes_extracted.json'
# Intermediate results written every `save_every` rows and read back when resuming
CHECKPOINT_FILE = '../dataset/recipes_checkpoint.json'

def process_all_recipes(df, start_idx=0, save_every=50):
    """Process all recipes with checkpointing.

    Args:
        df: Recipe frame; rows are read positionally and must provide
            'description', 'ingredients', 'directions', 'beer_name', 'style'
            and 'url' ('addition' and 'category' are optional).
        start_idx: Position of the first row to process. When greater than 0
            and CHECKPOINT_FILE exists, earlier results are loaded from it.
            The loaded list is used as-is, so the caller must pass a
            ``start_idx`` that matches what the checkpoint contains.
        save_every: A checkpoint is written whenever ``idx + 1`` is a multiple
            of this value.

    Returns:
        List with one result dict per processed row (error rows included);
        the same list is written to OUTPUT_FILE.
    """
    
    # Load existing results if resuming
    if start_idx > 0 and os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            results = json.load(f)
        print(f"Resuming from index {start_idx}, loaded {len(results)} existing results")
    else:
        results = []
    
    # Process remaining recipes
    for idx in tqdm(range(start_idx, len(df)), desc="Extracting recipes"):
        row = df.iloc[idx]
        
        try:
            result = extract_recipe(
                description=row['description'],
                ingredients=row['ingredients'],
                directions=row['directions'],
                addition=row.get('addition', ''),
                title=row['beer_name'],
                style=row['style']
            )
            
            # Add metadata
            result['url'] = row['url']
            result['original_category'] = row.get('category', [])
            result['index'] = idx
            
        except Exception as e:
            print(f"\nError at index {idx}: {e}")
            result = {
                "error": str(e),
                "title": row['beer_name'],
                "url": row['url'],
                "index": idx
            }
        
        results.append(result)
        
        # Save checkpoint periodically. This sits outside the try block above
        # so a failed row on a checkpoint boundary still triggers the save,
        # and a failed write is not recorded as a second entry for this row.
        if (idx + 1) % save_every == 0:
            try:
                with open(CHECKPOINT_FILE, 'w') as f:
                    json.dump(results, f, indent=2, default=str)
                print(f"\nCheckpoint saved at index {idx + 1}")
            except OSError as e:
                # Best effort: keep extracting even if the checkpoint cannot be written
                print(f"\nCould not write checkpoint at index {idx + 1}: {e}")
        
        # Rate limiting - avoid hitting API limits
        time.sleep(0.5)
    
    # Save final results
    with open(OUTPUT_FILE, 'w') as f:
        json.dump(results, f, indent=2, default=str)
    
    print(f"\nCompleted! Saved {len(results)} recipes to {OUTPUT_FILE}")
    return results

# Uncomment the line below to run full extraction (will take time and cost money)
# all_results = process_all_recipes(df_recipes, start_idx=0)

In [None]:
# Convert extracted JSON to flattened DataFrame for analysis
def flatten_recipe(recipe: dict) -> dict:
    """Flatten one nested recipe dict into a single-level dict (one DataFrame row).

    Nested sections (``competition``, ``specs``, ``ingredients``,
    ``directions`` and the ``mash``/``boil``/``fermentation`` steps) may be
    missing or explicitly ``None``. Both cases are treated as empty, so the
    derived fields come back as ``None`` and the ingredient counts as ``0``
    instead of raising.

    Args:
        recipe: Extracted recipe, optionally containing the nested sections
            listed above.

    Returns:
        Flat dict with top-level metadata, competition and spec values,
        ingredient lists serialized as JSON strings, ingredient counts,
        selected direction values, and ``extract_version``.
    """
    def section(parent: dict, key: str) -> dict:
        # `.get(key, {})` only covers a *missing* key; a key present with a
        # None value would still reach `.get(...)` and raise AttributeError.
        return parent.get(key) or {}

    competition = section(recipe, 'competition')
    specs = section(recipe, 'specs')
    ingredients = section(recipe, 'ingredients')
    directions = section(recipe, 'directions')
    mash = section(directions, 'mash')
    boil = section(directions, 'boil')
    fermentation = section(directions, 'fermentation')

    # Normalise list-valued ingredients so len() and json.dumps() agree
    # even when a list is absent or None.
    malts = ingredients.get('malts') or []
    hops = ingredients.get('hops') or []
    adjuncts = ingredients.get('adjuncts') or []

    flat = {
        'title': recipe.get('title'),
        'style': recipe.get('style'),
        'brewer': recipe.get('brewer'),
        'source': recipe.get('source'),
        'year': recipe.get('year'),
        'description': recipe.get('description'),
        'url': recipe.get('url'),

        # Competition
        'competition_name': competition.get('name'),
        'competition_year': competition.get('year'),
        'competition_award': competition.get('award'),

        # Specs
        'batch_size_gal': specs.get('batch_size_gal'),
        'og': specs.get('og'),
        'fg': specs.get('fg'),
        'abv_pct': specs.get('abv_pct'),
        'ibu': specs.get('ibu'),
        'srm': specs.get('srm'),
        'efficiency_pct': specs.get('efficiency_pct'),

        # Ingredients as JSON strings for storage
        'malts_json': json.dumps(malts),
        'hops_json': json.dumps(hops),
        # Yeast is passed through as-is; a missing value serializes to 'null'.
        'yeast_json': json.dumps(ingredients.get('yeast')),
        'adjuncts_json': json.dumps(adjuncts),

        # Counts for quick analysis
        'num_malts': len(malts),
        'num_hops': len(hops),
        'num_adjuncts': len(adjuncts),

        # Directions
        'mash_temp_F': mash.get('temp_F'),
        'mash_time_min': mash.get('time_min'),
        'boil_time_min': boil.get('time_min'),
        'fermentation_temp_F': fermentation.get('temp_F'),

        # Extract version
        'extract_version': recipe.get('extract_version'),
    }
    return flat

# Demo: flatten the sample extraction into a DataFrame.
# Runs only when `sample_results` exists in this session and is non-empty.
if 'sample_results' in dir() and sample_results:
    # Entries carrying an 'error' key are failed extractions; leave them out.
    df_extracted = pd.DataFrame(
        [flatten_recipe(sample) for sample in sample_results if 'error' not in sample]
    )
    print(f"Flattened DataFrame shape: {df_extracted.shape}")
    # Preview a handful of headline columns only.
    display(
        df_extracted[['title', 'style', 'abv_pct', 'ibu', 'num_malts', 'num_hops']].head()
    )

In [None]:
# Save to various formats
def save_extracted_data(results: list, base_path: str = '../dataset/recipes_extracted'):
    """Save extracted recipes to multiple formats.

    Entries containing an ``'error'`` key are treated as failed extractions
    and kept out of the recipe outputs. Files written (all prefixed with
    ``base_path``):

    * ``.json``        - valid recipes in their nested form
    * ``_flat.csv`` / ``_flat.xlsx`` - one flattened row per valid recipe
    * ``_malts.csv`` / ``_hops.csv`` - one row per ingredient entry
    * ``_errors.json`` - failed entries, written only when there are any

    Args:
        results: Recipe dicts, possibly mixed with error dicts.
        base_path: Path prefix (without extension) for every output file.

    Returns:
        The flattened DataFrame of valid recipes.
    """
    # Filter out errors
    valid_results = [r for r in results if 'error' not in r]
    error_results = [r for r in results if 'error' in r]

    print(f"Valid recipes: {len(valid_results)}")
    print(f"Errors: {len(error_results)}")

    # 1. Save full JSON (nested structure)
    with open(f'{base_path}.json', 'w') as f:
        json.dump(valid_results, f, indent=2, default=str)
    print(f"Saved: {base_path}.json")

    # 2. Save flattened CSV/Excel
    df_flat = pd.DataFrame([flatten_recipe(r) for r in valid_results])
    df_flat.to_csv(f'{base_path}_flat.csv', index=False)
    df_flat.to_excel(f'{base_path}_flat.xlsx', index=False)
    print(f"Saved: {base_path}_flat.csv and .xlsx")

    # 3. Save normalized tables for database (one CSV per ingredient kind)
    for kind in ('malts', 'hops'):
        df_items = pd.DataFrame(_ingredient_rows(valid_results, kind))
        df_items.to_csv(f'{base_path}_{kind}.csv', index=False)
        print(f"Saved: {base_path}_{kind}.csv ({len(df_items)} rows)")

    # Save errors for review
    if error_results:
        with open(f'{base_path}_errors.json', 'w') as f:
            # default=str matches the other dumps, so a non-JSON-native value
            # in an error record cannot abort the save at the last step.
            json.dump(error_results, f, indent=2, default=str)
        print(f"Saved: {base_path}_errors.json")

    return df_flat


def _ingredient_rows(recipes: list, kind: str) -> list:
    """Build one row per ``kind`` ingredient entry (e.g. 'malts', 'hops').

    ``recipe_id`` is the recipe's position within ``recipes`` (the error-free
    list). A missing or ``None`` ``ingredients`` section or ingredient list
    contributes no rows.
    """
    rows = []
    for recipe_id, recipe in enumerate(recipes):
        ingredients = recipe.get('ingredients') or {}
        for item in ingredients.get(kind) or []:
            rows.append({
                'recipe_id': recipe_id,
                'recipe_title': recipe.get('title'),
                **item
            })
    return rows

# Example usage (uncomment after running full extraction):
# df_final = save_extracted_data(all_results)