<a href="https://colab.research.google.com/github/railman-misaka/twikit/blob/main/250101%E3%80%90ReBest%E6%A7%98_%E3%83%AA%E3%83%97%E3%83%A9%E3%82%A4%E6%A4%9C%E7%B4%A2%E3%80%91.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#セットアップ

In [1]:
# 必要なライブラリのインストール
!pip install twikit pandas openpyxl

# 必要なインポート
import os
from google.colab import drive
from google.colab import files
import json
import pandas as pd
from datetime import datetime
from twikit import Client
import asyncio
from collections import Counter
from google.colab import output

# Google Driveのマウント
drive.mount('/content/drive')

# ディレクトリ設定
WORK_DIR = '/content/drive/MyDrive/Twitter_Analysis'
COOKIE_DIR = f"{WORK_DIR}/twitter_json"
RESULTS_DIR = f"{WORK_DIR}/profile_results"

# ディレクトリ作成
os.makedirs(COOKIE_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)

print("セットアップ完了！")
print(f"Cookie path: {COOKIE_DIR}/cookie_edit.json")
print(f"Results directory: {RESULTS_DIR}")

# クッキーファイルのアップロード
def upload_cookie_file():
    print("cookie_edit.jsonファイルをアップロードしてください")
    uploaded = files.upload()

    if 'cookie_edit.json' in uploaded:
        with open(f"{COOKIE_DIR}/cookie_edit.json", 'wb') as f:
            f.write(uploaded['cookie_edit.json'])
        print("クッキーファイルを保存しました")
        return True
    else:
        print("cookie_edit.jsonファイルがアップロードされませんでした")
        return False

upload_cookie_file()

Collecting twikit
  Downloading twikit-2.2.0-py3-none-any.whl.metadata (4.3 kB)
Collecting filetype (from twikit)
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting pyotp (from twikit)
  Downloading pyotp-2.9.0-py3-none-any.whl.metadata (9.8 kB)
Collecting socksio==1.* (from httpx[socks]->twikit)
  Downloading socksio-1.0.0-py3-none-any.whl.metadata (6.1 kB)
