In [1]:
import ccxt
import time
import os
import pandas as pd
from datetime import datetime
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
# Data format (column order of each saved daily parquet file):
# timestamp (ms since epoch, UTC), open, high, low, close, volume, symbol, exchange, interval

In [None]:
import os
import time
import ccxt
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, timezone, timedelta

# === CONFIG ===
# Earliest UTC day (YYYY-MM-DD) that main() walks back to.
START_DATE = '2020-01-01'
# ccxt timeframe string of the candles being downloaded.
INTERVAL = '1m'
# Target bucket for save_daily_parquet_to_s3.
BUCKET = 'crypto.kline.data'
# Local root for parquet output and for S3 staging files.
LOCAL_TMP_DIR = './data'
# Path of the text file listing one symbol per line (a file, despite the name).
SYMBOL_DIR = 'symbols.txt'

# === CLIENTS ===
# AWS S3 client, used when uploading with save_daily_parquet_to_s3.
s3 = boto3.client('s3')
# Key point: point ccxt's Binance client at futures markets, not spot.
exchange = ccxt.binance({'options': {'defaultType': 'future'}})

# Default pair for ad-hoc experiments; main() reads its symbols from SYMBOL_DIR instead.
symbol = 'BTC/USDT'

# === HELPERS ===
def ensure_dir(path):
    """Create directory ``path`` (and any missing parents) if it does not already exist.

    An empty ``path`` -- what ``os.path.dirname`` returns for a bare file name --
    refers to the current directory, so nothing is created in that case.
    """
    if path:
        # exist_ok avoids the race between a separate exists() check and makedirs().
        os.makedirs(path, exist_ok=True)

def read_symbols(loc):
    """Return every non-blank line of the text file at ``loc``, stripped of surrounding whitespace."""
    with open(loc, 'r') as handle:
        stripped = (line.strip() for line in handle)
        return [name for name in stripped if name]

# def s3_file_exists(symbol, day):
#     normalized_symbol = symbol.replace("/", "-")
#     s3_key = f"{exchange.id}/{INTERVAL}/{normalized_symbol}/{day}.parquet"
#     try:
#         s3.head_object(Bucket=BUCKET, Key=s3_key)
#         return True
#     except s3.exceptions.ClientError as e:
#         if e.response['Error']['Code'] == "404":
#             return False
#         raise

def fetch_day_ohlcv(symbol, since_ts, until_ts, client=None, timeframe=None,
                    max_retries=5, pause=0.25, retry_pause=3.0):
    """Fetch all OHLCV candles of ``symbol`` whose open time lies in ``[since_ts, until_ts)``.

    Pages through ``client.fetch_ohlcv`` (1000 candles per request), starting
    each page just after the last candle received.

    Args:
        symbol: ccxt market symbol, e.g. 'BTC/USDT'.
        since_ts: inclusive window start, milliseconds since the Unix epoch (UTC).
        until_ts: exclusive window end, milliseconds since the Unix epoch (UTC).
        client: ccxt exchange instance; defaults to the module-level ``exchange``.
        timeframe: ccxt timeframe string; defaults to ``INTERVAL``.
        max_retries: consecutive failed requests tolerated before giving up.
        pause: seconds to sleep after each successful page.
        retry_pause: seconds to sleep after a failed request.

    Returns:
        List of candle rows exactly as returned by ``fetch_ohlcv``, in received order.

    Raises:
        The last exception raised by ``fetch_ohlcv`` once ``max_retries``
        consecutive requests have failed, instead of retrying forever (e.g. on a
        symbol the exchange does not list).
    """
    if client is None:
        client = exchange
    if timeframe is None:
        timeframe = INTERVAL

    all_candles = []
    ts = since_ts
    failures = 0
    while ts < until_ts:
        try:
            candles = client.fetch_ohlcv(symbol, timeframe=timeframe, since=ts, limit=1000)
        except Exception as e:
            failures += 1
            print(f"[Error] {symbol} @ {datetime.fromtimestamp(ts / 1000, timezone.utc)} : {e}")
            if failures >= max_retries:
                raise
            time.sleep(retry_pause)
            continue
        failures = 0
        if not candles:
            break
        for c in candles:
            if c[0] >= until_ts:
                break
            if c[0] < ts:
                # Guard against an exchange returning candles older than `since`
                # (they would be duplicates or outside the window).
                continue
            all_candles.append(c)
        # Resume just after the last candle received; unlike a fixed +60_000 ms
        # step this works for any timeframe.
        next_ts = candles[-1][0] + 1
        if next_ts <= ts:
            # No forward progress: stop instead of re-requesting the same page forever.
            break
        ts = next_ts
        time.sleep(pause)
    return all_candles

