# 币安BTC K线数据分析

本notebook从币安API拉取BTC最近90天的小时级K线数据，生成每日K线图并上传到S3存储桶。

## 1. 安装和导入必要的库

In [None]:
# 安装必要的包
!pip install requests pandas matplotlib seaborn boto3 python-binance plotly

In [None]:
import requests
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Rectangle
import seaborn as sns
import boto3
from datetime import datetime, timedelta
import io
import os
from binance.client import Client
import numpy as np
import warnings
warnings.filterwarnings('ignore')

# Configure matplotlib fonts so the Chinese chart titles/labels can render.
# DejaVu Sans alone has no CJK glyphs, so prefer any installed CJK-capable
# font and keep DejaVu Sans as the last-resort fallback. Only fonts that are
# actually installed are listed, to avoid "font family not found" log spam.
from matplotlib import font_manager

_CJK_FONT_CANDIDATES = [
    'Noto Sans CJK SC',
    'Source Han Sans SC',
    'WenQuanYi Micro Hei',
    'SimHei',
    'Microsoft YaHei',
    'PingFang SC',
    'Heiti TC',
    'Arial Unicode MS',
]
_installed_fonts = {entry.name for entry in font_manager.fontManager.ttflist}
plt.rcParams['font.sans-serif'] = (
    [name for name in _CJK_FONT_CANDIDATES if name in _installed_fonts]
    + ['DejaVu Sans']
)
# Render the minus sign with an ASCII hyphen so it is available in every font.
plt.rcParams['axes.unicode_minus'] = False

## 2. 配置参数

In [None]:
# Configuration parameters
SYMBOL = 'BTCUSDT'
DAYS_BACK = 90
INTERVAL = '1h'  # 1-hour candles

# S3 configuration - adjust to your environment
S3_BUCKET = 'your-s3-bucket-name'  # replace with your S3 bucket name
S3_PREFIX = 'btc-kline-charts/'    # key prefix ("folder") inside the bucket

# AWS region
AWS_REGION = 'us-east-1'  # change the region as needed

## 3. 从币安API获取K线数据

