## 데이터 Load (user,item, rule)

In [1]:
# -*- coding: utf-8 -*-
from __future__ import annotations
import pandas as pd
import numpy as np
import math, re, ast, operator
from typing import Any, Dict, List, Optional, Tuple
import ast
from pathlib import Path

In [2]:
RULE_CSV = Path("rulebased_new.csv")
USER_CSV = Path("df_user_mapped.csv")
ITEM_CSV = Path("제품 메타데이터 최종 - 제품 메타데이터 최종.csv")

rules = pd.read_csv(RULE_CSV)
items = pd.read_csv(ITEM_CSV)
users = pd.read_csv(USER_CSV)

#### 설문조사 데이터 -> recsys 모델 input & rulebase input 형식으로 맞춤 (합집합)

**주의**: 아래 셀들을 순서대로 실행해야 합니다:
1. 셀 4: 파싱 유틸 함수들 정의
2. 셀 5: 매핑 함수들 정의
3. 셀 6: convert_user_to_input_format 함수 정의
4. 셀 7: users_converted 생성 및 확인

In [3]:
# ================================================================
# rulebased_new.csv 룰 실행을 위한 추가 필드 매핑 함수
# ================================================================


def map_diet_type(value: Any) -> str:
    """식단 유형을 표준 형식으로 매핑 (채식 위주 여부 확인)"""
    if pd.isna(value):
        return "일반식"
    val = str(value).strip().lower()

    if "채식" in val or "비건" in val or "vegetarian" in val:
        return "채식 위주"
    return "일반식"


def calculate_lean_mass(
    weight: Optional[float], gender: Optional[str], age: Optional[float]
) -> Optional[float]:
    """제지방량(LBM) 추정 (없는 경우 간단한 공식 사용)"""
    if weight is None:
        return None

    # 간단한 추정 공식 (Deurenberg 공식 기반)
    if gender == "여성":
        # 여성: LBM = weight * 0.78 (대략적)
        return weight * 0.78
    elif gender == "남성":
        # 남성: LBM = weight * 0.85 (대략적)
        return weight * 0.85
    else:
        # 성별 불명시 평균값
        return weight * 0.815


def workout_duration_to_minutes(training_duration: str) -> Optional[float]:
    """training_duration 문자열을 분(숫자)으로 변환 (룰 비교용)
    - '60-90' -> 75
    - '<60' -> 59
    - '90+' -> 90
    """
    if pd.isna(training_duration):
        return None

    s = str(training_duration).strip()

    # 범위: '60-90' -> 평균값(75)
    m = re.match(r"^(\d+)\s*-\s*(\d+)$", s)
    if m:
        a, b = float(m.group(1)), float(m.group(2))
        return (a + b) / 2.0

    # 미만: '<60' -> 59
    m = re.match(r"^<\s*(\d+)$", s)
    if m:
        return float(m.group(1)) - 1

    # 이상: '>=60' or '60+' -> 60
    m = re.match(r"^(>=\s*)?(\d+)\+?$", s)
    if m:
        return float(m.group(2))

    # 숫자면 숫자
    try:
        return float(s)
    except ValueError:
        return None


print("✅ 룰 실행을 위한 추가 매핑 함수 정의 완료!")

✅ 룰 실행을 위한 추가 매핑 함수 정의 완료!


In [4]:
# ================================================================
# convert_user_to_input_format 함수 업데이트 (rulebased_new.csv 룰 지원)
# ================================================================

# 기존 함수를 덮어쓰기 (룰 실행을 위한 추가 필드 포함)


def convert_user_to_input_format(row: pd.Series) -> Dict[str, Any]:
    """사용자 데이터 한 행을 INPUT_FORMAT 형식으로 변환 (rulebased_new.csv 룰 지원)"""
    try:
        # 기본 정보 추출
        age = parse_float_field(row.get("2) 나이(만 나이)", row.get("나이")))
        gender = parse_string_field(row.get("3) 성별", row.get("성별")))
        weight = parse_float_field(row.get("6) 몸무게", row.get("몸무게")))

        # 제지방량 계산 (없는 경우 추정)
        lean_mass = calculate_lean_mass(weight, gender, age)

        # 운동 관련
        training_experience = map_training_experience(
            row.get("8) 운동 활동 기간", row.get("운동 활동 기간"))
        )
        training_duration_str = map_training_duration(
            row.get(
                "9) 주에 몇 회 정도 운동을 진행하시나요?(택1)", row.get("운동 빈도")
            )
        )
        training_time = map_training_time(
            row.get(
                "12-3) 시간 기준으로 운동 시작 시간이 언제인가요?(택 1)",
                row.get("운동 시간"),
            )
        )

        # workout_duration: training_duration을 분(숫자)으로 변환 (룰 비교용)
        workout_duration = workout_duration_to_minutes(training_duration_str)

        # 다이어트/목표
        diet_phase = map_diet_phase(
            row.get(
                "13-13) 해당 프로틴 이용 당시 운동 목적은 무엇이었나요? (체중)",
                row.get("운동 목적"),
            )
        )
        user_goal = map_user_goals(
            row.get(
                "13-4) 프로틴의 효과에서 고려한 세부 항목을 선택해주세요 (복수선택 가능)",
                row.get("효과"),
            )
        )  # list[str]

        # diet_type: 채식 위주 여부 확인
        diet_type = map_diet_type(
            row.get("11) 평소 챙기는 끼니는 어떻게 되나요?(복수선택 가능)", "")
        )

        # 건강/제약
        health_conditions = map_health_conditions(
            row.get("10) 알러지 또는 민감성분(복수선택 가능)", row.get("알러지"))
        )  # list[str]
        allergy = map_allergy(
            row.get("10) 알러지 또는 민감성분(복수선택 가능)", row.get("알러지"))
        )  # list[str]

        user: Dict[str, Any] = {
            # 기본 정보
            "age": age,
            "gender": gender,
            "weight": weight,
            "weight_kg": weight,  # 룰에서 weight_kg 사용
            "lean_mass": lean_mass,
            "lbm_kg": lean_mass,  # 룰에서 lbm_kg 사용
            # 운동 관련
            "training_experience": training_experience,
            "training_duration": training_duration_str,  # 문자열 형식 ("60-90", "<60", "90+")
            "workout_duration": workout_duration,  # 숫자 형식 (분) - 룰 비교용
            "training_time": training_time,
            "workout_time": training_time,  # 룰에서 workout_time 사용
            "user_gender": gender,  # 룰에서 user_gender 사용
            # 다이어트/목표
            "diet_phase": diet_phase,
            "diet_type": diet_type,  # "일반식" 또는 "채식 위주"
            "user_goal": user_goal,  # list[str]
            # 건강/제약
            "health_conditions": health_conditions,  # list[str]
            "allergy": allergy,  # list[str]
            "current_stack": [],  # CSV에 없음 (빈 리스트)
            "is_dehydrated": False,  # CSV에 없음
            # 룰에서 필요한 추가 필드들 (CSV에 없어서 기본값)
            "sweat_amount": None,  # "많이 흘린다", "적당히 흘린다" 등 (데이터 없음)
            "caffeine_habbit": None,  # "거의 마시지 않음" 등 (데이터 없음)
            "workout_type": None,  # "유산소", "무산소" 등 (데이터 없음)
            "training_intensity": None,  # "고", "중", "저" (데이터 없음)
            "oral_hygiene": "해당 없음",  # "구강청결제 사용" 등
            "environment_heat_humid": None,  # 환경 조건
            "intake_period": None,
            "cvd_risk": None,  # 심혈관 위험
        }

        return user

    except Exception as e:
        print(f"사용자 변환 중 오류: {e}")
        import traceback

        traceback.print_exc()
        return None


print("✅ convert_user_to_input_format 함수 업데이트 완료 (룰 지원)!")

✅ convert_user_to_input_format 함수 업데이트 완료 (룰 지원)!


In [5]:
# ================================================================
# users_converted 생성: 전체 사용자 데이터를 INPUT_FORMAT 형식으로 변환
# ================================================================


# convert_users_df_to_input_format 함수 정의
def convert_users_df_to_input_format(df_users: pd.DataFrame) -> pd.DataFrame:
    """전체 사용자 데이터프레임을 INPUT_FORMAT 형식으로 변환"""
    converted_users = []

    for idx, row in df_users.iterrows():
        user = convert_user_to_input_format(row)
        if user is not None:
            converted_users.append(user)

    return pd.DataFrame(converted_users)


# users_converted 생성 (셀 4, 5에서 필요한 함수들이 이미 정의되어 있어야 함)
try:
    print("🔄 사용자 데이터를 INPUT_FORMAT 형식으로 변환 중...")
    users_converted = convert_users_df_to_input_format(users)

    print(f"✅ 변환 완료: {len(users_converted)}명의 사용자 데이터")
    print(f"📋 변환된 데이터 필드: {len(users_converted.columns)}개")
    print(
        f"   필드 목록: {', '.join(users_converted.columns.tolist()[:10])}..."
    )  # 처음 10개만 표시

except NameError as e:
    print(f"❌ 오류: 필요한 함수가 정의되지 않았습니다.")
    print(f"   오류 내용: {e}")
    print(f"   해결 방법: 셀 4와 셀 5를 먼저 실행하세요.")
except Exception as e:
    print(f"❌ 변환 중 오류 발생: {e}")
    import traceback

    traceback.print_exc()

# 필요한 파싱 함수들이 이미 정의되어 있는지 확인하고, 없으면 정의
if "parse_list_field" not in globals():

    def _dedup_list(xs: List[str]) -> List[str]:
        """공백 제거 후 중복 제거(순서 보존)"""
        seen = set()
        out = []
        for x in xs:
            if x is None:
                continue
            s = str(x).strip()
            if not s or s == "없음" or s.lower() == "nan":
                continue
            if s not in seen:
                seen.add(s)
                out.append(s)
        return out

    def parse_list_field(value: Any) -> List[str]:
        """CSV 값을 list[str]로 변환"""
        if pd.isna(value) or value == "" or str(value).strip() == "없음":
            return []
        if isinstance(value, str):
            items = re.split(r"[,/|]+", value)
            return _dedup_list(items)
        if isinstance(value, (list, tuple, set)):
            return _dedup_list([str(v) for v in value])
        return _dedup_list([str(value)])

    def parse_boolean_field(value: Any) -> bool:
        """CSV 값을 boolean으로 변환"""
        if pd.isna(value):
            return False
        str_val = str(value).strip().lower()
        return str_val in ["네", "예", "true", "yes", "1", "y"]

    def parse_float_field(value: Any) -> Optional[float]:
        """CSV 값을 float으로 변환"""
        if pd.isna(value) or value == "":
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    def parse_string_field(value: Any) -> Optional[str]:
        """CSV 값을 문자열로 변환"""
        if pd.isna(value) or value == "":
            return None
        return str(value).strip()

    def map_training_experience(value: Any) -> str:
        """운동 경험을 표준 형식으로 매핑"""
        if pd.isna(value):
            return "초보"
        val = str(value).strip().lower()

        # 년수 기반 매핑
        if "년" in val:
            years = re.search(r"(\d+(?:\.\d+)?)", val)
            if years:
                year_num = float(years.group(1))
                if year_num < 1:
                    return "초보"
                elif year_num < 3:
                    return "중급"
                else:
                    return "숙련자"

        # 직접 매핑
        if val in ["초보", "beginner"]:
            return "초보"
        if val in ["중급", "intermediate"]:
            return "중급"
        if val in ["숙련", "숙련자", "advanced"]:
            return "숙련자"

        return "초보"

    def map_training_duration(value: Any) -> str:
        """운동 시간을 표준 형식으로 매핑"""
        if pd.isna(value):
            return "60-90"
        val = str(value).strip()

        # 숫자 기반 매핑 (주당 횟수 등)
        if val in ["3.0", "3", "3회"]:
            return "<60"
        elif val in ["4.0", "4", "4회"]:
            return "60-90"
        elif val in ["5.0", "5", "5회", "6.0", "6", "6회"]:
            return "90+"
        return "60-90"

    def map_training_time(value: Any) -> str:
        """운동 시간대를 표준 형식으로 매핑"""
        if pd.isna(value):
            return "저녁"
        val = str(value).strip()

        if "오전" in val or "05" in val or "12시" in val:
            return "오전"
        elif "오후" in val:
            return "오후"
        elif "저녁" in val or "18시" in val or "22시" in val:
            return "저녁"
        elif "밤" in val:
            return "밤"
        return "저녁"

    def map_diet_phase(value: Any) -> str:
        """다이어트 단계를 표준 형식으로 매핑"""
        if pd.isna(value):
            return "유지"
        val = str(value).strip()

        if "감량" in val or "체지방 감소" in val:
            return "체지방 감소"
        if "증량" in val or "벌크업" in val:
            return "벌크업"
        if "유지" in val:
            return "유지"
        return "유지"

    def map_user_goals(value: Any) -> List[str]:
        """사용자 목표를 표준 형식(list[str])으로 매핑"""
        if pd.isna(value):
            return ["회복"]

        raw = " ".join(parse_list_field(value))
        goals = []
        if ("근육량 증가" in raw) or ("근성장" in raw):
            goals.append("근성장")
        if ("운동 후 회복" in raw) or ("회복" in raw):
            goals.append("회복")
        if ("운동 전 에너지" in raw) or ("퍼포먼스" in raw):
            goals.append("퍼포먼스")
        if ("체지방 감소" in raw) or ("감량" in raw):
            goals.append("체지방 감소")
        if "집중" in raw:
            goals.append("집중력")

        goals = _dedup_list(goals)
        return goals if goals else ["회복"]

    def map_health_conditions(value: Any) -> List[str]:
        """건강 상태를 표준 형식(list[str])으로 매핑"""
        if pd.isna(value) or str(value).strip() == "없음":
            return []
        raw = " ".join(parse_list_field(value))
        conditions = []

        if ("유당불내증" in raw) or ("유당 불내증" in raw):
            conditions.append("유당 불내증")
        if ("카페인 민감성" in raw) or ("수면장애" in raw):
            conditions.append("수면장애")
        if "신장 질환" in raw:
            conditions.append("신장 질환")
        if "저혈압" in raw:
            conditions.append("저혈압")
        if "고혈압" in raw:
            conditions.append("고혈압")
        if "불안장애" in raw:
            conditions.append("불안장애")

        return _dedup_list(conditions)

    def map_allergy(value: Any) -> List[str]:
        """알러지를 표준 형식(list[str])으로 매핑"""
        if pd.isna(value) or str(value).strip() == "없음":
            return []
        raw_list = parse_list_field(value)
        out = []
        raw_join = " ".join(raw_list)
        if "카페인 민감성" in raw_join or "카페인" in raw_list:
            out.append("카페인")
        if "유당불내증" in raw_join or "유당" in raw_list:
            out.append("유당")
        return _dedup_list(out)

