In [1]:
import requests
import os
import datetime
import zipfile
from tqdm import tqdm  # Import tqdm for progress bars

# URL prefix the daily ZIP names are appended to
base_url = 'https://data.binance.vision/data/futures/um/daily/aggTrades/ETHUSDT/'

# Set the download directory for ZIP files
zip_download_dir = '/allah/freqtrade/json_dict/binance_aggTrades'
# Set the directory for decompressed CSV files
csv_extract_dir = '/allah/freqtrade/json_dict/decompressed_csv'

# Seconds to wait on the server; without a timeout a stalled connection hangs the cell forever
request_timeout = 60

# Create the download and extract directories if they don't exist
os.makedirs(zip_download_dir, exist_ok=True)
os.makedirs(csv_extract_dir, exist_ok=True)

# Inclusive date range to download
start_date = datetime.date(2023, 1, 1)
end_date = datetime.date(2023, 10, 10)

n_days = (end_date - start_date).days + 1
failed_dates = []  # dates whose ZIP could not be fetched, reported at the end

with tqdm(total=n_days, desc="Downloading") as download_bar:
    for offset in range(n_days):
        date_str = (start_date + datetime.timedelta(days=offset)).strftime('%Y-%m-%d')
        zip_name = f'ETHUSDT-aggTrades-{date_str}.zip'
        filename = os.path.join(zip_download_dir, zip_name)

        # Reuse a completed file from an earlier run so Restart & Run All does not
        # re-download everything (assumes published daily archives do not change).
        if not os.path.exists(filename):
            try:
                with requests.get(f'{base_url}{zip_name}', stream=True, timeout=request_timeout) as response:
                    if response.status_code == 200:
                        # Stream to a temporary name and rename only once complete, so an
                        # interrupted download never leaves a truncated .zip the check above accepts.
                        partial = filename + '.part'
                        with open(partial, 'wb') as file:
                            for chunk in response.iter_content(chunk_size=1 << 20):
                                file.write(chunk)
                        os.replace(partial, filename)
                    else:
                        print(f"Error: {response.status_code} - Unable to download the ZIP file for {date_str}.")
                        failed_dates.append(date_str)
            except requests.RequestException as exc:
                print(f"Error: {exc} - Unable to download the ZIP file for {date_str}.")
                failed_dates.append(date_str)

        # Advance for every date (downloaded, cached or failed) so the bar can reach its total
        download_bar.update(1)

# Extract only the ZIP archives; '.zip.part' leftovers and other files are ignored
zip_files = sorted(name for name in os.listdir(zip_download_dir) if name.endswith('.zip'))
for zip_name in tqdm(zip_files, desc="Extracting"):
    with zipfile.ZipFile(os.path.join(zip_download_dir, zip_name), 'r') as zip_ref:
        zip_ref.extractall(csv_extract_dir)

if failed_dates:
    print(f"Finished, but {len(failed_dates)} day(s) could not be downloaded: {', '.join(failed_dates)}")
else:
    print("All ZIP files downloaded and CSV files extracted successfully.")


Downloading:   0%|          | 0/283 [00:00<?, ?it/s]

Downloading: 100%|██████████| 283/283 [06:50<00:00,  1.45s/it]
Extracting: 100%|██████████| 283/283 [01:46<00:00,  2.66it/s]

All ZIP files downloaded and CSV files extracted successfully.





In [2]:
import os

import dask.dataframe as dd
import pandas as pd
from tqdm import tqdm

# Directory holding the daily CSVs. It must be the directory the download cell extracts
# into (its csv_extract_dir); otherwise a fresh Restart & Run All reads whatever happens
# to be left in some other folder.
csv_extract_dir = '/allah/freqtrade/json_dict/decompressed_csv'

# Directory that receives one Parquet dataset per batch
batch_output_dir = '/allah/freqtrade/json_dict/aggTrades/batch_data'
os.makedirs(batch_output_dir, exist_ok=True)

