In [None]:
# 開発履歴確認用

# セル1: 共通機能 - ライブラリのインポートと共通ユーティリティ
import keepa
import pandas as pd
from datetime import datetime
import logging
import os
import yaml
from pathlib import Path
import dotenv
import traceback
import requests
import time

# 共通ユーティリティ関数 - プロジェクトルート検出
def find_project_root():
    """
    プロジェクトのルートディレクトリを検出する
    """
    # 現在のファイルの絶対パスを取得
    current_dir = os.path.abspath(os.getcwd())
    
    # 親ディレクトリを探索
    path = Path(current_dir)
    while True:
        # .gitディレクトリがあればそれをルートとみなす
        if (path / '.git').exists():
            return str(path)
        
        # プロジェクトのルートを示す他のファイル/ディレクトリの存在チェック
        if (path / 'setup.py').exists() or (path / 'README.md').exists():
            return str(path)
        
        # これ以上上の階層がない場合は現在のディレクトリを返す
        if path.parent == path:
            return str(path)
        
        # 親ディレクトリへ
        path = path.parent

# 共通ユーティリティ関数 - 設定読み込み
def load_config(root_dir, config_path=None):
    """設定ファイルを読み込む"""
    if config_path is None:
        config_path = os.path.join(root_dir, 'config', 'settings.yaml')
        
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
            
        # Keepa API設定の存在確認
        if 'keepa_api' not in config:
            raise ValueError("設定ファイルにkeepa_apiセクションが見つかりません")
            
        # 出力設定の初期化（なければデフォルト値を設定）
        data_dir = os.path.join(root_dir, 'data')
        if 'output' not in config['keepa_api']:
            config['keepa_api']['output'] = {
                'input_file': os.path.join(data_dir, 'sp_api_output_filtered.csv'),
                'output_file': os.path.join(data_dir, 'keepa_output.csv')
            }
        else:
            # 相対パスを絶対パスに変換
            for key in ['input_file', 'output_file']:
                if key in config['keepa_api']['output']:
                    rel_path = config['keepa_api']['output'][key]
                    if not os.path.isabs(rel_path):
                        config['keepa_api']['output'][key] = os.path.join(data_dir, rel_path)
                
        logging.info(f"設定ファイルの読み込みに成功: {config_path}")
        return config
            
    except Exception as e:
        print(f"設定ファイルの読み込みに失敗: {str(e)}")
        raise

# 共通ユーティリティ関数 - ログ設定
def setup_logging(log_dir, name_prefix="keepa_product"):
    """ログ機能のセットアップ"""
    # すでに存在するハンドラを削除（重複を防ぐため）
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    
    # ログファイルパスの設定
    log_file = os.path.join(log_dir, f'{name_prefix}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
    
    # 基本設定
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        encoding='utf-8'
    )
    
    # コンソールにもログを出力
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)
    
    # ログファイルの場所を明示的に表示
    print(f"ログファイル: {log_file}")
    logging.info(f"ログ機能の初期化が完了しました: {log_file}")
    
    return log_file

# 共通ユーティリティ関数 - 安全なデータ取得
def safe_get(data, *keys, default=None):
    """基本的なデータ取得用のヘルパー関数"""
    for key in keys:
        try:
            data = data[key]
        except (KeyError, TypeError, IndexError):
            return default
    return data