🔄 사용자 데이터를 INPUT_FORMAT 형식으로 변환 중...
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 오류: name 'parse_float_field' is not defined
사용자 변환 중 

Traceback (most recent call last):
  File "/var/folders/yd/4pvx9_5d6lj5__k2j2r0qs_h0000gp/T/ipykernel_18955/868808076.py", line 12, in convert_user_to_input_format
    age = parse_float_field(row.get("2) 나이(만 나이)", row.get("나이")))
          ^^^^^^^^^^^^^^^^^
NameError: name 'parse_float_field' is not defined
Traceback (most recent call last):
  File "/var/folders/yd/4pvx9_5d6lj5__k2j2r0qs_h0000gp/T/ipykernel_18955/868808076.py", line 12, in convert_user_to_input_format
    age = parse_float_field(row.get("2) 나이(만 나이)", row.get("나이")))
          ^^^^^^^^^^^^^^^^^
NameError: name 'parse_float_field' is not defined
Traceback (most recent call last):
  File "/var/folders/yd/4pvx9_5d6lj5__k2j2r0qs_h0000gp/T/ipykernel_18955/868808076.py", line 12, in convert_user_to_input_format
    age = parse_float_field(row.get("2) 나이(만 나이)", row.get("나이")))
          ^^^^^^^^^^^^^^^^^
NameError: name 'parse_float_field' is not defined
Traceback (most recent call last):
  File "/var/folders/yd/4pvx9_5d6lj

In [6]:
# ================================================================
# 변환된 데이터 확인 (rulebased_new.csv 룰 지원 필드 포함)
# ================================================================

print("📋 변환된 데이터 필드 목록 (rulebased_new.csv 룰 지원):")
print(f"  총 {len(users_converted.columns)}개 필드")
print(f"\n  필드 목록:")
for i, col in enumerate(users_converted.columns, 1):
    print(f"    {i:2d}. {col}")

print(f"\n📊 변환된 데이터 샘플 (첫 번째 사용자):")
if len(users_converted) > 0:
    sample = users_converted.iloc[0].to_dict()

    # 룰에서 중요한 필드들만 먼저 출력
    important_fields = [
        "age",
        "gender",
        "weight",
        "weight_kg",
        "lean_mass",
        "lbm_kg",
        "training_experience",
        "training_duration",
        "workout_duration",
        "training_time",
        "workout_time",
        "user_gender",
        "diet_phase",
        "diet_type",
        "user_goal",
        "health_conditions",
        "allergy",
    ]
    print("\n  [룰 필수 필드]")
    for key in important_fields:
        if key in sample:
            value = sample[key]
            # 리스트는 간단히 표시
            if isinstance(value, list):
                print(f"    {key}: {value}")
            else:
                print(f"    {key}: {value}")

    print("\n  [추가 필드 (기본값)]")
    for key, value in sample.items():
        if key not in important_fields:
            print(f"    {key}: {value}")

📋 변환된 데이터 필드 목록 (rulebased_new.csv 룰 지원):
  총 0개 필드

  필드 목록:

📊 변환된 데이터 샘플 (첫 번째 사용자):


In [7]:
# ================================================================
# 사용자별 타이밍별 주원료 제품 랭킹 생성 함수
# ================================================================


def get_main_ingredient_ranking(
    user_id,
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    timing="Pre",
    topk=10,
):
    """
    특정 사용자에 대해 타이밍별 주원료 제품 랭킹 반환

    Parameters:
    -----------
    user_id : int or str
        사용자 ID
    timing : str
        "Pre", "Intra", "Post" 중 하나
    topk : int
        상위 k개 제품 반환

    Returns:
    --------
    pd.DataFrame : 주원료 제품 랭킹 (product_id, category, product_name, Predicted_Score, rank 포함)
    """
    # 1. 타이밍별 추천 받기
    recs = get_filtered_recommendations(
        user_id=user_id,
        model=model,
        dataset=dataset,
        user_features_matrix=user_features_matrix,
        item_features_matrix=item_features_matrix,
        df_item_raw=df_item_raw,
        timing=timing,
        k_total=250,  # 넓은 범위에서 추천 받기
        k_final=250,  # 모든 추천 결과 받기
    )

    if recs.empty:
        return pd.DataFrame()

    # 2. 주원료 제품만 필터링
    if "ingredient_type" in df_item_raw.columns:
        # 메타데이터와 병합하여 ingredient_type 정보 추가
        recs_with_meta = recs.merge(
            df_item_raw[
                [
                    ITEM_ID_COL,
                    "ingredient_type",
                    "category",
                    "product_name",
                    "brand_name_kor",
                ]
            ],
            on=ITEM_ID_COL,
            how="left",
        )

        # 주원료만 필터링
        main_ing_recs = recs_with_meta[
            recs_with_meta["ingredient_type"] == "주원료"
        ].copy()
    else:
        # ingredient_type 컬럼이 없으면 전체 추천 결과 반환
        main_ing_recs = recs.copy()
        print("⚠️ ingredient_type 컬럼이 없어 전체 추천 결과를 반환합니다.")

    # 3. Predicted_Score 기준 내림차순 정렬 및 랭킹 추가
    main_ing_recs = main_ing_recs.sort_values(
        by="Predicted_Score", ascending=False
    ).reset_index(drop=True)

    main_ing_recs["rank"] = main_ing_recs.index + 1

    # 4. 상위 k개만 반환
    result = main_ing_recs.head(topk)[
        [
            "rank",
            ITEM_ID_COL,
            "category",
            "product_name",
            "brand_name_kor",
            "Predicted_Score",
        ]
    ]

    return result


def get_all_timing_main_ingredient_rankings(
    user_id,
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    topk=10,
):
    """
    특정 사용자에 대해 Pre/Intra/Post 모든 타이밍별 주원료 제품 랭킹 반환

    Returns:
    --------
    dict : {"Pre": DataFrame, "Intra": DataFrame, "Post": DataFrame}
    """
    results = {}

    for timing in ["Pre", "Intra", "Post"]:
        results[timing] = get_main_ingredient_ranking(
            user_id=user_id,
            model=model,
            dataset=dataset,
            user_features_matrix=user_features_matrix,
            item_features_matrix=item_features_matrix,
            df_item_raw=df_item_raw,
            timing=timing,
            topk=topk,
        )

    return results


print("✅ 주원료 제품 랭킹 함수 정의 완료!")

✅ 주원료 제품 랭킹 함수 정의 완료!


## 학습된 모델 가지고오기

In [8]:
# ================================================================
# 저장된 LightFM 모델 로드
# ================================================================
import pickle
import os
import pandas as pd
import numpy as np
from lightfm import LightFM

# 모델이 저장된 경로 설정
MODEL_DIR = "lightfm_artifacts"
MODEL_PATH = os.path.join(MODEL_DIR, "lightfm_model_latest.pkl")

# 모델이 없는 경우 타임스탬프가 포함된 파일명으로 찾기
if not os.path.exists(MODEL_PATH):
    # 최신 파일 찾기
    if os.path.exists(MODEL_DIR):
        model_files = [f for f in os.listdir(MODEL_DIR) if f.endswith(".pkl")]
        if model_files:
            # 파일명에서 타임스탬프 추출하여 최신 파일 선택
            model_files.sort(reverse=True)
            MODEL_PATH = os.path.join(MODEL_DIR, model_files[0])
            print(f"⚠️ 최신 모델 링크를 찾을 수 없어 타임스탬프 파일 사용: {MODEL_PATH}")
        else:
            raise FileNotFoundError(
                f"❌ 모델 파일을 찾을 수 없습니다. {MODEL_DIR} 디렉토리를 확인하세요."
            )
    else:
        raise FileNotFoundError(f"❌ 모델 디렉토리를 찾을 수 없습니다: {MODEL_DIR}")

try:
    # 모델 로드
    with open(MODEL_PATH, "rb") as f:
        artifacts = pickle.load(f)

    # 개별 변수로 추출
    model = artifacts["model"]
    dataset = artifacts["dataset"]
    user_features_matrix = artifacts["user_features_matrix"]
    item_features_matrix = artifacts["item_features_matrix"]
    df_item_raw = artifacts["df_item_raw"]
    ITEM_ID_COL = artifacts["ITEM_ID_COL"]
    OHE_ITEM_COLS = artifacts["OHE_ITEM_COLS"]
    ITEM_FULL_ID_MAP = artifacts.get("ITEM_FULL_ID_MAP", None)

    print("✅ 모델 및 아티팩트 로드 완료!")
    print(f"   로드 경로: {MODEL_PATH}")
    print(f"   모델 타입: {type(model)}")
    print(f"   Dataset 사용자 수: {len(dataset.mapping()[0])}")
    print(f"   Dataset 아이템 수: {len(dataset.mapping()[2])}")

except Exception as e:
    print(f"❌ 모델 로드 중 오류 발생: {e}")
    import traceback

    traceback.print_exc()
    raise



✅ 모델 및 아티팩트 로드 완료!
   로드 경로: lightfm_artifacts/lightfm_model_latest.pkl
   모델 타입: <class 'lightfm.lightfm.LightFM'>
   Dataset 사용자 수: 1037
   Dataset 아이템 수: 228


In [9]:
# ================================================================
# 추천 함수 정의
# ================================================================


def recommend_for_user(
    user_id,
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    k=250,
):
    """특정 user_id에 대해 LightFM 모델 기반으로 상위 K개의 아이템을 추천합니다."""

    user_id_map = dataset.mapping()[0]
    item_id_map = dataset.mapping()[2]
    item_id_rev_map = {v: k for k, v in item_id_map.items()}

    if user_id not in user_id_map:
        print(f"⚠️ 사용자 ID {user_id}가 데이터셋에 없습니다.")
        return pd.DataFrame()

    user_inner_id = user_id_map[user_id]
    n_items = len(item_id_map)
    all_item_ids = np.arange(n_items)

    scores = model.predict(
        user_ids=[user_inner_id] * n_items,
        item_ids=all_item_ids,
        user_features=user_features_matrix,
        item_features=item_features_matrix,
    )

    top_k_indices = np.argsort(-scores)[:k]
    recommended_item_ids = [item_id_rev_map[i] for i in top_k_indices]

    recommendation_df = pd.DataFrame(
        {ITEM_ID_COL: recommended_item_ids, "Predicted_Score": scores[top_k_indices]}
    )

    # timing_category를 포함한 메타데이터와 병합
    item_display_cols = ["category", "product_name", "flavor", "timing_category"]
    available_cols = [col for col in item_display_cols if col in df_item_raw.columns]

    final_recommendations = recommendation_df.merge(
        df_item_raw.rename(columns={ITEM_ID_COL: ITEM_ID_COL})[
            available_cols + [ITEM_ID_COL]
        ],
        on=ITEM_ID_COL,
        how="left",
    )

    return final_recommendations


def get_filtered_recommendations(
    user_id,
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    k_total=250,
    timing=None,
    k_final=3,
):
    """
    전체 추천 결과를 받은 후, timing_category (리스트)로 필터링하여 최종 K_final 개를 반환하는 함수.
    """

    all_recs = recommend_for_user(
        user_id,
        model,
        dataset,
        user_features_matrix,
        item_features_matrix,
        df_item_raw,
        k=k_total,
    )

    if all_recs.empty:
        return pd.DataFrame()

    # 타이밍 카테고리로 필터링
    if timing and "timing_category" in all_recs.columns:
        # 1. 타이밍에 맞는 제품 필터링
        timing_filtered = all_recs[
            all_recs["timing_category"].apply(
                lambda x: timing in x if isinstance(x, list) else x == timing
            )
        ]

        # 2. 점수 기반 선형 샘플링: 상위 k_final개만 선택
        filtered_recs = timing_filtered.head(k_final)
    else:
        filtered_recs = all_recs.head(k_final)

    # 최종 출력 컬럼 정리
    output_cols = [ITEM_ID_COL, "category", "product_name", "Predicted_Score"]
    final_cols = [col for col in output_cols if col in filtered_recs.columns]

    return filtered_recs[final_cols]


print("✅ 추천 함수 정의 완료!")

✅ 추천 함수 정의 완료!


In [10]:
# ================================================================
# apply_rules_to_user 함수 수정: NaN 처리 추가
# ================================================================


