In [5]:
import matplotlib.pyplot as plt
from arch import arch_model
import pandas_ta
import pandas as pd
import numpy as np
import os
import yfinance as yf
from datetime import datetime, date, timedelta
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from scipy.signal import argrelextrema


ModuleNotFoundError: No module named 'arch'

In [1]:
import os
import time
import boto3
import databento as db
from datetime import datetime

In [4]:

###############################
# Configuration
###############################

# Retrieve your Databento API key from environment
databento_api_key = "db-qLNS3JYakWiWWLtyV68KUFSUAnpE6"

# S3 settings
s3_bucket = "amazon-sagemaker-073160386915-us-east-2-10949371548f"     # Replace with your actual bucket name
s3_folder = "es_1m_data"      # Folder (prefix) where files are stored in S3

# How long to stream data each batch (in seconds)
batch_duration = 120  # 15 minutes

###############################
# Functions
###############################

def run_single_batch():
    """
    Runs a single 15-minute batch of streaming data from Databento,
    stores it locally as a DBN file, then uploads to S3 and deletes the file.
    """
    # Create a new live client each batch
    client = db.Live(key=databento_api_key)
    
    # Subscribe to 1-minute OHLCV data for ES futures
    client.subscribe(
        dataset="GLBX.MDP3",
        schema="ohlcv-1m",
        stype_in="continuous",
        symbols=["ES.c.0"],  # Adjust symbols as needed
    )
    
    # Create a timestamped file name for the DBN file
    timestamp_str = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
    local_filename = f"dbn_{timestamp_str}.dbn"
    
    # Configure the client to write the data stream to this local file
    client.add_stream(local_filename)
    
    # Start streaming
    client.start()
    
    # Block for batch_duration seconds, then close
    client.block_for_close(timeout=batch_duration)
    
    # After streaming finishes, optionally read the DBN file to verify
    # (Comment this out if you don't need to verify locally)
    # store = db.DBNStore.from_file(local_filename)
    # df = store.to_df(schema="ohlcv-1m")
    # print(f"Batch {timestamp_str} has {len(df)} rows of data.")
    
    # Upload the DBN file to S3
    s3_key = f"{s3_folder}/{local_filename}"
    s3_client = boto3.client("s3")
    try:
        s3_client.upload_file(local_filename, s3_bucket, s3_key)
        print(f"[{datetime.utcnow()}] Uploaded {local_filename} to s3://{s3_bucket}/{s3_key}")
    except Exception as e:
        print("Error uploading to S3:", e)
    
    # Remove local file
    try:
        os.remove(local_filename)
        print(f"[{datetime.utcnow()}] Deleted local file {local_filename}.")
    except Exception as e:
        print("Error deleting local file:", e)

###############################
# Main Loop
###############################

def main():
    """
    Continuously run 15-minute streaming batches, one after another.
    """
    while True:
        print(f"[{datetime.utcnow()}] Starting a new 15-minute batch...")
        run_single_batch()
        # If you want a pause between batches, add time.sleep(seconds) here
        # e.g. time.sleep(60) to wait 1 minute before next batch

if __name__ == "__main__":
    main()

[2025-03-05 04:45:50.581532] Starting a new 15-minute batch...
[2025-03-05 04:47:51.013944] Uploaded dbn_20250305T044550Z.dbn to s3://amazon-sagemaker-073160386915-us-east-2-10949371548f/es_1m_data/dbn_20250305T044550Z.dbn
[2025-03-05 04:47:51.014247] Deleted local file dbn_20250305T044550Z.dbn.
[2025-03-05 04:47:51.015604] Starting a new 15-minute batch...
[2025-03-05 04:49:51.434396] Uploaded dbn_20250305T044751Z.dbn to s3://amazon-sagemaker-073160386915-us-east-2-10949371548f/es_1m_data/dbn_20250305T044751Z.dbn
[2025-03-05 04:49:51.434772] Deleted local file dbn_20250305T044751Z.dbn.
[2025-03-05 04:49:51.436376] Starting a new 15-minute batch...


KeyboardInterrupt: 

In [None]:
from dotenv import load_dotenv

load_dotenv()
# Retrieve the API key from the environment variable
databento_api_key = os.getenv("DATABENTO_API_KEY")
if not databento_api_key:
    raise ValueError("Please set the DATABENTO_API_KEY environment variable.")

In [None]:
import databento as db
import time
from datetime import datetime, timedelta
import threading

# Global buffer to store incoming 1-minute records for the batch period
buffer = []

def handle_message(msg):
    """
    Callback function to process incoming messages.
    For example, this processes OHLCV messages.
    """
    if isinstance(msg, db.OhlcvMsg):
        record = {
            "ts_event": pd.to_datetime(msg.hd.ts_event, unit="ns"),
            "open": msg.open,
            "high": msg.high,
            "low": msg.low,
            "close": msg.close,
            "volume": msg.volume,
            "instrument_id": msg.hd.instrument_id,
        }
        buffer.append(record)
    else:
        # Handle other message types if needed.
        pass

def store_buffer():
    """
    Convert the buffered records to a DataFrame and store them (e.g. append to a CSV).
    Then clear the buffer.
    """
    global buffer
    if buffer:
        df_batch = pd.DataFrame(buffer)
        # Append to CSV file; if file doesn't exist, header is written automatically.
        # You can change the storage mechanism as needed (e.g., database insert).
        try:
            header = not pd.io.common.file_exists("5min_data.csv")
        except Exception:
            header = True
        df_batch.to_csv("5min_data.csv", mode="a", index=False, header=header)
        print(f"Stored batch of {len(buffer)} records at {datetime.utcnow()}")
        buffer = []

def run_batch_store():
    """
    Background thread that waits until the next 5-minute mark (UTC) to store the data.
    """
    while True:
        now = datetime.utcnow()
        # Compute the next 5-minute mark:
        # Floor current time to nearest 5-min mark and add 5 minutes.
        minutes = (now.minute // 5) * 5
        next_batch_time = now.replace(minute=minutes, second=0, microsecond=0) + timedelta(minutes=5)
        sleep_seconds = (next_batch_time - now).total_seconds()
        print(f"Sleeping for {sleep_seconds:.2f} seconds until next batch store at {next_batch_time} UTC.")
        time.sleep(sleep_seconds)
        store_buffer()

# Start the background storage thread (daemon thread so it exits with main thread)
storage_thread = threading.Thread(target=run_batch_store, daemon=True)
storage_thread.start()

# Create a live Databento client (replace with your actual API key)
client = db.Live(key=databento_api_key)

# Subscribe to 1-minute OHLCV data for ES futures.
client.subscribe(
    dataset="GLBX.MDP3",
    schema="OHLCV-1m",
    stype_in="parent",
    symbols="ES.FUT",
)

# Set the callback to process incoming messages.
client.add_callback(handle_message)

# Start the live stream (this call is blocking).
client.start()

Sleeping for 283.25 seconds until next batch store at 2025-03-04 18:20:00 UTC.


In [3]:
client.terminate()

In [None]:
databento_api_key

In [5]:
for message in client:
    # Process each message (e.g., print or store it)
    print(message)

ValueError: Cannot start iteration after streaming has started, records may be missed. Don't call `Live.start` before iterating.