In [None]:
# Mount Google Drive inside the Colab VM; the dataset and output paths used
# later in this notebook live under /content/drive/My Drive/.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import ast
import json
import logging
import os

import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer

# Hugging Face model id loaded by main() for both the tokenizer and the causal LM.
MODEL_NAME = "microsoft/phi-2"
# Alternative baseline model; swap with the line above to use it.
# MODEL_NAME = "HuggingFaceTB/SmolLM-1.7B"
# Maximum number of prompt tokens; longer prompts are truncated by the tokenizer call.
MAX_LENGTH = 512
# Number of test-split recipes the generation run is meant to cover.
NUM_SAMPLES = 500

def load_and_preprocess_data(data_dir='/content/drive/My Drive/CSCI544 Project/Allergen Substitution/'):
    """Load RAW_recipes.csv and split it into train/validation/test recipe texts.

    Every recipe is rendered as a single string consisting of a "Name: " line,
    an "Ingredients: " line (ingredients joined by spaces) and an
    "Instructions:" marker immediately followed by the steps joined by newlines.

    The rows are shuffled with a fixed seed and split 80/10/10: 10% is held out
    as the test set, then 1/9 of the remaining 90% becomes the validation set.

    Args:
        data_dir: Directory that contains ``RAW_recipes.csv``. Defaults to the
            project folder on the mounted Google Drive.

    Returns:
        A ``(train_texts, val_texts, test_texts)`` tuple of lists of strings.
    """
    # NOTE: a commented-out Kaggle download helper that embedded an API
    # username/key in plain text used to live here. It was dead code and has
    # been removed; the exposed key should be revoked, and any future download
    # helper should read credentials from the environment or Colab secrets.

    def safe_literal_eval(val):
        # The CSV stores list columns as Python list literals in text form.
        # Missing or malformed cells become an empty list instead of raising.
        if pd.isna(val):
            return []
        try:
            return ast.literal_eval(val)
        except (ValueError, SyntaxError):
            return []

    def as_text(value, sep):
        # Lists are joined with `sep`; any other value (e.g. a missing name,
        # which pandas reads as NaN) is simply stringified.
        if isinstance(value, list):
            return sep.join(map(str, value))
        return str(value)

    def format_recipe(name, ingredients, steps):
        prompt = f"Name: {as_text(name, ' ')}\nIngredients: {as_text(ingredients, ' ')}\nInstructions:"
        return prompt + as_text(steps, '\n')

    recipes = pd.read_csv(os.path.join(data_dir, 'RAW_recipes.csv'), encoding='utf8')
    # Copy the column subset so the assignments below write to an independent
    # frame rather than a view of the full table.
    recipes = recipes[['name', 'ingredients', 'steps']].copy()

    # Both list-valued columns go through the same tolerant parser, so a single
    # bad 'steps' cell no longer aborts the whole load.
    for column in ('ingredients', 'steps'):
        recipes[column] = recipes[column].apply(safe_literal_eval)

    # Sampling all rows is a seeded shuffle; the row count is left untouched so
    # the resulting split stays reproducible.
    recipes = recipes.sample(n=len(recipes), random_state=42)
    logging.info(f"Using a subsample of {len(recipes)} recipes")

    formatted_data = [
        format_recipe(name, ingredients, steps)
        for name, ingredients, steps in zip(recipes['name'], recipes['ingredients'], recipes['steps'])
    ]

    train_val_texts, test_texts = train_test_split(formatted_data, test_size=0.1, random_state=42)
    train_texts, val_texts = train_test_split(train_val_texts, test_size=1/9, random_state=42)

    logging.info(f"Split dataset into {len(train_texts)} training ,{len(val_texts)} validation samples and {len(test_texts)} test samples")
    return train_texts, val_texts, test_texts


def _generate_from_prompt(model, tokenizer, prompt):
    """Sample a completion for ``prompt`` and return only the newly generated text.

    The prompt is tokenized (truncated to ``MAX_LENGTH`` tokens), moved to the
    model's device and passed to ``model.generate`` with the sampling settings
    shared by all baseline generations.

    The returned sequence of a causal LM starts with the prompt tokens, so the
    completion is obtained by slicing those tokens off before decoding. This is
    more reliable than removing the prompt string from the decoded text, which
    fails whenever decoding does not reproduce the prompt character-for-character
    (tokenizer normalisation, or a prompt that was truncated).
    """
    encoded = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=MAX_LENGTH)
    encoded = {key: tensor.to(device=model.device) for key, tensor in encoded.items()}
    prompt_token_count = encoded["input_ids"].shape[1]

    outputs = model.generate(
        **encoded,
        max_new_tokens=256,
        temperature=0.75,
        top_p=0.95,
        do_sample=True,
        no_repeat_ngram_size=4,
        repetition_penalty=1.3,
        pad_token_id=tokenizer.pad_token_id
    )

    completion_ids = outputs[0][prompt_token_count:]
    return tokenizer.decode(completion_ids, skip_special_tokens=True).strip()


