In [None]:
!pip install openai flask line-bot-sdk fuzzywuzzy
!pip install python-Levenshtein
!pip install openai==0.28
!pip install jellyfish

In [1]:
# 啟動SERVER前，先RUN這段，營養紀錄(GPT)
## 打包[記錄飲食]def
import openai
from fuzzywuzzy import fuzz
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import json
from datetime import datetime

def parse_user_input_with_gpt(input_text):
    print("GPT execute str to JSON")
    print("check input_text",input_text)
    # 設定 OpenAI API 金鑰
    openai_api_key = "XXXX"
    openai.api_key = openai_api_key
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                
                "content": (
                    "你是我的飲食記錄小助手，請將使用者輸入的飲食紀錄拆解成 JSON 格式，"
                    "包含以下欄位：項目, 數詞, 量詞, 食物來源, 食物名稱,飲料(0=否,1=是)。"
                    "例如： '一份三商巧福的清燉牛肉麵, 一杯紅茶'"
                    "應該輸出為: [{\"項目\": \"1\", \"數詞\": \"一\", \"量詞\": \"份\", \"食物來源\": \"三商巧福\", "
                    "\"食物名稱\": \"清燉牛肉麵\", \"飲料\": \"0\"}, {\"項目\": \"2\", \"數詞\": \"一\", \"量詞\": \"杯\", "
                    "\"食物來源\": \"\", \"食物名稱\": \"紅茶\", \"飲料\": \"1\"}]"
                    "如果使用者輸入的文字已經是JSON格式也沒關係，請按照以上格式輸出內容"
                )
            },
            {
                "role": "user",
                "content": f"請拆解並分析以下飲食紀錄:\n{input_text}"
            }
        ],
        response_format={"type": "json_object"}  # 正確設定 response_format
    )
    #response_content = response['choices'][0]['message']['content'].strip()
    # 將結果轉換為 Python 字典
    response_content = response['choices'][0]['message']['content'].strip()
    
    try:
        # 提取 JSON 資料
        response_json = response_content.split("```")[1] if "```" in response_content else response_content
        parsed_items = json.loads(response_json)
        # 打印分析結果
        print("分析結果 (JSON 格式):")
        print(json.dumps(parsed_items, indent=4, ensure_ascii=False))
        
        # 判斷是否存在第二層資料，若存在則取第二層，否則直接使用第一層資料
        if isinstance(next(iter(parsed_items.values())), list):
            parsed_items = next(iter(parsed_items.values()), [])
        else:
            parsed_items = [parsed_items]
    
    except Exception as e:
        print("處理過程中發生錯誤:", str(e))
        parsed_items = None
    
    return parsed_items

def insert_into_database(user_id, input_text, meal_type, nutritional_summary, engine):
    # 建立 meal_type 對應字典
    meal_type_map = {
        "早餐": 0,
        "午餐": 1,
        "晚餐": 2,
        "點心": 3
    }

    # 檢查 meal_type 是否在字典中，若不存在則拋出錯誤
    if meal_type not in meal_type_map:
        raise ValueError(f"無效的 meal_type：{meal_type}. 必須為 '早餐'、'午餐'、'晚餐' 或 '點心'.")

    # 轉換 meal_type 為對應的數值
    meal_type_value = meal_type_map[meal_type]

    
    query = text("""
            INSERT INTO record (
                user_id, input_text, meal_type, energy, protein, carbohydrate, lipid_fat,
                grains, meat_beans, milk, vegetables, fruits, oil_nuts, create_time
            ) VALUES (
                :user_id, :input_text, :meal_type, :energy, :protein, :carbohydrate, :lipid_fat,
                :grains, :meat_beans, :milk, :vegetables, :fruits, :oil_nuts, NOW()
            )
        """)
    
    data = {
        "user_id": user_id,
        "input_text": input_text,
        "meal_type": meal_type_value,
        "energy": nutritional_summary["熱量"],
        "protein": nutritional_summary["蛋白質"],
        "carbohydrate": nutritional_summary["醣類"],
        "lipid_fat": nutritional_summary["脂質"],
        "grains": nutritional_summary["全穀雜糧類"],
        "meat_beans": nutritional_summary["豆魚蛋肉類"],
        "milk": nutritional_summary["乳品類"],
        "vegetables": nutritional_summary["蔬菜類"],
        "fruits": nutritional_summary["水果類"],
        "oil_nuts": nutritional_summary["油脂與堅果種子類"],
    }

    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        session.execute(query, data)
        session.commit()
        print("資料成功插入資料表")
    except Exception as e:
        session.rollback()
        print("插入資料失敗:", str(e))
    finally:
        session.close()
    
