# Xアカウント AI自動抽選ツール - Google Colab版

このノートブックは、Google ColabでXアカウントの抽選判定を実行するためのものです。

## 機能
- **自動抽選判定**: Grok APIを使用してアカウントの抽選可否を自動判定
- **大量データ対応**: バッチ処理により30,000件以上のアカウントを処理可能
- **詳細な判定基準**: 懸賞アカウント、美容アカウント、その他懸念などを自動判定
- **結果出力**: CSV形式で結果を出力（Excel対応）
- **Google Drive連携**: CSVファイルのアップロード/ダウンロードに対応

## 使い方
1. このノートブックをGoogle Colabで開く
2. セルを順番に実行する
3. XAI_API_KEYを設定する（セル2）
4. CSVファイルをアップロードする（セル3）
5. 抽選処理を実行する（セル4以降）

## 1. 依存パッケージのインストール

In [25]:
# 必要なパッケージをインストール
!pip install -q xai-sdk>=1.3.1 python-dotenv>=1.0.0 pydantic>=2.0.0 pandas>=2.0.0 openpyxl>=3.1.0

print("✓ パッケージのインストールが完了しました")

/bin/bash: pip: command not found
✓ パッケージのインストールが完了しました


## 2. 環境変数の設定（XAI_API_KEY）

**重要**: 以下のセルでXAI_API_KEYを設定してください。