def _ohlcv_table(symbol, candles):
    """Build the pyarrow table stored for one symbol-day of ccxt OHLCV rows.

    Columns: timestamp (int64, as given in row[0]), open/high/low/close/volume
    (float64, rows[1..5]), then constant symbol / exchange id / interval columns.
    """
    n = len(candles)
    return pa.Table.from_arrays(
        [
            pa.array([row[0] for row in candles], type=pa.int64()),     # timestamp
            pa.array([row[1] for row in candles], type=pa.float64()),   # open
            pa.array([row[2] for row in candles], type=pa.float64()),   # high
            pa.array([row[3] for row in candles], type=pa.float64()),   # low
            pa.array([row[4] for row in candles], type=pa.float64()),   # close
            pa.array([row[5] for row in candles], type=pa.float64()),   # volume
            pa.array([symbol] * n),                                     # symbol
            pa.array([exchange.id] * n),                                # exchange
            pa.array([INTERVAL] * n),                                   # interval
        ],
        names=["timestamp", "open", "high", "low", "close", "volume", "symbol", "exchange", "interval"]
    )


def save_daily_parquet_to_s3(symbol, day, candles):
    """Write one day of candles to a staging parquet file and upload it to S3.

    The object key is ``{exchange.id}/{INTERVAL}/{symbol with '/' -> '-'}/{day}.parquet``
    in ``BUCKET``. The staging file under ``LOCAL_TMP_DIR`` is deleted after a
    successful upload (it is kept if the upload raises). Empty ``candles`` are
    reported and skipped.
    """
    if not candles:
        print(f"[⚠️ Empty] No data for {symbol} on {day}")
        return

    normalized_symbol = symbol.replace("/", "-")
    s3_key = f"{exchange.id}/{INTERVAL}/{normalized_symbol}/{day}.parquet"
    # Flatten the key into a single file name so staging needs no nested directories.
    local_path = os.path.join(LOCAL_TMP_DIR, s3_key.replace("/", "_"))
    ensure_dir(os.path.dirname(local_path))

    pq.write_table(_ohlcv_table(symbol, candles), local_path, compression='snappy')
    s3.upload_file(local_path, BUCKET, s3_key)
    # The local copy is only a staging file; remove it so a long backfill does not fill the disk.
    os.remove(local_path)
    print(f"[S3 ✅] {symbol} {day} → {s3_key}")


def save_daily_parquet_to_local(symbol, day, candles):
    """Write one day of candles to a local parquet file.

    The path is ``LOCAL_TMP_DIR/{exchange.id}/{INTERVAL}/{symbol with '/' -> '-'}/{day}.parquet``;
    parent directories are created as needed. Empty ``candles`` are reported and skipped.
    """
    if not candles:
        print(f"[⚠️ Empty] No data for {symbol} on {day}")
        return

    normalized_symbol = symbol.replace("/", "-")
    local_path = os.path.join(LOCAL_TMP_DIR, exchange.id, INTERVAL, normalized_symbol, f"{day}.parquet")
    ensure_dir(os.path.dirname(local_path))

    pq.write_table(_ohlcv_table(symbol, candles), local_path, compression='snappy')


