In [7]:
# -*- coding: utf-8 -*-
"""
🎯 (2024+2025) 축제명 + 기준 월 → 같은 '분기'에 열리는 축제만 리스트업 (유사시기 단일 필터)
- 분기 규칙: 1→[1,2,3], 4/5/6→[4,5,6], 7/8/9→[7,8,9], 10/11/12→[10,11,12]
- 로드: ./csv/{2024,2025}_축제_핵심필드.csv (+ /mnt/data 경로도 시도)
- 아직 시작 안 한 축제(시작일 > 오늘) 제외
- 입력 축제명(정확·유사) 제외
- 출력: 기본 "축제명  (표시월)" / show_period=True면 "축제명  시작일~종료일  (표시월)"
"""

import os, re
from pathlib import Path
from typing import Optional, List, Dict, Set

import pandas as pd
from difflib import get_close_matches

# ---------------- Settings --------------------
CANDIDATE_FILES = [
    "./csv/2024_축제_핵심필드.csv",
    "./csv/2025_축제_핵심필드.csv",
    "/mnt/data/2024_축제_핵심필드.csv",
    "/mnt/data/2025_축제_핵심필드.csv",
]

# ---------------- Utils -----------------------
def normalize_text(s: str) -> str:
    if s is None: return ""
    s = str(s).strip().replace("\xa0", " ")
    return re.sub(r"\s+", "", s)

def parse_date_str(s: str) -> Optional[pd.Timestamp]:
    try:
        return pd.to_datetime(str(s), format="%Y-%m-%d", errors="coerce")
    except Exception:
        return None

def today_floor_ts() -> pd.Timestamp:
    return pd.Timestamp.today().normalize()

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """'축제유형' -> '축제 유형' 등 표준화 (본 스크립트는 유형 미사용이지만 컬럼 검증용)."""
    colmap = {}
    for c in df.columns:
        cc = str(c).strip()
        if cc == "축제유형":
            colmap[c] = "축제 유형"
    return df.rename(columns=colmap)

def ensure_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = standardize_columns(df)
    needed = ["연번", "광역자치단체명", "기초자치단체명", "축제명", "시작일", "종료일"]
    for c in needed:
        if c not in df.columns:
            raise ValueError(f"필수 컬럼 누락: {c}")
    return df

def read_many_csv(paths: List[str]) -> pd.DataFrame:
    frames = []
    for p in paths:
        if Path(p).exists():
            try:
                df = pd.read_csv(p, dtype=str).fillna("")
            except UnicodeDecodeError:
                df = pd.read_csv(p, dtype=str, encoding="utf-8-sig").fillna("")
            df = ensure_columns(df)
            df["_출처파일"] = Path(p).name
            frames.append(df)
    if not frames:
        raise FileNotFoundError("입력 CSV를 하나도 찾지 못했습니다.")
    out = pd.concat(frames, ignore_index=True)
    out = out.drop_duplicates(
        subset=["광역자치단체명","기초자치단체명","축제명","시작일","종료일"],
        keep="first"
    ).reset_index(drop=True)
    return out

def months_in_range(start: Optional[pd.Timestamp], end: Optional[pd.Timestamp]) -> Set[int]:
    """시작~종료 사이의 월(1..12) 집합. 연도 넘어가도 월만 모음."""
    if (start is None or pd.isna(start)) or (end is None or pd.isna(end)):
        return set()
    if end < start:
        start, end = end, start
    months = set()
    cur = pd.Timestamp(year=start.year, month=start.month, day=1)
    last = pd.Timestamp(year=end.year, month=end.month, day=1)
    while cur <= last:
        months.add(int(cur.month))
        if cur.month == 12:
            cur = pd.Timestamp(year=cur.year + 1, month=1, day=1)
        else:
            cur = pd.Timestamp(year=cur.year, month=cur.month + 1, day=1)
    return months

def quarter_months(pivot_month: int) -> List[int]:
    """1→[1,2,3], 4/5/6→[4,5,6], 7/8/9→[7,8,9], 10/11/12→[10,11,12]"""
    m = int(pivot_month)
    if m in (1,2,3):  return [1,2,3]
    if m in (4,5,6):  return [4,5,6]
    if m in (7,8,9):  return [7,8,9]
    return [10,11,12]

def circular_month_distance(a: int, b: int) -> int:
    a, b = int(a), int(b)
    d = abs(a - b)
    return min(d, 12 - d)

# ---------------- Core ------------------------
def load_data(csv_path_override: Optional[str] = None) -> pd.DataFrame:
    if csv_path_override and Path(csv_path_override).exists():
        try:
            df = pd.read_csv(csv_path_override, dtype=str).fillna("")
        except UnicodeDecodeError:
            df = pd.read_csv(csv_path_override, dtype=str, encoding="utf-8-sig").fillna("")
        df = ensure_columns(df)
        df["_출처파일"] = Path(csv_path_override).name
        return df
    return read_many_csv(CANDIDATE_FILES)

