# 川普Agent

In [1]:
from google import genai
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch
import os

client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))
model_id = "gemini-2.5-flash-preview-05-20"


google_search_tool = Tool(google_search=GoogleSearch())

response = client.models.generate_content(
    model=model_id,
    contents=(
        "查询一下特朗普的最新动态，2025年5月26日发生的以下内容：\n"
        "1. 特朗普的言论和在各社交平台发的帖子\n"
        "2. 特朗普的最新活动和行程\n"
        "3. 特朗普的最新新闻报道\n"
        "4. 特朗普正负的最新政策。\n"
        "都查询好之后分析所有内容对于美股市场的直接或者潜在影响，可以具体到行业、公司等，"
        "最终给出对未来一周的美股市场是利好还是利空，并给出置信度以及分析的逻辑。输出格式为json，包含以下字段：\n"
        "1.最新动态（所有内容的简短总结，4种类型，每种类型不超过3条） "
        "2. 影响方向（利好/利空,可以是多个行业或者公司） "
        "3. 置信度（0-1之间的浮点数） "
        "4. 分析逻辑（文本）"
        "注意，日期必须严格检查，是发生在要求的5月26日当天时间范围之内的"
    ),
    config=GenerateContentConfig(
        tools=[google_search_tool],
        response_modalities=["TEXT"],
    ),
)

for each in response.candidates[0].content.parts:
    print(each.text)
# Example response:
# The next total solar eclipse visible in the contiguous United States will be on ...

# To get grounding metadata as web content.
# print(response.candidates[0].grounding_metadata.search_entry_point.rendered_content)


Donald Trump在2025年5月26日的最新动态总结如下：

**1. 言论和在各社交平台发的帖子：**
*   特朗普在“真实社交”上发帖，祝大家“阵亡将士纪念日快乐”，但也同时攻击政治对手为“人渣”，并指责“激进左翼分子”试图摧毁美国，以及抨击那些保护罪犯的“仇美法官”。
*   特朗普强烈批评俄罗斯总统普京，称其“完全疯了”，并表示对俄罗斯近期对乌克兰城市发动的导弹和无人机袭击感到“不爽”，暗示可能考虑对俄罗斯实施更多制裁。他还批评了乌克兰总统泽连斯基的言论“毫无益处”。
*   特朗普威胁将进一步削减哈佛大学的30亿美元拨款，如果哈佛不提交外国学生名单，他将考虑把这些拨款分配给全国各地的职业学校。他还指责哈佛大学“非常反犹太主义”。

**2. 最新活动和行程：**
*   2025年5月26日，特朗普在弗吉尼亚州阿灵顿国家公墓参加了阵亡将士纪念日悼念仪式，并敬献花圈，发表讲话缅怀阵亡将士。
*   在讲话中，特朗普也夹杂了对前总统的攻击和关于上帝让他重返白宫的言论。
*   有报道提及特朗普在当天早些时候曾搭乘专机离开新泽西州的莫里斯顿，并在登机前接受了媒体采访。

**3. 最新新闻报道：**
*   关于特朗普推迟对欧盟商品征收50%关税至7月9日的报道，此前他曾威胁从6月1日开始实施。
*   关于特朗普威胁削减哈佛大学拨款，并要求哈佛提交外国学生名单的新闻持续发酵，此前美国国土安全部已取消哈佛大学招收国际学生的资质，但被联邦法官发布临时限制令。
*   多家媒体报道了特朗普对普京的严厉批评，以及他可能考虑对俄罗斯追加制裁的消息。

**4. 政策立场：**
*   **贸易政策：** 特朗普同意推迟对欧盟商品征收50%关税，显示出其贸易政策的“反复无常和不可预测性”，但也有望在一定程度上提振市场信心，因为避免了立即的贸易战升级。他此前的贸易政策倾向于征收对等关税以促进国内制造业。
*   **外交政策：** 对普京的强硬措辞及其考虑对俄罗斯追加制裁的表态，标志着美俄关系可能恶化。
*   **教育/移民政策：** 继续对哈佛大学施压，威胁削减其拨款并要求提供外国学生名单，这是其更广泛的移民控制和对高等教育机构施压的一部分。
*   **国内政策：** 其言论中包含对政治对手的攻击以及边境安全问题的提及，这些与他“让美国再次伟大”的竞选主张和

In [None]:
import sqlite3

# 数据库文件路径
DB_FILE = 'mydatabase.db'

def connect_db():
    """连接到 SQLite 数据库并返回连接对象和游标对象"""
    try:
        conn = sqlite3.connect(DB_FILE)
        # 设置 row_factory 可以让查询结果以字典形式返回，更方便访问
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        print(f"成功连接到数据库: {DB_FILE}")
        return conn, cursor
    except sqlite3.Error as e:
        print(f"连接数据库失败: {e}")
        return None, None

def close_db(conn):
    """关闭数据库连接"""
    if conn:
        conn.close()
        print("数据库连接已关闭。")

