# Cathay ADAI copy generator 202501

### 環境部署說明

#### 環境設置說明

建立 Cloud Storage Bucket
1. 在Cloud Storage 建立新的 bucket
2. 上傳必要檔案:
  - `my_tag_description_customer-segment.csv`
  - `my_tag_description_behavior.csv`
  - `genai_copies_request_form.csv`

2. 建立 BigQuery Dataset
1. 在 Cloud Console > BigQuery 建立新的 dataset
  - Dataset ID: `cathay_adai`  

----

### 基礎設定和初始化

初始設定說明

#### 1. 需更改部分
- PROJECT_ID
- LOCATION（如果有另外設定的話) 
- bucket_name
- CSV檔案名稱:
 - `Cathay-customer segment.csv` -> `my_tag_description_customer-segment.csv`
 - `behavior_tags.csv` -> `my_tag_description_behavior.csv` 
 - `ad_combinations.csv` -> `genai_copies_request_form.csv`

#### 2. 功能說明
1. 初始化三個服務:
  - Vertex AI (Gemini模型)
  - Cloud Storage
  - BigQuery
2. 讀取三個主要資料檔:
  - 客群標籤
  - 行為標籤
  - 行銷活動表單
3. 設定金融產品關鍵字黑名單 CopyRetryClaude does not have the ability to run the code it generates yet.Claude can make mistakes. Please double-check responses.

In [1]:
import os
import json
import time
from datetime import datetime
from vertexai import init
from vertexai.preview.generative_models import GenerativeModel, Image
from google.cloud import storage
from google.cloud import bigquery
import pandas as pd

# 初始化 Vertex AI
PROJECT_ID = "poc-55-genai"
LOCATION = "us-central1"
init(project=PROJECT_ID, location=LOCATION)

model = GenerativeModel("gemini-1.5-pro-002")

# 初始化 GCS client
client = storage.Client()

# 設定 bucket 和檔案資訊
bucket_name = "cathay-adai-55"

# 讀取客群資料
client_df = pd.read_csv(f"gs://{bucket_name}/my_tag_description_customer-segment.csv", encoding='utf-8')

# 讀取行為標籤資料
behavior_df = pd.read_csv(f"gs://{bucket_name}/my_tag_description_behavior.csv", encoding='utf-8')

# 讀取行銷活動資料
marketing_df = pd.read_csv(f'gs://{bucket_name}/genai_copies_request_form.csv')

# 共用黑名單關鍵字（適用於所有產品）
common_blacklist = [
    "財富自由", "增值", "增長", "升級", "穩穩賺", "錢滾錢",
    "績優", "排行榜", "推薦", "每月領息", "固定收益", 
    "被動收入", "現金流", "加薪", "跟著做", "跟著投資", 
    "月領雙薪", "幫你鋪路", "獲利達x%", "低價買", "放大退休金"
]

# 產品特定黑名單關鍵字
product_specific_blacklist = {
    "基金": ["人氣基金"],
    "證券": ["智慧加減碼", "手續費折扣", "手續費優惠", "人氣ETF", "人氣美股", "人氣台股"],
    "換匯": [],
    "ROBO智能投資": ["人氣基金", "人氣ETF"]
}

# 建立完整黑名單字典
blacklist_keywords = {}
for product, specific_words in product_specific_blacklist.items():
    blacklist_keywords[product] = common_blacklist + specific_words

In [2]:
client_df.head(2)

Unnamed: 0,NO,欄位中文名稱,欄位英文名稱(程式開發後回填),定義,GenAI 擴充,GenAI 關鍵字
0,1,MY_新手爸媽,MY_NewParent,40歲以下有幼齡小孩,"這群40歲以下的年輕父母正值職涯發展的關鍵期。由於育兒開支偏高且家庭責任重,消費模式審慎而有...",育兒基金、子女教育金、醫療保障、彈性理財、雙薪家庭
1,2,MY_銀髮族,MY_Elderly,60歲以上,"此客群財務狀況趨於穩定,支出模式審慎,資產配置以保守投資為主。他們的理財目標傾向於維持收入來...",退休金、醫療保險、傳承規劃、旅遊基金、複合式存款


In [3]:
behavior_df.head(2)

Unnamed: 0,欄位中文名稱,欄位英文名稱(程式開發後回填),定義
0,MY_網購族,MY_ECShopper,經常於線上通路進行消費
1,MY_電遊狂熱,MY_Gaming,經常購買模型、動漫、手遊等娛樂


In [4]:
marketing_df.head(2)

