In [3]:
pip install google.generativeai

Collecting google.generativeai
  Downloading google_generativeai-0.8.5-py3-none-any.whl.metadata (3.9 kB)
Collecting google-ai-generativelanguage==0.6.15 (from google.generativeai)
  Downloading google_ai_generativelanguage-0.6.15-py3-none-any.whl.metadata (5.7 kB)
Collecting google-api-core (from google.generativeai)
  Downloading google_api_core-2.25.1-py3-none-any.whl.metadata (3.0 kB)
Collecting google-api-python-client (from google.generativeai)
  Downloading google_api_python_client-2.179.0-py3-none-any.whl.metadata (7.0 kB)
Collecting google-auth>=2.15.0 (from google.generativeai)
  Downloading google_auth-2.40.3-py2.py3-none-any.whl.metadata (6.2 kB)
Collecting proto-plus<2.0.0dev,>=1.22.3 (from google-ai-generativelanguage==0.6.15->google.generativeai)
  Downloading proto_plus-1.26.1-py3-none-any.whl.metadata (2.2 kB)
Collecting googleapis-common-protos<2.0.0,>=1.56.2 (from google-api-core->google.generativeai)
  Downloading googleapis_common_protos-1.70.0-py3-none-any.whl.met

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
streamlit 1.32.0 requires protobuf<5,>=3.20, but you have protobuf 5.29.5 which is incompatible.


In [11]:
# -*- coding: utf-8 -*-
"""
Persona-based Monthly Demand Forecast (2024-07 ~ 2025-06)
— product_info → (가격/GRP/ACV 자동 파싱) → 페르소나 시뮬레이션 → submission.csv

Input : product_info.csv [product_name, product_feature, category_level_1/2/3]
Output: submission.csv (months_since_launch_1..12, int)
        monthly_forecast.csv (마지막 제품 디버그용)
        persona_single_turn_prompt.txt/.pdf, sources_used.json, solution_report.md
"""

from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import numpy as np, pandas as pd
import json, random, math, os, datetime as dt, re

# ---------- PDF export (optional, 없으면 txt만 저장) ----------
try:
    from reportlab.lib.pagesizes import A4
    from reportlab.pdfgen import canvas as pdfcanvas
    _HAS_RL = True
except Exception:
    _HAS_RL = False

# ---------- Dates ----------
# First and last calendar day of the 12-month forecast window.
FORECAST_START = dt.date(2024,7,1)
FORECAST_END   = dt.date(2025,6,30)
# One monthly pd.Period per forecast month (2024-07 .. 2025-06).
MONTHS = pd.period_range(FORECAST_START, FORECAST_END, freq='M')
# Seed every random source used in this cell. The persona generator and the
# demand noise draw from the legacy global ``np.random`` API, which the
# Generator object below does not control, so it is seeded explicitly as well.
RNG = np.random.default_rng(42)
np.random.seed(42)
random.seed(42)

# ---------- Sources registry (출처 기록 파일) ----------
class SourcesRegistry:
    """In-memory log of the sources consulted, exportable as JSON."""

    def __init__(self):
        # Records are kept in insertion order.
        self._items = []

    def add(self, kind, title, how_used, url=None, notes=""):
        """Append one source record, stamped with the current local time (ISO format)."""
        record = dict(
            kind=kind,
            title=title,
            url=url,
            how_used=how_used,
            notes=notes,
            added_at=dt.datetime.now().isoformat(),
        )
        self._items.append(record)

    def to_json(self, path="sources_used.json"):
        """Write every recorded source to ``path`` as indented UTF-8 JSON."""
        with open(path, "w", encoding="utf-8") as handle:
            json.dump(self._items, handle, ensure_ascii=False, indent=2)

# Module-wide registry instance, pre-populated with one 'paper' entry.
# Note: the fourth positional argument of add() is `url`, so '(local pdf)'
# is stored in the url field.
SOURCES = SourcesRegistry()
SOURCES.add('paper','Using LLMs for Market Research (Brand, Israeli, Ngwe, 2024)',
            'LLM-as-simulator 절차/싱글턴 프롬프트 설계','(local pdf)')

# ---------- Persona schema ----------
# Persona attribute names. The list is interpolated into the LLM prompt as
# `attribute_list` and is the key set of the rule-based persona weights.
ATTRIBUTES = [
    'age','gender','income_band','region','household_size','lifestyle',
    'health_focus','price_sensitivity','brand_loyalty','online_offline_mix',
    'channel_preference','promo_reactivity','ad_reach_susceptibility',
    'environmental_concern','innovation_seeker'
]

from dataclasses import dataclass
@dataclass
class Persona:
    """A synthetic consumer: descriptors, trait scores and simulation weights."""
    # --- identity ---
    persona_id: str
    name: str
    # --- demographic / categorical descriptors ---
    age: int
    gender: str
    income_band: str
    region: str
    household_size: int
    lifestyle: str
    # --- trait scores (the prompt and the rule-based generator keep these in 0..1) ---
    health_focus: float
    price_sensitivity: float
    brand_loyalty: float
    online_offline_mix: float
    channel_preference: str
    promo_reactivity: float
    ad_reach_susceptibility: float
    environmental_concern: float
    innovation_seeker: float
    # --- simulation inputs ---
    weights: Dict[str, float]       # attribute name -> utility weight
    monthly_pattern: List[float]    # seasonality multipliers, indexed by forecast month position

# ---------- LLM prompt ----------
# Template for the single-turn persona-generation prompt. The {placeholders}
# are filled with str.format by _build_single_turn_prompt; the literal text
# "Generate N" is replaced with the requested persona count in
# LLMAdapter.generate, so that wording must stay intact.
SINGLE_TURN_PERSONA_PROMPT = ("""
You are a market-simulation engine. Generate N synthetic Korean consumer personas for a **Dongwon new product** launch.
Return **valid JSON** only (list of persona objects).
Context:
- Category: {category}
- Product concept: {concept}
- Target price (KRW): {price}
- Packaging/size: {pack}
- Channels: {channels}
- Competitors: {competitors}
- Target market size (12-month addressable): {market_size}
- Launch months: 2024-07 to 2025-06

Each persona fields:
- persona_id, name
- age (18-69), gender ("남"|"여"), region (서울/수도권/광역시/기타), household_size (1-5)
- income_band ("~2천","2-4천","4-7천","7천~"), lifestyle (≤12 chars)
- health_focus, price_sensitivity, brand_loyalty, online_offline_mix, promo_reactivity,
  ad_reach_susceptibility, environmental_concern, innovation_seeker (0~1)
- channel_preference in {channel_vocab}
- weights: map **≥10** of {attribute_list} to weights in [-2.0, +2.0] (utility contribution for THIS product)
- monthly_pattern: list[12] for 2024-07..2025-06 in 0.6~1.4 (persona seasonality)

Constraints: diversify personas; JSON only (no comments/trailing commas).
""").strip()

def _build_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, channel_vocab):
    """Fill SINGLE_TURN_PERSONA_PROMPT with the product context and return the text.

    ``channels`` and ``competitors`` are joined with ", "; ``market_size`` is
    rendered with thousands separators; ``channel_vocab`` and ATTRIBUTES are
    inserted using their default string form.
    """
    substitutions = {
        "category": category,
        "concept": concept,
        "price": price,
        "pack": pack,
        "channels": ", ".join(channels),
        "competitors": ", ".join(competitors),
        "market_size": f"{market_size:,}",
        "channel_vocab": channel_vocab,
        "attribute_list": ATTRIBUTES,
    }
    return SINGLE_TURN_PERSONA_PROMPT.format(**substitutions)

def save_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, channel_vocab):
    """Render the persona prompt and save it as text, plus a PDF when reportlab is importable.

    Writes 'persona_single_turn_prompt.txt' always and
    'persona_single_turn_prompt.pdf' only when ``_HAS_RL`` is true.
    """
    prompt_text = _build_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, channel_vocab)
    with open('persona_single_turn_prompt.txt', 'w', encoding='utf-8') as out:
        out.write(prompt_text)
    if not _HAS_RL:
        return

    pdf = pdfcanvas.Canvas('persona_single_turn_prompt.pdf', pagesize=A4)
    _, page_height = A4
    top_y = page_height - 50
    cursor_y = top_y
    for raw_line in prompt_text.split('\n'):
        # Hard-wrap at 100 characters; an empty line still produces one blank row.
        pieces = [raw_line[i:i + 100] for i in range(0, len(raw_line), 100)] or ['']
        for piece in pieces:
            pdf.drawString(40, cursor_y, piece)
            cursor_y -= 14
            # Start a new page once the cursor drops below the bottom margin.
            if cursor_y < 50:
                pdf.showPage()
                cursor_y = top_y
    pdf.save()

# ---------- LLM adapter (Gemini version) ----------
import google.generativeai as genai
import os

# The API key must come from the environment (e.g. exported before starting
# Jupyter); never hard-code it in the notebook. ``setdefault`` keeps an
# already-configured key instead of overwriting it with an empty string.
os.environ.setdefault("GOOGLE_API_KEY", "")

class LLMAdapter:
    """Wrapper around a Gemini model that turns a prompt into Persona objects.

    Raises:
        RuntimeError: from the constructor when GOOGLE_API_KEY is unset or empty.
    """
    def __init__(self, model="gemini-1.5-flash-latest"):
        self.model = model
        api_key = os.environ.get("GOOGLE_API_KEY")
        if not api_key:
            raise RuntimeError("환경변수 GOOGLE_API_KEY 가 설정되지 않았습니다.")
        genai.configure(api_key=api_key)
        self.model_client = genai.GenerativeModel(self.model)

    def _extract_json_array(self, text: str) -> list:
        """Return the first JSON array of objects found in ``text``.

        Markdown code fences at the start/end are stripped first. Each
        position where an array of objects could start (``[`` followed by
        ``{``) is decoded with ``raw_decode``, which stops at the end of that
        array, so trailing prose or a second array no longer breaks parsing
        (the former greedy regex spanned up to the last ``}]`` in the text).

        Raises:
            ValueError: no candidate array exists in the text.
            json.JSONDecodeError: candidates exist but none is valid JSON.
        """
        # 1) Strip code fences.
        text = re.sub(r"^```(?:json)?\s*", "", text.strip())
        text = re.sub(r"\s*```$", "", text.strip())
        # 2) Decode the first well-formed array of objects.
        decoder = json.JSONDecoder()
        last_error = None
        for start in re.finditer(r"\[\s*{", text):
            try:
                value, _end = decoder.raw_decode(text, start.start())
            except json.JSONDecodeError as err:
                last_error = err
                continue
            if isinstance(value, list):
                return value
        if last_error is not None:
            raise last_error
        raise ValueError("LLM 출력에서 JSON 배열을 찾지 못했습니다.")

    def generate(self, n:int, prompt:str):
        """Ask the model for ``n`` personas; fall back to rule-based personas on any failure.

        The literal "Generate N" in the prompt is replaced by the requested count.
        """
        msg = prompt.replace("Generate N", f"Generate {n}")
        try:
            rsp = self.model_client.generate_content(msg)
            # Prefer the plain-text accessor when it yields something.
            text = getattr(rsp, "text", None)
            if not text:
                # Otherwise stitch the text together from the first candidate's parts.
                try:
                    parts = rsp.candidates[0].content.parts
                    text = "".join(getattr(p, "text", "") for p in parts)
                except Exception:
                    text = ""
            data = self._extract_json_array(text)
            personas = [Persona(**d) for d in data]
            return personas
        except Exception as e:
            # Deliberate best-effort: any API/parsing/schema error triggers the fallback.
            print(f"[LLMAdapter] Gemini 응답 파싱 실패: {e}")
            print("→ 규칙 기반 페르소나로 폴백합니다.")
            return generate_personas_rule_based(n)