APIキーは[xAI Console](https://console.x.ai/)で取得できます。

In [26]:
import os

# XAI_API_KEYを設定
# 方法1: 直接APIキーを入力する（簡単な方法）
api_key = "your_xai_api_key_here"  # ← ここにAPIキーを貼り付けてください
os.environ['XAI_API_KEY'] = api_key

# 方法2: 対話的に入力する（セキュリティが高い方法）
# from getpass import getpass
# api_key = getpass('XAI_API_KEYを入力してください: ')
# os.environ['XAI_API_KEY'] = api_key

print("✓ APIキーが設定されました")

✓ APIキーが設定されました


## 3. Google Driveのマウント（オプション）

Google DriveにCSVファイルを保存している場合、このセルを実行してDriveをマウントしてください。

In [27]:
# Google Driveのマウント（Google Colab環境でのみ動作）
try:
    from google.colab import drive
    drive.mount('/content/drive')
    print("✓ Google Driveがマウントされました")
except ModuleNotFoundError:
    print("⚠ Google Colab環境ではありません。Google Driveのマウントをスキップします。")
    print("   このセルはGoogle Colabで実行する場合のみ必要です。")
except Exception as e:
    print(f"⚠ Google Driveのマウントに失敗しました: {e}")
    print("   このセルはオプションです。スキップして続行できます。")

⚠ Google Colab環境ではありません。Google Driveのマウントをスキップします。
   このセルはGoogle Colabで実行する場合のみ必要です。


## 4. 必要な関数の定義

抽選判定に必要な関数を定義します。

In [28]:
import csv
import os
import json
import re
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, local

from pydantic import BaseModel, Field

# xai-sdkのインポート
try:
    from xai_sdk import Client
    from xai_sdk.chat import user, system
    try:
        from xai_sdk.tools import x_search as x_search_func
    except ImportError:
        x_search_func = None
        print("警告: x_searchツールをインポートできませんでした。ツールなしで実行します。")
except ImportError:
    print("エラー: xai-sdkがインストールされていません。")
    raise

# クライアントプール（スレッドローカルストレージ）
_thread_local = local()

def get_client():
    """スレッドローカルなクライアントを取得（プール式）"""
    if not hasattr(_thread_local, 'client'):
        api_key = os.getenv('XAI_API_KEY')
        if api_key:
            _thread_local.client = Client(api_key=api_key)
        else:
            _thread_local.client = None
    return _thread_local.client

print("✓ 関数の定義が完了しました")

✓ 関数の定義が完了しました


In [29]:
# Pydanticスキーマ定義

class LotteryJudgement(BaseModel):
    """抽選判定結果"""
    is_prize_account: bool = Field(description="懸賞アカウントかどうか")
    is_prize_account_reason: str = Field(description="懸賞アカウント判定理由")
    is_beauty_account: bool = Field(description="美容アカウントかどうか")
    is_beauty_account_reason: str = Field(description="美容アカウント判定理由")
    is_other_concern: bool = Field(description="その他懸念があるかどうか")
    is_other_concern_reason: str = Field(description="その他懸念判定理由")
    is_teenager: bool = Field(description="10代学生かどうか")
    is_teenager_reason: str = Field(description="10代学生判定理由")
    is_male: bool = Field(description="明らかな男性かどうか")
    is_male_reason: str = Field(description="男性判定理由")
    is_female: bool = Field(description="明らかな女性かどうか")
    is_female_reason: str = Field(description="女性判定理由")
    is_eligible: bool = Field(description="最終判定（当選可能か）")
    final_reason: str = Field(description="最終判定理由")
    campaign_participation_confirmed: bool = Field(description="キャンペーン参加確認")


class LotteryAnalysis(BaseModel):
    """抽選分析結果"""
    handle: str = Field(description="Xハンドル")
    analysis_date: str = Field(description="分析日時")
    profile_summary: str = Field(description="プロフィール概要")
    recent_posts_summary: str = Field(description="直近投稿の傾向")
    judgement: LotteryJudgement = Field(description="判定結果")


class LotteryAnalysisRoot(BaseModel):
    """ルートモデル"""
    lottery_analysis: LotteryAnalysis

print("✓ スキーマ定義が完了しました")

✓ スキーマ定義が完了しました


In [30]:
# CSV読み込み関数

def extract_handle_from_url(url: str) -> Optional[str]:
    """URLからハンドル名を抽出"""
    if not url or not isinstance(url, str):
        return None
    
    url = url.strip()
    patterns = [
        r'https?://(?:x\.com|twitter\.com)/([^/?\s]+)',
        r'@?([a-zA-Z0-9_]+)',
    ]
    
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            handle = match.group(1)
            if re.match(r'^[a-zA-Z0-9_]{1,15}$', handle):
                return handle
    
    return None


def load_handles_from_csv(csv_file_path: str) -> List[str]:
    """CSVファイルから handle を読み込む"""
    handles = []
    
    # ファイルの存在確認
    import os
    if not os.path.exists(csv_file_path):
        print(f"エラー: ファイルが見つかりません: {csv_file_path}")
        print(f"現在のディレクトリ: {os.getcwd()}")
        print(f"ディレクトリ内のファイル: {os.listdir('.')}")
        return handles
    
    # 複数のエンコーディングを試す（より多くのエンコーディングを追加）
    encodings = ['utf-8-sig', 'utf-8', 'shift-jis', 'cp932', 'euc-jp', 'iso-2022-jp', 'latin-1', 'windows-1252']
    lines = None
    used_encoding = None
    
    for encoding in encodings:
        try:
            with open(csv_file_path, 'r', encoding=encoding, errors='replace') as f:
                lines = f.readlines()
                used_encoding = encoding
                print(f"✓ CSVファイルを {encoding} エンコーディングで読み込みました（{len(lines)}行）")
                break
        except UnicodeDecodeError:
            continue
        except Exception as e:
            print(f"警告: {encoding} エンコーディングで読み込み中にエラー: {e}")
            continue
    
    if lines is None:
        print(f"エラー: CSVファイルのエンコーディングを検出できませんでした")
        print(f"ファイルパス: {csv_file_path}")
        print(f"ファイルサイズ: {os.path.getsize(csv_file_path) if os.path.exists(csv_file_path) else 'N/A'} bytes")
        # バイナリモードで最初の数バイトを確認
        try:
            with open(csv_file_path, 'rb') as f:
                first_bytes = f.read(100)
                print(f"ファイルの最初の100バイト（16進数）: {first_bytes.hex()[:200]}")
        except Exception as e:
            print(f"ファイル読み込みエラー: {e}")
        return handles
    
    try:
        if len(lines) < 4:
            print(f"警告: CSVファイルの行数が不足しています（4行以上必要）")
            return handles
        
        # ヘッダー行を自動検出
        header_row_idx = None
        for i in range(3, min(len(lines), 20)):
            line = lines[i].strip()
            if 'ユーザーID' in line or 'URL' in line or 'user' in line.lower() or 'handle' in line.lower():
                header_row_idx = i
                print(f"ヘッダー行を検出: {i+1}行目")
                break
        
        if header_row_idx is None:
            header_row_idx = 3
            print(f"警告: ヘッダー行が見つからないため、4行目を使用します")
        
        header_line = lines[header_row_idx].strip()
        data_start_idx = header_row_idx + 1
        reader = csv.reader(lines[data_start_idx:])
        
        header_reader = csv.reader([header_line])
        header = next(header_reader)
        
        # 旧形式（handle列がある場合）
        if 'handle' in header or 'ハンドル' in header or 'Handle' in header:
            handle_col_idx = None
            for i, col in enumerate(header):
                if col.lower() in ['handle', 'ハンドル']:
                    handle_col_idx = i
                    break
            
            if handle_col_idx is not None:
                for row in reader:
                    if len(row) > handle_col_idx:
                        handle = row[handle_col_idx].strip().lstrip('@')
                        if handle:
                            handles.append(handle)
                print(f"旧形式で {len(handles)} 件のハンドルを読み込みました")
                return handles
        
        # 新形式: A列（ユーザーID）とB列（URL）から抽出
        user_id_col = None
        url_col = None
        
        for i, col in enumerate(header):
            col_stripped = col.strip()
            if 'ユーザーID' in col_stripped or 'ユーザーid' in col_stripped:
                user_id_col = i
                break
        
        for i, col in enumerate(header):
            col_stripped = col.strip()
            if col_stripped == 'URL' or col_stripped.lower() == 'url':
                url_col = i
                break
        
        if user_id_col is None:
            user_id_col = 0
        if url_col is None:
            url_col = 1
        
        for row_num, row in enumerate(reader, start=data_start_idx + 1):
            if len(row) <= max(user_id_col, url_col):
                continue
            
            user_id = row[user_id_col].strip() if len(row) > user_id_col else ''
            url = row[url_col].strip() if len(row) > url_col else ''
            
            handle = None
            if url:
                handle = extract_handle_from_url(url)
            
            if not handle and user_id:
                handle = user_id.lstrip('@')
                if not re.match(r'^[a-zA-Z0-9_]{1,15}$', handle):
                    handle = None
            
            if handle:
                handles.append(handle)
        
        print(f"新形式で {len(handles)} 件のハンドルを読み込みました")
        
    except Exception as e:
        print(f"エラー: CSVファイルの読み込みに失敗しました: {e}")
        import traceback
        traceback.print_exc()
    
    return handles

print("✓ CSV読み込み関数が定義されました")

✓ CSV読み込み関数が定義されました


In [31]:
# 抽選判定関数

def analyze_lottery_eligibility(xHandle: str, campaign_keyword: str = "") -> Optional[Dict[str, Any]]:
    """Grok API を使用してアカウントを判定"""
    api_key = os.getenv('XAI_API_KEY')
    if not api_key:
        print("エラー: XAI_API_KEYが環境変数に設定されていません。")
        return None
    
    try:
        client = get_client()
        if client is None:
            print("エラー: クライアントの初期化に失敗しました。")
            return None
        
        now = datetime.now()
        analysis_date_str = now.strftime("%Y-%m-%d %H:%M:%S")
        
        # デフォルトルール
        default_rules = """
#### 【懸賞アカウント判定】is_prize_account

以下いずれかに該当する場合は懸賞アカウントとして**排除（true）**:

- プロフィール（Bio/ユーザーネーム/位置情報）に「懸賞」「当選」の記載がある
- 直近10投稿すべてが懸賞系投稿・懸賞系リポスト（他人の当選報告リポストも含む）
- ピン留め投稿が懸賞系（#○○の当選報告など）

**重要**: 直近10投稿に1投稿でも通常投稿があればOK（false）
**注意**: 当該キャンペーン投稿は10投稿にカウントしない
**絶対に10ではなく、12,13投稿目に通常投稿あっても当選OK**

#### 【美容アカウント判定】is_beauty_account

以下に該当する場合は美容アカウントとして**排除（true）**:

- コスメ投稿（スキンケア・メイク・ヘアケア・ボディケア）をしている
- **ただし以下はOK（false）**: モニター・PR系投稿、試供品で熱量高い投稿、自分でやったマニキュア
- **NG要素**: 美容サプリ・カラコン、ネイルサロン、他人のコスメ投稿リポスト

#### 【その他懸念判定】is_other_concern

以下に該当する場合は**排除（true）**:

1. 公序良俗違反（モラル違反、炎上、誹謗中傷）
2. 過激投稿（性的、愚痴、死をにおわせる、ケンカ系）
3. ネガティブ投稿（病気は基本OK、賞品使用で影響ある場合NG）
4. お金配り投稿・リポスト
5. 政治的投稿・リポスト
6. 災害・犯罪系投稿・リポスト
7. 外国人（日本語投稿なし、日本在住確証なし）

**判断軸**: クライアントに「この人を当選させます」と言えるか

#### 【10代学生排除】is_teenager

プロフィールor投稿に学校関連記載（「テストが～」「ラストJK」など）で**排除（true）**
年齢記載で10代と判断できる場合も排除

#### 【性別判定】is_male / is_female

- **明らかな男性**: ユーザー名が男性名、投稿に「俺」「僕」「妻」
- **明らかな女性**: ユーザー名が女性名、投稿に「私」「夫」「旦那」
- 写真や投稿内容全体で総合判断
"""
        
        system_prompt = """あなたはXアカウントの抽選判定を行う専門家です。
提供されたツールを使用して実際のデータを取得し、厳密な判定ルールに基づいて判定を行ってください。
判定は必ずファクトベースで、投稿内容やプロフィール情報を根拠として明示してください。"""
        
        user_prompt = f"""
対象アカウント: @{xHandle}

## 分析ステップ

### 1. データ取得

以下のツールを使用してデータを取得してください：

- **x_user_search**: query='{xHandle}' でプロフィール取得（Bio、フォロワー数、位置情報、ユーザーネーム）
- **x_keyword_search**: query='from:{xHandle}', limit=15, mode='Latest' で最新15投稿取得（ピン留めは除外）

**重要**: これらのツールを使用して実際のデータを取得してから判定を行ってください。

### 2. 抽選判定ルール（厳密に従う）

{default_rules}

### 3. 最終判定

**is_eligible**: 以下の条件をすべて満たす場合のみ true

- is_prize_account = false
- is_beauty_account = false
- is_other_concern = false
- is_teenager = false
- is_male / is_female が条件に合致

## 出力形式（JSON、厳密に従う）

{{
  "lottery_analysis": {{
    "handle": "{xHandle}",
    "analysis_date": "{analysis_date_str}",
    "profile_summary": "[プロフィール概要]",
    "recent_posts_summary": "[直近投稿の傾向]",
    "judgement": {{
      "is_prize_account": false,
      "is_prize_account_reason": "[判定理由。該当する場合は具体的投稿例を引用]",
      "is_beauty_account": false,
      "is_beauty_account_reason": "[判定理由]",
      "is_other_concern": false,
      "is_other_concern_reason": "[判定理由]",
      "is_teenager": false,
      "is_teenager_reason": "[判定理由]",
      "is_male": false,
      "is_male_reason": "[判定理由]",
      "is_female": true,
      "is_female_reason": "[判定理由]",
      "is_eligible": true,
      "final_reason": "[当選可否の総合判断理由]",
      "campaign_participation_confirmed": false
    }}
  }}
}}

**重要**: 判定は必ずファクトベース。投稿内容やプロフィール情報を根拠として明示すること。
"""
        
        # xAI SDKを使用したAPI呼び出し
        try:
            tools = []
            if x_search_func is not None:
                tools = [x_search_func()]
            
            chat = client.chat.create(
                model="grok-3",
                messages=[system(system_prompt)],
                tools=tools if tools else None
            )
            
            chat.append(user(user_prompt))
            response = chat.sample()
            content = response.content
            
            if content is None:
                print(f"警告: レスポンスからコンテンツを取得できませんでした (@{xHandle})")
                return None
            
            # JSON文字列をパース
            if isinstance(content, str):
                json_match = re.search(r'\{.*\}', content, re.DOTALL)
                if json_match:
                    content = json_match.group(0)
                try:
                    data = json.loads(content)
                except json.JSONDecodeError:
                    print(f"警告: JSON形式のレスポンスを取得できませんでした (@{xHandle})")
                    return None
            else:
                data = content
            
            # Pydanticで検証
            try:
                result = LotteryAnalysisRoot(**data)
                return result.model_dump()
            except Exception as e:
                print(f"警告: Pydantic検証に失敗しました (@{xHandle}): {e}")
                return None
                
        except Exception as e:
            print(f"エラー: xAI SDK API呼び出しに失敗しました (@{xHandle}): {e}")
            return None
        
    except Exception as e:
        print(f"エラー: Grok API呼び出しに失敗しました (@{xHandle}): {e}")
        return None

print("✓ 抽選判定関数が定義されました")

✓ 抽選判定関数が定義されました


In [32]:
# CSV出力関数

def export_results_to_csv(results: List[Dict[str, Any]], output_path: str):
    """判定結果をCSV形式で出力"""
    if not results:
        print("警告: 出力する結果がありません。")
        return
    
    fieldnames = [
        'handle',
        'is_eligible',
        'is_prize_account',
        'is_beauty_account',
        'is_other_concern',
        'is_teenager',
        'is_male',
        'is_female',
        'final_reason',
        'profile_summary',
        'analysis_date'
    ]
    
    try:
        with open(output_path, 'w', encoding='utf-8-sig', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            
            for result in results:
                analysis = result.get('lottery_analysis', {})
                judgement = analysis.get('judgement', {})
                
                row = {
                    'handle': analysis.get('handle', ''),
                    'is_eligible': '当選可' if judgement.get('is_eligible') else '排除',
                    'is_prize_account': '○' if judgement.get('is_prize_account') else '',
                    'is_beauty_account': '○' if judgement.get('is_beauty_account') else '',
                    'is_other_concern': '○' if judgement.get('is_other_concern') else '',
                    'is_teenager': '○' if judgement.get('is_teenager') else '',
                    'is_male': '○' if judgement.get('is_male') else '',
                    'is_female': '○' if judgement.get('is_female') else '',
                    'final_reason': judgement.get('final_reason', ''),
                    'profile_summary': analysis.get('profile_summary', ''),
                    'analysis_date': analysis.get('analysis_date', '')
                }
                writer.writerow(row)
        
        print(f"結果をCSVファイルに出力しました: {output_path}")
        
    except Exception as e:
        print(f"エラー: CSV出力に失敗しました: {e}")

print("✓ CSV出力関数が定義されました")

✓ CSV出力関数が定義されました


## 5. CSVファイルのアップロード

以下のセルでCSVファイルをアップロードしてください。

**対応形式**:
- 旧形式: `handle`列を含むCSV
- 新形式: A列（ユーザーID）とB列（URL）が4行目以降から始まるCSV

In [33]:
# CSVファイルのアップロード（Google Colab環境でのみ動作）
import os

try:
    from google.colab import files
    
    # CSVファイルをアップロード
    uploaded = files.upload()
    
    # アップロードされたファイル名を取得
    csv_filename = list(uploaded.keys())[0] if uploaded else None
    
    if csv_filename:
        print(f"✓ ファイルがアップロードされました: {csv_filename}")
        print(f"ファイルサイズ: {len(uploaded[csv_filename])} bytes")
        print(f"保存先: {os.path.abspath(csv_filename)}")
        
        # ファイルが正しく保存されているか確認
        if os.path.exists(csv_filename):
            print(f"✓ ファイルが正しく保存されました")
        else:
            print(f"警告: ファイルが見つかりません。現在のディレクトリ: {os.getcwd()}")
    else:
        print("エラー: ファイルがアップロードされませんでした")
        csv_filename = None
        
except ModuleNotFoundError:
    print("⚠ Google Colab環境ではありません。")
    print("   ローカル環境で実行する場合は、CSVファイルのパスを直接指定してください。")
    print("   例: csv_filename = 'path/to/your/file.csv'")
    
    # ローカル環境用: CSVファイルのパスを手動で指定
    csv_filename = None  # ← ここにCSVファイルのパスを入力してください
    
    if csv_filename and os.path.exists(csv_filename):
        print(f"✓ CSVファイルが見つかりました: {csv_filename}")
    elif csv_filename:
        print(f"⚠ 警告: 指定されたファイルが見つかりません: {csv_filename}")
        csv_filename = None
    else:
        print("   現在、CSVファイルは指定されていません。")
        
except Exception as e:
    print(f"⚠ エラーが発生しました: {e}")
    csv_filename = None

⚠ Google Colab環境ではありません。
   ローカル環境で実行する場合は、CSVファイルのパスを直接指定してください。
   例: csv_filename = 'path/to/your/file.csv'
   現在、CSVファイルは指定されていません。


## 6. CSVファイルの読み込み

アップロードしたCSVファイルからハンドルを読み込みます。

In [34]:
# CSVファイルからハンドルを読み込み
import os

if csv_filename:
    # ファイルパスの確認
    print(f"読み込み対象ファイル: {csv_filename}")
    print(f"ファイルの存在確認: {os.path.exists(csv_filename)}")
    
    if os.path.exists(csv_filename):
        handles = load_handles_from_csv(csv_filename)
        print(f"\n読み込んだハンドル数: {len(handles)}")
        if len(handles) > 0:
            print(f"最初の5件: {handles[:5]}")
        else:
            print("警告: ハンドルが読み込まれませんでした。CSVファイルの形式を確認してください。")
    else:
        print(f"エラー: ファイルが見つかりません: {csv_filename}")
        print(f"現在のディレクトリ: {os.getcwd()}")
        print(f"ディレクトリ内のファイル:")
        for f in os.listdir('.'):
            if f.endswith('.csv'):
                print(f"  - {f}")
        handles = []
else:
    print("エラー: CSVファイルがアップロードされていません")
    handles = []

エラー: CSVファイルがアップロードされていません


## 7. 抽選処理の実行

以下のセルで抽選処理を実行します。

**設定可能なパラメータ**:
- `limit`: 処理件数制限（テスト用、Noneの場合は全件処理）
- `workers`: 並列処理のワーカー数（デフォルト: 50）
- `chunk_size`: 1チャンクあたりの処理件数（デフォルト: 200）
- `campaign_keyword`: キャンペーンキーワード（オプション）

In [35]:
# 処理設定
LIMIT = None  # テスト用: 10件のみ処理する場合は 10 を設定
WORKERS = 50  # 並列処理のワーカー数
CHUNK_SIZE = 200  # 1チャンクあたりの処理件数
CAMPAIGN_KEYWORD = ""  # キャンペーンキーワード（オプション）

print(f"処理設定:")
print(f"  処理件数制限: {LIMIT if LIMIT else '全件'}")
print(f"  ワーカー数: {WORKERS}")
print(f"  チャンクサイズ: {CHUNK_SIZE}")
print(f"  キャンペーンキーワード: {CAMPAIGN_KEYWORD if CAMPAIGN_KEYWORD else 'なし'}")

処理設定:
  処理件数制限: 全件
  ワーカー数: 50
  チャンクサイズ: 200
  キャンペーンキーワード: なし


In [36]:
# 抽選処理の実行
import time

if not handles:
    print("エラー: ハンドルが読み込まれていません")
else:
    # 件数制限
    if LIMIT:
        handles = handles[:LIMIT]
        print(f"処理件数制限: {LIMIT}件")
    
    total = len(handles)
    print(f"\n{'='*70}")
    print(f"高速並列処理を開始します（プール式）")
    print(f"ワーカー数: {WORKERS}, チャンクサイズ: {CHUNK_SIZE}件")
    print(f"総処理数: {total}件")
    print(f"{'='*70}\n")
    
    # 結果リストとカウンター（スレッドセーフ）
    results = []
    results_lock = Lock()
    eligible_count = 0
    excluded_count = 0
    error_count = 0
    completed_count = 0
    
    def process_handle(handle: str, index: int, total: int) -> Optional[Dict[str, Any]]:
        """1つのハンドルを処理する関数（並列処理用）"""
        nonlocal eligible_count, excluded_count, error_count, completed_count
        
        try:
            result = analyze_lottery_eligibility(handle, CAMPAIGN_KEYWORD)
            
            with results_lock:
                completed_count += 1
                
                if result:
                    results.append(result)
                    analysis = result.get('lottery_analysis', {})
                    judgement = analysis.get('judgement', {})
                    
                    if judgement.get('is_eligible'):
                        eligible_count += 1
                        if completed_count % 100 == 0 or completed_count <= 10:
                            print(f"[{completed_count}/{total}] ✓ @{handle}: 当選可")
                    else:
                        excluded_count += 1
                        if completed_count % 100 == 0 or completed_count <= 10:
                            reason = judgement.get('final_reason', '')
                            print(f"[{completed_count}/{total}] ✗ @{handle}: 排除 - {reason[:50]}...")
                else:
                    error_count += 1
                    print(f"[{completed_count}/{total}] ✗ @{handle}: 判定に失敗しました")
            
            return result
            
        except Exception as e:
            with results_lock:
                error_count += 1
                completed_count += 1
                print(f"[{completed_count}/{total}] ✗ @{handle}: エラー - {e}")
            return None
    
    # チャンクに分割して処理
    chunk_size = CHUNK_SIZE
    total_chunks = (total + chunk_size - 1) // chunk_size
    
    start_time = time.time()
    
    # ハンドルをチャンクに分割
    chunks = [handles[i:i + chunk_size] for i in range(0, total, chunk_size)]
    
    # 各チャンクを順次処理
    for chunk_idx, chunk in enumerate(chunks, 1):
        chunk_start = (chunk_idx - 1) * chunk_size + 1
        chunk_end = min(chunk_start + len(chunk) - 1, total)
        chunk_start_time = time.time()
        
        print(f"\n[チャンク {chunk_idx}/{total_chunks}] 処理開始 ({chunk_start}-{chunk_end}件目, {len(chunk)}件)")
        
        with ThreadPoolExecutor(max_workers=WORKERS) as executor:
            future_to_handle = {
                executor.submit(process_handle, handle, chunk_start + idx, total): (chunk_start + idx, handle)
                for idx, handle in enumerate(chunk)
            }
            
            for future in as_completed(future_to_handle):
                idx, handle = future_to_handle[future]
                try:
                    future.result()
                except Exception:
                    pass
        
        # チャンク処理完了後のサマリー
        chunk_elapsed = time.time() - chunk_start_time
        total_elapsed = time.time() - start_time
        avg_time_per_item = total_elapsed / completed_count if completed_count > 0 else 0
        remaining_items = total - completed_count
        estimated_remaining_time = avg_time_per_item * remaining_items if avg_time_per_item > 0 else 0
        
        print(f"[チャンク {chunk_idx}/{total_chunks}] 完了（{chunk_elapsed:.1f}秒）")
        print(f"  当選可: {eligible_count}, 排除: {excluded_count}, エラー: {error_count}")
        print(f"  進捗: {completed_count}/{total} ({completed_count*100//total}%)")
        print(f"  経過時間: {total_elapsed/60:.1f}分, 残り見積もり: {estimated_remaining_time/60:.1f}分")
        print(f"  処理速度: {completed_count/total_elapsed*60:.1f}件/分\n")
    
    # サマリー表示
    print("\n" + "="*60)
    print("処理完了")
    print("="*60)
    print(f"総処理数: {total}")
    print(f"当選可能: {eligible_count}")
    print(f"排除: {excluded_count}")
    print(f"エラー: {error_count}")
    print("="*60)

SyntaxError: no binding for nonlocal 'eligible_count' found (1622560307.py, line 29)

## 8. 結果のCSV出力

処理結果をCSVファイルに出力します。

In [None]:
# 結果をCSVに出力
if results:
    output_filename = f"lottery_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    export_results_to_csv(results, output_filename)
    print(f"\n✓ 結果ファイル: {output_filename}")
else:
    print("警告: 出力する結果がありません")

## 9. 結果ファイルのダウンロード

処理結果のCSVファイルをダウンロードします。

In [None]:
# 結果ファイルのダウンロード（Google Colab環境でのみ動作）
if results:
    try:
        from google.colab import files
        files.download(output_filename)
        print(f"✓ ファイルをダウンロードしました: {output_filename}")
    except ModuleNotFoundError:
        print(f"⚠ Google Colab環境ではありません。")
        print(f"   結果ファイルは以下のパスに保存されています: {os.path.abspath(output_filename)}")
        print(f"   ローカル環境では、このファイルを直接開くことができます。")
    except Exception as e:
        print(f"⚠ ダウンロードに失敗しました: {e}")
        print(f"   結果ファイルは以下のパスに保存されています: {os.path.abspath(output_filename)}")
else:
    print("警告: ダウンロードするファイルがありません")

## トラブルシューティング

### APIキーエラー
- XAI_API_KEYが正しく設定されているか確認してください
- [xAI Console](https://console.x.ai/)でAPIキーを確認してください

### CSV読み込みエラー
- CSVファイルがUTF-8エンコードであることを確認してください
- ファイル形式が正しいか確認してください（旧形式または新形式）

### 処理速度が遅い場合
- `WORKERS`の値を調整してください（デフォルト: 50）
- APIレート制限に達している場合は、`WORKERS`を減らしてください

### メモリ不足エラー
- `CHUNK_SIZE`を小さくしてください（デフォルト: 200）
- `LIMIT`を設定して、処理件数を制限してください