Unnamed: 0,產品類別,產品名稱,產品描述,活動資訊,版位,行銷目標描述,文案生成組數,行銷到達頁面,客群標籤,行為標籤
0,基金,數位買基金幸福長跑手續費0元,讓投資變習慣，定期定額申購基金，為自己好好打算,定期定額投資全系列基金享申購手續費優惠，1年以下手續費6折、滿1年(含)~2年手續費3折、滿...,APP AT 文字版位,申購基金定期定額,3,https://www.cathaybk.com.tw/cathaybk/personal/...,MY_新手爸媽,MY_美食族
1,基金,數位買基金幸福長跑手續費0元,讓投資變習慣，定期定額申購基金，為自己好好打算,定期定額投資全系列基金享申購手續費優惠，1年以下手續費6折、滿1年(含)~2年手續費3折、滿...,APP AT 文字版位,申購基金定期定額,3,https://www.cathaybk.com.tw/cathaybk/personal/...,MY_金融菁英,MY_美食族


### 輔助函數和文案生成相關函數

主要函式:

#### 1. `generate_ad_prompt()`
- 生成提示詞給 Gemini 模型
- 整合:
 - 產品資訊
 - 客群特徵
 - 行為標籤
 - 黑名單關鍵字
- 固定格式: JSON輸出3個文案

#### 2. 驗證相關函式
- `is_valid_copy()`: 檢查字數、黑名單
- `is_duplicate_copy()`: 檢查重複文案 
- `validate_campaign_data()`: 檢查必填欄位、產品類別

**參數規範**:
- 文案字數上限: 16字
- 每批次生成: 3個文案
- 允許產品類別: 基金、證券、換匯、ROBO智能投資

In [2]:
def generate_ad_prompt(row, client_df, behavior_df, blacklist_keywords, char_limit=16, batch_size=3):
    """生成提示詞"""
    # 取得客群資訊
    client_info = client_df[client_df['欄位中文名稱'] == row['客群標籤']].iloc[0]
    
    # 取得行為標籤資訊
    behavior_info = behavior_df[behavior_df['欄位中文名稱'] == row['行為標籤']].iloc[0]
    
    # 取得對應的黑名單關鍵字
    blacklist = blacklist_keywords.get(row['產品類別'], [])
    
    # 使用雙大括號處理 JSON 格式
    json_format = """{{
    "ad_copy_1": "第一個文案內容",
    "ad_copy_2": "第二個文案內容",
    "ad_copy_3": "第三個文案內容"
}}"""
    
    prompt = f"""請為銀行產品生成行銷文案，需符合以下要求：

產品資訊：
- 產品類別：{row['產品類別']}
- 產品名稱：{row['產品名稱']}
- 產品描述：{row['產品描述']}
- 活動內容：{row['活動資訊']}

目標客群特徵：
- 客群定義：{client_info['定義']}
- 客群描述：{client_info['GenAI 擴充']}
- 客群關鍵特徵：{client_info['GenAI 關鍵字']}

行為特徵：
- 目標客戶行為：{behavior_info['定義']}

行銷目標：
- 目標：{row['行銷目標描述']}

行銷版位：
{row['版位']}（字數限制：{char_limit}字以內）

限制條件：
1. 禁用關鍵字：{', '.join(blacklist)}
2. 不得有保證獲利、精準推薦或導引投資的內容
3. 需包含明確的行動呼籲
4. 避免提到任何職業名稱

請直接提供 JSON 格式輸出{batch_size}個文案，格式如下：

{json_format}

注意：直接提供 JSON 內容，不要加入任何其他格式標記或說明。"""
    return prompt

def is_valid_copy(copy, blacklist, char_limit):
    """檢查文案是否符合規則"""
    # 檢查字數限制
    if len(copy) > char_limit:
        return False
    
    # 檢查禁用關鍵字
    for word in blacklist:
        if word in copy:
            return False
            
    return True

def is_duplicate_copy(new_copy, existing_copies):
    """檢查是否重複文案"""
    return new_copy in existing_copies

def validate_campaign_data(row):
    """驗證行銷活動資料完整性"""
    required_fields = [
        '產品類別', '產品名稱', '產品描述', '活動資訊', 
        '版位', '行銷目標描述', '文案生成組數', '行銷到達頁面', 
        '客群標籤', '行為標籤'
    ]
    
    for field in required_fields:
        if field not in row or pd.isna(row[field]):
            raise ValueError(f"缺少必要欄位或欄位為空: {field}")
    
    # 確認產品類別是否在允許的範圍內
    valid_categories = ['基金', '證券', '換匯', 'ROBO智能投資']
    if row['產品類別'] not in valid_categories:
        raise ValueError(f"無效的產品類別: {row['產品類別']}")

