## Market Data Puller and Cleaner Notebook
# 
# **Objective:**  
# This notebook extracts 1-minute OHLCV data for BTC/USDT, ETH/USDT, and ADA/USDT from Binance over a two-year period using a round-robin approach. It then cleans the data by reindexing to a full 1-minute interval, identifies missing rows, and saves the cleaned data.
#
# **Structure:**  
# - **Data Extraction:** Pull data from Binance and save raw CSV files.
# - **Data Cleaning:** Load CSVs, reindex to complete 1-minute intervals, report missing rows, and drop rows that are completely empty.

In [1]:
# src/api/binance_client.py

from datetime import datetime
import ccxt
import time

class BinanceClient:
    def __init__(self, api_key=None, secret=None, enable_rate_limit=True):
        """
        Initialize the Binance.US client with optional API keys.
        For public endpoints, keys are not required.
        """
        self.exchange = ccxt.binanceus({
            'apiKey': api_key,
            'secret': secret,
            'enableRateLimit': enable_rate_limit,
        })
        self.load_markets()

    def load_markets(self):
        """Load all markets and store them in the client."""
        try:
            self.markets = self.exchange.load_markets()
            return self.markets
        except Exception as e:
            print("Error loading markets:", e)
            return None

    def get_klines(self, symbol, timeframe='1m', since=None, limit=500):
        """Fetch OHLCV (candlestick) data for a symbol and match Kraken's format."""
        try:
            klines = self.exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=limit)
            return [
                {
                    'timestamp': datetime.utcfromtimestamp(entry[0] / 1000).strftime('%Y-%m-%d %H:%M:%S'),
                    'open': float(entry[1]),
                    'high': float(entry[2]),
                    'low': float(entry[3]),
                    'close': float(entry[4]),
                    'volume': float(entry[5])  # Binance volume at index 5
                }
                for entry in klines
            ]
        except Exception as e:
            print(f"Error fetching klines for {symbol}: {e}")
            return None

In [3]:
import os
import pandas as pd
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import time

# Initialize the Binance client
client = BinanceClient()

# Define the list of symbols and settings
symbols = ['BTC/USDT', 'ETH/USDT', 'ADA/USDT']
timeframe = '1m'

# Define the overall date range: start 2 years ago until yesterday at midnight
start_date = datetime.now() - timedelta(days=730)
end_date = datetime.combine(datetime.now().date() - timedelta(days=1), datetime.min.time())

# Create a dictionary to track each symbol's state (current start timestamp and output file)
symbol_info = {}
for symbol in symbols:
    symbol_info[symbol] = {
        'current_since': int(start_date.timestamp() * 1000),  # in milliseconds for the API
        'file': f"binance_klines_{symbol.replace('/', '')}_2years.csv"
    }

# Remove any existing CSV files so that we start fresh
for info in symbol_info.values():
    if os.path.exists(info['file']):
        os.remove(info['file'])

print(f"Data pull starting from {start_date} until {end_date}\n")

# We'll perform a round-robin data pull until all symbols are finished.
symbols_remaining = symbols.copy()
rotation = 0