def record_diet(input_text, openai_api_key, db_settings, user_id, meal_type):
    """
    分析並記錄飲食，並加總營養數值。
    
    :param input_text: str, 使用者輸入的飲食紀錄文本
    :param openai_api_key: str, OpenAI API 金鑰
    :param db_settings: dict, 資料庫連線設定，包含 host, port, user, password, db
    :user_id: str, LineBot UserID 唯一識別值
    :meal_type: str, 餐別，用於新增至資料庫的欄位
    :return: tuple, (記錄文字, 營養數值總計)
    """
    # 設定 OpenAI API 金鑰
    openai.api_key = openai_api_key

    # 建立 MySQL 連線引擎
    engine = create_engine(
        f"mysql+pymysql://{db_settings['user']}:{db_settings['password']}@"
        f"{db_settings['host']}:{db_settings['port']}/{db_settings['db']}?charset=utf8mb4"
    )

    def find_in_database(source, name):
        Session = sessionmaker(bind=engine)
        session = Session()
        query = text("""
            SELECT 
                Source, 
                Name, 
                Energy, 
                Protein, 
                Carbohydrate, 
                Total_lipid_fat, 
                grains,  
                meat_beans_low_fat, 
                meat_beans_medium_fat, 
                meat_beans_high_fat, 
                meat_beans_super_high_fat, 
                milk_whole_fat, 
                milk_low_fat, 
                milk_skim, 
                vegetables, 
                fruits, 
                oil, 
                nuts
            FROM 
                new 
            WHERE 
                Source = :source 
                AND Name = :name
        """)

        try:
            result = session.execute(query, {"source": source, "name": name}).mappings().fetchone()
            return dict(result) if result else None
        except Exception as e:
            print("查詢資料庫失敗:", str(e))
            return None
        finally:
            session.close()

    def find_similar_food(name, source, drink):
        Session = sessionmaker(bind=engine)
        session = Session()
        # SQL 查詢語句，用於過濾飲料參數
        query_all = text("""
            SELECT 
                Source, 
                Name, 
                Energy, 
                Protein, 
                Carbohydrate, 
                Total_lipid_fat, 
                grains,  
                meat_beans_low_fat, 
                meat_beans_medium_fat, 
                meat_beans_high_fat, 
                meat_beans_super_high_fat, 
                milk_whole_fat, 
                milk_low_fat, 
                milk_skim, 
                vegetables, 
                fruits, 
                oil, 
                nuts,
                Drinks
            FROM 
                new 
            WHERE Drinks=:drink
        """)

        try:
            all_data = pd.read_sql(query_all, session.bind, params={"drink": drink})
            
            # 如果結果為空，返回 None
            if all_data.empty:
                print("No data found for the specified drink.")
                return None
                
            # 1. 優先處理來源相同的匹配
            if source:
                # 篩選來源相同的記錄
                source_filtered_data = all_data[all_data['Source'] == source].copy() # 確保是副本，避免出現錯誤
                # 計算名稱的相似度（使用 partial_ratio 更適合部分匹配）
                if not source_filtered_data.empty:
                    source_filtered_data['similarity'] = source_filtered_data['Name'].apply(lambda x: fuzz.partial_ratio(x, name))
                    closest_match = source_filtered_data.loc[source_filtered_data['similarity'].idxmax()]
                    # 返回相似度足夠高的結果
                    if closest_match['similarity'] > 10: # 設定來源相同時的相似度閾值
                        return closest_match.to_dict()
                        
            # 2. 處理來源不同的匹配
            all_data['similarity'] = all_data['Name'].apply(lambda x: fuzz.partial_ratio(x, name))
            closest_match = all_data.loc[all_data['similarity'].idxmax()]
            return closest_match.to_dict()
        except Exception as e:
            print("查詢相似食物失敗:", str(e))
            return None
        finally:
            session.close()
    
    parsed_items = parse_user_input_with_gpt(input_text)
    print("parsed_items",parsed_items)
    nutritional_summary = {"熱量": 0, "蛋白質": 0, "脂質": 0, "醣類": 0, "全穀雜糧類": 0, "豆魚蛋肉類": 0, "乳品類": 0, "蔬菜類": 0, "水果類": 0, "油脂與堅果種子類": 0}
    detailed_items = []
    used_similar_food = False
    for item in parsed_items:
        food_data = find_in_database(item.get('食物來源'), item.get('食物名稱'))
        if food_data:
            nutritional_summary["熱量"] += food_data.get("Energy", 0)
            nutritional_summary["蛋白質"] += food_data.get("Protein", 0)
            nutritional_summary["脂質"] += food_data.get("Total_lipid_fat", 0)
            nutritional_summary["醣類"] += food_data.get("Carbohydrate", 0)
            nutritional_summary["全穀雜糧類"] += round(food_data.get("grains", 0), 1)
            nutritional_summary["豆魚蛋肉類"] += round(
                food_data.get("meat_beans_low_fat", 0) +
                food_data.get("meat_beans_medium_fat", 0) +
                food_data.get("meat_beans_high_fat", 0) +
                food_data.get("meat_beans_super_high_fat", 0),
                1
            )
            nutritional_summary["乳品類"] += round(
                food_data.get("milk_whole_fat", 0) +
                food_data.get("milk_low_fat", 0) +
                food_data.get("milk_skim", 0),
                1
            )
            nutritional_summary["蔬菜類"] += round(food_data.get("vegetables", 0), 1)
            nutritional_summary["水果類"] += round(food_data.get("fruits", 0), 1)
            nutritional_summary["油脂與堅果種子類"] += round(
                food_data.get("oil", 0) +
                food_data.get("nuts", 0),
                1 
            )
            detailed_items.append(f"\n{item['食物來源']} - {item['食物名稱']}")
        else:
            similar_food = find_similar_food(item.get('食物名稱'), item.get('食物來源'), item.get('飲料'))
            if similar_food:
                used_similar_food = True
                nutritional_summary["熱量"] += similar_food.get("Energy", 0)
                nutritional_summary["蛋白質"] += similar_food.get("Protein", 0)
                nutritional_summary["脂質"] += similar_food.get("Total_lipid_fat", 0)
                nutritional_summary["醣類"] += similar_food.get("Carbohydrate", 0)
                nutritional_summary["全穀雜糧類"] += round(similar_food.get("grains", 0), 1)
                nutritional_summary["豆魚蛋肉類"] += round(
                    similar_food.get("meat_beans_low_fat", 0) +
                    similar_food.get("meat_beans_medium_fat", 0) +
                    similar_food.get("meat_beans_high_fat", 0) +
                    similar_food.get("meat_beans_super_high_fat", 0),
                    1
                )
                nutritional_summary["乳品類"] += round(
                    similar_food.get("milk_whole_fat", 0) +
                    similar_food.get("milk_low_fat", 0) +
                    similar_food.get("milk_skim", 0),
                    1
                )
                nutritional_summary["蔬菜類"] += round(similar_food.get("vegetables", 0), 1)
                nutritional_summary["水果類"] += round(similar_food.get("fruits", 0), 1)
                nutritional_summary["油脂與堅果種子類"] += round(
                    similar_food.get("oil", 0) +
                    similar_food.get("nuts", 0),
                    1
                )
                detailed_items.append(f"\n{similar_food['Source']} - {similar_food['Name']}")
            else:
                detailed_items.append(f"{item['食物來源']} - {item['食物名稱']}（未找到）")
    # 寫入資料庫
    insert_into_database(user_id, input_text, meal_type, nutritional_summary, engine)
    result_message = (
        f"""(使用相似品項加總營養數值)\n記錄項目：{'\n'.join(detailed_items)}"""
        if used_similar_food else
        f"""記錄項目：{'\n'.join(detailed_items)}"""
    )

    return result_message, nutritional_summary

