# On-Demand API 使用指南

实时爬取 Twitter 数据的 HTTP 接口。

**API 地址**: `http://34.68.50.220:8000`

## 准备工作

安装必要的库：

In [None]:
# 如果没有安装 requests，运行这行
# !pip install requests

import requests
from datetime import datetime, timedelta
from pprint import pprint
import json

## 1. 健康检查

测试 API 是否可用：

In [None]:
# 健康检查
health_url = "http://34.68.50.220:8000/api/v1/health"
response = requests.get(health_url)

print(f"状态码: {response.status_code}")
pprint(response.json())

## 2. 基础用法：搜索单个关键词

最简单的使用方式：

In [None]:
# API 端点
api_url = "http://34.68.50.220:8000/api/v1/on_demand_scrape"

# 请求参数
payload = {
    "keywords": ["bitcoin"],
    "limit": 20
}

# 发送请求
response = requests.post(api_url, json=payload, timeout=120)
result = response.json()

# 查看结果
print(f"成功: {result['success']}")
print(f"获取推文数: {result['count']}")
print(f"执行耗时: {result['execution_time_seconds']} 秒")
print(f"搜索条件: {result['query']}")

查看第一条推文：

In [None]:
if result['data']:
    first_tweet = result['data'][0]
    
    print(f"推文URL: {first_tweet['uri']}")
    print(f"发布时间: {first_tweet['datetime']}")
    print(f"\n作者: @{first_tweet['content']['username']}")
    print(f"内容: {first_tweet['content']['text']}")
    print(f"\n互动数据:")
    print(f"  点赞: {first_tweet['content']['like_count']}")
    print(f"  转发: {first_tweet['content']['retweet_count']}")
    print(f"  回复: {first_tweet['content']['reply_count']}")
    print(f"  浏览: {first_tweet['content']['view_count']}")

## 3. 多关键词搜索（AND 逻辑）

搜索同时包含多个关键词的推文：

In [None]:
payload = {
    "keywords": ["bitcoin", "price"],
    "keyword_mode": "all",  # 必须同时包含 bitcoin 和 price
    "limit": 50
}

response = requests.post(api_url, json=payload, timeout=120)
result = response.json()

print(f"搜索条件: {result['query']}")
print(f"获取推文数: {result['count']}")

## 4. 多关键词搜索（OR 逻辑）

搜索包含任一关键词的推文：

In [None]:
payload = {
    "keywords": ["bitcoin", "ethereum"],
    "keyword_mode": "any",  # 包含 bitcoin 或 ethereum 即可
    "limit": 50
}

response = requests.post(api_url, json=payload, timeout=120)
result = response.json()

print(f"搜索条件: {result['query']}")
print(f"获取推文数: {result['count']}")

## 5. 指定时间范围

搜索指定日期范围内的推文：

In [None]:
# 搜索最近 7 天的推文
end_date = datetime.now()
start_date = end_date - timedelta(days=7)

payload = {
    "keywords": ["AI"],
    "start_date": start_date.strftime("%Y-%m-%dT%H:%M:%S"),
    "end_date": end_date.strftime("%Y-%m-%dT%H:%M:%S"),
    "limit": 100
}

response = requests.post(api_url, json=payload, timeout=120)
result = response.json()

print(f"搜索条件: {result['query']}")
print(f"时间范围: {start_date.date()} 至 {end_date.date()}")
print(f"获取推文数: {result['count']}")

## 6. 搜索指定用户的推文

获取特定用户发布的推文：

In [None]:
payload = {
    "usernames": ["elonmusk"],
    "limit": 30
}

response = requests.post(api_url, json=payload, timeout=120)
result = response.json()

print(f"搜索条件: {result['query']}")
print(f"获取推文数: {result['count']}")

## 7. 组合搜索（用户 + 关键词）

搜索特定用户包含特定关键词的推文：

In [None]:
payload = {
    "keywords": ["AI"],
    "usernames": ["OpenAI", "AnthropicAI"],
    "limit": 50
}

response = requests.post(api_url, json=payload, timeout=120)
result = response.json()

print(f"搜索条件: {result['query']}")
print(f"获取推文数: {result['count']}")

## 8. 数据处理示例

将返回的数据转换为表格格式：