# ---------- Rule-based personas ----------
# Vocabularies sampled by the rule-based persona generator.
GENDERS = ['남', '여']
REGIONS = ['서울', '수도권', '광역시', '기타']
INCOME = ['~2천', '2-4천', '4-7천', '7천~']
# Channel vocabulary; also passed to the prompt as `channel_vocab`.
CHANNELS = ['hypermarket', 'convenience', 'ecommerce', 'SSM']
LIFESTYLES = ['활동적', '가성비', '워라밸', '건강지향', '육아', '미식', '트렌디', '야근많음']

def _rand_weights()->Dict[str,float]:
    """Draw one weight per entry of ATTRIBUTES from N(0, 0.7), clipped to [-2, 2]."""
    weights = {}
    for attribute in ATTRIBUTES:
        draw = np.random.normal(0, 0.7)
        weights[attribute] = float(np.clip(draw, -2, 2))
    return weights

def _rand_monthly_pattern()->List[float]:
    base=np.ones(12); 
    for k,v in {2:1.08, 8:1.10, 9:1.06}.items(): base[k]*=v
    base*=np.random.normal(1.0,0.05,12)
    pat=(base/base.mean())
    return list(np.clip(pat,0.6,1.4))

def generate_personas_rule_based(n:int)->List[Persona]:
    """Generate ``n`` random personas without calling an LLM.

    Categorical fields are sampled with ``random.choice``; numeric fields come
    from the global ``np.random`` API (beta/uniform/randint draws).
    """
    def clipped_beta(a, b):
        # Beta draw coerced to a plain float inside [0, 1].
        return float(np.clip(np.random.beta(a, b), 0, 1))

    personas = []
    for idx in range(n):
        given = '동' if idx % 2 == 0 else '순'
        # Keyword order below fixes the order of random draws - keep it stable.
        # NOTE(review): randint's upper bound is exclusive, so ages are 20..65
        # and household sizes 1..4, while the LLM prompt advertises 18-69 and 1-5.
        attrs = dict(
            persona_id=f"P{idx+1:03d}",
            name=f"홍길{given}{idx%10}",
            age=int(np.random.randint(20, 66)),
            gender=random.choice(GENDERS),
            region=random.choice(REGIONS),
            household_size=int(np.random.randint(1, 5)),
            income_band=random.choice(INCOME),
            lifestyle=random.choice(LIFESTYLES),
            health_focus=clipped_beta(2, 3),
            price_sensitivity=clipped_beta(3, 2),
            brand_loyalty=clipped_beta(2, 4),
            online_offline_mix=float(np.random.rand()),
            channel_preference=random.choice(CHANNELS),
            promo_reactivity=clipped_beta(2, 2),
            ad_reach_susceptibility=clipped_beta(2, 2),
            environmental_concern=clipped_beta(2, 3),
            innovation_seeker=clipped_beta(2, 2),
            weights=_rand_weights(),
            monthly_pattern=_rand_monthly_pattern(),
        )
        personas.append(Persona(**attrs))
    return personas

# ---------- Demand model ----------
@dataclass
class MarketCalendar:
    """Month-indexed marketing inputs; every dict is keyed by ``pd.Period``."""
    price_krw: Dict[pd.Period, float]            # list price per month
    discount_rate: Dict[pd.Period, float]        # promotional discount as a fraction
    ad_grps: Dict[pd.Period, float]              # advertising weight (GRPs)
    distribution: Dict[pd.Period, float]         # distribution level, capped at 1.0 by the builders
    competitor_pressure: Dict[pd.Period, float]  # the utility model takes the log of this value

def default_calendar(base_price:int=3500)->MarketCalendar:
    """Build the baseline 12-month calendar for a product priced at ``base_price``.

    Flat price, 12% discount in calendar months 9/10/2, 400 GRPs in July and
    August (200 otherwise), distribution ramping from 0.35 by 0.08 per month
    (capped at 1.0) and neutral competitor pressure.
    """
    price = dict.fromkeys(MONTHS, float(base_price))
    discount = {m: (0.12 if m.month in (9, 10, 2) else 0.0) for m in MONTHS}
    grps = {m: (400.0 if m.month in (7, 8) else 200.0) for m in MONTHS}
    distribution = {}
    for position, m in enumerate(MONTHS):
        distribution[m] = min(1.0, 0.35 + 0.08 * position)
    pressure = dict.fromkeys(MONTHS, 1.0)
    return MarketCalendar(price, discount, grps, distribution, pressure)

@dataclass
class SimulationConfig:
    """Scalar parameters of the monthly demand simulation."""
    population: int                   # multiplied by the mean purchase probability
    base_awareness: float             # awareness in the first forecast month
    base_trial_rate: float            # scales the sigmoid of persona utility
    repeat_rate: float                # share of cumulative trials feeding repeat units
    price_elasticity: float           # coefficient on log(net price / 1000)
    ad_effect_per_100grp: float       # utility gained per 100 GRPs
    promo_price_pass_through: float   # fraction of the discount reaching the shelf price
    noise_sd: float = 0.05            # sd of the log-normal demand noise

def _sigmoid(x:float)->float: return 1/(1+math.exp(-x))

import math
def _persona_utility(p:Persona, mi:int, cfg:SimulationConfig, cal:MarketCalendar, period:pd.Period)->float:
    wsum=0.0
    for k,v in p.weights.items():
        val=getattr(p,k,None)
        if val is None: continue
        if isinstance(val,str) and k in ('gender','region','income_band','channel_preference','lifestyle'):
            val_num=(hash((k,val))%7)/6.0
        else:
            val_num=float(val) if not isinstance(val,str) else 0.5
        wsum+=v*val_num
    season=p.monthly_pattern[mi]
    ad=cfg.ad_effect_per_100grp*(cal.ad_grps[period]/100.0)*p.ad_reach_susceptibility
    net_price=cal.price_krw[period]*(1.0-cfg.promo_price_pass_through*cal.discount_rate[period])
    price_term=cfg.price_elasticity*math.log(max(net_price,1.0)/1000.0)*p.price_sensitivity
    comp=math.log(cal.competitor_pressure[period])
    dist=0.1*cal.distribution[period]
    return wsum+ad+dist+(-0.3*comp)-0.5+0.2*season+price_term

def simulate_monthly_demand(personas:List[Persona], cfg:SimulationConfig, cal:MarketCalendar, export_csv=True)->pd.DataFrame:
    """Simulate trial, repeat and total units for every month in MONTHS.

    Per month: the mean over personas of awareness x trial probability x
    distribution is scaled by the population to get trial units; repeat units
    are a fixed share of cumulative trials; the total is perturbed by
    log-normal noise. When ``export_csv`` is true the frame is also written to
    'monthly_forecast.csv'.
    """
    records = []
    cumulative_trials = 0.0
    for month_idx, period in enumerate(MONTHS):
        # Awareness grows linearly by 0.15 over the 12 months, capped at 1.
        awareness = min(1.0, cfg.base_awareness + 0.15 * (month_idx / 11.0))
        purchase_probs = []
        for persona in personas:
            utility = _persona_utility(persona, month_idx, cfg, cal, period)
            trial_prob = cfg.base_trial_rate * _sigmoid(utility)
            purchase_probs.append(awareness * trial_prob * cal.distribution[period])
        trial_units = cfg.population * np.mean(purchase_probs)
        cumulative_trials += trial_units
        repeat_units = cfg.repeat_rate * cumulative_trials * 0.2
        noise = math.exp(np.random.normal(0, cfg.noise_sd))
        total_units = max(0.0, (trial_units + repeat_units) * noise)
        records.append({
            'month': period.to_timestamp('M'),
            'trial_units': trial_units,
            'repeat_units': repeat_units,
            'total_units': total_units,
            'distribution': cal.distribution[period],
            'avg_price': cal.price_krw[period],
            'discount_rate': cal.discount_rate[period],
            'ad_grps': cal.ad_grps[period],
        })
    result = pd.DataFrame(records)
    if export_csv:
        result.to_csv('monthly_forecast.csv', index=False)
    return result

# ---------- 텍스트 → 가격/GRP/ACV ----------
# Fractional price uplift added for each keyword found in the feature text
# (summed by _estimate_list_price).
_PREMIUM_TABLE={'프리미엄':0.10,'고단백':0.08,'락토프리':0.07,'저나트륨':0.03,'유기':0.05,'친환경':0.05}
# Base price per single size unit (g or mL) by category, written as
# "price per 100 units / 100"; _estimate_list_price multiplies it by the pack size.
_BASE_UNIT_PRICE={'참치캔':2000/100.0,'액상조미료':900/100.0,'발효유':1100/100.0,'커피-CUP':640/100.0,'고급축산캔':2200/100.0}

def _extract_size(name:str)->Tuple[float,str]:
    m=re.search(r'(\d+(?:\.\d+)?)\s*(g|ml|mL|G|ML)', name)
    if not m:
        if '커피' in name: return 250.0,'mL'
        if '요거트' in name or '발효유' in name: return 400.0,'g'
        if '참치' in name: return 90.0,'g'
        if '조미' in name: return 500.0,'g'
        if '축산' in name: return 200.0,'g'
        return 180.0,'g'
    v,u=float(m.group(1)),m.group(2).lower()
    return v, ('mL' if 'ml' in u else 'g')

def _estimate_list_price(row:pd.Series)->int:
    """Estimate a list price from the pack size, category and premium keywords.

    price = unit price for the matched category x size x (1 + premium), rounded
    to the nearest 10. Unmatched categories use a unit price of 1000/100.
    """
    product_name = str(row.get('product_name', ''))
    feature_text = str(row.get('product_feature', ''))
    level2 = str(row.get('category_level_2', ''))
    level3 = str(row.get('category_level_3', ''))
    size, _unit = _extract_size(product_name)

    # First matching rule wins; the order is significant.
    if '참치캔' in level3 or '참치' in level2:
        unit_price = _BASE_UNIT_PRICE['참치캔']
    elif '조미료' in level3:
        unit_price = _BASE_UNIT_PRICE['액상조미료']
    elif '발효유' in level2 or '요거트' in product_name:
        unit_price = _BASE_UNIT_PRICE['발효유']
    elif '커피' in level2 or 'CUP' in level3:
        unit_price = _BASE_UNIT_PRICE['커피-CUP']
    elif '축산캔' in level2:
        unit_price = _BASE_UNIT_PRICE['고급축산캔']
    else:
        unit_price = 1000/100.0

    base_price = unit_price * size
    premium = sum(uplift for keyword, uplift in _PREMIUM_TABLE.items() if keyword in feature_text)
    # A premium keyword in the product name adds a further 10%.
    if '프리미엄' in product_name:
        premium += 0.10
    return int(round(base_price * (1.0 + premium), -1))