# === MAIN ===
def main():
    """Backfill daily candle files for every symbol listed in SYMBOL_DIR, newest day first.

    For each symbol, walks from the last complete UTC day back to START_DATE,
    fetching one day at a time and writing it with save_daily_parquet_to_local.
    If fetching fails for a symbol, the error is reported and the run moves on
    to the next symbol instead of stopping the whole backfill.
    """
    ensure_dir(LOCAL_TMP_DIR)
    symbols = read_symbols(SYMBOL_DIR)
    start_date = datetime.strptime(START_DATE, '%Y-%m-%d').date()
    # Today's UTC day is still in progress; saving it would store a partial day
    # under a file name that looks complete, so start from yesterday.
    end_date = datetime.now(timezone.utc).date() - timedelta(days=1)
    day_ms = 24 * 60 * 60 * 1000

    for symbol in symbols:
        print(f"==> Processing {symbol}")
        current_date = end_date
        while current_date >= start_date:
            day_str = current_date.isoformat()
            since_ts = int(datetime.combine(current_date, datetime.min.time(), tzinfo=timezone.utc).timestamp() * 1000)
            until_ts = since_ts + day_ms
            print(f"[📥 Fetching] {symbol} {day_str}")
            try:
                candles = fetch_day_ohlcv(symbol, since_ts, until_ts)
            except Exception as e:
                # E.g. a symbol this market does not list: every earlier day would fail too.
                print(f"[❌ Skip] {symbol}: giving up at {day_str} ({e})")
                break
            # Swap in save_daily_parquet_to_s3(symbol, day_str, candles) to upload instead.
            save_daily_parquet_to_local(symbol, day_str, candles)
            current_date -= timedelta(days=1)

In [3]:
# Fetch today's BTC/USDT candles from a Binance client created without the
# futures option (spot), for comparison with the futures data.
# fetch_day_ohlcv reads the global `exchange`, so swap the client in only for
# this call and restore the previous one afterwards; otherwise later cells and
# main() would silently keep using this client.
previous_exchange = exchange
exchange = ccxt.binance()
try:
    symbol = 'BTC/USDT'
    end_date = datetime.now(timezone.utc).date()

    print(f"==> Processing {symbol}")
    current_date = end_date

    day_str = current_date.isoformat()
    since_ts = int(datetime.combine(current_date, datetime.min.time(), tzinfo=timezone.utc).timestamp() * 1000)
    until_ts = since_ts + 24 * 60 * 60 * 1000
    print(f"[📥 Fetching] {symbol} {day_str}")
    candlesspot = fetch_day_ohlcv(symbol, since_ts, until_ts)
finally:
    exchange = previous_exchange

==> Processing BTC/USDT
[📥 Fetching] BTC/USDT 2025-07-27


In [2]:
# Fetch today's BTC/USDT candles from Binance futures. The futures client is
# re-created here in case an earlier cell left a different one bound to `exchange`.
exchange = ccxt.binance({
    'options': {
        'defaultType': 'future'  # key point: futures markets, not spot
    }
})
symbol = 'BTC/USDT'
end_date = datetime.now(timezone.utc).date()

print(f"==> Processing {symbol}")
current_date = end_date

day_str = current_date.isoformat()
since_ts = int(datetime.combine(current_date, datetime.min.time(), tzinfo=timezone.utc).timestamp() * 1000)
until_ts = since_ts + 24 * 60 * 60 * 1000
print(f"[📥 Fetching] {symbol} {day_str}")
candles = fetch_day_ohlcv(symbol, since_ts, until_ts)

==> Processing BTC/USDT
[📥 Fetching] BTC/USDT 2025-07-27


In [5]:
# First five candles of the same day from the futures fetch and the spot fetch, side by side.
(candles[:5], candlesspot[:5])

([[1753574400000, 117889.3, 117926.0, 117889.2, 117912.1, 39.633],
  [1753574460000, 117912.1, 117912.1, 117862.3, 117862.3, 29.205],
  [1753574520000, 117862.4, 117880.0, 117862.3, 117871.1, 63.438],
  [1753574580000, 117871.2, 117871.2, 117862.3, 117862.3, 27.468],
  [1753574640000, 117862.3, 117862.4, 117855.0, 117855.1, 23.978]],
 [[1753574400000, 117919.99, 117952.12, 117919.99, 117947.74, 2.34686],
  [1753574460000, 117947.74, 117947.74, 117901.7, 117901.7, 4.25359],
  [1753574520000, 117901.7, 117910.54, 117901.7, 117910.53, 0.87761],
  [1753574580000, 117910.53, 117910.54, 117891.9, 117891.9, 3.7089],
  [1753574640000, 117891.9, 117891.91, 117891.89, 117891.89, 1.04697]])