In [None]:
# 提取关键字段
def extract_tweet_info(tweet_data):
    tweets = []
    for item in tweet_data:
        content = item['content']
        tweets.append({
            '用户名': content['username'],
            '发布时间': item['datetime'],
            '推文内容': content['text'][:100] + '...' if len(content['text']) > 100 else content['text'],
            '点赞数': content['like_count'],
            '转发数': content['retweet_count'],
            '浏览数': content['view_count'],
            '链接': item['uri']
        })
    return tweets

# 使用示例
if result['data']:
    tweets_info = extract_tweet_info(result['data'])
    
    # 打印前 5 条
    for i, tweet in enumerate(tweets_info[:5], 1):
        print(f"\n=== 推文 {i} ===")
        for key, value in tweet.items():
            print(f"{key}: {value}")

使用 pandas 展示（需要安装 pandas）：

In [None]:
# 取消注释以安装 pandas
# !pip install pandas

import pandas as pd

if result['data']:
    tweets_info = extract_tweet_info(result['data'])
    df = pd.DataFrame(tweets_info)
    display(df)

## 9. 错误处理

完整的错误处理示例：

In [None]:
def fetch_tweets(payload, max_retries=3):
    """
    获取推文，带重试机制
    """
    api_url = "http://34.68.50.220:8000/api/v1/on_demand_scrape"
    
    for attempt in range(max_retries):
        try:
            response = requests.post(api_url, json=payload, timeout=120)
            response.raise_for_status()
            
            result = response.json()
            print(f"✓ 成功获取 {result['count']} 条推文")
            print(f"  耗时: {result['execution_time_seconds']} 秒")
            return result
            
        except requests.Timeout:
            print(f"✗ 请求超时 (尝试 {attempt + 1}/{max_retries})")
            if attempt < max_retries - 1:
                print("  等待 10 秒后重试...")
                import time
                time.sleep(10)
                
        except requests.HTTPError as e:
            status_code = e.response.status_code
            
            if status_code == 400:
                print(f"✗ 参数错误: {e.response.json()}")
                return None
            elif status_code == 408:
                print(f"✗ 请求超时，建议减少 limit 或缩小时间范围")
                return None
            elif status_code == 503:
                print(f"✗ 服务暂时不可用 (尝试 {attempt + 1}/{max_retries})")
                if attempt < max_retries - 1:
                    print("  等待 30 秒后重试...")
                    import time
                    time.sleep(30)
            else:
                print(f"✗ HTTP 错误 {status_code}: {e.response.text}")
                return None
                
        except Exception as e:
            print(f"✗ 未知错误: {e}")
            return None
    
    print(f"✗ 达到最大重试次数，失败")
    return None

# 使用示例
payload = {
    "keywords": ["crypto"],
    "limit": 50
}

result = fetch_tweets(payload)

## 10. 保存结果

将结果保存到文件：

In [None]:
# 保存为 JSON
if result and result['data']:
    with open('tweets_result.json', 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    print("✓ 结果已保存到 tweets_result.json")

# 保存为 CSV
if result and result['data']:
    import pandas as pd
    tweets_info = extract_tweet_info(result['data'])
    df = pd.DataFrame(tweets_info)
    df.to_csv('tweets_result.csv', index=False, encoding='utf-8')
    print("✓ 结果已保存到 tweets_result.csv")

## 参数说明

| 参数 | 类型 | 必填 | 默认值 | 说明 |
|------|------|------|-----------|------|
| `keywords` | 字符串数组 | 否* | - | 搜索关键词（1-10个） |
| `usernames` | 字符串数组 | 否* | - | Twitter 用户名（1-10个） |
| `keyword_mode` | 字符串 | 否 | `"all"` | `"all"` = 必须包含所有关键词<br>`"any"` = 包含任一关键词即可 |
| `start_date` | 时间字符串 | 否 | - | 起始时间，格式 `YYYY-MM-DDTHH:MM:SS` |
| `end_date` | 时间字符串 | 否 | - | 结束时间，格式 `YYYY-MM-DDTHH:MM:SS` |
| `limit` | 整数 | 否 | `100` | 返回推文数量（1-500） |

**注意**: `keywords` 和 `usernames` 至少提供一个

## 性能参考

- 爬取 20 条推文：约 10-15 秒
- 爬取 100 条推文：约 20-30 秒
- 超时限制：90 秒

## 使用建议

1. **limit 设置**：根据需求设置，建议 20-100 条
2. **时间范围**：建议单次不超过 1 个月
3. **超时处理**：客户端超时设置建议 120 秒