In [43]:
import requests
import pandas as pd
import time
from dotenv import load_dotenv
import os
from datetime import datetime, timedelta
from functools import reduce
from top_cryptos import get_top_cryptos
from functools import reduce

load_dotenv()

# Constants
COINAPI_KEY = os.getenv('COIN_API_KEY')

GRANULARITY = 3600  # 1 hour in seconds
TOTAL_HOURS = 100000  # total number of hours to fetch
CHUNK_SIZE = 100   # CoinAPI limit per request
TOP_CRYPTO_LIMIT = 40

headers = {
    'Accept': 'application/json',
    'X-CoinAPI-Key': COINAPI_KEY
}

In [44]:
def is_stablecoin(symbol):
    stablecoins = ['USDT', 'USDC', 'DAI', 'BUSD', 'TUSD', 'PAX', 'GUSD', 'HUSD', 'SUSD']
    return any(stablecoin in symbol for stablecoin in stablecoins)

In [45]:
# Function to fetch price and volume for a given symbol
def get_price_and_volume(symbol_id):
    url = f"https://rest.coinapi.io/v1/quotes/{symbol_id}/current"
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            return {
                "time": data.get("time_exchange"),
                "price": data.get("bid_price"),
                "volume": data.get("volume")
            }
        else:
            print(f"Failed to fetch data for {symbol_id}: {response.status_code}")
            print(f"Error message: {response.text}")  # Print the error message from the response
            return None
    except Exception as e:
        print(f"Error fetching data for {symbol_id}: {e}")
        return None

In [46]:
def get_historic_candles(name, symbol_id, granularity, total_hours, chunk_size, now):
    all_data = []
    for i in range(0, total_hours, chunk_size):
        end_time = (now - timedelta(seconds=granularity * i)).replace(microsecond=0)
        start_time = (end_time - timedelta(seconds=granularity * chunk_size)).replace(microsecond=0)

        url = f"https://rest.coinapi.io/v1/ohlcv/{symbol_id}/history"
        params = {
            'period_id': '1HRS',
            'time_start': start_time.isoformat() + 'Z',
            'time_end': end_time.isoformat() + 'Z',
            'limit': chunk_size
        }

        for attempt in range(3):
            r = requests.get(url, headers=headers, params=params)
            if r.status_code == 429:
                print("Rate limit hit. Waiting 60 seconds...")
                time.sleep(60)
                continue
            r.raise_for_status()
            data = r.json()
            if len(data) == 0:  # Return immediately if no rows are fetched
                print(f"No data returned for {start_time} to {end_time}. Exiting.")
                return pd.DataFrame(all_data)  # Return what we have so far
            all_data.extend(data)
            print(f"Fetched {len(data)} rows from of {name} {start_time} to {end_time}")
            break
        else:
            raise Exception("Failed to fetch after multiple attempts.")

        time.sleep(1)  # Polite delay to avoid spamming API

  # Create DataFrame and rename columns
    df = pd.DataFrame(all_data)
    df = df.rename(columns={
        'time_period_start': 'time',
        'price_close': f'{name}_close',
        'volume_traded': f'{name}_volume'
    })
    df['time'] = pd.to_datetime(df['time'])

    # Drop all other columns and keep only time, close, and volume
    df = df[['time', f'{name}_close', f'{name}_volume']]
    df = df.sort_values('time').reset_index(drop=True)
    return df

In [47]:
symbols = get_top_cryptos(TOP_CRYPTO_LIMIT)
print(symbols)