In [None]:
def get_binance_klines(symbol, interval, days_back, timeout=10):
    """
    Fetch candlestick (kline) rows from the Binance REST API.

    Pages through the endpoint in batches of up to 1000 rows, starting
    ``days_back`` days before now and stopping at the current time.

    Args:
        symbol: Trading pair passed as the ``symbol`` query parameter.
        interval: Candle interval passed as the ``interval`` query parameter.
        days_back: How many days before now the requested window starts.
            A value <= 0 yields an empty window and no request is made.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list with the raw rows returned by the API, in the order received.
        If a request fails, the rows collected so far are returned.
    """
    # Window boundaries as millisecond epoch timestamps.
    end_time = datetime.now()
    start_time = end_time - timedelta(days=days_back)
    start_timestamp = int(start_time.timestamp() * 1000)
    end_timestamp = int(end_time.timestamp() * 1000)

    url = 'https://api.binance.com/api/v3/klines'

    params = {
        'symbol': symbol,
        'interval': interval,
        'startTime': start_timestamp,
        'endTime': end_timestamp,
        'limit': 1000  # at most 1000 rows per request
    }

    all_data = []

    while start_timestamp < end_timestamp:
        params['startTime'] = start_timestamp

        try:
            # A timeout keeps a stalled connection from hanging the notebook.
            response = requests.get(url, params=params, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions that do
            # not wrap decode errors in RequestException.
            print(f"API请求错误: {e}")
            break

        if not data:
            break

        all_data.extend(data)
        print(f"已获取 {len(all_data)} 条K线数据...")

        # Continue from 1 ms after the close time (field 6) of the last row.
        next_start = data[-1][6] + 1
        if next_start <= start_timestamp:
            # The cursor did not advance; stop rather than loop forever.
            break
        start_timestamp = next_start

    return all_data

# Fetch the kline rows for the configured symbol, interval and look-back window
print(f"正在获取 {SYMBOL} 最近 {DAYS_BACK} 天的 {INTERVAL} K线数据...")
kline_data = get_binance_klines(SYMBOL, INTERVAL, DAYS_BACK)
print(f"总共获取了 {len(kline_data)} 条K线数据")

## 4. 数据处理和清洗

In [None]:
def process_kline_data(kline_data):
    """
    Turn raw kline rows into a typed DataFrame indexed by open time.

    Each row must have the twelve positional fields named below. Numeric
    fields are coerced to numbers (unparseable values become NaN), the two
    millisecond timestamps become datetimes, and a ``date`` column holding
    the calendar date of ``open_time`` is appended.
    """
    field_names = (
        'open_time', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_asset_volume', 'number_of_trades',
        'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore',
    )
    numeric_fields = (
        'open', 'high', 'low', 'close', 'volume',
        'quote_asset_volume', 'number_of_trades',
        'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume',
    )

    frame = pd.DataFrame(kline_data, columns=list(field_names))

    # Coerce every numeric field in one pass; existing column order is kept.
    frame = frame.assign(**{
        name: pd.to_numeric(frame[name], errors='coerce')
        for name in numeric_fields
    })

    # Millisecond epoch values -> datetimes.
    for ts_field in ('open_time', 'close_time'):
        frame[ts_field] = pd.to_datetime(frame[ts_field], unit='ms')

    # Calendar date of each candle, taken before open_time becomes the index.
    frame['date'] = frame['open_time'].dt.date

    return frame.set_index('open_time')

# Convert the raw rows into a DataFrame and show its shape, time span and first rows
df = process_kline_data(kline_data)
print(f"数据处理完成，数据形状: {df.shape}")
print(f"数据时间范围: {df.index.min()} 到 {df.index.max()}")
df.head()

## 5. 生成每日K线图

In [None]:
def create_daily_kline_chart(daily_data, date_str):
    """
    Draw one day's candlestick chart with a volume panel underneath.

    Args:
        daily_data: DataFrame with one row per candle, in chronological
            order, holding 'open', 'high', 'low', 'close' and 'volume'
            columns. Its index values are used for the x-axis tick labels
            (formatted as HH:MM when they support ``strftime``).
        date_str: Date text shown in the chart title.

    Returns:
        The matplotlib Figure. The caller is responsible for closing it.

    Raises:
        Any exception raised while drawing; the figure is closed first so
        a failed chart does not leak.
    """
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10),
                                   gridspec_kw={'height_ratios': [3, 1]})

    try:
        opens = list(daily_data['open'])
        highs = list(daily_data['high'])
        lows = list(daily_data['low'])
        closes = list(daily_data['close'])

        # Red for a rising (or flat) candle, green for a falling one.
        colors = ['red' if close >= open_ else 'green'
                  for open_, close in zip(opens, closes)]

        # Main panel: one wick line and one body rectangle per candle.
        for idx, (open_, high, low, close, color) in enumerate(
                zip(opens, highs, lows, closes, colors)):
            ax1.plot([idx, idx], [low, high], color='black', linewidth=1)
            body = Rectangle((idx - 0.3, min(open_, close)), 0.6,
                             abs(close - open_),
                             facecolor=color, edgecolor='black', alpha=0.8)
            ax1.add_patch(body)

        ax1.set_title(f'BTC/USDT {date_str} 小时K线图', fontsize=16, fontweight='bold')
        ax1.set_ylabel('价格 (USDT)', fontsize=12)
        ax1.grid(True, alpha=0.3)

        # Label every second candle with its own timestamp. Deriving the
        # labels from the index keeps the tick and label counts equal for
        # partial days (fewer than 24 candles) and keeps the shown hour
        # correct when the day does not start at 00:00.
        tick_positions = list(range(0, len(daily_data), 2))
        tick_labels = [
            ts.strftime('%H:%M') if hasattr(ts, 'strftime') else str(ts)
            for ts in daily_data.index[::2]
        ]
        ax1.set_xticks(tick_positions)
        ax1.set_xticklabels(tick_labels, rotation=45)

        # Daily summary shown in a text box in the top-left corner.
        daily_open = daily_data.iloc[0]['open']
        daily_close = daily_data.iloc[-1]['close']
        daily_high = daily_data['high'].max()
        daily_low = daily_data['low'].min()
        daily_volume = daily_data['volume'].sum()

        change = daily_close - daily_open
        change_pct = (change / daily_open) * 100

        info_text = f"开盘: ${daily_open:.2f} | 收盘: ${daily_close:.2f} | 最高: ${daily_high:.2f} | 最低: ${daily_low:.2f}\n"
        info_text += f"涨跌: ${change:.2f} ({change_pct:+.2f}%) | 成交量: {daily_volume:.2f}"

        ax1.text(0.02, 0.98, info_text, transform=ax1.transAxes,
                 verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))

        # Lower panel: volume bars coloured like their candles.
        ax2.bar(range(len(daily_data)), daily_data['volume'], color=colors, alpha=0.7)
        ax2.set_ylabel('成交量', fontsize=12)
        ax2.set_xlabel('时间', fontsize=12)
        ax2.grid(True, alpha=0.3)

        ax2.set_xticks(tick_positions)
        ax2.set_xticklabels(tick_labels, rotation=45)

        plt.tight_layout()
    except Exception:
        # Do not leave a half-drawn figure registered with pyplot.
        plt.close(fig)
        raise

    return fig