def _parse_month_ranges(text:str)->List[int]:
    res=[]
    for m in re.finditer(r'(\d{1,2})\s*-\s*(\d{1,2})\s*월', text): a,b=int(m.group(1)),int(m.group(2)); res+=list(range(a,b+1))
    for m in re.finditer(r'(?<!-)\b(\d{1,2})\s*월', text): res.append(int(m.group(1)))
    return sorted(set([x for x in res if 1<=x<=12]))

def enrich_calendar_from_features(cal:MarketCalendar, feat:str, name:str)->None:
    """Overwrite ``cal``'s GRPs, discounts and distribution from the feature text.

    ``cal.ad_grps`` and ``cal.discount_rate`` are updated in place;
    ``cal.distribution`` is replaced by a new dict. Months are taken from
    ``_parse_month_ranges(feat)``.
    """
    period_by_month = {p.month: p for p in MONTHS}
    feat_lc = feat.lower()
    # Months named in the feature text, and their periods inside the window.
    month_numbers = _parse_month_ranges(feat)
    mentioned = [period_by_month[n] for n in month_numbers if n in period_by_month]

    # --- advertising ---
    baseline_grp = 120.0 if ('광고 x' in feat_lc or '광고x' in feat_lc) else 200.0
    for p in MONTHS:
        cal.ad_grps[p] = baseline_grp
    if any(token in feat_lc for token in ['광고 진행', 'tv', 'youtube', 'sns']):
        for p in mentioned:
            cal.ad_grps[p] = max(cal.ad_grps[p], 450.0)
    if '엘리베이터 광고' in feat:
        for p in mentioned:
            cal.ad_grps[p] += 100.0
    # NOTE(review): this check is case-sensitive (uses `feat`, not the lowered
    # text), and whenever it matches the 'sns' rule above has already raised
    # the same months to >= 450, so the 250 floor appears to have no effect.
    if 'sns 바이럴' in feat:
        for p in mentioned:
            cal.ad_grps[p] = max(cal.ad_grps[p], 250.0)

    # --- promotions ---
    for p in MONTHS:
        cal.discount_rate[p] = 0.12 if p.month in (9, 10, 2) else 0.0
    if any(token in feat for token in ['행사', '프로모션', '기획']):
        for p in mentioned:
            cal.discount_rate[p] = min(0.2, cal.discount_rate[p] + 0.05)

    # --- distribution ramp ---
    start, step = 0.35, 0.08
    if any(token in name for token in ['CUP', '컵', '커피']):
        start, step = 0.45, 0.10
    if '엘리베이터 광고' in feat and any(n in (6, 7, 8) for n in month_numbers):
        start += 0.05
    cal.distribution = {p: min(1.0, start + step * pos) for pos, p in enumerate(MONTHS)}

# ---------- Heuristics ----------
def _infer_channels(c1,c2,c3):
    if '발효유' in str(c2): return ['hypermarket','convenience']
    if '참치' in str(c2):   return ['hypermarket','SSM','ecommerce']
    if '조미료' in str(c3): return ['hypermarket','ecommerce']
    if '축산캔' in str(c2): return ['hypermarket','SSM','ecommerce']
    if '커피' in str(c2):   return ['convenience','hypermarket']
    return ['hypermarket','ecommerce']

def _infer_competitors(c2):
    if '참치' in str(c2): return ['CJ','오뚜기','사조']
    if '조미'  in str(c2): return ['CJ','오뚜기']
    if '발효유' in str(c2): return ['빙그레','매일','남양']
    if '축산캔' in str(c2): return ['SPAM','롯데','동원']
    if '커피' in str(c2):   return ['매일','동서','스타벅스RTD']
    return ['CJ','오뚜기','사조']

def _infer_market_size(name:str,c2:str='')->int:
    base=6_000_000
    if '발효유' in c2 or '요거트' in name: base=3_000_000
    elif '참치' in c2: base=10_000_000
    elif '조미' in c2: base=5_000_000
    elif '축산캔' in c2: base=4_000_000
    elif '커피' in c2: base=12_000_000
    h=(abs(hash(name))%41)/100.0
    return int(base*(0.8+h))

# ---------- Scenario runner ----------
def run_scenario(persona_count:int=400, use_llm:bool=False,
                 category:str='식품', concept:str='신제품',
                 price:int=3500, pack:str='unit',
                 channels:List[str]=['hypermarket','convenience','ecommerce'],
                 competitors:List[str]=['CJ','오뚜기','사조'],
                 market_size:int=3_200_000,
                 calendar:Optional[MarketCalendar]=None,
                 ad_effect_mult:float=1.0):
    """Run one product scenario end to end and return ``(forecast, personas)``.

    Saves the prompt, obtains personas (LLM when ``use_llm`` is true, otherwise
    rule-based), simulates monthly demand and writes the sources and report files.

    If the LLM adapter cannot be constructed (for example no API key) or
    yields no personas, the rule-based generator is used instead, so a missing
    key no longer aborts the run.
    The list defaults of ``channels`` / ``competitors`` are only read, never mutated.
    """
    save_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, CHANNELS)
    personas = None
    if use_llm:
        prompt = _build_single_turn_prompt(category,concept,price,pack,channels,competitors,market_size,CHANNELS)
        try:
            personas = LLMAdapter().generate(persona_count, prompt)
        except Exception as e:
            # Adapter construction failed; degrade to the rule-based path.
            print(f"[run_scenario] LLM 사용 불가: {e} → 규칙 기반 페르소나로 폴백합니다.")
    if not personas:
        # Covers use_llm=False, a failed adapter, and None/empty LLM results.
        personas = generate_personas_rule_based(persona_count)
    cal = calendar or default_calendar(price)
    cfg = SimulationConfig(population=market_size, base_awareness=0.18, base_trial_rate=0.35,
                           repeat_rate=0.55, price_elasticity=-1.2,
                           ad_effect_per_100grp=0.10*ad_effect_mult, promo_price_pass_through=0.8,
                           noise_sd=0.06)
    forecast=simulate_monthly_demand(personas,cfg,cal,export_csv=True)
    SOURCES.to_json('sources_used.json'); _write_solution_outline(forecast, persona_count, category, concept, price, pack)
    return forecast, personas

def _write_solution_outline(forecast:pd.DataFrame, persona_count:int, category:str, concept:str, price:int, pack:str):
    last=forecast.tail(1).iloc[0]
    md=(f"# 솔루션 설명 자료(초안)\n"
        f"## 개요\n- 카테고리:{category}\n- 컨셉:{concept}\n- 가격:{price:,} / {pack}\n- 페르소나:{persona_count}\n"
        f"## 핵심결과\n- 12M 합계:{int(forecast['total_units'].sum()):,} EA\n"
        f"- 런칭월:{int(forecast.iloc[0]['total_units']):,} EA\n- 최종월:{int(last['total_units']):,} EA\n")
    with open('solution_report.md','w',encoding='utf-8') as f: f.write(md)

# ---------- One product → 12m ----------
def forecast_one_product(row:pd.Series, persona_count:int=400, use_llm:bool=False)->np.ndarray:
    """Forecast 12 monthly unit volumes for one product row.

    Derives price, market size, channels, competitors and the marketing
    calendar from the row's text fields, runs the scenario and returns the
    rounded, non-negative monthly totals as an int array.

    The per-product seed comes from CRC32 of the product name (``hash()`` is
    salted per process) and is applied to the global ``np.random`` state as
    well, because persona generation and demand noise draw from that API.
    """
    global RNG
    import zlib  # function-scope import keeps this fix self-contained
    name=str(row.get('product_name','')).strip()
    feat=str(row.get('product_feature','')).strip()
    c1=str(row.get('category_level_1','')); c2=str(row.get('category_level_2','')); c3=str(row.get('category_level_3',''))
    # Fix the seed per product: same name -> same random draws in every session.
    seed=zlib.crc32(name.encode('utf-8'))
    RNG=np.random.default_rng(seed); np.random.seed(seed); random.seed(seed)
    # Price / market / channels
    price=_estimate_list_price(row)
    market_size=_infer_market_size(name,c2); channels=_infer_channels(c1,c2,c3); competitors=_infer_competitors(c2)
    # Calendar
    cal=default_calendar(price); enrich_calendar_from_features(cal, feat, name)
    # Celebrity-model uplift on the advertising effect
    celeb=1.0
    if '광고모델' in feat and any(k in feat for k in ['안유진','아이돌','연예인']): celeb=1.15
    # Pack description, e.g. "400g"
    size,unit=_extract_size(name); pack=f"{int(size)}{unit}"
    # Simulation
    df,_=run_scenario(persona_count, use_llm, c1 or '식품', feat[:80] or '신제품',
                     price, pack, channels, competitors, market_size, cal, celeb)
    y=np.maximum(0, np.round(df['total_units'].values).astype(int))
    return y

# ---------- Submission ----------
def make_submission(product_info_csv:str, out_csv:str='submission.csv', persona_count:int=400, use_llm:bool=False)->pd.DataFrame:
    """Forecast every product in ``product_info_csv`` and write the submission file.

    Returns the frame written to ``out_csv``: one row per product with
    ``product_name`` and ``months_since_launch_1`` .. ``_12`` as ints.
    """
    products = pd.read_csv(product_info_csv)
    records = []
    for _, product in products.iterrows():
        monthly_units = forecast_one_product(product, persona_count, use_llm)
        record = {'product_name': product['product_name']}
        for k in range(12):
            record[f'months_since_launch_{k+1}'] = int(monthly_units[k])
        records.append(record)
    submission = pd.DataFrame(records)
    submission.to_csv(out_csv, index=False)
    return submission

# ---------- main ----------
if __name__ == '__main__':
    # Runs the whole pipeline on product_info.csv with LLM personas requested,
    # then previews the submission and lists the files produced.
    try:
        sub=make_submission('product_info.csv','submission.csv',persona_count=400,use_llm=True)
        print(sub.head())
        print("Files: submission.csv, monthly_forecast.csv, persona_single_turn_prompt.txt/.pdf, sources_used.json, solution_report.md")
    except Exception as e:
        # NOTE(review): every failure - not only a missing or malformed CSV -
        # ends up here and is reduced to a single printed line without traceback.
        print("Run make_submission(...) with correct CSV. Error:",e)

       product_name  months_since_launch_1  months_since_launch_2  \