In [5]:
def generate_ad_prompt(row, client_df, behavior_df, blacklist_keywords, char_limit=16, batch_size=3):
    """生成提示詞"""
    # 取得客群資訊
    client_info = client_df[client_df['欄位中文名稱'] == row['客群標籤']].iloc[0]
    
    # 取得行為標籤資訊
    behavior_info = behavior_df[behavior_df['欄位中文名稱'] == row['行為標籤']].iloc[0]
    
    # 取得對應的黑名單關鍵字，使用產品類別映射
    product_category = row['產品類別']
    blacklist = blacklist_keywords.get(product_category, [])
    
    # 使用雙大括號處理 JSON 格式
    json_format = """{{
    "ad_copy_1": "第一個文案內容",
    "ad_copy_2": "第二個文案內容",
    "ad_copy_3": "第三個文案內容"
}}"""
    
    prompt = f"""請為銀行產品生成行銷文案，需符合以下要求：

產品資訊：
- 產品類別：{product_category}
- 產品名稱：{row['產品名稱']}
- 產品描述：{row['產品描述']}
- 活動內容：{row['活動資訊']}

目標客群特徵：
- 客群定義：{client_info['定義']}
- 客群描述：{client_info['GenAI 擴充']}
- 客群關鍵特徵：{client_info['GenAI 關鍵字']}

行為特徵：
- 目標客戶行為：{behavior_info['定義']}

行銷目標：
- 目標：{row['行銷目標描述']}

行銷版位：
{row['版位']}（字數限制：{char_limit}字以內）

限制條件：
1. 禁用關鍵字：{', '.join(blacklist)}
2. 不得有保證獲利、精準推薦或導引投資的內容
3. 需包含明確的行動呼籲
4. 避免提到任何職業名稱

請直接提供 JSON 格式輸出{batch_size}個文案，格式如下：

{json_format}

注意：直接提供 JSON 內容，不要加入任何其他格式標記或說明。"""
    return prompt

def is_valid_copy(copy, blacklist, char_limit):
    """檢查文案是否符合規則"""
    if not copy or not isinstance(copy, str):
        print("✗ 無效的文案格式")
        return False
    
    # 檢查字數限制
    if len(copy) > char_limit:
        print(f"✗ 超過字數限制：{len(copy)} > {char_limit}")
        return False
    
    # 檢查禁用關鍵字
    for word in blacklist:
        if word in copy:
            print(f"✗ 包含黑名單關鍵字：'{word}'")
            return False
            
    return True

def is_duplicate_copy(new_copy, existing_copies):
    """檢查是否重複文案"""
    return new_copy in existing_copies

def validate_campaign_data(row):
    """驗證行銷活動資料完整性"""
    required_fields = [
        '產品類別', '產品名稱', '產品描述', '活動資訊', 
        '版位', '行銷目標描述', '文案生成組數', '行銷到達頁面', 
        '客群標籤', '行為標籤'
    ]
    
    for field in required_fields:
        if field not in row or pd.isna(row[field]):
            raise ValueError(f"缺少必要欄位或欄位為空: {field}")
    
    # 確認產品類別是否在允許的範圍內
    valid_categories = list(product_specific_blacklist.keys())  # 使用黑名單字典的鍵作為有效類別
    if row['產品類別'] not in valid_categories:
        raise ValueError(f"無效的產品類別: {row['產品類別']}")

### 核心生成函數和資料處理

#### generate_campaign_copies()
- 文案生成主函式
- 限制重試次數: 最多50次
- 流程:
 1. 使用Gemini生成文案
 2. 清理JSON格式回應
 3. 驗證文案:
    - 字數限制
    - 黑名單關鍵字
    - 重複檢查
 4. 收集符合條件文案
 5. 達標返回結果

#### save_copies_batch()
- BigQuery存儲函式
- 功能:
 1. 深複製避免修改原始資料
 2. 時間戳格式轉換
 3. 批次寫入BigQuery
 4. 錯誤處理與紀錄

#### 重要參數
- char_limit: 16字
- target_count: 3個文案
- batch_size: 3個/批
- max_retries: 50次