In [74]:
# セル2: KeepaSellerInfoクラスの定義と初期化メソッド
class KeepaSellerInfo:
    """
    Keepa APIを使用してセラー情報を取得・探索するクラス
    
    このクラスは以下の機能を提供します:
    - セラーIDからセラー情報の取得
    - FBA出品有無、評価件数などの抽出
    - 結果のCSVファイル出力
    """
    
    def __init__(self, config_path=None):
        """
        KeepaSellerInfoの初期化
        
        Parameters:
            config_path (str): 設定ファイルのパス（指定なしの場合はデフォルト値を使用）
        """
        # プロジェクトルートディレクトリの検出
        self.root_dir = find_project_root()
        
        # 環境変数の読み込み
        dotenv.load_dotenv(os.path.join(self.root_dir, '.env'))
        
        # ディレクトリパスの設定
        self.data_dir = os.path.join(self.root_dir, 'data')
        self.log_dir = os.path.join(self.root_dir, 'logs')
        
        # ディレクトリが存在しない場合は作成
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.log_dir, exist_ok=True)
        
        # 設定ファイルの読み込み
        self.config = load_config(self.root_dir, config_path)
        
        # 環境変数から認証情報を取得して設定ファイルにマージ
        self._merge_env_variables()
        
        # Keepa APIキーの設定
        self.api_key = self.config['keepa_api']['api_key']
        
        # API URLの設定
        self.api_url = "https://api.keepa.com"
        
        # ログ設定
        setup_logging(self.log_dir, "keepa_seller")
        
        # API制限用のカウンター
        self.token_counter = 0
        self.last_request_time = None
    
    def _merge_env_variables(self):
        """環境変数から認証情報を取得し、設定ファイルにマージする"""
        # Keepa APIキーを環境変数から取得
        api_key = os.getenv('KEEPA_API_KEY')
        
        if api_key:
            self.config['keepa_api']['api_key'] = api_key
            print("Keepa APIキーを環境変数から設定しました")
        else:
            # 環境変数に設定されていない場合は設定ファイルの値を使用
            print("環境変数からKeepa APIキーが取得できません。設定ファイルの値を使用します。")
        
        # seller_info 設定の初期化（なければデフォルト値を設定）
        if 'keepa_seller' not in self.config:
            self.config['keepa_seller'] = {}
                
        if 'output' not in self.config['keepa_seller']:
            self.config['keepa_seller']['output'] = {
                'input_file': 'seller_ids.csv',
                'output_file': 'keepa_seller_output.csv'
            }

In [76]:
# セル3: API通信関連メソッド
class KeepaSellerInfo(KeepaSellerInfo):  # クラスの継続
    def _check_api_tokens(self, required_tokens=1):
        """
        API制限をチェックし、必要なトークン数が利用可能かどうかを確認する
        
        Parameters:
            required_tokens (int): 必要なトークン数
            
        Returns:
            bool: トークンが利用可能な場合はTrue
        """
        # 前回のリクエストから経過した時間を計算
        current_time = time.time()
        if self.last_request_time:
            elapsed_seconds = current_time - self.last_request_time
            # 1分あたり100トークンが回復する（Keepa APIの仕様）
            # 1秒あたり約1.67トークン
            recovered_tokens = int(elapsed_seconds * 1.67)
            self.token_counter = max(0, self.token_counter - recovered_tokens)
        
        # 現在のトークン数を取得
        try:
            response = requests.get(
                f"{self.api_url}/token",
                params={"key": self.api_key}
            )
            
            if response.status_code == 200:
                data = response.json()
                self.token_counter = data.get('tokensLeft', 0)
                logging.info(f"残りトークン数: {self.token_counter}")
                
                # 十分なトークンがあるかチェック
                if self.token_counter < required_tokens:
                    wait_time = int((required_tokens - self.token_counter) / 1.67) + 1
                    logging.warning(f"トークン不足。{wait_time}秒待機します...")
                    time.sleep(wait_time)
                    # 再チェック
                    return self._check_api_tokens(required_tokens)
                
                self.last_request_time = current_time
                return True
            else:
                logging.error(f"トークン取得エラー: {response.status_code} - {response.text}")
                return False
                
        except Exception as e:
            logging.error(f"トークンチェックエラー: {str(e)}")
            return False

    def get_seller_info(self, seller_id):
        """
        Keepa APIを使用してセラー情報を取得する
        
        Parameters:
            seller_id (str): セラーID
            
        Returns:
            dict: セラー情報のデータ
        """
        # APIトークンのチェック（セラー情報取得には1トークン必要）
        if not self._check_api_tokens(1):
            logging.error(f"API制限により処理を中断します: {seller_id}")
            return None
        
        try:
            # Seller APIリクエスト
            params = {
                "key": self.api_key,
                "domain": 5,  # 5 = amazon.co.jp
                "seller": seller_id
                # オプションで storefront=1 を追加すると、出品ASIN一覧も取得可能（+9トークン消費）
                # "storefront": 1
            }
            
            logging.info(f"セラー情報リクエスト開始: {seller_id}")
            response = requests.get(f"{self.api_url}/seller", params=params)
            
            # レスポンスをチェック
            if response.status_code == 200:
                data = response.json()
                logging.info(f"セラー情報取得成功: {seller_id}")
                return data
            else:
                logging.error(f"セラー情報取得エラー: {response.status_code} - {response.text}")
                return None
                
        except Exception as e:
            logging.error(f"セラー情報取得中に例外が発生: {str(e)}")
            logging.error(traceback.format_exc())
            return None