def apply_rules_to_user_fixed(
    rules_df: pd.DataFrame, user: Dict[str, Any], timing_filter: Optional[str] = None
) -> Dict[str, Dict[str, Any]]:
    """
    사용자에 룰 적용 (타이밍 필터링 옵션) - NaN 처리 버전

    Parameters:
    -----------
    timing_filter : "전", "중", "후" 또는 None (None이면 모든 타이밍)
    """
    result: Dict[str, Dict[str, Any]] = {}
    var_scope = {
        "age": user.get("age"),
        "weight_kg": user.get("weight_kg", user.get("weight")),
        "lbm_kg": user.get("lbm_kg", user.get("lean_mass")),
    }

    # 타이밍 필터링
    if timing_filter:
        rules_df = rules_df[
            rules_df["timing"].apply(
                lambda x: timing_filter in str(x) if pd.notna(x) else False
            )
        ].copy()

    # priority가 NaN인 경우 기본값 9로 채우기
    rules_df_filled = rules_df.copy()
    if "priority" in rules_df_filled.columns:
        rules_df_filled["priority"] = rules_df_filled["priority"].fillna(9)
    else:
        rules_df_filled["priority"] = 9

    rules_sorted = rules_df_filled.sort_values(
        by=["priority", "rule_type"]
    ).reset_index(drop=True)

    for _, row in rules_sorted.iterrows():
        ingredient = _norm_text(row.get("ingredient"))
        rule_type = _norm_text(row.get("rule_type"))
        # NaN 체크 추가 (이중 안전장치)
        priority_val = row.get("priority", 9)
        if pd.isna(priority_val):
            priority = 9
        else:
            try:
                priority = int(priority_val)
            except (ValueError, TypeError):
                priority = 9
        variable = _norm_text(row.get("variable"))
        condition = _norm_text(row.get("condition"))
        operation = _norm_text(row.get("operation"))
        value = _norm_text(row.get("value"))
        unit = _norm_text(row.get("unit"))

        if not ingredient or not rule_type:
            continue

        if ingredient not in result:
            result[ingredient] = {
                "dose_value": None,
                "dose_unit": unit,
                "dose_range": None,
                "prohibit": False,
                "alternatives": [],
                "warnings": [],
                "suggestions": [],
                "synergy": [],
            }

        cond_ok = True
        if variable and condition:
            cond_ok = _eval_condition(user, variable, condition)
        elif condition in ("TRUE", "FALSE"):
            cond_ok = condition == "TRUE"

        if not cond_ok:
            continue

        entry = result[ingredient]

        def parse_range(s: str) -> Optional[Tuple[float, float]]:
            m = re.match(r"^\s*(\d+(\.\d+)?)\s*-\s*(\d+(\.\d+)?)\s*$", s)
            if not m:
                return None
            return (float(m.group(1)), float(m.group(3)))

        def current_numeric():
            if entry["dose_value"] is not None:
                return float(entry["dose_value"])
            if entry["dose_range"] is not None:
                lo, hi, _u = entry["dose_range"]
                return (lo + hi) / 2.0
            return None

        if rule_type == "base_dose":
            if operation == "set" and value:
                entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                    float(value) if _as_number(value) is not None else None,
                    unit,
                    None,
                )
            elif operation == "set_range" and value:
                rng = parse_range(value)
                if rng:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        None,
                        unit,
                        (rng[0], rng[1], unit),
                    )
            elif operation == "set_min" and value:
                cur, v = current_numeric(), _as_number(value)
                if v is not None and (cur is None or cur < v):
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(v),
                        unit,
                        None,
                    )
            elif operation == "expression" and value:
                dose = _safe_eval_expression(value, var_scope)
                if dose is not None:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(dose),
                        unit,
                        None,
                    )

        elif rule_type == "adjustment":
            if operation == "set" and value:
                entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                    float(value) if _as_number(value) is not None else None,
                    unit,
                    None,
                )
            elif operation == "set_range" and value:
                rng = parse_range(value)
                if rng:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        None,
                        unit,
                        (rng[0], rng[1], unit),
                    )
            elif operation == "add" and value:
                cur = current_numeric() or 0.0
                entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                    float(cur) + float(value),
                    unit,
                    None,
                )
            elif operation == "multiply":
                mul = _as_number(value) if value is not None else 1.0
                cur = current_numeric()
                if cur is not None:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(cur) * float(mul),
                        unit or entry["dose_unit"],
                        None,
                    )
            elif operation == "expression" and value:
                dose = _safe_eval_expression(value, var_scope)
                if dose is not None:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(dose),
                        unit or entry["dose_unit"],
                        None,
                    )

        elif rule_type == "exception":
            if operation == "prohibit":
                (
                    entry["prohibit"],
                    entry["dose_value"],
                    entry["dose_range"],
                    entry["dose_unit"],
                ) = (True, 0.0, None, unit or entry["dose_unit"])

        elif rule_type == "interaction":
            if operation == "add_synergy" and value:
                entry["synergy"].append((value, row.get("unit") or ""))

        if entry["dose_value"] is not None and not entry["dose_unit"]:
            entry["dose_unit"] = unit

    return result


# 기존 함수 덮어쓰기
apply_rules_to_user = apply_rules_to_user_fixed

print("✅ apply_rules_to_user 함수 수정 완료 (NaN 처리 추가)!")

✅ apply_rules_to_user 함수 수정 완료 (NaN 처리 추가)!


## 추천 결과의 pre-intra-post ranking 만들기

In [11]:
# ================================================================
# 사용 예시: 특정 사용자에 대한 타이밍별 주원료 제품 랭킹
# ================================================================

# 필수 변수들이 정의되어 있는지 확인
required_vars = [
    "dataset",
    "model",
    "user_features_matrix",
    "item_features_matrix",
    "df_item_raw",
    "ITEM_ID_COL",
]
missing_vars = [var for var in required_vars if var not in globals()]

if missing_vars:
    print(f"❌ 다음 변수들이 정의되지 않았습니다: {', '.join(missing_vars)}")
    print("\n필요한 셀들을 먼저 실행하세요:")
    print("  1. 셀 18: 학습된 모델 가지고오기 (LightFM 모델 로드)")
    print(
        "  2. 셀 19: 추천 함수 정의 (recommend_for_user, get_filtered_recommendations)"
    )
    print(
        "  3. 셀 9: 주원료 제품 랭킹 함수 정의 (get_all_timing_main_ingredient_rankings)"
    )
else:
    # 예시 사용자 ID (users_converted의 인덱스 또는 실제 user_id 사용)
    # 주의: 모델의 dataset에 있는 user_id를 사용해야 함
    example_user_id = 2

    # dataset에 있는 user_id 확인
    try:
        if example_user_id in dataset.mapping()[0]:
            print(f"\n{'='*80}")
            print(f"사용자 ID {example_user_id}에 대한 타이밍별 주원료 제품 랭킹")
            print(f"{'='*80}\n")

            # 타이밍별 주원료 제품 랭킹 가져오기
            rankings = get_all_timing_main_ingredient_rankings(
                user_id=example_user_id,
                model=model,
                dataset=dataset,
                user_features_matrix=user_features_matrix,
                item_features_matrix=item_features_matrix,
                df_item_raw=df_item_raw,
                topk=10,  # 상위 10개
            )

            # 결과 출력
            for timing in ["Pre", "Intra", "Post"]:
                timing_emoji = {"Pre": "🏋️", "Intra": "💧", "Post": "💪"}
                print(
                    f"\n{timing_emoji.get(timing, '📦')} {timing}-Workout 주원료 제품 랭킹 (상위 10개)"
                )
                print("-" * 80)

                if not rankings[timing].empty:
                    try:
                        print(rankings[timing].to_string(index=False))
                    except:
                        print(rankings[timing].to_markdown(index=False))
                    print(f"\n총 {len(rankings[timing])}개 제품")
                else:
                    print("추천 결과가 없습니다.")

            print(f"\n{'='*80}")

        else:
            print(f"⚠️ 사용자 ID {example_user_id}가 데이터셋에 없습니다.")
            print(
                f"사용 가능한 사용자 ID 범위: {min(dataset.mapping()[0].keys())} ~ {max(dataset.mapping()[0].keys())}"
            )
            print("\n사용 예시:")
            print(
                "  example_user_id = 555  # 실제 데이터셋에 있는 user_id로 변경하세요"
            )
    except Exception as e:
        print(f"❌ 오류 발생: {e}")
        print("\n해결 방법:")
        print("  1. 셀 18을 먼저 실행하여 LightFM 모델을 로드하세요")
        print("  2. 셀 19를 실행하여 추천 함수들을 정의하세요")
        import traceback

        traceback.print_exc()


사용자 ID 2에 대한 타이밍별 주원료 제품 랭킹

❌ 오류 발생: "['category', 'product_name'] not in index"

해결 방법:
  1. 셀 18을 먼저 실행하여 LightFM 모델을 로드하세요
  2. 셀 19를 실행하여 추천 함수들을 정의하세요