In [6]:
def generate_campaign_copies(row, client_df, behavior_df, blacklist_keywords, 
                           char_limit=16, target_count=3, batch_size=3,
                           max_retries=50):
    """改進的文案生成函數，限制總重試次數為50次"""
    all_copies = set()
    retries = 0
    product_category = row['產品類別']
    
    # 獲取對應產品類別的黑名單
    blacklist = blacklist_keywords.get(product_category, [])
    if not blacklist:
        print(f"警告：產品類別 '{product_category}' 找不到對應的黑名單")
    
    print(f"\n開始生成文案：")
    print(f"產品類別：{product_category}")
    print(f"產品名稱：{row['產品名稱']}")
    print(f"目標客群：{row['客群標籤']}")
    print(f"行為標籤：{row['行為標籤']}")
    print(f"黑名單關鍵字數量：{len(blacklist)}")
    
    while len(all_copies) < target_count and retries < max_retries:
        try:
            prompt = generate_ad_prompt(row, client_df, behavior_df, 
                                      blacklist_keywords, char_limit, batch_size)
            response = model.generate_content(prompt).text
            
            # 印出原始回應
            print("\n=== Gemini 原始回應 ===")
            print(response)
            print("=== 原始回應結束 ===\n")
            
            # 改進的 JSON 清理
            response = response.strip()
            if response.startswith('```') and response.endswith('```'):
                response = response[response.find('{'):response.rfind('}')+1]
            # 移除額外的大括號
            if response.startswith('{{'):
                response = response[1:]
            if response.endswith('}}'):
                response = response[:-1]
            
            print("=== 清理後的 JSON ===")
            print(response)
            print("=== JSON 結束 ===\n")
            
            copies = json.loads(response)
            
            # 印出當前生成的文案
            print(f"\n嘗試 {retries + 1}/{max_retries}:")
            for key, copy in copies.items():
                print(f"{key}: {copy}")
                print("檢查文案...")
                if is_valid_copy(copy, blacklist, char_limit):
                    if copy not in all_copies:
                        all_copies.add(copy)
                        print(f"✓ 有效文案已收集: {len(all_copies)}/{target_count}")
                    else:
                        print("✗ 重複的文案")
                
                if len(all_copies) >= target_count:
                    print("\n成功生成所需文案數量！")
                    return list(all_copies)[:target_count], True
            
            retries += 1
            
        except Exception as e:
            print(f"❌ 生成錯誤 (嘗試 {retries + 1}/{max_retries}): {str(e)}")
            retries += 1
            if retries >= max_retries:
                print(f"\n已達最大重試次數 {max_retries}")
                break
    
    return list(all_copies), len(all_copies) == target_count

def save_copies_batch(copies_batch):
    """批次存入文案到 BigQuery"""
    client = bigquery.Client()
    table_id = "poc-55-genai.cathay_adai.ad_copy"
    
    # 先進行一次深拷貝，避免修改原始資料
    import copy
    copies_to_save = copy.deepcopy(copies_batch)
    
    # 確保時間戳格式正確
    for row in copies_to_save:
        if isinstance(row['generated_time'], datetime):
            # 加入時區資訊的格式
            row['generated_time'] = row['generated_time'].strftime('%Y-%m-%d %H:%M:%S.%f %Z')
    
    try:
        errors = client.insert_rows_json(table_id, copies_to_save)
        if errors:
            error_msg = f"批次寫入遇到錯誤: {errors}"
            print(error_msg)
            raise Exception(error_msg)
        else:
            print(f"✓ 成功寫入 {len(copies_to_save)} 筆文案到 BigQuery")
    except Exception as e:
        print(f"❌ BigQuery 寫入錯誤: {str(e)}")
        raise

### 主要執行流程

#### process_single_campaign()
- 單一活動處理函式
- 流程:
 1. 驗證資料完整性
 2. 生成3個文案
 3. 建立campaign_id
 4. 儲存結果:
    - 本地JSON檔
    - 本地CSV檔 
    - BigQuery資料表

#### process_campaign_batch()
- 批次處理所有活動
- 記錄失敗活動資訊
- 即時顯示處理進度

#### run_ad_generation()
- 系統主程式
- 功能:
 1. 開始執行紀錄
 2. 批次產生文案
 3. 計算執行時間
 4. 產出執行報告:
    - 成功/失敗數量
    - 平均執行時間
    - 失敗活動清單