In [78]:
# セル4: データ処理メソッド
class KeepaSellerInfo(KeepaSellerInfo):  # クラスの継続
    def extract_seller_data(self, seller_data, seller_id):
        """
        セラーデータから必要な情報を抽出する
        
        Parameters:
            seller_data (dict): APIから取得したセラーデータ
            seller_id (str): セラーID
            
        Returns:
            dict: 抽出したセラー情報
        """
        # データがない場合のデフォルト値を設定
        default_data = {
            'セラーID': seller_id,
            'セラー名': '不明',
            '出品商品数': 0,
            '総評価件数': 0,
            '肯定的な評価': 0,
            'FBA出品': 'データなし',
            'ストアURL': f"https://www.amazon.co.jp/sp?seller={seller_id}",
            '評価更新日': '',
            '中国出荷': 'データなし'
        }
    
        # セラーデータのチェック
        if not seller_data or 'sellers' not in seller_data or not seller_data['sellers']:
            logging.warning(f"セラー情報がありません: {seller_id}")
            return default_data
        
        # セラー情報を取得 - APIの応答形式に合わせて修正
        # sellers は辞書で、キーがセラーID
        seller_info = seller_data['sellers'].get(seller_id)
        
        # セラー情報が見つからない場合
        if not seller_info:
            logging.warning(f"セラー情報が見つかりません: {seller_id}")
            return default_data
        
        # 肯定的な評価の割合を取得（配列の4番目の要素を使用）
        positive_rating = 0
        if 'positiveRating' in seller_info:
            if isinstance(seller_info['positiveRating'], list) and len(seller_info['positiveRating']) >= 4:
                positive_rating = seller_info['positiveRating'][3]  # 全期間の肯定的な割合（インデックス3）
            elif isinstance(seller_info['positiveRating'], (int, float)):
                positive_rating = seller_info['positiveRating']
        
        # 基本情報を抽出
        extracted_data = {
            'セラーID': seller_id,
            'セラー名': seller_info.get('sellerName', '不明'),
            '出品商品数': 0,  # デフォルト値（後で更新）
            '総評価件数': self._get_rating_count(seller_info),
            '肯定的な評価': positive_rating,
            'FBA出品': 'あり' if seller_info.get('hasFBA', 0) == 1 else 'なし',
            'ストアURL': f"https://www.amazon.co.jp/sp?seller={seller_id}",
            '評価更新日': self._format_keepa_time(seller_info.get('lastRatingUpdate', 0)),
            '中国出荷': 'あり' if seller_info.get('shipsFromChina', False) else 'なし'
        }
        
        # 出品数情報を追加（あれば）
        if 'totalStorefrontAsins' in seller_info:
            if isinstance(seller_info['totalStorefrontAsins'], list) and len(seller_info['totalStorefrontAsins']) > 1:
                extracted_data['出品商品数'] = seller_info['totalStorefrontAsins'][1]
            else:
                extracted_data['出品商品数'] = seller_info['totalStorefrontAsins']
        
        return extracted_data
    
    def _get_rating_count(self, seller_info):
        """
        評価件数を取得する
        
        Parameters:
            seller_info (dict): セラー情報
            
        Returns:
            int: 評価件数
        """
        # ratingCount が存在する場合は、その値を返す
        if 'ratingCount' in seller_info:
            rating_count = seller_info['ratingCount']
            # ratingCount が配列の場合は、4番目の要素を使用（0から数えて3番目）
            if isinstance(rating_count, list) and len(rating_count) > 3:
                return rating_count[3]
            # 単一の数値の場合はそのまま返す
            if isinstance(rating_count, (int, float)):
                return rating_count
        
        # feedbackCount が存在する場合は、その値を返す
        if 'feedbackCount' in seller_info:
            return seller_info['feedbackCount']
        
        # どちらもない場合は0を返す
        return 0

    def _format_keepa_time(self, keepa_time):
        """
        Keepaのタイムスタンプをフォーマットする
        
        Parameters:
            keepa_time (int): Keepaのタイムスタンプ（分単位）
            
        Returns:
            str: フォーマットした日時文字列
        """
        if not keepa_time:
            return ''
            
        try:
            # Keepaの時間はUnixエポックからの分数で表される
            # Keepaのベースタイム（2011年1月1日）をミリ秒に変換
            keepa_base_time = 1293840000  # 2011年1月1日の秒数
            unix_time = (keepa_time * 60) + keepa_base_time
            dt = datetime.fromtimestamp(unix_time)
            return dt.strftime('%Y-%m-%d %H:%M')
        except Exception as e:
            logging.error(f"日付変換エラー: {str(e)}")
            return str(keepa_time)