In [40]:
def get_volumes_by_exc(exchange, pairs, chunk_size=50):
    """Collect ``(symbol, quoteVolume)`` for ``pairs`` via ``exchange.fetch_tickers``.

    Symbols are requested in chunks of ``chunk_size`` per call. Tickers whose
    ``quoteVolume`` is missing, None or 0 are dropped.

    Args:
        exchange: ccxt exchange instance (shadows the module-level ``exchange``).
        pairs: sequence of ccxt symbols.
        chunk_size: number of symbols per ``fetch_tickers`` request.

    Returns:
        List of ``(symbol, quote_volume)`` tuples in the order the tickers were returned.
    """
    res = []
    # range(0, n, step) never yields an empty trailing chunk. The previous
    # range(n // 50 + 1) sent fetch_tickers([]) whenever n was a multiple of 50
    # (including n == 0) -- at best a wasted request, and an empty list may be
    # treated by ccxt as "no symbol filter".
    for start in range(0, len(pairs), chunk_size):
        tickers = exchange.fetch_tickers(pairs[start:start + chunk_size])
        res.extend(
            (symbol, ticker['quoteVolume'])
            for symbol, ticker in tickers.items()
            if ticker.get('quoteVolume')
        )
    return res

# Rank USDT spot pairs by ticker quoteVolume and keep the top 50 as the download universe.
# Request chunking is done by get_volumes_by_exc, not by ccxt itself.
markets = exchange.load_markets()

# Spot USDT pairs only (markets whose 'contract' flag is false).
usdt_pairs = [s for s in markets if s.endswith('/USDT') and not markets[s]['contract']]

volumes = get_volumes_by_exc(exchange, usdt_pairs)

top_50 = sorted(volumes, key=lambda x: x[1], reverse=True)[:50]

# Loop with `pair` rather than `symbol` so this cell does not overwrite the
# global `symbol` used by the config and fetch cells.
for rank, (pair, volume) in enumerate(top_50, 1):
    print(f"{rank:2d}. {pair:12s} Volume: {volume:,.2f}")

symbols = [pair for pair, _ in top_50]

 1. ETH/USDT     Volume: 2,042,389,459.99
 2. BTC/USDT     Volume: 2,016,684,682.47
 3. USDC/USDT    Volume: 1,117,361,857.83
 4. XRP/USDT     Volume: 446,798,143.69
 5. FDUSD/USDT   Volume: 435,025,101.63
 6. SOL/USDT     Volume: 420,366,199.46
 7. PEPE/USDT    Volume: 263,862,716.23
 8. DOGE/USDT    Volume: 213,654,307.53
 9. SUI/USDT     Volume: 132,874,674.55
10. BONK/USDT    Volume: 132,773,980.50
11. UNI/USDT     Volume: 121,399,320.20
12. BNB/USDT     Volume: 90,626,088.13
13. ADA/USDT     Volume: 89,923,554.22
14. SAHARA/USDT  Volume: 89,391,994.90
15. XLM/USDT     Volume: 77,298,990.40
16. WIF/USDT     Volume: 76,395,926.74
17. HYPER/USDT   Volume: 68,905,896.32
18. TRUMP/USDT   Volume: 66,226,899.81
19. PENGU/USDT   Volume: 66,032,115.87
20. NEIRO/USDT   Volume: 61,651,370.47
21. MAGIC/USDT   Volume: 60,806,227.20
22. BANANAS31/USDT Volume: 59,784,635.56
23. AAVE/USDT    Volume: 58,885,155.14
24. LA/USDT      Volume: 57,640,814.71
25. AVAX/USDT    Volume: 52,996,954.03
26. EN

In [None]:
# Persist the selected universe, one symbol per line, to the file main() reads (SYMBOL_DIR).
with open(SYMBOL_DIR, 'w') as f:
    f.write('\n'.join(symbols) + '\n')