# Group the candles by calendar date so each group can be drawn as one chart
daily_groups = df.groupby('date')
print(f"总共有 {len(daily_groups)} 天的数据")

## 6. 初始化S3客户端

In [None]:
# Initialise the S3 client and verify the bucket is reachable.
# On any failure S3_BUCKET is set to None, which makes the later cells save
# files locally. s3_client is always bound (None on failure) so those cells
# can still reference it.
s3_client = None
try:
    s3_client = boto3.client('s3', region_name=AWS_REGION)
    print("S3客户端初始化成功")
except Exception as e:
    print(f"S3客户端初始化失败: {e}")
    print("将在本地保存图片文件")
    S3_BUCKET = None
else:
    # Check that the bucket exists and is accessible with these credentials.
    try:
        s3_client.head_bucket(Bucket=S3_BUCKET)
        print(f"S3存储桶 '{S3_BUCKET}' 可访问")
    except Exception as e:
        # Catch Exception rather than using a bare except so Ctrl-C still
        # interrupts, and report the cause instead of hiding it.
        print(f"警告: 无法访问S3存储桶 '{S3_BUCKET}'，请检查存储桶名称和权限")
        print(f"错误详情: {e}")
        print("将在本地保存图片文件")
        S3_BUCKET = None

## 7. 生成图表并上传到S3

In [None]:
def _save_chart_locally(fig, filename):
    """Save the figure as ``btc_charts/<filename>`` and return that path."""
    local_dir = 'btc_charts'
    os.makedirs(local_dir, exist_ok=True)
    local_path = os.path.join(local_dir, filename)
    fig.savefig(local_path, dpi=300, bbox_inches='tight')
    print(f"图表已保存到本地: {local_path}")
    return local_path


def upload_to_s3(fig, filename, s3_client, bucket, prefix):
    """
    Store a matplotlib figure in S3, falling back to the local disk.

    When ``bucket`` is falsy the figure is written locally. Otherwise it is
    rendered to an in-memory PNG and uploaded under ``prefix + filename``;
    if anything in that path raises, the error is printed and the figure is
    written locally instead.

    Returns:
        The S3 URL on a successful upload, otherwise the local file path.
    """
    if bucket:
        try:
            # Render into memory so nothing touches the disk on the S3 path.
            png_stream = io.BytesIO()
            fig.savefig(png_stream, format='png', dpi=300, bbox_inches='tight')
            png_stream.seek(0)

            s3_key = f"{prefix}{filename}"
            s3_client.upload_fileobj(
                png_stream,
                bucket,
                s3_key,
                ExtraArgs={'ContentType': 'image/png'}
            )

            s3_url = f"https://{bucket}.s3.{AWS_REGION}.amazonaws.com/{s3_key}"
            print(f"图表已上传到S3: {s3_url}")
            return s3_url
        except Exception as e:
            print(f"上传到S3失败: {e}")

    # No bucket configured, or the upload failed: keep a local copy.
    return _save_chart_locally(fig, filename)

# Generate one chart per calendar day and upload it (or save it locally).
uploaded_files = []
total_days = len(daily_groups)

# Tolerate a failed S3 initialisation that left s3_client unbound: with no
# client the upload helper's own error handling falls back to local files.
_chart_s3_client = globals().get('s3_client')

print(f"开始生成 {total_days} 天的K线图...")

for i, (date, daily_data) in enumerate(daily_groups, 1):
    date_str = date.strftime('%Y-%m-%d')

    # Skip days with too few candles to make a useful chart.
    if len(daily_data) < 5:
        print(f"跳过 {date_str}：数据不足 ({len(daily_data)} 条记录)")
        continue

    print(f"正在处理 {date_str} ({i}/{total_days})...")

    fig = None
    try:
        fig = create_daily_kline_chart(daily_data, date_str)

        filename = f"BTC_USDT_1h_{date_str}.png"

        # Upload to S3, or save locally when no bucket is available.
        file_location = upload_to_s3(fig, filename, _chart_s3_client, S3_BUCKET, S3_PREFIX)
        uploaded_files.append({
            'date': date_str,
            'filename': filename,
            'location': file_location
        })

    except Exception as e:
        # One bad day must not stop the remaining charts.
        print(f"处理 {date_str} 时出错: {e}")
    finally:
        # Always release the figure, including when saving/uploading failed.
        if fig is not None:
            plt.close(fig)