# Number of daily CSV files written per Parquet batch; lower it if memory is tight
batch_size = 10

# Assumes the date is the only varying part of the file names, so lexical order is chronological
csv_files = sorted(name for name in os.listdir(csv_extract_dir) if name.endswith('.csv'))

with tqdm(total=len(csv_files), desc="Combining CSV files") as progress_bar:
    for i in range(0, len(csv_files), batch_size):
        batch_files = csv_files[i:i + batch_size]

        # One read_csv over the batch's paths builds a single lazy frame instead of
        # re-wrapping the previous result in dd.concat once per file.
        batch_ddf = dd.read_csv([os.path.join(csv_extract_dir, name) for name in batch_files])

        # Dask writes a *directory* of part files at this path (which is why reading
        # 'batch_0.parquet' as a single file raised IsADirectoryError); overwrite=True
        # clears parts left over from an earlier run of this cell.
        batch_ddf.to_parquet(os.path.join(batch_output_dir, f'batch_{i}.parquet'), overwrite=True)

        progress_bar.update(len(batch_files))



Combining CSV files:   2%|▏         | 5/283 [00:00<00:05, 47.32it/s]

Combining CSV files: 100%|██████████| 283/283 [01:25<00:00,  3.32it/s]


IsADirectoryError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: Path /allah/freqtrade/json_dict/aggTrades/batch_data/batch_0.parquet points to a directory, but only file paths are supported. To construct a nested or union dataset pass a list of dataset objects instead.

In [21]:
import os

import dask.dataframe as dd
import pandas as pd
from tqdm import tqdm

# Directory holding the Parquet batches written by the CSV-batching cell
batch_output_dir = '/allah/freqtrade/json_dict/aggTrades/batch_data'

# Column order of the one-second bar frame
BAR_COLUMNS = ['utc_time', 'agg_trade_id', 'open', 'high', 'low', 'close',
               'quantity', 'first_trade_id', 'last_trade_id', 'is_buyer_maker']


def resample_trades_to_seconds(trades: pd.DataFrame) -> pd.DataFrame:
    """Aggregate raw aggTrades rows into one row per second.

    ``transact_time`` is interpreted as epoch milliseconds. Each second gets the first
    ``agg_trade_id``/``first_trade_id``, the open/high/low/close of ``price``, the summed
    ``quantity`` and the last ``last_trade_id``/``is_buyer_maker``.

    Seconds without trades get a flat bar at the previous close with zero quantity. Their
    identifier and side fields are forward-filled.

    Returns:
        A frame with a regular ``utc_time`` column and columns in ``BAR_COLUMNS`` order.
    """
    indexed = trades.assign(utc_time=pd.to_datetime(trades['transact_time'], unit='ms')).set_index('utc_time')
    per_second = indexed.resample('s')
    price = per_second['price']
    bars = pd.DataFrame({
        'agg_trade_id': per_second['agg_trade_id'].first(),
        'open': price.first(),
        'high': price.max(),
        'low': price.min(),
        'close': price.last(),
        'quantity': per_second['quantity'].sum(),  # empty seconds sum to 0
        'first_trade_id': per_second['first_trade_id'].first(),
        'last_trade_id': per_second['last_trade_id'].last(),
        'is_buyer_maker': per_second['is_buyer_maker'].last(),
    })

    # An empty second has no price range: show it flat at the last traded price rather
    # than copying the previous second's open/high/low column by column.
    bars['close'] = bars['close'].ffill()
    for column in ('open', 'high', 'low'):
        bars[column] = bars[column].fillna(bars['close'])

    # Carry the remaining fields over empty seconds (bfill only matters for leading gaps)
    bars = bars.ffill().bfill()
    return bars.rename_axis('utc_time').reset_index()