while symbols_remaining:
    rotation += 1
    print(f"\n--- Rotation {rotation} ---")
    symbols_to_remove = []
    
    for symbol in symbols_remaining:
        info = symbol_info[symbol]
        current_since = info['current_since']
        current_dt = datetime.utcfromtimestamp(current_since / 1000)
        
        # If we've already reached the end date for this symbol, mark it as finished.
        if current_dt >= end_date:
            print(f"{symbol}: Data pulling complete (current timestamp {current_dt}).")
            symbols_to_remove.append(symbol)
            continue
        
        # Determine the cutoff for this segment (one month from the current pointer or end_date, whichever is earlier)
        next_cutoff = current_dt + relativedelta(months=1)
        if next_cutoff > end_date:
            next_cutoff = end_date
        
        print(f"{symbol}: Pulling data from {current_dt.strftime('%Y-%m-%d %H:%M:%S')} until {next_cutoff.strftime('%Y-%m-%d %H:%M:%S')}")
        segment_data = []
        
        # Fetch data in batches until we reach the next cutoff
        while True:
            current_dt = datetime.utcfromtimestamp(current_since / 1000)
            if current_dt >= next_cutoff:
                break  # We've pulled up to the monthly segment boundary
            
            print(f"{symbol}:  Fetching batch starting at {current_dt.strftime('%Y-%m-%d %H:%M:%S')}")
            klines = client.get_klines(symbol, timeframe=timeframe, since=current_since)
            if not klines:
                print(f"{symbol}: No more data returned. Ending segment.")
                break
            
            segment_data.extend(klines)
            
            # Get the timestamp of the last entry in the fetched batch
            last_timestamp_str = klines[-1]['timestamp']
            last_timestamp_dt = datetime.strptime(last_timestamp_str, '%Y-%m-%d %H:%M:%S')
            # Prepare next batch by adding 1 minute to avoid duplicate data
            current_since = int((last_timestamp_dt + timedelta(minutes=1)).timestamp() * 1000)
            
            # If we've reached or passed our segment cutoff, break
            if last_timestamp_dt >= next_cutoff:
                break
            
            # Pause briefly to respect API rate limits
            time.sleep(1)
        
        # Save the fetched segment data if available
        if segment_data:
            df_segment = pd.DataFrame(segment_data)
            # Sort by timestamp to ensure order
            df_segment = df_segment.sort_values('timestamp')
            file_name = info['file']
            # Append or create a new CSV file as needed
            if not os.path.exists(file_name):
                df_segment.to_csv(file_name, index=False, mode='w')
            else:
                df_segment.to_csv(file_name, index=False, mode='a', header=False)
            print(f"{symbol}: Saved {df_segment.shape[0]} rows to {file_name}")
        else:
            print(f"{symbol}: No new data in this segment.")
        
        # Update the symbol's current pointer
        info['current_since'] = current_since
    
    # Remove symbols that have reached the end date
    for symbol in symbols_to_remove:
        symbols_remaining.remove(symbol)
    
    # Pause briefly between rotations to help manage rate limits overall
    time.sleep(1)

print("\nData pulling complete for all symbols.")

Data pull starting from 2023-03-05 16:32:53.365427 until 2025-03-03 00:00:00


--- Rotation 1 ---
BTC/USDT: Pulling data from 2023-03-05 22:32:53 until 2023-04-05 22:32:53
BTC/USDT:  Fetching batch starting at 2023-03-05 22:32:53


  current_dt = datetime.utcfromtimestamp(current_since / 1000)
  current_dt = datetime.utcfromtimestamp(current_since / 1000)
  'timestamp': datetime.utcfromtimestamp(entry[0] / 1000).strftime('%Y-%m-%d %H:%M:%S'),


BTC/USDT:  Fetching batch starting at 2023-03-06 12:53:00
BTC/USDT:  Fetching batch starting at 2023-03-07 03:13:00
BTC/USDT:  Fetching batch starting at 2023-03-07 17:33:00
BTC/USDT:  Fetching batch starting at 2023-03-08 07:53:00
BTC/USDT:  Fetching batch starting at 2023-03-08 22:13:00
BTC/USDT:  Fetching batch starting at 2023-03-09 12:33:00
BTC/USDT:  Fetching batch starting at 2023-03-10 02:53:00
BTC/USDT:  Fetching batch starting at 2023-03-10 17:13:00
BTC/USDT:  Fetching batch starting at 2023-03-11 07:33:00
BTC/USDT:  Fetching batch starting at 2023-03-11 21:53:00
BTC/USDT:  Fetching batch starting at 2023-03-12 11:13:00
BTC/USDT:  Fetching batch starting at 2023-03-13 00:33:00
BTC/USDT:  Fetching batch starting at 2023-03-13 13:53:00
BTC/USDT:  Fetching batch starting at 2023-03-14 03:13:00
BTC/USDT:  Fetching batch starting at 2023-03-14 16:33:00
BTC/USDT:  Fetching batch starting at 2023-03-15 05:53:00
BTC/USDT:  Fetching batch starting at 2023-03-15 19:13:00
BTC/USDT:  Fet

## Saving Cleaned DataFrames
# 
# In this next part, we drop rows that are completely empty (all columns NaN) and save the cleaned DataFrames to new CSV files.