In [9]:
def process_single_campaign(row, save_to_db=True):
    """處理單一活動的文案生成和存儲"""
    try:
        # 驗證活動資料
        validate_campaign_data(row)
        
        # 生成文案
        campaign_copies, is_complete = generate_campaign_copies(
            row=row,
            client_df=client_df,
            behavior_df=behavior_df,
            blacklist_keywords=blacklist_keywords,
            char_limit=16,
            target_count=3,
            batch_size=3
        )
        
        if not campaign_copies:
            raise Exception("未能生成任何有效文案")
        
        if not is_complete:
            return False, f"僅生成 {len(campaign_copies)}/3 個文案"
            
        # 準備文案資料
        campaign_id = f"{row['產品類別']}_{row['客群標籤']}_{datetime.now().strftime('%Y%m%d')}"
        
        copies_data = []
        for i, copy in enumerate(campaign_copies):
            # 根據 BigQuery schema 準備資料
            copy_data = {
                "campaign_id": campaign_id,
                "product_category": row['產品類別'],
                "product_name": row['產品名稱'],
                "product_description": row['產品描述'],
                "activity_info": row['活動資訊'],
                "placement": row['版位'],
                "marketing_objective": row['行銷目標描述'],
                "copy_generation_count": str(row['文案生成組數']),  # 轉為 STRING
                "marketing_landing_page": row['行銷到達頁面'],
                "customer_segment": row['客群標籤'],
                "behavior_tag": row['行為標籤'],
                "ad_copy": copy,
                "char_count": len(copy),  # INTEGER
                "batch_number": i + 1,    # INTEGER
                "generated_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # TIMESTAMP
            }
            copies_data.append(copy_data)
        
        # 儲存到臨時檔案
        temp_dir = "cathay_adai_temp"
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        
        # 儲存JSON格式
        temp_file_json = os.path.join(temp_dir, f"{campaign_id}.json")
        with open(temp_file_json, 'w', encoding='utf-8') as f:
            json.dump(copies_data, f, ensure_ascii=False, indent=2)
            
        # 儲存CSV格式
        temp_file_csv = os.path.join(temp_dir, f"{campaign_id}.csv")
        pd.DataFrame(copies_data).to_csv(temp_file_csv, index=False, encoding='utf-8')
            
        # 寫入資料庫
        if save_to_db:
            save_copies_batch(copies_data)
            
        return True, None
        
    except Exception as e:
        error_msg = f"處理失敗: {str(e)}"
        print(error_msg)
        return False, error_msg

def save_copies_batch(copies_batch):
    """批次存入文案到 BigQuery"""
    client = bigquery.Client()
    table_id = "poc-55-genai.cathay_adai.ad_copy"
    
    # 先進行一次深拷貝，避免修改原始資料
    import copy
    copies_to_save = copy.deepcopy(copies_batch)
    
    try:
        errors = client.insert_rows_json(table_id, copies_to_save)
        if errors:
            error_msg = f"批次寫入遇到錯誤: {errors}"
            print(error_msg)
            raise Exception(error_msg)
        else:
            print(f"✓ 成功寫入 {len(copies_to_save)} 筆文案到 BigQuery")
    except Exception as e:
        print(f"❌ BigQuery 寫入錯誤: {str(e)}")
        raise

def process_campaign_batch():
    """批次處理所有活動"""
    failed_campaigns = []
    total_campaigns = len(marketing_df)
    
    print(f"\n開始處理總共 {total_campaigns} 個活動組合")
    print("每個活動將生成 3 個文案")
    print("-" * 50)
    
    for index, row in marketing_df.iterrows():
        print(f"\n開始處理第 {index + 1}/{total_campaigns} 個活動")
        print(f"產品類別: {row['產品類別']}")
        print(f"客群標籤: {row['客群標籤']}")
        
        # 檢查產品類別的黑名單
        current_blacklist = blacklist_keywords.get(row['產品類別'], [])
        if not current_blacklist:
            print(f"警告：產品類別 '{row['產品類別']}' 沒有對應的黑名單")
        else:
            print(f"黑名單關鍵字數量：{len(current_blacklist)}")
        
        success, error = process_single_campaign(row)
        
        if not success:
            failed_campaigns.append({
                "product_category": row['產品類別'],
                "customer_segment": row['客群標籤'],
                "error": error,
                "blacklist_count": len(current_blacklist)  # 新增：記錄失敗時的黑名單數量
            })
        
        print("-" * 50)
    
    return failed_campaigns