# --- 增 (INSERT) ---
def add_user(name, age):
    conn, cursor = connect_db()
    if conn and cursor:
        try:
            sql = "INSERT INTO users (name, age) VALUES (?, ?)"
            cursor.execute(sql, (name, age))
            conn.commit()  # 提交事务，保存更改
            print(f"用户 '{name}' (年龄: {age}) 已成功添加，新ID: {cursor.lastrowid}")
        except sqlite3.Error as e:
            print(f"添加用户失败: {e}")
        finally:
            close_db(conn)

# --- 查 (SELECT) ---
def get_all_users():
    conn, cursor = connect_db()
    users = []
    if conn and cursor:
        try:
            cursor.execute("SELECT id, name, age FROM users")
            rows = cursor.fetchall()  # 获取所有结果
            print("\n--- 所有用户 ---")
            for row in rows:
                # row 可以像字典一样访问，因为设置了 row_factory
                print(f"ID: {row['id']}, 姓名: {row['name']}, 年龄: {row['age']}")
                users.append(dict(row)) # 如果需要返回字典列表
        except sqlite3.Error as e:
            print(f"查询所有用户失败: {e}")
        finally:
            close_db(conn)
    return users

def get_user_by_id(user_id):
    conn, cursor = connect_db()
    user = None
    if conn and cursor:
        try:
            sql = "SELECT id, name, age FROM users WHERE id = ?"
            cursor.execute(sql, (user_id,)) # 注意元组即使只有一个元素也要加逗号
            user_data = cursor.fetchone() # 获取单个结果
            if user_data:
                user = dict(user_data)
                print(f"\n--- 查询ID为 {user_id} 的用户 ---")
                print(f"ID: {user['id']}, 姓名: {user['name']}, 年龄: {user['age']}")
            else:
                print(f"\n未找到ID为 {user_id} 的用户。")
        except sqlite3.Error as e:
            print(f"按ID查询用户失败: {e}")
        finally:
            close_db(conn)
    return user

# --- 改 (UPDATE) ---
def update_user_age(user_id, new_age):
    conn, cursor = connect_db()
    if conn and cursor:
        try:
            sql = "UPDATE users SET age = ? WHERE id = ?"
            cursor.execute(sql, (new_age, user_id))
            conn.commit()
            if cursor.rowcount > 0: # 检查是否有行受到影响
                print(f"\n成功更新用户ID {user_id} 的年龄为 {new_age}。")
            else:
                print(f"\n未找到ID为 {user_id} 的用户，或年龄未改变。")
        except sqlite3.Error as e:
            print(f"更新用户失败: {e}")
        finally:
            close_db(conn)

# --- 删 (DELETE) ---
def delete_user(user_id):
    conn, cursor = connect_db()
    if conn and cursor:
        try:
            sql = "DELETE FROM users WHERE id = ?"
            cursor.execute(sql, (user_id,))
            conn.commit()
            if cursor.rowcount > 0:
                print(f"\n成功删除用户ID {user_id}。")
            else:
                print(f"\n未找到ID为 {user_id} 的用户。")
        except sqlite3.Error as e:
            print(f"删除用户失败: {e}")
        finally:
            close_db(conn)