0  덴마크 하이그릭요거트 400g                  93919                 133758   
1   동원맛참 고소참기름 135g                 292450                 418898   
2    동원맛참 고소참기름 90g                 230354                 297112   
3   동원맛참 매콤참기름 135g                 263078                 380298   
4    동원맛참 매콤참기름 90g                 161368                 203075   

   months_since_launch_3  months_since_launch_4  months_since_launch_5  \
0                 179243                 190409                 294858   
1                 528561                 678933                 863708   
2                 397737                 553827                 738092   
3                 487890                 710607                 862861   
4                 277533                 397054                 418334   

   months_since_launch_6  months_since_launch_7  months_since_launch_8  \
0                 311689                 414775                 48

In [12]:
# -*- coding: utf-8 -*-
"""
Persona-based Monthly Demand Forecast (2024-07 ~ 2025-06)
— product_info → (가격/GRP/ACV 자동 파싱 + 시즌성/정합도) → 페르소나 시뮬레이션 → submission.csv

Output:
  - submission.csv (months_since_launch_1..12, int)
  - monthly_forecast.csv (마지막 제품 디버깅용)
  - persona_single_turn_prompt.txt/.pdf, sources_used.json, solution_report.md
  - (선택) calibrated_theta.json 이 있으면 파라미터 자동 로드
"""

from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import numpy as np, pandas as pd
import json, random, math, os, datetime as dt, re

# ---------- PDF export (optional) ----------
try:
    from reportlab.lib.pagesizes import A4
    from reportlab.pdfgen import canvas as pdfcanvas
    _HAS_RL = True
except Exception:
    _HAS_RL = False

# ---------- Dates ----------
# First and last calendar day of the 12-month forecast window.
FORECAST_START = dt.date(2024,7,1)
FORECAST_END   = dt.date(2025,6,30)
# One monthly pd.Period per forecast month (2024-07 .. 2025-06).
MONTHS = pd.period_range(FORECAST_START, FORECAST_END, freq='M')
# Seed every random source used in this cell. Sampling below goes through the
# legacy global ``np.random`` API, which the Generator object does not
# control, so that global state is seeded explicitly as well.
RNG = np.random.default_rng(42)
np.random.seed(42)
random.seed(42)

# ---------- Sources registry ----------
class SourcesRegistry:
    """Collects source citations in memory and dumps them to a JSON file."""

    def __init__(self):
        # Insertion-ordered list of source dicts.
        self._items = []

    def add(self, kind, title, how_used, url=None, notes=""):
        """Record one source together with the current local timestamp (ISO format)."""
        stamped = dict(
            kind=kind,
            title=title,
            url=url,
            how_used=how_used,
            notes=notes,
            added_at=dt.datetime.now().isoformat(),
        )
        self._items.append(stamped)

    def to_json(self, path="sources_used.json"):
        """Serialise all records to ``path`` as indented UTF-8 JSON."""
        with open(path, "w", encoding="utf-8") as sink:
            json.dump(self._items, sink, ensure_ascii=False, indent=2)

# Module-wide registry instance, pre-populated with one 'paper' entry.
# Note: the fourth positional argument of add() is `url`, so '(local pdf)'
# is stored in the url field.
SOURCES = SourcesRegistry()
SOURCES.add('paper','Using LLMs for Market Research (Brand, Israeli, Ngwe, 2024)',
            'LLM-as-simulator 절차/싱글턴 프롬프트 설계','(local pdf)')

# ---------- Persona schema ----------
# Persona attribute names. The list is interpolated into the LLM prompt as
# `attribute_list` and is the key set of the rule-based persona weights.
ATTRIBUTES = [
    'age','gender','income_band','region','household_size','lifestyle',
    'health_focus','price_sensitivity','brand_loyalty','online_offline_mix',
    'channel_preference','promo_reactivity','ad_reach_susceptibility',
    'environmental_concern','innovation_seeker'
]

@dataclass
class Persona:
    """A synthetic consumer: descriptors, trait scores and simulation weights."""
    # --- identity ---
    persona_id: str
    name: str
    # --- demographic / categorical descriptors ---
    age: int
    gender: str
    income_band: str
    region: str
    household_size: int
    lifestyle: str
    # --- trait scores (the prompt and the rule-based generator keep these in 0..1) ---
    health_focus: float
    price_sensitivity: float
    brand_loyalty: float
    online_offline_mix: float
    channel_preference: str
    promo_reactivity: float
    ad_reach_susceptibility: float
    environmental_concern: float
    innovation_seeker: float
    # --- simulation inputs ---
    weights: Dict[str, float]       # attribute name -> utility weight
    monthly_pattern: List[float]    # seasonality multipliers, indexed by forecast month position

# ---------- LLM prompt ----------
# Template for the single-turn persona-generation prompt. The {placeholders}
# are filled with str.format by _build_single_turn_prompt; the literal text
# "Generate N" is replaced with the requested persona count in
# LLMAdapter.generate, so that wording must stay intact.
SINGLE_TURN_PERSONA_PROMPT = ("""
You are a market-simulation engine. Generate N synthetic Korean consumer personas for a **Dongwon new product** launch.
Return **valid JSON** only (list of persona objects).
Context:
- Category: {category}
- Product concept: {concept}
- Target price (KRW): {price}
- Packaging/size: {pack}
- Channels: {channels}
- Competitors: {competitors}
- Target market size (12-month addressable): {market_size}
- Launch months: 2024-07 to 2025-06

Each persona fields:
- persona_id, name
- age (18-69), gender ("남"|"여"), region (서울/수도권/광역시/기타), household_size (1-5)
- income_band ("~2천","2-4천","4-7천","7천~"), lifestyle (≤12 chars)
- health_focus, price_sensitivity, brand_loyalty, online_offline_mix, promo_reactivity,
  ad_reach_susceptibility, environmental_concern, innovation_seeker (0~1)
- channel_preference in {channel_vocab}
- weights: map **≥10** of {attribute_list} to weights in [-2.0, +2.0] (utility contribution for THIS product)
- monthly_pattern: list[12] for 2024-07..2025-06 in 0.6~1.4 (persona seasonality)

Constraints: diversify personas; JSON only (no comments/trailing commas).
""").strip()

# Channel vocabulary sampled for rule-based personas' channel_preference.
CHANNELS_VOCAB = ['hypermarket','convenience','ecommerce','SSM']

def _build_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, channel_vocab):
    """Return SINGLE_TURN_PERSONA_PROMPT with the product context substituted.

    List arguments ``channels`` / ``competitors`` are comma-joined,
    ``market_size`` gets thousands separators, and ``channel_vocab`` and
    ATTRIBUTES are inserted using their default string form.
    """
    channel_text = ", ".join(channels)
    competitor_text = ", ".join(competitors)
    size_text = f"{market_size:,}"
    return SINGLE_TURN_PERSONA_PROMPT.format(
        category=category,
        concept=concept,
        price=price,
        pack=pack,
        channels=channel_text,
        competitors=competitor_text,
        market_size=size_text,
        channel_vocab=channel_vocab,
        attribute_list=ATTRIBUTES,
    )

def save_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, channel_vocab):
    """Save the rendered persona prompt as text, and as PDF when reportlab is importable.

    Writes 'persona_single_turn_prompt.txt' always and
    'persona_single_turn_prompt.pdf' only when ``_HAS_RL`` is true.
    """
    rendered = _build_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, channel_vocab)
    with open('persona_single_turn_prompt.txt', 'w', encoding='utf-8') as txt_file:
        txt_file.write(rendered)
    if not _HAS_RL:
        return

    sheet = pdfcanvas.Canvas('persona_single_turn_prompt.pdf', pagesize=A4)
    _, sheet_height = A4
    first_row_y = sheet_height - 50
    row_y = first_row_y
    for source_line in rendered.split('\n'):
        # Hard-wrap at 100 characters; an empty line still produces one blank row.
        segments = [source_line[i:i + 100] for i in range(0, len(source_line), 100)] or ['']
        for segment in segments:
            sheet.drawString(40, row_y, segment)
            row_y -= 14
            # Continue on a fresh page once below the bottom margin.
            if row_y < 50:
                sheet.showPage()
                row_y = first_row_y
    sheet.save()

# ---------- (선택) LLM adapter (Gemini, robust JSON extraction) ----------
# 무료로 하려면 use_llm=False 사용, 또는 오픈소스 LLM(별도)로 JSON 생성 후 캐시 사용 권장.
import os, re
try:
    import google.generativeai as genai
    _HAS_GEMINI = True
except Exception:
    _HAS_GEMINI = False

class LLMAdapter:
    """Wrapper around a Gemini model that turns a prompt into Persona objects.

    Raises:
        RuntimeError: from the constructor when the google-generativeai
            package is unavailable or GOOGLE_API_KEY is unset/empty.
    """
    def __init__(self, model="gemini-1.5-flash-latest"):
        if not _HAS_GEMINI:
            raise RuntimeError("google-generativeai 미설치. 무료 사용시 use_llm=False 권장.")
        self.model = model
        api_key = os.environ.get("GOOGLE_API_KEY")
        if not api_key:
            raise RuntimeError("환경변수 GOOGLE_API_KEY 가 없음.")
        genai.configure(api_key=api_key)
        self.model_client = genai.GenerativeModel(self.model)

    def _extract_json_array(self, text: str) -> list:
        """Return the first JSON array of objects found in ``text``.

        Markdown code fences at the start/end are stripped first. Each
        position where an array of objects could start (``[`` followed by
        ``{``) is decoded with ``raw_decode``, which stops at the end of that
        array, so trailing prose or a second array no longer breaks parsing
        (the former greedy regex spanned up to the last ``}]`` in the text).

        Raises:
            ValueError: no candidate array exists in the text.
            json.JSONDecodeError: candidates exist but none is valid JSON.
        """
        text = text.strip()
        text = re.sub(r"^```(?:json)?\s*", "", text)
        text = re.sub(r"\s*```$", "", text)
        decoder = json.JSONDecoder()
        last_error = None
        for start in re.finditer(r"\[\s*{", text):
            try:
                value, _end = decoder.raw_decode(text, start.start())
            except json.JSONDecodeError as err:
                last_error = err
                continue
            if isinstance(value, list):
                return value
        if last_error is not None:
            raise last_error
        raise ValueError("LLM 출력에서 JSON 배열을 찾지 못했습니다.")

    def generate(self, n:int, prompt:str):
        """Ask the model for ``n`` personas; return None on any failure.

        The literal "Generate N" in the prompt is replaced by the requested
        count. A None result signals the caller to use rule-based personas.
        """
        msg = prompt.replace("Generate N", f"Generate {n}")
        try:
            rsp = self.model_client.generate_content(msg)
            text = getattr(rsp, "text", None)
            if not text:
                # Stitch the text together from the first candidate's parts.
                try:
                    parts = rsp.candidates[0].content.parts
                    text = "".join(getattr(p, "text", "") for p in parts)
                except Exception:
                    text = ""
            data = self._extract_json_array(text)
            return [Persona(**d) for d in data]
        except Exception as e:
            # Deliberate best-effort: any API/parsing/schema error yields None.
            print(f"[LLMAdapter] Gemini 파싱 실패: {e} → 규칙기반으로 폴백")
            return None  # the caller falls back to rule-based personas