In [12]:
import pandas as pd
from datetime import datetime

# Define the file paths for the three symbols
files = {
    'BTC/USDT': 'binance_klines_BTCUSDT_2years.csv',
    'ETH/USDT': 'binance_klines_ETHUSDT_2years.csv',
    'ADA/USDT': 'binance_klines_ADAUSDT_2years.csv'
}

# Create a dictionary to hold the cleaned DataFrames
cleaned_dfs = {}
print("=== Data Cleaning and Missing Row Check ===\n")

=== Data Cleaning and Missing Row Check ===



In [14]:
for symbol, filepath in files.items():
    print(f"Processing {symbol} from file {filepath}")
    
    # Load the CSV file
    df = pd.read_csv(filepath)
    print(df)
    # Convert the 'timestamp' column to datetime
    df['timestamp'] = pd.to_datetime(df['timestamp'], format='%Y-%m-%d %H:%M:%S')
    
    # Sort the data by timestamp and set it as the index
    df.sort_values('timestamp', inplace=True)
    df.set_index('timestamp', inplace=True)
    
    # Create a full date range with 1-minute intervals from the minimum to maximum timestamp
    full_range = pd.date_range(start=df.index.min(), end=df.index.max(), freq='T')
    
    # Reindex the DataFrame to the full date range
    df_full = df.reindex(full_range)
    
    # Count the number of missing rows (rows with any NaN values)
    missing_count = df_full.isna().any(axis=1).sum()
    total_expected = len(full_range)
    
    print(f"Total expected rows: {total_expected}")
    print(f"Missing rows: {missing_count}")
    
    if missing_count > 0:
        # Display the first 5 missing timestamps as a sample
        missing_timestamps = df_full[df_full.isna().any(axis=1)].index.tolist()[:5]
        print("Sample missing timestamps:", missing_timestamps)
    else:
        print("No missing rows found.")
    
    print("\n" + "-"*50 + "\n")
    
    # Store the cleaned DataFrame for later use (without saving yet)
    cleaned_dfs[symbol] = df_full

print("Data cleaning and review complete.")

Processing BTC/USDT from file binance_klines_BTCUSDT_2years.csv
                  timestamp      open      high       low     close   volume
0       2023-03-05 22:33:00  22404.50  22411.02  22397.89  22397.89  1.36425
1       2023-03-05 22:34:00  22392.22  22409.44  22392.22  22407.91  0.18043
2       2023-03-05 22:35:00  22410.55  22428.09  22410.55  22428.09  0.29686
3       2023-03-05 22:36:00  22434.58  22434.58  22417.31  22429.73  0.83456
4       2023-03-05 22:37:00  22429.64  22431.52  22423.88  22427.08  1.19932
...                     ...       ...       ...       ...       ...      ...
639495  2025-03-02 19:48:00  93421.24  93491.57  93421.24  93491.57  0.00431
639496  2025-03-02 19:49:00  93491.57  93491.57  93449.26  93449.26  0.00153
639497  2025-03-02 19:50:00  93449.26  93449.26  93449.26  93449.26  0.00000
639498  2025-03-02 19:51:00  93449.26  93449.26  93449.26  93449.26  0.00000
639499  2025-03-02 19:52:00  93449.26  93449.26  93449.26  93449.26  0.00000