Traceback (most recent call last):
  File "/var/folders/yd/4pvx9_5d6lj5__k2j2r0qs_h0000gp/T/ipykernel_18955/4276308127.py", line 39, in <module>
    rankings = get_all_timing_main_ingredient_rankings(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/yd/4pvx9_5d6lj5__k2j2r0qs_h0000gp/T/ipykernel_18955/1029390952.py", line 115, in get_all_timing_main_ingredient_rankings
    results[timing] = get_main_ingredient_ranking(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/yd/4pvx9_5d6lj5__k2j2r0qs_h0000gp/T/ipykernel_18955/1029390952.py", line 82, in get_main_ingredient_ranking
    result = main_ing_recs.head(topk)[
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/sumin/miniconda3/envs/lightfm_python311/lib/python3.11/site-packages/pandas/core/frame.py", line 4119, in __getitem__
    indexer = self.columns._get_indexer_strict(key, "columns")[1]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/sumin/miniconda

In [12]:
# ================================================================
# 배치 처리: 모든 사용자에 대한 타이밍별 주원료 제품 랭킹 생성 및 저장
# ================================================================


def generate_all_users_main_ingredient_rankings(
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    user_ids=None,  # None이면 dataset의 모든 사용자
    topk=10,
    save_path=None,  # 저장 경로 (예: "main_ingredient_rankings.csv")
):
    """
    모든 사용자에 대해 타이밍별 주원료 제품 랭킹 생성

    Returns:
    --------
    pd.DataFrame : 모든 사용자의 랭킹 결과
        columns: [user_id, timing, rank, product_id, category, product_name, brand_name_kor, Predicted_Score]
    """
    if user_ids is None:
        user_ids = list(dataset.mapping()[0].keys())

    all_results = []

    print(f"🔄 {len(user_ids)}명의 사용자에 대해 랭킹 생성 중...")

    for idx, user_id in enumerate(user_ids, 1):
        if idx % 100 == 0:
            print(f"  진행 중: {idx}/{len(user_ids)}명 완료...")

        try:
            rankings = get_all_timing_main_ingredient_rankings(
                user_id=user_id,
                model=model,
                dataset=dataset,
                user_features_matrix=user_features_matrix,
                item_features_matrix=item_features_matrix,
                df_item_raw=df_item_raw,
                topk=topk,
            )

            # 결과를 리스트로 변환
            for timing in ["Pre", "Intra", "Post"]:
                if not rankings[timing].empty:
                    timing_df = rankings[timing].copy()
                    timing_df["user_id"] = user_id
                    timing_df["timing"] = timing
                    # 컬럼 순서 정리
                    timing_df = timing_df[
                        [
                            "user_id",
                            "timing",
                            "rank",
                            ITEM_ID_COL,
                            "category",
                            "product_name",
                            "brand_name_kor",
                            "Predicted_Score",
                        ]
                    ]
                    all_results.append(timing_df)

        except Exception as e:
            print(f"⚠️ 사용자 {user_id} 처리 중 오류: {e}")
            continue

    if all_results:
        result_df = pd.concat(all_results, ignore_index=True)
        print(f"\n✅ 완료: 총 {len(result_df)}개 랭킹 결과 생성")
        print(f"   사용자 수: {result_df['user_id'].nunique()}명")
        print(f"   타이밍별 분포:")
        for timing in ["Pre", "Intra", "Post"]:
            count = len(result_df[result_df["timing"] == timing])
            print(f"     {timing}: {count}개")

        # 저장
        if save_path:
            result_df.to_csv(save_path, index=False)
            print(f"\n💾 저장 완료: {save_path}")

        return result_df
    else:
        print("❌ 생성된 랭킹 결과가 없습니다.")
        return pd.DataFrame()


print("✅ 배치 처리 함수 정의 완료!")
print("\n사용 방법:")
print("  rankings_all = generate_all_users_main_ingredient_rankings(")
print("      model=model,")
print("      dataset=dataset,")
print("      user_features_matrix=user_features_matrix,")
print("      item_features_matrix=item_features_matrix,")
print("      df_item_raw=df_item_raw,")
print("      topk=10,")
print("      save_path='main_ingredient_rankings.csv'")
print("  )")

✅ 배치 처리 함수 정의 완료!

사용 방법:
  rankings_all = generate_all_users_main_ingredient_rankings(
      model=model,
      dataset=dataset,
      user_features_matrix=user_features_matrix,
      item_features_matrix=item_features_matrix,
      df_item_raw=df_item_raw,
      topk=10,
      save_path='main_ingredient_rankings.csv'
  )


## 룰 함수 정의

In [13]:
# ================================================================
# get_main_ingredient_ranking 함수 수정 버전 (컬럼 오류 수정)
# ================================================================


def get_main_ingredient_ranking_fixed(
    user_id,
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    timing="Pre",
    topk=10,
    user_dict=None,  # 사용자 정보 (rule 적용용)
    rules_df=None,  # rule DataFrame (rule 적용용)
):
    """
    특정 사용자에 대해 타이밍별 주원료 제품 랭킹 반환 (수정 버전)

    수정 사항:
    - LightFM 추천 결과를 순서대로 확인
    - 각 제품에 대해 rule 적용하여 prohibit 여부 확인
    - prohibit가 아닌 것만 순서대로 리스트에 추가
    - rank는 LightFM 추천 순서대로 부여
    """
    # 1. recommend_for_user를 직접 사용하여 모든 컬럼 확보
    all_recs = recommend_for_user(
        user_id=user_id,
        model=model,
        dataset=dataset,
        user_features_matrix=user_features_matrix,
        item_features_matrix=item_features_matrix,
        df_item_raw=df_item_raw,
        k=250,  # 넓은 범위에서 추천 받기
    )

    if all_recs.empty:
        return pd.DataFrame()

    # 1-1. 타이밍 필터링 (timing_category가 있는 경우)
    if timing and "timing_category" in all_recs.columns:
        recs = all_recs[
            all_recs["timing_category"].apply(
                lambda x: timing in x if isinstance(x, list) else x == timing
            )
        ].copy()
    else:
        recs = all_recs.copy()

    if recs.empty:
        return pd.DataFrame()

        # 2. 주원료 제품만 필터링 및 메타데이터 병합
    if "ingredient_type" in df_item_raw.columns:
        # 필요한 컬럼들 병합
        merge_cols = [ITEM_ID_COL, "ingredient_type", "ingredients"]  # ingredients 추가
        if "category" not in recs.columns and "category" in df_item_raw.columns:
            merge_cols.append("category")
        if "product_name" not in recs.columns and "product_name" in df_item_raw.columns:
            merge_cols.append("product_name")
        if "brand_name_kor" in df_item_raw.columns:
            merge_cols.append("brand_name_kor")

        recs_with_meta = recs.merge(
            df_item_raw[merge_cols],
            on=ITEM_ID_COL,
            how="left",
        )

        # 주원료만 필터링
        main_ing_recs = recs_with_meta[
            recs_with_meta["ingredient_type"] == "주원료"
        ].copy()
    else:
        # ingredient_type 컬럼이 없으면 전체 추천 결과 반환
        main_ing_recs = recs.copy()
        print("⚠️ ingredient_type 컬럼이 없어 전체 추천 결과를 반환합니다.")

    # 3. Predicted_Score 기준 내림차순 정렬 (LightFM 추천 순서)
    main_ing_recs = main_ing_recs.sort_values(
        by="Predicted_Score", ascending=False
    ).reset_index(drop=True)

    # 4. Rule 적용하여 prohibit 확인 및 필터링
    if user_dict is not None and rules_df is not None:
        # rule 적용
        rule_result = apply_rules_to_user(rules_df, user_dict, timing_filter=None)

        # 성분 키워드 매핑
        INGREDIENT_KEYWORDS = {
            "creatine": {"creatine", "creatine monohydrate"},
            "caffeine": {"caffeine", "카페인"},
            "arginine": {"arginine", "l-arginine"},
            "l-theanine": {"theanine", "l-theanine"},
            "glycerol": {"glycerol"},
            "nitrate": {"nitrate", "비트", "beet"},
            "taurine": {"taurine"},
            "whey_protein": {"whey", "wpc", "wpi", "wph"},
            "casein": {"casein", "micellar casein"},
            "bcaa": {"bcaa", "leucine", "isoleucine", "valine"},
            "betaine": {"betaine"},
            "glutamine": {"glutamine"},
            "l-carnitine": {"carnitine"},
        }

        def product_has_ingredient(product_row, rule_ingredient: str) -> bool:
            """제품이 특정 성분을 포함하는지 확인"""
            ingredients_str = str(product_row.get("ingredients", "")).lower()
            kws = INGREDIENT_KEYWORDS.get(rule_ingredient, {rule_ingredient.lower()})
            for kw in kws:
                if kw.lower() in ingredients_str:
                    return True
            return False

        # LightFM 순서대로 하나씩 확인하며 prohibit가 아닌 것만 추가
        safe_products = []
        rank = 1

        for _, row in main_ing_recs.iterrows():
            # prohibit 체크
            is_prohibited = False
            for ing, info in rule_result.items():
                if info.get("prohibit", False) and product_has_ingredient(row, ing):
                    is_prohibited = True
                    break

            # prohibit가 아니면 리스트에 추가
            if not is_prohibited:
                row_dict = row.to_dict()
                row_dict["rank"] = rank
                safe_products.append(row_dict)
                rank += 1

                # topk개만 수집
                if len(safe_products) >= topk:
                    break

        if not safe_products:
            return pd.DataFrame()

        main_ing_recs = pd.DataFrame(safe_products)
    else:
        # rule 적용 안 함 (기존 로직)
        main_ing_recs["rank"] = main_ing_recs.index + 1

    # 5. 최종 출력 컬럼 선택 (있는 컬럼만 선택하여 안전하게 처리)
    output_cols = ["rank", ITEM_ID_COL, "Predicted_Score"]
    optional_cols = ["category", "product_name", "brand_name_kor"]

    # 있는 컬럼만 추가
    for col in optional_cols:
        if col in main_ing_recs.columns:
            output_cols.append(col)

    # 최종 컬럼 확인 (없으면 최소한의 컬럼이라도 반환)
    available_output_cols = [col for col in output_cols if col in main_ing_recs.columns]

    if not available_output_cols:
        # 최소한의 컬럼이라도 반환
        available_output_cols = list(main_ing_recs.columns)[
            : min(topk, len(main_ing_recs.columns))
        ]

    result = main_ing_recs.head(topk)[available_output_cols]

    return result


def get_all_timing_main_ingredient_rankings_fixed(
    user_id,
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    topk=10,
    user_dict=None,  # 사용자 정보 (rule 적용용)
    rules_df=None,  # rule DataFrame (rule 적용용)
):
    """수정된 버전의 모든 타이밍 랭킹 함수"""
    results = {}

    for timing in ["Pre", "Intra", "Post"]:
        results[timing] = get_main_ingredient_ranking_fixed(
            user_id=user_id,
            model=model,
            dataset=dataset,
            user_features_matrix=user_features_matrix,
            item_features_matrix=item_features_matrix,
            df_item_raw=df_item_raw,
            timing=timing,
            topk=topk,
            user_dict=user_dict,
            rules_df=rules_df,
        )

    return results


# 기존 함수 덮어쓰기
get_main_ingredient_ranking = get_main_ingredient_ranking_fixed
get_all_timing_main_ingredient_rankings = get_all_timing_main_ingredient_rankings_fixed

print("✅ 주원료 제품 랭킹 함수 수정 완료 (rule prohibit 체크 포함)!")

✅ 주원료 제품 랭킹 함수 수정 완료 (rule prohibit 체크 포함)!


In [14]:
# ================================================================
# rulebased_new.csv 룰 적용을 위한 필수 함수들
# ================================================================

import math
import re
import ast
import operator
from typing import Dict, List, Optional, Tuple, Any


def _norm_text(x: Any) -> str:
    """텍스트 정규화"""
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return ""
    return str(x).strip()


def _as_number(x: Any) -> Optional[float]:
    """숫자로 변환"""
    if x is None or x == "":
        return None
    try:
        return float(x)
    except (ValueError, TypeError):
        return None


def _eval_condition(user: Dict[str, Any], variable: str, condition: str) -> bool:
    """조건 평가"""
    if not condition or condition == "":
        return True

    cond = condition.strip()

    # CONTAINS 체크
    if "CONTAINS" in cond.upper():
        match = re.search(r'CONTAINS\s+["\']?([^"\']+)["\']?', cond, re.IGNORECASE)
        if match:
            search_val = match.group(1).strip()
            user_val = user.get(variable, "")

            # 리스트인 경우
            if isinstance(user_val, list):
                return any(search_val in str(v) for v in user_val)
            # 문자열인 경우
            return search_val in str(user_val)

    # OR 조건
    if " OR " in cond.upper():
        parts = [p.strip() for p in cond.split(" OR ", 1)]
        return _eval_condition(user, variable, parts[0]) or _eval_condition(
            user, variable, parts[1]
        )

    # 비교 연산자
    user_val = user.get(variable)
    if user_val is None:
        return False

    # 숫자 비교
    if re.match(r"^[<>=!]+", cond):
        op_map = {
            "<": operator.lt,
            "<=": operator.le,
            ">": operator.ge,
            ">=": operator.ge,
            "!=": operator.ne,
            "==": operator.eq,
        }
        for op_str, op_fn in op_map.items():
            if cond.startswith(op_str):
                rhs = cond[len(op_str) :].strip().strip("\"'")
                user_num = _as_number(user_val)
                rhs_num = _as_number(rhs)
                if user_num is not None and rhs_num is not None:
                    return op_fn(user_num, rhs_num)
                # 문자열 비교
                return op_fn(str(user_val), rhs)

    # 직접 일치
    if cond.startswith('"') and cond.endswith('"'):
        cond = cond[1:-1]
    if cond.startswith("'") and cond.endswith("'"):
        cond = cond[1:-1]

    if isinstance(user_val, list):
        return any(cond == str(v) for v in user_val)
    return str(user_val) == cond


def _safe_eval_expression(expr: str, var_scope: Dict[str, Any]) -> Optional[float]:
    """표현식 안전 평가"""
    try:
        return float(eval(expr, {"__builtins__": {}}, var_scope))
    except:
        return None


def apply_rules_to_user(
    rules_df: pd.DataFrame, user: Dict[str, Any], timing_filter: Optional[str] = None
) -> Dict[str, Dict[str, Any]]:
    """
    사용자에 룰 적용 (타이밍 필터링 옵션)

    Parameters:
    -----------
    timing_filter : "전", "중", "후" 또는 None (None이면 모든 타이밍)
    """
    result: Dict[str, Dict[str, Any]] = {}
    var_scope = {
        "age": user.get("age"),
        "weight_kg": user.get("weight_kg", user.get("weight")),
        "lbm_kg": user.get("lbm_kg", user.get("lean_mass")),
    }

    # 타이밍 필터링
    if timing_filter:
        rules_df = rules_df[
            rules_df["timing"].apply(
                lambda x: timing_filter in str(x) if pd.notna(x) else False
            )
        ].copy()

    # priority가 NaN인 경우 기본값 9로 채우기
    rules_df_filled = rules_df.copy()
    if "priority" in rules_df_filled.columns:
        rules_df_filled["priority"] = rules_df_filled["priority"].fillna(9)
    else:
        rules_df_filled["priority"] = 9

    rules_sorted = rules_df_filled.sort_values(
        by=["priority", "rule_type"]
    ).reset_index(drop=True)

    for _, row in rules_sorted.iterrows():
        ingredient = _norm_text(row.get("ingredient"))
        rule_type = _norm_text(row.get("rule_type"))
        # NaN 체크 추가 (이중 안전장치)
        priority_val = row.get("priority", 9)
        if pd.isna(priority_val):
            priority = 9
        else:
            try:
                priority = int(priority_val)
            except (ValueError, TypeError):
                priority = 9
        variable = _norm_text(row.get("variable"))
        condition = _norm_text(row.get("condition"))
        operation = _norm_text(row.get("operation"))
        value = _norm_text(row.get("value"))
        unit = _norm_text(row.get("unit"))

        if not ingredient or not rule_type:
            continue

        if ingredient not in result:
            result[ingredient] = {
                "dose_value": None,
                "dose_unit": unit,
                "dose_range": None,
                "prohibit": False,
                "alternatives": [],
                "warnings": [],
                "suggestions": [],
                "synergy": [],
            }

        cond_ok = True
        if variable and condition:
            cond_ok = _eval_condition(user, variable, condition)
        elif condition in ("TRUE", "FALSE"):
            cond_ok = condition == "TRUE"

        if not cond_ok:
            continue

        entry = result[ingredient]

        def parse_range(s: str) -> Optional[Tuple[float, float]]:
            m = re.match(r"^\s*(\d+(\.\d+)?)\s*-\s*(\d+(\.\d+)?)\s*$", s)
            if not m:
                return None
            return (float(m.group(1)), float(m.group(3)))

        def current_numeric():
            if entry["dose_value"] is not None:
                return float(entry["dose_value"])
            if entry["dose_range"] is not None:
                lo, hi, _u = entry["dose_range"]
                return (lo + hi) / 2.0
            return None

        if rule_type == "base_dose":
            if operation == "set" and value:
                entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                    float(value) if _as_number(value) is not None else None,
                    unit,
                    None,
                )
            elif operation == "set_range" and value:
                rng = parse_range(value)
                if rng:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        None,
                        unit,
                        (rng[0], rng[1], unit),
                    )
            elif operation == "set_min" and value:
                cur, v = current_numeric(), _as_number(value)
                if v is not None and (cur is None or cur < v):
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(v),
                        unit,
                        None,
                    )
            elif operation == "expression" and value:
                dose = _safe_eval_expression(value, var_scope)
                if dose is not None:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(dose),
                        unit,
                        None,
                    )

        elif rule_type == "adjustment":
            if operation == "set" and value:
                entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                    float(value) if _as_number(value) is not None else None,
                    unit,
                    None,
                )
            elif operation == "set_range" and value:
                rng = parse_range(value)
                if rng:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        None,
                        unit,
                        (rng[0], rng[1], unit),
                    )
            elif operation == "add" and value:
                cur = current_numeric() or 0.0
                entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                    float(cur) + float(value),
                    unit,
                    None,
                )
            elif operation == "multiply":
                mul = _as_number(value) if value is not None else 1.0
                cur = current_numeric()
                if cur is not None:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(cur) * float(mul),
                        unit or entry["dose_unit"],
                        None,
                    )
            elif operation == "expression" and value:
                dose = _safe_eval_expression(value, var_scope)
                if dose is not None:
                    entry["dose_value"], entry["dose_unit"], entry["dose_range"] = (
                        float(dose),
                        unit or entry["dose_unit"],
                        None,
                    )

        elif rule_type == "exception":
            if operation == "prohibit":
                (
                    entry["prohibit"],
                    entry["dose_value"],
                    entry["dose_range"],
                    entry["dose_unit"],
                ) = (True, 0.0, None, unit or entry["dose_unit"])

        elif rule_type == "interaction":
            if operation == "add_synergy" and value:
                entry["synergy"].append((value, row.get("unit") or ""))

        if entry["dose_value"] is not None and not entry["dose_unit"]:
            entry["dose_unit"] = unit

    return result


print("✅ 룰 적용 함수 정의 완료!")

✅ 룰 적용 함수 정의 완료!


In [15]:
# ================================================================
# 타이밍별 주원료 리스트업 함수
# ================================================================