def merge_duplicate_seconds(bars: pd.DataFrame) -> pd.DataFrame:
    """Sort one-second bars by time and merge rows that share the same ``utc_time``.

    One second's trades can end up in two Parquet part files (e.g. when dask splits a CSV
    into several partitions), giving two partial bars for that second.

    Within a second, rows are ordered by ``agg_trade_id`` so "first"/"last" are chronological
    regardless of file-name order. This assumes ``agg_trade_id`` increases with time.
    """
    ordered = bars.sort_values(['utc_time', 'agg_trade_id'])
    return ordered.groupby('utc_time', as_index=False, sort=True).agg({
        'agg_trade_id': 'first',
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'quantity': 'sum',
        'first_trade_id': 'first',
        'last_trade_id': 'last',
        'is_buyer_maker': 'last',
    })


# Each batch is a directory of part files, so walk recursively; sorting only makes the
# read order deterministic (time order is restored by merge_duplicate_seconds).
parquet_files_list = sorted(
    os.path.join(root, name)
    for root, _dirs, files in os.walk(batch_output_dir)
    for name in files
    if name.endswith('.parquet')
)

data_frames = [
    resample_trades_to_seconds(dd.read_parquet(path).compute())
    for path in tqdm(parquet_files_list, desc="Processing Parquet files")
]
if not data_frames:
    raise ValueError(f"No .parquet files found under {batch_output_dir}")

# One row per second across all files, in time order
combined_df = merge_duplicate_seconds(pd.concat(data_frames, ignore_index=True))
combined_df.head()

# Now, 'combined_df' contains all the data from the Parquet files in time order


Processing Parquet files: 100%|██████████| 285/285 [01:20<00:00,  3.55it/s]


: 

In [None]:
import requests
import os
import datetime
import zipfile
import dask.dataframe as dd
import pandas as pd
from tqdm import tqdm