[639500 row

  full_range = pd.date_range(start=df.index.min(), end=df.index.max(), freq='T')


Sample missing timestamps: [Timestamp('2023-03-06 06:53:00'), Timestamp('2023-03-06 06:54:00'), Timestamp('2023-03-06 06:55:00'), Timestamp('2023-03-06 06:56:00'), Timestamp('2023-03-06 06:57:00')]

--------------------------------------------------

Processing ETH/USDT from file binance_klines_ETHUSDT_2years.csv
                  timestamp     open     high      low    close   volume
0       2023-03-05 22:33:00  1563.87  1564.15  1561.51  1562.12  48.4015
1       2023-03-05 22:34:00  1562.14  1562.82  1562.14  1562.82   5.9740
2       2023-03-05 22:35:00  1563.32  1564.55  1563.32  1564.38   1.8214
3       2023-03-05 22:36:00  1565.50  1565.61  1564.06  1565.61  36.4439
4       2023-03-05 22:37:00  1565.61  1565.61  1565.61  1565.61   0.0000
...                     ...      ...      ...      ...      ...      ...
639495  2025-03-02 19:48:00  2496.55  2496.55  2496.55  2496.55   0.0000
639496  2025-03-02 19:49:00  2499.33  2499.33  2499.29  2499.29   3.3149
639497  2025-03-02 19:50:00 

  full_range = pd.date_range(start=df.index.min(), end=df.index.max(), freq='T')


Sample missing timestamps: [Timestamp('2023-03-06 06:53:00'), Timestamp('2023-03-06 06:54:00'), Timestamp('2023-03-06 06:55:00'), Timestamp('2023-03-06 06:56:00'), Timestamp('2023-03-06 06:57:00')]

--------------------------------------------------

Processing ADA/USDT from file binance_klines_ADAUSDT_2years.csv
                  timestamp    open    high     low   close  volume
0       2023-03-05 22:33:00  0.3369  0.3369  0.3369  0.3369     0.0
1       2023-03-05 22:34:00  0.3369  0.3369  0.3369  0.3369     0.0
2       2023-03-05 22:35:00  0.3369  0.3369  0.3369  0.3369     0.0
3       2023-03-05 22:36:00  0.3369  0.3369  0.3369  0.3369     0.0
4       2023-03-05 22:37:00  0.3369  0.3369  0.3369  0.3369     0.0
...                     ...     ...     ...     ...     ...     ...
639495  2025-03-02 19:48:00  1.0397  1.0459  1.0397  1.0399   162.9
639496  2025-03-02 19:49:00  1.0466  1.0467  1.0412  1.0455  2372.2
639497  2025-03-02 19:50:00  1.0443  1.0443  1.0392  1.0415  3951.0
63949

  full_range = pd.date_range(start=df.index.min(), end=df.index.max(), freq='T')


Sample missing timestamps: [Timestamp('2023-03-06 06:53:00'), Timestamp('2023-03-06 06:54:00'), Timestamp('2023-03-06 06:55:00'), Timestamp('2023-03-06 06:56:00'), Timestamp('2023-03-06 06:57:00')]

--------------------------------------------------

Data cleaning and review complete.


In [9]:
print("BTC/USDT Cleaned Data Sample:")
display(cleaned_dfs['BTC/USDT'].head())

BTC/USDT Cleaned Data Sample:


Unnamed: 0,open,high,low,close,volume
2023-03-05 22:33:00,22404.5,22411.02,22397.89,22397.89,1.36425
2023-03-05 22:34:00,22392.22,22409.44,22392.22,22407.91,0.18043
2023-03-05 22:35:00,22410.55,22428.09,22410.55,22428.09,0.29686
2023-03-05 22:36:00,22434.58,22434.58,22417.31,22429.73,0.83456
2023-03-05 22:37:00,22429.64,22431.52,22423.88,22427.08,1.19932


In [8]:
import os

for symbol, df in cleaned_dfs.items():
    # Drop rows where all values are NaN
    df_dropped = df.dropna(how='all')
    print(f"{symbol}: After dropping empty rows, remaining rows = {df_dropped.shape[0]}")
    
    # Define a new filename for the saved cleaned data
    cleaned_filename = f"binance_klines_{symbol.replace('/', '')}_2years_cleaned_dropped.csv"
    
    # Save the DataFrame to CSV
    df_dropped.to_csv(cleaned_filename, index_label='timestamp')
    print(f"Cleaned data saved to {cleaned_filename}\n")

BTC/USDT: After dropping empty rows, remaining rows = 639500
Cleaned data saved to binance_klines_BTCUSDT_2years_cleaned_dropped.csv

ETH/USDT: After dropping empty rows, remaining rows = 639500
Cleaned data saved to binance_klines_ETHUSDT_2years_cleaned_dropped.csv

ADA/USDT: After dropping empty rows, remaining rows = 639500
Cleaned data saved to binance_klines_ADAUSDT_2years_cleaned_dropped.csv