print(f"\n完成！总共生成了 {len(uploaded_files)} 个K线图")

## 8. 生成汇总报告

In [None]:
# Build a summary table of every generated chart and persist it as CSV:
# to S3 when a bucket is configured (local file on upload failure),
# otherwise straight to a local file.
summary_df = pd.DataFrame(uploaded_files)

if summary_df.empty:
    print("\n没有成功生成任何K线图")
else:
    print("\n=== 生成的K线图汇总 ===")
    print(summary_df.to_string(index=False))

    # Timestamped name so repeated runs do not overwrite each other.
    summary_filename = f"btc_kline_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

    if not S3_BUCKET:
        summary_df.to_csv(summary_filename, index=False)
        print(f"\n汇总报告已保存到本地: {summary_filename}")
    else:
        # Serialise once to text; it is encoded to bytes for the upload.
        csv_text = summary_df.to_csv(index=False)

        try:
            s3_client.upload_fileobj(
                io.BytesIO(csv_text.encode()),
                S3_BUCKET,
                f"{S3_PREFIX}{summary_filename}",
                ExtraArgs={'ContentType': 'text/csv'}
            )
            print(f"\n汇总报告已上传到S3: {S3_PREFIX}{summary_filename}")
        except Exception as e:
            print(f"上传汇总报告失败: {e}")
            summary_df.to_csv(summary_filename, index=False)
            print(f"汇总报告已保存到本地: {summary_filename}")

## 9. 数据统计分析

In [None]:
# Print overall statistics for the whole data set
print("\n=== BTC价格统计分析 ===")
print(f"数据时间范围: {df.index.min().strftime('%Y-%m-%d %H:%M')} 到 {df.index.max().strftime('%Y-%m-%d %H:%M')}")
print(f"总数据点数: {len(df)}")
print(f"\n价格统计:")
print(f"  最高价: ${df['high'].max():.2f}")
print(f"  最低价: ${df['low'].min():.2f}")
print(f"  平均价: ${df['close'].mean():.2f}")
# Period change compares the last row's close with the first row's open
print(f"  期间涨跌: ${df['close'].iloc[-1] - df['open'].iloc[0]:.2f}")
print(f"  期间涨跌幅: {((df['close'].iloc[-1] / df['open'].iloc[0]) - 1) * 100:.2f}%")
print(f"\n成交量统计:")
print(f"  总成交量: {df['volume'].sum():.2f} BTC")
print(f"  平均小时成交量: {df['volume'].mean():.2f} BTC")
print(f"  最大小时成交量: {df['volume'].max():.2f} BTC")

# Per-day statistics: first open, highest high, lowest low, last close and
# summed volume within each calendar date
daily_stats = df.groupby('date').agg({
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum'
})

# Absolute and percentage change between each day's open and close
daily_stats['daily_change'] = daily_stats['close'] - daily_stats['open']
daily_stats['daily_change_pct'] = (daily_stats['daily_change'] / daily_stats['open']) * 100

print(f"\n每日统计:")
print(f"  上涨天数: {(daily_stats['daily_change'] > 0).sum()}")
print(f"  下跌天数: {(daily_stats['daily_change'] < 0).sum()}")
print(f"  最大单日涨幅: {daily_stats['daily_change_pct'].max():.2f}%")
print(f"  最大单日跌幅: {daily_stats['daily_change_pct'].min():.2f}%")
# Reported as the mean of the absolute daily percentage change
print(f"  平均日波动率: {daily_stats['daily_change_pct'].abs().mean():.2f}%")

## 10. 完成提示

In [None]:
# Final banner summarising where the outputs went
print("\n" + "="*50)
print("🎉 BTC K线数据分析完成！")
print("="*50)

# S3_BUCKET is reset to None by the S3 initialisation cell when the bucket
# is not usable, in which case charts were written to the local directory
if S3_BUCKET:
    print(f"📊 所有K线图已上传到S3存储桶: {S3_BUCKET}")
    print(f"📁 S3路径前缀: {S3_PREFIX}")
else:
    print(f"📊 所有K线图已保存到本地目录: btc_charts/")

print(f"📈 总共生成了 {len(uploaded_files)} 个每日K线图")
# summary_filename is only assigned when at least one chart was generated,
# so fall back to 'N/A' when the name is not defined
print(f"📋 汇总报告文件: {summary_filename if 'summary_filename' in locals() else 'N/A'}")
print(f"⏰ 处理完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print("\n如需重新运行或修改参数，请修改第2节的配置参数后重新执行相关单元格。")