def get_main_ingredients_by_timing(
    user: Dict[str, Any],
    rules_df: pd.DataFrame,
    items_df: pd.DataFrame,
    timing: str = "전",
) -> pd.DataFrame:
    """
    특정 타이밍에 대한 주원료 제품 리스트 반환

    Parameters:
    -----------
    user : 사용자 정보 딕셔너리
    rules_df : rulebased_new.csv DataFrame
    items_df : 제품 메타데이터 DataFrame
    timing : "전", "중", "후" 중 하나

    Returns:
    --------
    pd.DataFrame : 주원료 제품 리스트 (ingredient_type == "주원료")
    """
    # 1. 룰 적용 (타이밍 필터링)
    rule_result = apply_rules_to_user(rules_df, user, timing_filter=timing)

    # 2. 권장된 성분 추출 (금지되지 않고 용량이 있는 성분)
    recommended_ingredients = []
    for ing, info in rule_result.items():
        if info.get("prohibit", False):
            continue
        if (
            info.get("dose_value") not in (None, 0.0)
            or info.get("dose_range") is not None
        ):
            dose_str = ""
            if info.get("dose_range"):
                lo, hi, unit = info["dose_range"]
                dose_str = f"{lo}-{hi}{unit}"
            elif info.get("dose_value"):
                dose_str = f"{info['dose_value']}{info.get('dose_unit', '')}"

            recommended_ingredients.append(
                {
                    "ingredient": ing,
                    "dose": dose_str,
                    "dose_value": info.get("dose_value"),
                    "dose_range": info.get("dose_range"),
                    "dose_unit": info.get("dose_unit", ""),
                    "warnings": info.get("warnings", []),
                    "suggestions": info.get("suggestions", []),
                }
            )

    # 3. 주원료 제품만 필터링
    main_ingredient_items = items_df[items_df["ingredient_type"] == "주원료"].copy()

    # 4. 제품에서 권장 성분을 포함하는지 확인하고 매칭
    results = []

    # 성분 키워드 매핑 (대소문자 무시)
    INGREDIENT_KEYWORDS = {
        "creatine": {"creatine", "creatine monohydrate"},
        "caffeine": {"caffeine", "카페인"},
        "arginine": {"arginine", "l-arginine"},
        "l-theanine": {"theanine", "l-theanine"},
        "glycerol": {"glycerol"},
        "nitrate": {"nitrate", "비트", "beet"},
        "taurine": {"taurine"},
        "whey_protein": {"whey", "wpc", "wpi", "wph"},
        "casein": {"casein", "micellar casein"},
        "bcaa": {"bcaa", "leucine", "isoleucine", "valine"},
        "betaine": {"betaine"},
        "glutamine": {"glutamine"},
        "l-carnitine": {"carnitine"},
    }

    def product_has_ingredient(product_row, rule_ingredient: str) -> bool:
        """제품이 특정 성분을 포함하는지 확인"""
        ingredients_str = str(product_row.get("ingredients", "")).lower()

        # 키워드 매핑 확인
        kws = INGREDIENT_KEYWORDS.get(rule_ingredient, {rule_ingredient.lower()})
        for kw in kws:
            if kw.lower() in ingredients_str:
                return True
        return False

    # 각 제품에 대해 권장 성분 매칭 확인
    for _, item_row in main_ingredient_items.iterrows():
        matched_ingredients = []
        for rec_ing in recommended_ingredients:
            if product_has_ingredient(item_row, rec_ing["ingredient"]):
                matched_ingredients.append(rec_ing)

        if matched_ingredients:
            # 최상위 매칭 성분의 용량 정보 사용
            top_match = matched_ingredients[0]
            results.append(
                {
                    "product_id": item_row.get("product_id"),
                    "product_name": item_row.get("product_name"),
                    "category": item_row.get("category"),
                    "brand_name": item_row.get("brand_name"),
                    "matched_ingredient": top_match["ingredient"],
                    "recommended_dose": top_match["dose"],
                    "dose_unit": top_match["dose_unit"],
                    "timing": timing,
                }
            )

    result_df = pd.DataFrame(results)

    # 랭킹 추가 (임시로 알파벳 순서)
    if not result_df.empty:
        result_df = result_df.sort_values(by="product_name").reset_index(drop=True)
        result_df["rank"] = result_df.index + 1

    return result_df


def get_all_timing_main_ingredients(
    user: Dict[str, Any],
    rules_df: pd.DataFrame,
    items_df: pd.DataFrame,
) -> Dict[str, pd.DataFrame]:
    """
    전/중/후 모든 타이밍에 대한 주원료 제품 리스트 반환

    Returns:
    --------
    dict : {"전": DataFrame, "중": DataFrame, "후": DataFrame}
    """
    results = {}
    for timing in ["전", "중", "후"]:
        results[timing] = get_main_ingredients_by_timing(
            user=user,
            rules_df=rules_df,
            items_df=items_df,
            timing=timing,
        )
    return results


print("✅ 타이밍별 주원료 리스트업 함수 정의 완료!")

✅ 타이밍별 주원료 리스트업 함수 정의 완료!


In [16]:
# ================================================================
# CSV 사용자 데이터를 INPUT_FORMAT 형식으로 변환
# ================================================================
import re
from typing import Dict, Any, List, Optional

# -------------------------
# 공용 파서/정규화 유틸
# -------------------------


def _dedup_list(xs: List[str]) -> List[str]:
    """공백 제거 후 중복 제거(순서 보존)"""
    seen = set()
    out = []
    for x in xs:
        if x is None:
            continue
        s = str(x).strip()
        if not s or s == "없음" or s.lower() == "nan":
            continue
        if s not in seen:
            seen.add(s)
            out.append(s)
    return out


def parse_list_field(value: Any) -> List[str]:
    """CSV 값을 list[str]로 변환"""
    if pd.isna(value) or value == "" or str(value).strip() == "없음":
        return []
    if isinstance(value, str):
        items = re.split(r"[,/|]+", value)
        return _dedup_list(items)
    if isinstance(value, (list, tuple, set)):
        return _dedup_list([str(v) for v in value])
    return _dedup_list([str(value)])


def parse_boolean_field(value: Any) -> bool:
    """CSV 값을 boolean으로 변환"""
    if pd.isna(value):
        return False
    str_val = str(value).strip().lower()
    return str_val in ["네", "예", "true", "yes", "1", "y"]


def parse_float_field(value: Any) -> Optional[float]:
    """CSV 값을 float으로 변환"""
    if pd.isna(value) or value == "":
        return None
    try:
        return float(value)
    except (ValueError, TypeError):
        return None


def parse_string_field(value: Any) -> Optional[str]:
    """CSV 값을 문자열로 변환"""
    if pd.isna(value) or value == "":
        return None
    return str(value).strip()


# -------------------------
# 표준화 매핑 유틸
# -------------------------


def map_training_experience(value: Any) -> str:
    """운동 경험을 표준 형식으로 매핑"""
    if pd.isna(value):
        return "초보"
    val = str(value).strip().lower()

    # 년수 기반 매핑
    if "년" in val:
        years = re.search(r"(\d+(?:\.\d+)?)", val)
        if years:
            year_num = float(years.group(1))
            if year_num < 1:
                return "초보"
            elif year_num < 3:
                return "중급"
            else:
                return "숙련자"

    # 직접 매핑
    if val in ["초보", "beginner"]:
        return "초보"
    if val in ["중급", "intermediate"]:
        return "중급"
    if val in ["숙련", "숙련자", "advanced"]:
        return "숙련자"

    return "초보"


def map_training_duration(value: Any) -> str:
    """운동 시간을 표준 형식으로 매핑"""
    if pd.isna(value):
        return "60-90"
    val = str(value).strip()

    # 숫자 기반 매핑 (주당 횟수 등)
    if val in ["3.0", "3", "3회"]:
        return "<60"
    elif val in ["4.0", "4", "4회"]:
        return "60-90"
    elif val in ["5.0", "5", "5회", "6.0", "6", "6회"]:
        return "90+"
    return "60-90"


def map_training_time(value: Any) -> str:
    """운동 시간대를 표준 형식으로 매핑"""
    if pd.isna(value):
        return "저녁"
    val = str(value).strip()

    if "오전" in val or "05" in val or "12시" in val:
        return "오전"
    elif "오후" in val:
        return "오후"
    elif "저녁" in val or "18시" in val or "22시" in val:
        return "저녁"
    elif "밤" in val:
        return "밤"
    return "저녁"


def map_diet_phase(value: Any) -> str:
    """다이어트 단계를 표준 형식으로 매핑"""
    if pd.isna(value):
        return "유지"
    val = str(value).strip()

    if "감량" in val or "체지방 감소" in val:
        return "체지방 감소"
    if "증량" in val or "벌크업" in val:
        return "벌크업"
    if "유지" in val:
        return "유지"
    return "유지"


def map_user_goals(value: Any) -> List[str]:
    """사용자 목표를 표준 형식(list[str])으로 매핑"""
    if pd.isna(value):
        return ["회복"]

    raw = " ".join(parse_list_field(value))
    goals = []
    if ("근육량 증가" in raw) or ("근성장" in raw):
        goals.append("근성장")
    if ("운동 후 회복" in raw) or ("회복" in raw):
        goals.append("회복")
    if ("운동 전 에너지" in raw) or ("퍼포먼스" in raw):
        goals.append("퍼포먼스")
    if ("체지방 감소" in raw) or ("감량" in raw):
        goals.append("체지방 감소")
    if "집중" in raw:
        goals.append("집중력")

    goals = _dedup_list(goals)
    return goals if goals else ["회복"]


def map_health_conditions(value: Any) -> List[str]:
    """건강 상태를 표준 형식(list[str])으로 매핑"""
    if pd.isna(value) or str(value).strip() == "없음":
        return []
    raw = " ".join(parse_list_field(value))
    conditions = []

    if ("유당불내증" in raw) or ("유당 불내증" in raw):
        conditions.append("유당 불내증")
    if ("카페인 민감성" in raw) or ("수면장애" in raw):
        conditions.append("수면장애")
    if "신장 질환" in raw:
        conditions.append("신장 질환")
    if "저혈압" in raw:
        conditions.append("저혈압")
    if "고혈압" in raw:
        conditions.append("고혈압")
    if "불안장애" in raw:
        conditions.append("불안장애")

    return _dedup_list(conditions)


def map_allergy(value: Any) -> List[str]:
    """알러지를 표준 형식(list[str])으로 매핑"""
    if pd.isna(value) or str(value).strip() == "없음":
        return []
    raw_list = parse_list_field(value)
    out = []
    raw_join = " ".join(raw_list)
    if "카페인 민감성" in raw_join or "카페인" in raw_list:
        out.append("카페인")
    if "유당불내증" in raw_join or "유당" in raw_list:
        out.append("유당")
    return _dedup_list(out)


# -------------------------
# 메인 변환 함수
# -------------------------


def convert_user_to_input_format(row: pd.Series) -> Dict[str, Any]:
    """사용자 데이터 한 행을 INPUT_FORMAT 형식으로 변환"""
    try:
        user: Dict[str, Any] = {
            # 기본 정보
            "age": parse_float_field(row.get("2) 나이(만 나이)", row.get("나이"))),
            "gender": parse_string_field(row.get("3) 성별", row.get("성별"))),
            "weight": parse_float_field(row.get("6) 몸무게", row.get("몸무게"))),
            "lean_mass": None,  # CSV에 없음
            # 운동 관련
            "training_experience": map_training_experience(
                row.get("8) 운동 활동 기간", row.get("운동 활동 기간"))
            ),
            "training_duration": map_training_duration(
                row.get(
                    "9) 주에 몇 회 정도 운동을 진행하시나요?(택1)", row.get("운동 빈도")
                )
            ),
            "training_time": map_training_time(
                row.get(
                    "12-3) 시간 기준으로 운동 시작 시간이 언제인가요?(택 1)",
                    row.get("운동 시간"),
                )
            ),
            # 다이어트/목표
            "diet_phase": map_diet_phase(
                row.get(
                    "13-13) 해당 프로틴 이용 당시 운동 목적은 무엇이었나요? (체중)",
                    row.get("운동 목적"),
                )
            ),
            "user_goal": map_user_goals(
                row.get(
                    "13-4) 프로틴의 효과에서 고려한 세부 항목을 선택해주세요 (복수선택 가능)",
                    row.get("효과"),
                )
            ),  # list[str]
            # 건강/제약
            "health_conditions": map_health_conditions(
                row.get("10) 알러지 또는 민감성분(복수선택 가능)", row.get("알러지"))
            ),  # list[str]
            "allergy": map_allergy(
                row.get("10) 알러지 또는 민감성분(복수선택 가능)", row.get("알러지"))
            ),  # list[str]
            "current_stack": [],  # CSV에 없음
            "is_dehydrated": False,  # CSV에 없음
            # 선택적 필드
            "training_intensity": None,
            "diet_type": "일반식",
            "oral_hygiene": "해당 없음",
            "workout_type": None,
            "environment_heat_humid": None,
            "intake_period": None,
            "cvd_risk": None,
        }

        return user

    except Exception as e:
        print(f"사용자 변환 중 오류: {e}")
        return None


# -------------------------
# 전체 데이터프레임 변환
# -------------------------


def convert_users_df_to_input_format(df_users: pd.DataFrame) -> pd.DataFrame:
    """전체 사용자 데이터프레임을 INPUT_FORMAT 형식으로 변환"""
    converted_users = []

    for idx, row in df_users.iterrows():
        user = convert_user_to_input_format(row)
        if user is not None:
            converted_users.append(user)

    return pd.DataFrame(converted_users)


# -------------------------
# 변환 실행
# -------------------------

print("🔄 사용자 데이터를 INPUT_FORMAT 형식으로 변환 중...")
users_converted = convert_users_df_to_input_format(users)

print(f"✅ 변환 완료: {len(users_converted)}명의 사용자 데이터")
print(f"\n📊 변환된 데이터 샘플 (첫 번째 사용자):")
if len(users_converted) > 0:
    sample = users_converted.iloc[0].to_dict()
    for key, value in sample.items():
        print(f"  {key}: {value}")

🔄 사용자 데이터를 INPUT_FORMAT 형식으로 변환 중...
✅ 변환 완료: 1037명의 사용자 데이터

📊 변환된 데이터 샘플 (첫 번째 사용자):
  age: 30.0
  gender: 남성
  weight: 74.0
  lean_mass: None
  training_experience: 숙련자
  training_duration: 90+
  training_time: 저녁
  diet_phase: 유지
  user_goal: ['근성장']
  health_conditions: ['수면장애']
  allergy: ['카페인']
  current_stack: []
  is_dehydrated: False
  training_intensity: None
  diet_type: 일반식
  oral_hygiene: 해당 없음
  workout_type: None
  environment_heat_humid: None
  intake_period: None
  cvd_risk: None