def run_ad_generation():
    """主要運行函數"""
    print("=== 國泰世華銀行 AI 行銷文案生成系統 ===")
    print(f"開始時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"總活動數: {len(marketing_df)}")
    print("每活動文案數: 3")
    print("\n黑名單設定：")
    for product, blacklist in blacklist_keywords.items():
        print(f"- {product}: {len(blacklist)} 個關鍵字")
    print("=" * 50)
    
    start_time = time.time()
    failed_campaigns = process_campaign_batch()
    
    # 計算總執行時間
    total_duration = time.time() - start_time
    
    print("\n=== 執行結果摘要 ===")
    print(f"總耗時: {total_duration:.2f} 秒")
    print(f"平均每活動耗時: {(total_duration/len(marketing_df)):.2f} 秒")
    print(f"成功活動數: {len(marketing_df) - len(failed_campaigns)}")
    print(f"失敗活動數: {len(failed_campaigns)}")
    
    if failed_campaigns:
        print("\n失敗活動清單：")
        for campaign in failed_campaigns:
            print(f"- {campaign['product_category']} - {campaign['customer_segment']}")
            print(f"  原因: {campaign['error']}")
            print(f"  黑名單數量: {campaign['blacklist_count']}")
    
    print("\n作業完成！")
    return failed_campaigns

### 文案生成

這段執行一次，觀察一下執行的結果，如果一直Gemini產出的「文字」程式碼一直無法處理，就stop it!

In [10]:
# 執行文案生成
if __name__ == "__main__":
    failed_campaigns = run_ad_generation()

=== 國泰世華銀行 AI 行銷文案生成系統 ===
開始時間: 2025-01-06 10:02:39
總活動數: 12
每活動文案數: 3

黑名單設定：
- 基金: 22 個關鍵字
- 證券: 27 個關鍵字
- 換匯: 21 個關鍵字
- ROBO智能投資: 23 個關鍵字

開始處理總共 12 個活動組合
每個活動將生成 3 個文案
--------------------------------------------------

開始處理第 1/12 個活動
產品類別: 基金
客群標籤: MY_新手爸媽
黑名單關鍵字數量：22

開始生成文案：
產品類別：基金
產品名稱：數位買基金幸福長跑手續費0元
目標客群：MY_新手爸媽
行為標籤：MY_美食族
黑名單關鍵字數量：22

=== Gemini 原始回應 ===
{{
    "ad_copy_1": "愛與未來，從0元手續費開始 invest",
    "ad_copy_2": "養兒育女，定期定額輕鬆存",
    "ad_copy_3": "孩子未來，用0元手續費投資"
}}

=== 原始回應結束 ===

=== 清理後的 JSON ===
{
    "ad_copy_1": "愛與未來，從0元手續費開始 invest",
    "ad_copy_2": "養兒育女，定期定額輕鬆存",
    "ad_copy_3": "孩子未來，用0元手續費投資"
}
=== JSON 結束 ===


嘗試 1/50:
ad_copy_1: 愛與未來，從0元手續費開始 invest
檢查文案...
✗ 超過字數限制：20 > 16
ad_copy_2: 養兒育女，定期定額輕鬆存
檢查文案...
✓ 有效文案已收集: 1/3
ad_copy_3: 孩子未來，用0元手續費投資
檢查文案...
✓ 有效文案已收集: 2/3

=== Gemini 原始回應 ===
{{
    "ad_copy_1": "養兒育女，基金0手續費，現在開始投資",
    "ad_copy_2": "為孩子存教育金，基金0手續費輕鬆存",
    "ad_copy_3": "定期定額投資基金，享0手續費優惠"
}}

=== 原始回應結束 ===

=== 清理後的 JSON ===
{
    "ad_copy_1"

#### retry 機制

從剛剛終止的地方retry

In [None]:
# 從第6個活動開始處理（跳過前5個已完成的活動）
start_index = 5  # 索引5代表第6個活動
subset_df = marketing_df.iloc[start_index:]

print(f"開始處理剩餘 {len(subset_df)} 個活動")
print(f"從第 {start_index + 1} 個活動開始")
print("-" * 50)

start_time = time.time()
failed_campaigns = []

for index, row in subset_df.iterrows():
    print(f"\n處理第 {index + 1}/{len(marketing_df)} 個活動")
    print(f"產品類別: {row['產品類別']}")
    print(f"客群標籤: {row['客群標籤']}")
    
    success, error = process_single_campaign(row)
    
    if not success:
        failed_campaigns.append({
            "product_category": row['產品類別'],
            "customer_segment": row['客群標籤'],
            "error": error
        })
    
    print("-" * 50)

total_duration = time.time() - start_time
print(f"\n完成！總耗時: {total_duration:.2f} 秒")

if failed_campaigns:
    print("\n失敗的活動：")
    for campaign in failed_campaigns:
        print(f"- {campaign['product_category']} - {campaign['customer_segment']}")

### 單次選取Campaign

先skip，需要再寫 

### 在最終表加一個欄位

In [None]:
marketing_df

In [None]:
from google.cloud import bigquery
import pandas as pd

# Initialize BigQuery Client
client = bigquery.Client()

# Read original data from BigQuery
query = """
SELECT * FROM `poc-55-genai.cathay_adai.ad_copy`
"""
old_data_df = client.query(query).to_dataframe()

# Read marketing data with targets
marketing_df = pd.read_csv('gs://cathay-adai-55/genai_copies_request_form.csv')

# Create lookup dictionary for marketing targets
marketing_targets = {}
for _, row in marketing_df.iterrows():
    key = f"{row['產品類別']}_{row['客群標籤']}"
    marketing_targets[key] = row['行銷目標描述']

# Add marketing_target to the dataframe
def get_marketing_target(row):
    key = f"{row['product_category']}_{row['customer_segment']}"
    return marketing_targets.get(key, '')

old_data_df['marketing_target'] = old_data_df.apply(get_marketing_target, axis=1)

# Define new table schema
schema = [
    bigquery.SchemaField("campaign_id", "STRING", description="活動唯一識別碼"),
    bigquery.SchemaField("product_category", "STRING", description="產品類別"),
    bigquery.SchemaField("product_name", "STRING", description="產品名稱"),
    bigquery.SchemaField("customer_segment", "STRING", description="目標客群"),
    bigquery.SchemaField("behavior_tag", "STRING", description="行為標籤"),
    bigquery.SchemaField("marketing_target", "STRING", description="行銷目標描述"),
    bigquery.SchemaField("ad_copy", "STRING", description="文案內容"),
    bigquery.SchemaField("char_count", "INTEGER", description="文案字數"),
    bigquery.SchemaField("batch_number", "INTEGER", description="批次編號"),
    bigquery.SchemaField("generated_time", "TIMESTAMP", description="生成時間"),
    bigquery.SchemaField("status", "STRING", description="文案狀態")
]

# Create new table
table_id = 'poc-55-genai.cathay_adai.ad_copy_full'
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table, exists_ok=True)

# Load data to new table
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition="WRITE_TRUNCATE"
)

