In [1]:
import base64
import hmac
import json
import time
from urllib.parse import urlencode, urljoin

import requests
import toml

# Connection settings: the API base URL plus the credentials read from the
# local TOML config file (kept outside the notebook so secrets are not
# committed with it).
base_url = "https://www.domchdqmse.com"
api_config = toml.load("../config/api.toml")
api_key, secret_key, passphrase = (
    api_config[field] for field in ("api_key", "secret_key", "passphrase")
)

In [2]:
def request(method, uri, params=None, body=None, headers=None, auth=False):
    """Send an HTTP request to the API and split the reply into (result, error).

    @param method: request method, GET / POST / DELETE / PUT
    @param uri: request path, resolved against the module-level ``base_url``
    @param params: dict, request query params; appended to ``uri`` sorted by key
    @param body: dict, request body; JSON-encoded when ``auth`` is true,
        otherwise handed to ``requests`` unchanged
    @param headers: dict, request http headers; the caller's dict is never
        modified in place
    @param auth: boolean, sign the request and attach the OK-ACCESS-* headers
    @return: ``(result, None)`` when the decoded JSON has no ``code`` or
        ``code == "0"``; ``(None, result)`` otherwise
    """
    if params:
        # Sorting keeps the signed path deterministic; urlencode escapes
        # reserved characters so the string that gets signed is exactly the
        # path that is sent.
        uri += "?" + urlencode(sorted(params.items()))
    url = urljoin(base_url, uri)

    if auth:
        # Take a single clock reading so the seconds and millisecond parts
        # always belong to the same instant, and zero-pad to three digits.
        now_ms = int(time.time() * 1000)
        timestamp = "{}.{:03d}".format(now_ms // 1000, now_ms % 1000)

        body = json.dumps(body) if body else ""

        # Signature: base64(HMAC-SHA256(secret, timestamp + METHOD + path + body)).
        message = timestamp + method.upper() + uri + body
        digest = hmac.new(
            secret_key.encode("utf-8"),
            message.encode("utf-8"),
            digestmod="sha256",
        ).digest()
        sign = base64.b64encode(digest).decode("ascii")

        # Copy so that a headers dict supplied by the caller is left untouched.
        headers = dict(headers) if headers else {}
        headers["Content-Type"] = "application/json"
        headers["OK-ACCESS-KEY"] = api_key
        headers["OK-ACCESS-SIGN"] = sign
        headers["OK-ACCESS-TIMESTAMP"] = timestamp
        headers["OK-ACCESS-PASSPHRASE"] = passphrase

    result = requests.request(
        method, url, data=body, headers=headers, timeout=10
    ).json()

    # A present, non-"0" code is treated as an API-level failure.
    code = result.get("code")
    if code and code != "0":
        return None, result
    return result, None

In [3]:
from datetime import datetime

def str_to_timestamp_ms(date_str, format_str="%Y-%m-%d"):
    """Parse ``date_str`` with ``format_str`` and return epoch milliseconds.

    The parsed datetime is naive, so ``datetime.timestamp()`` interprets it
    in the machine's local timezone.
    """
    parsed = datetime.strptime(date_str, format_str)
    # timestamp() yields seconds as a float; scale to ms and truncate.
    epoch_seconds = parsed.timestamp()
    return int(epoch_seconds * 1000)

def timestamp_ms_to_str(timestamp_ms, format_str="%Y-%m-%d"):
    """Format an epoch-millisecond timestamp as a string using ``format_str``.

    The conversion goes through ``datetime.fromtimestamp``, i.e. the result
    is expressed in the machine's local timezone.
    """
    # Milliseconds -> seconds, then local datetime -> formatted text.
    moment = datetime.fromtimestamp(timestamp_ms / 1000)
    return moment.strftime(format_str)

In [4]:
import pandas as pd
import time
from datetime import datetime, timedelta

def get_spot_klines(spot, begin, end, period='1D', limit=100):
    """Download historical candles for one instrument and return a DataFrame.

    The range from ``begin`` to ``end`` is walked in windows of at most
    ``limit`` bars. Each window is fetched with ``request`` from
    ``/api/v5/market/history-candles``; the cursor then advances by
    ``limit - 1`` bars so consecutive windows overlap by one bar (duplicates
    are dropped at the end).

    Args:
        spot: instrument id sent as ``instId`` (e.g. 'BTC-USDT').
        begin: range start; "%Y-%m-%d" for '1D', "%Y-%m-%d %H:%M:%S" for '1H'.
        end: range end, same format as ``begin``.
        period: bar size, '1D' or '1H'.
        limit: bars requested per call; must be at least 2, otherwise the
            cursor would never advance.

    Returns:
        A DataFrame with columns timestamp, open, high, low, close, vol,
        volCcy, volCcyQuote, confirm (timestamp as a formatted local-time
        string, the rest as floats), sorted by timestamp with duplicates
        removed. ``None`` when nothing was fetched or the rows could not be
        parsed. A fetch error stops the download; bars collected before the
        error are still returned.

    Raises:
        ValueError: unsupported ``period``, ``limit`` below 2, or a date
            string that does not match the expected format.
    """
    steps = {'1D': timedelta(days=1), '1H': timedelta(hours=1)}
    if period not in steps:
        raise ValueError(f"Unsupported period: {period!r} (expected '1D' or '1H')")
    if limit < 2:
        raise ValueError("limit must be at least 2")

    step = steps[period]
    format_str = "%Y-%m-%d" if period == '1D' else "%Y-%m-%d %H:%M:%S"
    begin_dt = datetime.strptime(begin, format_str)
    end_dt = datetime.strptime(end, format_str)

    def to_ms(moment):
        # Reuse the notebook's helper so request bounds and parsed output
        # share the same (local-time) conversion.
        return str_to_timestamp_ms(moment.strftime(format_str), format_str)

    all_klines = []
    while end_dt >= begin_dt:
        print(begin_dt)
        # Clamp the window to end_dt itself rather than to a whole number of
        # days, so a trailing partial day of hourly bars is still requested.
        window_end = min(begin_dt + step * limit, end_dt)
        # NOTE(review): if the API treats `before`/`after` as exclusive
        # bounds, the bar stamped exactly at `begin` is not returned --
        # confirm against the endpoint documentation.
        params = {
            "instId": spot,
            "before": to_ms(begin_dt),
            "after": to_ms(window_end),
            "bar": period,
            "limit": limit,
        }

        try:
            # request() returns (result, error); exactly one of them is None.
            result, error = request(
                "GET", "/api/v5/market/history-candles", params=params, auth=True
            )
            if error is not None:
                print(f"Error fetching data: {error}")
                break
            all_klines.extend(result['data'])
        except Exception as e:
            print(f"Error fetching data: {e}")
            break

        begin_dt += step * (limit - 1)
        time.sleep(0.1)  # brief pause between consecutive requests

    if not all_klines:
        return None

    cols = ['timestamp', 'open', 'high', 'low', 'close', 'vol', 'volCcy', 'volCcyQuote', 'confirm']
    try:
        # First field is the bar's epoch-ms timestamp; the remaining fields
        # are numeric strings. Indexing (not slicing) keeps short rows an error.
        rows = [
            [timestamp_ms_to_str(int(kline[0]), format_str)]
            + [float(kline[idx]) for idx in range(1, len(cols))]
            for kline in all_klines
        ]
        df = pd.DataFrame(rows, columns=cols)
        # Sort chronologically and drop the bars shared by overlapping windows.
        return (
            df.sort_values('timestamp')
            .drop_duplicates(subset=['timestamp'])
            .reset_index(drop=True)
        )
    except Exception as e:
        print(f"Error processing data: {e}")
        return None

# Usage example: fetch BTC-USDT daily candles and save them as CSV.
df = get_spot_klines('BTC-USDT', '2022-01-01', '2025-02-14', period='1D')
# get_spot_klines returns None when nothing was fetched or parsing failed;
# stop with a clear message instead of failing on None.to_csv.
if df is None:
    raise RuntimeError("get_spot_klines returned no data; nothing to save")
df.to_csv('./assets/btc_1d.csv', index=False)
# Only a cell's last expression is displayed, so the preview goes at the end.
df.tail()

2022-01-01 00:00:00
2022-04-10 00:00:00
2022-07-18 00:00:00
2022-10-25 00:00:00
2023-02-01 00:00:00
2023-05-11 00:00:00
2023-08-18 00:00:00
2023-11-25 00:00:00
2024-03-03 00:00:00
2024-06-10 00:00:00
2024-09-17 00:00:00
2024-12-25 00:00:00