## rule 적용하기 

In [17]:
# ================================================================
# 사용 예시: LightFM 순서 + rule prohibit 체크 (수정 버전)
# ================================================================

# 필수 변수들이 정의되어 있는지 확인
required_vars = [
    "dataset",
    "model",
    "user_features_matrix",
    "item_features_matrix",
    "df_item_raw",
    "ITEM_ID_COL",
]
missing_vars = [var for var in required_vars if var not in globals()]

if missing_vars:
    print(f"❌ 다음 변수들이 정의되지 않았습니다: {', '.join(missing_vars)}")
else:
    # 예시 사용자 ID
    example_user_id = 2

    # dataset에 있는 user_id 확인
    try:
        if example_user_id in dataset.mapping()[0]:
            print(f"\n{'='*80}")
            print(f"사용자 ID {example_user_id}에 대한 타이밍별 주원료 제품 랭킹")
            print(f"(LightFM 순서 + rule prohibit 체크)")
            print(f"{'='*80}\n")

            # 사용자 정보 가져오기 (rule 적용용)
            user_dict = None
            if "users_converted" in globals() and len(users_converted) > 0:
                # users_converted의 첫 번째 사용자 사용 (실제로는 user_id로 매칭 필요)
                user_dict = users_converted.iloc[0].to_dict()
                print(f"✅ 사용자 정보 로드 완료 (rule 적용)")

            # 타이밍별 주원료 제품 랭킹 가져오기 (LightFM 순서 + rule prohibit 체크)
            rankings = get_all_timing_main_ingredient_rankings(
                user_id=example_user_id,
                model=model,
                dataset=dataset,
                user_features_matrix=user_features_matrix,
                item_features_matrix=item_features_matrix,
                df_item_raw=df_item_raw,
                topk=10,  # 상위 10개
                user_dict=user_dict,  # rule 적용용
                rules_df=rules if "rules" in globals() else None,  # rule DataFrame
            )

            # 결과 출력
            for timing in ["Pre", "Intra", "Post"]:
                timing_emoji = {"Pre": "🏋️", "Intra": "💧", "Post": "💪"}
                print(
                    f"\n{timing_emoji.get(timing, '📦')} {timing}-Workout 주원료 제품 랭킹 (상위 10개)"
                )
                print("-" * 80)

                if not rankings[timing].empty:
                    try:
                        print(rankings[timing].to_string(index=False))
                    except:
                        print(rankings[timing].to_markdown(index=False))
                    print(f"\n총 {len(rankings[timing])}개 제품 (prohibit 제외)")
                else:
                    print("추천 결과가 없습니다.")

            print(f"\n{'='*80}")

        else:
            print(f"⚠️ 사용자 ID {example_user_id}가 데이터셋에 없습니다.")
            print(
                f"사용 가능한 사용자 ID 범위: {min(dataset.mapping()[0].keys())} ~ {max(dataset.mapping()[0].keys())}"
            )
            print("\n사용 예시:")
            print(
                "  example_user_id = 555  # 실제 데이터셋에 있는 user_id로 변경하세요"
            )
    except Exception as e:
        print(f"❌ 오류 발생: {e}")
        import traceback

        traceback.print_exc()


사용자 ID 2에 대한 타이밍별 주원료 제품 랭킹
(LightFM 순서 + rule prohibit 체크)

✅ 사용자 정보 로드 완료 (rule 적용)

🏋️ Pre-Workout 주원료 제품 랭킹 (상위 10개)
--------------------------------------------------------------------------------
 rank                     product_id  Predicted_Score category                 product_name               brand_name_kor
    1 RAW_PERFORMANCE_STRAWBERRYKIWI        -9.210345   프리워크아웃    CBUM 퍼포먼스 프리 워크아웃 스트로베리키위    CBUM 퍼포먼스 프리 워크아웃 스트로베리키위
    2      BPI_CLACARNITINE_SNOWCONE       -10.323280     다이어트    BPI CLA + Carnitine 스노우 콘    BPI CLA + Carnitine 스노우 콘
    3    BPI_CLACARNITINE_RAINBOWICE       -10.403310     다이어트 BPI CLA + Carnitine 레인보우 아이스 BPI CLA + Carnitine 레인보우 아이스
    4    BPI_CLACARNITINE_FRUITPUNCH       -10.422634     다이어트    BPI CLA + Carnitine 후르츠펀치    BPI CLA + Carnitine 후르츠펀치
    5    SAMDAE_INSEASON_CHERRYLEMON       -10.601768     탄수화물            삼대오백 카보린 인시즌 체리레몬                         삼대오백
    6   SAMDAE_OFFSEASON_SWEETPOTATO       -10.611115     탄수화물         

## Prohibit 필터링 분석

prohibit로 제외된 제품들이 있는지 확인


In [18]:
# ================================================================
# Prohibit 필터링 분석: rule 적용 전후 비교
# ================================================================

# 필수 변수들이 정의되어 있는지 확인
required_vars = [
    "dataset",
    "model",
    "user_features_matrix",
    "item_features_matrix",
    "df_item_raw",
    "ITEM_ID_COL",
]
missing_vars = [var for var in required_vars if var not in globals()]

if missing_vars:
    print(f"❌ 다음 변수들이 정의되지 않았습니다: {', '.join(missing_vars)}")
    print("\n필요한 셀들을 먼저 실행하세요.")
else:
    # 사용자 ID 2에 대한 분석
    example_user_id = 2

    try:
        if example_user_id in dataset.mapping()[0]:
            print(f"\n{'='*80}")
            print(f"사용자 ID {example_user_id}에 대한 Prohibit 필터링 분석")
            print(f"{'='*80}\n")

            # 사용자 정보 가져오기
            user_dict = None
            if "users_converted" in globals() and len(users_converted) > 0:
                user_dict = users_converted.iloc[0].to_dict()
                print(f"✅ 사용자 정보 로드 완료")
                print(f"   건강 상태: {user_dict.get('health_conditions', [])}")
                print(f"   알레르기: {user_dict.get('allergy', [])}")

            # 각 타이밍별로 분석
            for timing in ["Pre", "Intra", "Post"]:
                print(f"\n{'='*60}")
                print(f"📊 {timing}-Workout 타이밍 분석")
                print(f"{'='*60}")

                # 1. rule 적용 전: LightFM 추천만
                all_recs = recommend_for_user(
                    user_id=example_user_id,
                    model=model,
                    dataset=dataset,
                    user_features_matrix=user_features_matrix,
                    item_features_matrix=item_features_matrix,
                    df_item_raw=df_item_raw,
                    k=250,
                )

                # 타이밍 필터링
                if timing and "timing_category" in all_recs.columns:
                    recs = all_recs[
                        all_recs["timing_category"].apply(
                            lambda x: (
                                timing in x if isinstance(x, list) else x == timing
                            )
                        )
                    ].copy()
                else:
                    recs = all_recs.copy()

                # 주원료만 필터링
                if "ingredient_type" in df_item_raw.columns:
                    merge_cols = [ITEM_ID_COL, "ingredient_type", "ingredients"]
                    for col in ["category", "product_name", "brand_name_kor"]:
                        if col not in recs.columns and col in df_item_raw.columns:
                            merge_cols.append(col)

                    recs_with_meta = recs.merge(
                        df_item_raw[merge_cols],
                        on=ITEM_ID_COL,
                        how="left",
                    )

                    main_ing_recs = recs_with_meta[
                        recs_with_meta["ingredient_type"] == "주원료"
                    ].copy()
                else:
                    main_ing_recs = recs.copy()

                total_before = len(main_ing_recs)
                print(f"\n✅ Rule 적용 전: 총 {total_before}개 주원료 제품")

                # 2. rule 적용 후: prohibit 체크
                if user_dict is not None and "rules" in globals():
                    rule_result = apply_rules_to_user(
                        rules if "rules" in globals() else None,
                        user_dict,
                        timing_filter=None,
                    )

                    # prohibit된 성분 확인
                    prohibited_ingredients = [
                        ing
                        for ing, info in rule_result.items()
                        if info.get("prohibit", False)
                    ]
                    print(
                        f"\n🚫 Prohibit된 성분: {prohibited_ingredients if prohibited_ingredients else '없음'}"
                    )

                    # 성분 키워드 매핑
                    INGREDIENT_KEYWORDS = {
                        "creatine": {"creatine", "creatine monohydrate"},
                        "caffeine": {"caffeine", "카페인"},
                        "arginine": {"arginine", "l-arginine"},
                        "l-theanine": {"theanine", "l-theanine"},
                        "glycerol": {"glycerol"},
                        "nitrate": {"nitrate", "비트", "beet"},
                        "taurine": {"taurine"},
                        "whey_protein": {"whey", "wpc", "wpi", "wph"},
                        "casein": {"casein", "micellar casein"},
                        "bcaa": {"bcaa", "leucine", "isoleucine", "valine"},
                        "betaine": {"betaine"},
                        "glutamine": {"glutamine"},
                        "l-carnitine": {"carnitine"},
                    }

                    def product_has_ingredient(
                        product_row, rule_ingredient: str
                    ) -> bool:
                        ingredients_str = str(
                            product_row.get("ingredients", "")
                        ).lower()
                        kws = INGREDIENT_KEYWORDS.get(
                            rule_ingredient, {rule_ingredient.lower()}
                        )
                        for kw in kws:
                            if kw.lower() in ingredients_str:
                                return True
                        return False

                    # prohibit된 제품 필터링
                    filtered_recs = []
                    prohibited_products = []

                    for _, row in main_ing_recs.iterrows():
                        is_prohibited = False
                        for ing in prohibited_ingredients:
                            if product_has_ingredient(row, ing):
                                is_prohibited = True
                                prohibited_products.append(
                                    {
                                        "product_name": row.get(
                                            "product_name", "Unknown"
                                        ),
                                        "prohibited_ingredient": ing,
                                        "rank_before": row.get(
                                            "Predicted_Score", "N/A"
                                        ),
                                    }
                                )
                                break

                        if not is_prohibited:
                            filtered_recs.append(row)

                    total_after = len(filtered_recs)
                    num_prohibited = len(prohibited_products)

                    print(f"✅ Rule 적용 후: 총 {total_after}개 주원료 제품")
                    print(f"🚫 제외된 제품: {num_prohibited}개")

                    if prohibited_products:
                        print(f"\n📋 제외된 제품 상세:")
                        for i, prod in enumerate(prohibited_products, 1):
                            print(f"   {i}. {prod['product_name']}")
                            print(
                                f"      제외 이유: {prod['prohibited_ingredient']} prohibit"
                            )

                    # 상위 10개 비교
                    main_ing_recs_sorted = main_ing_recs.sort_values(
                        by="Predicted_Score", ascending=False
                    ).reset_index(drop=True)

                    print(f"\n🔍 상위 10개 제품 비교:")
                    print(f"\n   [Rule 적용 전]")
                    for i, row in main_ing_recs_sorted.head(10).iterrows():
                        product_name = row.get("product_name", "Unknown")
                        score = row.get("Predicted_Score", 0)
                        print(f"   {i+1:2d}. {product_name:<40} Score: {score:.2f}")

                    print(f"\n   [Rule 적용 후 - 상위 10개]")
                    filtered_df = (
                        pd.DataFrame(filtered_recs)
                        .sort_values(by="Predicted_Score", ascending=False)
                        .reset_index(drop=True)
                    )
                    for i, (_, row) in enumerate(filtered_df.head(10).iterrows(), 1):
                        product_name = row.get("product_name", "Unknown")
                        score = row.get("Predicted_Score", 0)
                        print(f"   {i+1:2d}. {product_name:<40} Score: {score:.2f}")

                    # 통계
                    print(f"\n📈 필터링 통계:")
                    print(
                        f"   제외율: {num_prohibited/total_before*100:.1f}% ({num_prohibited}/{total_before})"
                    )
                    if num_prohibited > 0:
                        print(f"   주원료 제품 중 prohibit로 제외된 제품이 있습니다!")
                    else:
                        print(f"   주원료 제품 중 prohibit로 제외된 제품이 없습니다.")
                else:
                    print(f"\n⚠️ Rule 적용 불가: user_dict 또는 rules가 없습니다.")

        else:
            print(f"⚠️ 사용자 ID {example_user_id}가 데이터셋에 없습니다.")

    except Exception as e:
        print(f"❌ 오류 발생: {e}")
        import traceback

        traceback.print_exc()


사용자 ID 2에 대한 Prohibit 필터링 분석

✅ 사용자 정보 로드 완료
   건강 상태: ['수면장애']
   알레르기: ['카페인']

📊 Pre-Workout 타이밍 분석

✅ Rule 적용 전: 총 70개 주원료 제품

🚫 Prohibit된 성분: ['caffeine']
✅ Rule 적용 후: 총 14개 주원료 제품
🚫 제외된 제품: 56개

📋 제외된 제품 상세:
   1. 노익스플로드 후르츠펀치
      제외 이유: caffeine prohibit
   2. CBUM 에센셜 프리 워크아웃 오렌지
      제외 이유: caffeine prohibit
   3. 노익스플로드 포도
      제외 이유: caffeine prohibit
   4. 애니멀 프라이멀 프리워크아웃 과일펀치
      제외 이유: caffeine prohibit
   5. 노익스플로드 그린애플
      제외 이유: caffeine prohibit
   6. CBUM 에센셜 프리 워크아웃 포도
      제외 이유: caffeine prohibit
   7. 예버디 사워베리
      제외 이유: caffeine prohibit
   8. C4 오리지널 프리워크웃 오렌지맛
      제외 이유: caffeine prohibit
   9. C4 오리지널 프리워크아웃 파인애플
      제외 이유: caffeine prohibit
   10. CBUM 에센셜 프리 워크아웃 복숭아 망고
      제외 이유: caffeine prohibit
   11. 삼대오백 프리워크아웃 오렌지맛
      제외 이유: caffeine prohibit
   12. 예버디 딸기레모네이드
      제외 이유: caffeine prohibit
   13. CBUM 에센셜 프리 워크아웃 시트러스 자몽
      제외 이유: caffeine prohibit
   14. 예버디 그린애플
      제외 이유: caffeine prohibit
   15. 애니멀 프라이멀 프리워크아웃 딸기수박맛
 

