In [2]:
import pandas as pd
import numpy as np
from scipy import stats
from sklearn.linear_model import LinearRegression
from datetime import datetime, timedelta
import warnings
from collections import Counter
import os
import requests
import json
import threading
warnings.filterwarnings('ignore')


class HolidayKeywordMiner:
    """Mine keywords whose search rank follows a holiday-shaped curve.

    The input CSV has one ``keyword`` column plus one column per date
    (header formatted ``YYYYMMDD``) holding the keyword's search rank on
    that date; ``-1`` marks a missing rank. Lower rank values are treated
    as higher search volume throughout this class.

    The pattern being looked for: the rank value falls (improves) over the
    pre-holiday window, rises (worsens) over the "post" window, and the best
    rank of the season falls inside the peak window. All window sizes live
    in ``self.config`` (values are day offsets relative to the holiday).
    """

    # Rank substituted for the -1 "missing" marker so that missing points
    # sort as the worst possible rank instead of the best.
    UNRANKED = 100000

    # Column order of the DataFrame returned by mine_holiday_keywords; fixed
    # here so that an empty result still exposes the same columns.
    _RESULT_COLUMNS = (
        'keyword',
        'last_year_launch_date',
        'last_year_true_peak_date',
        'last_year_decline_date',
        'last_year_min_rank',
        'last_year_pre_slope',
        'last_year_post_slope',
        'best_aba_rank',
        'avg_pre_slope',
        'avg_post_slope',
        'slope_increase',
        'rank_increase',
        'rank_increase_ratio',
        'year_results',
    )

    def __init__(self, data_path):
        """Load the rank table and prepare the rank matrix.

        Args:
            data_path: Path (or file-like object) accepted by ``pd.read_csv``.
        """
        self.data = pd.read_csv(data_path)
        # Drop keywords that have no rank on any date (every date column is -1).
        self.data = self.data[~(self.data.drop(columns=['keyword']).eq(-1).all(axis=1))]
        self.keywords = self.data['keyword']
        # Every non-keyword column header is a date.
        date_columns = [col for col in self.data.columns if col != 'keyword']
        self.dates = pd.to_datetime(date_columns, format='%Y%m%d')
        # Work on an explicit copy: the array behind a DataFrame may be a
        # read-only view (pandas Copy-on-Write), and self.data should keep
        # its original -1 markers.
        ts_data = self.data[date_columns].to_numpy(copy=True)
        ts_data[ts_data == -1] = self.UNRANKED
        self.ts_data = ts_data
        self.config = {
            'pre_holiday_start': 60,   # pre-trend window starts 60 days before the holiday
            'pre_holiday_end': 3,      # pre-trend window ends 3 days before the holiday

            # NOTE(review): this offset is SUBTRACTED in define_holiday_periods,
            # so the "post" window starts 7 days before the holiday - confirm
            # that this is intended.
            'post_holiday_start': 7,
            'post_holiday_end': 14,    # "post" window ends 14 days after the holiday

            'peak_window_start': 45,   # peak window starts 45 days before the holiday
            'peak_window_end': 7,      # peak window ends 7 days after the holiday

            'pre_holiday_earlystart': 120,  # launch/peak/decline scan starts 120 days before
            'post_holiday_lateend': 30,     # launch/peak/decline scan ends 30 days after
        }

    def define_holiday_periods(self, holiday_date, years=(2023, 2024, 2025)):
        """Build the analysis windows around the holiday for each year.

        Args:
            holiday_date: Month-day string such as ``'02-14'``.
            years: Years to build windows for.

        Returns:
            dict mapping year -> dict with ``holiday_date`` (Timestamp) and
            the ``(start, end)`` Timestamp pairs ``pre_period``,
            ``post_period``, ``peak_window`` and ``early_start_lateend``.
        """
        cfg = self.config
        holiday_periods = {}
        for year in years:
            holiday_dt = pd.to_datetime(f"{year}-{holiday_date}")

            pre_holiday_start = holiday_dt - timedelta(days=cfg['pre_holiday_start'])
            pre_holiday_end = holiday_dt - timedelta(days=cfg['pre_holiday_end'])

            # The "post" window deliberately keeps the original arithmetic:
            # it begins BEFORE the holiday (see the note in self.config).
            post_holiday_start = holiday_dt - timedelta(days=cfg['post_holiday_start'])
            post_holiday_end = holiday_dt + timedelta(days=cfg['post_holiday_end'])

            peak_window_start = holiday_dt - timedelta(days=cfg['peak_window_start'])
            peak_window_end = holiday_dt + timedelta(days=cfg['peak_window_end'])

            # Wider range scanned for the launch / peak / decline dates.
            pre_holiday_earlystart = holiday_dt - timedelta(days=cfg['pre_holiday_earlystart'])
            post_holiday_lateend = holiday_dt + timedelta(days=cfg['post_holiday_lateend'])

            holiday_periods[year] = {
                'holiday_date': holiday_dt,
                'pre_period': (pre_holiday_start, pre_holiday_end),
                'post_period': (post_holiday_start, post_holiday_end),
                'peak_window': (peak_window_start, peak_window_end),
                'early_start_lateend': (pre_holiday_earlystart, post_holiday_lateend)
            }
        return holiday_periods

    def detect_peak_periods(self, time_series, dates_series, peak_window_indices):
        """Check whether the best rank of the series lies in the peak window.

        Args:
            time_series: 1-D array of ranks.
            dates_series: Dates aligned with ``time_series``.
            peak_window_indices: ``(first, last)`` positions (inclusive) of
                the peak window inside ``time_series``.

        Returns:
            ``(in_window, peak_date)``. An empty series, or a series that is
            unranked on every date, yields ``(False, None)``.
        """
        if len(time_series) == 0:
            return False, None

        # A keyword that never ranked has no peak at all.
        if np.all(time_series == self.UNRANKED):
            return False, None

        # Smallest rank value == highest search volume (first occurrence wins).
        min_rank_idx = np.argmin(time_series)

        in_window = peak_window_indices[0] <= min_rank_idx <= peak_window_indices[1]
        return in_window, dates_series[min_rank_idx]

    def analyze_trend_pattern(self, time_series, pre_indices, post_indices):
        """Test for "rank value falls before the holiday, rises afterwards".

        Args:
            time_series: 1-D array of ranks.
            pre_indices: ``(first, last)`` inclusive positions of the pre window.
            post_indices: ``(first, last)`` inclusive positions of the post window.

        Returns:
            ``(pattern_match, pre_slope, post_slope)``; ``(False, 0, 0)`` when
            the series has fewer than 8 points.
        """
        if len(time_series) < 8:  # too few points for a meaningful trend
            return False, 0, 0

        pre_data = time_series[pre_indices[0]:pre_indices[1] + 1]
        pre_x = np.arange(len(pre_data)).reshape(-1, 1)

        post_data = time_series[post_indices[0]:post_indices[1] + 1]
        post_x = np.arange(len(post_data)).reshape(-1, 1)

        pre_slope = self.calculate_slope(pre_x, pre_data)
        post_slope = self.calculate_slope(post_x, post_data)

        # Rank value must not rise before the holiday, must not fall after it,
        # and the post-holiday change must be at least as steep as the
        # pre-holiday one.
        pattern_match = pre_slope <= 0 and post_slope >= 0 and (abs(pre_slope) <= abs(post_slope))

        return pattern_match, pre_slope, post_slope

    def calculate_slope(self, x, y):
        """Return the linear-regression slope of ``y`` over ``x``.

        Returns 0 when fewer than two points are given or when the fit fails.
        """
        if len(y) < 2:
            return 0

        try:
            model = LinearRegression()
            model.fit(x, y)
            return model.coef_[0]
        except Exception:
            # Best-effort by design: an unusable window counts as "flat".
            # Only Exception is caught so that an interrupt raised during
            # the long mining loop is not swallowed here.
            return 0

    def start_peak_end_dates(self, time_series, dates_series, earlystart_lateend_indices):
        """Find the launch, peak and decline dates inside the scan range.

        * launch: first date with a rank better than ``UNRANKED``;
        * peak: date of the best rank (None when never ranked);
        * decline: first date, searching only the last quarter of the scan
          range, where two consecutive points are both unranked.

        Args:
            time_series: 1-D array of ranks.
            dates_series: Dates aligned with ``time_series``.
            earlystart_lateend_indices: ``(first, last)`` inclusive positions
                of the scan range.

        Returns:
            ``(launch_date, peak_date, decline_date)``; each may be None.
        """
        start_idx, end_idx = earlystart_lateend_indices
        if start_idx is None or end_idx is None:
            return None, None, None

        ts = time_series[start_idx:end_idx + 1]
        ds = dates_series[start_idx:end_idx + 1]
        unranked = self.UNRANKED

        # 1) Launch: first ranked point.
        launch_date = None
        for i in range(len(ts)):
            if ts[i] < unranked:
                launch_date = ds[i]
                break

        # 2) Peak: best (smallest) rank, provided the keyword ranked at all.
        peak_date = ds[np.argmin(ts)] if len(ts) and np.min(ts) < unranked else None

        # 3) Decline: the search starts three quarters of the way through
        #    the scan range and looks for two consecutive unranked points.
        decline_date = None
        post_start = (len(ts) // 4) * 3
        for i in range(post_start, len(ts) - 1):
            if ts[i] >= unranked and ts[i + 1] >= unranked:
                decline_date = ds[i]
                break

        return launch_date, peak_date, decline_date

    def validate_multi_year_pattern(self, holiday_date, keyword_idx, holiday_periods):
        """Evaluate one keyword against the holiday pattern for every year.

        Args:
            holiday_date: Month-day string such as ``'02-14'``.
            keyword_idx: Row position of the keyword in ``self.ts_data``.
            holiday_periods: Output of ``define_holiday_periods``.

        Returns:
            dict mapping year -> per-year metrics. Years with no data in the
            analysis window, or for which any window cannot be located in
            the data, are omitted.
        """
        year_results = {}

        for year, periods in holiday_periods.items():
            # Restrict the series to 180 days before .. 30 days after the holiday.
            holiday_dt = pd.to_datetime(f"{year}-{holiday_date}")
            window_start = holiday_dt - timedelta(days=180)
            window_end = holiday_dt + timedelta(days=30)
            year_mask = (self.dates >= window_start) & (self.dates <= window_end)
            year_data = self.ts_data[keyword_idx][year_mask]
            year_dates = self.dates[year_mask]

            if len(year_data) == 0:
                continue

            pre_start_idx, pre_end_idx = self.get_period_indices(year_dates, periods['pre_period'])
            post_start_idx, post_end_idx = self.get_period_indices(year_dates, periods['post_period'])
            peak_start_idx, peak_end_idx = self.get_period_indices(year_dates, periods['peak_window'])
            pre_holiday_earlystart, post_holiday_lateend = self.get_period_indices(
                year_dates, periods['early_start_lateend']
            )

            # Every window is used below, so every window must be locatable.
            located = (
                pre_start_idx, pre_end_idx,
                post_start_idx, post_end_idx,
                peak_start_idx, peak_end_idx,
                pre_holiday_earlystart, post_holiday_lateend,
            )
            if any(position is None for position in located):
                continue

            peak_in_window, peak_date = self.detect_peak_periods(
                year_data, year_dates, (peak_start_idx, peak_end_idx)
            )

            pattern_match, pre_slope, post_slope = self.analyze_trend_pattern(
                year_data, (pre_start_idx, pre_end_idx), (post_start_idx, post_end_idx)
            )

            launch_date, true_peak_date, decline_date = self.start_peak_end_dates(
                year_data, year_dates, (pre_holiday_earlystart, post_holiday_lateend)
            )

            year_results[year] = {
                'launch_date': launch_date,
                'true_peak_date': true_peak_date,
                'decline_date': decline_date,
                'min_rank': np.min(year_data) if len(year_data) > 0 else self.UNRANKED,
                'pre_slope': pre_slope,
                'post_slope': post_slope,
                'peak_date': peak_date,
                'peak_in_window': peak_in_window,
                'pattern_match': pattern_match
            }

        return year_results

    def get_period_indices(self, year_dates, period):
        """Locate a ``(start, end)`` date pair inside ``year_dates``.

        Returns:
            ``(first, last)``: position of the first date >= start and of the
            last date <= end, or ``(None, None)`` when either does not exist.
        """
        start_idx = np.where(year_dates >= period[0])[0]
        end_idx = np.where(year_dates <= period[1])[0]

        if len(start_idx) == 0 or len(end_idx) == 0:
            return None, None

        return start_idx[0], end_idx[-1]

    def mine_holiday_keywords(self, holiday_date, years=(2023, 2024, 2025),
                              min_consistency=0.8, white_word=()):
        """Mine the keywords that follow the holiday pattern.

        A keyword is kept when it has results for at least
        ``len(years) * min_consistency`` years and either (a) both the share
        of years with the peak inside the peak window and the share of years
        matching the trend pattern reach ``min_consistency``, or (b) the
        keyword contains one of ``white_word``.

        Args:
            holiday_date: Month-day string such as ``'02-14'``.
            years: Years to analyse; the last entry is the "latest" year and
                the one before it the "previous" year.
            min_consistency: Required share (0..1), see above.
            white_word: Substrings that force a keyword to be kept.

        Returns:
            DataFrame sorted by ``avg_post_slope`` descending. It always has
            the same columns, even when no keyword qualifies. The
            year-over-year fields (``rank_increase``,
            ``rank_increase_ratio``, ``slope_increase``) are None when the
            latest or the previous year has no result.
        """
        years = list(years)
        holiday_periods = self.define_holiday_periods(holiday_date, years)
        latest_year = years[-1] if years else None
        previous_year = years[-2] if len(years) >= 2 else None
        required_years = len(years) * min_consistency
        total = len(self.keywords)

        results = []
        for idx, keyword in enumerate(self.keywords):
            if idx % 1000 == 0:
                print(f"处理进度: {idx}/{total}")

            year_results = self.validate_multi_year_pattern(holiday_date, idx, holiday_periods)
            if not year_results or len(year_results) < required_years:
                continue

            per_year = list(year_results.values())
            peak_consistency = sum(r['peak_in_window'] for r in per_year) / len(per_year)
            pattern_consistency = sum(r['pattern_match'] for r in per_year) / len(per_year)

            # Keywords read from CSV may be non-strings (e.g. NaN); those
            # can never match a whitelist substring.
            hit_white_word = isinstance(keyword, str) and any(
                word in keyword for word in white_word
            )

            consistent = (peak_consistency >= min_consistency
                          and pattern_consistency >= min_consistency)
            if not (consistent or hit_white_word):
                continue

            avg_pre_slope = np.mean([r['pre_slope'] for r in per_year])
            avg_post_slope = np.mean([r['post_slope'] for r in per_year])
            # Best rank == smallest rank value over all analysed years.
            best_rank = min(r['min_rank'] for r in per_year)

            last_year_result = year_results.get(latest_year)
            previous_year_result = year_results.get(previous_year)

            # Year-over-year comparison needs both years to be present.
            rank_increase = rank_increase_ratio = slope_increase = None
            if last_year_result is not None and previous_year_result is not None:
                previous_min_rank = previous_year_result['min_rank']
                # Positive when the best rank value got smaller (improved).
                rank_increase = previous_min_rank - last_year_result['min_rank']
                if previous_min_rank:
                    rank_increase_ratio = rank_increase / previous_min_rank
                # Growth of the pre-holiday slope magnitude versus the previous year.
                slope_increase = (abs(last_year_result['pre_slope'])
                                  - abs(previous_year_result['pre_slope']))

            results.append({
                'keyword': keyword,
                # Latest-year launch / peak / decline dates.
                'last_year_launch_date': last_year_result['launch_date'] if last_year_result else None,
                'last_year_true_peak_date': last_year_result['true_peak_date'] if last_year_result else None,
                'last_year_decline_date': last_year_result['decline_date'] if last_year_result else None,
                # Latest-year best rank and slopes.
                'last_year_min_rank': last_year_result['min_rank'] if last_year_result else None,
                'last_year_pre_slope': last_year_result['pre_slope'] if last_year_result else None,
                'last_year_post_slope': last_year_result['post_slope'] if last_year_result else None,
                # Aggregates over all analysed years.
                'best_aba_rank': best_rank,
                'avg_pre_slope': avg_pre_slope,
                'avg_post_slope': avg_post_slope,
                # Latest year versus previous year.
                'slope_increase': slope_increase,
                'rank_increase': rank_increase,
                'rank_increase_ratio': rank_increase_ratio,
                # Raw per-year metrics.
                'year_results': year_results
            })

        results.sort(key=lambda row: row['avg_post_slope'], reverse=True)

        return pd.DataFrame(results, columns=list(self._RESULT_COLUMNS))



In [3]:
from typing import List, Dict
from pydantic import BaseModel

# 假设 volcenginesdkarkruntime 已安装，Ark 可用
try:
    from volcenginesdkarkruntime import Ark
except ImportError:
    Ark = None

class KeyWordFilterAnalysis(BaseModel):
    # Schema passed as `response_format` in call_ark; the parsed LLM reply is
    # returned as an instance of this model.
    # Per the original author this definition is only an example and should
    # follow the structure the model actually returns.
    # Keywords the LLM reports as unrelated to the holiday (i.e. to be removed).
    non_holiday_keywords: List[str]

def call_ark(user_msg: List[Dict], prompt: str, api_key: str, model: str) -> KeyWordFilterAnalysis:
    """Send one system + user exchange to Ark and return the parsed reply.

    Args:
        user_msg: Content parts for the user turn.
        prompt: Text of the system turn.
        api_key: Credential handed to the Ark client.
        model: Model / endpoint identifier.

    Returns:
        The ``parsed`` attribute of the first choice's message.

    Raises:
        RuntimeError: If ``volcenginesdkarkruntime`` could not be imported.
    """
    if Ark is None:
        raise RuntimeError("未安装 volcenginesdkarkruntime，请先在 requirements.txt 中添加并安装")

    system_turn = {"role": "system", "content": prompt}
    user_turn = {"role": "user", "content": user_msg}

    # A fresh client is created for every call.
    ark_client = Ark(api_key=api_key)
    response = ark_client.beta.chat.completions.parse(
        model=model,
        messages=[system_turn, user_turn],
        response_format=KeyWordFilterAnalysis,
        # presumably switches off the model's "thinking" mode - confirm against the Ark docs
        extra_body={"thinking": {"type": "disabled"}},
    )
    first_choice = response.choices[0]
    return first_choice.message.parsed

class HolidayKeywordFilter:
    """Second-stage filter: ask an LLM which mined keywords are off-topic."""

    # SECURITY(review): the default api_key below is a credential committed
    # to source. It is kept only because existing callers construct this
    # class without arguments; rotate the key and pass it in explicitly
    # (e.g. from an environment variable) instead of relying on the default.
    def __init__(self, api_key: str = '29549de0-26ea-4e17-b73f-09ecdf08b678', model: str = "ep-20250618020820-t2x6m"):
        """Store the credential and model identifier used for every LLM call."""
        self.api_key = api_key
        self.model = model

    @staticmethod
    def _drop_rejected(batch: list, rejected: list) -> list:
        """Return the entries of ``batch`` that are not named in ``rejected``.

        Comparison ignores surrounding whitespace and letter case, because
        the LLM does not always echo a keyword back verbatim.
        """
        rejected_keys = {str(word).strip().casefold() for word in rejected}
        return [kw for kw in batch if str(kw).strip().casefold() not in rejected_keys]

    def filter_keywords_by_llm(self, keyword_list: list, holiday_date: str, batch_size: int = 50) -> list:
        """Remove keywords the LLM judges unrelated to the holiday.

        The keywords were mined from a "search rises before the holiday and
        falls after it" trend, so unrelated terms can slip in. They are sent
        to the LLM in batches; the LLM returns the unrelated ones, which are
        dropped from the batch. If a batch fails for any reason, the whole
        batch is kept.

        Args:
            keyword_list: Candidate keywords.
            holiday_date: Holiday label inserted into the prompt (callers may
                pass a holiday name rather than a date).
            batch_size: Number of keywords per LLM call.

        Returns:
            list: The keywords that were not rejected, in input order.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        filtered_keywords = []
        total_batches = (len(keyword_list) + batch_size - 1) // batch_size

        for i in range(0, len(keyword_list), batch_size):
            batch = keyword_list[i:i + batch_size]
            current_batch = i // batch_size + 1
            print(f"[进度] 正在处理第 {current_batch}/{total_batches} 批关键词（共 {len(batch)} 个）...")

            user_msg = {
                "type": "text",
                "text": (
                    f"节日：{holiday_date}\n"
                    "给定一组关键词，是我根据在亚马逊的ABA搜索词排名数据。符合节日前搜索增加，节日后搜索降低的趋势，挖掘的可能属于和该节日相关的关键词。\n"
                    "但是里面可能有不相关的词。现在需要你根据LLM去掉和该节日不相关的关键词。\n"
                    "请返回一个列表，仅包含明显与节日不相关的关键词（即需要剔除的词）。\n"
                    "如果全部相关，返回空列表[]。\n"
                    # str() so that a non-string keyword (e.g. NaN read from
                    # CSV) cannot abort the whole run while building the prompt.
                    "关键词列表：\n" + "\n".join(str(kw) for kw in batch)
                )
            }
            try:
                result = call_ark(
                    user_msg=[user_msg],
                    prompt='你是一个专业的关键词筛选器，负责根据给定的节日日期和关键词列表，判断哪些关键词与该节日相关，哪些不相关。',
                    api_key=self.api_key,
                    model=self.model
                )
                filtered_batch = self._drop_rejected(batch, result.non_holiday_keywords)
                filtered_keywords.extend(filtered_batch)
                # Report what was actually removed; the LLM may name words
                # that are not part of this batch.
                removed = len(batch) - len(filtered_batch)
                print(f"[进度] 第 {current_batch} 批完成，剔除 {removed} 个关键词，保留 {len(filtered_batch)} 个。")
            except Exception as e:
                # Deliberate best-effort: a failed batch is kept in full
                # rather than losing candidates.
                filtered_keywords.extend(batch)
                print(f"[进度] 第 {current_batch} 批解析异常（{e}），默认保留全部 {len(batch)} 个关键词。")

        print(f"[完成] 全部批次处理完毕，最终保留 {len(filtered_keywords)} 个关键词。")
        return filtered_keywords


In [None]:
# CSV with one `keyword` column plus one rank column per date (read by HolidayKeywordMiner).
aba_file_path = 'data/df_10w_aba.csv' 

# Holidays to process. Each entry has:
#   'date'       - month-day string passed as `holiday_date` to the miner
#   'name'       - label used in output file names and in the LLM prompt
#   'white_word' - substrings that force a keyword to be kept by the miner
# Entries that are commented out are templates that can be re-enabled.
holiday = [
    # {
    #     'date': '01-01',
    #     'name': 'new_year',
    #     'white_word': ["new year", "happy new year", "resolution", "new year's eve", "countdown"]
    # },
    # {
    #     'date': '02-14',
    #     'name': 'valentine',
    #     'white_word': ["valentine", "valentine's", "love"]
    # },
    # {
    #     'date': '03-17',
    #     'name': 'st_patricks_day',
    #     'white_word': ["st patrick", "st. patrick", "irish"]
    # },
    # {
    #     'name': 'easter',
    #     'date': '04-20', # example value only; the real date has to be computed
    #     'white_word': ["easter", "bunny", "egg hunt", "easter basket", "spring"]
    # },
    # {
    #     'name': 'mothers_day',
    #     'date': '05-11', 
    #     'white_word': ["mother's day", "mom", "gifts for mom", "best mom", "mama"]
    # },
    # {
    #     'name': 'memorial_day',
    #     'date': '05-25', # example value only; the real date has to be computed
    #     'white_word': ["memorial day", "bbq", "outdoor", "summer start", "flag", "party"]
    # },
    # {
    #     'date': '06-01',
    #     'name': 'childrens_day',
    #     'white_word': ["children's day", "kids", "toys"]
    # },
    # {
    #     'date': '06-21',  # original note: "uses a special identifier"
    #     'name': 'fathers_day',
    #     'white_word': ["father's day", "dad", "father"]
    # },
    # {
    #     'date': '07-04',
    #     'name': 'independence_day',
    #     'white_word': ["independence day", "4th of july", "fireworks", 'patriotic', 'independence']
    # },
    {
        'date': '10-31',
        'name': 'halloween',
        'white_word': ["halloween", "costume", "candy"]
    },
    {
        # Original note: "uses a special identifier".
        # NOTE(review): this is a fixed month-day applied to every analysed
        # year, while US Thanksgiving falls on a different date each year -
        # confirm that the offset is acceptable.
        'date': '11-27',
        'name': 'thanksgiving',
        'white_word': ["thanksgiving", "turkey", "dinner"]
    },
    {
        'date': '12-25',
        'name': 'christmas',
        'white_word': ["christmas", "xmas", "gift"]
    }
]
# Driver: for every configured holiday, mine candidate keywords, let the LLM
# drop the unrelated ones, then write the per-keyword metrics and the raw
# rank rows of the surviving keywords to result/.
miner = HolidayKeywordMiner(aba_file_path)

# Loop-invariant work is done once: the raw rank table (unmodified, -1
# markers intact) and the LLM filter do not depend on the holiday.
dates_data = pd.read_csv(aba_file_path)
hkf = HolidayKeywordFilter()

# to_csv does not create missing directories.
os.makedirs('result', exist_ok=True)

for h in holiday:
    holiday_keyword_step1_df = miner.mine_holiday_keywords(
        holiday_date=h['date'],  # month-day of this holiday
        years=[2023,2024],
        min_consistency=0.8,  # pattern must hold in at least 80% of the years
        white_word=h['white_word']
    )
    # Nothing mined: there is nothing to filter or export for this holiday.
    if holiday_keyword_step1_df.empty:
        print(f"[跳过] {h['name']}: 未挖掘到关键词")
        continue

    valid_keywords = list(holiday_keyword_step1_df.keyword.values)
    filter_result = hkf.filter_keywords_by_llm(keyword_list=valid_keywords, holiday_date=h['name'])
    # Keep only the rows whose keyword survived the LLM filter.
    holiday_keyword_df = holiday_keyword_step1_df[holiday_keyword_step1_df['keyword'].isin(filter_result)]
    holiday_keyword_df.to_csv(f'result/{h["name"]}_keyword_info.csv', index=False)

    # Export the raw rank rows of the surviving keywords; the keywords are
    # taken from memory instead of re-reading the file that was just written.
    holiday_keywords = holiday_keyword_df['keyword'].values
    dates_data[dates_data.keyword.isin(holiday_keywords)].to_csv(f'result/{h["name"]}_keyword_trends.csv',index=False)


处理进度: 0/132913
处理进度: 1000/132913
处理进度: 2000/132913
处理进度: 3000/132913
处理进度: 4000/132913
处理进度: 5000/132913
处理进度: 6000/132913
处理进度: 7000/132913
处理进度: 8000/132913
处理进度: 9000/132913
处理进度: 10000/132913
处理进度: 11000/132913
处理进度: 12000/132913
处理进度: 13000/132913
处理进度: 14000/132913
处理进度: 15000/132913
处理进度: 16000/132913
处理进度: 17000/132913
处理进度: 18000/132913
处理进度: 19000/132913
处理进度: 20000/132913
处理进度: 21000/132913
处理进度: 22000/132913
处理进度: 23000/132913
处理进度: 24000/132913
处理进度: 25000/132913
处理进度: 26000/132913
处理进度: 27000/132913
处理进度: 28000/132913
处理进度: 29000/132913
处理进度: 30000/132913
处理进度: 31000/132913
处理进度: 32000/132913
处理进度: 33000/132913
处理进度: 34000/132913
处理进度: 35000/132913
处理进度: 36000/132913
处理进度: 37000/132913
处理进度: 38000/132913
处理进度: 39000/132913
处理进度: 40000/132913
处理进度: 41000/132913
处理进度: 42000/132913
处理进度: 43000/132913
处理进度: 44000/132913
处理进度: 45000/132913
处理进度: 46000/132913
处理进度: 47000/132913
处理进度: 48000/132913
处理进度: 49000/132913
处理进度: 50000/132913
处理进度: 51000/132913
处理进度: 52000/132913
处理进度: 

In [9]:
# Display the step-1 (pre-LLM-filter) mining result; after the loop above this
# name holds the frame of the last holiday that was processed.
holiday_keyword_step1_df