In [80]:
# セル5: ファイル入出力メソッド
class KeepaSellerInfo(KeepaSellerInfo):  # クラスの継続
    def load_seller_ids(self, input_file=None):
        """
        セラーIDリストを読み込む
        
        Parameters:
            input_file (str): 入力ファイルのパス（指定なしの場合は設定ファイルから読み込み）
            
        Returns:
            list: セラーIDのリスト
        """
        # 入力ファイル名の設定
        if input_file is None:
            input_file = os.path.join(
                self.data_dir, 
                self.config['keepa_seller']['output']['input_file']
            )
            
        seller_ids = []
        
        try:
            # CSVファイルの存在確認
            if not os.path.exists(input_file):
                error_msg = f"入力ファイルが見つかりません: {input_file}"
                logging.error(error_msg)
                raise FileNotFoundError(error_msg)
                
            # CSVファイルからセラーIDを読み込み
            df = pd.read_csv(input_file, encoding='utf-8-sig')
            
            # セラーID列の特定
            seller_column = None
            
            # まず「セラーID」という名前の列を探す
            if 'セラーID' in df.columns:
                seller_column = 'セラーID'
            else:
                # 「セラーID」がなければ、seller や id を含む列名を探す
                for col in df.columns:
                    if 'seller' in col.lower() or 'id' in col.lower():
                        seller_column = col
                        break
                    
            if not seller_column:
                # 最初の列をセラーID列として使用
                seller_column = df.columns[0]
                logging.warning(f"セラーID列が特定できなかったため、最初の列 '{seller_column}' を使用します")
            
            # セラーIDリストの取得
            seller_ids = df[seller_column].dropna().unique().tolist()
            logging.info(f"{len(seller_ids)}件のセラーIDを読み込みました")
            print(f"📝 {len(seller_ids)}件のセラーIDを読み込みました（列名: {seller_column}）")
            
            return seller_ids
            
        except Exception as e:
            error_msg = f"セラーIDの読み込み中にエラーが発生: {str(e)}"
            logging.error(error_msg)
            raise

    def save_to_csv(self, data, output_file=None):
        """
        データをCSVファイルに保存する
        
        Parameters:
            data (list): 保存するデータ（辞書のリスト）
            output_file (str): 出力ファイルのパス（指定なしの場合は設定ファイルから読み込み）
        """
        try:
            if not data:
                logging.warning("保存するデータがありません")
                return
                
            # 出力ファイル名の設定
            if output_file is None:
                output_file = os.path.join(
                    self.data_dir, 
                    self.config['keepa_seller']['output']['output_file']
                )
                
            # DataFrameに変換
            df = pd.DataFrame(data)
            
            # CSVに保存
            df.to_csv(output_file, index=False, encoding='utf-8-sig')
            logging.info(f"データを保存しました: {output_file} ({len(df)}件)")
            print(f"✅ {len(df)}件のデータを {output_file} に保存しました")
            
        except Exception as e:
            error_msg = f"データの保存中にエラーが発生: {str(e)}"
            logging.error(error_msg)
            print(f"❌ {error_msg}")
            raise