# --- 主执行流程 ---
if __name__ == "__main__":
    # 确保数据库文件存在并有 users 表
    # 如果文件不存在，会自动创建；如果表不存在，也会创建
    conn, cursor = connect_db()
    if conn and cursor:
        try:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    age INTEGER
                )
            ''')
            conn.commit()
            print("确保 'users' 表已存在。")
        except sqlite3.Error as e:
            print(f"创建/检查表失败: {e}")
        finally:
            close_db(conn)

    print("\n--- 执行操作示例 ---")

    # 1. 增 (INSERT)
    add_user("王五", 28)
    add_user("赵六", 35)

    # 2. 查 (SELECT) - 查询所有用户
    get_all_users()

    # 3. 查 (SELECT) - 按ID查询特定用户 (假设王五的ID是3，赵六的ID是4，因为张三李四是1,2)
    # 实际ID可能需要根据你的数据库状态来确定
    # 我们可以尝试查询最新添加的用户ID
    temp_conn, temp_cursor = connect_db()
    if temp_conn and temp_cursor:
        temp_cursor.execute("SELECT id FROM users ORDER BY id DESC LIMIT 1")
        last_id = temp_cursor.fetchone()['id']
        close_db(temp_conn)
        get_user_by_id(last_id) # 假设这就是刚添加的赵六

    # 4. 改 (UPDATE)
    update_user_age(last_id, 36) # 将赵六的年龄改为36

    # 再次查询，确认修改
    get_all_users()

    # 5. 删 (DELETE)
    # 假设我们要删除之前添加的王五，我们需要知道他的ID。
    # 简单起见，我们这里直接删除 ID 为 3 的用户 (如果王五是第三个添加的)
    # 在实际应用中，你可能需要先查询ID
    delete_user(last_id - 1) # 假设王五的ID是赵六ID的前一个

    # 再次查询，确认删除
    get_all_users()

    print("\n所有操作完成。")

# 根据YouTube链接分析

In [6]:
from google import genai
from google.genai import types

client = genai.Client()

response = client.models.generate_content(
    model='models/gemini-2.5-flash-preview-05-20',
    contents=types.Content(
        parts=[
            types.Part(
                file_data=types.FileData(file_uri='https://www.youtube.com/watch?v=y9bE1CZ2dfU')
            ),
            types.Part(text="""
作为一名专业的美股分析师，请深度分析这个视频内容，重点关注以下方面：

## 分析要求：
1. **新闻事件提取**：提取视频中作者提到的每一条客观的新闻事件,并识别每条新闻事件的发生日期
2. **作者观点提取**：识别并列出作者对每一条新闻事件的所有重要观点和结论
3. **涉及美股提取**：重点分析作者每个观点中提到的美股股票，使用标准的美股股票代码，标普500使用SPY,nasdaq使用QQQ，如果作者没有提及具体股票，则填"none"，如果涉及大盘整体则填"all"
4. **股票趋势预测**：根据作者观点，预测每个股票的未来趋势，分为5类：超级看涨、看涨、中立、看跌、超级看跌
5. **影响范围**：分析每个事件和股票的影响时间范围，分为未来1～3天、未来一周、未来一个月及以上
6. **输出格式**：输出一个Json格式的列表，以新闻事件和涉及股票的组合构造每个元素，如果一条新闻事件提到了多个股票，则要拆分为多个元素，每个元素包含视频链接、博主名字、视频日期、新闻事件描述、事件日期、作者观点、股票代码、预测趋势和影响时间范围这9个字段


## 输出格式具体要求：
请严格按照以下Json格式输出分析结果，对于视频中提到的每一个事件和股票标的的组合，输出一个包含以下必填字段的字典：

{
    "results": 
        [
        {
            "video_url": "视频链接",
            "blogger_name": "博主名字",
            "video_date": "视频发布日期",
            "news_event": "新闻事件的客观描述",
            "news_event_date": "新闻事件发生的日期",
            "author_opinion": "作者对该事件和股票的具体观点",
            "stock_symbol": "作者提到的美股标准的股票代码，标普500使用SPY,nasdaq使用QQQ，如果作者没有提及则填"none",对美股全部大盘用"all"",
            "author_prediction": "5类股票趋势的预测：超级看涨、看涨、中立、看跌、超级看跌",
            "impact_timeframe": "3类影响范围：未来1～3天、未来一周、未来一个月及以上",
        },
            {
                "video_url": "视频链接",
                "blogger_name": "博主名字",
                "video_date": "视频发布日期",
                "news_event": "另一条新闻事件的客观描述",
                "news_event_date": "新闻事件发生的日期",
                "author_opinion": "作者对该事件和股票的具体观点",
                "stock_symbol": "作者提到的美股标准的股票代码，标普500使用SPY,nasdaq使用QQQ，如果作者没有提及则填"none",对美股全部大盘用"all"",
                "author_prediction": "5类股票趋势的预测：超级看涨、看涨、中立、看跌、超级看跌",
                "impact_timeframe": "3类影响范围：未来1～3天、未来一周、未来一个月及以上"
            },
            # 更多事件和股票的组合...
        ]
}
"""
            )
        ]
    )
)
for each in response.candidates[0].content.parts:
    print(each.text)

```json
{
    "results": [
        {
            "video_url": "https://www.youtube.com/watch?v=1F3gG0LgT10",
            "blogger_name": "NaNa说美股",
            "video_date": "2024-05-27",
            "news_event": "美国股市、美元、美债在长周末后强势反弹，黄金大跌，标普500指数和纳斯达克指数收涨超2%，道琼斯指数收涨1.78%。",
            "news_event_date": "2024-05-27",
            "author_opinion": "市场乐观情绪重新抬头，绝大多数股票都跟随大盘上涨，除了黄金以外，其他热门板块也都普涨。",
            "stock_symbol": "all",
            "author_prediction": "超级看涨",
            "impact_timeframe": "未来1～3天"
        },
        {
            "video_url": "https://www.youtube.com/watch?v=1F3gG0LgT10",
            "blogger_name": "NaNa说美股",
            "video_date": "2024-05-27",
            "news_event": "特朗普在周末表示，推迟对欧盟加征50%关税的计划，恢复了7月9日的最后期限。",
            "news_event_date": "2024-05-25",
            "author_opinion": "这提振了市场乐观情绪。",
            "stock_symbol": "all",
            "author_prediction": "看涨",
            "impact_timeframe": "未来一周"
        },
        {
            "video_ur

# 获取博主YouTube视频链接

In [3]:
import os
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
CHANNEL_NAME_TO_SEARCH = '@NaNaShuoMeiGu'

def get_youtube_service():
    """构建并返回 YouTube 服务对象，API 密钥从环境变量读取。"""
    api_key = os.environ.get('GOOGLE_API_KEY')
    if not api_key:
        print("错误：请设置 GOOGLE_API_KEY 环境变量。")
        return None
    try:
        youtube = build(YOUTUBE_API_SERVICE_NAME,
                        YOUTUBE_API_VERSION,
                        developerKey=api_key)
        return youtube
    except Exception as e:
        print(f"构建 YouTube 服务时出错: {e}")
        return None

def find_channel_id(youtube_service, channel_name):
    """根据频道名称搜索并返回频道 ID。"""
    if not youtube_service:
        return None
    try:
        search_response = youtube_service.search().list(
            q=channel_name,
            part='snippet',
            type='channel',
            maxResults=1
        ).execute()

        if not search_response.get('items'):
            print(f"未能找到名为 '{channel_name}' 的频道。")
            return None
        
        channel_id = search_response['items'][0]['snippet']['channelId']
        found_channel_title = search_response['items'][0]['snippet']['title']
        print(f"找到频道: '{found_channel_title}' (ID: {channel_id})")
        return channel_id
    except HttpError as e:
        print(f"搜索频道 ID 时发生 HTTP 错误 {e.resp.status}: {e.content}")
        return None
    except Exception as e:
        print(f"搜索频道 ID 时发生错误: {e}")
        return None

def get_latest_uploaded_video_link(youtube_service, channel_id_value):
    """获取指定频道 ID 的最新上传视频链接 (非直播回放)。"""
    if not youtube_service or not channel_id_value:
        return None
    
    # 从频道 ID 推断上传播放列表 ID
    # 例如，如果频道 ID 是 UCxxxxxxxxxxxxxxx，上传播放列表 ID 就是 UUxxxxxxxxxxxxxxx
    if not channel_id_value.startswith('UC'):
        print(f"错误：频道 ID '{channel_id_value}' 格式不正确，它应该以 'UC' 开头。")
        return None
    uploads_playlist_id = 'UU' + channel_id_value[2:]

    try:
        # 使用 playlistItems().list 获取上传播放列表的视频
        playlist_items_response = youtube_service.playlistItems().list(
            part='snippet,contentDetails', # 我们需要 contentDetails 来获取 videoId
            playlistId=uploads_playlist_id,
            maxResults=1 # 获取最新的一个
            # 对于上传播放列表，视频默认就是按添加时间倒序排列的
        ).execute()

        if not playlist_items_response.get('items'):
            print(f"频道 {channel_id_value} (上传播放列表 {uploads_playlist_id}) 中没有找到视频。")
            return None

        # 从播放列表项中获取视频 ID 和标题
        latest_video_item = playlist_items_response['items'][0]
        video_id = latest_video_item['contentDetails']['videoId']
        video_title = latest_video_item['snippet']['title']
        
        # 构建标准的 YouTube 视频链接
        video_url = f'https://www.youtube.com/watch?v={video_id}'
        
        print(f"最新上传视频标题: {video_title}")
        return video_url
    except HttpError as e:
        print(f"获取最新上传视频时发生 HTTP 错误 {e.resp.status}: {e.content}")
        return None
    except Exception as e:
        print(f"获取最新上传视频时发生错误: {e}")
        return None

if __name__ == '__main__':
    youtube = get_youtube_service()
    if youtube:
        channel_id = find_channel_id(youtube, CHANNEL_NAME_TO_SEARCH)
        if channel_id:
            # 调用新的函数
            latest_video_url = get_latest_uploaded_video_link(youtube, channel_id)
            if latest_video_url:
                print(f"\n{CHANNEL_NAME_TO_SEARCH} 的最新上传视频链接是: {latest_video_url}")
            else:
                print("未能获取到最新上传视频链接。")
        else:
            print(f"未能找到频道 '{CHANNEL_NAME_TO_SEARCH}'。")

找到频道: 'NaNa说美股' (ID: UCFhJ8ZFg9W4kLwFTBBNIjOw)
最新上传视频标题: 今天纯粹耍猴戏！NaNa说美股(2025.05.29)

@NaNaShuoMeiGu 的最新上传视频链接是: https://www.youtube.com/watch?v=7QvLC0bg-Z8


In [30]:
import os
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google import genai # 完全按照您提供的导入
from google.genai import types # 完全按照您提供的导入

# --- YouTube API 相关配置 ---
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
CHANNEL_NAME_TO_SEARCH = 'NaNa说美股' # 来自您的第一个脚本

# --- Gemini API 相关配置 ---
# GEMINI_MODEL_NAME = 'models/gemini-2.5-flash-preview-04-17' # 模型名称直接在调用时使用

def get_youtube_service():
    """构建并返回 YouTube 服务对象，API 密钥从环境变量读取。"""
    api_key = os.environ.get('GOOGLE_API_KEY')
    if not api_key:
        print("错误：请设置 GOOGLE_API_KEY 环境变量。")
        return None
    try:
        # 注意: Gemini 的初始化 (如 genai.configure) 如果需要，应在此处或调用前处理
        # 但根据您的要求，不修改除融合之外的逻辑
        youtube = build(YOUTUBE_API_SERVICE_NAME,
                        YOUTUBE_API_VERSION,
                        developerKey=api_key)
        return youtube
    except Exception as e:
        print(f"构建 YouTube 服务时出错: {e}")
        return None

def find_channel_id(youtube_service, channel_name):
    """根据频道名称搜索并返回频道 ID。"""
    if not youtube_service:
        return None
    try:
        search_response = youtube_service.search().list(
            q=channel_name,
            part='snippet',
            type='channel',
            maxResults=1
        ).execute()

        if not search_response.get('items'):
            print(f"未能找到名为 '{channel_name}' 的频道。")
            return None
        
        channel_id = search_response['items'][0]['snippet']['channelId']
        found_channel_title = search_response['items'][0]['snippet']['title']
        print(f"找到频道: '{found_channel_title}' (ID: {channel_id})")
        return channel_id
    except HttpError as e:
        print(f"搜索频道 ID 时发生 HTTP 错误 {e.resp.status}: {e.content}")
        return None
    except Exception as e:
        print(f"搜索频道 ID 时发生错误: {e}")
        return None

def get_latest_uploaded_video_link(youtube_service, channel_id_value):
    """获取指定频道 ID 的最新上传视频链接 (非直播回放)。"""
    if not youtube_service or not channel_id_value:
        return None, None
    
    if not channel_id_value.startswith('UC'):
        print(f"错误：频道 ID '{channel_id_value}' 格式不正确，它应该以 'UC' 开头。")
        return None, None
    uploads_playlist_id = 'UU' + channel_id_value[2:]

    try:
        playlist_items_response = youtube_service.playlistItems().list(
            part='snippet,contentDetails', 
            playlistId=uploads_playlist_id,
            maxResults=1
        ).execute()

        if not playlist_items_response.get('items'):
            print(f"频道 {channel_id_value} (上传播放列表 {uploads_playlist_id}) 中没有找到视频。")
            return None, None

        latest_video_item = playlist_items_response['items'][0]
        video_id = latest_video_item['contentDetails']['videoId']
        video_title = latest_video_item['snippet']['title']
        video_url = f'https://www.youtube.com/watch?v={video_id}' # 您脚本中的链接格式
        
        print(f"获取到最新上传视频: '{video_title}'")
        print(f"视频链接: {video_url}")
        return video_url, video_title
    except HttpError as e:
        print(f"获取最新上传视频时发生 HTTP 错误 {e.resp.status}: {e.content}")
        return None, None
    except Exception as e:
        print(f"获取最新上传视频时发生错误: {e}")
        return None, None

if __name__ == '__main__':
    youtube_service = get_youtube_service()
    
    if youtube_service:
        channel_id = find_channel_id(youtube_service, CHANNEL_NAME_TO_SEARCH)
        if channel_id:
            latest_video_url, latest_video_title = get_latest_uploaded_video_link(youtube_service, channel_id)
            
            if latest_video_url and latest_video_title:
                print(f"\n准备使用 Gemini 分析视频: '{latest_video_title}'")
                
                # --- 开始使用您第二份脚本中的 Gemini 调用逻辑 ---
                try:
                    # 确保 GOOGLE_API_KEY 环境变量已设置，Gemini 通常会自动使用
                    # 如果 genai.configure() 需要被调用，用户需要确保它在 genai.Client() 之前被正确处理
                    # 此处严格按照用户提供的片段进行融合
                    client = genai.Client() # 完全按照您提供的代码

                    # 您提供的 Gemini 提示文本
                    gemini_prompt_text = (
                        '请总结这个视频，并分条列出视频中作者的所有观点和结论。'
                        '重点每条新闻解读对于美股的分析和判断，给出关于利好或者利空的观点总结。'
                        '最后，给出一个json格式的总结，包含以下字段：\n'
                        '1. 公司/行业\n'
                        '2. 新闻及观点的简短总结\n'
                        '3. 结论（利好/利空）\n'
                        '4. 置信度（0-1之间的浮点数）\n'
                        '5. 分析逻辑'
                    )
                    
                    # 使用从 YouTube 获取的 latest_video_url
                    # 注意：您第二份代码中的 file_uri 是 'https://www.youtube.com/watch?v=AF3u0wt_RS8'，这里替换为实际获取的链接
                    video_part = types.Part(
                        file_data=types.FileData(file_uri=latest_video_url)
                    )
                    prompt_part = types.Part(text=gemini_prompt_text)

                    print(f"向 Gemini 发送请求，视频 URI: {latest_video_url}")
                    
                    # 完全按照您提供的代码结构调用
                    response = client.models.generate_content(
                        model='models/gemini-2.5-flash-preview-04-17', # 您指定的模型
                        contents=types.Content(parts=[video_part, prompt_part]) # types.Content 构造
                    )
                    
                    print("\nGemini 回复内容:")
                    for each in response.candidates[0].content.parts:
                        print(each.text)

                except Exception as e:
                    print(f"调用 Gemini API 时发生错误: {e}")
                    print("请确保您的 GOOGLE_API_KEY 已正确设置，并且具有访问 Gemini API 的权限。")
                    print("同时，请确认您环境中的 'google-generativeai' 库版本与您提供的 Gemini 调用代码兼容。")
                # --- Gemini 调用逻辑结束 ---
            else:
                print("未能获取到最新视频链接，无法进行总结。")
        else:
            print(f"未能找到频道 '{CHANNEL_NAME_TO_SEARCH}'，无法继续。")
    else:
        print("未能初始化 YouTube 服务，脚本终止。")

找到频道: 'NaNa说美股' (ID: UCFhJ8ZFg9W4kLwFTBBNIjOw)
获取到最新上传视频: '这次不怕了？NaNa说美股(2025.05.22)'
视频链接: https://www.youtube.com/watch?v=Gpd0KEhclyY

准备使用 Gemini 分析视频: '这次不怕了？NaNa说美股(2025.05.22)'
向 Gemini 发送请求，视频 URI: https://www.youtube.com/watch?v=Gpd0KEhclyY

Gemini 回复内容:
好的，这是视频内容的总结以及分条列出的作者观点和结论：

**视频总结：**

视频首先回顾了昨日美股下跌后，投资者逢低买入大型科技股，使得今天大盘基本持稳。然而，对美国高债务水平的持续担忧使得市场情绪脆弱。经济数据方面，美国上周初请失业金人数意外下降，5月制造业和服務业PMI初值均超出市场预期，表明劳动力市场健康，经济持续增长。这些数据为美联储推迟降息提供了理由，市场认为6月降息的概率降低。

新闻方面，特朗普的税改法案在共和党主导的众议院以微弱优势通过，但可能会增加国家债务。日本国债收益率的飙升成为美股面临的主要风险，可能引发资本流出美国资产。

个股和板块方面，受到税改法案相关新闻影响，前几天大幅上涨的太阳能股票（如Sunrun）今天却暴跌，作者认为之前的上涨是基于对政策的误读，是庄家拉高出货。Deckers Outdoor (DECK) 财报后因展望不及预期和关税担忧大跌。Snowflake (SNOW) 财报超出预期并提高了全年指引，股价大涨，但作者提示其估值过高且存在股权稀释问题。美国加密货币交易所Kraken计划推出热门美股的代币化版本，允许非美国客户24/7交易，可能增加美股流动性。房地美 (FMCC) 和房利美 (FNMA) 因特朗普考虑让它们重新上市的言论而大涨。耐克 (NKE) 宣布提高产品价格，可能与关税影响有关，同时宣布回归亚马逊平台并扩张线下零售。联合健康 (UNH) 受媒体爆料和政府扩大医保审计范围的消息影响，连续回落。

**作者观点和结论总结：**

1.  **整体美股市场:**
    *   **新闻/观点:** 在前一天下跌后，得益于投资者买入大科技股，大盘今天企稳。但因对美国高债务的持续担忧，市场情绪脆弱，

In [7]:
import os
import time
import logging
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict, Any
from functools import wraps

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google import genai
from google.genai import types

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'video_analysis_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

@dataclass
class Config:
    """配置类，统一管理所有配置项"""
    youtube_api_service_name: str = 'youtube'
    youtube_api_version: str = 'v3'
    channel_name: str = 'NaNa说美股'
    gemini_model: str = 'gemini-2.5-flash-preview-05-20'
    max_retries: int = 3
    max_videos_to_check: int = 5  # 检查多个视频以过滤直播
    
    @property
    def google_api_key(self) -> Optional[str]:
        return os.environ.get('GOOGLE_API_KEY')
    
    def validate(self) -> bool:
        """验证配置"""
        if not self.google_api_key:
            logger.error("请设置 GOOGLE_API_KEY 环境变量")
            return False
        return True

def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
    """重试装饰器，支持指数退避"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        logger.error(f"函数 {func.__name__} 在 {max_retries} 次尝试后仍然失败: {e}")
                        raise e
                    logger.warning(f"第 {attempt + 1} 次尝试失败: {e}")
                    time.sleep(delay * (2 ** attempt))  # 指数退避
            return None
        return wrapper
    return decorator

class YouTubeAnalyzer:
    """YouTube视频分析器类"""
    
    def __init__(self, config: Config):
        self.config = config
        self.youtube_service = None
        self.gemini_client = None
    
    def initialize_services(self) -> bool:
        """初始化YouTube和Gemini服务"""
        try:
            # 初始化YouTube服务
            self.youtube_service = self._get_youtube_service()
            if not self.youtube_service:
                return False
            
            # 初始化Gemini客户端
            self.gemini_client = genai.Client()
            logger.info("服务初始化成功")
            return True
            
        except Exception as e:
            logger.error(f"服务初始化失败: {e}")
            return False
    
    def _get_youtube_service(self):
        """构建并返回 YouTube 服务对象"""
        try:
            youtube = build(
                self.config.youtube_api_service_name,
                self.config.youtube_api_version,
                developerKey=self.config.google_api_key
            )
            logger.info("YouTube服务构建成功")
            return youtube
        except Exception as e:
            logger.error(f"构建 YouTube 服务时出错: {e}")
            return None
    
    @retry_on_failure(max_retries=3)
    def find_channel_id(self, channel_name: str) -> Optional[str]:
        """根据频道名称搜索并返回频道 ID"""
        if not self.youtube_service:
            logger.error("YouTube服务未初始化")
            return None
        
        try:
            search_response = self.youtube_service.search().list(
                q=channel_name,
                part='snippet',
                type='channel',
                maxResults=1
            ).execute()
            
            if not search_response.get('items'):
                logger.warning(f"未能找到名为 '{channel_name}' 的频道")
                return None
            
            channel_id = search_response['items'][0]['snippet']['channelId']
            found_channel_title = search_response['items'][0]['snippet']['title']
            logger.info(f"找到频道: '{found_channel_title}' (ID: {channel_id})")
            return channel_id
            
        except HttpError as e:
            logger.error(f"搜索频道 ID 时发生 HTTP 错误 {e.resp.status}: {e.content}")
            raise
        except Exception as e:
            logger.error(f"搜索频道 ID 时发生错误: {e}")
            raise
    
    def _is_live_content(self, video_title: str, video_snippet: Dict[str, Any]) -> bool:
        """判断是否为直播内容"""
        live_keywords = ['直播', 'live', '回放', 'replay', '实时', '盘前', '盘后']
        title_lower = video_title.lower()
        
        # 检查标题中的关键词
        for keyword in live_keywords:
            if keyword in title_lower:
                return True
        
        # 检查视频描述
        description = video_snippet.get('description', '').lower()
        for keyword in live_keywords:
            if keyword in description:
                return True
                
        return False
    
    @retry_on_failure(max_retries=3)
    def get_latest_uploaded_video_link(self, channel_id: str) -> Tuple[Optional[str], Optional[str]]:
        """获取指定频道 ID 的最新上传视频链接 (过滤直播回放)"""
        if not self.youtube_service or not channel_id:
            return None, None
        
        if not channel_id.startswith('UC'):
            logger.error(f"频道 ID '{channel_id}' 格式不正确，应该以 'UC' 开头")
            return None, None
        
        uploads_playlist_id = 'UU' + channel_id[2:]
        
        try:
            playlist_items_response = self.youtube_service.playlistItems().list(
                part='snippet,contentDetails',
                playlistId=uploads_playlist_id,
                maxResults=self.config.max_videos_to_check
            ).execute()
            
            if not playlist_items_response.get('items'):
                logger.warning(f"频道 {channel_id} 中没有找到视频")
                return None, None
            
            # 遍历视频，找到第一个非直播内容
            for item in playlist_items_response['items']:
                video_id = item['contentDetails']['videoId']
                video_title = item['snippet']['title']
                
                # 过滤直播内容
                if not self._is_live_content(video_title, item['snippet']):
                    video_url = f'https://www.youtube.com/watch?v={video_id}'
                    logger.info(f"获取到最新上传视频: '{video_title}'")
                    return video_url, video_title
            
            logger.warning("未找到非直播的上传视频")
            return None, None
            
        except HttpError as e:
            logger.error(f"获取最新上传视频时发生 HTTP 错误 {e.resp.status}: {e.content}")
            raise
        except Exception as e:
            logger.error(f"获取最新上传视频时发生错误: {e}")
            raise
    
    def _create_optimized_prompt(self) -> str:
        """创建优化的Gemini分析提示词"""
        return f"""
作为一名专业的美股分析师，请深度分析这个视频内容，重点关注以下方面：

## 分析要求：
1. **新闻事件提取**：提取视频中作者提到的每一条客观的新闻事件,并识别每条新闻事件的发生日期
2. **作者观点提取**：识别并列出作者对每一条新闻事件的所有重要观点和结论
3. **涉及美股提取**：重点分析作者每个观点中提到的美股股票，使用标准的美股股票代码，标普500使用SPY,nasdaq使用QQQ，如果作者没有提及具体股票，则填"none"，如果涉及大盘整体则填"all"
4. **股票趋势预测**：根据作者观点，预测每个股票的未来趋势，分为5类：超级看涨、看涨、中立、看跌、超级看跌
5. **影响范围**：分析每个事件和股票的影响时间范围，分为未来1～3天、未来一周、未来一个月及以上
6. **输出格式**：输出一个Json格式的列表，以新闻事件和涉及股票的组合构造每个元素，如果一条新闻事件提到了多个股票，则要拆分为多个元素，每个元素包含视频链接、博主名字、视频日期、新闻事件描述、事件日期、作者观点、股票代码、预测趋势和影响时间范围这9个字段


## 输出格式具体要求：
请严格按照以下Json格式输出分析结果，对于视频中提到的每一个事件和股票标的的组合，输出一个包含以下必填字段的字典：

{
    "results": 
        [
        {
            "video_url": "视频链接",
            "blogger_name": "博主名字",
            "video_date": "视频发布日期",
            "news_event": "新闻事件的客观描述",
            "news_event_date": "新闻事件发生的日期",
            "author_opinion": "作者对该事件和股票的具体观点",
            "stock_symbol": "作者提到的美股标准的股票代码，标普500使用SPY,nasdaq使用QQQ，如果作者没有提及则填"none",对美股全部大盘用"all"",
            "author_prediction": "5类股票趋势的预测：超级看涨、看涨、中立、看跌、超级看跌",
            "impact_timeframe": "3类影响范围：未来1～3天、未来一周、未来一个月及以上",
        },
            {
                "video_url": "视频链接",
                "blogger_name": "博主名字",
                "video_date": "视频发布日期",
                "news_event": "另一条新闻事件的客观描述",
                "news_event_date": "新闻事件发生的日期",
                "author_opinion": "作者对该事件和股票的具体观点",
                "stock_symbol": "作者提到的美股标准的股票代码，标普500使用SPY,nasdaq使用QQQ，如果作者没有提及则填"none",对美股全部大盘用"all"",
                "author_prediction": "5类股票趋势的预测：超级看涨、看涨、中立、看跌、超级看跌",
                "impact_timeframe": "3类影响范围：未来1～3天、未来一周、未来一个月及以上"
            },
            # 更多事件和股票的组合...
        ]
}
"""
    
    def analyze_video_with_gemini(self, video_url: str, video_title: str) -> Optional[str]:
        """使用Gemini分析视频内容"""
        if not self.gemini_client:
            logger.error("Gemini客户端未初始化")
            return None
        
        try:
            prompt_text = self._create_optimized_prompt()
            
            video_part = types.Part(
                file_data=types.FileData(file_uri=video_url)
            )
            prompt_part = types.Part(text=prompt_text)
            
            logger.info(f"开始分析视频: {video_title}")
            logger.info(f"视频URL: {video_url}")
            
            response = self.gemini_client.models.generate_content(
                model=self.config.gemini_model,
                contents=types.Content(parts=[video_part, prompt_part])
            )
            
            if response.candidates and response.candidates[0].content.parts:
                result = ""
                for part in response.candidates[0].content.parts:
                    result += part.text
                
                logger.info("视频分析完成")
                return result
            else:
                logger.warning("Gemini返回了空响应")
                return None
                
        except Exception as e:
            logger.error(f"调用 Gemini API 时发生错误: {e}")
            raise
    
    def save_analysis_result(self, result: str, video_title: str) -> None:
        """保存分析结果到文件"""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"analysis_{timestamp}_{video_title[:30].replace('/', '_')}.md"
            
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(f"# 视频分析报告\n\n")
                f.write(f"**视频标题**: {video_title}\n\n")
                f.write(f"**分析时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
                f.write(f"**频道**: {self.config.channel_name}\n\n")
                f.write("---\n\n")
                f.write(result)
            
            logger.info(f"分析结果已保存到: {filename}")
            
        except Exception as e:
            logger.error(f"保存分析结果时发生错误: {e}")
    
    def run_analysis(self) -> bool:
        """执行完整的分析流程"""
        try:
            # 初始化服务
            if not self.initialize_services():
                return False
            
            # 查找频道ID
            channel_id = self.find_channel_id(self.config.channel_name)
            if not channel_id:
                logger.error(f"未找到频道: {self.config.channel_name}")
                return False
            
            # 获取最新视频
            video_url, video_title = self.get_latest_uploaded_video_link(channel_id)
            if not video_url or not video_title:
                logger.error("未获取到最新视频")
                return False
            # video_url = "https://www.youtube.com/watch?v=vhEvLMyKLfI" # 测试用视频链接
            # 分析视频
            analysis_result = self.analyze_video_with_gemini(video_url, video_title)
            if not analysis_result:
                logger.error("视频分析失败")
                return False
            
            # 保存结果
            self.save_analysis_result(analysis_result, video_title)
            
            # 输出结果
            print("\n" + "="*50)
            print("视频分析结果")
            print("="*50)
            print(f"频道: {self.config.channel_name}")
            print(f"视频: {video_title}")
            print(f"链接: {video_url}")
            print("-"*50)
            print(analysis_result)
            
            return True
            
        except Exception as e:
            logger.error(f"分析流程执行失败: {e}")
            return False

def main():
    """主函数"""
    # 创建配置
    config = Config()
    
    # 验证配置
    if not config.validate():
        return
    
    # 创建分析器并运行
    analyzer = YouTubeAnalyzer(config)
    success = analyzer.run_analysis()
    
    if success:
        logger.info("分析完成！")
    else:
        logger.error("分析失败，请检查日志获取详细信息")

if __name__ == '__main__':
    main()

ERROR:__main__:搜索频道 ID 时发生错误: timed out
ERROR:__main__:搜索频道 ID 时发生错误: timed out
ERROR:__main__:搜索频道 ID 时发生错误: timed out
ERROR:__main__:函数 find_channel_id 在 3 次尝试后仍然失败: timed out
ERROR:__main__:分析流程执行失败: timed out
ERROR:__main__:分析失败，请检查日志获取详细信息