{'BTC': 'COINBASE_SPOT_BTC_USD', 'ETH': 'COINBASE_SPOT_ETH_USD', 'XRP': 'COINBASE_SPOT_XRP_USD', 'SOL': 'COINBASE_SPOT_SOL_USD', 'DOGE': 'COINBASE_SPOT_DOGE_USD', 'TRX': 'KRAKEN_SPOT_TRX_USD', 'ADA': 'COINBASE_SPOT_ADA_USD', 'LINK': 'COINBASE_SPOT_LINK_USD', 'HYPE': 'BITFINEX_SPOT_HYPE_USD', 'SUI': 'COINBASE_SPOT_SUI_USD', 'BCH': 'COINBASE_SPOT_BCH_USD', 'XLM': 'COINBASE_SPOT_XLM_USD', 'AVAX': 'COINBASE_SPOT_AVAX_USD', 'HBAR': 'COINBASE_SPOT_HBAR_USD', 'LTC': 'COINBASE_SPOT_LTC_USD', 'TON': 'KRAKEN_SPOT_TON_USD', 'SHIB': 'COINBASE_SPOT_SHIB_USD', 'WLFI': 'COINBASE_SPOT_WLFI_USD', 'DOT': 'COINBASE_SPOT_DOT_USD', 'UNI': 'COINBASE_SPOT_UNI_USD', 'XMR': 'KRAKEN_SPOT_XMR_USD', 'AAVE': 'COINBASE_SPOT_AAVE_USD', 'ENA': 'COINBASE_SPOT_ENA_USD', 'PEPE': 'COINBASE_SPOT_PEPE_USD', 'MNT': 'KRAKEN_SPOT_MNT_USD', 'ETC': 'COINBASE_SPOT_ETC_USD'}


In [48]:
results = []
nearest_hour_rounded_down = datetime.utcnow().replace(minute=0, second=0, microsecond=0)  # Round down to the nearest hour

for name, symbol_id in symbols.items():
    print(f"Fetching data for {name} ({symbol_id})...")
    df = get_historic_candles(name, symbol_id, GRANULARITY, TOTAL_HOURS, CHUNK_SIZE, nearest_hour_rounded_down)
    if df is not None and not df.empty:  # Null and empty check
        df.to_csv(f"./data/{name}_data.csv", index=False)  # Save to a CSV file
        print(f"Saved data for {name} to {name}_data.csv")
    else:
        print(f"No data fetched for {name} ({symbol_id}). Skipping.")
    time.sleep(1)  # Polite delay

Fetching data for BTC (COINBASE_SPOT_BTC_USD)...


  nearest_hour_rounded_down = datetime.utcnow().replace(minute=0, second=0, microsecond=0)  # Round down to the nearest hour


Fetched 100 rows from of BTC 2025-08-30 12:00:00 to 2025-09-03 16:00:00
Fetched 96 rows from of BTC 2025-08-26 08:00:00 to 2025-08-30 12:00:00
Fetched 100 rows from of BTC 2025-08-22 04:00:00 to 2025-08-26 08:00:00
Fetched 100 rows from of BTC 2025-08-18 00:00:00 to 2025-08-22 04:00:00
Fetched 100 rows from of BTC 2025-08-13 20:00:00 to 2025-08-18 00:00:00
Fetched 100 rows from of BTC 2025-08-09 16:00:00 to 2025-08-13 20:00:00
Fetched 100 rows from of BTC 2025-08-05 12:00:00 to 2025-08-09 16:00:00
Fetched 100 rows from of BTC 2025-08-01 08:00:00 to 2025-08-05 12:00:00
Fetched 100 rows from of BTC 2025-07-28 04:00:00 to 2025-08-01 08:00:00
Fetched 100 rows from of BTC 2025-07-24 00:00:00 to 2025-07-28 04:00:00
Fetched 100 rows from of BTC 2025-07-19 20:00:00 to 2025-07-24 00:00:00
Fetched 100 rows from of BTC 2025-07-15 16:00:00 to 2025-07-19 20:00:00
Fetched 100 rows from of BTC 2025-07-11 12:00:00 to 2025-07-15 16:00:00
Fetched 100 rows from of BTC 2025-07-07 08:00:00 to 2025-07-11 12

[]