In [None]:
# 測試，紀錄飲食，即上面區塊

# 設定 OpenAI API 金鑰
openai_api_key = "XXXX"

# 測試輸入文本
input_text = "一份小吃店的黑胡椒烏龍鐵板麵，一顆小吃店的滷蛋"
input_text2 = "一個麥當勞的滿福堡，一塊麥當勞的薯餅"
input_text3 = [{"項目": "1", "數詞": "一", "量詞": "碗", "食物來源": "小吃店", "食物名稱": "餛飩湯", "飲料": "0"}, {"項目": "2", "數詞": "一", "量詞": "顆", "食物來源": "小吃店", "食物名稱": "滷蛋", "飲料": "0"}]
input_text4 = "一顆豆漿店的饅頭"
input_text5 = "一碗餛飩湯，一顆滷蛋"

# 檢查是否為列表並進行組合
if isinstance(input_text3, list):
    input_text3 = "，".join(
        f"{item['數詞']}{item['量詞']}{item['食物來源']}的{item['食物名稱']}"
        for item in input_text3
    )
    print("input_text3",input_text3)

# 資料庫連線設定

db_settings = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'OUTS',
    'password': 'test12345',
    'db': 'balance_diet'
}
user_id = '23##%23SWrf%%%'
meal_type = '午餐' # 1=午餐

# 執行函式並取得結果
result_message, nutritional_summary = record_diet(input_text5, openai_api_key, db_settings, user_id, meal_type)

# 打印結果
print("記錄訊息:")
print(result_message)

print("\n營養總計:")
for key, value in nutritional_summary.items():
    print(f"{key}: {round(value, 1):.1f}")