# ---------- Rule-based personas ----------
# Vocabularies sampled by the rule-based persona generator.
GENDERS = ['남', '여']
REGIONS = ['서울', '수도권', '광역시', '기타']
INCOME = ['~2천', '2-4천', '4-7천', '7천~']
LIFESTYLES = ['활동적', '가성비', '워라밸', '건강지향', '육아', '미식', '트렌디', '야근많음']

def _rand_weights()->Dict[str,float]:
    """Return a weight for every name in ATTRIBUTES: N(0, 0.7) draws clipped to [-2, 2]."""
    sampled = {}
    for attr_name in ATTRIBUTES:
        raw = np.random.normal(0, 0.7)
        sampled[attr_name] = float(np.clip(raw, -2, 2))
    return sampled

def _rand_monthly_pattern()->List[float]:
    base=np.ones(12)
    for k,v in {2:1.08, 8:1.10, 9:1.06}.items(): base[k]*=v  # 설/추석 부근
    base*=np.random.normal(1.0,0.05,12)
    pat=(base/base.mean())
    return list(np.clip(pat,0.6,1.4))

def generate_personas_rule_based(n:int)->List[Persona]:
    """Generate ``n`` random personas without calling an LLM.

    Categorical fields are sampled with ``random.choice``; numeric fields come
    from the global ``np.random`` API. No stratification by age or gender is
    applied.
    """
    def unit_beta(a, b):
        # Beta draw coerced to a plain float inside [0, 1].
        return float(np.clip(np.random.beta(a, b), 0, 1))

    result = []
    for idx in range(n):
        given = '동' if idx % 2 == 0 else '순'
        # Keyword order below fixes the order of random draws - keep it stable.
        # NOTE(review): randint's upper bound is exclusive, so ages are 20..65
        # and household sizes 1..4, while the LLM prompt advertises 18-69 and 1-5.
        fields = dict(
            persona_id=f"P{idx+1:03d}",
            name=f"홍길{given}{idx%10}",
            age=int(np.random.randint(20, 66)),
            gender=random.choice(GENDERS),
            region=random.choice(REGIONS),
            household_size=int(np.random.randint(1, 5)),
            income_band=random.choice(INCOME),
            lifestyle=random.choice(LIFESTYLES),
            health_focus=unit_beta(2, 3),
            price_sensitivity=unit_beta(3, 2),
            brand_loyalty=unit_beta(2, 4),
            online_offline_mix=float(np.random.rand()),
            channel_preference=random.choice(CHANNELS_VOCAB),
            promo_reactivity=unit_beta(2, 2),
            ad_reach_susceptibility=unit_beta(2, 2),
            environmental_concern=unit_beta(2, 3),
            innovation_seeker=unit_beta(2, 2),
            weights=_rand_weights(),
            monthly_pattern=_rand_monthly_pattern(),
        )
        result.append(Persona(**fields))
    return result

# ---------- Demand model ----------
@dataclass
class MarketCalendar:
    """Month-indexed marketing inputs; every dict is keyed by ``pd.Period``."""
    price_krw: Dict[pd.Period, float]            # list price per month
    discount_rate: Dict[pd.Period, float]        # promotional discount as a fraction
    ad_grps: Dict[pd.Period, float]              # advertising weight (GRPs)
    distribution: Dict[pd.Period, float]         # distribution level, capped at 1.0 by the builders
    competitor_pressure: Dict[pd.Period, float]  # the utility model takes the log of this value
    category_season: Dict[pd.Period, float]      # category seasonality index, 1.0 = neutral

def default_calendar(base_price:int=3500)->MarketCalendar:
    """Build the baseline 12-month calendar for a product priced at ``base_price``.

    Flat price, 12% discount in calendar months 9/10/2, 400 GRPs in July and
    August (200 otherwise), distribution ramping from 0.35 by 0.08 per month
    (capped at 1.0), neutral competitor pressure and neutral category seasonality.
    """
    price = dict.fromkeys(MONTHS, float(base_price))
    discount = {m: (0.12 if m.month in (9, 10, 2) else 0.0) for m in MONTHS}
    grps = {m: (400.0 if m.month in (7, 8) else 200.0) for m in MONTHS}
    distribution = {}
    for position, m in enumerate(MONTHS):
        distribution[m] = min(1.0, 0.35 + 0.08 * position)
    pressure = dict.fromkeys(MONTHS, 1.0)
    seasonality = dict.fromkeys(MONTHS, 1.0)
    return MarketCalendar(price, discount, grps, distribution, pressure, seasonality)

# ---- 카테고리 시즌성 훅 ----
# Category seasonality indices (1.0 = neutral), 12 values per category keyword.
# apply_category_seasonality assigns them positionally to MONTHS, i.e. the
# first value is used for the first forecast month (2024-07).
# NOTE(review): confirm the lists are authored in forecast order and not
# in Jan..Dec calendar order.
CATEGORY_SEASON = {
    '발효유': [1.05,1.08,1.07,1.03,1.02,1.00,0.98,0.97,0.99,1.02,1.03,1.04],
    '참치':   [0.95,0.98,1.02,1.00,1.03,1.05,1.06,1.08,1.12,1.10,1.03,0.98],
    '조미료': [0.96,0.97,0.98,1.00,1.01,1.02,1.03,1.05,1.08,1.12,1.10,1.00],
    '축산캔': [0.99,1.00,1.01,1.01,1.02,1.02,1.03,1.05,1.06,1.08,1.04,1.00],
    '커피':   [0.98,1.00,1.02,1.03,1.04,1.06,1.08,1.07,1.03,1.00,0.98,0.97],
}
def apply_category_seasonality(cal: MarketCalendar, category2: str):
    """Copy the seasonality indices of the matching category into ``cal.category_season``.

    The first CATEGORY_SEASON key contained in ``str(category2)`` is used; when
    none matches, the calendar is left untouched.
    """
    label = str(category2)
    matched = next((keyword for keyword in CATEGORY_SEASON if keyword in label), None)
    if not matched:
        return
    factors = CATEGORY_SEASON[matched]
    # NOTE(review): factors are applied by position in MONTHS (0 = 2024-07);
    # verify the tables are not meant to be indexed by calendar month.
    for position, period in enumerate(MONTHS):
        cal.category_season[period] = float(factors[position])

@dataclass
class SimulationConfig:
    """Scalar parameters of the monthly demand simulation."""
    population: int
    base_awareness: float
    base_trial_rate: float
    repeat_rate: float
    price_elasticity: float           # coefficient on log(net price / 1000)
    ad_effect_per_100grp: float       # utility gained per 100 GRPs
    promo_price_pass_through: float   # fraction of the discount reaching the shelf price
    noise_sd: float = 0.05
    product_multiplier: float = 1.0   # small per-product correction (calibration only)

def _sigmoid(x:float)->float: return 1/(1+math.exp(-x))

# ---- 채널/특징 정합도 보상 ----
def _channel_fit(p: Persona, channels: List[str])->float:
    return 0.1 if p.channel_preference in channels else -0.05

def _feature_match_bonus(p: Persona, feat: str)->float:
    bonus=0.0
    if '고단백' in feat: bonus += 0.15*(p.health_focus - 0.5)
    if ('저당' in feat) or ('저나트륨' in feat): bonus += 0.12*(p.health_focus - 0.5)
    if '락토프리' in feat: bonus += 0.12*(p.health_focus - 0.5)
    if '프리미엄' in feat: bonus += 0.10*(p.innovation_seeker - 0.5)
    return bonus

import math
import zlib  # process-independent hashing for categorical persona attributes

def _persona_utility(p:Persona, mi:int, cfg:SimulationConfig, cal:MarketCalendar, period:pd.Period,
                     current_channels:List[str], feature_text:str)->float:
    """Return the persona's (pre-sigmoid) purchase utility for one month.

    The utility is the sum of:
      * the persona's weighted attributes (``p.weights`` maps attribute name
        to weight; numeric attributes are used as-is, known categorical ones
        are mapped to a pseudo-score in {0, 1/6, ..., 1}, other strings to 0.5),
      * advertising: ad_effect_per_100grp * GRPs/100 * ad_reach_susceptibility,
      * price: price_elasticity * ln(net price / 1000) * price_sensitivity,
        where the net price applies the passed-through share of the discount,
      * 0.1 * distribution, -0.3 * ln(competitor pressure), a constant -0.5,
      * 0.2 * the persona's monthly pattern at index ``mi``,
      * 0.15 * (category seasonality - 1.0),
      * the channel-fit and feature-match terms.

    Args:
        p: persona whose attributes are read via ``getattr``.
        mi: zero-based month index into ``p.monthly_pattern``.
        cfg: simulation parameters.
        cal: market calendar keyed by ``period``.
        period: month being evaluated.
        current_channels: channels the product is sold in.
        feature_text: free-text product features.
    """
    categorical=('gender','region','income_band','channel_preference','lifestyle')
    wsum=0.0
    for k,v in p.weights.items():
        val=getattr(p,k,None)
        if val is None: continue
        if isinstance(val,str):
            if k in categorical:
                # builtin hash() of a str is salted per process (PYTHONHASHSEED), which made
                # forecasts differ between kernel restarts; crc32 is stable everywhere.
                val_num=(zlib.crc32(f"{k}={val}".encode('utf-8'))%7)/6.0
            else:
                val_num=0.5
        else:
            val_num=float(val)
        wsum+=v*val_num
    season=p.monthly_pattern[mi]
    ad=cfg.ad_effect_per_100grp*(cal.ad_grps[period]/100.0)*p.ad_reach_susceptibility
    net_price=cal.price_krw[period]*(1.0-cfg.promo_price_pass_through*cal.discount_rate[period])
    price_term=cfg.price_elasticity*math.log(max(net_price,1.0)/1000.0)*p.price_sensitivity
    comp=math.log(cal.competitor_pressure[period])
    dist=0.1*cal.distribution[period]
    # category seasonality (1.0 = baseline) enters as an additive utility term
    cat_season_term = 0.15*(cal.category_season.get(period,1.0)-1.0)
    # channel / feature fit
    channel_term = _channel_fit(p, current_channels)
    feature_term = _feature_match_bonus(p, feature_text)
    return wsum + ad + dist + (-0.3*comp) - 0.5 + 0.2*season + price_term + cat_season_term + channel_term + feature_term