In [49]:
def prep(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    # normalize *_close -> *_price (optional)
    rename_map = {c: f"{c.rsplit('_',1)[0]}_price" for c in df.columns if c.endswith('_close')}
    if rename_map:
        df = df.rename(columns=rename_map)

    # parse & sort
    df['time'] = pd.to_datetime(df['time'], format='ISO8601', utc=True)
    df = df.drop_duplicates('time').sort_values('time').set_index('time')

    # optional guards
    for c in df.columns:
        if c.endswith('_price'):
            df[c] = df[c].mask(df[c] <= 0)  # avoid bad logs later
    return df

def merge_asof_many(dfs, tolerance='5min', direction='nearest'):
    dfs = [prep(d) for d in dfs]
    # choose a calendar to control the output timeline:
    # here: use the intersection of timestamps across all dfs (robust),
    # or pick the densest df as the first/left.
    # Simple: start with the densest df as left:
    left = max(dfs, key=lambda d: len(d))
    others = [d for d in dfs if d is not left]

    tol = pd.Timedelta(tolerance)

    def merge_two(left, right):
        # Avoid column collisions: add suffixes if needed
        intersect = set(left.columns).intersection(right.columns)
        if intersect:
            right = right.rename(columns={c: f"{c}_r" for c in intersect})
        return pd.merge_asof(
            left.sort_index(),
            right.sort_index(),
            left_index=True,
            right_index=True,
            direction=direction,
            tolerance=tol
        )

    merged = reduce(merge_two, others, left)
    return merged.reset_index().rename(columns={'index': 'time'})


In [50]:
# Directory containing the CSV files
csv_directory = "./data"  # Replace with the path to your CSV files
output_file = "crypto_market_dataset.csv"  # Name of the output file

# List all CSV files in the directory
csv_files = [f for f in os.listdir(csv_directory) if f.endswith('.csv')]

# Read all CSV files into a list of DataFrames
dataframes = []
for csv_file in csv_files:
    file_path = os.path.join(csv_directory, csv_file)
    print(f"Processing {file_path}...")
    df = pd.read_csv(file_path)
    
    # Check if the CSV has the specific column format
    if list(df.columns) == [
        "time_period_start", "time_period_end", "time_open", "time_close",
        "price_open", "price_high", "price_low", "price_close",
        "volume_traded", "trades_count"
    ]:
        # Rename columns to the correct format
        df = df.rename(columns={
            "time_period_start": "time",
            "price_close": f"{csv_file.split('_')[0]}_close",
            "volume_traded": f"{csv_file.split('_')[0]}_volume"
        })
        # Keep only the relevant columns
        df = df[["time", f"{csv_file.split('_')[0]}_close", f"{csv_file.split('_')[0]}_volume"]]
    
    
    dataframes.append(df)

# Merge all DataFrames on the 'time' column
merged = merge_asof_many(dataframes, tolerance='5min', direction='nearest')

# Save the combined DataFrame to a new CSV file
merged.to_csv(output_file, index=False)
print(f"Combined CSV saved to {output_file}")

Processing ./data/ENA_data.csv...
Processing ./data/TAO_data.csv...
Processing ./data/MNT_data.csv...
Processing ./data/AAVE_data.csv...
Processing ./data/SOL_data.csv...
Processing ./data/SHIB_data.csv...
Processing ./data/XLM_data.csv...
Processing ./data/ETC_data.csv...
Processing ./data/ETH_data.csv...
Processing ./data/PEPE_data.csv...
Processing ./data/UNI_data.csv...
Processing ./data/BTC_data.csv...
Processing ./data/SUI_data.csv...
Processing ./data/XMR_data.csv...
Processing ./data/LINK_data.csv...
Processing ./data/DOGE_data.csv...
Processing ./data/WLFI_data.csv...
Processing ./data/AVAX_data.csv...
Processing ./data/LTC_data.csv...
Processing ./data/XRP_data.csv...
Processing ./data/TRX_data.csv...
Processing ./data/BCH_data.csv...
Processing ./data/DOT_data.csv...
Processing ./data/TON_data.csv...
Processing ./data/ADA_data.csv...
Processing ./data/HYPE_data.csv...
Processing ./data/HBAR_data.csv...
Combined CSV saved to crypto_market_dataset.csv


In [51]:
df = pd.read_csv("./crypto_market_dataset.csv")
df.head()

Unnamed: 0,time,BTC_price,BTC_volume,ENA_price,ENA_volume,TAO_price,TAO_volume,MNT_price,MNT_volume,AAVE_price,...,DOT_price,DOT_volume,TON_price,TON_volume,ADA_price,ADA_volume,HYPE_price,HYPE_volume,HBAR_price,HBAR_volume
0,2015-01-14 16:00:00+00:00,185.91,1.05,,,,,,,,...,,,,,,,,,,
1,2015-01-14 18:00:00+00:00,186.0,0.015362,,,,,,,,...,,,,,,,,,,
2,2015-01-14 19:00:00+00:00,120.0,0.264638,,,,,,,,...,,,,,,,,,,
3,2015-01-15 01:00:00+00:00,192.0,1.13,,,,,,,,...,,,,,,,,,,
4,2015-01-15 04:00:00+00:00,150.0,0.0722,,,,,,,,...,,,,,,,,,,
