In [1]:
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
from datetime import datetime
import os

API_KEY = "TEczHISqFV6E0CZybwku74twqRSRJLU2dHwQ8o8UbU0hzfOJ"
BASE_URL = 'https://api.datasource.cybotrade.rs'

# Convert the input date string (YYYY-MM-DD) to UNIX timestamp in milliseconds for params purpose
def date_to_unix_ms(date_str):
    dt = datetime.strptime(date_str, "%Y-%m-%d")
    return int(dt.timestamp() * 1000)

In [2]:
# Function to make API requests with error handling
def fetch_cryptoquant_data(endpoint, params=None, max_retries=3):
    """
    Fetch data from CryptoQuant API with error handling
    
    Parameters:
    - endpoint: API endpoint path
    - params: Dictionary of query parameters
    - max_retries: Maximum number of retry attempts on failure
    
    Returns:
    - JSON response or None if failed
    """
    if params is None:
        params = {}
    
    # Format endpoint according to example: cryptoquant|btc/exchange-flows/inflow?exchange=okx&window=hour
    # We need to include parameters in the URL structure for endpoints
    url = f"{BASE_URL}/cryptoquant/{endpoint}"
    
    headers = {"X-API-Key": API_KEY}
    
    # Debug output
    print(f"Making API request to: {url}")
    print(f"With parameters: {params}")
    
    retry_count = 0
    while retry_count < max_retries:
        try:
            response = requests.get(url, headers=headers, params=params)
            
            print(f"Response status code: {response.status_code}")
            
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 429:
                # Rate limit hit
                print(f"Rate limit exceeded (429). Waiting before retry...")
                retry_count += 1
                time.sleep(60)  # Wait 60 seconds before retrying
            else:
                print(f"Error {response.status_code}: {response.text}")
                return None
        except Exception as e:
            print(f"Exception during API request: {str(e)}")
            retry_count += 1
            time.sleep(5)
    
    print(f"Failed after {max_retries} attempts")
    return None


In [None]:
# Function to collect Bitcoin data
def collect_btc_data(start_date, end_date):
    """
    Collect Bitcoin data from CryptoQuant API
    
    Parameters:
    - start_date: Start date in format YYYY-MM-DD
    - end_date: End date in format YYYY-MM-DD
    
    Returns:
    - Dictionary of pandas DataFrames with different data types
    """
    # Convert dates to timestamps
    start_ts = date_to_unix_ms(start_date)
    end_ts = date_to_unix_ms(end_date)
    
    # Dictionary to store all collected data
    data_dict = {}
    
    # Based on the error message, we need to specify either start_time+end_time or start_time+limit or end_time+limit
    # Let's use start_time and end_time as our parameters
    base_params = {
        "start_time": start_ts,
        "end_time": end_ts
    }
    
    # Based on the example from the screenshot: cryptoquant|btc/exchange-flows/inflow?exchange=okx&window=hour
    # Test with a single endpoint using the exact format from the example
    # test_endpoint = "btc/exchange-flows/inflow"
    # test_params = base_params.copy()
    # test_params.update({
    #     "exchange": "okx",
    #     "window": "hour"
    # })
    
    # print("\nTesting API connection with example endpoint...")
    # test_response = fetch_cryptoquant_data(test_endpoint, test_params)
    
    # if test_response:
    #     print("API test successful!")
    #     print("Example response structure:")
    #     if "data" in test_response:
    #         print(f"Data entries: {len(test_response['data'])}")
    #         if len(test_response['data']) > 0:
    #             print("First data entry:")
    #             print(test_response['data'][0])
    #     else:
    #         print("Response structure:", test_response.keys())
    # else:
    #     print("API test failed. Check your API key and endpoint format.")
    #     return data_dict
    
    # List of endpoints to collect based on successful test
    endpoints = [
        # Basic format: provider|asset/category/metric
        {"name": "exchange_inflow", "endpoint": "btc/exchange-flows/inflow", "params": {"exchange": "all_exchange","window": "hour"}},
        {"name": "exchange_outflow", "endpoint": "btc/exchange-flows/outflow", "params": {"exchange": "all_exchange", "window": "hour"}},
        {"name": "exchange_netflow", "endpoint": "btc/exchange-flows/netflow", "params": {"exchange": "all_exchange", "window": "hour"}},
        # {"name": "exchange_inflow", "endpoint": "btc/exchange-flows/inflow", "params": {"window": "hour"}},
        # {"name": "price", "endpoint": "btc/market-data/price-ohlcv", "params":  {"window": "hour"}},
        # Add more endpoints based on what works in the test
    ]
    
    # Collect data for each endpoint
    for endpoint_info in endpoints:
        name = endpoint_info["name"]
        endpoint = endpoint_info["endpoint"]
        
        print(f"\nCollecting {name} data...")
        
        # Combine base parameters with endpoint-specific parameters
        params = base_params.copy()
        if "params" in endpoint_info:
            params.update(endpoint_info["params"])
        
        # Make API request
        response = fetch_cryptoquant_data(endpoint, params)
        
        # Process response
        if response and "data" in response:
            df = pd.DataFrame(response["data"])
            
            # Process timestamp if present
            if "timestamp" in df.columns:
                df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms")
                df.set_index("datetime", inplace=True)
            
            # Store in dictionary
            data_dict[name] = df
            print(f"Successfully collected {len(df)} records for {name}")
        else:
            print(f"Failed to collect {name} data")
        
        # Add delay between requests
        time.sleep(5)
    
    return data_dict

In [15]:
# Define time periods - starting with a shorter period for testing
test_period = ("2020-01-01", "2020-01-31")  # Just one month for testing
    
print(f"\n=== Testing API with period {test_period[0]} to {test_period[1]} ===")
data = collect_btc_data(*test_period)
    
if data:
    print("\nData collection successful!")
    print("Available datasets:")
    for name, df in data.items():
        print(f"- {name}: {len(df)} records")
        print(df.head(2))
else:
    print("\nNo data was collected. Check API endpoints and parameters.")


=== Testing API with period 2020-01-01 to 2020-01-31 ===

Collecting exchange_inflow data...
Making API request to: https://api.datasource.cybotrade.rs/cryptoquant/btc/exchange-flows/inflow
With parameters: {'start_time': 1577808000000, 'end_time': 1580400000000, 'window': 'hour'}
Response status code: 404
Error 404: {"error":"failed to request 'cryptoquant|btc/exchange-flows/inflow?window=hour': do not retry: HTTP status client error (400 Bad Request) for url (https://api.cryptoquant.com/v1/btc/exchange-flows/inflow?window=hour&limit=100000&from=20191231T160000&to=20310529T070000) (Ok(\"{\\\"result\\\":{},\\\"status\\\":{\\\"code\\\":400,\\\"description\\\":\\\"400 Bad Request: The browser (or proxy) sent a request that this server could not understand.\\\",\\\"message\\\":\\\"bad_request\\\"}}\\n\"))"}
Failed to collect exchange_inflow data


KeyboardInterrupt: 