def simulate_monthly_demand(personas:List[Persona], cfg:SimulationConfig, cal:MarketCalendar,
                            current_channels:List[str], feature_text:str,
                            export_csv=True)->pd.DataFrame:
    """Simulate monthly trial, repeat and total units over MONTHS.

    For each month the purchase probability of every persona is
    ``awareness * base_trial_rate * sigmoid(utility) * distribution``; the
    mean probability is scaled by the population and the product multiplier
    to give trial units. Repeat units are ``repeat_rate * 0.2`` times the
    cumulative trials (current month included). The total is multiplied by
    log-normal noise drawn from the global ``np.random`` state.

    Returns one row per month; when ``export_csv`` is true the frame is also
    written to 'monthly_forecast.csv'.
    """
    records=[]
    cumulative_trials=0.0
    for month_idx,period in enumerate(MONTHS):
        coverage=cal.distribution[period]
        # awareness rises linearly by up to +0.15 across the 12 months, capped at 1.0
        awareness=min(1.0, cfg.base_awareness+0.15*(month_idx/11.0))
        purchase_probs=[
            awareness
            *(cfg.base_trial_rate*_sigmoid(
                _persona_utility(persona,month_idx,cfg,cal,period,current_channels,feature_text)))
            *coverage
            for persona in personas
        ]
        trial_units=cfg.population*np.mean(purchase_probs)*cfg.product_multiplier
        cumulative_trials+=trial_units
        repeat_units=cfg.repeat_rate*cumulative_trials*0.2
        noise=math.exp(np.random.normal(0,cfg.noise_sd))
        total_units=max(0.0,(trial_units+repeat_units)*noise)
        records.append({
            'month':period.to_timestamp('M'),
            'trial_units':trial_units,
            'repeat_units':repeat_units,
            'total_units':total_units,
            'distribution':coverage,
            'avg_price':cal.price_krw[period],
            'discount_rate':cal.discount_rate[period],
            'ad_grps':cal.ad_grps[period],
            'category_season':cal.category_season.get(period,1.0),
        })
    forecast=pd.DataFrame(records)
    if export_csv:
        forecast.to_csv('monthly_forecast.csv',index=False)
    return forecast

# ---------- 텍스트 → 가격/GRP/ACV ----------
_PREMIUM_TABLE={'프리미엄':0.10,'고단백':0.08,'락토프리':0.07,'저나트륨':0.03,'유기':0.05,'친환경':0.05}
_BASE_UNIT_PRICE={'참치캔':2000/100.0,'액상조미료':900/100.0,'발효유':1100/100.0,'커피-CUP':640/100.0,'고급축산캔':2200/100.0}

def _extract_size(name:str)->Tuple[float,str]:
    m=re.search(r'(\d+(?:\.\d+)?)\s*(g|ml|mL|G|ML)', name)
    if not m:
        if '커피' in name: return 250.0,'mL'
        if '요거트' in name or '발효유' in name: return 400.0,'g'
        if '참치' in name: return 90.0,'g'
        if '조미' in name: return 500.0,'g'
        if '축산' in name: return 200.0,'g'
        return 180.0,'g'
    v,u=float(m.group(1)),m.group(2).lower()
    return v, ('mL' if 'ml' in u else 'g')

def _estimate_list_price(row:pd.Series)->int:
    """Heuristic list price in KRW, rounded to the nearest 10.

    price = base price per size unit (chosen by category) * pack size
            * (1 + sum of premium uplifts found in the feature text,
                 plus 0.10 when the name itself contains '프리미엄').

    Note: a later notebook cell defines another ``_estimate_list_price``;
    whichever definition ran last is the one in effect.
    """
    name=str(row.get('product_name',''))
    feat=str(row.get('product_feature',''))
    c2=str(row.get('category_level_2',''))
    c3=str(row.get('category_level_3',''))
    size,_unit=_extract_size(name)

    # first matching category rule wins
    if '참치캔' in c3 or '참치' in c2:
        base_key='참치캔'
    elif '조미료' in c3:
        base_key='액상조미료'
    elif '발효유' in c2 or '요거트' in name:
        base_key='발효유'
    elif '커피' in c2 or 'CUP' in c3:
        base_key='커피-CUP'
    elif '축산캔' in c2:
        base_key='고급축산캔'
    else:
        base_key=None
    per=_BASE_UNIT_PRICE[base_key] if base_key is not None else 1000/100.0

    uplift=sum(rate for keyword,rate in _PREMIUM_TABLE.items() if keyword in feat)
    if '프리미엄' in name:
        uplift+=0.10
    return int(round(per*size*(1.0+uplift),-1))

def _parse_month_ranges(text:str)->List[int]:
    res=[]
    for m in re.finditer(r'(\d{1,2})\s*-\s*(\d{1,2})\s*월', text): a,b=int(m.group(1)),int(m.group(2)); res+=list(range(a,b+1))
    for m in re.finditer(r'(?<!-)\b(\d{1,2})\s*월', text): res.append(int(m.group(1)))
    return sorted(set([x for x in res if 1<=x<=12]))

def enrich_calendar_from_features(cal:MarketCalendar, feat:str, name:str)->None:
    """Derive ad GRPs, discounts and the distribution ramp from free text.

    Mutates ``cal`` in place:
      * ad_grps: 200 per month (120 when the text says '광고 x'); months named
        in ``feat`` are raised to at least 450 when an ad keyword is present,
        +100 for elevator ads, and at least 250 for SNS viral campaigns;
      * discount_rate: 12% in Sep/Oct/Feb, +5 points (max 20%) in months named
        in ``feat`` when a promotion keyword is present;
      * distribution: linear ramp, starting higher and rising faster for cup
        coffee names, and starting +0.05 when elevator ads run in Jun-Aug.

    The months mentioned in ``feat`` are parsed once and shared by all rules.
    """
    by_month={m.month:m for m in MONTHS}
    lower=feat.lower()
    parsed=_parse_month_ranges(feat)                      # parsed once, reused below
    targets=[by_month[mm] for mm in parsed if mm in by_month]

    # --- advertising ---
    base_grp=120.0 if ('광고 x' in lower or '광고x' in lower) else 200.0
    for m in MONTHS: cal.ad_grps[m]=base_grp
    if any(k in lower for k in ['광고 진행','tv','youtube','sns']):
        for m in targets: cal.ad_grps[m]=max(cal.ad_grps[m],450.0)
    if '엘리베이터 광고' in feat:
        for m in targets: cal.ad_grps[m]+=100.0
    # Matched on the lowercased text like the other ad keywords ('SNS 바이럴' was missed before).
    # NOTE(review): 'sns' alone already triggers the 450 floor above, so this 250 floor
    # currently never changes a value - confirm whether 'sns' belongs in the list above.
    if 'sns 바이럴' in lower:
        for m in targets: cal.ad_grps[m]=max(cal.ad_grps[m],250.0)

    # --- promotions ---
    for m in MONTHS: cal.discount_rate[m]=0.12 if m.month in (9,10,2) else 0.0
    if any(k in feat for k in ['행사','프로모션','기획']):
        for m in targets: cal.discount_rate[m]=min(0.2, cal.discount_rate[m]+0.05)

    # --- distribution ramp ---
    start,step=0.35,0.08
    if any(k in name for k in ['CUP','컵','커피']): start,step=0.45,0.10
    if '엘리베이터 광고' in feat and any(mm in (6,7,8) for mm in parsed): start+=0.05
    cal.distribution={m:min(1.0,start+step*i) for i,m in enumerate(MONTHS)}

# ---------- Heuristics ----------
def _infer_channels(c1,c2,c3):
    if '발효유' in str(c2): return ['hypermarket','convenience']
    if '참치' in str(c2):   return ['hypermarket','SSM','ecommerce']
    if '조미료' in str(c3): return ['hypermarket','ecommerce']
    if '축산캔' in str(c2): return ['hypermarket','SSM','ecommerce']
    if '커피' in str(c2):   return ['convenience','hypermarket']
    return ['hypermarket','ecommerce']

def _infer_competitors(c2):
    if '참치' in str(c2): return ['CJ','오뚜기','사조']
    if '조미'  in str(c2): return ['CJ','오뚜기']
    if '발효유' in str(c2): return ['빙그레','매일','남양']
    if '축산캔' in str(c2): return ['SPAM','롯데','동원']
    if '커피' in str(c2):   return ['매일','동서','스타벅스RTD']
    return ['CJ','오뚜기','사조']

def _infer_market_size(name:str,c2:str='')->int:
    base=6_000_000
    if '발효유' in c2 or '요거트' in name: base=3_000_000
    elif '참치' in c2: base=10_000_000
    elif '조미' in c2: base=5_000_000
    elif '축산캔' in c2: base=4_000_000
    elif '커피' in c2: base=12_000_000
    h=(abs(hash(name))%41)/100.0
    return int(base*(0.8+h))

# ---------- Calibrated params 로드 (있으면 사용) ----------
CALIB_PATH = "calibrated_theta.json"
def load_calibrated_theta()->Optional[dict]:
    """Load calibrated simulation parameters from CALIB_PATH.

    Returns the parsed JSON object, or None when the file does not exist,
    cannot be read or parsed, or does not contain a JSON object (callers use
    ``.get`` on the result). A missing file is the normal "not calibrated"
    case and is silent; any other failure is reported so that a corrupt
    calibration file does not go unnoticed.
    """
    try:
        with open(CALIB_PATH,"r",encoding="utf-8") as f:
            theta=json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:  # ValueError covers JSON and decoding errors
        print(f"[load_calibrated_theta] ignoring unreadable {CALIB_PATH}: {e}")
        return None
    if not isinstance(theta, dict):
        print(f"[load_calibrated_theta] ignoring {CALIB_PATH}: expected a JSON object, got {type(theta).__name__}")
        return None
    return theta

# ---------- Scenario runner ----------
def run_scenario(persona_count:int=400, use_llm:bool=False,
                 category:str='식품', concept:str='신제품',
                 price:int=3500, pack:str='unit',
                 channels:Optional[List[str]]=None,
                 competitors:Optional[List[str]]=None,
                 market_size:int=3_200_000,
                 calendar:Optional[MarketCalendar]=None,
                 ad_effect_mult:float=1.0,
                 feature_text:str=""
                 ):
    """Run one persona simulation and return ``(forecast, personas)``.

    Steps: save the single-turn prompt, obtain personas (from the LLM when
    ``use_llm`` is true, falling back to the rule-based generator on any
    error), build the simulation config (calibrated parameters from
    CALIB_PATH override the built-in defaults), simulate monthly demand and
    write the side files (monthly_forecast.csv, sources_used.json,
    solution_report.md).

    Args:
        channels: sales channels; defaults to hypermarket/convenience/ecommerce.
        competitors: competitor brands; defaults to CJ/오뚜기/사조.
        calendar: market calendar; ``default_calendar(price)`` when None.
        ad_effect_mult: multiplier applied to the ad effect per 100 GRPs.
    """
    # None sentinels instead of list literals in the signature: a default list
    # is created once and would be shared (and mutable) across calls.
    if channels is None: channels=['hypermarket','convenience','ecommerce']
    if competitors is None: competitors=['CJ','오뚜기','사조']

    save_single_turn_prompt(category, concept, price, pack, channels, competitors, market_size, CHANNELS_VOCAB)

    personas = None
    if use_llm:
        try:
            personas = LLMAdapter().generate(persona_count,
                        _build_single_turn_prompt(category,concept,price,pack,channels,competitors,market_size,CHANNELS_VOCAB))
        except Exception as e:
            print(f"[run_scenario] LLM 사용 실패: {e} → 규칙기반으로 진행")
    if personas is None:
        personas = generate_personas_rule_based(persona_count)

    cal = calendar if calendar is not None else default_calendar(price)

    # Built-in defaults, overridden key by key by calibrated parameters when available.
    defaults = dict(base_awareness=0.18, base_trial_rate=0.35, repeat_rate=0.55,
                    price_elasticity=-1.2, ad_effect_per_100grp=0.10,
                    promo_price_pass_through=0.8, product_multiplier=1.0)
    theta = load_calibrated_theta()
    if not isinstance(theta, dict):
        theta = {}
    params = {k: float(theta.get(k, default)) for k, default in defaults.items()}

    cfg = SimulationConfig(
        population=market_size,
        base_awareness=params['base_awareness'],
        base_trial_rate=params['base_trial_rate'],
        repeat_rate=params['repeat_rate'],
        price_elasticity=params['price_elasticity'],
        ad_effect_per_100grp=params['ad_effect_per_100grp']*ad_effect_mult,
        promo_price_pass_through=params['promo_price_pass_through'],
        noise_sd=0.06,
        product_multiplier=params['product_multiplier']
    )
    forecast=simulate_monthly_demand(personas, cfg, cal, channels, feature_text, export_csv=True)
    SOURCES.to_json('sources_used.json')
    _write_solution_outline(forecast, persona_count, category, concept, price, pack)
    return forecast, personas