def filter_by_quarter_time_only(
    df: pd.DataFrame,
    query_name: str,
    pivot_month: int,
) -> pd.DataFrame:
    """유사 시기(분기)만으로 필터링."""
    qmonths = set(quarter_months(pivot_month)).copy()

    # 날짜 파싱 + 아직 시작 안 한 축제 제외
    today = today_floor_ts()
    df = df.copy()
    df["시작일_dt"] = df["시작일"].apply(parse_date_str)
    df["종료일_dt"] = df["종료일"].apply(parse_date_str)
    df = df[(~df["시작일_dt"].isna()) & (~df["종료일_dt"].isna()) & (df["시작일_dt"] <= today)]

    # 입력 축제명(정확·유사) 제외
    inorm = normalize_text(query_name)
    all_names = df["축제명"].astype(str).tolist()
    close = set(get_close_matches(query_name, all_names, n=3, cutoff=0.85))
    def _is_self(x: str) -> bool:
        nx = normalize_text(x)
        return (nx == inorm) or (x in close)
    df = df[~df["축제명"].astype(str).apply(_is_self)]

    # 분기와 겹치는 것만
    def _keep(row) -> bool:
        rng = months_in_range(row["시작일_dt"], row["종료일_dt"])
        return len(rng & qmonths) > 0
    df = df[df.apply(_keep, axis=1)].copy()

    # 표시월 계산: 분기 내 겹치는 월 중 pivot에 가장 가까운 월
    df["시작월"] = df["시작일_dt"].dt.month.astype(int)
    def _display_month(row) -> int:
        rng = months_in_range(row["시작일_dt"], row["종료일_dt"]) & qmonths
        if not rng:
            return int(row["시작월"])
        if pivot_month in rng:
            return int(pivot_month)
        return sorted(list(rng), key=lambda m: circular_month_distance(m, pivot_month))[0]
    df["표시월"] = df.apply(_display_month, axis=1).astype(int)

    # 정렬 & 정리
    df["_sd"] = df["시작일_dt"]
    df = df.sort_values(by=["표시월","_sd","축제명"]).drop(columns=["_sd"]).reset_index(drop=True)
    return df[["광역자치단체명","기초자치단체명","축제명","시작일","종료일","표시월","_출처파일"]]

# ---------------- Runner -----------------------
def run_timeonly_quarter(
    festival_name: str,
    pivot_month: int,
    csv_path_override: Optional[str] = None,
    print_limit: int = 300,
    dedupe: bool = True,
    show_period: bool = False,  # True면 "이름  시작~종료 (표시월)"
):
    df = load_data(csv_path_override)
    out = filter_by_quarter_time_only(df, festival_name, int(pivot_month))

    # 중복 이름 제거(원하면)
    if dedupe and not out.empty:
        out = out.drop_duplicates(subset=["축제명"], keep="first")

    # 출력
    print(f"입력 축제명: {festival_name} | 기준 월: {pivot_month} → 분기월={quarter_months(pivot_month)}")
    rows = []
    for _, r in out.iterrows():
        if show_period:
            rows.append(f"{r['축제명']}  {r['시작일']}~{r['종료일']}  ({int(r['표시월'])}월)")
        else:
            rows.append(f"{r['축제명']}  ({int(r['표시월'])}월)")

    print(f"\n같은 분기(유사시기) 축제 수: {len(rows)}")
    if rows:
        print("\n[목록]")
        for i, line in enumerate(rows[:print_limit], start=1):
            print(f"{i:>3}. {line}")
        if len(rows) > print_limit:
            print(f"... ({len(rows)-print_limit}개 더 있음)")
    return out

# ------------- Example -------------
if __name__ == "__main__":
    # 예: '담양산타축제', 12월 기준 → 10~12월 유사시기만
    run_timeonly_quarter("홍어축제", pivot_month=4, show_period=True)
    # run_timeonly_quarter("임실 산타축제", pivot_month=1, show_period=False)


입력 축제명: 홍어축제 | 기준 월: 4 → 분기월=[4, 5, 6]

같은 분기(유사시기) 축제 수: 468

[목록]
  1. 제62회 진해군항제
(군악의장페스티벌)  2024-03-23~2024-04-01  (4월)
  2. 2024 경포 벚꽃 축제  2024-03-29~2024-04-03  (4월)
  3. 2024년 섬수선화축제  2024-03-29~2024-04-07  (4월)
  4. 정읍 벚꽃축제  2024-03-29~2024-04-01  (4월)
  5. 청춘, 금오천 벚꽃 페스티벌  2024-03-30~2024-04-02  (4월)
  6. 강남동 새봄맞이 축제  2024-03-31~2024-04-02  (4월)
  7. 제2회 십승지명당운본고원벚꽃축제  2024-04-01~2024-04-01  (4월)
  8. 금성면 제1회 고비산 산벚꽃축제  2024-04-02~2024-04-02  (4월)
  9. 안양충훈벚꽃축제  2024-04-02~2024-04-02  (4월)
 10. 제28회 비슬산 참꽃 문화제  2024-04-02~2024-04-02  (4월)
 11. 제2회 남대천 벚꽃축제  2024-04-02~2024-04-02  (4월)
 12. 우이천 벚꽃 축제  2024-04-03~2024-04-07  (4월)
 13. 2024 보은 벚꽃길 축제  2024-04-05~2024-04-07  (4월)
 14. 2024 섬 튤립축제  2024-04-05~2024-04-14  (4월)
 15. 제8회여주흥천남한강벚꽃축제  2024-04-05~2024-04-07  (4월)
 16. 2024 영랑호 벚꽃축제  2024-04-06~2024-04-07  (4월)
 17. 2024년 인천대공원 벚꽃축제  2024-04-06~2024-04-07  (4월)
 18. 제20회삼척맹방유채꽃축제  2024-04-06~2024-04-14  (4월)
 19. 제19회 창녕낙동강유채축제  2024-04-11~2024-04-14  (4월)
 20. 419혁명국민문화제