In [82]:
# セル6: メイン処理メソッド
class KeepaSellerInfo(KeepaSellerInfo):  # クラスの継続
    def explore_all_sellers(self, input_file=None, output_file=None):
        """
        すべてのセラーを探索する
        
        Parameters:
            input_file (str): 入力ファイルのパス（指定なしの場合は設定ファイルから読み込み）
            output_file (str): 出力ファイルのパス（指定なしの場合は設定ファイルから読み込み）
            
        Returns:
            int: 処理したセラーの数
        """
        # 実行時間測定開始
        start_time = time.time()
        
        # セラーIDの読み込み
        seller_ids = self.load_seller_ids(input_file)
        
        # セラー情報の取得
        all_seller_data = []
        
        print(f"全{len(seller_ids)}件のセラー情報取得を開始します...")
        
        for i, seller_id in enumerate(seller_ids, 1):
            try:
                print(f"セラー {i}/{len(seller_ids)}: {seller_id} の情報取得中...")
                
                # セラー情報の取得
                seller_data = self.get_seller_info(seller_id)
                
                # データの抽出
                extracted_data = self.extract_seller_data(seller_data, seller_id)
                
                # 結果の追加
                all_seller_data.append(extracted_data)
                
                # APIの負荷軽減のため少し待機
                time.sleep(0.5)
                
                print(f"セラー {seller_id} の情報取得に成功しました")
                
            except Exception as e:
                logging.error(f"セラー {seller_id} の処理中にエラーが発生: {str(e)}")
                logging.error(traceback.format_exc())
                continue
        
        # 結果の保存
        if all_seller_data:
            self.save_to_csv(all_seller_data, output_file)
        
        # 実行時間計測終了
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        print(f"\n===== 処理完了 - 合計 {len(all_seller_data)} 件 =====")
        print(f"実行時間: {elapsed_time:.2f} 秒")
        
        return len(all_seller_data)

In [84]:
# セル7: 実行コード
if __name__ == "__main__":
    try:
        # KeepaSellerInfoのインスタンスを作成
        explorer = KeepaSellerInfo()
        
        # すべてのセラーを探索
        explorer.explore_all_sellers()
        
    except Exception as e:
        print(f"エラーが発生しました: {str(e)}")
        traceback.print_exc()

2025-03-12 15:33:57,648 - INFO - 設定ファイルの読み込みに成功: C:\Users\inato\Documents\amazon-research\config\settings.yaml
2025-03-12 15:33:57,649 - INFO - ログ機能の初期化が完了しました: C:\Users\inato\Documents\amazon-research\logs\keepa_seller_20250312_153357.log
2025-03-12 15:33:57,655 - INFO - 3件のセラーIDを読み込みました


Keepa APIキーを環境変数から設定しました
ログファイル: C:\Users\inato\Documents\amazon-research\logs\keepa_seller_20250312_153357.log
📝 3件のセラーIDを読み込みました（列名: セラーI）
全3件のセラー情報取得を開始します...
セラー 1/3: A2LN80EYNTOLCG の情報取得中...


2025-03-12 15:33:58,457 - INFO - 残りトークン数: 300
2025-03-12 15:33:58,457 - INFO - セラー情報リクエスト開始: A2LN80EYNTOLCG
2025-03-12 15:33:59,238 - INFO - セラー情報取得成功: A2LN80EYNTOLCG


セラー A2LN80EYNTOLCG の情報取得に成功しました
セラー 2/3: A1OBH3OMSF3I6W の情報取得中...


2025-03-12 15:34:00,546 - INFO - 残りトークン数: 299
2025-03-12 15:34:00,546 - INFO - セラー情報リクエスト開始: A1OBH3OMSF3I6W
2025-03-12 15:34:01,335 - INFO - セラー情報取得成功: A1OBH3OMSF3I6W


セラー A1OBH3OMSF3I6W の情報取得に成功しました
セラー 3/3: A19X7E2Z5AKALT の情報取得中...


2025-03-12 15:34:02,639 - INFO - 残りトークン数: 299
2025-03-12 15:34:02,639 - INFO - セラー情報リクエスト開始: A19X7E2Z5AKALT
2025-03-12 15:34:03,439 - INFO - セラー情報取得成功: A19X7E2Z5AKALT
2025-03-12 15:34:03,954 - INFO - データを保存しました: C:\Users\inato\Documents\amazon-research\data\keepa_seller_output.csv (3件)


セラー A19X7E2Z5AKALT の情報取得に成功しました
✅ 3件のデータを C:\Users\inato\Documents\amazon-research\data\keepa_seller_output.csv に保存しました

===== 処理完了 - 合計 3 件 =====
実行時間: 6.30 秒