def _write_solution_outline(forecast:pd.DataFrame, persona_count:int, category:str, concept:str, price:int, pack:str):
    last=forecast.tail(1).iloc[0]
    md=(f"# 솔루션 설명 자료(초안)\n"
        f"## 개요\n- 카테고리:{category}\n- 컨셉:{concept}\n- 가격:{price:,} / {pack}\n- 페르소나:{persona_count}\n"
        f"## 핵심결과\n- 12M 합계:{int(forecast['total_units'].sum()):,} EA\n"
        f"- 런칭월:{int(forecast.iloc[0]['total_units']):,} EA\n- 최종월:{int(last['total_units']):,} EA\n")
    with open('solution_report.md','w',encoding='utf-8') as f: f.write(md)

# ---------- One product → 12m (MC 앙상블 지원) ----------
def forecast_one_product(row:pd.Series, persona_count:int=400, use_llm:bool=False, mc_runs:int=7)->np.ndarray:
    """Forecast 12 monthly unit sales for one product row.

    Price, market size, channels, competitors and the market calendar are
    inferred from the row; the scenario is then simulated ``mc_runs`` times
    with different seeds and the per-month median is returned as an int array.

    Reproducibility: the per-product seed comes from ``zlib.crc32`` of the
    name (the builtin ``hash()`` of a str is salted per process), and the
    legacy ``np.random`` global state is seeded as well, because the
    simulation noise and the rule-based personas draw from ``np.random.*``.
    Personas generated by the LLM (``use_llm=True``) are not controlled by
    these seeds.

    Note: a later notebook cell re-defines this function; whichever
    definition ran last is the one in effect.
    """
    global RNG
    import zlib
    name=str(row.get('product_name','')).strip()
    feat=str(row.get('product_feature','')).strip()
    c1=str(row.get('category_level_1','')); c2=str(row.get('category_level_2','')); c3=str(row.get('category_level_3',''))

    # stable per-product seed
    seed=zlib.crc32(name.encode('utf-8'))%(2**32-1)
    # price / market / channels
    price=_estimate_list_price(row)
    market_size=_infer_market_size(name,c2); channels=_infer_channels(c1,c2,c3); competitors=_infer_competitors(c2)
    # calendar
    cal=default_calendar(price); enrich_calendar_from_features(cal, feat, name); apply_category_seasonality(cal, c2)
    # celebrity endorsement boosts the ad effect
    ad_mult=1.0
    if '광고모델' in feat and any(k in feat for k in ['안유진','아이돌','연예인']): ad_mult=1.15
    # pack text
    size,_unit=_extract_size(name); pack=f"{int(size)}{_unit}"

    # Monte Carlo ensemble: vary the seed per run, then take the monthly median
    runs=max(1,int(mc_runs))  # np.stack needs at least one run
    preds=[]
    for r in range(runs):
        run_seed=seed + r*1337
        RNG=np.random.default_rng(run_seed); random.seed(seed + r*7331)
        np.random.seed(run_seed % (2**32))  # legacy global state used by np.random.normal/beta
        df,_=run_scenario(persona_count, use_llm, c1 or '식품', feat[:80] or '신제품',
                          price, pack, channels, competitors, market_size, cal, ad_mult, feature_text=feat)
        preds.append(np.maximum(0, np.array(df['total_units'].values)))
    y = np.rint(np.median(np.stack(preds, axis=0), axis=0)).astype(int)
    return y

# ---------- Submission ----------
def make_submission(product_info_csv:str, out_csv:str='submission.csv', persona_count:int=400,
                    use_llm:bool=False, mc_runs:int=7)->pd.DataFrame:
    """Forecast every product in ``product_info_csv`` and write the submission.

    Each output row holds the product name followed by
    months_since_launch_1..12 as integers. The frame is saved to ``out_csv``
    (without index) and returned.
    """
    products=pd.read_csv(product_info_csv)
    records=[]
    for _,product in products.iterrows():
        monthly=forecast_one_product(product, persona_count, use_llm, mc_runs=mc_runs)
        record={'product_name':product['product_name']}
        for k in range(12):
            record[f'months_since_launch_{k+1}']=int(monthly[k])
        records.append(record)
    submission=pd.DataFrame(records)
    submission.to_csv(out_csv,index=False)
    return submission

# ---------- (선택) 간단 캘리브레이션 골격 ----------
# 유사제품의 과거 월별 판매(y_true: 12개)와 템플릿 캘린더/페르소나를 넣고
# theta를 탐색해 calibrated_theta.json 저장하는 자리. 실제 y_true가 있을 때만 사용.
def simple_calibrate(y_true: np.ndarray,
                     personas: List[Persona],
                     cal_template: MarketCalendar,
                     init: Optional[dict] = None,
                     bounds: Optional[dict] = None):
    """
    Fit the simulation parameters to monthly sales and save them to CALIB_PATH.

    y_true: observed (or proxy) monthly sales, one value per entry of MONTHS;
            any array-like is accepted.
    init:   initial values by parameter name; missing names use the defaults.
    bounds: (lo, hi) by parameter name; missing names use the defaults.

    The loss is a MAPE-like error (denominator y_true + 1) plus a weak L2
    penalty towards the initial values, minimised with Nelder-Mead; the
    parameters are clamped to their bounds inside the loss and on output.
    Returns the fitted parameters as a dict and writes them as JSON.

    Raises ValueError when ``y_true`` does not have one value per month.
    """
    from scipy.optimize import minimize

    default_init = dict(base_awareness=0.18, base_trial_rate=0.35, repeat_rate=0.55,
                        price_elasticity=-1.2, ad_effect_per_100grp=0.10,
                        promo_price_pass_through=0.8, product_multiplier=1.0)
    default_bounds = dict(
        base_awareness=(0.05,0.6), base_trial_rate=(0.05,0.8), repeat_rate=(0.2,0.8),
        price_elasticity=(-2.5,-0.2), ad_effect_per_100grp=(0.02,0.25),
        promo_price_pass_through=(0.5,0.95), product_multiplier=(0.6,1.6)
    )
    # Merge onto the defaults so that a partial init/bounds dict no longer raises KeyError.
    init = {**default_init, **(init or {})}
    bounds = {**default_bounds, **(bounds or {})}

    # A plain list would fail on `y_true + 1`; a wrong length could broadcast silently.
    y_true = np.asarray(y_true, dtype=float)
    if y_true.shape != (len(MONTHS),):
        raise ValueError(f"y_true must have shape ({len(MONTHS)},), got {y_true.shape}")

    keys = list(init.keys())
    x0 = np.array([init[k] for k in keys])
    lo = np.array([bounds[k][0] for k in keys])
    hi = np.array([bounds[k][1] for k in keys])

    channels = ['hypermarket','convenience','ecommerce']
    feat_text = ""

    def pack(theta):
        return {k: float(v) for k,v in zip(keys, theta)}

    def clamp(theta):
        return np.minimum(np.maximum(theta, lo), hi)

    def simulate_theta(theta):
        th = pack(theta)
        cfg = SimulationConfig(
            population=3_000_000,
            base_awareness=th['base_awareness'],
            base_trial_rate=th['base_trial_rate'],
            repeat_rate=th['repeat_rate'],
            price_elasticity=th['price_elasticity'],
            ad_effect_per_100grp=th['ad_effect_per_100grp'],
            promo_price_pass_through=th['promo_price_pass_through'],
            noise_sd=0.0,  # deterministic objective
            product_multiplier=th['product_multiplier']
        )
        df = simulate_monthly_demand(personas, cfg, cal_template, channels, feat_text, export_csv=False)
        return df['total_units'].values

    def loss(theta):
        theta = clamp(theta)
        y_pred = simulate_theta(theta)
        mape = np.mean(np.abs(y_true - y_pred) / (y_true + 1))
        reg = 1e-3*np.sum((theta - x0)**2)  # weak regularisation towards the initial values
        return float(mape + reg)

    res = minimize(loss, x0, method='Nelder-Mead', options={'maxiter':300})
    theta_opt = pack(clamp(res.x))
    with open(CALIB_PATH,'w',encoding='utf-8') as f: json.dump(theta_opt,f,ensure_ascii=False,indent=2)
    print("Saved calibrated params ->", CALIB_PATH)
    return theta_opt

# ---------- main ----------
if __name__ == '__main__':
    # During development keep it free of charge: use_llm=False, mc_runs=7~21 recommended.
    # NOTE(review): this call passes use_llm=True, and run_scenario requests personas
    # from the LLM on every Monte Carlo run of every product.
    try:
        sub=make_submission(product_info_csv='product_info.csv',
                            out_csv='submission.csv',
                            persona_count=400,
                            use_llm=True,
                            mc_runs=5)
        print(sub.head())
        print("Files: submission.csv, monthly_forecast.csv, persona_single_turn_prompt.txt/.pdf, sources_used.json, solution_report.md")
    except Exception as e:
        print("Run make_submission(...) with correct CSV. Error:",e)


[LLMAdapter] Gemini 파싱 실패: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerDayPerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 50
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 7
}
] → 규칙기반으로 폴백
[LLMAdapter] Gemini 파싱 실패: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"

In [1]:
# Step1. category_prices.csv 생성 (API 버전 예시)
import requests, pandas as pd, datetime

import os

# Naver Open API credentials are read from the environment so that secrets are
# never stored in the notebook. Set NAVER_CLIENT_ID / NAVER_CLIENT_SECRET before
# running; with empty values the API rejects the request and no prices are found.
# The key pair that used to be hard-coded here has been exposed and should be
# revoked in the Naver developer console.
CLIENT_ID = os.environ.get("NAVER_CLIENT_ID", "")
CLIENT_SECRET = os.environ.get("NAVER_CLIENT_SECRET", "")