Downloading twikit-2.2.0-py3-none-any.whl (78 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.2/78.2 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading filetype-1.2.0-py2.py3-none-any.whl (19 kB)
Downloading socksio-1.0.0-py3-none-any.whl (12 kB)
Downloading pyotp-2.9.0-py3-none-any.whl (13 kB)
Installing collected packages: filetype, socksio, pyotp, twikit
Successfully installed filetype-1.2.0 pyotp-2.9.0 socksio-1.0.0 twikit-2.2.0
Mounted at /content/drive
セットアップ完了！
Cookie path: /content/drive/MyDrive/Twitter_Analysis/twitter_json/cookie_edit.json
Results directory: /c

Saving cookie_edit.json to cookie_edit.json
クッキーファイルを保存しました


True

# リプライ検索

## セットアップ

In [5]:
# セル2: 定義用コード

class TwitterProfileAnalyzer:
    def __init__(self):
        self.client = Client(language='en-US')
        self.cookie_path = os.path.join(COOKIE_DIR, "cookie_edit.json")
        self.results_dir = RESULTS_DIR

    async def setup(self):
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as file:
                cookies = json.load(file)
            self.client.set_cookies(cookies)
            print("認証に成功しました！")
            return True
        except Exception as e:
            print(f"認証エラー: {e}")
            return False

    async def analyze_user_replies(self, screen_name, tweets_to_analyze=200):
        """指定したユーザーのツイートから、リプライを分析する"""
        try:
            # ユーザー情報を取得
            target_user = await self.client.get_user_by_screen_name(screen_name)
            print(f"{screen_name}のツイートを分析中...")

            # リプライしているユーザーをスクリーンネームベースで追跡
            reply_counter = Counter()
            reply_users = {}

            # ツイートを取得（リプライを含む）
            print("ツイートを取得中...")
            results = await self.client.get_user_tweets(
                target_user.id,
                tweet_type='Replies',  # Repliesタイプに変更
                count=min(tweets_to_analyze, 100)
            )

            analyzed_count = 0
            while results and analyzed_count < tweets_to_analyze:
                for tweet in results:
                    try:
                        # リプライ先のツイートテキストを解析
                        if hasattr(tweet, 'text') and tweet.text.startswith('@'):
                            # @ユーザー名を抽出
                            mentioned_users = [
                                word[1:] for word in tweet.text.split()
                                if word.startswith('@')
                            ]

                            for reply_to in mentioned_users:
                                if reply_to and reply_to.lower() != screen_name.lower():
                                    try:
                                        if reply_to not in reply_users:
                                            # スクリーンネームからユーザー情報を取得
                                            try:
                                                reply_user = await self.client.get_user_by_screen_name(reply_to)
                                                reply_users[reply_to] = reply_user
                                                print(f"ユーザー {reply_to} の情報を取得しました")
                                            except Exception as e:
                                                print(f"ユーザー {reply_to} の取得エラー: {e}")
                                                continue
                                        reply_counter[reply_to] += 1
                                    except Exception as e:
                                        print(f"ユーザー {reply_to} の情報取得をスキップ: {e}")
                                        continue
                    except Exception as e:
                        print(f"ツイート解析エラー: {e}")
                        continue

                    analyzed_count += 1
                    if analyzed_count % 20 == 0:
                        print(f"{analyzed_count}件のツイートを分析済み")

                if analyzed_count < tweets_to_analyze and results.next_cursor:
                    try:
                        results = await results.next()
                    except Exception as e:
                        print(f"追加ツイート取得エラー: {e}")
                        break
                else:
                    break

            print(f"\n分析完了: {analyzed_count}件のツイートを処理")
            print(f"リプライ先ユーザー数: {len(reply_users)}人")

            return reply_counter, reply_users

        except Exception as e:
            print(f"分析エラー: {e}")
            return Counter(), {}

    async def get_user_profile(self, user):
        """ユーザーのプロフィール情報を取得する"""
        try:
            # プロフィール情報を辞書形式で整理
            profile_data = {
                'user_id': user.id,                      # ユーザーID
                'name': user.name,                       # 表示名
                'screen_name': user.screen_name,         # @ユーザー名
                'description': user.description,         # プロフィール文
                'location': user.location,               # 場所
                'followers_count': user.followers_count, # フォロワー数
                'following_count': user.following_count, # フォロー数
                'tweets_count': user.statuses_count,     # ツイート数
                'created_at': user.created_at,           # アカウント作成日
                'profile_image_url': user.profile_image_url  # プロフィール画像URL
            }
            return profile_data
        except Exception as e:
            print(f"プロフィール取得エラー: {e}")
            return None

    async def get_user_tweets(self, user, count=1):
        """ユーザーの投稿を取得する"""
        try:
            tweets = []
            results = await user.get_tweets(tweet_type='Tweets', count=count)

            for tweet in results:
                tweet_data = {
                    'tweet_id': tweet.id,                      # ツイートID
                    'text': tweet.text,                        # ツイート本文
                    'created_at': tweet.created_at,            # 投稿日時
                    'retweet_count': tweet.retweet_count,      # リツイート数
                    'like_count': tweet.favorite_count,        # いいね数
                    'reply_count': tweet.reply_count,          # 返信数
                    'is_retweet': bool(tweet.retweeted_tweet), # リツイートかどうか
                    'is_quote': tweet.is_quote_status          # 引用ツイートかどうか
                }
                tweets.append(tweet_data)

            return tweets
        except Exception as e:
            print(f"ツイート取得エラー: {e}")
            return []

    def save_results(self, frequent_repliers_data, target_screen_name):
        """分析結果をJSONファイルとして保存する"""
        try:
            current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = os.path.join(
                self.results_dir,
                f"analysis_{target_screen_name}_{current_time}.json"
            )

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(frequent_repliers_data, f, ensure_ascii=False, indent=2)
            print(f"分析結果を保存しました: {filename}")
            return filename
        except Exception as e:
            print(f"保存エラー: {e}")
            return None

    def save_to_excel(self, frequent_repliers_data, target_screen_name):
        """分析結果をExcelファイルとして保存"""
        try:
            # データフレーム用のリストを作成
            rows = []
            for user_data in frequent_repliers_data:
                profile = user_data['profile']
                tweets = user_data['recent_tweets']

                # 最新のツイート3件を結合
                recent_tweets_text = "\n".join(
                    [tweet['text'] for tweet in tweets[:3]]
                ) if tweets else ""

                # 1行のデータとして整形
                row = {
                    '分析対象ユーザー': target_screen_name,
                    '分析日時': datetime.now().strftime("%Y-%m-%d %H:%M"),
                    'ユーザー名': profile['name'],
                    'ユーザーID': f"@{profile['screen_name']}",
                    'プロフィール文': profile['description'],
                    'アカウント作成日': profile['created_at'],
                    'リプライ数': user_data['reply_count'],
                    'フォロワー数': profile['followers_count'],
                    'フォロー数': profile['following_count'],
                    'ツイート数': profile['tweets_count'],
                    '場所': profile['location'],
                    'プロフィール画像URL': profile['profile_image_url'],
                    'アカウントURL': f"https://twitter.com/{profile['screen_name']}",
                    '最近のツイート': recent_tweets_text
                }
                rows.append(row)

            # DataFrameを作成
            df = pd.DataFrame(rows)

            # Excelファイル名を生成
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            excel_file = os.path.join(
                self.results_dir,
                f"リプライ分析_{target_screen_name}_{timestamp}.xlsx"
            )

            # Excelファイルとして保存
            with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='リプライ分析結果', index=False)

                # 列幅の自動調整
                worksheet = writer.sheets['リプライ分析結果']
                for idx, col in enumerate(df.columns):
                    max_length = max(
                        df[col].astype(str).apply(len).max(),
                        len(str(col))
                    )
                    # 最大幅を50文字に制限
                    adjusted_width = min(max_length + 2, 50)
                    worksheet.column_dimensions[chr(65 + idx)].width = adjusted_width

                # 行の高さを調整（最近のツイート用）
                for row in range(2, len(df) + 2):  # Excelは1始まりでヘッダーがあるため+2
                    worksheet.row_dimensions[row].height = 60

            print(f"\nExcelファイルを保存しました: {excel_file}")
            return excel_file

        except Exception as e:
            print(f"Excelファイルの保存中にエラーが発生しました: {e}")
            return None

    async def run_analysis(self):
        if not await self.setup():
            return

        # ユーザー入力を対話的に取得
        target_user = input("分析対象のユーザー名を入力してください（@を除く）: ").strip()
        if target_user.startswith('@'):
            target_user = target_user[1:]

        try:
            min_replies = int(input("最小リプライ数を入力してください（例: 3）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 3 を使用します。")
            min_replies = 3

        try:
            tweets_to_analyze = int(input("分析するツイート数を入力してください（例: 200）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 200 を使用します。")
            tweets_to_analyze = 200

        print(f"\n{target_user}のリプライを分析します...")
        print(f"- 分析対象ツイート数: {tweets_to_analyze}")
        print(f"- 最小リプライ数: {min_replies}")

        # リプライを分析
        reply_counter, reply_users = await self.analyze_user_replies(target_user, tweets_to_analyze)

        if not reply_counter:
            print("\nリプライが見つかりませんでした。")
            return

        # 頻繁にリプライしているユーザーの情報を収集
        frequent_repliers_data = []
        print("\nリプライの多いユーザーの情報を収集中...")

        for screen_name, reply_count in reply_counter.most_common():
            if reply_count >= min_replies and screen_name in reply_users:
                try:
                    user = reply_users[screen_name]
                    profile_data = await self.get_user_profile(user)
                    tweets = await self.get_user_tweets(user, count=3)

                    if profile_data:
                        user_data = {
                            'profile': profile_data,
                            'reply_count': reply_count,
                            'recent_tweets': tweets
                        }
                        frequent_repliers_data.append(user_data)
                        print(f"@{screen_name}の情報を取得しました（リプライ数: {reply_count}）")
                except Exception as e:
                    print(f"@{screen_name}の情報取得に失敗: {e}")
                    continue

        # 結果を保存
        if frequent_repliers_data:
            self.save_results(frequent_repliers_data, target_user)
            self.save_to_excel(frequent_repliers_data, target_user)
            print("\n分析が完了しました！")
        else:
            print("\n分析対象となるユーザーが見つかりませんでした。")


分析対象ユーザーが送信したリプライの回数をカウント。
送信したリプライの文章を収集。

In [None]:
# セル2: 定義用コード

import asyncio
import json
import os
from collections import Counter
from datetime import datetime

import pandas as pd
from twikit import Client


class TwitterProfileAnalyzer:
    def __init__(self):
        self.client = Client(language='en-US')
        self.cookie_path = os.path.join(COOKIE_DIR, "cookie_edit.json")
        self.results_dir = RESULTS_DIR

    async def setup(self):
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as file:
                cookies = json.load(file)
            self.client.set_cookies(cookies)
            print("認証に成功しました！")
            return True
        except Exception as e:
            print(f"認証エラー: {e}")
            return False

    async def analyze_user_replies(self, screen_name, tweets_to_analyze=200):
        """指定したユーザーのツイートから、リプライを分析する"""
        try:
            # ユーザー情報を取得
            target_user = await self.client.get_user_by_screen_name(screen_name)
            print(f"{screen_name}のツイートを分析中...")

            # リプライしているユーザーをスクリーンネームベースで追跡
            reply_counter = Counter()
            reply_users = {}

            # 分析対象ユーザーが送信したリプライのカウントとテキスト
            sent_replies_counter = 0
            sent_replies_texts = []

            # ツイートを取得（リプライを含む）
            print("ツイートを取得中...")
            results = await self.client.get_user_tweets(
                target_user.id,
                tweet_type='Replies',  # Repliesタイプに変更
                count=min(tweets_to_analyze, 100)
            )

            analyzed_count = 0
            while results and analyzed_count < tweets_to_analyze:
                for tweet in results:
                    try:
                        # リプライ先のツイートテキストを解析
                        if hasattr(tweet, 'text') and tweet.text.startswith('@'):
                            # @ユーザー名を抽出
                            mentioned_users = [
                                word[1:] for word in tweet.text.split()
                                if word.startswith('@')
                            ]

                            for reply_to in mentioned_users:
                                if reply_to and reply_to.lower() != screen_name.lower():
                                    try:
                                        if reply_to not in reply_users:
                                            # スクリーンネームからユーザー情報を取得
                                            try:
                                                reply_user = await self.client.get_user_by_screen_name(reply_to)
                                                reply_users[reply_to] = reply_user
                                                print(f"ユーザー {reply_to} の情報を取得しました")
                                            except Exception as e:
                                                print(f"ユーザー {reply_to} の取得エラー: {e}")
                                                continue
                                        reply_counter[reply_to] += 1
                                    except Exception as e:
                                        print(f"ユーザー {reply_to} の情報取得をスキップ: {e}")
                                        continue

                            # 送信したリプライのカウントとテキストを収集
                            sent_replies_counter += 1
                            sent_replies_texts.append(tweet.text)

                    except Exception as e:
                        print(f"ツイート解析エラー: {e}")
                        continue

                    analyzed_count += 1
                    if analyzed_count % 20 == 0:
                        print(f"{analyzed_count}件のツイートを分析済み")

                if analyzed_count < tweets_to_analyze and results.next_cursor:
                    try:
                        results = await results.next()
                    except Exception as e:
                        print(f"追加ツイート取得エラー: {e}")
                        break
                else:
                    break

            print(f"\n分析完了: {analyzed_count}件のツイートを処理")
            print(f"リプライ先ユーザー数: {len(reply_users)}人")
            print(f"送信したリプライ数: {sent_replies_counter}件")

            return reply_counter, reply_users, sent_replies_counter, sent_replies_texts

        except Exception as e:
            print(f"分析エラー: {e}")
            return Counter(), {}, 0, []

    async def get_user_profile(self, user):
        """ユーザーのプロフィール情報を取得する"""
        try:
            # プロフィール情報を辞書形式で整理
            profile_data = {
                'user_id': user.id,                      # ユーザーID
                'name': user.name,                       # 表示名
                'screen_name': user.screen_name,         # @ユーザー名
                'description': user.description,         # プロフィール文
                'location': user.location,               # 場所
                'followers_count': user.followers_count, # フォロワー数
                'following_count': user.following_count, # フォロー数
                'tweets_count': user.statuses_count,     # ツイート数
                'created_at': user.created_at,           # アカウント作成日
                'profile_image_url': user.profile_image_url  # プロフィール画像URL
            }
            return profile_data
        except Exception as e:
            print(f"プロフィール取得エラー: {e}")
            return None

    async def get_user_tweets(self, user, count=1):
        """ユーザーの投稿を取得する"""
        try:
            tweets = []
            results = await user.get_tweets(tweet_type='Tweets', count=count)

            for tweet in results:
                tweet_data = {
                    'tweet_id': tweet.id,                      # ツイートID
                    'text': tweet.text,                        # ツイート本文
                    'created_at': tweet.created_at,            # 投稿日時
                    'retweet_count': tweet.retweet_count,      # リツイート数
                    'like_count': tweet.favorite_count,        # いいね数
                    'reply_count': tweet.reply_count,          # 返信数
                    'is_retweet': bool(tweet.retweeted_tweet), # リツイートかどうか
                    'is_quote': tweet.is_quote_status          # 引用ツイートかどうか
                }
                tweets.append(tweet_data)

            return tweets
        except Exception as e:
            print(f"ツイート取得エラー: {e}")
            return []

    def save_results(self, frequent_repliers_data, target_screen_name, sent_replies_counter, sent_replies_texts):
        """分析結果をJSONファイルとして保存する"""
        try:
            current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = os.path.join(
                self.results_dir,
                f"analysis_{target_screen_name}_{current_time}.json"
            )

            results_data = {
                "target_user": target_screen_name,
                "sent_replies_count": sent_replies_counter,
                "sent_replies_texts": sent_replies_texts,
                "frequent_repliers": frequent_repliers_data
            }

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(results_data, f, ensure_ascii=False, indent=2)
            print(f"分析結果を保存しました: {filename}")
            return filename
        except Exception as e:
            print(f"保存エラー: {e}")
            return None

    def save_to_excel(self, frequent_repliers_data, target_screen_name, sent_replies_counter, sent_replies_texts):
        """分析結果をExcelファイルとして保存"""
        try:
            # リプライ送信情報をDataFrameに追加
            replies_info = {
                '分析対象ユーザー': target_screen_name,
                '送信したリプライ数': sent_replies_counter,
                '送信したリプライのテキスト': "\n\n".join(sent_replies_texts)
            }
            replies_df = pd.DataFrame([replies_info])

            # 頻繁にリプライしているユーザーのデータフレーム用のリストを作成
            repliers_rows = []
            for user_data in frequent_repliers_data:
                profile = user_data['profile']
                tweets = user_data['recent_tweets']

                # 最新のツイート3件を結合
                recent_tweets_text = "\n".join(
                    [tweet['text'] for tweet in tweets[:3]]
                ) if tweets else ""

                # 1行のデータとして整形
                row = {
                    '分析対象ユーザー': target_screen_name,
                    'ユーザー名': profile['name'],
                    'ユーザーID': f"@{profile['screen_name']}",
                    'プロフィール文': profile['description'],
                    'アカウント作成日': profile['created_at'],
                    'リプライ数': user_data['reply_count'],
                    'フォロワー数': profile['followers_count'],
                    'フォロー数': profile['following_count'],
                    'ツイート数': profile['tweets_count'],
                    '場所': profile['location'],
                    'プロフィール画像URL': profile['profile_image_url'],
                    'アカウントURL': f"https://twitter.com/{profile['screen_name']}",
                    '最近のツイート': recent_tweets_text
                }
                repliers_rows.append(row)

            # Repliers DataFrame
            repliers_df = pd.DataFrame(repliers_rows)

            # Excelファイル名を生成
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            excel_file = os.path.join(
                self.results_dir,
                f"リプライ分析_{target_screen_name}_{timestamp}.xlsx"
            )

            # Excelファイルとして保存
            with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
                # リプライ送信情報をシート1に保存
                replies_df.to_excel(writer, sheet_name='送信リプライ情報', index=False)

                # 頻繁にリプライしているユーザー情報をシート2に保存
                repliers_df.to_excel(writer, sheet_name='リプライ分析結果', index=False)

                # 列幅の自動調整
                for sheet_name in writer.sheets:
                    worksheet = writer.sheets[sheet_name]
                    df = replies_df if sheet_name == '送信リプライ情報' else repliers_df
                    for idx, col in enumerate(df.columns):
                        max_length = max(
                            df[col].astype(str).apply(len).max(),
                            len(str(col))
                        )
                        # 最大幅を50文字に制限
                        adjusted_width = min(max_length + 2, 50)
                        worksheet.column_dimensions[chr(65 + idx)].width = adjusted_width

                    # 行の高さを調整（送信リプライ情報シートのみ）
                    if sheet_name == '送信リプライ情報':
                        worksheet.row_dimensions[2].height = 300  # テキストが長いため高さを増やす

            print(f"\nExcelファイルを保存しました: {excel_file}")
            return excel_file

        except Exception as e:
            print(f"Excelファイルの保存中にエラーが発生しました: {e}")
            return None

    async def run_analysis(self):
        if not await self.setup():
            return

        # ユーザー入力を対話的に取得
        target_user = input("分析対象のユーザー名を入力してください（@を除く）: ").strip()
        if target_user.startswith('@'):
            target_user = target_user[1:]

        try:
            min_replies = int(input("最小リプライ数を入力してください（例: 3）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 3 を使用します。")
            min_replies = 3

        try:
            tweets_to_analyze = int(input("分析するツイート数を入力してください（例: 200）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 200 を使用します。")
            tweets_to_analyze = 200

        print(f"\n{target_user}のリプライを分析します...")
        print(f"- 分析対象ツイート数: {tweets_to_analyze}")
        print(f"- 最小リプライ数: {min_replies}")

        # リプライを分析
        reply_counter, reply_users, sent_replies_counter, sent_replies_texts = await self.analyze_user_replies(target_user, tweets_to_analyze)

        if not reply_counter:
            print("\nリプライが見つかりませんでした。")
            return

        # 頻繁にリプライしているユーザーの情報を収集
        frequent_repliers_data = []
        print("\nリプライの多いユーザーの情報を収集中...")

        for screen_name, reply_count in reply_counter.most_common():
            if reply_count >= min_replies and screen_name in reply_users:
                try:
                    user = reply_users[screen_name]
                    profile_data = await self.get_user_profile(user)
                    tweets = await self.get_user_tweets(user, count=3)

                    if profile_data:
                        user_data = {
                            'profile': profile_data,
                            'reply_count': reply_count,
                            'recent_tweets': tweets
                        }
                        frequent_repliers_data.append(user_data)
                        print(f"@{screen_name}の情報を取得しました（リプライ数: {reply_count}）")
                except Exception as e:
                    print(f"@{screen_name}の情報取得に失敗: {e}")
                    continue

        # 結果を保存
        if frequent_repliers_data:
            self.save_results(frequent_repliers_data, target_user, sent_replies_counter, sent_replies_texts)
            self.save_to_excel(frequent_repliers_data, target_user, sent_replies_counter, sent_replies_texts)
            print("\n分析が完了しました！")
        else:
            print("\n分析対象となるユーザーが見つかりませんでした。")


送信リプライの詳細な収集:

送信先ユーザーごとのリプライ数とリプライ内容を収集し、保存します。


In [None]:
# セル2: 定義用コード

import asyncio
import json
import os
from collections import Counter, defaultdict  # defaultdict をインポート
from datetime import datetime

import pandas as pd
from twikit import Client


class TwitterProfileAnalyzer:
    def __init__(self):
        self.client = Client(language='en-US')
        self.cookie_path = os.path.join(COOKIE_DIR, "cookie_edit.json")
        self.results_dir = RESULTS_DIR

    async def setup(self):
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as file:
                cookies = json.load(file)
            self.client.set_cookies(cookies)
            print("認証に成功しました！")
            return True
        except Exception as e:
            print(f"認証エラー: {e}")
            return False

    async def analyze_user_replies(self, screen_name, tweets_to_analyze=200):
        """指定したユーザーのツイートから、リプライを分析する"""
        try:
            # ユーザー情報を取得
            target_user = await self.client.get_user_by_screen_name(screen_name)
            print(f"{screen_name}のツイートを分析中...")

            # リプライしているユーザーをスクリーンネームベースで追跡
            reply_counter = Counter()
            reply_users = {}
            sent_replies_counter = 0
            sent_replies_texts = []

            # ユーザーごとのリプライ文を保存する辞書を追加
            sent_replies_texts_per_user = defaultdict(list)

            # ツイートを取得（リプライを含む）
            print("ツイートを取得中...")
            results = await self.client.get_user_tweets(
                target_user.id,
                tweet_type='Replies',  # Repliesタイプに変更
                count=min(tweets_to_analyze, 100)
            )

            analyzed_count = 0
            while results and analyzed_count < tweets_to_analyze:
                for tweet in results:
                    try:
                        # リプライ先のツイートテキストを解析
                        if hasattr(tweet, 'text') and tweet.text.startswith('@'):
                            # @ユーザー名を抽出
                            mentioned_users = [
                                word[1:] for word in tweet.text.split()
                                if word.startswith('@')
                            ]

                            for reply_to in mentioned_users:
                                if reply_to and reply_to.lower() != screen_name.lower():
                                    try:
                                        if reply_to not in reply_users:
                                            # スクリーンネームからユーザー情報を取得
                                            try:
                                                reply_user = await self.client.get_user_by_screen_name(reply_to)
                                                reply_users[reply_to] = reply_user
                                                print(f"ユーザー {reply_to} の情報を取得しました")
                                            except Exception as e:
                                                print(f"ユーザー {reply_to} の取得エラー: {e}")
                                                continue
                                        reply_counter[reply_to] += 1

                                        # ユーザーごとのリプライ文を追加
                                        sent_replies_texts_per_user[reply_to].append(tweet.text)

                                    except Exception as e:
                                        print(f"ユーザー {reply_to} の情報取得をスキップ: {e}")
                                        continue

                            # 送信したリプライのカウントとテキストを収集
                            sent_replies_counter += 1
                            sent_replies_texts.append(tweet.text)

                    except Exception as e:
                        print(f"ツイート解析エラー: {e}")
                        continue

                    analyzed_count += 1
                    if analyzed_count % 20 == 0:
                        print(f"{analyzed_count}件のツイートを分析済み")

                if analyzed_count < tweets_to_analyze and results.next_cursor:
                    try:
                        results = await results.next()
                    except Exception as e:
                        print(f"追加ツイート取得エラー: {e}")
                        break
                else:
                    break

            print(f"\n分析完了: {analyzed_count}件のツイートを処理")
            print(f"リプライ先ユーザー数: {len(reply_users)}人")
            print(f"送信したリプライ数: {sent_replies_counter}件")

            return reply_counter, reply_users, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user

        except Exception as e:
            print(f"分析エラー: {e}")
            return Counter(), {}, 0, [], {}

    async def get_user_profile(self, user):
        """ユーザーのプロフィール情報を取得する"""
        try:
            # プロフィール情報を辞書形式で整理
            profile_data = {
                'user_id': user.id,                      # ユーザーID
                'name': user.name,                       # 表示名
                'screen_name': user.screen_name,         # @ユーザー名
                'description': user.description,         # プロフィール文
                'location': user.location,               # 場所
                'followers_count': user.followers_count, # フォロワー数
                'following_count': user.following_count, # フォロー数
                'tweets_count': user.statuses_count,     # ツイート数
                'created_at': user.created_at,           # アカウント作成日
                'profile_image_url': user.profile_image_url  # プロフィール画像URL
            }
            return profile_data
        except Exception as e:
            print(f"プロフィール取得エラー: {e}")
            return None

    async def get_user_tweets(self, user, count=1):
        """ユーザーの投稿を取得する"""
        try:
            tweets = []
            results = await user.get_tweets(tweet_type='Tweets', count=count)

            for tweet in results:
                tweet_data = {
                    'tweet_id': tweet.id,                      # ツイートID
                    'text': tweet.text,                        # ツイート本文
                    'created_at': tweet.created_at,            # 投稿日時
                    'retweet_count': tweet.retweet_count,      # リツイート数
                    'like_count': tweet.favorite_count,        # いいね数
                    'reply_count': tweet.reply_count,          # 返信数
                    'is_retweet': bool(tweet.retweeted_tweet), # リツイートかどうか
                    'is_quote': tweet.is_quote_status          # 引用ツイートかどうか
                }
                tweets.append(tweet_data)

            return tweets
        except Exception as e:
            print(f"ツイート取得エラー: {e}")
            return []

    def save_results(self, frequent_repliers_data, target_screen_name, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user):
        """分析結果をJSONファイルとして保存する"""
        try:
            current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = os.path.join(
                self.results_dir,
                f"analysis_{target_screen_name}_{current_time}.json"
            )

            # ユーザーごとのリプライ文を追加
            users_replies = {
                user: texts for user, texts in sent_replies_texts_per_user.items()
            }

            results_data = {
                "target_user": target_screen_name,
                "sent_replies_count": sent_replies_counter,
                "sent_replies_texts": sent_replies_texts,
                "sent_replies_texts_per_user": users_replies,  # 追加
                "frequent_repliers": frequent_repliers_data
            }

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(results_data, f, ensure_ascii=False, indent=2)
            print(f"分析結果を保存しました: {filename}")
            return filename
        except Exception as e:
            print(f"保存エラー: {e}")
            return None

    def save_to_excel(self, frequent_repliers_data, target_screen_name, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user):
        """分析結果をExcelファイルとして保存"""
        try:
            # リプライ送信情報をDataFrameに追加
            replies_info = {
                '分析対象ユーザー': target_screen_name,
                '送信したリプライ数': sent_replies_counter,
                '送信したリプライのテキスト': "\n\n".join(sent_replies_texts)
            }
            replies_df = pd.DataFrame([replies_info])

            # 頻繁にリプライしているユーザーのデータフレーム用のリストを作成
            repliers_rows = []
            for user_data in frequent_repliers_data:
                profile = user_data['profile']
                tweets = user_data['recent_tweets']

                # 最新のツイート3件を結合
                recent_tweets_text = "\n".join(
                    [tweet['text'] for tweet in tweets[:3]]
                ) if tweets else ""

                # 1行のデータとして整形
                row = {
                    '分析対象ユーザー': target_screen_name,
                    'ユーザー名': profile['name'],
                    'ユーザーID': f"@{profile['screen_name']}",
                    'プロフィール文': profile['description'],
                    'アカウント作成日': profile['created_at'],
                    'リプライ数': user_data['reply_count'],
                    'フォロワー数': profile['followers_count'],
                    'フォロー数': profile['following_count'],
                    'ツイート数': profile['tweets_count'],
                    '場所': profile['location'],
                    'プロフィール画像URL': profile['profile_image_url'],
                    'アカウントURL': f"https://twitter.com/{profile['screen_name']}",
                    '最近のツイート': recent_tweets_text,
                    # ユーザーごとのリプライ文を追加
                    'リプライ文': "\n\n".join(sent_replies_texts_per_user.get(profile['screen_name'], []))
                }
                repliers_rows.append(row)

            # Repliers DataFrame
            repliers_df = pd.DataFrame(repliers_rows)

            # Excelファイル名を生成
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            excel_file = os.path.join(
                self.results_dir,
                f"リプライ分析_{target_screen_name}_{timestamp}.xlsx"
            )

            # Excelファイルとして保存
            with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
                # リプライ送信情報をシート1に保存
                replies_df.to_excel(writer, sheet_name='送信リプライ情報', index=False)

                # 頻繁にリプライしているユーザー情報をシート2に保存
                repliers_df.to_excel(writer, sheet_name='リプライ分析結果', index=False)

                # 列幅の自動調整
                for sheet_name in writer.sheets:
                    worksheet = writer.sheets[sheet_name]
                    if sheet_name == '送信リプライ情報':
                        df = replies_df
                    else:
                        df = repliers_df
                    for idx, col in enumerate(df.columns):
                        max_length = max(
                            df[col].astype(str).apply(len).max(),
                            len(str(col))
                        )
                        # 最大幅を50文字に制限
                        adjusted_width = min(max_length + 2, 50)
                        worksheet.column_dimensions[chr(65 + idx)].width = adjusted_width

                    # 行の高さを調整（送信リプライ情報シートのみ）
                    if sheet_name == '送信リプライ情報':
                        worksheet.row_dimensions[2].height = 300  # テキストが長いため高さを増やす

            print(f"\nExcelファイルを保存しました: {excel_file}")
            return excel_file

        except Exception as e:
            print(f"Excelファイルの保存中にエラーが発生しました: {e}")
            return None

    async def run_analysis(self):
        if not await self.setup():
            return

        # ユーザー入力を対話的に取得
        target_user = input("分析対象のユーザー名を入力してください（@を除く）: ").strip()
        if target_user.startswith('@'):
            target_user = target_user[1:]

        try:
            min_replies = int(input("最小リプライ数を入力してください（例: 3）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 3 を使用します。")
            min_replies = 3

        try:
            tweets_to_analyze = int(input("分析するツイート数を入力してください（例: 200）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 200 を使用します。")
            tweets_to_analyze = 200

        print(f"\n{target_user}のリプライを分析します...")
        print(f"- 分析対象ツイート数: {tweets_to_analyze}")
        print(f"- 最小リプライ数: {min_replies}")

        # リプライを分析
        reply_counter, reply_users, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user = await self.analyze_user_replies(target_user, tweets_to_analyze)

        if not reply_counter:
            print("\nリプライが見つかりませんでした。")
            return

        # 頻繁にリプライしているユーザーの情報を収集
        frequent_repliers_data = []
        print("\nリプライの多いユーザーの情報を収集中...")

        for screen_name, reply_count in reply_counter.most_common():
            if reply_count >= min_replies and screen_name in reply_users:
                try:
                    user = reply_users[screen_name]
                    profile_data = await self.get_user_profile(user)
                    tweets = await self.get_user_tweets(user, count=3)

                    if profile_data:
                        user_data = {
                            'profile': profile_data,
                            'reply_count': reply_count,
                            'recent_tweets': tweets
                        }
                        frequent_repliers_data.append(user_data)
                        print(f"@{screen_name}の情報を取得しました（リプライ数: {reply_count}）")
                except Exception as e:
                    print(f"@{screen_name}の情報取得に失敗: {e}")
                    continue

        # 結果を保存
        if frequent_repliers_data:
            self.save_results(frequent_repliers_data, target_user, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user)
            self.save_to_excel(frequent_repliers_data, target_user, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user)
            print("\n分析が完了しました！")
        else:
            print("\n分析対象となるユーザーが見つかりませんでした。")


機能の概要
リプライを送信する機能の追加

指定したツイートに対してリプライを送信します。
送信したリプライに対する返信を取得する機能の追加

送信したリプライに対する返信ツイートを取得し、その回数と内容を記録します。

In [3]:
# セル2: 定義用コード

import asyncio
import json
import os
from collections import Counter, defaultdict
from datetime import datetime

import pandas as pd
from twikit import Client

class TwitterProfileAnalyzer:
    def __init__(self):
        self.client = Client(language='en-US')
        self.cookie_path = os.path.join(COOKIE_DIR, "cookie_edit.json")
        self.results_dir = RESULTS_DIR

        # 送信したリプライとその返信を追跡する辞書
        self.sent_replies = {}  # key: sent_reply_tweet_id, value: reply_content

    async def setup(self):
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as file:
                cookies = json.load(file)
            self.client.set_cookies(cookies)
            print("認証に成功しました！")
            return True
        except Exception as e:
            print(f"認証エラー: {e}")
            return False

    async def send_reply(self, tweet_id, reply_text):
        """指定したツイートにリプライを送信する"""
        try:
            sent_tweet = await self.client.create_tweet(text=reply_text, in_reply_to_tweet_id=tweet_id)
            print(f"リプライを送信しました: {sent_tweet.id}")
            self.sent_replies[sent_tweet.id] = {
                'original_tweet_id': tweet_id,
                'reply_text': reply_text,
                'replies_received': []
            }
            return sent_tweet.id
        except Exception as e:
            print(f"リプライ送信エラー: {e}")
            return None

    async def get_replies_to_sent_reply(self, sent_reply_id, max_replies=100):
        """送信したリプライに対する返信を取得する"""
        try:
            print(f"リプライID {sent_reply_id} に対する返信を取得中...")
            # 送信したリプライに対する返信を検索
            replies = await self.client.search_tweets(
                query=f'to:{self.get_screen_name_from_sent_reply(sent_reply_id)}',
                since_id=sent_reply_id,
                max_results=min(max_replies, 100)
            )
            count = 0
            for tweet in replies:
                if tweet.in_reply_to_status_id == sent_reply_id:
                    self.sent_replies[sent_reply_id]['replies_received'].append(tweet.text)
                    count += 1
            print(f"返信を{count}件取得しました。")
            return count, self.sent_replies[sent_reply_id]['replies_received']
        except Exception as e:
            print(f"返信取得エラー: {e}")
            return 0, []

    def get_screen_name_from_sent_reply(self, sent_reply_id):
        """送信したリプライのスクリーンネームを取得する（仮実装）"""
        # Twikitの機能によっては、送信したリプライのユーザー情報を直接取得できるかもしれません。
        # ここでは仮にスクリーンネームを取得する方法を示します。
        try:
            tweet = self.client.get_tweet(sent_reply_id)
            return tweet.author.screen_name
        except Exception as e:
            print(f"スクリーンネーム取得エラー: {e}")
            return ""

    async def analyze_user_replies(self, screen_name, tweets_to_analyze=200):
        """指定したユーザーのツイートから、リプライを分析する"""
        try:
            # ユーザー情報を取得
            target_user = await self.client.get_user_by_screen_name(screen_name)
            print(f"{screen_name}のツイートを分析中...")

            # リプライしているユーザーをスクリーンネームベースで追跡
            reply_counter = Counter()
            reply_users = {}
            sent_replies_counter = 0
            sent_replies_texts = []

            # ユーザーごとのリプライ文を保存する辞書を追加
            sent_replies_texts_per_user = defaultdict(list)

            # ツイートを取得（リプライを含む）
            print("ツイートを取得中...")
            results = await self.client.get_user_tweets(
                target_user.id,
                tweet_type='Replies',  # Repliesタイプに変更
                count=min(tweets_to_analyze, 100)
            )

            analyzed_count = 0
            while results and analyzed_count < tweets_to_analyze:
                for tweet in results:
                    try:
                        # リプライ先のツイートテキストを解析
                        if hasattr(tweet, 'text') and tweet.text.startswith('@'):
                            # @ユーザー名を抽出
                            mentioned_users = [
                                word[1:] for word in tweet.text.split()
                                if word.startswith('@')
                            ]

                            for reply_to in mentioned_users:
                                if reply_to and reply_to.lower() != screen_name.lower():
                                    try:
                                        if reply_to not in reply_users:
                                            # スクリーンネームからユーザー情報を取得
                                            try:
                                                reply_user = await self.client.get_user_by_screen_name(reply_to)
                                                reply_users[reply_to] = reply_user
                                                print(f"ユーザー {reply_to} の情報を取得しました")
                                            except Exception as e:
                                                print(f"ユーザー {reply_to} の取得エラー: {e}")
                                                continue
                                        reply_counter[reply_to] += 1

                                        # ユーザーごとのリプライ文を追加
                                        sent_replies_texts_per_user[reply_to].append(tweet.text)

                                        # ここでリプライを送信する場合は、以下のコメントを外して適宜修正してください
                                        # reply_tweet_id = await self.send_reply(tweet.id, f"@{reply_to} ありがとうございます！")
                                        # if reply_tweet_id:
                                        #     # 送信したリプライに対する返信を取得
                                        #     await self.get_replies_to_sent_reply(reply_tweet_id)

                                    except Exception as e:
                                        print(f"ユーザー {reply_to} の情報取得をスキップ: {e}")
                                        continue

                            # 送信したリプライのカウントとテキストを収集
                            sent_replies_counter += 1
                            sent_replies_texts.append(tweet.text)

                    except Exception as e:
                        print(f"ツイート解析エラー: {e}")
                        continue

                    analyzed_count += 1
                    if analyzed_count % 20 == 0:
                        print(f"{analyzed_count}件のツイートを分析済み")

                if analyzed_count < tweets_to_analyze and results.next_cursor:
                    try:
                        results = await results.next()
                    except Exception as e:
                        print(f"追加ツイート取得エラー: {e}")
                        break
                else:
                    break

            print(f"\n分析完了: {analyzed_count}件のツイートを処理")
            print(f"リプライ先ユーザー数: {len(reply_users)}人")
            print(f"送信したリプライ数: {sent_replies_counter}件")

            return reply_counter, reply_users, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user

        except Exception as e:
            print(f"分析エラー: {e}")
            return Counter(), {}, 0, [], {}

    async def get_user_profile(self, user):
        """ユーザーのプロフィール情報を取得する"""
        try:
            # プロフィール情報を辞書形式で整理
            profile_data = {
                'user_id': user.id,                      # ユーザーID
                'name': user.name,                       # 表示名
                'screen_name': user.screen_name,         # @ユーザー名
                'description': user.description,         # プロフィール文
                'location': user.location,               # 場所
                'followers_count': user.followers_count, # フォロワー数
                'following_count': user.following_count, # フォロー数
                'tweets_count': user.statuses_count,     # ツイート数
                'created_at': user.created_at,           # アカウント作成日
                'profile_image_url': user.profile_image_url  # プロフィール画像URL
            }
            return profile_data
        except Exception as e:
            print(f"プロフィール取得エラー: {e}")
            return None

    async def get_user_tweets(self, user, count=1):
        """ユーザーの投稿を取得する"""
        try:
            tweets = []
            results = await user.get_tweets(tweet_type='Tweets', count=count)

            for tweet in results:
                tweet_data = {
                    'tweet_id': tweet.id,                      # ツイートID
                    'text': tweet.text,                        # ツイート本文
                    'created_at': tweet.created_at,            # 投稿日時
                    'retweet_count': tweet.retweet_count,      # リツイート数
                    'like_count': tweet.favorite_count,        # いいね数
                    'reply_count': tweet.reply_count,          # 返信数
                    'is_retweet': bool(tweet.retweeted_tweet), # リツイートかどうか
                    'is_quote': tweet.is_quote_status          # 引用ツイートかどうか
                }
                tweets.append(tweet_data)

            return tweets
        except Exception as e:
            print(f"ツイート取得エラー: {e}")
            return []

    def save_results(self, frequent_repliers_data, target_screen_name, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user, replies_received_data):
        """分析結果をJSONファイルとして保存する"""
        try:
            current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = os.path.join(
                self.results_dir,
                f"analysis_{target_screen_name}_{current_time}.json"
            )

            # ユーザーごとのリプライ文を追加
            users_replies = {
                user: texts for user, texts in sent_replies_texts_per_user.items()
            }

            # 送信したリプライに対する返信データを追加
            replies_received = {
                sent_reply_id: {
                    'original_tweet_id': data['original_tweet_id'],
                    'reply_text': data['reply_text'],
                    'replies_received': data['replies_received']
                }
                for sent_reply_id, data in self.sent_replies.items()
            }

            results_data = {
                "target_user": target_screen_name,
                "sent_replies_count": sent_replies_counter,
                "sent_replies_texts": sent_replies_texts,
                "sent_replies_texts_per_user": users_replies,
                "replies_received": replies_received,  # 追加
                "frequent_repliers": frequent_repliers_data
            }

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(results_data, f, ensure_ascii=False, indent=2)
            print(f"分析結果を保存しました: {filename}")
            return filename
        except Exception as e:
            print(f"保存エラー: {e}")
            return None

    def save_to_excel(self, frequent_repliers_data, target_screen_name, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user):
        """分析結果をExcelファイルとして保存"""
        try:
            # リプライ送信情報をDataFrameに追加
            replies_info = {
                '分析対象ユーザー': target_screen_name,
                '送信したリプライ数': sent_replies_counter,
                '送信したリプライのテキスト': "\n\n".join(sent_replies_texts)
            }
            replies_df = pd.DataFrame([replies_info])

            # 頻繁にリプライしているユーザーのデータフレーム用のリストを作成
            repliers_rows = []
            for user_data in frequent_repliers_data:
                profile = user_data['profile']
                tweets = user_data['recent_tweets']

                # 最新のツイート3件を結合
                recent_tweets_text = "\n".join(
                    [tweet['text'] for tweet in tweets[:3]]
                ) if tweets else ""

                # 1行のデータとして整形
                row = {
                    '分析対象ユーザー': target_screen_name,
                    'ユーザー名': profile['name'],
                    'ユーザーID': f"@{profile['screen_name']}",
                    'プロフィール文': profile['description'],
                    'アカウント作成日': profile['created_at'],
                    'リプライ数': user_data['reply_count'],
                    'フォロワー数': profile['followers_count'],
                    'フォロー数': profile['following_count'],
                    'ツイート数': profile['tweets_count'],
                    '場所': profile['location'],
                    'プロフィール画像URL': profile['profile_image_url'],
                    'アカウントURL': f"https://twitter.com/{profile['screen_name']}",
                    '最近のツイート': recent_tweets_text,
                    # ユーザーごとのリプライ文を追加
                    'リプライ文': "\n\n".join(sent_replies_texts_per_user.get(profile['screen_name'], []))
                }
                repliers_rows.append(row)

            # Repliers DataFrame
            repliers_df = pd.DataFrame(repliers_rows)

            # Excelファイル名を生成
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            excel_file = os.path.join(
                self.results_dir,
                f"リプライ分析_{target_screen_name}_{timestamp}.xlsx"
            )

            # Excelファイルとして保存
            with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
                # リプライ送信情報をシート1に保存
                replies_df.to_excel(writer, sheet_name='送信リプライ情報', index=False)

                # 頻繁にリプライしているユーザー情報をシート2に保存
                repliers_df.to_excel(writer, sheet_name='リプライ分析結果', index=False)

                # 列幅の自動調整
                for sheet_name in writer.sheets:
                    worksheet = writer.sheets[sheet_name]
                    if sheet_name == '送信リプライ情報':
                        df = replies_df
                    else:
                        df = repliers_df
                    for idx, col in enumerate(df.columns):
                        max_length = max(
                            df[col].astype(str).apply(len).max(),
                            len(str(col))
                        )
                        # 最大幅を50文字に制限
                        adjusted_width = min(max_length + 2, 50)
                        worksheet.column_dimensions[chr(65 + idx)].width = adjusted_width

                    # 行の高さを調整（送信リプライ情報シートのみ）
                    if sheet_name == '送信リプライ情報':
                        worksheet.row_dimensions[2].height = 300  # テキストが長いため高さを増やす

            print(f"\nExcelファイルを保存しました: {excel_file}")
            return excel_file

        except Exception as e:
            print(f"Excelファイルの保存中にエラーが発生しました: {e}")
            return None

    async def run_analysis(self):
        if not await self.setup():
            return

        # ユーザー入力を対話的に取得
        target_user = input("分析対象のユーザー名を入力してください（@を除く）: ").strip()
        if target_user.startswith('@'):
            target_user = target_user[1:]

        try:
            min_replies = int(input("最小リプライ数を入力してください（例: 3）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 3 を使用します。")
            min_replies = 3

        try:
            tweets_to_analyze = int(input("分析するツイート数を入力してください（例: 200）: ").strip())
        except ValueError:
            print("無効な数値が入力されました。デフォルト値 200 を使用します。")
            tweets_to_analyze = 200

        print(f"\n{target_user}のリプライを分析します...")
        print(f"- 分析対象ツイート数: {tweets_to_analyze}")
        print(f"- 最小リプライ数: {min_replies}")

        # リプライを分析
        reply_counter, reply_users, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user = await self.analyze_user_replies(target_user, tweets_to_analyze)

        if not reply_counter:
            print("\nリプライが見つかりませんでした。")
            return

        # 頻繁にリプライしているユーザーの情報を収集
        frequent_repliers_data = []
        print("\nリプライの多いユーザーの情報を収集中...")

        for screen_name, reply_count in reply_counter.most_common():
            if reply_count >= min_replies and screen_name in reply_users:
                try:
                    user = reply_users[screen_name]
                    profile_data = await self.get_user_profile(user)
                    tweets = await self.get_user_tweets(user, count=3)

                    if profile_data:
                        user_data = {
                            'profile': profile_data,
                            'reply_count': reply_count,
                            'recent_tweets': tweets
                        }
                        frequent_repliers_data.append(user_data)
                        print(f"@{screen_name}の情報を取得しました（リプライ数: {reply_count}）")
                except Exception as e:
                    print(f"@{screen_name}の情報取得に失敗: {e}")
                    continue


        # 結果を保存
        if frequent_repliers_data:
            self.save_results(frequent_repliers_data, target_user, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user, self.sent_replies)
            self.save_to_excel(frequent_repliers_data, target_user, sent_replies_counter, sent_replies_texts, sent_replies_texts_per_user)
            print("\n分析が完了しました！")
        else:
            print("\n分析対象となるユーザーが見つかりませんでした。")


リプライサーチ　実行用

In [6]:
# セル3: 実行用コード

import asyncio

# TwitterProfileAnalyzerのインスタンスを作成
analyzer = TwitterProfileAnalyzer()

# 非同期関数を直接awaitで実行
await analyzer.run_analysis()


認証に成功しました！
分析対象のユーザー名を入力してください（@を除く）: keitaro_aigc
最小リプライ数を入力してください（例: 3）: 2
分析するツイート数を入力してください（例: 200）: 300

keitaro_aigcのリプライを分析します...
- 分析対象ツイート数: 300
- 最小リプライ数: 2
keitaro_aigcのツイートを分析中...
ツイートを取得中...
20件のツイートを分析済み
40件のツイートを分析済み
60件のツイートを分析済み
80件のツイートを分析済み
100件のツイートを分析済み
120件のツイートを分析済み
140件のツイートを分析済み
160件のツイートを分析済み
ユーザー taiyo_ai_gakuse の情報を取得しました
ユーザー sora19ai の情報を取得しました
ユーザー riku720720 の情報を取得しました
ユーザー hayakawagomi の情報を取得しました
180件のツイートを分析済み
ユーザー jenova_ai_ の情報を取得しました
ユーザー shodaiiiiii の情報を取得しました
200件のツイートを分析済み
ユーザー satori_sz9 の情報を取得しました
220件のツイートを分析済み
ユーザー SuguruKun_ai の情報を取得しました
ユーザー Aoi_genai の情報を取得しました
240件のツイートを分析済み
ユーザー HIROKICHI_PD の情報を取得しました
260件のツイートを分析済み
ユーザー rinte0321 の情報を取得しました
ユーザー uchukunda2 の情報を取得しました
280件のツイートを分析済み
300件のツイートを分析済み

分析完了: 303件のツイートを処理
リプライ先ユーザー数: 12人

リプライの多いユーザーの情報を収集中...
@SuguruKun_aiの情報を取得しました（リプライ数: 6）
@Aoi_genaiの情報を取得しました（リプライ数: 6）
@satori_sz9の情報を取得しました（リプライ数: 2）
@HIROKICHI_PDの情報を取得しました（リプライ数: 2）
@rinte0321の情報を取得しました（リプライ数: 2）
分析結果を保存しました: /content/