class DataProcessor:
    """Download daily aggTrades ZIP archives and aggregate them into one-second bars.

    The workflow has three stages, each callable on its own:

    1. ``download_and_extract_data`` fetches one ZIP per day and unzips the CSVs.
    2. ``process_csv_files`` rewrites the CSVs as Parquet datasets, ``batch_size`` files each.
    3. ``process_parquet_files`` resamples every Parquet part file to one row per second
       and returns the combined, time-ordered frame.

    Args:
        base_url: URL prefix the daily ZIP names are appended to (should end with '/').
        zip_download_dir: Directory receiving the downloaded ZIP files.
        csv_extract_dir: Directory receiving the extracted CSV files.
        batch_output_dir: Directory receiving one Parquet dataset per batch.
        symbol: Trading pair used in the ZIP file names; must match ``base_url``.
        request_timeout: Seconds to wait on the server before a download attempt fails.
    """

    # Column order of the frames produced by _resample_to_seconds / process_parquet_files
    BAR_COLUMNS = ['utc_time', 'agg_trade_id', 'open', 'high', 'low', 'close',
                   'quantity', 'first_trade_id', 'last_trade_id', 'is_buyer_maker']

    def __init__(self, base_url, zip_download_dir, csv_extract_dir, batch_output_dir,
                 symbol='ETHUSDT', request_timeout=60):
        self.base_url = base_url
        self.zip_download_dir = zip_download_dir
        self.csv_extract_dir = csv_extract_dir
        self.batch_output_dir = batch_output_dir
        self.symbol = symbol
        self.request_timeout = request_timeout

    def download_and_extract_data(self, start_date, end_date, skip_existing=True):
        """Download one ZIP per day in [start_date, end_date] and extract every ZIP found.

        Args:
            start_date: First day to fetch (``datetime.date``).
            end_date: Last day to fetch, inclusive.
            skip_existing: Reuse ZIPs already present in ``zip_download_dir`` instead of
                downloading them again.

        Returns:
            List of ``'YYYY-MM-DD'`` strings whose download failed (empty on full success).
        """
        os.makedirs(self.zip_download_dir, exist_ok=True)
        os.makedirs(self.csv_extract_dir, exist_ok=True)

        n_days = max((end_date - start_date).days + 1, 0)
        failed_dates = []
        with tqdm(total=n_days, desc="Downloading") as download_bar:
            for offset in range(n_days):
                date_str = (start_date + datetime.timedelta(days=offset)).strftime('%Y-%m-%d')
                if not self._download_day(date_str, skip_existing):
                    failed_dates.append(date_str)
                # Advance for every date so the bar can reach its total even when some fail
                download_bar.update(1)

        # Only real ZIP archives; '.zip.part' leftovers and other files are ignored
        zip_files = sorted(name for name in os.listdir(self.zip_download_dir) if name.endswith('.zip'))
        for filename in tqdm(zip_files, desc="Extracting"):
            with zipfile.ZipFile(os.path.join(self.zip_download_dir, filename), 'r') as zip_ref:
                zip_ref.extractall(self.csv_extract_dir)

        if failed_dates:
            print(f"Finished, but {len(failed_dates)} day(s) could not be downloaded: {', '.join(failed_dates)}")
        else:
            print("All ZIP files downloaded and CSV files extracted successfully.")
        return failed_dates

    def _download_day(self, date_str, skip_existing=True):
        """Fetch the ZIP for one day into ``zip_download_dir``; return True on success or cache hit."""
        zip_name = f'{self.symbol}-aggTrades-{date_str}.zip'
        filename = os.path.join(self.zip_download_dir, zip_name)
        if skip_existing and os.path.exists(filename):
            return True

        partial = filename + '.part'
        try:
            with requests.get(f'{self.base_url}{zip_name}', stream=True,
                              timeout=self.request_timeout) as response:
                if response.status_code != 200:
                    print(f"Error: {response.status_code} - Unable to download the ZIP file for {date_str}.")
                    return False
                # Stream to a temporary name so a crash never leaves a truncated .zip behind
                with open(partial, 'wb') as file:
                    for chunk in response.iter_content(chunk_size=1 << 20):
                        file.write(chunk)
        except requests.RequestException as exc:
            print(f"Error: {exc} - Unable to download the ZIP file for {date_str}.")
            return False
        os.replace(partial, filename)
        return True

    def process_csv_files(self, batch_size):
        """Write the extracted CSVs as Parquet datasets, ``batch_size`` files per dataset.

        Each batch becomes ``batch_output_dir/batch_<i>.parquet``. Dask writes that path as a
        directory of part files, and any previous contents are replaced.
        """
        # Assumes the date is the only varying part of the names, so lexical order is chronological
        csv_files = sorted(name for name in os.listdir(self.csv_extract_dir) if name.endswith('.csv'))
        os.makedirs(self.batch_output_dir, exist_ok=True)
        with tqdm(total=len(csv_files), desc="Combining CSV files") as progress_bar:
            for i in range(0, len(csv_files), batch_size):
                batch_files = csv_files[i:i + batch_size]
                # One read_csv over all paths instead of repeated dd.concat
                batch_ddf = dd.read_csv([os.path.join(self.csv_extract_dir, name) for name in batch_files])
                batch_output_path = os.path.join(self.batch_output_dir, f'batch_{i}.parquet')
                batch_ddf.to_parquet(batch_output_path, overwrite=True)
                progress_bar.update(len(batch_files))

    @staticmethod
    def _resample_to_seconds(trades):
        """Aggregate raw aggTrades rows (``transact_time`` in epoch ms) into one row per second.

        Each second gets OHLC of ``price`` and summed ``quantity``. It also gets the first
        ``agg_trade_id``/``first_trade_id`` and the last ``last_trade_id``/``is_buyer_maker``.

        Seconds without trades become a flat bar at the previous close with zero quantity.
        Their other fields are forward-filled.

        Returns a frame with a regular ``utc_time`` column, in ``BAR_COLUMNS`` order.
        """
        indexed = trades.assign(utc_time=pd.to_datetime(trades['transact_time'], unit='ms')).set_index('utc_time')
        per_second = indexed.resample('s')
        price = per_second['price']
        bars = pd.DataFrame({
            'agg_trade_id': per_second['agg_trade_id'].first(),
            'open': price.first(),
            'high': price.max(),
            'low': price.min(),
            'close': price.last(),
            'quantity': per_second['quantity'].sum(),  # empty seconds sum to 0
            'first_trade_id': per_second['first_trade_id'].first(),
            'last_trade_id': per_second['last_trade_id'].last(),
            'is_buyer_maker': per_second['is_buyer_maker'].last(),
        })
        # An empty second has no range: flat at the last traded price, not the previous bar's H/L
        bars['close'] = bars['close'].ffill()
        for column in ('open', 'high', 'low'):
            bars[column] = bars[column].fillna(bars['close'])
        bars = bars.ffill().bfill()
        return bars.rename_axis('utc_time').reset_index()

    @staticmethod
    def _merge_duplicate_seconds(bars):
        """Sort bars by time and merge rows sharing a ``utc_time``.

        This happens when one second's trades land in two Parquet part files. Rows are ordered
        by ``agg_trade_id`` within a second, which assumes ids increase with time.
        """
        ordered = bars.sort_values(['utc_time', 'agg_trade_id'])
        return ordered.groupby('utc_time', as_index=False, sort=True).agg({
            'agg_trade_id': 'first',
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'quantity': 'sum',
            'first_trade_id': 'first',
            'last_trade_id': 'last',
            'is_buyer_maker': 'last',
        })

    def process_parquet_files(self):
        """Resample every Parquet part file under ``batch_output_dir`` and combine the results.

        Returns:
            One row per second, sorted by ``utc_time``, columns in ``BAR_COLUMNS`` order.

        Raises:
            ValueError: if no ``.parquet`` files exist under ``batch_output_dir``.
        """
        # Batches are directories of part files, so walk recursively; sort for determinism
        parquet_files_list = sorted(
            os.path.join(root, name)
            for root, _dirs, files in os.walk(self.batch_output_dir)
            for name in files
            if name.endswith('.parquet')
        )
        data_frames = [
            self._resample_to_seconds(dd.read_parquet(parquet_file).compute())
            for parquet_file in tqdm(parquet_files_list, desc="Processing Parquet files")
        ]
        if not data_frames:
            raise ValueError(f"No .parquet files found under {self.batch_output_dir}")
        return self._merge_duplicate_seconds(pd.concat(data_frames, ignore_index=True))

    def run_data_processing_workflow(self, start_date, end_date, batch_size):
        """Run download/extract, CSV batching and resampling; return the one-second bars."""
        self.download_and_extract_data(start_date, end_date)
        self.process_csv_files(batch_size)
        return self.process_parquet_files()

# Example usage: configure one end-to-end run, then execute all stages.

# Inclusive date range and how many daily CSVs go into each Parquet batch
start_date = datetime.date(2023, 1, 1)
end_date = datetime.date(2023, 10, 10)
batch_size = 10

# Source URL prefix and local working directories
base_url = 'https://data.binance.vision/data/futures/um/daily/aggTrades/ETHUSDT/'
zip_download_dir = '/allah/freqtrade/json_dict/binance_aggTrades'
csv_extract_dir = '/allah/freqtrade/json_dict/decompressed_csv'
batch_output_dir = '/allah/freqtrade/json_dict/aggTrades/batch_data'

data_processor = DataProcessor(
    base_url=base_url,
    zip_download_dir=zip_download_dir,
    csv_extract_dir=csv_extract_dir,
    batch_output_dir=batch_output_dir,
)

data_processor.run_data_processing_workflow(
    start_date=start_date,
    end_date=end_date,
    batch_size=batch_size,
)