def generate_recipe(model, tokenizer, ingredients, name):
    """Generate a recipe for ``name`` from ``ingredients`` with the plain baseline prompt.

    Args:
        model: Causal language model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.
        ingredients: Ingredients text interpolated into the prompt.
        name: Recipe name interpolated into the prompt.

    Returns:
        The generated recipe text, without the prompt, stripped of surrounding
        whitespace.
    """
    prompt = f"""You are an expert chef and recipe writer. Given a recipe name and a list of ingredients, create a high-quality, detailed recipe.

Create a detailed recipe for: {name}
Using these ingredients: {ingredients}

Recipe:"""

    return _generate_from_prompt(model, tokenizer, prompt)


def generate_recipe_with_allergen_substitution(model, tokenizer, ingredients, allergens, name):
    """Generate a recipe for ``name`` while asking the model to substitute ``allergens``.

    Args:
        model: Causal language model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.
        ingredients: Ingredients text interpolated into the prompt.
        allergens: Allergens to avoid; interpolated into the prompt as given.
        name: Recipe name interpolated into the prompt.

    Returns:
        The generated recipe text, without the prompt, stripped of surrounding
        whitespace.
    """
    prompt = f"""You are an expert chef and recipe writer with a deep understanding of culinary techniques and food allergies. Your goal is to create a detailed, high-quality recipe that uses the provided list of ingredients, while making substitutions for any allergens to ensure the recipe is safe for individuals with those allergies.

Please follow these instructions:

1. **Create a Recipe**: Write a full, detailed recipe based on the name and ingredients provided.
2. **Substitute Allergens**: Some people are allergic to certain ingredients. You must avoid these allergens in the recipe and suggest substitutions from the list of safe ingredients. If the allergen is an essential part of the recipe, ensure the substitute maintains the flavor and texture as much as possible.
3. **Ensure Clarity and Detail**: Provide precise instructions, including cooking methods, preparation steps, and any necessary tips. The recipe should be easy to follow for someone with basic cooking knowledge.

Create a recipe for: {name}
Using these ingredients: {ingredients}
Substitute these allergens for other ingredients: {allergens}

Recipe:"""

    return _generate_from_prompt(model, tokenizer, prompt)

def _parse_formatted_recipe(recipe):
    """Split a formatted recipe text back into its ``(name, ingredients)`` parts."""
    # Only the first marker of each kind is structural; later occurrences (for
    # example inside the steps) are left untouched.
    header, _, _ = recipe.partition("\nInstructions:")
    name_part, _, ingredients = header.partition("\nIngredients: ")
    # Strip only the leading label so a recipe name that itself contains
    # "Name: " is preserved.
    label = "Name: "
    name = name_part[len(label):] if name_part.startswith(label) else name_part
    return name, ingredients


def main():
    """Generate allergen-substituted baseline recipes for the test split and save them.

    Loads the model and tokenizer named by ``MODEL_NAME``, takes the first
    ``NUM_SAMPLES`` texts of the test split, generates one recipe per text with
    a fixed allergen list, writes a checkpoint JSON to the working directory
    every 50 recipes and finally writes all results to the project folder on
    Google Drive.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    print("Loading model and tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
    model.to(device)

    # generate() needs a pad token id; fall back to EOS when the tokenizer has none.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        model.config.pad_token_id = tokenizer.pad_token_id

    # Create the destination folder up front so a missing directory is found
    # before the long generation run instead of at the final write.
    google_path = '/content/drive/My Drive/CSCI544 Project/Allergen Substitution/Baseline Generations/Phi-2/'
    output_file = google_path + "phi2_generated_recipes_5_fixed_v1.json"
    os.makedirs(google_path, exist_ok=True)

    # Only the test split is used for baseline generation.
    _, _, test_texts = load_and_preprocess_data()
    recipe_data = test_texts[:NUM_SAMPLES]
    print(f"Generating {len(recipe_data)} recipes...")

    recipes = []
    allergens = ["milk", "eggs", "fish"]
    for recipe in tqdm(recipe_data):
        name, ingredients = _parse_formatted_recipe(recipe)
        generated_recipe = generate_recipe_with_allergen_substitution(model, tokenizer, ingredients, allergens, name)
        recipes.append({
            "name": name,
            "ingredients": ingredients,
            "allergens": allergens,
            "generation": generated_recipe
        })

        # Save progress every 50 recipes
        if len(recipes) % 50 == 0:
            temp_file = f"generated_recipes_temp_{len(recipes)}.json"
            with open(temp_file, "w") as f:
                json.dump(recipes, f, indent=4)
            print(f"\nProgress saved to {temp_file}")

    with open(output_file, "w") as f:
        json.dump(recipes, f, indent=4)

    print(f"\nAll recipes generated and saved to {output_file}")

if __name__ == "__main__":
    main()

Using device: cuda
Loading model and tokenizer...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

Generating 500 recipes...


100%|██████████| 5/5 [01:10<00:00, 14.11s/it]



All recipes generated and saved to /content/drive/My Drive/CSCI544 Project/Allergen Substitution/Baseline Generations/Phi-2/phi2_generated_recipes_5_fixed_v1.json