## 각 시점별 Top1 주원료에 나머지 룰 적용

각 시점(Pre/Intra/Post)별로 Top-1 주원료 제품을 선택한 후, rule에서 정의된 dose, synergy, warning 등을 모두 적용하여 최종 추천 결과 생성


In [19]:
# ================================================================
# 각 시점별 Top1 주원료에 나머지 룰 적용 함수
# ================================================================


def generate_final_recommendations_with_rules(
    user_id,
    model,
    dataset,
    user_features_matrix,
    item_features_matrix,
    df_item_raw,
    users_converted_df,
    rules_df,
    topk_main=10,  # 주원료 제품을 가져올 때 상위 k개
):
    """
    각 시점별 Top-1 주원료 제품 선택 후 모든 룰 적용하여 최종 추천 생성

    Returns:
    --------
    Dict[str, Dict]: 각 시점별 추천 결과
        {
            "Pre": {
                "main_product": {...},
                "supplements": [...],  # 보조 제품들
                "synergy": [...],
                "warnings": [...],
                "total_dose": {...},
            },
            ...
        }
    """

    # 1. 사용자 정보 가져오기 (user_id로 매칭)
    user_dict = None
    if users_converted_df is not None and len(users_converted_df) > 0:
        # dataset의 user_id와 users_converted_df의 인덱스가 다를 수 있으므로
        # 일단 첫 번째 사용자 사용 (실제로는 user_id 매핑 필요)
        user_dict = users_converted_df.iloc[0].to_dict()

    if user_dict is None:
        print("⚠️ 사용자 정보를 찾을 수 없습니다.")
        return {}

    # 2. 각 시점별 주원료 랭킹 가져오기 (prohibit 포함)
    rankings = get_all_timing_main_ingredient_rankings(
        user_id=user_id,
        model=model,
        dataset=dataset,
        user_features_matrix=user_features_matrix,
        item_features_matrix=item_features_matrix,
        df_item_raw=df_item_raw,
        topk=topk_main,
        user_dict=user_dict,
        rules_df=rules_df,
    )

    # 3. 룰 적용
    rule_result = apply_rules_to_user(rules_df, user_dict, timing_filter=None)

    # 4. 각 시점별로 Top-1 주원료 선택 및 룰 적용
    final_results = {}

    for timing in ["Pre", "Intra", "Post"]:
        timing_kor = {"Pre": "전", "Intra": "중", "Post": "후"}[timing]

        # Top-1 주원료 제품 가져오기
        if rankings[timing].empty:
            final_results[timing] = {
                "main_product": None,
                "supplements": [],
                "synergy": [],
                "warnings": [],
                "message": "추천할 주원료 제품이 없습니다.",
            }
            continue

        top1_product = rankings[timing].iloc[0].to_dict()
        main_product_name = top1_product.get("product_name", "Unknown")

        # 제품의 성분 확인
        product_id = top1_product.get("product_id")
        product_ingredients = ""
        if product_id:
            product_row = df_item_raw[df_item_raw["product_id"] == product_id]
            if not product_row.empty:
                product_ingredients = str(
                    product_row.iloc[0].get("ingredients", "")
                ).lower()

        print(f"\n{'='*80}")
        print(f"📌 {timing}-Workout 타이밍: Top-1 주원료 제품")
        print(f"{'='*80}")
        print(f"제품명: {main_product_name}")
        print(f"카테고리: {top1_product.get('category', 'N/A')}")
        print(f"브랜드: {top1_product.get('brand_name_kor', 'N/A')}")

        # 5. 해당 제품에 대한 룰 정보 추출
        supplements = []
        synergy_list = []
        warnings = []
        dose_info = {}

        # 제품이 포함하는 주요 성분 찾기
        INGREDIENT_KEYWORDS = {
            "creatine": {"creatine", "creatine monohydrate"},
            "caffeine": {"caffeine", "카페인"},
            "arginine": {"arginine", "l-arginine"},
            "l-theanine": {"theanine", "l-theanine"},
            "glycerol": {"glycerol"},
            "nitrate": {"nitrate", "비트", "beet"},
            "taurine": {"taurine"},
            "whey_protein": {"whey", "wpc", "wpi", "wph"},
            "casein": {"casein", "micellar casein"},
            "bcaa": {"bcaa", "leucine", "isoleucine", "valine"},
            "betaine": {"betaine"},
            "glutamine": {"glutamine"},
            "l-carnitine": {"carnitine"},
        }

        def product_has_ingredient(
            product_ingredients_str, rule_ingredient: str
        ) -> bool:
            kws = INGREDIENT_KEYWORDS.get(rule_ingredient, {rule_ingredient.lower()})
            for kw in kws:
                if kw.lower() in product_ingredients_str:
                    return True
            return False

        # 룰 결과를 타이밍별로 필터링하여 적용
        rule_result_timing = apply_rules_to_user(
            rules_df, user_dict, timing_filter=timing_kor
        )

        # 주원료 제품의 성분에 대해 룰 정보 추출
        for ing, info in rule_result_timing.items():
            if info.get("prohibit", False):
                continue  # prohibit는 이미 필터링됨

            # 제품이 해당 성분을 포함하는지 확인
            if product_has_ingredient(product_ingredients, ing):
                # 용량 정보
                if info.get("dose_value") is not None:
                    dose_info[ing] = {
                        "value": info["dose_value"],
                        "unit": info.get("dose_unit", ""),
                        "range": None,
                    }
                elif info.get("dose_range") is not None:
                    lo, hi, unit = info["dose_range"]
                    dose_info[ing] = {
                        "value": None,
                        "unit": unit,
                        "range": (lo, hi),
                    }

                # Synergy 정보
                if info.get("synergy"):
                    for syn_ing, syn_unit in info["synergy"]:
                        synergy_list.append(
                            {
                                "with": ing,
                                "synergy_ingredient": syn_ing,
                                "unit": syn_unit,
                            }
                        )

                # Warning 정보
                if info.get("warnings"):
                    warnings.extend(
                        [
                            {
                                "ingredient": ing,
                                "warning": w,
                            }
                            for w in info["warnings"]
                        ]
                    )

        # 보조 제품 추천 (synergy 성분이 있다면)
        if synergy_list:
            print(f"\n💡 Synergy 추천 보조 제품:")
            for idx, synergy in enumerate(synergy_list, 1):
                syn_ing = synergy["synergy_ingredient"]
                syn_unit = synergy.get("unit", "")

                # 해당 성분을 포함하는 보조 제품 찾기
                # TODO: 실제로는 주원료가 아닌 보조 제품을 추천해야 함
                print(f"   {idx}. {syn_ing}{syn_unit}")
                supplements.append(
                    {
                        "ingredient": syn_ing,
                        "unit": syn_unit,
                        "reason": f"'{synergy['with']}'와 함께 복용 시 효과 향상",
                    }
                )

        # 용량 정보 출력
        if dose_info:
            print(f"\n💊 권장 용량:")
            for ing, dose in dose_info.items():
                if dose["range"]:
                    lo, hi = dose["range"]
                    print(f"   • {ing}: {lo}-{hi}{dose['unit']}")
                elif dose["value"]:
                    print(f"   • {ing}: {dose['value']}{dose['unit']}")

        # 경고 출력
        if warnings:
            print(f"\n⚠️ 주의사항:")
            for warning in warnings:
                print(f"   • {warning['ingredient']}: {warning['warning']}")

        # 최종 결과 저장
        final_results[timing] = {
            "main_product": {
                "product_id": top1_product.get("product_id"),
                "product_name": main_product_name,
                "category": top1_product.get("category"),
                "brand_name_kor": top1_product.get("brand_name_kor"),
                "score": top1_product.get("Predicted_Score"),
                "rank": top1_product.get("rank"),
            },
            "supplements": supplements,
            "synergy": synergy_list,
            "warnings": warnings,
            "dose_info": dose_info,
        }

    return final_results

In [20]:
  result = generate_final_recommendations_with_rules(
      user_id=2,
      model=model,
      dataset=dataset,
      user_features_matrix=user_features_matrix,
      item_features_matrix=item_features_matrix,
      df_item_raw=df_item_raw,
      users_converted_df=users_converted,
      rules_df=rules,
      topk_main=10,
  )

  result


📌 Pre-Workout 타이밍: Top-1 주원료 제품
제품명: CBUM 퍼포먼스 프리 워크아웃 스트로베리키위
카테고리: 프리워크아웃
브랜드: CBUM 퍼포먼스 프리 워크아웃 스트로베리키위

💊 권장 용량:
   • betaine: 2.0g

📌 Intra-Workout 타이밍: Top-1 주원료 제품
제품명: 게토레이 파우더 오렌지
카테고리: 인트라워크아웃
브랜드: 게토레이

📌 Post-Workout 타이밍: Top-1 주원료 제품
제품명: 옵티멈 뉴트리션 골드스탠다드 웨이 초콜릿맛
카테고리: 프로틴
브랜드: 옵티멈 뉴트리션 골드스탠다드 웨이 초콜릿맛


{'Pre': {'main_product': {'product_id': 'RAW_PERFORMANCE_STRAWBERRYKIWI',
   'product_name': 'CBUM 퍼포먼스 프리 워크아웃 스트로베리키위',
   'category': '프리워크아웃',
   'brand_name_kor': 'CBUM 퍼포먼스 프리 워크아웃 스트로베리키위',
   'score': -9.210345268249512,
   'rank': 1},
  'supplements': [],
  'synergy': [],
  'dose_info': {'betaine': {'value': 2.0, 'unit': 'g', 'range': None}}},
 'Intra': {'main_product': {'product_id': 'GATORADE_POWDER_ORANGE',
   'product_name': '게토레이 파우더 오렌지',
   'category': '인트라워크아웃',
   'brand_name_kor': '게토레이',
   'score': -4.246463775634766,
   'rank': 1},
  'supplements': [],
  'synergy': [],
  'dose_info': {}},
 'Post': {'main_product': {'product_id': 'OPTIMUM_GSWHEY_CHOCOLATE',
   'product_name': '옵티멈 뉴트리션 골드스탠다드 웨이 초콜릿맛',
   'category': '프로틴',
   'brand_name_kor': '옵티멈 뉴트리션 골드스탠다드 웨이 초콜릿맛',
   'score': -4.745079040527344,
   'rank': 1},
  'supplements': [],
  'synergy': [],
  'dose_info': {}}}

## 레시피 생성: 필요한 성분 계산 및 보조 제품 추천

각 타이밍별로 룰 기반으로 필요한 모든 성분의 권장량을 계산하고, 주원료 제품이 충분하지 않으면 부원료 추가하여 레시피를 완성


In [21]:
# ================================================================
# 레시피 생성: 필요한 성분 계산 및 보조 제품 추가
# ================================================================


def parse_product_ingredients(ingredients_str):
    """
    제품의 성분 문자열을 파싱하여 성분명과 함량 추출

    Returns:
    --------
    Dict[str, float]: {성분명: 함량(g 또는 mg)} 형태
    """
    import json
    import re

    if pd.isna(ingredients_str) or not ingredients_str:
        return {}

    # JSON 형식인 경우 파싱
    try:
        ingredients_list = json.loads(ingredients_str)
    except:
        ingredients_list = []

    ingredient_amounts = {}

    for item in ingredients_list:
        if isinstance(item, dict):
            ingredient_name = item.get("ingredient", "").lower()
            amount = item.get("amount")
            unit = item.get("unit", "")

            if amount is not None:
                # unit을 g로 통일
                if unit and "mg" in unit.lower():
                    amount_in_g = amount / 1000.0
                else:
                    amount_in_g = float(amount)

                ingredient_amounts[ingredient_name] = amount_in_g

    return ingredient_amounts