job = client.load_table_from_dataframe(
    old_data_df, table_id, job_config=job_config
)
job.result()  # Wait for the job to complete

print(f"Loaded {client.get_table(table_id).num_rows} rows into {table_id}")

## 建立10組文案

先建立一個空表格

```sql
CREATE OR REPLACE TABLE `poc-55-genai.cathay_adai.ad_copy_10`
(
    campaign_id STRING,
    product_category STRING,
    product_name STRING,
    product_description STRING,
    activity_info STRING,
    placement STRING,
    marketing_objective STRING,
    copy_generation_count STRING,
    marketing_landing_page STRING,
    customer_segment STRING,
    behavior_tag STRING,
    ad_copy STRING,
    char_count INT64,
    batch_number INT64,
    generated_time TIMESTAMP
)


In [6]:
def process_single_campaign_10(row, save_to_db=True):
    """處理單一活動的文案生成和存儲 - 10款版本"""
    try:
        # 驗證活動資料
        validate_campaign_data(row)
        
        # 生成文案 - 修改為10個
        campaign_copies, is_complete = generate_campaign_copies(
            row=row,
            client_df=client_df,
            behavior_df=behavior_df,
            blacklist_keywords=blacklist_keywords,
            char_limit=16,
            target_count=10,  # 改為10個
            batch_size=3      # 保持每批3個
        )
        
        if not campaign_copies:
            raise Exception("未能生成任何有效文案")
        
        if not is_complete:
            return False, f"僅生成 {len(campaign_copies)}/10 個文案"
            
        # 準備文案資料
        campaign_id = f"{row['產品類別']}_{row['客群標籤']}_{row['行為標籤']}_{datetime.now().strftime('%Y%m%d')}"
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        copies_data = []
        for i, copy in enumerate(campaign_copies):
            copy_data = {
                "campaign_id": campaign_id,
                "product_category": row['產品類別'],
                "product_name": row['產品名稱'],
                "product_description": row['產品描述'],
                "activity_info": row['活動資訊'],
                "placement": row['版位'],
                "marketing_objective": row['行銷目標描述'],
                "copy_generation_count": row['文案生成組數'],
                "marketing_landing_page": row['行銷到達頁面'],
                "customer_segment": row['客群標籤'],
                "behavior_tag": row['行為標籤'],
                "ad_copy": copy,
                "char_count": len(copy),
                "batch_number": i + 1,
                "generated_time": current_time
            }
            copies_data.append(copy_data)
        
        # 儲存到臨時檔案
        temp_dir = "cathay_adai_temp_10"  # 更改資料夾名稱以區分
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        
        # 儲存JSON格式
        temp_file_json = os.path.join(temp_dir, f"{campaign_id}.json")
        with open(temp_file_json, 'w', encoding='utf-8') as f:
            json.dump(copies_data, f, ensure_ascii=False, indent=2)
            
        # 儲存CSV格式
        temp_file_csv = os.path.join(temp_dir, f"{campaign_id}.csv")
        pd.DataFrame(copies_data).to_csv(temp_file_csv, index=False, encoding='utf-8')
            
        # 寫入資料庫 - 使用新的表格
        if save_to_db:
            save_copies_batch_10(copies_data)  # 新的儲存函數
            
        return True, None
        
    except Exception as e:
        error_msg = f"處理失敗: {str(e)}"
        print(error_msg)
        return False, error_msg

def save_copies_batch_10(copies_batch):
    """批次存入文案到 BigQuery - 10款版本"""
    client = bigquery.Client()
    table_id = "poc-55-genai.cathay_adai.ad_copy_10"  # 新的表格
    
    # 先進行一次深拷貝，避免修改原始資料
    import copy
    copies_to_save = copy.deepcopy(copies_batch)
    
    # 確保時間戳格式正確
    for row in copies_to_save:
        if isinstance(row['generated_time'], datetime):
            row['generated_time'] = row['generated_time'].strftime('%Y-%m-%d %H:%M:%S.%f %Z')
    
    try:
        errors = client.insert_rows_json(table_id, copies_to_save)
        if errors:
            error_msg = f"批次寫入遇到錯誤: {errors}"
            print(error_msg)
            raise Exception(error_msg)
        else:
            print(f"✓ 成功寫入 {len(copies_to_save)} 筆文案到 BigQuery")
    except Exception as e:
        print(f"❌ BigQuery 寫入錯誤: {str(e)}")
        raise

def process_campaign_batch_10():
    """批次處理所有活動 - 10款版本"""
    failed_campaigns = []
    total_campaigns = len(marketing_df)
    
    print(f"\n開始處理總共 {total_campaigns} 個活動組合")
    print("每個活動將生成 10 個文案")
    print("-" * 50)
    
    for index, row in marketing_df.iterrows():
        print(f"\n開始處理第 {index + 1}/{total_campaigns} 個活動")
        success, error = process_single_campaign_10(row)
        
        if not success:
            failed_campaigns.append({
                "product_category": row['產品類別'],
                "customer_segment": row['客群標籤'],
                "error": error
            })
        
        print("-" * 50)
    
    return failed_campaigns

def run_ad_generation_10():
    """主要運行函數 - 10款版本"""
    print("=== 國泰世華銀行 AI 行銷文案生成系統 (10款版本) ===")
    print(f"開始時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"總活動數: {len(marketing_df)}")
    print("每活動文案數: 10")
    print("=" * 50)
    
    start_time = time.time()
    failed_campaigns = process_campaign_batch_10()
    
    # 計算總執行時間
    total_duration = time.time() - start_time
    
    print("\n=== 執行結果摘要 ===")
    print(f"總耗時: {total_duration:.2f} 秒")
    print(f"平均每活動耗時: {(total_duration/len(marketing_df)):.2f} 秒")
    print(f"成功活動數: {len(marketing_df) - len(failed_campaigns)}")
    print(f"失敗活動數: {len(failed_campaigns)}")
    
    if failed_campaigns:
        print("\n失敗活動清單：")
        for campaign in failed_campaigns:
            print(f"- {campaign['product_category']} - {campaign['customer_segment']}")
            print(f"  原因: {campaign['error']}")
    
    print("\n作業完成！")
    return failed_campaigns

# 執行10款文案生成
if __name__ == "__main__":
    failed_campaigns = run_ad_generation_10()

=== 國泰世華銀行 AI 行銷文案生成系統 (10款版本) ===
開始時間: 2025-01-03 07:08:03
總活動數: 12
每活動文案數: 10

開始處理總共 12 個活動組合
每個活動將生成 10 個文案
--------------------------------------------------

開始處理第 1/12 個活動

開始生成文案：
產品：數位買基金幸福長跑手續費0元
目標客群：MY_新手爸媽
行為標籤：MY_美食族

=== Gemini 原始回應 ===
{{
    "ad_copy_1": "孩子未來，現在開始投資",
    "ad_copy_2": "定期定額0元起，為愛儲蓄",
    "ad_copy_3": "輕鬆養成投資習慣，立即申購"
}}
=== 原始回應結束 ===

=== 清理後的 JSON ===
{
    "ad_copy_1": "孩子未來，現在開始投資",
    "ad_copy_2": "定期定額0元起，為愛儲蓄",
    "ad_copy_3": "輕鬆養成投資習慣，立即申購"
}
=== JSON 結束 ===


嘗試 1/50:
ad_copy_1: 孩子未來，現在開始投資
ad_copy_2: 定期定額0元起，為愛儲蓄
ad_copy_3: 輕鬆養成投資習慣，立即申購
✓ 有效文案已收集: 1/10
✓ 有效文案已收集: 2/10
✓ 有效文案已收集: 3/10

=== Gemini 原始回應 ===
{{
    "ad_copy_1": "為寶貝存未來，基金0元起",
    "ad_copy_2": "教育基金輕鬆扣，手續費優惠中",
    "ad_copy_3": "數位買基金，享0元手續費"
}}
=== 原始回應結束 ===

=== 清理後的 JSON ===
{
    "ad_copy_1": "為寶貝存未來，基金0元起",
    "ad_copy_2": "教育基金輕鬆扣，手續費優惠中",
    "ad_copy_3": "數位買基金，享0元手續費"
}
=== JSON 結束 ===


嘗試 2/50:
ad_copy_1: 為寶貝存未來，基金0元起
ad_copy_2: 教育基金輕鬆扣，手續費優惠中
ad_copy_3: 數位買基金，享