def naver_price_search(query):
    """Return the lowest ``lprice`` among the top-3 Naver Shopping hits.

    Returns None when the request fails, the API answers with a non-200
    status, the body is not JSON, or no hit carries a usable price. Failures
    are printed so that a credential or quota problem is visible instead of
    silently producing an empty price table.
    """
    url = "https://openapi.naver.com/v1/search/shop.json"
    headers = {"X-Naver-Client-Id": CLIENT_ID, "X-Naver-Client-Secret": CLIENT_SECRET}
    params = {"query": query, "display": 3, "sort": "sim"}
    try:
        # requests has no default timeout; without one a stalled connection hangs the loop
        resp = requests.get(url, headers=headers, params=params, timeout=10)
    except requests.RequestException as e:
        print(f"[naver_price_search] request failed for {query!r}: {e}")
        return None
    if resp.status_code != 200:
        print(f"[naver_price_search] HTTP {resp.status_code} for {query!r}")
        return None
    try:
        items = resp.json().get("items", [])
    except (ValueError, AttributeError) as e:  # body is not JSON / not a JSON object
        print(f"[naver_price_search] unexpected response for {query!r}: {e}")
        return None
    prices = []
    for it in items or []:
        try:
            prices.append(int(it["lprice"]))
        except (KeyError, TypeError, ValueError):
            continue  # skip hits without a numeric price
    return min(prices) if prices else None

import re

# ---- Shared feature schema for the price model (training and prediction) ----
_PACK_SIZE_RE = re.compile(r'(\d+(?:\.\d+)?)\s*(kg|ml|g|l(?![a-z]))', re.IGNORECASE)
_PRICE_CATEGORICAL = ["category_level_1", "category_level_2", "category_level_3"]
_PRICE_KEYWORDS = {"is_premium": "프리미엄", "is_high_protein": "고단백", "is_lactofree": "락토프리"}
_PRICE_FEATURES = ["pack_size_value", "pack_size_unit"] + _PRICE_CATEGORICAL + list(_PRICE_KEYWORDS)


def _as_text(value):
    """Return ``value`` as str, mapping None/NaN to ''."""
    if value is None or (isinstance(value, float) and value != value):
        return ""
    return str(value)


def _parse_pack_size(name):
    """Return ``(value, unit)`` for the first '<number><unit>' in ``name``.

    ``unit`` is 'g' or 'ml'; kg and L are converted. ``(0.0, 'g')`` is
    returned when the name carries no size. (Joining every digit of the name,
    as before, turned '100g 3입' into 1003 and '1.5L' into 15.)
    """
    m = _PACK_SIZE_RE.search(_as_text(name))
    if not m:
        return 0.0, "g"
    value, unit = float(m.group(1)), m.group(2).lower()
    if unit in ("kg", "l"):
        value *= 1000.0
    return value, ("ml" if unit in ("ml", "l") else "g")


def _price_feature_frame(records):
    """Build the model input frame from dict-like records (dicts or Series).

    Used by both training and prediction so that column set, order and dtypes
    always agree. Category columns use the pandas 'category' dtype, which
    LightGBM handles natively; object (string) columns are rejected by it.
    Keyword flags are searched in product name + product feature text.
    """
    rows = []
    for rec in records:
        name = _as_text(rec.get("product_name", ""))
        text = f"{name} {_as_text(rec.get('product_feature', ''))}"
        size, unit = _parse_pack_size(name)
        row = {"pack_size_value": size, "pack_size_unit": 0 if unit == "g" else 1}
        for col in _PRICE_CATEGORICAL:
            row[col] = _as_text(rec.get(col, ""))
        for flag, keyword in _PRICE_KEYWORDS.items():
            row[flag] = int(keyword in text)
        rows.append(row)
    X = pd.DataFrame(rows, columns=_PRICE_FEATURES)
    for col in _PRICE_CATEGORICAL:
        X[col] = X[col].astype("category")
    return X


def build_category_prices(product_info_csv, out_csv="category_prices.csv"):
    """Look up a price for every product and save the table to ``out_csv``.

    One row per product of ``product_info_csv`` with its categories, the pack
    size parsed from the name, the lowest price returned by
    ``naver_price_search`` (None when nothing was found) and the product
    feature text. Returns the DataFrame.
    """
    prod = pd.read_csv(product_info_csv)
    rows = []
    for _, row in prod.iterrows():
        name = str(row["product_name"])
        size, unit = _parse_pack_size(name)
        rows.append({
            "product_name": name,
            "category_level_1": row["category_level_1"],
            "category_level_2": row["category_level_2"],
            "category_level_3": row["category_level_3"],
            "pack_size_value": size,
            "pack_size_unit": unit,
            "list_price": naver_price_search(name),
            "channel": "네이버쇼핑",
            "observed_at": datetime.date.today().isoformat(),
            # kept so that training sees the same keyword text as prediction
            "product_feature": _as_text(row.get("product_feature", "")),
        })
    df = pd.DataFrame(rows)
    df.to_csv(out_csv, index=False, encoding="utf-8-sig")
    print("Saved:", out_csv)
    return df

# Step 2. Train the LightGBM price model
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
import numpy as np

def train_price_model(csv_path="category_prices.csv"):
    """Train the LightGBM price regressor and save it to 'price_model.pkl'.

    Rows without ``list_price`` are dropped. Features come from
    ``_price_feature_frame`` (pack size, unit, category levels as categorical
    columns, keyword flags); the product name itself is not a feature.
    Prints train/test R2 and returns the fitted model.

    Raises ValueError when fewer than two priced rows are available.
    """
    df = pd.read_csv(csv_path).dropna(subset=["list_price"])
    if len(df) < 2:
        raise ValueError(f"{csv_path} has {len(df)} priced rows; at least 2 are needed to train")
    y = df["list_price"].values
    X = _price_feature_frame(df.to_dict("records"))

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    model = lgb.LGBMRegressor(n_estimators=500, learning_rate=0.05, max_depth=7)
    model.fit(X_train, y_train)
    print("Train R2:", model.score(X_train, y_train))
    print("Test R2:", model.score(X_test, y_test))

    import joblib
    joblib.dump(model, "price_model.pkl")
    print("Saved price_model.pkl")
    return model

# Step 3. Replaces the heuristic _estimate_list_price defined earlier
import joblib
def _estimate_list_price(row):
    """Predict the list price (int) for one product row with the saved model.

    Best-effort by design: when the model file is missing or prediction fails
    for any reason, the error is printed and 3000 is returned.
    """
    try:
        # joblib.load unpickles the file: only load a price_model.pkl you created yourself
        model = joblib.load("price_model.pkl")
        X = _price_feature_frame([row])
        return int(model.predict(X)[0])
    except Exception as e:
        print("Price model fallback:", e)
        return 3000


In [2]:
# ---------- One product → 12m (MC 앙상블 지원) ----------
def forecast_one_product(row: pd.Series, persona_count: int = 400, use_llm: bool = False, mc_runs: int = 9) -> np.ndarray:
    """Forecast 12 monthly unit sales for one product row.

    Price (from the price model), market size, channels, competitors and the
    market calendar are derived from the row; the scenario is simulated
    ``mc_runs`` times with different seeds and the per-month median is
    returned as an int array.

    Reproducibility: the per-product seed comes from ``zlib.crc32`` of the
    name (the builtin ``hash()`` of a str is salted per process), and the
    legacy ``np.random`` global state is seeded as well, because the
    simulation noise and the rule-based personas draw from ``np.random.*``.
    Personas generated by the LLM (``use_llm=True``) are not controlled by
    these seeds.
    """
    global RNG
    import zlib
    name = str(row.get('product_name', '')).strip()
    feat = str(row.get('product_feature', '')).strip()
    c1 = str(row.get('category_level_1', ''))
    c2 = str(row.get('category_level_2', ''))
    c3 = str(row.get('category_level_3', ''))

    # stable per-product seed
    seed = zlib.crc32(name.encode('utf-8')) % (2**32 - 1)

    # price (ML model based)
    price = _estimate_list_price(row)

    # market / channels / competitors
    market_size = _infer_market_size(name, c2)
    channels = _infer_channels(c1, c2, c3)
    competitors = _infer_competitors(c2)

    # calendar
    cal = default_calendar(price)
    enrich_calendar_from_features(cal, feat, name)
    apply_category_seasonality(cal, c2)

    # celebrity endorsement boosts the ad effect
    ad_mult = 1.0
    if '광고모델' in feat and any(k in feat for k in ['안유진', '아이돌', '연예인']):
        ad_mult = 1.15

    # pack text
    size, unit = _extract_size(name)
    pack = f"{int(size)}{unit}"

    # Monte Carlo ensemble -> monthly median
    runs = max(1, int(mc_runs))  # np.stack needs at least one run
    preds = []
    for r in range(runs):
        run_seed = seed + r * 1337
        RNG = np.random.default_rng(run_seed)
        random.seed(seed + r * 7331)
        np.random.seed(run_seed % (2**32))  # legacy global state used by np.random.normal/beta
        df, _ = run_scenario(
            persona_count, use_llm,
            c1 or '식품', feat[:80] or '신제품',
            price, pack, channels, competitors,
            market_size, cal, ad_mult, feature_text=feat
        )
        preds.append(np.maximum(0, np.array(df['total_units'].values)))
    y = np.rint(np.median(np.stack(preds, axis=0), axis=0)).astype(int)
    return y

# ---------- Submission ----------
def make_submission(product_info_csv: str, out_csv: str = 'submission.csv', persona_count: int = 400,
                    use_llm: bool = False, mc_runs: int = 9) -> pd.DataFrame:
    """Forecast every product in ``product_info_csv`` and write the submission.

    Each output row holds the product name followed by
    months_since_launch_1..12 as integers. The frame is saved to ``out_csv``
    (without index), the path is printed, and the frame is returned.
    """
    products = pd.read_csv(product_info_csv)
    records = []
    for _, product in products.iterrows():
        monthly = forecast_one_product(product, persona_count, use_llm, mc_runs=mc_runs)
        record = {'product_name': product['product_name']}
        for k in range(12):
            record[f'months_since_launch_{k+1}'] = int(monthly[k])
        records.append(record)
    submission = pd.DataFrame(records)
    submission.to_csv(out_csv, index=False)
    print("Saved:", out_csv)
    return submission


In [None]:
# Step 1. Collect price data -> category_prices.csv
build_category_prices(product_info_csv="product_info.csv", out_csv="category_prices.csv")

# Step 2. Train the LightGBM price model
train_price_model(csv_path="category_prices.csv")

# Step 3. Run the sales forecast
submission = make_submission(
    product_info_csv="product_info.csv",
    out_csv="submission.csv",
    persona_count=400,
    use_llm=False,
    mc_runs=9,
)

print(submission.head())