def generate_recipe_for_timing(
    timing,
    user_dict,
    rules_df,
    items_df,
    rankings,
):
    """
    특정 타이밍에 대한 완전한 레시피 생성

    Returns:
    --------
    Dict: 레시피 정보
        {
            "timing": "Pre",
            "required_ingredients": {...},  # 필요한 성분
            "main_product": {...},  # 주원료 제품
            "supplement_products": [...],  # 보조 제품들
            "total_recipe": {...},  # 전체 레시피
        }
    """

    timing_kor = {"Pre": "전", "Intra": "중", "Post": "후"}[timing]

    # 1. 룰 기반으로 필요한 성분과 용량 계산
    rule_result = apply_rules_to_user(rules_df, user_dict, timing_filter=timing_kor)

    required_ingredients = {}
    for ing, info in rule_result.items():
        if info.get("prohibit", False):
            continue

        # 용량 정보 추출
        if info.get("dose_value") is not None:
            required_ingredients[ing] = {
                "required": info["dose_value"],
                "unit": info.get("dose_unit", ""),
            }
        elif info.get("dose_range") is not None:
            lo, hi, unit = info["dose_range"]
            required_ingredients[ing] = {
                "required": (lo + hi) / 2.0,  # 범위의 중간값 사용
                "unit": unit,
            }

    print(f"\n{'='*80}")
    print(f"📋 {timing}-Workout 레시피 생성")
    print(f"{'='*80}")
    print(f"\n✅ 룰 기반 필요 성분: {len(required_ingredients)}개")
    for ing, info in required_ingredients.items():
        print(f"   • {ing}: {info['required']}{info['unit']}")

    # 2. Top-1 주원료 제품 선택
    if rankings[timing].empty:
        return {
            "timing": timing,
            "required_ingredients": required_ingredients,
            "main_product": None,
            "supplement_products": [],
            "message": "추천할 주원료 제품이 없습니다.",
        }

    top1_product = rankings[timing].iloc[0].to_dict()
    main_product_id = top1_product.get("product_id")

    # 주원료 제품의 성분 정보 가져오기
    main_product_row = items_df[items_df["product_id"] == main_product_id]
    if main_product_row.empty:
        main_product_ingredients = {}
    else:
        ingredients_str = main_product_row.iloc[0].get("ingredients", "")
        main_product_ingredients = parse_product_ingredients(ingredients_str)

    print(f"\n🏆 주원료 제품: {top1_product.get('product_name')}")
    print(f"   제품 성분:")
    for ing, amount in main_product_ingredients.items():
        print(f"   • {ing}: {amount}g")

    # 3. 주원료 제품에서 얻을 수 있는 성분 계산
    ingredient_keywords = {
        "creatine": {"creatine", "creatine monohydrate"},
        "caffeine": {"caffeine", "카페인"},
        "arginine": {"arginine", "l-arginine"},
        "l-theanine": {"theanine", "l-theanine"},
        "glycerol": {"glycerol"},
        "nitrate": {"nitrate", "비트", "beet"},
        "taurine": {"taurine"},
        "whey_protein": {"whey", "wpc", "wpi", "wph"},
        "casein": {"casein", "micellar casein"},
        "bcaa": {"bcaa", "leucine", "isoleucine", "valine"},
        "betaine": {"betaine"},
        "glutamine": {"glutamine"},
        "l-carnitine": {"carnitine"},
    }

    def find_ingredient_in_product(product_ingredients, rule_ingredient):
        """제품 성분에서 룰 성분 찾기"""
        kws = ingredient_keywords.get(rule_ingredient, {rule_ingredient.lower()})
        for prod_ing, amount in product_ingredients.items():
            for kw in kws:
                if kw.lower() in prod_ing:
                    return amount
        return None

    # 주원료로 커버되는 성분 계산
    covered_by_main = {}
    still_needed = {}

    for ing, required_info in required_ingredients.items():
        available = find_ingredient_in_product(main_product_ingredients, ing)

        if available is not None:
            covered_by_main[ing] = available
            deficit = required_info["required"] - available

            if deficit > 0:
                still_needed[ing] = {
                    "needed": deficit,
                    "unit": required_info["unit"],
                    "covered_by_main": available,
                    "required": required_info["required"],
                }
        else:
            still_needed[ing] = {
                "needed": required_info["required"],
                "unit": required_info["unit"],
                "covered_by_main": 0,
                "required": required_info["required"],
            }

    print(f"\n📊 주원료로 커버:")
    for ing, amount in covered_by_main.items():
        print(f"   ✅ {ing}: {amount}g")

    if still_needed:
        print(f"\n⚠️ 추가 필요:")
        for ing, info in still_needed.items():
            print(
                f"   • {ing}: {info['needed']:.2f}{info['unit']} (주원료: {info['covered_by_main']:.2f}g / 필요: {info['required']:.2f}g)"
            )

    # 4. 필요한 성분을 보조 제품에서 찾기
    supplement_products = []

    if still_needed:
        print(f"\n🔍 보조 제품 검색 중...")

        # 부원료만 보조 제품으로 사용 (ingredient_type == "부원료")
        supplement_items = items_df[items_df["ingredient_type"] == "부원료"].copy()

        for ing, need_info in still_needed.items():
            best_supplement = None
            best_match_score = 0

            for _, item_row in supplement_items.iterrows():
                item_ingredients_str = item_row.get("ingredients", "")
                item_ingredients = parse_product_ingredients(item_ingredients_str)

                available = find_ingredient_in_product(item_ingredients, ing)

                if available is not None:
                    # 필요량에 가장 가까운 제품 선택
                    match_score = (
                        min(available, need_info["needed"]) / need_info["needed"]
                    )
                    if match_score > best_match_score:
                        best_match_score = match_score
                        best_supplement = {
                            "product_id": item_row.get("product_id"),
                            "product_name": item_row.get("product_name"),
                            "brand_name_kor": item_row.get("brand_name_kor"),
                            "ingredient": ing,
                            "amount_per_serving": available,
                            "needed": need_info["needed"],
                            "servings_needed": int(
                                (need_info["needed"] / available) + 0.5
                            ),  # 올림
                        }

            if best_supplement:
                supplement_products.append(best_supplement)
                print(
                    f"   ✅ {ing}: {best_supplement['product_name']} ({best_supplement['servings_needed']}회분)"
                )

    # 5. 최종 레시피 구성
    total_recipe = {}
    for ing in required_ingredients.keys():
        from_main = covered_by_main.get(ing, 0)
        from_supplements = 0

        for supp in supplement_products:
            if supp["ingredient"] == ing:
                from_supplements += supp["amount_per_serving"] * supp["servings_needed"]

        total_recipe[ing] = from_main + from_supplements

    print(f"\n📝 최종 레시피:")
    for ing, total in total_recipe.items():
        unit = required_ingredients[ing]["unit"]
        required = required_ingredients[ing]["required"]
        print(f"   • {ing}: {total:.2f}{unit} (목표: {required:.2f}{unit})")

    return {
        "timing": timing,
        "required_ingredients": required_ingredients,
        "main_product": top1_product,
        "supplement_products": supplement_products,
        "total_recipe": total_recipe,
        "covered_by_main": covered_by_main,
        "still_needed": still_needed,
    }


print("✅ 레시피 생성 함수 정의 완료!")

✅ 레시피 생성 함수 정의 완료!


In [22]:
# ================================================================
# 레시피 생성 실행
# ================================================================

# 필수 변수들이 정의되어 있는지 확인
required_vars = [
    "dataset",
    "model",
    "user_features_matrix",
    "item_features_matrix",
    "df_item_raw",
    "ITEM_ID_COL",
    "users_converted",
    "rules",
]
missing_vars = [var for var in required_vars if var not in globals()]

if missing_vars:
    print(f"❌ 다음 변수들이 정의되지 않았습니다: {', '.join(missing_vars)}")
    print("\n필요한 셀들을 먼저 실행하세요.")
else:
    # 사용자 ID 2에 대한 레시피 생성
    example_user_id = 2

    try:
        if example_user_id in dataset.mapping()[0]:
            print(f"\n{'='*80}")
            print(f"사용자 ID {example_user_id}에 대한 완전한 레시피 생성")
            print(f"{'='*80}")

            # 사용자 정보 가져오기
            user_dict = users_converted.iloc[0].to_dict()

            # 각 시점별 주원료 랭킹 가져오기
            rankings = get_all_timing_main_ingredient_rankings(
                user_id=example_user_id,
                model=model,
                dataset=dataset,
                user_features_matrix=user_features_matrix,
                item_features_matrix=item_features_matrix,
                df_item_raw=df_item_raw,
                topk=10,
                user_dict=user_dict,
                rules_df=rules,
            )

            # 각 타이밍별로 레시피 생성
            all_recipes = {}
            for timing in ["Pre", "Intra", "Post"]:
                recipe = generate_recipe_for_timing(
                    timing=timing,
                    user_dict=user_dict,
                    rules_df=rules,
                    items_df=df_item_raw,
                    rankings=rankings,
                )
                all_recipes[timing] = recipe

            # 요약 출력
            print(f"\n\n{'='*80}")
            print(f"📊 최종 레시피 요약")
            print(f"{'='*80}")

            for timing in ["Pre", "Intra", "Post"]:
                recipe = all_recipes.get(timing, {})

                if recipe.get("main_product"):
                    print(
                        f"\n{'🏋️💧💪'[['Pre','Intra','Post'].index(timing)]} {timing}-Workout 레시피:"
                    )
                    print(f"   주원료: {recipe['main_product']['product_name']}")

                    supplements = recipe.get("supplement_products", [])
                    if supplements:
                        print(f"   보조 제품: {len(supplements)}개")
                        for supp in supplements:
                            print(
                                f"      • {supp['product_name']}: {supp['servings_needed']}회분 ({supp['ingredient']} {supp['amount_per_serving']:.2f}g/serving × {supp['servings_needed']})"
                            )
                    else:
                        print(f"   보조 제품: 없음 (주원료로 충분)")
                else:
                    print(
                        f"\n{'🏋️💧💪'[['Pre','Intra','Post'].index(timing)]} {timing}-Workout:"
                    )
                    print(f"   {recipe.get('message', '추천 결과 없음')}")

            print(f"\n{'='*80}")

        else:
            print(f"⚠️ 사용자 ID {example_user_id}가 데이터셋에 없습니다.")

    except Exception as e:
        print(f"❌ 오류 발생: {e}")
        import traceback

        traceback.print_exc()


사용자 ID 2에 대한 완전한 레시피 생성

📋 Pre-Workout 레시피 생성

✅ 룰 기반 필요 성분: 3개
   • arginine: 6.0g
   • taurine: 1.0g
   • betaine: 2.0g

🏆 주원료 제품: CBUM 퍼포먼스 프리 워크아웃 스트로베리키위
   제품 성분:
   • l-citrulline: 6.0g
   • creatine monohydrate: 5.0g
   • beta alanine: 3.2g
   • betaine anhydrous: 2.5g
   • l-tyrosine: 2.0g

📊 주원료로 커버:
   ✅ betaine: 2.5g

⚠️ 추가 필요:
   • arginine: 6.00g (주원료: 0.00g / 필요: 6.00g)
   • taurine: 1.00g (주원료: 0.00g / 필요: 1.00g)

🔍 보조 제품 검색 중...

📝 최종 레시피:
   • arginine: 0.00g (목표: 6.00g)
   • taurine: 0.00g (목표: 1.00g)
   • betaine: 2.50g (목표: 2.00g)

📋 Intra-Workout 레시피 생성

✅ 룰 기반 필요 성분: 0개

🏆 주원료 제품: 게토레이 파우더 오렌지
   제품 성분:

📊 주원료로 커버:

📝 최종 레시피:

📋 Post-Workout 레시피 생성

✅ 룰 기반 필요 성분: 1개
   • creatine: 5.0g

🏆 주원료 제품: 옵티멈 뉴트리션 골드스탠다드 웨이 초콜릿맛
   제품 성분:

📊 주원료로 커버:

⚠️ 추가 필요:
   • creatine: 5.00g (주원료: 0.00g / 필요: 5.00g)

🔍 보조 제품 검색 중...
   ✅ creatine: MD 크레아틴 (1회분)

📝 최종 레시피:
   • creatine: 5.00g (목표: 5.00g)


📊 최종 레시피 요약

🏋 Pre-Workout 레시피:
   주원료: CBUM 퍼포먼스 프리 워크아웃 스트로베리키위
   보조 제품:

In [24]:
# ================================================================
# 간단 실행: 사용자에 대한 레시피 생성
# ================================================================


def generate_user_recipe_simple(user_id=2):
    """
    사용자 레시피 생성 (간단 버전)

    Parameters:
    -----------
    user_id : int
        사용자 ID

    Returns:
    --------
    dict : 각 타이밍별 레시피
    """

    # 필수 변수 확인
    required_vars = [
        "dataset",
        "model",
        "user_features_matrix",
        "item_features_matrix",
        "df_item_raw",
        "users_converted",
        "rules",
    ]
    missing = [v for v in required_vars if v not in globals()]
    if missing:
        print(f"❌ 누락된 변수: {', '.join(missing)}")
        return None

    # 사용자 정보
    user_dict = users_converted.iloc[0].to_dict()

    # 타이밍별 주원료 랭킹
    rankings = get_all_timing_main_ingredient_rankings(
        user_id=user_id,
        model=model,
        dataset=dataset,
        user_features_matrix=user_features_matrix,
        item_features_matrix=item_features_matrix,
        df_item_raw=df_item_raw,
        topk=10,
        user_dict=user_dict,
        rules_df=rules,
    )

    # 타이밍별 레시피 생성
    all_recipes = {}
    for timing in ["Pre", "Intra", "Post"]:
        all_recipes[timing] = generate_recipe_for_timing(
            timing=timing,
            user_dict=user_dict,
            rules_df=rules,
            items_df=df_item_raw,
            rankings=rankings,
        )

    return all_recipes


# 실행
print("🚀 레시피 생성 시작...")
recipes = generate_user_recipe_simple(user_id=2)

if recipes:
    print("\n\n" + "=" * 80)
    print("✅ 레시피 생성 완료!")
    print("=" * 80)

🚀 레시피 생성 시작...

📋 Pre-Workout 레시피 생성

✅ 룰 기반 필요 성분: 3개
   • arginine: 6.0g
   • taurine: 1.0g
   • betaine: 2.0g

🏆 주원료 제품: CBUM 퍼포먼스 프리 워크아웃 스트로베리키위
   제품 성분:
   • l-citrulline: 6.0g
   • creatine monohydrate: 5.0g
   • beta alanine: 3.2g
   • betaine anhydrous: 2.5g
   • l-tyrosine: 2.0g

📊 주원료로 커버:
   ✅ betaine: 2.5g

⚠️ 추가 필요:
   • arginine: 6.00g (주원료: 0.00g / 필요: 6.00g)
   • taurine: 1.00g (주원료: 0.00g / 필요: 1.00g)

🔍 보조 제품 검색 중...

📝 최종 레시피:
   • arginine: 0.00g (목표: 6.00g)
   • taurine: 0.00g (목표: 1.00g)
   • betaine: 2.50g (목표: 2.00g)

📋 Intra-Workout 레시피 생성

✅ 룰 기반 필요 성분: 0개

🏆 주원료 제품: 게토레이 파우더 오렌지
   제품 성분:

📊 주원료로 커버:

📝 최종 레시피:

📋 Post-Workout 레시피 생성

✅ 룰 기반 필요 성분: 1개
   • creatine: 5.0g

🏆 주원료 제품: 옵티멈 뉴트리션 골드스탠다드 웨이 초콜릿맛
   제품 성분:

📊 주원료로 커버:

⚠️ 추가 필요:
   • creatine: 5.00g (주원료: 0.00g / 필요: 5.00g)

🔍 보조 제품 검색 중...
   ✅ creatine: MD 크레아틴 (1회분)

📝 최종 레시피:
   • creatine: 5.00g (목표: 5.00g)


✅ 레시피 생성 완료!
